Skip to content

Commit 2fc73d4

Browse files
authored
Merge pull request #43 from firelink-data/feature/threaded_revolver
data is now in order
2 parents 8d8c860 + 40cf086 commit 2fc73d4

3 files changed

Lines changed: 27 additions & 23 deletions

File tree

Cargo.toml

Lines changed: 3 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -44,7 +44,9 @@ libc = "0.2.154"
4444
arrow = "51.0.0"
4545
parquet = "51.0.0"
4646
padder = { version = "1.2.0", features = ["serde"] }
47-
sorted-list = "0.2.0"
47+
ordered-channel="1.1.0"
48+
atomic-counter = "1.0.1"
49+
crossbeam-channel = "0.5.12"
4850
[dev-dependencies]
4951
glob = "0.3.1"
5052

src/chunked/arrow_converter.rs

Lines changed: 22 additions & 22 deletions
Original file line number | Diff line number | Diff line change
@@ -24,11 +24,11 @@
2424
// File created: 2023-12-11
2525
// Last updated: 2024-05-15
2626
//
27-
use sorted_list::SortedList;
2827

2928
use arrow::array::{ArrayRef, BooleanBuilder, Int32Builder, Int64Builder, StringBuilder};
3029
use arrow::record_batch::RecordBatch;
3130
use log::{debug, info};
31+
use ordered_channel::bounded;
3232
use parquet::arrow::ArrowWriter;
3333
use parquet::basic::Compression;
3434
use parquet::file::properties::WriterProperties;
@@ -48,8 +48,10 @@ use crate::schema;
4848
use crate::trimmer::{trimmer_factory, ColumnTrimmer};
4949
use crate::{chunked, trimmer};
5050
use arrow::datatypes::{Field, Schema, SchemaRef};
51+
use atomic_counter::{AtomicCounter, ConsistentCounter};
5152
use crossbeam::atomic::AtomicConsume;
5253
use libc::bsearch;
54+
use ordered_channel::Sender;
5355
use parquet::errors::{ParquetError, Result};
5456
use parquet::file::metadata::FileMetaData;
5557
use std::sync::atomic::{AtomicUsize, Ordering, ATOMIC_USIZE_INIT};
@@ -58,6 +60,7 @@ use std::thread;
5860
use std::thread::JoinHandle;
5961
use std::time::{Duration, Instant};
6062
use thread::spawn;
63+
//use ordered_channel::Sender;
6164
//use crossbeam::channel::{Receiver, Sender};
6265

6366
static GLOBAL_COUNTER: AtomicUsize = ATOMIC_USIZE_INIT;
@@ -67,17 +70,13 @@ pub(crate) struct Slice2Arrow<'a> {
6770
pub(crate) fn_line_break: FnFindLastLineBreak<'a>,
6871
pub(crate) fn_line_break_len: FnLineBreakLen,
6972
pub(crate) masterbuilders: MasterBuilders,
73+
pub(crate) consistent_counter: ConsistentCounter,
7074
}
7175

7276
pub(crate) struct MasterBuilders {
7377
builders: Vec<Vec<Box<dyn Sync + Send + ColumnBuilder>>>,
7478
outfile: PathBuf,
75-
sender: Option<SyncSender<OrderedRecordBatch>>,
76-
}
77-
78-
pub(crate) struct OrderedRecordBatch {
79-
record_batch: RecordBatch,
80-
batch_nr: i32,
79+
sender: Option<Sender<RecordBatch>>,
8180
}
8281

8382
unsafe impl Send for MasterBuilders {}
@@ -197,6 +196,7 @@ impl<'a> Converter<'a> for Slice2Arrow<'a> {
197196
let mut builder_write_duration: Duration = Duration::new(0, 0);
198197

199198
let start_parse = Instant::now();
199+
let offset: usize = self.consistent_counter.get();
200200

201201
let arc_slices = Arc::new(&slices);
202202
self.masterbuilders
@@ -219,23 +219,24 @@ impl<'a> Converter<'a> for Slice2Arrow<'a> {
219219
for bb in n.iter_mut() {
220220
br.push(bb.finish());
221221
}
222-
let record_batch = OrderedRecordBatch {
223-
record_batch: RecordBatch::try_from_iter(br).unwrap(),
224-
batch_nr: i as i32,
225-
};
222+
let record_batch = RecordBatch::try_from_iter(br).unwrap();
223+
226224
let _ = self
227225
.masterbuilders
228226
.sender
229227
.clone()
230228
.unwrap()
231-
.send(record_batch);
229+
.send(offset + i, record_batch);
232230
}
233231
}
234232
});
235233

236234
for ii in slices.iter() {
237235
bytes_in += ii.len();
238236
}
237+
let offset: usize = self
238+
.consistent_counter
239+
.add(self.masterbuilders.builders.len());
239240

240241
parse_duration = start_parse.elapsed();
241242
(bytes_in, bytes_out, parse_duration, builder_write_duration)
@@ -248,16 +249,18 @@ impl<'a> Converter<'a> for Slice2Arrow<'a> {
248249
&_outfile,
249250
schema.clone(),
250251
);
251-
let (sender, receiver) = sync_channel::<OrderedRecordBatch>(100);
252+
253+
let (sender, mut receiver) = bounded::<RecordBatch>(10);
254+
// let (sender, receiver) = sync_channel::<RecordBatch>(100);
252255

253256
let t: JoinHandle<Result<format::FileMetaData>> = thread::spawn(move || {
254257
'outer: loop {
255-
let message = receiver.recv();
258+
let mut message = receiver.recv();
256259

257260
match message {
258261
Ok(rb) => {
259-
writer.write(&rb.record_batch).expect("Error Writing batch");
260-
if (rb.record_batch.num_rows() == 0) {
262+
writer.write(&rb).expect("Error Writing batch");
263+
if (rb.num_rows() == 0) {
261264
break 'outer;
262265
}
263266
}
@@ -284,12 +287,9 @@ impl<'a> Converter<'a> for Slice2Arrow<'a> {
284287
false,
285288
)]);
286289

287-
let emptyrb = OrderedRecordBatch {
288-
record_batch: arrow::record_batch::RecordBatch::new_empty(Arc::new(schema)),
289-
batch_nr: 0,
290-
};
291-
292-
let _ = &self.masterbuilders.sender.clone().unwrap().send(emptyrb);
290+
let emptyrb = arrow::record_batch::RecordBatch::new_empty(Arc::new(schema));
291+
let c = self.consistent_counter.get();
292+
let _ = &self.masterbuilders.sender.clone().unwrap().send(c, emptyrb);
293293
}
294294
}
295295

src/cli.rs

Lines changed: 2 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -33,6 +33,7 @@ use log::info;
3333
#[cfg(feature = "rayon")]
3434
use parquet::arrow::ArrowWriter;
3535

36+
use atomic_counter::ConsistentCounter;
3637
#[cfg(feature = "rayon")]
3738
use std::fs;
3839
#[cfg(feature = "rayon")]
@@ -280,6 +281,7 @@ impl Cli {
280281
fn_line_break: find_last_nl,
281282
fn_line_break_len: line_break_len_cr,
282283
masterbuilders: master_builders,
284+
consistent_counter: ConsistentCounter::new(0),
283285
});
284286
s2a
285287
}

0 commit comments

Comments
 (0)