Skip to content

Commit e711d23

Browse files
author
Rickard Lundin
committed
fix choice of ipc or parquet
1 parent 43aad20 commit e711d23

5 files changed

Lines changed: 15 additions & 20 deletions

File tree

src/chunked/arrow_converter.rs

Lines changed: 8 additions & 11 deletions
Original file line number | Diff line number | Diff line change
@@ -39,6 +39,7 @@ use rayon::prelude::*;
3939

4040
use std::fs;
4141
use std::fs::File;
42+
use std::ops::Deref;
4243
use std::path::PathBuf;
4344
use std::str::from_utf8_unchecked;
4445
use std::sync::Arc;
@@ -48,7 +49,7 @@ use super::{
4849
Stats,
4950
};
5051
use crate::chunked;
51-
use crate::chunked::threaded_file_output::{ipc_file_out, parquet_file_out};
52+
use crate::chunked::threaded_file_output::{ipc_file_out, output_factory, parquet_file_out};
5253
use crate::datatype::DataType;
5354
use crate::schema;
5455
use arrow::datatypes::{Field, Schema, SchemaRef};
@@ -65,6 +66,7 @@ use std::thread;
6566
use std::thread::JoinHandle;
6667
use std::time::{Duration, Instant};
6768
use thread::spawn;
69+
use crate::cli::Targets;
6870
//use ordered_channel::Sender;
6971
//use crossbeam::channel::{Receiver, Sender};
7072

@@ -76,7 +78,7 @@ pub(crate) struct Slice2Arrow<'a> {
7678
pub(crate) fn_line_break_len: FnLineBreakLen,
7779
pub(crate) masterbuilders: MasterBuilders,
7880
pub(crate) consistent_counter: ConsistentCounter,
79-
pub(crate) threaded_write: (Sender<RecordBatch>, JoinHandle<Result<Stats>>),
81+
pub(crate) target: Targets
8082
}
8183

8284
pub(crate) struct MasterBuilders {
@@ -233,17 +235,11 @@ impl<'a> Converter<'a> for Slice2Arrow<'a> {
233235
(bytes_in, bytes_out, parse_duration, builder_write_duration)
234236
}
235237

236-
fn setup(&mut self) -> JoinHandle<Result<Stats>> {
237-
let schema = self.masterbuilders.schema_factory();
238-
let _outfile = self.masterbuilders.outfile.clone();
239-
240-
let mut pfo: Box<dyn arrow_file_output> = Box::new(ipc_file_out { sender: None });
241-
let (sender, joinhandler) = pfo.setup(schema, _outfile);
242-
243-
self.masterbuilders.sender = Some(sender);
244-
joinhandler
238+
fn setup(&mut self) -> (Sender<RecordBatch>, JoinHandle<Result<Stats>>) {
239+
output_factory(self.target.clone(), self.masterbuilders.schema_factory(), self.masterbuilders.outfile.clone())
245240
}
246241

242+
247243
fn shutdown(&mut self) {
248244
// converter.shutdown();
249245
let schema = Schema::new(vec![Field::new(
@@ -255,6 +251,7 @@ impl<'a> Converter<'a> for Slice2Arrow<'a> {
255251
let emptyrb = arrow::record_batch::RecordBatch::new_empty(Arc::new(schema));
256252
let c = self.consistent_counter.get();
257253
let _ = &self.masterbuilders.sender.clone().unwrap().send(c, emptyrb);
254+
258255
}
259256
}
260257

src/chunked/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -161,7 +161,7 @@ pub(crate) trait Converter<'a> {
161161

162162
// fn process(& mut self, slices: Vec< &'a[u8]>) -> usize;
163163
fn process(&mut self, slices: Vec<&'a [u8]>) -> (usize, usize, Duration, Duration);
164-
fn setup(&mut self) -> JoinHandle<Result<Stats>>;
164+
fn setup(&mut self)->(Sender<RecordBatch>, JoinHandle<Result<Stats>>);
165165
fn shutdown(&mut self);
166166
}
167167

src/chunked/residual_slicer.rs

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -89,7 +89,7 @@ impl<'a> Slicer<'a> for ResidualSlicer<'a> {
8989
.build_global()
9090
.unwrap();
9191

92-
let j = converter.setup();
92+
let threaded_writer=converter.setup();
9393

9494
loop {
9595
let cr = ir.next().unwrap();
@@ -141,8 +141,8 @@ impl<'a> Slicer<'a> for ResidualSlicer<'a> {
141141
info!("about to shudown converter...");
142142

143143
converter.shutdown();
144+
threaded_writer.1.join();
144145
info!("converter has been shutdown");
145-
let _ = j.join(); // Make sure writer thread is done.
146146

147147
info!(
148148
"Bytes in= {}\n out= {}\nparse duration= {:?}\n \n builder write_duration {:?}\n",

src/chunked/self_converter.rs

Lines changed: 3 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -38,6 +38,7 @@ use std::io::Write;
3838
use std::sync::mpsc::SyncSender;
3939
use std::thread::JoinHandle;
4040
use std::time::Duration;
41+
use ordered_channel::Sender;
4142

4243
use super::{Converter, FnFindLastLineBreak};
4344

@@ -70,10 +71,11 @@ impl<'a> Converter<'a> for SampleSliceAggregator<'a> {
7071
(bytes_processed, 0, duration, duration)
7172
}
7273

73-
fn setup(&mut self) -> JoinHandle<Result<Stats>> {
74+
fn setup(&mut self) -> (Sender<RecordBatch>, JoinHandle<Result<Stats>>) {
7475
todo!()
7576
}
7677

78+
7779
fn shutdown(&mut self) {
7880
todo!()
7981
}

src/cli.rs

Lines changed: 1 addition & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -300,11 +300,7 @@ impl Cli {
300300
fn_line_break_len: line_break_len_cr,
301301
masterbuilders: master_builders,
302302
consistent_counter: ConsistentCounter::new(0),
303-
threaded_write: output_factory(
304-
self.target.clone(),
305-
sc,
306-
out_file.clone().to_path_buf(),
307-
),
303+
target: self.target.clone(),
308304
});
309305
s2a
310306
}

0 commit comments

Comments (0)