Skip to content

Commit e990510

Browse files
authored
Merge pull request #41 from firelink-data/feature/threaded_revolver
Feature/threaded revolver
2 parents 2871944 + 545a8c3 commit e990510

6 files changed

Lines changed: 132 additions & 46 deletions

File tree

Cargo.toml

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -25,19 +25,19 @@ include = [ "**/*.rs", "Cargo.toml", "LICENSE", "README.md" ]
2525
default-run = "evolution"
2626

2727
[dependencies]
28-
chrono = "0.4.31"
29-
crossbeam = "0.8.2"
28+
chrono = "0.4.38"
29+
crossbeam = "0.8.4"
3030
colored = "2.0.4"
31-
env_logger = "0.11.1"
31+
env_logger = "0.11.3"
3232
half = "2.3.1"
3333
log = "0.4.20"
3434
num_cpus = "1.16.0"
3535
rand = "0.8.5"
36-
serde_json = "1.0.108"
36+
serde_json = "1.0.117"
3737
threadpool = "1.8.1"
38-
clap = { version = "4.4.8", features = ["default", "derive"] }
39-
serde = { version = "1.0.193", features = ["derive"] }
40-
rayon = { version = "1.8.1", optional = true }
38+
clap = { version = "4.5.4", features = ["default", "derive"] }
39+
serde = { version = "1.0.202", features = ["derive"] }
40+
rayon = { version = "1.10.0", optional = true }
4141
atoi_simd = { version = "0.15.6", optional = true }
4242
arrow2 = "0.18.0"
4343
libc = "0.2.154"

src/chunked/arrow_converter.rs

Lines changed: 90 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@
2727

2828
use arrow::array::{ArrayRef, BooleanBuilder, Int32Builder, Int64Builder, StringBuilder};
2929
use arrow::record_batch::RecordBatch;
30-
use log::debug;
30+
use log::{debug, info};
3131
use parquet::arrow::ArrowWriter;
3232
use parquet::basic::Compression;
3333
use parquet::file::properties::WriterProperties;
@@ -46,26 +46,48 @@ use crate::datatype::DataType;
4646
use crate::schema;
4747
use crate::trimmer::{trimmer_factory, ColumnTrimmer};
4848
use crate::{chunked, trimmer};
49+
use arrow::datatypes::{Field, Schema, SchemaRef};
50+
use crossbeam::atomic::AtomicConsume;
51+
use parquet::errors::{ParquetError, Result};
52+
use parquet::file::metadata::FileMetaData;
53+
use std::sync::atomic::{AtomicUsize, Ordering, ATOMIC_USIZE_INIT};
54+
use std::sync::mpsc::{sync_channel, Receiver, RecvError, SyncSender};
55+
use std::thread;
56+
use std::thread::JoinHandle;
4957
use std::time::{Duration, Instant};
58+
use thread::spawn;
59+
//use crossbeam::channel::{Receiver, Sender};
60+
61+
static GLOBAL_COUNTER: AtomicUsize = ATOMIC_USIZE_INIT;
5062

5163
pub(crate) struct Slice2Arrow<'a> {
5264
// pub(crate) file_out: File,
53-
pub(crate) writer: ArrowWriter<File>,
5465
pub(crate) fn_line_break: FnFindLastLineBreak<'a>,
5566
pub(crate) fn_line_break_len: FnLineBreakLen,
5667
pub(crate) masterbuilders: MasterBuilders,
5768
}
5869

5970
pub(crate) struct MasterBuilders {
6071
builders: Vec<Vec<Box<dyn Sync + Send + ColumnBuilder>>>,
61-
// schema: arrow_schema::SchemaRef
72+
outfile: PathBuf,
73+
sender: Option<SyncSender<RecordBatch>>,
6274
}
6375

6476
unsafe impl Send for MasterBuilders {}
6577
unsafe impl Sync for MasterBuilders {}
6678

