Skip to content

Commit 8d8c860

Browse files
authored
Merge pull request #42 from firelink-data/feature/threaded_revolver
Feature/threaded revolver
2 parents e990510 + 814491a commit 8d8c860

2 files changed

Lines changed: 33 additions & 35 deletions

File tree

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ libc = "0.2.154"
4444
arrow = "51.0.0"
4545
parquet = "51.0.0"
4646
padder = { version = "1.2.0", features = ["serde"] }
47-
47+
sorted-list = "0.2.0"
4848
[dev-dependencies]
4949
glob = "0.3.1"
5050

src/chunked/arrow_converter.rs

Lines changed: 32 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
// File created: 2023-12-11
2525
// Last updated: 2024-05-15
2626
//
27+
use sorted_list::SortedList;
2728

2829
use arrow::array::{ArrayRef, BooleanBuilder, Int32Builder, Int64Builder, StringBuilder};
2930
use arrow::record_batch::RecordBatch;
@@ -48,6 +49,7 @@ use crate::trimmer::{trimmer_factory, ColumnTrimmer};
4849
use crate::{chunked, trimmer};
4950
use arrow::datatypes::{Field, Schema, SchemaRef};
5051
use crossbeam::atomic::AtomicConsume;
52+
use libc::bsearch;
5153
use parquet::errors::{ParquetError, Result};
5254
use parquet::file::metadata::FileMetaData;
5355
use std::sync::atomic::{AtomicUsize, Ordering, ATOMIC_USIZE_INIT};
@@ -70,7 +72,12 @@ pub(crate) struct Slice2Arrow<'a> {
7072
pub(crate) struct MasterBuilders {
7173
builders: Vec<Vec<Box<dyn Sync + Send + ColumnBuilder>>>,
7274
outfile: PathBuf,
73-
sender: Option<SyncSender<RecordBatch>>,
75+
sender: Option<SyncSender<OrderedRecordBatch>>,
76+
}
77+
78+
pub(crate) struct OrderedRecordBatch {
79+
record_batch: RecordBatch,
80+
batch_nr: i32,
7481
}
7582

7683
unsafe impl Send for MasterBuilders {}
@@ -207,41 +214,30 @@ impl<'a> Converter<'a> for Slice2Arrow<'a> {
207214
n,
208215
(self.fn_line_break_len)(),
209216
);
217+
let mut br: Vec<(&str, ArrayRef)> = vec![];
218+
219+
for bb in n.iter_mut() {
220+
br.push(bb.finish());
221+
}
222+
let record_batch = OrderedRecordBatch {
223+
record_batch: RecordBatch::try_from_iter(br).unwrap(),
224+
batch_nr: i as i32,
225+
};
226+
let _ = self
227+
.masterbuilders
228+
.sender
229+
.clone()
230+
.unwrap()
231+
.send(record_batch);
210232
}
211233
}
212234
});
213235

214236
for ii in slices.iter() {
215237
bytes_in += ii.len();
216238
}
217-
parse_duration = start_parse.elapsed();
218-
219-
let mut rb: Vec<RecordBatch> = Vec::new();
220-
221-
for b in self.masterbuilders.builders.iter_mut() {
222-
let mut br: Vec<(&str, ArrayRef)> = vec![];
223-
224-
for bb in b.iter_mut() {
225-
br.push(bb.finish());
226-
}
227-
228-
let start_builder_write = Instant::now();
229-
let record_batch = RecordBatch::try_from_iter(br).unwrap();
230-
// rb.push(RecordBatch::try_from_iter(br).unwrap());
231-
232-
let _ = self
233-
.masterbuilders
234-
.sender
235-
.clone()
236-
.unwrap()
237-
.send(record_batch);
238-
239-
builder_write_duration += start_builder_write.elapsed();
240-
}
241-
// let writer: ArrowWriter<File> =
242-
243-
debug!("Batch write: accumulated bytes_written {}", bytes_out);
244239

240+
parse_duration = start_parse.elapsed();
245241
(bytes_in, bytes_out, parse_duration, builder_write_duration)
246242
}
247243

@@ -252,27 +248,26 @@ impl<'a> Converter<'a> for Slice2Arrow<'a> {
252248
&_outfile,
253249
schema.clone(),
254250
);
255-
let (sender, receiver) = sync_channel::<RecordBatch>(1);
251+
let (sender, receiver) = sync_channel::<OrderedRecordBatch>(100);
256252

257253
let t: JoinHandle<Result<format::FileMetaData>> = thread::spawn(move || {
258254
'outer: loop {
259255
let message = receiver.recv();
256+
260257
match message {
261258
Ok(rb) => {
262-
writer.write(&rb).expect("Error Writing batch");
263-
if (rb.num_rows() == 0) {
259+
writer.write(&rb.record_batch).expect("Error Writing batch");
260+
if (rb.record_batch.num_rows() == 0) {
264261
break 'outer;
265262
}
266263
}
267264
Err(e) => {
268265
info!("got RecvError in channel , break to outer");
269-
270266
break 'outer;
271267
}
272268
}
273269
}
274270
info!("closing the writer for parquet");
275-
276271
writer.finish()
277272
});
278273
self.masterbuilders.sender = Some(sender.clone());
@@ -289,7 +284,10 @@ impl<'a> Converter<'a> for Slice2Arrow<'a> {
289284
false,
290285
)]);
291286

292-
let emptyrb = arrow::record_batch::RecordBatch::new_empty(Arc::new(schema));
287+
let emptyrb = OrderedRecordBatch {
288+
record_batch: arrow::record_batch::RecordBatch::new_empty(Arc::new(schema)),
289+
batch_nr: 0,
290+
};
293291

294292
let _ = &self.masterbuilders.sender.clone().unwrap().send(emptyrb);
295293
}

0 commit comments

Comments
 (0)