Skip to content

Commit 7ae2fce

Browse files
authored
Merge pull request #44 from firelink-data/feature/ipc
Feature/ipc
2 parents 09a7a0b + 18f6ead commit 7ae2fce

9 files changed

Lines changed: 293 additions & 84 deletions

File tree

Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,8 @@ arrow2 = "0.18.0"
4343
libc = "0.2.154"
4444
arrow = "51.0.0"
4545
parquet = "51.0.0"
46+
arrow-ipc = "51.0.0"
47+
4648
padder = { version = "1.2.0", features = ["serde"] }
4749
ordered-channel="1.1.0"
4850
atomic-counter = "1.0.1"

src/chunked/arrow_converter.rs

Lines changed: 18 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
// Last updated: 2024-05-15
2626
//
2727

28+
use crate::chunked::trimmer::{trimmer_factory, ColumnTrimmer};
2829
use arrow::array::{ArrayRef, BooleanBuilder, Int32Builder, Int64Builder, StringBuilder};
2930
use arrow::record_batch::RecordBatch;
3031
use log::{debug, info};
@@ -38,22 +39,28 @@ use rayon::prelude::*;
3839

3940
use std::fs;
4041
use std::fs::File;
42+
use std::ops::Deref;
4143
use std::path::PathBuf;
4244
use std::str::from_utf8_unchecked;
4345
use std::sync::Arc;
4446

45-
use super::{ColumnBuilder, Converter, FnFindLastLineBreak, FnLineBreakLen};
47+
use super::{
48+
arrow_file_output, trimmer, ColumnBuilder, Converter, FnFindLastLineBreak, FnLineBreakLen,
49+
Stats,
50+
};
51+
use crate::chunked;
52+
use crate::chunked::threaded_file_output::{ipc_file_out, output_factory, parquet_file_out};
53+
pub use crate::cli::Targets;
4654
use crate::datatype::DataType;
4755
use crate::schema;
48-
use crate::trimmer::{trimmer_factory, ColumnTrimmer};
49-
use crate::{chunked, trimmer};
5056
use arrow::datatypes::{Field, Schema, SchemaRef};
5157
use atomic_counter::{AtomicCounter, ConsistentCounter};
5258
use crossbeam::atomic::AtomicConsume;
5359
use libc::bsearch;
5460
use ordered_channel::Sender;
5561
use parquet::errors::{ParquetError, Result};
5662
use parquet::file::metadata::FileMetaData;
63+
use rayon::join;
5764
use std::sync::atomic::{AtomicUsize, Ordering, ATOMIC_USIZE_INIT};
5865
use std::sync::mpsc::{sync_channel, Receiver, RecvError, SyncSender};
5966
use std::thread;
@@ -71,6 +78,7 @@ pub(crate) struct Slice2Arrow<'a> {
7178
pub(crate) fn_line_break_len: FnLineBreakLen,
7279
pub(crate) masterbuilders: MasterBuilders,
7380
pub(crate) consistent_counter: ConsistentCounter,
81+
pub(crate) target: Targets,
7482
}
7583