6779
impl MasterBuilders {
68-
pub fn writer_factory(&mut self, out_file: &PathBuf) -> ArrowWriter<File> {
80+
pub fn schema_factory(&mut self) -> SchemaRef {
81+
let b: &mut Vec<Box<dyn Sync + Send + ColumnBuilder>> = self.builders.get_mut(0).unwrap();
82+
let mut br: Vec<(&str, ArrayRef)> = vec![];
83+
for bb in b.iter_mut() {
84+
br.push(bb.finish());
85+
}
86+
87+
let batch = RecordBatch::try_from_iter(br).unwrap();
88+
batch.schema()
89+
}
90+
pub fn writer_factory(out_file: &PathBuf, schema: SchemaRef) -> ArrowWriter<File> {
6991
let _out_file = fs::OpenOptions::new()
7092
.create(true)
7193
.append(true)
@@ -76,19 +98,12 @@ impl MasterBuilders {
7698
.set_compression(Compression::SNAPPY)
7799
.build();
78100

79-
let b: &mut Vec<Box<dyn Sync + Send + ColumnBuilder>> = self.builders.get_mut(0).unwrap();
80-
let mut br: Vec<(&str, ArrayRef)> = vec![];
81-
for bb in b.iter_mut() {
82-
br.push(bb.finish());
83-
}
84-
85-
let batch = RecordBatch::try_from_iter(br).unwrap();
86101
let writer: ArrowWriter<File> =
87-
ArrowWriter::try_new(_out_file, batch.schema(), Some(props.clone())).unwrap();
102+
ArrowWriter::try_new(_out_file, schema, Some(props.clone())).unwrap();
88103
writer
89104
}
90105

91-
pub fn builders_factory(schema_path: PathBuf, instances: i16) -> Self {
106+
pub fn builders_factory(out_file: PathBuf, schema_path: PathBuf, instances: i16) -> Self {
92107
let schema = schema::FixedSchema::from_path(schema_path).unwrap();
93108
let antal_col = schema.num_columns();
94109
let mut builders: Vec<Vec<Box<dyn ColumnBuilder + Sync + Send>>> = Vec::new();
@@ -152,7 +167,11 @@ impl MasterBuilders {
152167
}
153168
builders.push(buildersmut);
154169
}
155-
MasterBuilders { builders }
170+
MasterBuilders {
171+
builders: builders,
172+
outfile: out_file,
173+
sender: None,
174+
}
156175
}
157176
}
158177

@@ -197,6 +216,8 @@ impl<'a> Converter<'a> for Slice2Arrow<'a> {
197216
}
198217
parse_duration = start_parse.elapsed();
199218

219+
let mut rb: Vec<RecordBatch> = Vec::new();
220+
200221
for b in self.masterbuilders.builders.iter_mut() {
201222
let mut br: Vec<(&str, ArrayRef)> = vec![];
202223

@@ -205,24 +226,72 @@ impl<'a> Converter<'a> for Slice2Arrow<'a> {
205226
}
206227

207228
let start_builder_write = Instant::now();
208-
let batch = RecordBatch::try_from_iter(br).unwrap();
229+
let record_batch = RecordBatch::try_from_iter(br).unwrap();
230+
// rb.push(RecordBatch::try_from_iter(br).unwrap());
209231

210-
self.writer.write(&batch).expect("Error Writing batch");
211-
bytes_out += self.writer.bytes_written();
232+
let _ = self
233+
.masterbuilders
234+
.sender
235+
.clone()
236+
.unwrap()
237+
.send(record_batch);
212238

213239
builder_write_duration += start_builder_write.elapsed();
214240
}
241+
// let writer: ArrowWriter<File> =
242+
215243
debug!("Batch write: accumulated bytes_written {}", bytes_out);
216244

217245
(bytes_in, bytes_out, parse_duration, builder_write_duration)
218246
}
219247

220-
fn finish(&mut self) -> parquet::errors::Result<format::FileMetaData> {
221-
self.writer.finish()
248+
fn setup(&mut self) -> JoinHandle<Result<format::FileMetaData>> {
249+
let schema = self.masterbuilders.schema_factory();
250+
let _outfile = self.masterbuilders.outfile.clone();
251+
let mut writer = crate::chunked::arrow_converter::MasterBuilders::writer_factory(
252+
&_outfile,
253+
schema.clone(),
254+
);
255+
let (sender, receiver) = sync_channel::<RecordBatch>(1);
256+
257+
let t: JoinHandle<Result<format::FileMetaData>> = thread::spawn(move || {
258+
'outer: loop {
259+
let message = receiver.recv();
260+
match message {
261+
Ok(rb) => {
262+
writer.write(&rb).expect("Error Writing batch");
263+
if (rb.num_rows() == 0) {
264+
break 'outer;
265+
}
266+
}
267+
Err(e) => {
268+
info!("got RecvError in channel , break to outer");
269+
270+
break 'outer;
271+
}
272+
}
273+
}
274+
info!("closing the writer for parquet");
275+
276+
writer.finish()
277+
});
278+
self.masterbuilders.sender = Some(sender.clone());
279+
t
280+
281+
// bytes_out += crate::chunked::arrow_converter::GLOBAL_COUNTER.load_consume();
222282
}
223283

224-
fn get_finish_bytes_written(&mut self) -> usize {
225-
self.writer.bytes_written()
284+
fn shutdown(&mut self) {
285+
// converter.shutdown();
286+
let schema = Schema::new(vec![Field::new(
287+
"id",
288+
arrow::datatypes::DataType::Int32,
289+
false,
290+
)]);
291+
292+
let emptyrb = arrow::record_batch::RecordBatch::new_empty(Arc::new(schema));
293+
294+
let _ = &self.masterbuilders.sender.clone().unwrap().send(emptyrb);
226295
}
227296
}
228297

src/chunked/mod.rs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,13 +25,16 @@
2525
// Last updated: 2024-05-15
2626
//
2727

28-
use arrow::array::ArrayRef;
28+
use arrow::array::{ArrayRef, RecordBatch};
2929
use parquet::format;
3030

3131
use std::fs;
32+
use std::sync::mpsc::SyncSender;
33+
use std::thread::JoinHandle;
3234
use std::time::Duration;
3335

3436
use self::residual_slicer::SLICER_IN_CHUNK_SIZE;
37+
use parquet::errors::{ParquetError, Result};
3538

