Skip to content

Commit eb90031

Browse files
author
Rickard Lundin
committed
removed async
1 parent 7929b38 commit eb90031

1 file changed

Lines changed: 12 additions & 9 deletions

File tree

src/chunked/recordbatch_output.rs

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -29,14 +29,15 @@ use std::env::VarError;
2929
use crate::chunked::trimmer::{trimmer_factory, ColumnTrimmer};
3030
use arrow::array::{ArrayRef, BooleanBuilder, Int32Builder, Int64Builder, StringBuilder};
3131
use ordered_channel::bounded;
32-
use parquet::arrow::{ArrowWriter, AsyncArrowWriter};
32+
use parquet::arrow::ArrowWriter;
3333
use parquet::basic::Compression;
3434
use parquet::file::properties::WriterProperties;
3535
use parquet::format;
3636
use rayon::iter::IndexedParallelIterator;
3737
use rayon::prelude::*;
3838

39-
use tokio::fs::File;
39+
use std::fs;
40+
use std::fs::File;
4041
use std::path::{Path, PathBuf};
4142
use std::str::from_utf8_unchecked;
4243
use std::sync::Arc;
@@ -207,21 +208,23 @@ pub(crate) struct ParquetFileOut {
207208
}
208209
impl ParquetFileOut {
209210
pub(crate) async fn myParquet(mut receiver: Receiver<RecordBatch>, schema: SchemaRef,fixed_schema: FixedSchema, outfile: PathBuf)->(Result<Stats>) {
210-
let _out_file = File::options()
211+
let _out_file = fs::OpenOptions::new()
211212
.create(true)
212213
.append(true)
213-
.open(outfile).await.unwrap();
214-
214+
.open(outfile)
215+
.expect("aaa");
216+
215217
let props = WriterProperties::builder().set_compression(SNAPPY).build();
216-
let mut writer: AsyncArrowWriter<File> =
217-
AsyncArrowWriter::try_new(_out_file, schema, Some(props.clone())).unwrap();
218+
219+
let mut writer: ArrowWriter<File> =
220+
ArrowWriter::try_new(_out_file, schema, Some(props.clone())).unwrap();
218221

219222
'outer: loop {
220223
let mut message = receiver.recv();
221224

222225
match message {
223226
Ok(rb) => {
224-
writer.write(&rb).await.expect("Error Writing batch");
227+
writer.write(&rb).expect("Error Writing batch");
225228
if (rb.num_rows() == 0) {
226229
break 'outer;
227230
}
@@ -233,7 +236,7 @@ impl ParquetFileOut {
233236
}
234237
}
235238
info!("closing the writer for parquet");
236-
writer.close();
239+
writer.finish();
237240
Ok(Stats {
238241
bytes_in: 0,
239242
bytes_out: 0,

0 commit comments

Comments
 (0)