Skip to content

Commit 2871944

Browse files
authored
Merge pull request #40 from firelink-data/feature/threaded_revolver
added timing output
2 parents 86338b2 + a6e8f89 commit 2871944

7 files changed

Lines changed: 47 additions & 15 deletions

File tree

src/chunked/arrow_converter.rs

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ use crate::datatype::DataType;
4646
use crate::schema;
4747
use crate::trimmer::{trimmer_factory, ColumnTrimmer};
4848
use crate::{chunked, trimmer};
49+
use std::time::{Duration, Instant};
4950

5051
pub(crate) struct Slice2Arrow<'a> {
5152
// pub(crate) file_out: File,
@@ -163,9 +164,13 @@ impl<'a> Converter<'a> for Slice2Arrow<'a> {
163164
self.fn_line_break
164165
}
165166

166-
fn process(&mut self, slices: Vec<&'a [u8]>) -> (usize, usize) {
167+
fn process(&mut self, slices: Vec<&'a [u8]>) -> (usize, usize, Duration, Duration) {
167168
let mut bytes_in: usize = 0;
168169
let mut bytes_out: usize = 0;
170+
let mut parse_duration: Duration = Duration::new(0, 0);
171+
let mut builder_write_duration: Duration = Duration::new(0, 0);
172+
173+
let start_parse = Instant::now();
169174

170175
let arc_slices = Arc::new(&slices);
171176
self.masterbuilders
@@ -190,6 +195,7 @@ impl<'a> Converter<'a> for Slice2Arrow<'a> {
190195
for ii in slices.iter() {
191196
bytes_in += ii.len();
192197
}
198+
parse_duration = start_parse.elapsed();
193199

194200
for b in self.masterbuilders.builders.iter_mut() {
195201
let mut br: Vec<(&str, ArrayRef)> = vec![];
@@ -198,14 +204,17 @@ impl<'a> Converter<'a> for Slice2Arrow<'a> {
198204
br.push(bb.finish());
199205
}
200206

207+
let start_builder_write = Instant::now();
201208
let batch = RecordBatch::try_from_iter(br).unwrap();
202209

203210
self.writer.write(&batch).expect("Error Writing batch");
204211
bytes_out += self.writer.bytes_written();
205-
debug!("Batch write: accumulated bytes_written {}", bytes_out);
212+
213+
builder_write_duration += start_builder_write.elapsed();
206214
}
215+
debug!("Batch write: accumulated bytes_written {}", bytes_out);
207216

208-
(bytes_in, bytes_out)
217+
(bytes_in, bytes_out, parse_duration, builder_write_duration)
209218
}
210219

