Skip to content

Commit 0380ea8

Browse files
author
Rickard Lundin
committed
added datafusion
1 parent cd070e1 commit 0380ea8

4 files changed

Lines changed: 21 additions & 10 deletions

File tree

Cargo.toml

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,8 +44,11 @@ libc = "0.2.154"
4444
arrow = "51.0.0"
4545
parquet = "51.0.0"
4646
arrow-ipc = "51.0.0"
47-
deltalake = "0.17.3"
47+
deltalake = { version="0.17.3", features = ["datafusion"]}
48+
datafusion = "38.0.0"
49+
4850
deltalake-core = "0.17.3"
51+
4952
padder = { version = "1.2.0", features = ["serde"] }
5053
ordered-channel="1.1.0"
5154
atomic-counter = "1.0.1"

src/chunked/recordbatch_output.rs

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ use arrow::array::{ArrayRef, BooleanBuilder, Int32Builder, Int64Builder, StringB
2929
use ordered_channel::bounded;
3030
use parquet::arrow::ArrowWriter;
3131
use parquet::basic::Compression;
32-
use parquet::file::properties::WriterProperties;
32+
use parquet::file::properties::{WriterProperties, WriterVersion};
3333
use parquet::format;
3434
use rayon::iter::IndexedParallelIterator;
3535
use rayon::prelude::*;
@@ -78,6 +78,7 @@ use std::time::{Duration, Instant};
7878
use thread::spawn;
7979
use tokio::task::JoinHandle;
8080
use Compression::SNAPPY;
81+
use deltalake_core::writer::WriteMode;
8182
//use deltalake::writer::test_utils::create_initialized_table;
8283
use tokio::runtime::Builder;
8384
use tokio::time::sleep;
@@ -122,12 +123,13 @@ impl DeltaOut {
122123
let mut table = match maybe_table {
123124
Ok(table) => table,
124125
Err(DeltaTableError::NotATable(_)) => {
125-
info!("It doesn't look like our delta table has been created");
126+
let fxschema_to_delta =schema.into_delta_columns();
127+
info!("It doesn't look like our delta table has been created\n {:?}",fxschema_to_delta);
126128
DeltaOps::try_from_uri(table_path)
127129
.await
128130
.unwrap()
129131
.create()
130-
.with_columns(schema.into_delta_columns())
132+
.with_columns(fxschema_to_delta)
131133
.await
132134
.unwrap()
133135
}
@@ -144,32 +146,38 @@ impl DeltaOut {
144146
) -> (Result<Stats>) {
145147
let mut table = Self::deltasetup(fixed_schema, outfile).await.unwrap();
146148

147-
let writer_properties = WriterProperties::builder()
149+
let writer_properties = WriterProperties::builder().set_writer_version(WriterVersion::PARQUET_2_0)
148150
.set_compression(Compression::ZSTD(ZstdLevel::try_new(3).unwrap()))
149151
.build();
150152

153+
151154
let mut writer = RecordBatchWriter::for_table(&table)
152155
.expect("Failed to make RecordBatchWriter")
153156
.with_writer_properties(writer_properties);
154157

158+
159+
155160
let metadata = table
156161
.metadata()
157162
.expect("Failed to get metadata for the table");
158163
let arrow_schema = <deltalake::arrow::datatypes::Schema as TryFrom<&StructType>>::try_from(
159164
&metadata.schema().expect("failed to get schema"),
160165
)
161166
.expect("Failed to convert to arrow schema");
162-
let arrow_schema_ref = Arc::new(arrow_schema);
163167

164168
'outer: loop {
165169
let mut message = receiver.recv();
166170

167171
match message {
168172
Ok(rb) => {
169-
if (rb.num_rows() == 1) {
173+
if (rb.num_rows() == 0) {
170174
break 'outer;
171175
}
172-
writer.write(rb).await.expect("Error Writing batch");
176+
let nrb=rb.with_schema(Arc::new(arrow_schema.clone())).unwrap();
177+
info!("nrb rows={} ",nrb.num_rows());
178+
// writer.write(nrb).await.expect("Error Writing batch");
179+
writer.write_with_mode(nrb,WriteMode::MergeSchema);
180+
173181
}
174182
Err(e) => {
175183
info!("got RecvError in channel , break to outer");

src/cli.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -295,7 +295,6 @@ impl Cli {
295295
schema.to_path_buf(),
296296
n_threads as i16,
297297
);
298-
let sc = master_builders.schema_factory();
299298

300299
let s2a: Box<Slice2Arrow> = Box::new(Slice2Arrow {
301300
fn_line_break: find_last_nl,

src/schema.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ use std::fs;
3535
use std::io;
3636
use std::path::PathBuf;
3737
use std::sync::Arc;
38+
use log::info;
3839

3940
use crate::builder::{
4041
BooleanColumnBuilder, ColumnBuilder, Float16ColumnBuilder, Float32ColumnBuilder,
@@ -311,7 +312,7 @@ impl FixedSchema {
311312
)
312313
})
313314
.collect();
314-
315+
info!("as_arrow_schema= {:?}",fields);
315316
Schema::new(fields)
316317
}
317318

0 commit comments

Comments
 (0)