Skip to content

Commit 7929b38

Browse files
author
Rickard Lundin
committed
async writer working but no speed difference
1 parent 55aded0 commit 7929b38

2 files changed

Lines changed: 10 additions & 19 deletions

File tree

src/chunked/arrow_converter.rs

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -252,7 +252,6 @@ impl<'a> Converter<'a> for Slice2Arrow<'a> {
252252
}
253253

254254
fn shutdown(&mut self,rt: &runtime::Runtime,jh:JoinHandle<Result<Stats>>) {
255-
// converter.shutdown();
256255
let schema = Schema::new(vec![Field::new(
257256
"id",
258257
arrow::datatypes::DataType::Int32,
@@ -262,12 +261,8 @@ impl<'a> Converter<'a> for Slice2Arrow<'a> {
262261
let emptyrb = arrow::record_batch::RecordBatch::new_empty(Arc::new(schema));
263262
let c = self.consistent_counter.get();
264263
let _ = &self.masterbuilders.sender.clone().unwrap().send(c, emptyrb);
265-
rt.spawn_blocking(|| async { });
264+
rt.spawn_blocking(|| async {jh.await.unwrap() });
266265

267-
// jh.spawn_blocking(async move {threaded_writer.1.await;});
268-
// threaded_writer.1.await.expect("The task being joined has panicked");;
269-
270-
271266
}
272267
}
273268

src/chunked/recordbatch_output.rs

Lines changed: 9 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -29,15 +29,14 @@ use std::env::VarError;
2929
use crate::chunked::trimmer::{trimmer_factory, ColumnTrimmer};
3030
use arrow::array::{ArrayRef, BooleanBuilder, Int32Builder, Int64Builder, StringBuilder};
3131
use ordered_channel::bounded;
32-
use parquet::arrow::ArrowWriter;
32+
use parquet::arrow::{ArrowWriter, AsyncArrowWriter};
3333
use parquet::basic::Compression;
3434
use parquet::file::properties::WriterProperties;
3535
use parquet::format;
3636
use rayon::iter::IndexedParallelIterator;
3737
use rayon::prelude::*;
3838

39-
use std::fs;
40-
use std::fs::File;
39+
use tokio::fs::File;
4140
use std::path::{Path, PathBuf};
4241
use std::str::from_utf8_unchecked;
4342
use std::sync::Arc;
@@ -208,24 +207,21 @@ pub(crate) struct ParquetFileOut {
208207
}
209208
impl ParquetFileOut {
210209
pub(crate) async fn myParquet(mut receiver: Receiver<RecordBatch>, schema: SchemaRef,fixed_schema: FixedSchema, outfile: PathBuf)->(Result<Stats>) {
211-
// Self::deltasetup(fixed_schema).await.unwrap();
212-
let _out_file = fs::OpenOptions::new()
210+
let _out_file = File::options()
213211
.create(true)
214212
.append(true)
215-
.open(outfile)
216-
.expect("aaa");
217-
213+
.open(outfile).await.unwrap();
214+
218215
let props = WriterProperties::builder().set_compression(SNAPPY).build();
219-
220-
let mut writer: ArrowWriter<File> =
221-
ArrowWriter::try_new(_out_file, schema, Some(props.clone())).unwrap();
216+
let mut writer: AsyncArrowWriter<File> =
217+
AsyncArrowWriter::try_new(_out_file, schema, Some(props.clone())).unwrap();
222218

223219
'outer: loop {
224220
let mut message = receiver.recv();
225221

226222
match message {
227223
Ok(rb) => {
228-
writer.write(&rb).expect("Error Writing batch");
224+
writer.write(&rb).await.expect("Error Writing batch");
229225
if (rb.num_rows() == 0) {
230226
break 'outer;
231227
}
@@ -237,7 +233,7 @@ impl ParquetFileOut {
237233
}
238234
}
239235
info!("closing the writer for parquet");
240-
writer.finish();
236+
writer.close();
241237
Ok(Stats {
242238
bytes_in: 0,
243239
bytes_out: 0,

0 commit comments

Comments
 (0)