Skip to content

Commit aa695be

Browse files
author
Rickard Lundin
committed
Changed to use async for all outputs.
1 parent aa45925 commit aa695be

5 files changed

Lines changed: 59 additions & 89 deletions

File tree

src/chunked/arrow_converter.rs

Lines changed: 9 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -64,9 +64,11 @@ use rayon::join;
6464
use std::sync::atomic::{AtomicUsize, Ordering, ATOMIC_USIZE_INIT};
6565
use std::sync::mpsc::{sync_channel, Receiver, RecvError, SyncSender};
6666
use std::thread;
67-
use std::thread::JoinHandle;
67+
use tokio::task::JoinHandle;
6868
use std::time::{Duration, Instant};
6969
use thread::spawn;
70+
use tokio::runtime;
71+
use tokio::runtime::Runtime;
7072
use crate::schema::FixedSchema;
7173
//use ordered_channel::Sender;
7274
//use crossbeam::channel::{Receiver, Sender};
@@ -237,18 +239,19 @@ impl<'a> Converter<'a> for Slice2Arrow<'a> {
237239
(bytes_in, bytes_out, parse_duration, builder_write_duration)
238240
}
239241

240-
fn setup(&mut self) -> (Sender<RecordBatch>, JoinHandle<Result<Stats>>) {
242+
fn setup(&mut self, rt: runtime::Runtime) -> (Sender<RecordBatch>, JoinHandle<Result<Stats>>) {
241243
let o = output_factory(
242244
self.target.clone(),
243245
self.fixed_schema.clone(),
244246
self.masterbuilders.schema_factory(),
245247
self.masterbuilders.outfile.clone(),
248+
rt
246249
);
247250
self.masterbuilders.sender = Some(o.0.clone());
248251
o
249252
}
250-
251-
fn shutdown(&mut self) {
253+
254+
fn shutdown(&mut self,jh:JoinHandle<Result<Stats>>) {
252255
// converter.shutdown();
253256
let schema = Schema::new(vec![Field::new(
254257
"id",
@@ -259,6 +262,8 @@ impl<'a> Converter<'a> for Slice2Arrow<'a> {
259262
let emptyrb = arrow::record_batch::RecordBatch::new_empty(Arc::new(schema));
260263
let c = self.consistent_counter.get();
261264
let _ = &self.masterbuilders.sender.clone().unwrap().send(c, emptyrb);
265+
266+
262267
}
263268
}
264269

src/chunked/mod.rs

Lines changed: 5 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -34,7 +34,8 @@ use ordered_channel::Sender;
3434
use std::fs;
3535
use std::path::PathBuf;
3636
use std::sync::mpsc::SyncSender;
37-
use std::thread::JoinHandle;
37+
use tokio::task::JoinHandle;
38+
use tokio::runtime::Runtime;
3839
use std::time::Duration;
3940

4041
use self::residual_slicer::SLICER_IN_CHUNK_SIZE;
@@ -162,8 +163,8 @@ pub(crate) trait Converter<'a> {
162163

163164
// fn process(& mut self, slices: Vec< &'a[u8]>) -> usize;
164165
fn process(&mut self, slices: Vec<&'a [u8]>) -> (usize, usize, Duration, Duration);
165-
fn setup(&mut self) -> (Sender<RecordBatch>, JoinHandle<Result<Stats>>);
166-
fn shutdown(&mut self);
166+
fn setup(&mut self, rt: Runtime) -> (Sender<RecordBatch>, JoinHandle<Result<Stats>>);
167+
fn shutdown(&mut self,jh:JoinHandle<Result<Stats>>);
167168
}
168169

169170
pub trait ColumnBuilder {
@@ -178,6 +179,6 @@ pub trait RecordBatchOutput {
178179
schema: SchemaRef,
179180
fixed_schema: FixedSchema,
180181
outfile: PathBuf,
181-
182+
rt: tokio::runtime::Runtime
182183
) -> (Sender<RecordBatch>, JoinHandle<Result<Stats>>);
183184
}

src/chunked/recordbatch_output.rs

Lines changed: 36 additions & 74 deletions
Original file line number | Diff line number | Diff line change
@@ -24,7 +24,7 @@
2424
// File created: 2024-05-07
2525
// Last updated: 2024-05-25
2626
//
27-
27+
use tokio::runtime::Runtime;
2828
use std::env::VarError;
2929
use crate::chunked::trimmer::{trimmer_factory, ColumnTrimmer};
3030
use arrow::array::{ArrayRef, BooleanBuilder, Int32Builder, Int64Builder, StringBuilder};
@@ -56,14 +56,15 @@ use arrow_ipc::writer::IpcWriteOptions;
5656
use arrow_ipc::CompressionType;
5757
use atomic_counter::{AtomicCounter, ConsistentCounter};
5858
use crossbeam::atomic::AtomicConsume;
59-
use libc::bsearch;
59+
use libc::{bsearch, send};
6060
use ordered_channel::Sender;
6161
use parquet::errors::{ParquetError, Result};
6262
use parquet::file::metadata::FileMetaData;
6363
use std::sync::atomic::{AtomicUsize, Ordering, ATOMIC_USIZE_INIT};
6464
use std::sync::mpsc::{sync_channel, Receiver, RecvError, SyncSender};
6565
use std::thread;
66-
use std::thread::JoinHandle;
66+
//use std::thread::JoinHandle;
67+
use tokio::task::JoinHandle;
6768
use std::time::{Duration, Instant};
6869
use thread::spawn;
6970
use Compression::SNAPPY;
@@ -88,6 +89,7 @@ pub(crate) fn output_factory(
8889
fixed_schema: FixedSchema,
8990
schema: SchemaRef,
9091
_outfile: PathBuf,
92+
rt:tokio::runtime::Runtime
9193
) -> (Sender<RecordBatch>, JoinHandle<Result<Stats>>) {
9294
let mut pfo: Box<dyn RecordBatchOutput> =match target {
9395
Targets::Parquet => {
@@ -114,7 +116,7 @@ pub(crate) fn output_factory(
114116

115117
};
116118

117-
pfo.setup(schema,fixed_schema, _outfile)
119+
pfo.setup(schema,fixed_schema, _outfile,rt)
118120

119121
}
120122

@@ -158,23 +160,22 @@ impl DeltaOut {
158160

159161
todo!()
160162
}
161-
async fn myDelta(schema: SchemaRef,fixed_schema: FixedSchema, outfile: PathBuf) {
162-
let dout = Self::deltasetup(fixed_schema);
163+
async fn myDelta(schema: SchemaRef,fixed_schema: FixedSchema, outfile: PathBuf)->(Result<Stats>) {
164+
Self::deltasetup(fixed_schema).await.unwrap();
165+
todo!()
166+
163167
}
164168

165169
}
166170

167171

168172
impl RecordBatchOutput for DeltaOut {
169173

170-
fn setup(&mut self, schema: SchemaRef,fixed_schema: FixedSchema, outfile: PathBuf) -> (Sender<RecordBatch>, JoinHandle<Result<Stats>>) {
171-
let runtime = Builder::new_multi_thread()
172-
.worker_threads(1)
173-
.enable_all()
174-
.build()
175-
.unwrap();
174+
fn setup(&mut self, schema: SchemaRef,fixed_schema: FixedSchema, outfile: PathBuf,rt:tokio::runtime::Runtime) -> (Sender<RecordBatch>, JoinHandle<Result<Stats>>) {
176175

177-
runtime.spawn(Self::myDelta(schema,fixed_schema,outfile));
176+
let j:JoinHandle<Result<Stats>>=rt.spawn(Self::myDelta(schema,fixed_schema,outfile));
177+
178+
178179

179180
// let dout = Self::deltasetup(fixed_schema);
180181

@@ -185,7 +186,7 @@ pub(crate) struct IcebergOut {
185186
pub(crate) sender: Option<Sender<RecordBatch>>,
186187
}
187188
impl RecordBatchOutput for IcebergOut {
188-
fn setup(&mut self, schema: SchemaRef,fixed_schema: FixedSchema, outfile: PathBuf) -> (Sender<RecordBatch>, JoinHandle<Result<Stats>>) {
189+
fn setup(&mut self, schema: SchemaRef,fixed_schema: FixedSchema, outfile: PathBuf,rt: tokio::runtime::Runtime) -> (Sender<RecordBatch>, JoinHandle<Result<Stats>>) {
189190
todo!()
190191
}
191192
}
@@ -194,7 +195,7 @@ pub(crate) struct FlightOut {
194195
}
195196

196197
impl RecordBatchOutput for FlightOut {
197-
fn setup(&mut self, schema: SchemaRef,fixed_schema: FixedSchema, outfile: PathBuf) -> (Sender<RecordBatch>, JoinHandle<Result<Stats>>) {
198+
fn setup(&mut self, schema: SchemaRef,fixed_schema: FixedSchema, outfile: PathBuf,rt: tokio::runtime::Runtime) -> (Sender<RecordBatch>, JoinHandle<Result<Stats>>) {
198199
todo!()
199200
}
200201
}
@@ -203,8 +204,9 @@ impl RecordBatchOutput for FlightOut {
203204
pub(crate) struct ParquetFileOut {
204205
pub(crate) sender: Option<Sender<RecordBatch>>,
205206
}
206-
impl RecordBatchOutput for ParquetFileOut {
207-
fn setup(&mut self, schema: SchemaRef,fixed_schema: FixedSchema, outfile: PathBuf) -> (Sender<RecordBatch>, JoinHandle<Result<Stats>>) {
207+
impl ParquetFileOut {
208+
pub(crate) async fn myParquet(schema: SchemaRef,fixed_schema: FixedSchema, outfile: PathBuf,rt:tokio::runtime::Runtime)->(Result<Stats>) {
209+
// Self::deltasetup(fixed_schema).await.unwrap();
208210
let _out_file = fs::OpenOptions::new()
209211
.create(true)
210212
.append(true)
@@ -218,7 +220,8 @@ impl RecordBatchOutput for ParquetFileOut {
218220

219221
let (sender, mut receiver) = bounded::<RecordBatch>(100);
220222

221-
let t: JoinHandle<Result<Stats>> = thread::spawn(move || {
223+
self.sender = Some(sender.clone());
224+
222225
'outer: loop {
223226
let mut message = receiver.recv();
224227

@@ -245,70 +248,29 @@ impl RecordBatchOutput for ParquetFileOut {
245248
parse_duration: Default::default(),
246249
builder_write_duration: Default::default(),
247250
})
248-
});
249-
self.sender = Some(sender.clone());
250-
(sender, t)
251-
}
252-
}
253251

254-
pub struct IpcFileOut {
255-
pub(crate) sender: Option<Sender<RecordBatch>>,
252+
253+
}
256254
}
255+
impl RecordBatchOutput for ParquetFileOut {
256+
fn setup(&mut self, schema: SchemaRef,fixed_schema: FixedSchema, outfile: PathBuf,rt: tokio::runtime::Runtime) -> (Sender<RecordBatch>, JoinHandle<Result<Stats>>) {
257257

258+
let j:JoinHandle<Result<Stats>>=rt.spawn(Self::myParquet(schema,fixed_schema,outfile,rt));
258259

259-
impl RecordBatchOutput for IpcFileOut {
260-
fn setup(&mut self, schema: SchemaRef,fixed_schema: FixedSchema, outfile: PathBuf) -> (Sender<RecordBatch>, JoinHandle<Result<Stats>>) {
261-
let _out_file = fs::OpenOptions::new()
262-
.create(true)
263-
.append(true)
264-
.open(outfile)
265-
.expect("aaa");
260+
// let j:JoinHandle<Result<Stats>>=rt.spawn(Self::myDelta(schema,fixed_schema,outfile));
266261

267-
let p = IpcWriteOptions::try_with_compression(
268-
Default::default(),
269-
Some(CompressionType::LZ4_FRAME),
270-
);
262+
(self.sender.as_mut().cloned().unwrap(), j)
271263

272-
let mut writer = arrow_ipc::writer::FileWriter::try_new_with_options(
273-
_out_file,
274-
&schema,
275-
Default::default(),
276-
)
277-
.expect("TODO: panic message");
264+
}
265+
}
278266

279-
let props = WriterProperties::builder().set_compression(SNAPPY).build();
267+
pub struct IpcFileOut {
268+
pub(crate) sender: Option<Sender<RecordBatch>>,
269+
}
280270

281-
let (sender, mut receiver) = bounded::<RecordBatch>(1000);
282271

283-
let t: JoinHandle<Result<Stats>> = thread::spawn(move || {
284-
'outer: loop {
285-
let mut message = receiver.recv();
286-
287-
match message {
288-
Ok(rb) => {
289-
writer.write(&rb).expect("Error Writing batch");
290-
if (rb.num_rows() == 0) {
291-
break 'outer;
292-
}
293-
}
294-
Err(e) => {
295-
info!("got RecvError in channel , break to outer");
296-
break 'outer;
297-
}
298-
}
299-
}
300-
info!("closing the writer for parquet");
301-
writer.finish();
302-
Ok(Stats {
303-
bytes_in: 0,
304-
bytes_out: 0,
305-
num_rows: 0,
306-
read_duration: Default::default(),
307-
parse_duration: Default::default(),
308-
builder_write_duration: Default::default(),
309-
})
310-
});
311-
self.sender = Some(sender.clone());
312-
(sender, t)
272+
impl RecordBatchOutput for IpcFileOut {
273+
fn setup(&mut self, schema: SchemaRef,fixed_schema: FixedSchema, outfile: PathBuf,rt: tokio::runtime::Runtime) -> (Sender<RecordBatch>, JoinHandle<Result<Stats>>) {
274+
todo!()
313275
}
314276
}

src/chunked/residual_slicer.rs

Lines changed: 5 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -37,6 +37,7 @@ use std::io::{BufReader, Read};
3737
use std::sync::atomic::Ordering;
3838
use std::sync::Arc;
3939
use std::time::{Duration, Instant};
40+
use tokio::runtime::Runtime;
4041

4142
use super::{ChunkAndResidue, Converter, FnFindLastLineBreak, IterRevolver, Slicer, Stats};
4243

@@ -88,8 +89,8 @@ impl<'a> Slicer<'a> for ResidualSlicer<'a> {
8889
.stack_size(((SLICER_IN_CHUNK_SIZE as f32) * 2f32) as usize)
8990
.build_global()
9091
.unwrap();
91-
92-
let threaded_writer = converter.setup();
92+
let rt = Runtime::new().unwrap();
93+
let threaded_writer = converter.setup(rt);
9394

9495
loop {
9596
let cr = ir.next().unwrap();
@@ -140,8 +141,8 @@ impl<'a> Slicer<'a> for ResidualSlicer<'a> {
140141
}
141142
info!("about to shudown converter...");
142143

143-
converter.shutdown();
144-
threaded_writer.1.join();
144+
converter.shutdown(threaded_writer.1);
145+
// threaded_writer.1.await.expect("The task being joined has panicked");;
145146
info!("converter has been shutdown");
146147

147148
info!(

src/chunked/self_converter.rs

Lines changed: 4 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -37,8 +37,9 @@ use parquet::format;
3737
use std::fs::File;
3838
use std::io::Write;
3939
use std::sync::mpsc::SyncSender;
40-
use std::thread::JoinHandle;
40+
use tokio::task::JoinHandle;
4141
use std::time::Duration;
42+
use tokio::runtime::Runtime;
4243

4344
use super::{Converter, FnFindLastLineBreak};
4445

@@ -71,11 +72,11 @@ impl<'a> Converter<'a> for SampleSliceAggregator<'a> {
7172
(bytes_processed, 0, duration, duration)
7273
}
7374

74-
fn setup(&mut self) -> (Sender<RecordBatch>, JoinHandle<Result<Stats>>) {
75+
fn setup(&mut self, rt: Runtime) -> (Sender<RecordBatch>, JoinHandle<Result<Stats>>) {
7576
todo!()
7677
}
7778

78-
fn shutdown(&mut self) {
79+
fn shutdown(&mut self,jh:JoinHandle<Result<Stats>>) {
7980
todo!()
8081
}
8182
}

0 commit comments

Comments
 (0)