Skip to content

Commit ae1135a

Browse files
author
Rickard Lundin
committed
ipc implemented
1 parent 09a7a0b commit ae1135a

9 files changed

Lines changed: 236 additions & 81 deletions

File tree

Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,8 @@ arrow2 = "0.18.0"
4343
libc = "0.2.154"
4444
arrow = "51.0.0"
4545
parquet = "51.0.0"
46+
arrow-ipc = "51.0.0"
47+
4648
padder = { version = "1.2.0", features = ["serde"] }
4749
ordered-channel="1.1.0"
4850
atomic-counter = "1.0.1"

src/chunked/arrow_converter.rs

Lines changed: 14 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
// Last updated: 2024-05-15
2626
//
2727

28+
use crate::chunked::trimmer::{ColumnTrimmer, trimmer_factory};
2829
use arrow::array::{ArrayRef, BooleanBuilder, Int32Builder, Int64Builder, StringBuilder};
2930
use arrow::record_batch::RecordBatch;
3031
use log::{debug, info};
@@ -42,11 +43,10 @@ use std::path::PathBuf;
4243
use std::str::from_utf8_unchecked;
4344
use std::sync::Arc;
4445

45-
use super::{ColumnBuilder, Converter, FnFindLastLineBreak, FnLineBreakLen};
46+
use super::{arrow_file_output, ColumnBuilder, Converter, FnFindLastLineBreak, FnLineBreakLen, Stats, trimmer};
4647
use crate::datatype::DataType;
4748
use crate::schema;
48-
use crate::trimmer::{trimmer_factory, ColumnTrimmer};
49-
use crate::{chunked, trimmer};
49+
use crate::{chunked};
5050
use arrow::datatypes::{Field, Schema, SchemaRef};
5151
use atomic_counter::{AtomicCounter, ConsistentCounter};
5252
use crossbeam::atomic::AtomicConsume;
@@ -60,6 +60,8 @@ use std::thread;
6060
use std::thread::JoinHandle;
6161
use std::time::{Duration, Instant};
6262
use thread::spawn;
63+
use rayon::join;
64+
use crate::chunked::threaded_file_output::{ipc_file_out, parquet_file_out};
6365
//use ordered_channel::Sender;
6466
//use crossbeam::channel::{Receiver, Sender};
6567

@@ -93,21 +95,6 @@ impl MasterBuilders {
9395
let batch = RecordBatch::try_from_iter(br).unwrap();
9496
batch.schema()
9597
}
96-
pub fn writer_factory(out_file: &PathBuf, schema: SchemaRef) -> ArrowWriter<File> {
97-
let _out_file = fs::OpenOptions::new()
98-
.create(true)
99-
.append(true)
100-
.open(out_file)
101-
.expect("aaa");
102-
103-
let props = WriterProperties::builder()
104-
.set_compression(Compression::SNAPPY)
105-
.build();
106-
107-
let writer: ArrowWriter<File> =
108-
ArrowWriter::try_new(_out_file, schema, Some(props.clone())).unwrap();
109-
writer
110-
}
11198

11299
pub fn builders_factory(out_file: PathBuf, schema_path: PathBuf, instances: i16) -> Self {
113100
let schema = schema::FixedSchema::from_path(schema_path).unwrap();
@@ -242,42 +229,19 @@ impl<'a> Converter<'a> for Slice2Arrow<'a> {
242229
(bytes_in, bytes_out, parse_duration, builder_write_duration)
243230
}
244231