211220
fn finish(&mut self) -> parquet::errors::Result<format::FileMetaData> {

src/chunked/mod.rs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ use arrow::array::ArrayRef;
2929
use parquet::format;
3030

3131
use std::fs;
32+
use std::time::Duration;
3233

3334
use self::residual_slicer::SLICER_IN_CHUNK_SIZE;
3435

@@ -50,8 +51,11 @@ pub(crate) trait Slicer<'a> {
5051
pub(crate) struct Stats {
5152
pub(crate) bytes_in: usize,
5253
pub(crate) bytes_out: usize,
53-
5454
pub(crate) num_rows: i64,
55+
56+
pub(crate) read_duration: Duration,
57+
pub(crate) parse_duration: Duration,
58+
pub(crate) builder_write_duration: Duration,
5559
}
5660

5761
pub(crate) type FnLineBreakLen = fn() -> usize;
@@ -147,7 +151,7 @@ pub(crate) trait Converter<'a> {
147151
fn get_line_break_handler(&self) -> FnFindLastLineBreak<'a>;
148152

149153
// fn process(& mut self, slices: Vec< &'a[u8]>) -> usize;
150-
fn process(&mut self, slices: Vec<&'a [u8]>) -> (usize, usize);
154+
fn process(&mut self, slices: Vec<&'a [u8]>) -> (usize, usize, Duration, Duration);
151155
fn finish(&mut self) -> parquet::errors::Result<format::FileMetaData>;
152156
fn get_finish_bytes_written(&mut self) -> usize;
153157
}

src/chunked/residual_slicer.rs

Lines changed: 22 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -31,10 +31,11 @@ use std::cmp;
3131
use std::fs;
3232
use std::fs::File;
3333
use std::io::{BufReader, Read};
34+
use std::time::{Duration, Instant};
3435

3536
use super::{ChunkAndResidue, Converter, FnFindLastLineBreak, IterRevolver, Slicer, Stats};
3637

37-
pub(crate) const SLICER_IN_CHUNK_SIZE: usize = 1024 * 1024;
38+
pub(crate) const SLICER_IN_CHUNK_SIZE: usize = 1024 * 2024;
3839
pub(crate) const SLICER_MAX_RESIDUE_SIZE: usize = SLICER_IN_CHUNK_SIZE;
3940

4041
pub(crate) const IN_MAX_CHUNKS: usize = 2;
@@ -61,6 +62,9 @@ impl<'a> Slicer<'a> for ResidualSlicer<'a> {
6162

6263
let mut bytes_in = 0;
6364
let mut bytes_out = 0;
65+
let mut read_duration_tot: Duration = Duration::new(0, 0);
66+
let mut parse_duration_tot: Duration = Duration::new(0, 0);
67+
let mut builder_write_duration_tot: Duration = Duration::new(0, 0);
6468

6569
let mut remaining_file_length = infile.metadata().unwrap().len() as usize;
6670

@@ -90,6 +94,8 @@ impl<'a> Slicer<'a> for ResidualSlicer<'a> {
9094

9195
let chunk_len_effective_read: usize;
9296

97+
let start_read = Instant::now();
98+
9399
(residue_len, chunk_len_effective_read, slices) = read_chunk_and_slice(
94100
self.fn_find_last_nl,
95101
&mut residue,
@@ -100,10 +106,14 @@ impl<'a> Slicer<'a> for ResidualSlicer<'a> {
100106
chunk_len_toread,
101107
);
102108

109+
read_duration_tot += start_read.elapsed();
110+
103111
remaining_file_length -= chunk_len_effective_read;
104-
let (bin, bout) = converter.process(slices);
112+
let (bin, bout, parse_duration, builder_write_duration) = converter.process(slices);
105113
bytes_in += bin;
106114
bytes_out += bout;
115+
parse_duration_tot += parse_duration;
116+
builder_write_duration_tot += builder_write_duration;
107117

108118
if remaining_file_length == 0 {
109119
break;
@@ -115,18 +125,26 @@ impl<'a> Slicer<'a> for ResidualSlicer<'a> {
115125
if 0 != residue_len {
116126
slices = residual_to_slice(&residue, &mut cr.chunk, residue_len);
117127

118-
let (bin, bout) = converter.process(slices);
128+
let (bin, bout, parse_duration, builder_write_duration) = converter.process(slices);
119129
bytes_in += bin;
120130
bytes_out += bout;
131+
parse_duration_tot += parse_duration;
132+
builder_write_duration_tot += builder_write_duration;
121133
}
122134

123-
info!("Bytes in= {} out= {}", bytes_in, bytes_out);
135+
info!(
136+
"Bytes in= {}\n out= {}\nparse duration= {:?}\n \n builder write_duration {:?}\n",
137+
bytes_in, bytes_out, parse_duration_tot, builder_write_duration_tot
138+
);
124139

125140
match converter.finish() {
126141
Ok(x) => Result::Ok(Stats {
127142
bytes_in,
128143
bytes_out: converter.get_finish_bytes_written(),
129144
num_rows: x.num_rows,
145+
read_duration: read_duration_tot,
146+
parse_duration: parse_duration_tot,
147+
builder_write_duration: builder_write_duration_tot,
130148
}),
131149
Err(_x) => Result::Err("Could not produce Parquet"),
132150
}

src/chunked/self_converter.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ use rayon::iter::{IndexedParallelIterator, IntoParallelRefIterator, ParallelIter
3030

3131
use std::fs::File;
3232
use std::io::Write;
33+
use std::time::Duration;
3334

3435
use super::{Converter, FnFindLastLineBreak};
3536

@@ -46,8 +47,9 @@ impl<'a> Converter<'a> for SampleSliceAggregator<'a> {
4647
self.fn_line_break
4748
}
4849

49-
fn process(&mut self, slices: Vec<&'a [u8]>) -> (usize, usize) {
50+
fn process(&mut self, slices: Vec<&'a [u8]>) -> (usize, usize, Duration, Duration) {
5051
let mut bytes_processed: usize = 0;
52+
let duration = Duration::new(0, 0);
5153

5254
slices
5355
.par_iter()
@@ -58,7 +60,7 @@ impl<'a> Converter<'a> for SampleSliceAggregator<'a> {
5860
let l = val.len();
5961
bytes_processed += l;
6062
}
61-
(bytes_processed, 0)
63+
(bytes_processed, 0, duration, duration)
6264
}
6365

6466
fn finish(&mut self) -> Result<parquet::format::FileMetaData, parquet::errors::ParquetError> {

src/cli.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -302,8 +302,8 @@ impl Cli {
302302
slicer_instance.slice_and_convert(converter_instance, _in_file, n_threads)?;
303303

304304
info!(
305-
"Operation successful inbytes={} out bytes={} num of rows={}",
306-
stats.bytes_in, stats.bytes_out, stats.num_rows
305+
"Operation successful \ninbytes={} \nout bytes={} \nnum of rows={} \nread duration={:?} \nparse duration {:?}\nwrite duration={:?}",
306+
stats.bytes_in, stats.bytes_out, stats.num_rows,stats.read_duration,stats.parse_duration,stats.builder_write_duration
307307
);
308308
}
309309
Commands::Mock {

src/trimmer.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@
2727

2828
use std::cmp::min;
2929

30-
use crate::converter::Converter;
3130
use padder::{Alignment, Symbol};
3231

3332
use crate::datatype::DataType;

src/writer.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -326,7 +326,7 @@ mod tests_writer {
326326
.build()
327327
.unwrap();
328328

329-
flfw.write(&vec![0u8; 64]).unwrap();
329+
flfw.write(&[0u8; 64]).unwrap();
330330
fs::remove_file(path).unwrap();
331331
}
332332

0 commit comments

Comments
 (0)