Skip to content

Commit 55aded0

Browse files
author
Rickard Lundin
committed
compiles and everthing is Tokio
1 parent 35c3de6 commit 55aded0

5 files changed

Lines changed: 22 additions & 21 deletions

File tree

src/chunked/arrow_converter.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -239,7 +239,7 @@ impl<'a> Converter<'a> for Slice2Arrow<'a> {
239239
(bytes_in, bytes_out, parse_duration, builder_write_duration)
240240
}
241241

242-
fn setup(&mut self, rt: runtime::Runtime) -> (Sender<RecordBatch>, JoinHandle<Result<Stats>>) {
242+
fn setup(&mut self, rt: &runtime::Runtime) -> (Sender<RecordBatch>, JoinHandle<Result<Stats>>) {
243243
let o = output_factory(
244244
self.target.clone(),
245245
self.fixed_schema.clone(),
@@ -251,7 +251,7 @@ impl<'a> Converter<'a> for Slice2Arrow<'a> {
251251
o
252252
}
253253

254-
fn shutdown(&mut self,jh:JoinHandle<Result<Stats>>) {
254+
fn shutdown(&mut self,rt: &runtime::Runtime,jh:JoinHandle<Result<Stats>>) {
255255
// converter.shutdown();
256256
let schema = Schema::new(vec![Field::new(
257257
"id",
@@ -262,7 +262,8 @@ impl<'a> Converter<'a> for Slice2Arrow<'a> {
262262
let emptyrb = arrow::record_batch::RecordBatch::new_empty(Arc::new(schema));
263263
let c = self.consistent_counter.get();
264264
let _ = &self.masterbuilders.sender.clone().unwrap().send(c, emptyrb);
265-
265+
rt.spawn_blocking(|| async { });
266+
266267
// jh.spawn_blocking(async move {threaded_writer.1.await;});
267268
// threaded_writer.1.await.expect("The task being joined has panicked");;
268269

src/chunked/mod.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ use std::time::Duration;
4040

4141
use self::residual_slicer::SLICER_IN_CHUNK_SIZE;
4242
use parquet::errors::{ParquetError, Result};
43+
use tokio::runtime;
4344
use crate::schema::FixedSchema;
4445

4546
pub(crate) mod arrow_converter;
@@ -163,8 +164,8 @@ pub(crate) trait Converter<'a> {
163164

164165
// fn process(& mut self, slices: Vec< &'a[u8]>) -> usize;
165166
fn process(&mut self, slices: Vec<&'a [u8]>) -> (usize, usize, Duration, Duration);
166-
fn setup(&mut self, rt: Runtime) -> (Sender<RecordBatch>, JoinHandle<Result<Stats>>);
167-
fn shutdown(&mut self,jh:JoinHandle<Result<Stats>>);
167+
fn setup(&mut self, rt: &Runtime) -> (Sender<RecordBatch>, JoinHandle<Result<Stats>>);
168+
fn shutdown(&mut self,rt: &Runtime,jh:JoinHandle<Result<Stats>>);
168169
}
169170

170171
pub trait ColumnBuilder {
@@ -179,6 +180,6 @@ pub trait RecordBatchOutput {
179180
schema: SchemaRef,
180181
fixed_schema: FixedSchema,
181182
outfile: PathBuf,
182-
rt: tokio::runtime::Runtime
183+
rt: &Runtime
183184
) -> (Sender<RecordBatch>, JoinHandle<Result<Stats>>);
184185
}

src/chunked/recordbatch_output.rs

Lines changed: 9 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,7 @@ pub(crate) fn output_factory(
9191
fixed_schema: FixedSchema,
9292
schema: SchemaRef,
9393
_outfile: PathBuf,
94-
rt:tokio::runtime::Runtime
94+
rt:& tokio::runtime::Runtime
9595
) -> (Sender<RecordBatch>, JoinHandle<Result<Stats>>) {
9696
let mut pfo: Box<dyn RecordBatchOutput> =match target {
9797
Targets::Parquet => {
@@ -173,7 +173,7 @@ impl DeltaOut {
173173

174174
impl RecordBatchOutput for DeltaOut {
175175

176-
fn setup(&mut self, schema: SchemaRef,fixed_schema: FixedSchema, outfile: PathBuf,rt:tokio::runtime::Runtime) -> (Sender<RecordBatch>, JoinHandle<Result<Stats>>) {
176+
fn setup(&mut self, schema: SchemaRef,fixed_schema: FixedSchema, outfile: PathBuf,rt:&Runtime) -> (Sender<RecordBatch>, JoinHandle<Result<Stats>>) {
177177

178178
let j:JoinHandle<Result<Stats>>=rt.spawn(Self::myDelta(schema,fixed_schema,outfile));
179179

@@ -188,7 +188,7 @@ pub(crate) struct IcebergOut {
188188
pub(crate) sender: Option<Sender<RecordBatch>>,
189189
}
190190
impl RecordBatchOutput for IcebergOut {
191-
fn setup(&mut self, schema: SchemaRef,fixed_schema: FixedSchema, outfile: PathBuf,rt: tokio::runtime::Runtime) -> (Sender<RecordBatch>, JoinHandle<Result<Stats>>) {
191+
fn setup(&mut self, schema: SchemaRef,fixed_schema: FixedSchema, outfile: PathBuf,rt: &Runtime) -> (Sender<RecordBatch>, JoinHandle<Result<Stats>>) {
192192
todo!()
193193
}
194194
}
@@ -197,7 +197,7 @@ pub(crate) struct FlightOut {
197197
}
198198

199199
impl RecordBatchOutput for FlightOut {
200-
fn setup(&mut self, schema: SchemaRef,fixed_schema: FixedSchema, outfile: PathBuf,rt: tokio::runtime::Runtime) -> (Sender<RecordBatch>, JoinHandle<Result<Stats>>) {
200+
fn setup(&mut self, schema: SchemaRef,fixed_schema: FixedSchema, outfile: PathBuf,rt: &Runtime) -> (Sender<RecordBatch>, JoinHandle<Result<Stats>>) {
201201
todo!()
202202
}
203203
}
@@ -207,7 +207,7 @@ pub(crate) struct ParquetFileOut {
207207
pub(crate) sender: Option<Sender<RecordBatch>>,
208208
}
209209
impl ParquetFileOut {
210-
pub(crate) async fn myParquet(sender:Sender<RecordBatch>,mut receiver: Receiver<RecordBatch>, schema: SchemaRef,fixed_schema: FixedSchema, outfile: PathBuf)->(Result<Stats>) {
210+
pub(crate) async fn myParquet(mut receiver: Receiver<RecordBatch>, schema: SchemaRef,fixed_schema: FixedSchema, outfile: PathBuf)->(Result<Stats>) {
211211
// Self::deltasetup(fixed_schema).await.unwrap();
212212
let _out_file = fs::OpenOptions::new()
213213
.create(true)
@@ -219,7 +219,7 @@ impl ParquetFileOut {
219219

220220
let mut writer: ArrowWriter<File> =
221221
ArrowWriter::try_new(_out_file, schema, Some(props.clone())).unwrap();
222-
222+
223223
'outer: loop {
224224
let mut message = receiver.recv();
225225

@@ -251,14 +251,12 @@ impl ParquetFileOut {
251251
}
252252
}
253253
impl RecordBatchOutput for ParquetFileOut {
254-
fn setup(&mut self, schema: SchemaRef,fixed_schema: FixedSchema, outfile: PathBuf,rt: tokio::runtime::Runtime) -> (Sender<RecordBatch>, JoinHandle<Result<Stats>>) {
254+
fn setup(&mut self, schema: SchemaRef,fixed_schema: FixedSchema, outfile: PathBuf,rt: &Runtime) -> (Sender<RecordBatch>, JoinHandle<Result<Stats>>) {
255255
let (sender, mut receiver) = bounded::<RecordBatch>(100);
256256

257257
self.sender = Some(sender.clone());
258258

259-
let j:JoinHandle<Result<Stats>>=rt.spawn(Self::myParquet(sender,receiver,schema,fixed_schema,outfile));
260-
261-
// let j:JoinHandle<Result<Stats>>=rt.spawn(Self::myDelta(schema,fixed_schema,outfile));
259+
let j:JoinHandle<Result<Stats>>=rt.spawn(Self::myParquet(receiver,schema,fixed_schema,outfile));
262260

263261
(self.sender.as_mut().cloned().unwrap(), j)
264262

@@ -271,7 +269,7 @@ pub struct IpcFileOut {
271269

272270

273271
impl RecordBatchOutput for IpcFileOut {
274-
fn setup(&mut self, schema: SchemaRef,fixed_schema: FixedSchema, outfile: PathBuf,rt: tokio::runtime::Runtime) -> (Sender<RecordBatch>, JoinHandle<Result<Stats>>) {
272+
fn setup(&mut self, schema: SchemaRef,fixed_schema: FixedSchema, outfile: PathBuf,rt: &Runtime) -> (Sender<RecordBatch>, JoinHandle<Result<Stats>>) {
275273
todo!()
276274
}
277275
}

src/chunked/residual_slicer.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,7 @@ impl<'a> Slicer<'a> for ResidualSlicer<'a> {
9090
.build_global()
9191
.unwrap();
9292
let rt = Runtime::new().unwrap();
93-
let threaded_writer = converter.setup(rt);
93+
let threaded_writer = converter.setup(&rt);
9494

9595
loop {
9696
let cr = ir.next().unwrap();
@@ -141,7 +141,7 @@ impl<'a> Slicer<'a> for ResidualSlicer<'a> {
141141
}
142142
info!("about to shudown converter...");
143143

144-
converter.shutdown(threaded_writer.1);
144+
converter.shutdown(&rt,threaded_writer.1);
145145
// let hhh=threaded_writer.1;
146146
// rt.spawn(async {
147147
// hhh.await.unwrap();

src/chunked/self_converter.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ use std::io::Write;
3939
use std::sync::mpsc::SyncSender;
4040
use tokio::task::JoinHandle;
4141
use std::time::Duration;
42+
use tokio::runtime;
4243
use tokio::runtime::Runtime;
4344

4445
use super::{Converter, FnFindLastLineBreak};
@@ -72,11 +73,11 @@ impl<'a> Converter<'a> for SampleSliceAggregator<'a> {
7273
(bytes_processed, 0, duration, duration)
7374
}
7475

75-
fn setup(&mut self, rt: Runtime) -> (Sender<RecordBatch>, JoinHandle<Result<Stats>>) {
76+
fn setup(&mut self, rt: &Runtime) -> (Sender<RecordBatch>, JoinHandle<Result<Stats>>) {
7677
todo!()
7778
}
7879

79-
fn shutdown(&mut self,jh:JoinHandle<Result<Stats>>) {
80+
fn shutdown(&mut self,rt: &runtime::Runtime,jh:JoinHandle<Result<Stats>>) {
8081
todo!()
8182
}
8283
}

0 commit comments

Comments
 (0)