245-
fn setup(&mut self) -> JoinHandle<Result<format::FileMetaData>> {
232+
fn setup(&mut self) -> JoinHandle<Result<Stats>> {
246233
let schema = self.masterbuilders.schema_factory();
247234
let _outfile = self.masterbuilders.outfile.clone();
248-
let mut writer = crate::chunked::arrow_converter::MasterBuilders::writer_factory(
249-
&_outfile,
250-
schema.clone(),
251-
);
252-
253-
let (sender, mut receiver) = bounded::<RecordBatch>(100);
254-
255-
let t: JoinHandle<Result<format::FileMetaData>> = thread::spawn(move || {
256-
'outer: loop {
257-
let mut message = receiver.recv();
258-
259-
match message {
260-
Ok(rb) => {
261-
writer.write(&rb).expect("Error Writing batch");
262-
if (rb.num_rows() == 0) {
263-
break 'outer;
264-
}
265-
}
266-
Err(e) => {
267-
info!("got RecvError in channel , break to outer");
268-
break 'outer;
269-
}
270-
}
271-
}
272-
info!("closing the writer for parquet");
273-
writer.finish()
274-
});
275-
self.masterbuilders.sender = Some(sender.clone());
276-
t
277-
278-
// bytes_out += crate::chunked::arrow_converter::GLOBAL_COUNTER.load_consume();
235+
236+
let mut pfo: Box<dyn arrow_file_output> = Box::new(ipc_file_out { sender: None } );
237+
let (sender,joinhandler)=pfo.setup(schema,_outfile);
238+
239+
self.masterbuilders.sender = Some(sender);
240+
joinhandler
241+
279242
}
280243

