
Commit 04c5e30

Merge pull request #46 from firelink-data/feature/lake_outputs
Feature/lake outputs
2 parents 7ae2fce + 24424d8

10 files changed: 452 additions & 240 deletions

Cargo.toml

Lines changed: 9 additions & 2 deletions
@@ -1,6 +1,6 @@
 [package]
 name = "evolution"
-version = "1.0.0"
+version = "1.1.0"
 edition = "2021"
 description = "🦖 Evolve your fixed-length data files into Apache Parquet, fully parallelized!"
 authors = [
@@ -44,11 +44,18 @@ libc = "0.2.154"
 arrow = "51.0.0"
 parquet = "51.0.0"
 arrow-ipc = "51.0.0"
+deltalake = { version="0.17.3", features = ["datafusion"]}
+datafusion = "38.0.0"
+
+deltalake-core = "0.17.3"
 
 padder = { version = "1.2.0", features = ["serde"] }
 ordered-channel="1.1.0"
 atomic-counter = "1.0.1"
-crossbeam-channel = "0.5.12"
+crossbeam-channel = "0.5.13"
+tracing = "0.1.40"
+tokio = "1.37.0"
+futures = "0.3.30"
 [dev-dependencies]
 glob = "0.3.1"
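
The added dependencies support the new lake outputs: deltalake (with its datafusion feature) provides the Delta table format, and tokio supplies the async runtime its API requires. A minimal sketch of how the two fit together, assuming a hypothetical local table path (this snippet is illustrative, not code from the commit):

use deltalake::{open_table, DeltaTableError};

fn main() -> Result<(), DeltaTableError> {
    // deltalake's API is async, so synchronous converter code needs a
    // tokio runtime to block on it.
    let rt = tokio::runtime::Runtime::new().expect("failed to build tokio runtime");
    // "./data/example_delta" is a hypothetical table path.
    let table = rt.block_on(open_table("./data/example_delta"))?;
    println!("Delta table is at version {}", table.version());
    Ok(())
}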

README.md

Lines changed: 2 additions & 2 deletions
@@ -19,14 +19,14 @@
 [![CD](https://github.com/firelink-data/evolution/actions/workflows/cd.yml/badge.svg)](https://github.com/firelink-data/evolution/actions/workflows/cd.yml)
 [![Tests](https://github.com/firelink-data/evolution/actions/workflows/tests.yml/badge.svg)](https://github.com/firelink-data/evolution/actions/workflows/tests.yml)
 
-🦖 *Evolve your fixed-length data files into Apache Parquet, fully parallelized!*
+🦖 *Evolve your fixed-length data files using Apache Arrow , fully parallelized!*
 
 </div>
 
 
 ## 🔎 Overview
 
-This repository hosts the **evolution** program which both allows you to convert existing fixed-length files into other data formats, but also allows you to create large amounts of mocked data blazingly fast. The program supports full parallelism and utilizes SIMD techniques, when possible, for highly efficient parsing of data.
+This repository hosts the **evolution** program which both allows you to convert existing fixed-length files into dataformats like parquet or lake targets like Delta , but also allows you to create large amounts of mocked data blazingly fast. The program supports full parallelism and utilizes SIMD techniques, when possible, for highly efficient parsing of data.
 
 To get started, follow the installation, schema setup, and example usage sections below in this README. Happy hacking! 👋🥳

src/chunked/arrow_converter.rs

Lines changed: 18 additions & 11 deletions
@@ -45,14 +45,15 @@ use std::str::from_utf8_unchecked;
 use std::sync::Arc;
 
 use super::{
-    arrow_file_output, trimmer, ColumnBuilder, Converter, FnFindLastLineBreak, FnLineBreakLen,
+    trimmer, ColumnBuilder, Converter, FnFindLastLineBreak, FnLineBreakLen, RecordBatchOutput,
     Stats,
 };
 use crate::chunked;
-use crate::chunked::threaded_file_output::{ipc_file_out, output_factory, parquet_file_out};
+use crate::chunked::recordbatch_output::{output_factory, IpcFileOut, ParquetFileOut};
 pub use crate::cli::Targets;
 use crate::datatype::DataType;
 use crate::schema;
+use crate::schema::FixedSchema;
 use arrow::datatypes::{Field, Schema, SchemaRef};
 use atomic_counter::{AtomicCounter, ConsistentCounter};
 use crossbeam::atomic::AtomicConsume;
@@ -64,9 +65,11 @@ use rayon::join;
 use std::sync::atomic::{AtomicUsize, Ordering, ATOMIC_USIZE_INIT};
 use std::sync::mpsc::{sync_channel, Receiver, RecvError, SyncSender};
 use std::thread;
-use std::thread::JoinHandle;
 use std::time::{Duration, Instant};
 use thread::spawn;
+use tokio::runtime;
+use tokio::runtime::Runtime;
+use tokio::task::JoinHandle;
 //use ordered_channel::Sender;
 //use crossbeam::channel::{Receiver, Sender};

@@ -79,6 +82,7 @@ pub(crate) struct Slice2Arrow<'a> {
     pub(crate) masterbuilders: MasterBuilders,
     pub(crate) consistent_counter: ConsistentCounter,
     pub(crate) target: Targets,
+    pub(crate) fixed_schema: FixedSchema,
 }
 
 pub(crate) struct MasterBuilders {
@@ -190,7 +194,7 @@ impl<'a> Converter<'a> for Slice2Arrow<'a> {
 
         let start_parse = Instant::now();
         let offset: usize = self.consistent_counter.get();
-
+        let mut local_consistent_counter: ConsistentCounter = ConsistentCounter::new(0);
         let arc_slices = Arc::new(&slices);
         self.masterbuilders
             .builders
@@ -213,40 +217,39 @@ impl<'a> Converter<'a> for Slice2Arrow<'a> {
                         br.push(bb.finish());
                     }
                     let record_batch = RecordBatch::try_from_iter(br).unwrap();
-
                     let _ = self
                         .masterbuilders
                         .sender
                         .clone()
                         .unwrap()
                         .send(offset + i, record_batch);
+                    let offset: usize = local_consistent_counter.add(1);
                 }
             }
         });
 
         for ii in slices.iter() {
             bytes_in += ii.len();
         }
-        let offset: usize = self
-            .consistent_counter
-            .add(self.masterbuilders.builders.len());
+        let offset: usize = self.consistent_counter.add(local_consistent_counter.get());
 
         parse_duration = start_parse.elapsed();
         (bytes_in, bytes_out, parse_duration, builder_write_duration)
     }
 
-    fn setup(&mut self) -> (Sender<RecordBatch>, JoinHandle<Result<Stats>>) {
+    fn setup(&mut self, rt: &runtime::Runtime) -> (Sender<RecordBatch>, JoinHandle<Result<Stats>>) {
         let o = output_factory(
             self.target.clone(),
+            self.fixed_schema.clone(),
             self.masterbuilders.schema_factory(),
             self.masterbuilders.outfile.clone(),
+            rt,
         );
         self.masterbuilders.sender = Some(o.0.clone());
         o
     }
 
-    fn shutdown(&mut self) {
-        // converter.shutdown();
+    fn shutdown(&mut self, rt: &runtime::Runtime, jh: JoinHandle<Result<Stats>>) {
         let schema = Schema::new(vec![Field::new(
             "id",
             arrow::datatypes::DataType::Int32,
@@ -256,6 +259,10 @@ impl<'a> Converter<'a> for Slice2Arrow<'a> {
         let emptyrb = arrow::record_batch::RecordBatch::new_empty(Arc::new(schema));
         let c = self.consistent_counter.get();
         let _ = &self.masterbuilders.sender.clone().unwrap().send(c, emptyrb);
+        rt.spawn_blocking(|| async {
+            info!("...spawned blockiiiiing");
+            jh.await.unwrap()
+        });
     }
 }
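
The counter rework above replaces the old fixed advance by builders.len() with an advance by the number of record batches actually sent, tracked per call in local_consistent_counter. A small self-contained sketch of that pattern using the atomic-counter crate (the loop and the batch count of 3 are illustrative stand-ins for the real parsing loop):

use atomic_counter::{AtomicCounter, ConsistentCounter};

fn main() {
    // global counter: carries the record-batch offset across process() calls
    let global = ConsistentCounter::new(0);

    // within one process() call, count the batches actually emitted...
    let local = ConsistentCounter::new(0);
    for _batch in 0..3 {
        // add() returns the previous value, which serves as this batch's offset
        let _batch_offset = local.add(1);
    }

    // ...then advance the global offset by exactly that many batches,
    // rather than by the fixed number of builders
    let previous = global.add(local.get());
    println!("offset advanced from {previous} to {}", global.get());
}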

src/chunked/mod.rs

Lines changed: 11 additions & 6 deletions
@@ -34,16 +34,19 @@ use ordered_channel::Sender;
 use std::fs;
 use std::path::PathBuf;
 use std::sync::mpsc::SyncSender;
-use std::thread::JoinHandle;
 use std::time::Duration;
+use tokio::runtime::Runtime;
+use tokio::task::JoinHandle;
 
 use self::residual_slicer::SLICER_IN_CHUNK_SIZE;
+use crate::schema::FixedSchema;
 use parquet::errors::{ParquetError, Result};
+use tokio::runtime;
 
 pub(crate) mod arrow_converter;
+pub(crate) mod recordbatch_output;
 pub(crate) mod residual_slicer;
 pub(crate) mod self_converter;
-pub(crate) mod threaded_file_output;
 mod trimmer;
 
 pub(crate) struct ChunkAndResidue {
@@ -61,7 +64,7 @@ pub(crate) struct Stats {
     pub(crate) bytes_in: usize,
     pub(crate) bytes_out: usize,
     pub(crate) num_rows: i64,
-
+    pub(crate) adds: i64,
     pub(crate) read_duration: Duration,
     pub(crate) parse_duration: Duration,
     pub(crate) builder_write_duration: Duration,
@@ -161,8 +164,8 @@
 
     // fn process(& mut self, slices: Vec< &'a[u8]>) -> usize;
     fn process(&mut self, slices: Vec<&'a [u8]>) -> (usize, usize, Duration, Duration);
-    fn setup(&mut self) -> (Sender<RecordBatch>, JoinHandle<Result<Stats>>);
-    fn shutdown(&mut self);
+    fn setup(&mut self, rt: &Runtime) -> (Sender<RecordBatch>, JoinHandle<Result<Stats>>);
+    fn shutdown(&mut self, rt: &Runtime, jh: JoinHandle<Result<Stats>>);
 }
 
 pub trait ColumnBuilder {
@@ -171,10 +174,12 @@ pub trait ColumnBuilder {
     // fn name(& self) -> &String;
 }
 
-pub trait arrow_file_output {
+pub trait RecordBatchOutput {
     fn setup(
         &mut self,
         schema: SchemaRef,
+        fixed_schema: FixedSchema,
         outfile: PathBuf,
+        rt: &Runtime,
     ) -> (Sender<RecordBatch>, JoinHandle<Result<Stats>>);
 }
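
The reshaped traits hand the caller a tokio JoinHandle so the output task can be awaited once conversion finishes. A self-contained sketch of that setup/shutdown shape, with tokio::sync::mpsc standing in for the crate's ordered-channel sender and a plain row count standing in for Stats (both substitutions are assumptions for illustration); the empty sentinel batch mirrors what shutdown sends in arrow_converter.rs:

use std::sync::Arc;

use arrow::array::{ArrayRef, Int32Array};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;
use tokio::runtime::Runtime;
use tokio::sync::mpsc;
use tokio::task::JoinHandle;

// setup-like step: spawn the writer task on the runtime and hand back the
// channel sender plus the task's JoinHandle.
fn setup(rt: &Runtime) -> (mpsc::Sender<RecordBatch>, JoinHandle<usize>) {
    let (tx, mut rx) = mpsc::channel::<RecordBatch>(16);
    let handle = rt.spawn(async move {
        let mut rows = 0usize;
        while let Some(batch) = rx.recv().await {
            // an empty batch doubles as the end-of-stream signal
            if batch.num_rows() == 0 {
                break;
            }
            rows += batch.num_rows();
            // a real RecordBatchOutput would hand the batch to a
            // parquet, ipc, or delta writer here
        }
        rows
    });
    (tx, handle)
}

fn main() {
    let rt = Runtime::new().expect("failed to build tokio runtime");
    let (tx, handle) = setup(&rt);

    let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]));
    let ids: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3]));
    let batch = RecordBatch::try_new(schema.clone(), vec![ids]).unwrap();

    rt.block_on(async {
        tx.send(batch).await.unwrap();
        // shutdown-like step: send the empty sentinel batch...
        tx.send(RecordBatch::new_empty(schema)).await.unwrap();
    });
    // ...then await the writer task through its JoinHandle
    let rows = rt.block_on(handle).expect("writer task panicked");
    println!("writer consumed {rows} rows");
}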