3639
pub(crate) mod arrow_converter;
3740
pub(crate) mod residual_slicer;
@@ -152,8 +155,8 @@ pub(crate) trait Converter<'a> {
152155

153156
// fn process(& mut self, slices: Vec< &'a[u8]>) -> usize;
154157
fn process(&mut self, slices: Vec<&'a [u8]>) -> (usize, usize, Duration, Duration);
155-
fn finish(&mut self) -> parquet::errors::Result<format::FileMetaData>;
156-
fn get_finish_bytes_written(&mut self) -> usize;
158+
fn setup(&mut self) -> JoinHandle<Result<format::FileMetaData>>;
159+
fn shutdown(&mut self);
157160
}
158161

159162
pub trait ColumnBuilder {

src/chunked/residual_slicer.rs

Lines changed: 20 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -27,10 +27,15 @@
2727

2828
use log::info;
2929

30+
use arrow::array::Int32Array;
31+
use arrow::datatypes::{DataType, Field, Schema};
32+
use arrow::ipc::RecordBatch;
3033
use std::cmp;
3134
use std::fs;
3235
use std::fs::File;
3336
use std::io::{BufReader, Read};
37+
use std::sync::atomic::Ordering;
38+
use std::sync::Arc;
3439
use std::time::{Duration, Instant};
3540

3641
use super::{ChunkAndResidue, Converter, FnFindLastLineBreak, IterRevolver, Slicer, Stats};
@@ -84,6 +89,8 @@ impl<'a> Slicer<'a> for ResidualSlicer<'a> {
8489
.build_global()
8590
.unwrap();
8691

92+
let j = converter.setup();
93+
8794
loop {
8895
let cr = ir.next().unwrap();
8996

@@ -131,23 +138,25 @@ impl<'a> Slicer<'a> for ResidualSlicer<'a> {
131138
parse_duration_tot += parse_duration;
132139
builder_write_duration_tot += builder_write_duration;
133140
}
141+
info!("about to shutdown converter...");
142+
143+
converter.shutdown();
144+
info!("converter has been shutdown");
145+
let _ = j.join(); // Make sure writer thread is done.
134146

135147
info!(
136148
"Bytes in= {}\n out= {}\nparse duration= {:?}\n \n builder write_duration {:?}\n",
137149
bytes_in, bytes_out, parse_duration_tot, builder_write_duration_tot
138150
);
139151

140-
match converter.finish() {
141-
Ok(x) => Result::Ok(Stats {
142-
bytes_in,
143-
bytes_out: converter.get_finish_bytes_written(),
144-
num_rows: x.num_rows,
145-
read_duration: read_duration_tot,
146-
parse_duration: parse_duration_tot,
147-
builder_write_duration: builder_write_duration_tot,
148-
}),
149-
Err(_x) => Result::Err("Could not produce Parquet"),
150-
}
152+
Ok(Stats {
153+
bytes_in,
154+
bytes_out: bytes_out,
155+
num_rows: 0,
156+
read_duration: read_duration_tot,
157+
parse_duration: parse_duration_tot,
158+
builder_write_duration: builder_write_duration_tot,
159+
})
151160
}
152161
}
153162

src/chunked/self_converter.rs

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,14 @@
2828
use log::info;
2929
use rayon::iter::{IndexedParallelIterator, IntoParallelRefIterator, ParallelIterator};
3030

31+
use arrow::array::RecordBatch;
32+
use parquet::errors::{ParquetError, Result};
33+
use parquet::file::metadata::FileMetaData;
34+
use parquet::format;
3135
use std::fs::File;
3236
use std::io::Write;
37+
use std::sync::mpsc::SyncSender;
38+
use std::thread::JoinHandle;
3339
use std::time::Duration;
3440

3541
use super::{Converter, FnFindLastLineBreak};
@@ -63,11 +69,11 @@ impl<'a> Converter<'a> for SampleSliceAggregator<'a> {
6369
(bytes_processed, 0, duration, duration)
6470
}
6571

66-
fn finish(&mut self) -> Result<parquet::format::FileMetaData, parquet::errors::ParquetError> {
72+
fn setup(&mut self) -> JoinHandle<Result<format::FileMetaData>> {
6773
todo!()
6874
}
6975

70-
fn get_finish_bytes_written(&mut self) -> usize {
76+
fn shutdown(&mut self) {
7177
todo!()
7278
}
7379
}

src/cli.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -271,13 +271,12 @@ impl Cli {
271271
let converter_instance: Box<dyn ChunkedConverter> = match converter {
272272
Converters::Arrow => {
273273
let mut master_builders = MasterBuilders::builders_factory(
274+
out_file.clone().to_path_buf(),
274275
schema.to_path_buf(),
275276
n_threads as i16,
276277
);
277-
let writer: ArrowWriter<File> = master_builders.writer_factory(out_file);
278278

279279
let s2a: Box<Slice2Arrow> = Box::new(Slice2Arrow {
280-
writer,
281280
fn_line_break: find_last_nl,
282281
fn_line_break_len: line_break_len_cr,
283282
masterbuilders: master_builders,

0 commit comments

Comments
 (0)