244+
281245
fn shutdown(&mut self) {
282246
// converter.shutdown();
283247
let schema = Schema::new(vec![Field::new(

src/chunked/mod.rs

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,20 +25,26 @@
2525
// Last updated: 2024-05-15
2626
//
2727

28+
use std::fmt::Debug;
2829
use arrow::array::{ArrayRef, RecordBatch};
2930
use parquet::format;
3031

3132
use std::fs;
33+
use std::path::PathBuf;
3234
use std::sync::mpsc::SyncSender;
3335
use std::thread::JoinHandle;
3436
use std::time::Duration;
37+
use arrow::datatypes::SchemaRef;
38+
use ordered_channel::Sender;
3539

3640
use self::residual_slicer::SLICER_IN_CHUNK_SIZE;
3741
use parquet::errors::{ParquetError, Result};
3842

3943
pub(crate) mod arrow_converter;
4044
pub(crate) mod residual_slicer;
4145
pub(crate) mod self_converter;
46+
mod trimmer;
47+
mod threaded_file_output;
4248

4349
pub(crate) struct ChunkAndResidue {
4450
pub(crate) chunk: Box<[u8; SLICER_IN_CHUNK_SIZE]>,
@@ -155,7 +161,7 @@ pub(crate) trait Converter<'a> {
155161

156162
// fn process(& mut self, slices: Vec< &'a[u8]>) -> usize;
157163
fn process(&mut self, slices: Vec<&'a [u8]>) -> (usize, usize, Duration, Duration);
158-
fn setup(&mut self) -> JoinHandle<Result<format::FileMetaData>>;
164+
fn setup(&mut self) -> JoinHandle<Result<Stats>>;
159165
fn shutdown(&mut self);
160166
}
161167

@@ -164,3 +170,7 @@ pub trait ColumnBuilder {
164170
fn finish(&mut self) -> (&str, ArrayRef);
165171
// fn name(& self) -> &String;
166172
}
173+
174+
/// Abstraction over threaded Arrow output sinks (implemented by the parquet
/// and IPC writers in `threaded_file_output`). `setup` opens `outfile`,
/// spawns a writer thread consuming `RecordBatch`es from a channel, and
/// returns the producer `Sender` plus a handle resolving to writer `Stats`.
/// NOTE(review): a zero-row batch appears to act as the end-of-stream
/// sentinel in the implementations — confirm before adding new impls.
pub trait arrow_file_output {
175+
fn setup(&mut self,schema:SchemaRef,outfile:PathBuf)-> (Sender<RecordBatch>, JoinHandle<Result<Stats>>);
176+
}

src/chunked/residual_slicer.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ use std::time::{Duration, Instant};
4040

4141
use super::{ChunkAndResidue, Converter, FnFindLastLineBreak, IterRevolver, Slicer, Stats};
4242

43-
pub(crate) const SLICER_IN_CHUNK_SIZE: usize = 1024 * 2024;
43+
pub(crate) const SLICER_IN_CHUNK_SIZE: usize = 1024 * 2048;
4444
pub(crate) const SLICER_MAX_RESIDUE_SIZE: usize = SLICER_IN_CHUNK_SIZE;
4545

4646
pub(crate) const IN_MAX_CHUNKS: usize = 2;

src/chunked/self_converter.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
// Last updated: 2024-05-15
2626
//
2727

28+
use crate::chunked::Stats;
2829
use log::info;
2930
use rayon::iter::{IndexedParallelIterator, IntoParallelRefIterator, ParallelIterator};
3031

@@ -69,7 +70,7 @@ impl<'a> Converter<'a> for SampleSliceAggregator<'a> {
6970
(bytes_processed, 0, duration, duration)
7071
}
7172

72-
fn setup(&mut self) -> JoinHandle<Result<format::FileMetaData>> {
73+
fn setup(&mut self) -> JoinHandle<Result<Stats>> {
7374
todo!()
7475
}
7576

src/chunked/threaded_file_output.rs

Lines changed: 178 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,178 @@
1+
//
2+
// MIT License
3+
//
4+
// Copyright (c) 2024 Firelink Data
5+
//
6+
// Permission is hereby granted, free of charge, to any person obtaining a copy
7+
// of this software and associated documentation files (the "Software"), to deal
8+
// in the Software without restriction, including without limitation the rights
9+
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
10+
// copies of the Software, and to permit persons to whom the Software is
11+
// furnished to do so, subject to the following conditions:
12+
//
13+
// The above copyright notice and this permission notice shall be included in all
14+
// copies or substantial portions of the Software.
15+
//
16+
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17+
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18+
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19+
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20+
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
21+
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
22+
// SOFTWARE.
23+
//
24+
// File created: 2024-05-07
25+
// Last updated: 2024-05-15
26+
//
27+
28+
use crate::chunked::trimmer::{ColumnTrimmer, trimmer_factory};
29+
use arrow::array::{ArrayRef, BooleanBuilder, Int32Builder, Int64Builder, StringBuilder};
30+
use arrow::record_batch::RecordBatch;
31+
use log::{debug, info};
32+
use ordered_channel::bounded;
33+
use parquet::arrow::ArrowWriter;
34+
use parquet::basic::Compression;
35+
use parquet::file::properties::WriterProperties;
36+
use parquet::format;
37+
use rayon::iter::IndexedParallelIterator;
38+
use rayon::prelude::*;
39+
40+
use std::fs;
41+
use std::fs::File;
42+
use std::path::PathBuf;
43+
use std::str::from_utf8_unchecked;
44+
use std::sync::Arc;
45+
46+
use super::{arrow_file_output, ColumnBuilder, Converter, FnFindLastLineBreak, FnLineBreakLen, Stats, trimmer};
47+
use crate::datatype::DataType;
48+
use crate::schema;
49+
use crate::{chunked};
50+
use arrow::datatypes::{Field, Schema, SchemaRef};
51+
use atomic_counter::{AtomicCounter, ConsistentCounter};
52+
use crossbeam::atomic::AtomicConsume;
53+
use libc::bsearch;
54+
use ordered_channel::Sender;
55+
use parquet::errors::{ParquetError, Result};
56+
use parquet::file::metadata::FileMetaData;
57+
use std::sync::atomic::{AtomicUsize, Ordering, ATOMIC_USIZE_INIT};
58+
use std::sync::mpsc::{sync_channel, Receiver, RecvError, SyncSender};
59+
use std::thread;
60+
use std::thread::JoinHandle;
61+
use std::time::{Duration, Instant};
62+
use thread::spawn;
63+
use arrow_ipc::CompressionType;
64+
use arrow_ipc::writer::IpcWriteOptions;
65+
use Compression::SNAPPY;
66+
67+
pub(crate) struct parquet_file_out {
68+
pub(crate) sender: Option<Sender<RecordBatch>>,
69+
}
70+
71+
impl arrow_file_output for parquet_file_out {
72+
fn setup(&mut self, schema: SchemaRef, outfile: PathBuf) -> (Sender<RecordBatch>, JoinHandle<Result<Stats>>) {
73+
74+
let _out_file = fs::OpenOptions::new()
75+
.create(true)
76+
.append(true)
77+
.open(outfile)
78+
.expect("aaa");
79+
80+
let props = WriterProperties::builder()
81+
.set_compression(SNAPPY)
82+
.build();
83+
84+
let mut writer: ArrowWriter<File> =
85+
ArrowWriter::try_new(_out_file, schema, Some(props.clone())).unwrap();
86+
87+
88+
let (sender, mut receiver) = bounded::<RecordBatch>(100);
89+
90+
let t: JoinHandle<Result<Stats>> = thread::spawn(move || {
91+
'outer: loop {
92+
let mut message = receiver.recv();
93+
94+
match message {
95+
Ok(rb) => {
96+
writer.write(&rb).expect("Error Writing batch");
97+
if (rb.num_rows() == 0) {
98+
break 'outer;
99+
}
100+
}
101+
Err(e) => {
102+
info!("got RecvError in channel , break to outer");
103+
break 'outer;
104+
}
105+
}
106+
}
107+
info!("closing the writer for parquet");
108+
writer.finish();
109+
Ok(Stats {
110+
bytes_in: 0,
111+
bytes_out: 0,
112+
num_rows: 0,
113+
read_duration: Default::default(),
114+
parse_duration: Default::default(),
115+
builder_write_duration: Default::default(),
116+
})
117+
});
118+
(sender,t)
119+
}
120+
121+
}
122+
123+
pub struct ipc_file_out {
124+
pub(crate) sender: Option<Sender<RecordBatch>>,
125+
126+
}
127+
128+
impl arrow_file_output for ipc_file_out {
129+
fn setup(&mut self, schema: SchemaRef, outfile: PathBuf) -> (Sender<RecordBatch>, JoinHandle<Result<Stats>>) {
130+
131+
let _out_file = fs::OpenOptions::new()
132+
.create(true)
133+
.append(true)
134+
.open(outfile)
135+
.expect("aaa");
136+
137+
138+
let p= IpcWriteOptions::try_with_compression(Default::default(),Some(CompressionType::LZ4_FRAME) );
139+
140+
let mut writer=arrow_ipc::writer::FileWriter::try_new_with_options(_out_file, &schema, Default::default()).expect("TODO: panic message");
141+
142+
let props = WriterProperties::builder()
143+
.set_compression(SNAPPY)
144+
.build();
145+
146+
let (sender, mut receiver) = bounded::<RecordBatch>(1000);
147+
148+
let t: JoinHandle<Result<Stats>> = thread::spawn(move || {
149+
'outer: loop {
150+
let mut message = receiver.recv();
151+
152+
match message {
153+
Ok(rb) => {
154+
writer.write(&rb).expect("Error Writing batch");
155+
if (rb.num_rows() == 0) {
156+
break 'outer;
157+
}
158+
}
159+
Err(e) => {
160+
info!("got RecvError in channel , break to outer");
161+
break 'outer;
162+
}
163+
}
164+
}
165+
info!("closing the writer for parquet");
166+
writer.finish();
167+
Ok(Stats {
168+
bytes_in: 0,
169+
bytes_out: 0,
170+
num_rows: 0,
171+
read_duration: Default::default(),
172+
parse_duration: Default::default(),
173+
builder_write_duration: Default::default(),
174+
})
175+
});
176+
(sender,t)
177+
}
178+
}

0 commit comments

Comments
 (0)