Skip to content

Commit 56ac779

Browse files
author
Rickard Lundin
committed
fmt
1 parent 207807c commit 56ac779

4 files changed

Lines changed: 56 additions & 42 deletions

File tree

src/chunked/arrow_converter.rs

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525
// Last updated: 2024-05-15
2626
//
2727

28-
use crate::chunked::trimmer::{ColumnTrimmer, trimmer_factory};
28+
use crate::chunked::trimmer::{trimmer_factory, ColumnTrimmer};
2929
use arrow::array::{ArrayRef, BooleanBuilder, Int32Builder, Int64Builder, StringBuilder};
3030
use arrow::record_batch::RecordBatch;
3131
use log::{debug, info};
@@ -43,25 +43,28 @@ use std::path::PathBuf;
4343
use std::str::from_utf8_unchecked;
4444
use std::sync::Arc;
4545

46-
use super::{arrow_file_output, ColumnBuilder, Converter, FnFindLastLineBreak, FnLineBreakLen, Stats, trimmer};
46+
use super::{
47+
arrow_file_output, trimmer, ColumnBuilder, Converter, FnFindLastLineBreak, FnLineBreakLen,
48+
Stats,
49+
};
50+
use crate::chunked;
51+
use crate::chunked::threaded_file_output::{ipc_file_out, parquet_file_out};
4752
use crate::datatype::DataType;
4853
use crate::schema;
49-
use crate::{chunked};
5054
use arrow::datatypes::{Field, Schema, SchemaRef};
5155
use atomic_counter::{AtomicCounter, ConsistentCounter};
5256
use crossbeam::atomic::AtomicConsume;
5357
use libc::bsearch;
5458
use ordered_channel::Sender;
5559
use parquet::errors::{ParquetError, Result};
5660
use parquet::file::metadata::FileMetaData;
61+
use rayon::join;
5762
use std::sync::atomic::{AtomicUsize, Ordering, ATOMIC_USIZE_INIT};
5863
use std::sync::mpsc::{sync_channel, Receiver, RecvError, SyncSender};
5964
use std::thread;
6065
use std::thread::JoinHandle;
6166
use std::time::{Duration, Instant};
6267
use thread::spawn;
63-
use rayon::join;
64-
use crate::chunked::threaded_file_output::{ipc_file_out, parquet_file_out};
6568
//use ordered_channel::Sender;
6669
//use crossbeam::channel::{Receiver, Sender};
6770

@@ -232,16 +235,14 @@ impl<'a> Converter<'a> for Slice2Arrow<'a> {
232235
fn setup(&mut self) -> JoinHandle<Result<Stats>> {
233236
let schema = self.masterbuilders.schema_factory();
234237
let _outfile = self.masterbuilders.outfile.clone();
235-
236-
let mut pfo: Box<dyn arrow_file_output> = Box::new(ipc_file_out { sender: None } );
237-
let (sender,joinhandler)=pfo.setup(schema,_outfile);
238-
238+
239+
let mut pfo: Box<dyn arrow_file_output> = Box::new(ipc_file_out { sender: None });
240+
let (sender, joinhandler) = pfo.setup(schema, _outfile);
241+
239242
self.masterbuilders.sender = Some(sender);
240-
joinhandler
241-
243+
joinhandler
242244
}
243245