7684
pub(crate) struct MasterBuilders {
@@ -93,21 +101,6 @@ impl MasterBuilders {
93101
let batch = RecordBatch::try_from_iter(br).unwrap();
94102
batch.schema()
95103
}
96-
pub fn writer_factory(out_file: &PathBuf, schema: SchemaRef) -> ArrowWriter<File> {
97-
let _out_file = fs::OpenOptions::new()
98-
.create(true)
99-
.append(true)
100-
.open(out_file)
101-
.expect("aaa");
102-
103-
let props = WriterProperties::builder()
104-
.set_compression(Compression::SNAPPY)
105-
.build();
106-
107-
let writer: ArrowWriter<File> =
108-
ArrowWriter::try_new(_out_file, schema, Some(props.clone())).unwrap();
109-
writer
110-
}
111104

112105
pub fn builders_factory(out_file: PathBuf, schema_path: PathBuf, instances: i16) -> Self {
113106
let schema = schema::FixedSchema::from_path(schema_path).unwrap();
@@ -242,40 +235,14 @@ impl<'a> Converter<'a> for Slice2Arrow<'a> {
242235
(bytes_in, bytes_out, parse_duration, builder_write_duration)
243236
}
244237

245-
fn setup(&mut self) -> JoinHandle<Result<format::FileMetaData>> {
246-
let schema = self.masterbuilders.schema_factory();
247-
let _outfile = self.masterbuilders.outfile.clone();
248-
let mut writer = crate::chunked::arrow_converter::MasterBuilders::writer_factory(
249-
&_outfile,
250-
schema.clone(),
238+
fn setup(&mut self) -> (Sender<RecordBatch>, JoinHandle<Result<Stats>>) {
239+
let o = output_factory(
240+
self.target.clone(),
241+
self.masterbuilders.schema_factory(),
242+
self.masterbuilders.outfile.clone(),
251243
);
252-
253-
let (sender, mut receiver) = bounded::<RecordBatch>(100);
254-
255-
let t: JoinHandle<Result<format::FileMetaData>> = thread::spawn(move || {
256-
'outer: loop {
257-
let mut message = receiver.recv();
258-
259-
match message {
260-
Ok(rb) => {
261-
writer.write(&rb).expect("Error Writing batch");
262-
if (rb.num_rows() == 0) {
263-
break 'outer;
264-
}
265-
}
266-
Err(e) => {
267-
info!("got RecvError in channel , break to outer");
268-
break 'outer;
269-
}
270-
}
271-
}
272-
info!("closing the writer for parquet");
273-
writer.finish()
274-
});
275-
self.masterbuilders.sender = Some(sender.clone());
276-
t
277-
278-
// bytes_out += crate::chunked::arrow_converter::GLOBAL_COUNTER.load_consume();
244+
self.masterbuilders.sender = Some(o.0.clone());
245+
o
279246
}
280247

281248
fn shutdown(&mut self) {

src/chunked/mod.rs

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,12 @@
2727

2828
use arrow::array::{ArrayRef, RecordBatch};
2929
use parquet::format;
30+
use std::fmt::Debug;
3031

32+
use arrow::datatypes::SchemaRef;
33+
use ordered_channel::Sender;
3134
use std::fs;
35+
use std::path::PathBuf;
3236
use std::sync::mpsc::SyncSender;
3337
use std::thread::JoinHandle;
3438
use std::time::Duration;
@@ -39,6 +43,8 @@ use parquet::errors::{ParquetError, Result};
3943
pub(crate) mod arrow_converter;
4044
pub(crate) mod residual_slicer;
4145
pub(crate) mod self_converter;
46+
pub(crate) mod threaded_file_output;
47+
mod trimmer;
4248

4349
pub(crate) struct ChunkAndResidue {
4450
pub(crate) chunk: Box<[u8; SLICER_IN_CHUNK_SIZE]>,
@@ -155,7 +161,7 @@ pub(crate) trait Converter<'a> {
155161

156162
// fn process(& mut self, slices: Vec< &'a[u8]>) -> usize;
157163
fn process(&mut self, slices: Vec<&'a [u8]>) -> (usize, usize, Duration, Duration);
158-
fn setup(&mut self) -> JoinHandle<Result<format::FileMetaData>>;
164+
fn setup(&mut self) -> (Sender<RecordBatch>, JoinHandle<Result<Stats>>);
159165
fn shutdown(&mut self);
160166
}
161167

@@ -164,3 +170,11 @@ pub trait ColumnBuilder {
164170
fn finish(&mut self) -> (&str, ArrayRef);
165171
// fn name(& self) -> &String;
166172
}
173+
174+
pub trait arrow_file_output {
175+
fn setup(
176+
&mut self,
177+
schema: SchemaRef,
178+
outfile: PathBuf,
179+
) -> (Sender<RecordBatch>, JoinHandle<Result<Stats>>);
180+
}

src/chunked/residual_slicer.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ use std::time::{Duration, Instant};
4040

4141
use super::{ChunkAndResidue, Converter, FnFindLastLineBreak, IterRevolver, Slicer, Stats};
4242

43-
pub(crate) const SLICER_IN_CHUNK_SIZE: usize = 1024 * 2024;
43+
pub(crate) const SLICER_IN_CHUNK_SIZE: usize = 1024 * 2048;
4444
pub(crate) const SLICER_MAX_RESIDUE_SIZE: usize = SLICER_IN_CHUNK_SIZE;
4545

4646
pub(crate) const IN_MAX_CHUNKS: usize = 2;
@@ -89,7 +89,7 @@ impl<'a> Slicer<'a> for ResidualSlicer<'a> {
8989
.build_global()
9090
.unwrap();
9191

92-
let j = converter.setup();
92+
let threaded_writer = converter.setup();
9393

9494
loop {
9595
let cr = ir.next().unwrap();
@@ -141,8 +141,8 @@ impl<'a> Slicer<'a> for ResidualSlicer<'a> {
141141
info!("about to shudown converter...");
142142

143143
converter.shutdown();
144+
threaded_writer.1.join();
144145
info!("converter has been shutdown");
145-
let _ = j.join(); // Make sure writer thread is done.
146146

147147
info!(
148148
"Bytes in= {}\n out= {}\nparse duration= {:?}\n \n builder write_duration {:?}\n",

src/chunked/self_converter.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,10 +25,12 @@
2525
// Last updated: 2024-05-15
2626
//
2727

28+
use crate::chunked::Stats;
2829
use log::info;
2930
use rayon::iter::{IndexedParallelIterator, IntoParallelRefIterator, ParallelIterator};
3031

3132
use arrow::array::RecordBatch;
33+
use ordered_channel::Sender;
3234
use parquet::errors::{ParquetError, Result};
3335
use parquet::file::metadata::FileMetaData;
3436
use parquet::format;
@@ -69,7 +71,7 @@ impl<'a> Converter<'a> for SampleSliceAggregator<'a> {
6971
(bytes_processed, 0, duration, duration)
7072
}
7173

72-
fn setup(&mut self) -> JoinHandle<Result<format::FileMetaData>> {
74+
fn setup(&mut self) -> (Sender<RecordBatch>, JoinHandle<Result<Stats>>) {
7375
todo!()
7476
}
7577

0 commit comments

Comments
 (0)