Skip to content

Commit c19ae12

Browse files
author
Rickard Lundin
committed
fmt
1 parent eb90031 commit c19ae12

7 files changed

Lines changed: 169 additions & 117 deletions

File tree

Cargo.toml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "evolution"
3-
version = "1.0.0"
3+
version = "1.1.0"
44
edition = "2021"
55
description = "🦖 Evolve your fixed-length data files into Apache Parquet, fully parallelized!"
66
authors = [
@@ -49,7 +49,7 @@ deltalake-core = "0.17.3"
4949
padder = { version = "1.2.0", features = ["serde"] }
5050
ordered-channel="1.1.0"
5151
atomic-counter = "1.0.1"
52-
crossbeam-channel = "0.5.12"
52+
crossbeam-channel = "0.5.13"
5353
tracing = "0.1.40"
5454
tokio = "1.37.0"
5555
[dev-dependencies]

src/chunked/arrow_converter.rs

Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -45,14 +45,15 @@ use std::str::from_utf8_unchecked;
4545
use std::sync::Arc;
4646

4747
use super::{
48-
RecordBatchOutput, trimmer, ColumnBuilder, Converter, FnFindLastLineBreak, FnLineBreakLen,
48+
trimmer, ColumnBuilder, Converter, FnFindLastLineBreak, FnLineBreakLen, RecordBatchOutput,
4949
Stats,
5050
};
5151
use crate::chunked;
52-
use crate::chunked::recordbatch_output::{IpcFileOut, output_factory, ParquetFileOut};
52+
use crate::chunked::recordbatch_output::{output_factory, IpcFileOut, ParquetFileOut};
5353
pub use crate::cli::Targets;
5454
use crate::datatype::DataType;
5555
use crate::schema;
56+
use crate::schema::FixedSchema;
5657
use arrow::datatypes::{Field, Schema, SchemaRef};
5758
use atomic_counter::{AtomicCounter, ConsistentCounter};
5859
use crossbeam::atomic::AtomicConsume;
@@ -64,12 +65,11 @@ use rayon::join;
6465
use std::sync::atomic::{AtomicUsize, Ordering, ATOMIC_USIZE_INIT};
6566
use std::sync::mpsc::{sync_channel, Receiver, RecvError, SyncSender};
6667
use std::thread;
67-
use tokio::task::JoinHandle;
6868
use std::time::{Duration, Instant};
6969
use thread::spawn;
7070
use tokio::runtime;
7171
use tokio::runtime::Runtime;
72-
use crate::schema::FixedSchema;
72+
use tokio::task::JoinHandle;
7373
//use ordered_channel::Sender;
7474
//use crossbeam::channel::{Receiver, Sender};
7575

@@ -82,7 +82,7 @@ pub(crate) struct Slice2Arrow<'a> {
8282
pub(crate) masterbuilders: MasterBuilders,
8383
pub(crate) consistent_counter: ConsistentCounter,
8484
pub(crate) target: Targets,
85-
pub(crate) fixed_schema: FixedSchema
85+
pub(crate) fixed_schema: FixedSchema,
8686
}
8787

8888
pub(crate) struct MasterBuilders {
@@ -245,13 +245,13 @@ impl<'a> Converter<'a> for Slice2Arrow<'a> {
245245
self.fixed_schema.clone(),
246246
self.masterbuilders.schema_factory(),
247247
self.masterbuilders.outfile.clone(),
248-
rt
248+
rt,
249249
);
250250
self.masterbuilders.sender = Some(o.0.clone());
251251
o
252252
}
253-
254-
fn shutdown(&mut self,rt: &runtime::Runtime,jh:JoinHandle<Result<Stats>>) {
253+
254+
fn shutdown(&mut self, rt: &runtime::Runtime, jh: JoinHandle<Result<Stats>>) {
255255
let schema = Schema::new(vec![Field::new(
256256
"id",
257257
arrow::datatypes::DataType::Int32,
@@ -261,8 +261,7 @@ impl<'a> Converter<'a> for Slice2Arrow<'a> {
261261
let emptyrb = arrow::record_batch::RecordBatch::new_empty(Arc::new(schema));
262262
let c = self.consistent_counter.get();
263263
let _ = &self.masterbuilders.sender.clone().unwrap().send(c, emptyrb);
264-
rt.spawn_blocking(|| async {jh.await.unwrap() });
265-
264+
rt.spawn_blocking(|| async { jh.await.unwrap() });
266265
}
267266
}
268267

src/chunked/mod.rs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -34,19 +34,19 @@ use ordered_channel::Sender;
3434
use std::fs;
3535
use std::path::PathBuf;
3636
use std::sync::mpsc::SyncSender;
37-
use tokio::task::JoinHandle;
38-
use tokio::runtime::Runtime;
3937
use std::time::Duration;
38+
use tokio::runtime::Runtime;
39+
use tokio::task::JoinHandle;
4040

4141
use self::residual_slicer::SLICER_IN_CHUNK_SIZE;
42+
use crate::schema::FixedSchema;
4243
use parquet::errors::{ParquetError, Result};
4344
use tokio::runtime;
44-
use crate::schema::FixedSchema;
4545

4646
pub(crate) mod arrow_converter;
47+
pub(crate) mod recordbatch_output;
4748
pub(crate) mod residual_slicer;
4849
pub(crate) mod self_converter;
49-
pub(crate) mod recordbatch_output;
5050
mod trimmer;
5151

5252
pub(crate) struct ChunkAndResidue {
@@ -64,7 +64,7 @@ pub(crate) struct Stats {
6464
pub(crate) bytes_in: usize,
6565
pub(crate) bytes_out: usize,
6666
pub(crate) num_rows: i64,
67-
67+
pub(crate) adds: i64,
6868
pub(crate) read_duration: Duration,
6969
pub(crate) parse_duration: Duration,
7070
pub(crate) builder_write_duration: Duration,
@@ -165,7 +165,7 @@ pub(crate) trait Converter<'a> {
165165
// fn process(& mut self, slices: Vec< &'a[u8]>) -> usize;
166166
fn process(&mut self, slices: Vec<&'a [u8]>) -> (usize, usize, Duration, Duration);
167167
fn setup(&mut self, rt: &Runtime) -> (Sender<RecordBatch>, JoinHandle<Result<Stats>>);
168-
fn shutdown(&mut self,rt: &Runtime,jh:JoinHandle<Result<Stats>>);
168+
fn shutdown(&mut self, rt: &Runtime, jh: JoinHandle<Result<Stats>>);
169169
}
170170

171171
pub trait ColumnBuilder {
@@ -180,6 +180,6 @@ pub trait RecordBatchOutput {
180180
schema: SchemaRef,
181181
fixed_schema: FixedSchema,
182182
outfile: PathBuf,
183-
rt: &Runtime
183+
rt: &Runtime,
184184
) -> (Sender<RecordBatch>, JoinHandle<Result<Stats>>);
185185
}

0 commit comments

Comments
 (0)