
Commit 75159a0

Author: Rickard Lundin
Commit message: clippy
1 parent abcbfdb

2 files changed: 38 additions & 27 deletions

src/chunked/recordbatch_output.rs

Lines changed: 36 additions & 25 deletions
@@ -76,11 +76,11 @@ use deltalake::kernel::{PrimitiveType, StructField, StructType};
 use deltalake::parquet::basic::ZstdLevel;
 use deltalake::writer::{DeltaWriter, RecordBatchWriter};
 use deltalake::*;
+use deltalake_core::writer::WriteMode;
 use std::time::{Duration, Instant};
 use thread::spawn;
 use tokio::task::JoinHandle;
 use Compression::SNAPPY;
-use deltalake_core::writer::WriteMode;
 //use deltalake::writer::test_utils::create_initialized_table;
 use tokio::runtime::Builder;
 use tokio::time::sleep;
@@ -116,23 +116,25 @@ pub(crate) struct DeltaOut {
 
 impl DeltaOut {
     async fn deltasetup(schema: FixedSchema, out: PathBuf) -> Result<DeltaTable, DeltaTableError> {
-
         info!("Using the location of: {:?}", out);
 
-        let table_path =out.to_str().unwrap();
+        let table_path = out.to_str().unwrap();
 
         let maybe_table = deltalake::open_table(&table_path).await;
-        // let hardoded_partition_cols = vec!["id".to_owned()];
+        // let hardoded_partition_cols = vec!["id".to_owned()];
         let mut table = match maybe_table {
             Ok(table) => table,
             Err(DeltaTableError::NotATable(_)) => {
-                let fxschema_to_delta =schema.into_delta_columns();
-                info!("It doesn't look like our delta table has been created\n {:?}",fxschema_to_delta);
-                DeltaOps:: try_from_uri(table_path)
+                let fxschema_to_delta = schema.into_delta_columns();
+                info!(
+                    "It doesn't look like our delta table has been created\n {:?}",
+                    fxschema_to_delta
+                );
+                DeltaOps::try_from_uri(table_path)
                     .await
                     .unwrap()
                     .create()
-                    // with_partition_columns(hardoded_partition_cols)
+                    // with_partition_columns(hardoded_partition_cols)
                     .with_columns(fxschema_to_delta)
                     .await
                     .unwrap()
@@ -149,12 +151,16 @@ impl DeltaOut {
         outfile: PathBuf,
     ) -> (Result<Stats>) {
         let mut table = Self::deltasetup(fixed_schema, outfile).await.unwrap();
-        // TODO @TEDDY AND @WILLE THESE MAX SETTINGS DO NOTHING ?
-        let writer_properties = WriterProperties::builder().set_max_row_group_size(300).set_write_batch_size(300).set_writer_version(WriterVersion::PARQUET_2_0)
+        // TODO @TEDDY AND @WILLE THESE MAX SETTINGS DO NOTHING ?
+        let writer_properties = WriterProperties::builder()
+            .set_max_row_group_size(300)
+            .set_write_batch_size(300)
+            .set_writer_version(WriterVersion::PARQUET_2_0)
             .set_compression(Compression::ZSTD(ZstdLevel::try_new(3).unwrap()))
             .build();
-        // table.metadata().unwrap().partition_columns
-        let mut writer = RecordBatchWriter::for_table(&table).expect("Failed to make RecordBatchWriter")
+        // table.metadata().unwrap().partition_columns
+        let mut writer = RecordBatchWriter::for_table(&table)
+            .expect("Failed to make RecordBatchWriter")
             .with_writer_properties(writer_properties);
 
         let metadata = table
@@ -164,7 +170,7 @@ impl DeltaOut {
             &metadata.schema().expect("failed to get schema"),
         )
         .expect("Failed to convert to arrow schema");
-        let mut adds_tot:usize=0;
+        let mut adds_tot: usize = 0;
 
         'outer: loop {
             let mut message = receiver.recv();
@@ -174,35 +180,40 @@ impl DeltaOut {
                     if (rb.num_rows() == 0) {
                         break 'outer;
                     }
-                    let nrb=rb.with_schema(Arc::new(arrow_schema.clone())).unwrap();
+                    let nrb = rb.with_schema(Arc::new(arrow_schema.clone())).unwrap();
                     // let nrb=rb;
-                    info!("nrb rows={} nrb col len {} batchcount={} usize={}",nrb.num_rows(),nrb.columns().len(),writer.buffered_record_batch_count(),writer.buffer_len());
+                    info!(
+                        "nrb rows={} nrb col len {} batchcount={} usize={}",
+                        nrb.num_rows(),
+                        nrb.columns().len(),
+                        writer.buffered_record_batch_count(),
+                        writer.buffer_len()
+                    );
                     writer.write(nrb).await.expect("writing");
-                    if (writer.buffered_record_batch_count()>200) {
-                        writer
-                            .flush()
-                            .await
-                            .expect("Failed to flush write");
+                    if (writer.buffered_record_batch_count() > 200) {
+                        writer.flush().await.expect("Failed to flush write");
                     }
-                    // writer.write_with_mode(nrb,WriteMode::MergeSchema);
-
+                    // writer.write_with_mode(nrb,WriteMode::MergeSchema);
                 }
                 Err(e) => {
                     info!("got RecvError in channel , break to outer");
                     break 'outer;
                 }
             }
         }
-        info!("closing the writer for parquet {}",writer.buffered_record_batch_count());
+        info!(
+            "closing the writer for parquet {}",
+            writer.buffered_record_batch_count()
+        );
 
         let adds = writer
             .flush_and_commit(&mut table)
             .await
             .expect("Failed to flush write");
-        adds_tot+=adds as usize;
+        adds_tot += adds as usize;
 
         info!("{} adds written", adds_tot);
-
+
         Ok(Stats {
             bytes_in: 0,
             bytes_out: 0,

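The recordbatch_output.rs hunks above only reformat an existing flow: open (or create) the Delta table, build WriterProperties, buffer incoming RecordBatches, flush once more than 200 batches are buffered, then flush_and_commit. For reference, a minimal sketch of that flow, assuming the deltalake crate API exactly as it is used in this diff; the import paths, the write_batches function, and the batches argument are illustrative stand-ins, not taken from the repository:

// Minimal sketch of the write path this commit reformats. API calls mirror
// the diff above; import paths assume deltalake's re-exports of arrow and
// parquet, and `batches` stands in for the channel-fed loop in DeltaOut.
use deltalake::arrow::record_batch::RecordBatch;
use deltalake::parquet::basic::{Compression, ZstdLevel};
use deltalake::parquet::file::properties::WriterProperties;
use deltalake::writer::{DeltaWriter, RecordBatchWriter};
use deltalake::{DeltaTable, DeltaTableError};

async fn write_batches(
    table_path: &str,
    batches: Vec<RecordBatch>,
) -> Result<(), DeltaTableError> {
    // Open an existing table; the real code falls back to
    // DeltaOps::try_from_uri(..).create().with_columns(..) when it is missing.
    let mut table: DeltaTable = deltalake::open_table(table_path).await?;

    // Same parquet settings as the diff: small row groups/batches, ZSTD level 3.
    let writer_properties = WriterProperties::builder()
        .set_max_row_group_size(300)
        .set_write_batch_size(300)
        .set_compression(Compression::ZSTD(ZstdLevel::try_new(3).unwrap()))
        .build();

    let mut writer = RecordBatchWriter::for_table(&table)?
        .with_writer_properties(writer_properties);

    for batch in batches {
        writer.write(batch).await?;
        // Flush periodically instead of buffering everything, mirroring the
        // buffered_record_batch_count() > 200 check in the loop above.
        if writer.buffered_record_batch_count() > 200 {
            writer.flush().await?;
        }
    }

    // Flush whatever is still buffered and commit a single Delta transaction.
    let adds = writer.flush_and_commit(&mut table).await?;
    println!("{adds} adds written");
    Ok(())
}
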
src/schema.rs

Lines changed: 2 additions & 2 deletions
@@ -31,11 +31,11 @@ use padder::{Alignment, Symbol};
 use rand::rngs::ThreadRng;
 use serde::{Deserialize, Serialize};
 
+use log::info;
 use std::fs;
 use std::io;
 use std::path::PathBuf;
 use std::sync::Arc;
-use log::info;
 
 use crate::builder::{
     BooleanColumnBuilder, ColumnBuilder, Float16ColumnBuilder, Float32ColumnBuilder,
@@ -312,7 +312,7 @@ impl FixedSchema {
                 )
             })
             .collect();
-        info!("as_arrow_schema= {:?}",fields);
+        info!("as_arrow_schema= {:?}", fields);
         Schema::new(fields)
     }
 