Commit abcbfdb
Author: Rickard Lundin
Commit message: works but extremely slow
1 parent: c2747a3

1 file changed: 16 additions & 11 deletions

src/chunked/recordbatch_output.rs

@@ -64,6 +64,7 @@ use ordered_channel::Sender;
 use parquet::errors::{ParquetError, Result};
 use parquet::file::metadata::FileMetaData;
 use std::sync::atomic::{AtomicUsize, Ordering, ATOMIC_USIZE_INIT};
+use std::sync::mpsc::RecvError;
 //use std::sync::mpsc::{sync_channel, Receiver, RecvError, SyncSender};
 use std::thread;
 //use std::thread::JoinHandle;
@@ -121,15 +122,17 @@ impl DeltaOut {
         let table_path =out.to_str().unwrap();
 
         let maybe_table = deltalake::open_table(&table_path).await;
+        // let hardoded_partition_cols = vec!["id".to_owned()];
         let mut table = match maybe_table {
             Ok(table) => table,
             Err(DeltaTableError::NotATable(_)) => {
                 let fxschema_to_delta =schema.into_delta_columns();
                 info!("It doesn't look like our delta table has been created\n {:?}",fxschema_to_delta);
-                DeltaOps::try_from_uri(table_path)
+                DeltaOps:: try_from_uri(table_path)
                     .await
                     .unwrap()
                     .create()
+                    // with_partition_columns(hardoded_partition_cols)
                     .with_columns(fxschema_to_delta)
                     .await
                     .unwrap()
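For context on the creation path in this hunk: when `deltalake::open_table` fails with `DeltaTableError::NotATable`, the table is created through the `DeltaOps` builder from the converted column definitions, and the commented-out lines sketch partitioning by `id`. A minimal standalone version of the same pattern, assuming a delta-rs release where table columns are `kernel::StructField` values (the column definitions below are illustrative stand-ins for `schema.into_delta_columns()`):

```rust
use deltalake::kernel::{DataType, PrimitiveType, StructField};
use deltalake::{DeltaOps, DeltaTable, DeltaTableError};

async fn open_or_create(table_path: &str) -> Result<DeltaTable, DeltaTableError> {
    match deltalake::open_table(table_path).await {
        Ok(table) => Ok(table),
        // No _delta_log at this path yet: create the table from explicit columns.
        Err(DeltaTableError::NotATable(_)) => {
            // Illustrative columns; the real code derives them from its fixed schema.
            let columns = vec![
                StructField::new("id", DataType::Primitive(PrimitiveType::Long), false),
                StructField::new("payload", DataType::Primitive(PrimitiveType::String), true),
            ];
            DeltaOps::try_from_uri(table_path)
                .await?
                .create()
                .with_columns(columns)
                // Mirrors the commented-out partitioning in the hunk above.
                .with_partition_columns(vec!["id".to_owned()])
                .await
        }
        Err(e) => Err(e),
    }
}
```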
@@ -146,12 +149,11 @@ impl DeltaOut {
         outfile: PathBuf,
     ) -> (Result<Stats>) {
         let mut table = Self::deltasetup(fixed_schema, outfile).await.unwrap();
-
-        let writer_properties = WriterProperties::builder().set_writer_version(WriterVersion::PARQUET_2_0)
+        // TODO @TEDDY AND @WILLE THESE MAX SETTINGS DO NOTHING ?
+        let writer_properties = WriterProperties::builder().set_max_row_group_size(300).set_write_batch_size(300).set_writer_version(WriterVersion::PARQUET_2_0)
             .set_compression(Compression::ZSTD(ZstdLevel::try_new(3).unwrap()))
             .build();
-
-
+        // table.metadata().unwrap().partition_columns
         let mut writer = RecordBatchWriter::for_table(&table).expect("Failed to make RecordBatchWriter")
             .with_writer_properties(writer_properties);
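On the parquet side, these builder settings only shape how each data file is laid out: `set_max_row_group_size` caps the number of rows per row group and `set_write_batch_size` sets the internal batch size used while encoding columns, so values as small as 300 mainly produce many tiny row groups rather than more frequent Delta commits, which is one plausible reading of the TODO above (and of the "extremely slow" commit message). A self-contained sketch of the builder, assuming the standard `parquet` crate API:

```rust
use parquet::basic::{Compression, ZstdLevel};
use parquet::file::properties::{WriterProperties, WriterVersion};

fn writer_properties() -> WriterProperties {
    WriterProperties::builder()
        // Upper bound on rows per row group within each parquet file;
        // 300 is unusually small and yields many tiny row groups.
        .set_max_row_group_size(300)
        // Internal batch size used while encoding column data.
        .set_write_batch_size(300)
        .set_writer_version(WriterVersion::PARQUET_2_0)
        .set_compression(Compression::ZSTD(ZstdLevel::try_new(3).unwrap()))
        .build()
}
```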

@@ -162,6 +164,7 @@ impl DeltaOut {
             &metadata.schema().expect("failed to get schema"),
         )
         .expect("Failed to convert to arrow schema");
+        let mut adds_tot:usize=0;
 
         'outer: loop {
             let mut message = receiver.recv();
@@ -172,14 +175,14 @@ impl DeltaOut {
                 break 'outer;
             }
             let nrb=rb.with_schema(Arc::new(arrow_schema.clone())).unwrap();
-            // let nrb=rb;
+            // let nrb=rb;
             info!("nrb rows={} nrb col len {} batchcount={} usize={}",nrb.num_rows(),nrb.columns().len(),writer.buffered_record_batch_count(),writer.buffer_len());
             writer.write(nrb).await.expect("writing");
             if (writer.buffered_record_batch_count()>200) {
-            writer
-            .flush()
-            .await
-            .expect("Failed to flush write");
+                writer
+                    .flush()
+                    .await
+                    .expect("Failed to flush write");
             }
             // writer.write_with_mode(nrb,WriteMode::MergeSchema);
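The loop itself buffers each `RecordBatch` in delta-rs's `RecordBatchWriter` and calls `flush()` once more than 200 batches are buffered. A condensed sketch of that pattern; the channel type and the names here are illustrative, not taken from the file:

```rust
use deltalake::arrow::record_batch::RecordBatch;
use deltalake::writer::{DeltaWriter, RecordBatchWriter};
use deltalake::DeltaTableError;

async fn drain(
    mut writer: RecordBatchWriter,
    mut batches: tokio::sync::mpsc::Receiver<RecordBatch>,
) -> Result<(), DeltaTableError> {
    while let Some(batch) = batches.recv().await {
        // Buffer the batch in the writer's in-memory arrow buffers.
        writer.write(batch).await?;
        // flush() spills the buffered batches to parquet files in the table
        // directory; it does not by itself commit anything to the Delta log.
        if writer.buffered_record_batch_count() > 200 {
            writer.flush().await?;
        }
    }
    Ok(())
}
```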

@@ -196,8 +199,10 @@ impl DeltaOut {
             .flush_and_commit(&mut table)
             .await
             .expect("Failed to flush write");
-        info!("{} adds written", adds);
+        adds_tot+=adds as usize;
 
+        info!("{} adds written", adds_tot);
+
         Ok(Stats {
             bytes_in: 0,
             bytes_out: 0,
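The last hunk replaces the per-commit log line with a running `adds_tot` that is printed once before `Stats` is returned. One caveat worth flagging: if this is the standard delta-rs `DeltaWriter::flush_and_commit`, the returned `i64` is the new table version rather than a count of `Add` actions, so summing it as "adds" may not mean what the log message suggests. A hedged sketch of the commit step (the logging macro and names are assumed, not taken from the file):

```rust
use deltalake::writer::{DeltaWriter, RecordBatchWriter};
use deltalake::{DeltaTable, DeltaTableError};
use log::info;

async fn commit(
    writer: &mut RecordBatchWriter,
    table: &mut DeltaTable,
) -> Result<i64, DeltaTableError> {
    // Flushes any buffered parquet files and commits them to the Delta log;
    // in the standard delta-rs API the returned i64 is the new table version.
    let version = writer.flush_and_commit(table).await?;
    info!("committed delta table version {}", version);
    Ok(version)
}
```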