244-
245246
fn shutdown(&mut self) {
246247
// converter.shutdown();
247248
let schema = Schema::new(vec![Field::new(

src/chunked/mod.rs

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -25,26 +25,26 @@
2525
// Last updated: 2024-05-15
2626
//
2727

28-
use std::fmt::Debug;
2928
use arrow::array::{ArrayRef, RecordBatch};
3029
use parquet::format;
30+
use std::fmt::Debug;
3131

32+
use arrow::datatypes::SchemaRef;
33+
use ordered_channel::Sender;
3234
use std::fs;
3335
use std::path::PathBuf;
3436
use std::sync::mpsc::SyncSender;
3537
use std::thread::JoinHandle;
3638
use std::time::Duration;
37-
use arrow::datatypes::SchemaRef;
38-
use ordered_channel::Sender;
3939

4040
use self::residual_slicer::SLICER_IN_CHUNK_SIZE;
4141
use parquet::errors::{ParquetError, Result};
4242

4343
pub(crate) mod arrow_converter;
4444
pub(crate) mod residual_slicer;
4545
pub(crate) mod self_converter;
46-
mod trimmer;
4746
mod threaded_file_output;
47+
mod trimmer;
4848

4949
pub(crate) struct ChunkAndResidue {
5050
pub(crate) chunk: Box<[u8; SLICER_IN_CHUNK_SIZE]>,
@@ -172,5 +172,9 @@ pub trait ColumnBuilder {
172172
}
173173

174174
pub trait arrow_file_output {
175-
fn setup(&mut self,schema:SchemaRef,outfile:PathBuf)-> (Sender<RecordBatch>, JoinHandle<Result<Stats>>);
175+
fn setup(
176+
&mut self,
177+
schema: SchemaRef,
178+
outfile: PathBuf,
179+
) -> (Sender<RecordBatch>, JoinHandle<Result<Stats>>);
176180
}

src/chunked/threaded_file_output.rs

Lines changed: 33 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525
// Last updated: 2024-05-15
2626
//
2727

28-
use crate::chunked::trimmer::{ColumnTrimmer, trimmer_factory};
28+
use crate::chunked::trimmer::{trimmer_factory, ColumnTrimmer};
2929
use arrow::array::{ArrayRef, BooleanBuilder, Int32Builder, Int64Builder, StringBuilder};
3030
use arrow::record_batch::RecordBatch;
3131
use log::{debug, info};
@@ -43,11 +43,16 @@ use std::path::PathBuf;
4343
use std::str::from_utf8_unchecked;
4444
use std::sync::Arc;
4545

46-
use super::{arrow_file_output, ColumnBuilder, Converter, FnFindLastLineBreak, FnLineBreakLen, Stats, trimmer};
46+
use super::{
47+
arrow_file_output, trimmer, ColumnBuilder, Converter, FnFindLastLineBreak, FnLineBreakLen,
48+
Stats,
49+
};
50+
use crate::chunked;
4751
use crate::datatype::DataType;
4852
use crate::schema;
49-
use crate::{chunked};
5053
use arrow::datatypes::{Field, Schema, SchemaRef};
54+
use arrow_ipc::writer::IpcWriteOptions;
55+
use arrow_ipc::CompressionType;
5156
use atomic_counter::{AtomicCounter, ConsistentCounter};
5257
use crossbeam::atomic::AtomicConsume;
5358
use libc::bsearch;
@@ -60,31 +65,29 @@ use std::thread;
6065
use std::thread::JoinHandle;
6166
use std::time::{Duration, Instant};
6267
use thread::spawn;
63-
use arrow_ipc::CompressionType;
64-
use arrow_ipc::writer::IpcWriteOptions;
6568
use Compression::SNAPPY;
6669

6770
pub(crate) struct parquet_file_out {
6871
pub(crate) sender: Option<Sender<RecordBatch>>,
6972
}
7073

7174
impl arrow_file_output for parquet_file_out {
72-
fn setup(&mut self, schema: SchemaRef, outfile: PathBuf) -> (Sender<RecordBatch>, JoinHandle<Result<Stats>>) {
73-
75+
fn setup(
76+
&mut self,
77+
schema: SchemaRef,
78+
outfile: PathBuf,
79+
) -> (Sender<RecordBatch>, JoinHandle<Result<Stats>>) {
7480
let _out_file = fs::OpenOptions::new()
7581
.create(true)
7682
.append(true)
7783
.open(outfile)
7884
.expect("aaa");
7985

80-
let props = WriterProperties::builder()
81-
.set_compression(SNAPPY)
82-
.build();
86+
let props = WriterProperties::builder().set_compression(SNAPPY).build();
8387

8488
let mut writer: ArrowWriter<File> =
8589
ArrowWriter::try_new(_out_file, schema, Some(props.clone())).unwrap();
8690

87-
8891
let (sender, mut receiver) = bounded::<RecordBatch>(100);
8992

9093
let t: JoinHandle<Result<Stats>> = thread::spawn(move || {
@@ -115,33 +118,39 @@ impl arrow_file_output for parquet_file_out {
115118
builder_write_duration: Default::default(),
116119
})
117120
});
118-
(sender,t)
121+
(sender, t)
119122
}
120-
121123
}
122124

123125
pub struct ipc_file_out {
124126
pub(crate) sender: Option<Sender<RecordBatch>>,
125-
126127
}
127128

128129
impl arrow_file_output for ipc_file_out {
129-
fn setup(&mut self, schema: SchemaRef, outfile: PathBuf) -> (Sender<RecordBatch>, JoinHandle<Result<Stats>>) {
130-
130+
fn setup(
131+
&mut self,
132+
schema: SchemaRef,
133+
outfile: PathBuf,
134+
) -> (Sender<RecordBatch>, JoinHandle<Result<Stats>>) {
131135
let _out_file = fs::OpenOptions::new()
132136
.create(true)
133137
.append(true)
134138
.open(outfile)
135139
.expect("aaa");
136140

141+
let p = IpcWriteOptions::try_with_compression(
142+
Default::default(),
143+
Some(CompressionType::LZ4_FRAME),
144+
);
145+
146+
let mut writer = arrow_ipc::writer::FileWriter::try_new_with_options(
147+
_out_file,
148+
&schema,
149+
Default::default(),
150+
)
151+
.expect("TODO: panic message");
137152

138-
let p= IpcWriteOptions::try_with_compression(Default::default(),Some(CompressionType::LZ4_FRAME) );
139-
140-
let mut writer=arrow_ipc::writer::FileWriter::try_new_with_options(_out_file, &schema, Default::default()).expect("TODO: panic message");
141-
142-
let props = WriterProperties::builder()
143-
.set_compression(SNAPPY)
144-
.build();
153+
let props = WriterProperties::builder().set_compression(SNAPPY).build();
145154

146155
let (sender, mut receiver) = bounded::<RecordBatch>(1000);
147156

@@ -173,6 +182,6 @@ impl arrow_file_output for ipc_file_out {
173182
builder_write_duration: Default::default(),
174183
})
175184
});
176-
(sender,t)
185+
(sender, t)
177186
}
178187
}

src/chunked/trimmer.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,8 @@
2727

2828
use std::cmp::min;
2929

30-
use padder::{Alignment, Symbol};
3130
use crate::chunked::trimmer;
31+
use padder::{Alignment, Symbol};
3232

3333
use crate::datatype::DataType;
3434

0 commit comments

Comments
 (0)