Skip to content

Commit 7b81d27

Browse files
Ignalina and Rickard Lundin authored
[feat] chunked as a module (#33)
Co-authored-by: Rickard Lundin <rickard.lundin@enkla.com>
1 parent 1bdf8eb commit 7b81d27

8 files changed

Lines changed: 133 additions & 164 deletions

File tree

src/converters.rs renamed to src/chunked.rs

Lines changed: 111 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,15 +25,18 @@
2525
* Last updated: 2023-11-21
2626
*/
2727

28-
use crate::slicers::FnFindLastLineBreak;
29-
use arrow::array::ArrayRef;
28+
use arrow_array::array::ArrayRef;
3029
use parquet::format;
3130
use std::cmp::min;
31+
use std::fs;
32+
use crate::chunked::chunked_slicer::SLICER_IN_CHUNK_SIZE;
33+
3234

3335
// pub(crate) mod arrow2_converter;
3436
pub(crate) mod self_converter;
3537

3638
pub mod arrow_converter;
39+
pub mod chunked_slicer;
3740

3841
pub(crate) trait Converter<'a> {
3942
fn set_line_break_handler(&mut self, fn_line_break: FnFindLastLineBreak<'a>);
@@ -161,3 +164,109 @@ mod tests {
161164
assert_eq!(column_length_num_rightaligned(b"123", 3), (0, 2));
162165
}
163166
}
167+
168+
169+
/// Owner of one fixed-size byte buffer of `SLICER_IN_CHUNK_SIZE` bytes.
///
/// NOTE(review): presumably one input chunk filled by the chunked slicer; the
/// name also mentions a "residue", but no such field exists here — confirm.
pub(crate) struct ChunkAndResidue {
170+
/// The buffer itself; boxed, so the array is not stored inline in the struct.
pub(crate) chunk: Box<[u8; SLICER_IN_CHUNK_SIZE]>,
171+
}
172+
/// Contract for types that consume an input file together with a `Converter`
/// and report the outcome as `Stats`.
pub(crate) trait Slicer<'a> {
173+
/// Processes `infile` with the given `converter`.
///
/// * `converter` - boxed `Converter` trait object; ownership moves into the call.
/// * `infile` - the already opened input file; ownership moves into the call.
/// * `n_threads` - presumably the number of worker threads to use — TODO confirm
///   against the implementations.
///
/// Returns the collected `Stats` on success, or a string slice describing the
/// failure. Because of lifetime elision the error slice borrows from `self`.
fn slice_and_convert(
174+
&mut self,
175+
converter: Box<dyn 'a + Converter<'a>>,
176+
infile: fs::File,
177+
n_threads: usize,
178+
) -> Result<crate::chunked::Stats, &str>;
179+
}
180+
/// Counters returned by `Slicer::slice_and_convert`.
pub(crate) struct Stats {
181+
/// Presumably the number of bytes read from the input — TODO confirm.
pub(crate) bytes_in: usize,
182+
/// Presumably the number of bytes produced by the converter — TODO confirm.
pub(crate) bytes_out: usize,
183+
184+
/// Presumably the number of rows converted — TODO confirm.
pub(crate) num_rows: i64,
185+
}
186+
187+
/// Function pointer type for functions that take no arguments and return a
/// line-break length as `usize` (see `line_break_len_cr`, `line_break_len_crlf`).
pub(crate) type FnLineBreakLen = fn() -> usize;
188+
/// Returns the length in bytes of a one-byte line break.
///
/// The signature matches `FnLineBreakLen`, so the function can be passed
/// around as a line-break-length provider.
#[allow(dead_code)]
pub(crate) fn line_break_len_cr() -> usize {
    // The literal is inferred as `usize` from the return type; no cast needed.
    1
}
192+
/// Returns the length in bytes of a two-byte (CR LF) line break.
///
/// The signature matches `FnLineBreakLen`, so the function can be passed
/// around as a line-break-length provider.
#[allow(dead_code)]
pub(crate) fn line_break_len_crlf() -> usize {
    // The literal is inferred as `usize` from the return type; no cast needed.
    2
}
196+
197+
/// Function pointer type for line-break finders: takes a byte slice and
/// returns `(found, index)`. What `index` refers to is defined by the concrete
/// finder (see `find_last_nl` and `find_last_nlcr`).
pub(crate) type FnFindLastLineBreak<'a> = fn(bytes: &'a [u8]) -> (bool, usize);
198+
/// Finds the last CR LF (`0x0d 0x0a`) pair in `bytes`.
///
/// Returns `(true, end)` where `end` is the index one past the LF of the last
/// pair, i.e. the offset at which the data following the line break starts.
/// Returns `(false, 0)` when `bytes` holds no CR LF pair, which includes empty
/// and one-byte inputs.
///
/// The previous hand-written backwards loop evaluated `bytes[p2 - 1]` with
/// `p2 == 0` whenever no pair was present in an input of two or more bytes,
/// which panicked; scanning two-byte windows cannot index out of range.
#[allow(dead_code)]
pub(crate) fn find_last_nlcr(bytes: &[u8]) -> (bool, usize) {
    // `windows(2)` yields nothing for inputs shorter than two bytes, so the
    // short-input cases fall through to the "not found" arm.
    match bytes
        .windows(2)
        .rposition(|pair| matches!(pair, [0x0d, 0x0a]))
    {
        // `cr_at` is the index of the CR; the pair occupies two bytes.
        Some(cr_at) => (true, cr_at + 2),
        None => (false, 0),
    }
}
221+
222+
/// Finds the last LF (`0x0a`) byte in `bytes`.
///
/// Returns `(true, index)` where `index` is the position of the last LF
/// itself, or `(false, 0)` when `bytes` contains no LF (including the empty
/// slice). The flag must be checked, because `(true, 0)` is a valid hit at
/// the first byte.
///
/// A one-byte input consisting of just LF is now reported as found; the
/// previous implementation returned early for every one-byte input.
#[allow(dead_code)]
pub(crate) fn find_last_nl(bytes: &[u8]) -> (bool, usize) {
    match bytes.iter().rposition(|&byte| byte == 0x0a) {
        Some(at) => (true, at),
        None => (false, 0),
    }
}
244+
/// Endless round-robin iterator over the elements of a mutably borrowed slice.
///
/// After the last element has been yielded the iterator starts over at the
/// first one, so for a non-empty slice `next()` never returns `None`.
///
/// Because elements are yielded again on every revolution, a caller must not
/// keep a previously yielded `&mut T` alive at the point where the same slot
/// is handed out again; doing so would create two live mutable references to
/// one element.
pub(crate) struct IterRevolver<'a, T> {
    /// Pointer to the first element of the borrowed slice.
    shards: *mut T,
    /// One past the index yielded most recently; `0` before the first call.
    next: usize,
    /// Number of elements in the borrowed slice.
    len: usize,
    /// Ties the raw pointer to the exclusive borrow of the source slice.
    phantom: std::marker::PhantomData<&'a mut [T]>,
}

impl<'a, T> From<&'a mut [T]> for IterRevolver<'a, T> {
    /// Builds a revolver positioned before the first element of `shards`.
    fn from(shards: &'a mut [T]) -> Self {
        IterRevolver {
            next: 0,
            len: shards.len(),
            shards: shards.as_mut_ptr(),
            phantom: std::marker::PhantomData,
        }
    }
}

impl<'a, T> Iterator for IterRevolver<'a, T> {
    type Item = &'a mut T;

    /// Yields the next element, wrapping around to the first element after the
    /// last. Returns `None` only when the underlying slice is empty.
    fn next(&mut self) -> Option<Self::Item> {
        // An empty slice has no element to hand out; dereferencing its
        // (dangling) pointer would be undefined behaviour.
        if self.len == 0 {
            return None;
        }

        let slot = if self.next < self.len { self.next } else { 0 };
        self.next = slot + 1;

        // SAFETY: `shards` and `len` come from a `&'a mut [T]` (see `From`),
        // so the pointer is valid for `len` elements for the whole of `'a`.
        // `len > 0` was checked above and `slot` is either `next` (< `len`)
        // or `0`, hence `slot < len` and the access is in bounds.
        unsafe { Some(&mut *self.shards.add(slot)) }
    }
}
Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -40,9 +40,9 @@ use rayon::iter::IndexedParallelIterator;
4040
use rayon::prelude::*;
4141

4242
use crate::datatype::DataType;
43-
use crate::converters::{ColumnBuilder, Converter};
44-
use crate::slicers::{FnFindLastLineBreak, FnLineBreakLen};
45-
use crate::{converters, schema};
43+
use crate::chunked::{ColumnBuilder, Converter};
44+
use crate::chunked::{FnFindLastLineBreak, FnLineBreakLen};
45+
use crate::{chunked, schema};
4646
use debug_print::debug_println;
4747

4848
pub(crate) struct Slice2Arrow<'a> {
@@ -224,7 +224,7 @@ impl ColumnBuilder for HandlerInt32Builder {
224224
Self: Sized,
225225
{
226226
let (start, stop) =
227-
converters::column_length_num_rightaligned(data, self.runes_in_column as i16);
227+
chunked::column_length_num_rightaligned(data, self.runes_in_column as i16);
228228

229229
match atoi_simd::parse(&data[start..stop]) {
230230
Ok(n) => {
@@ -255,7 +255,7 @@ impl ColumnBuilder for HandlerInt64Builder {
255255
Self: Sized,
256256
{
257257
let (start, stop) =
258-
converters::column_length_num_rightaligned(data, self.runes_in_column as i16);
258+
chunked::column_length_num_rightaligned(data, self.runes_in_column as i16);
259259
match atoi_simd::parse(&data[start..stop]) {
260260
Ok(n) => {
261261
self.int64builder.append_value(n);
@@ -287,7 +287,7 @@ impl ColumnBuilder for HandlerStringBuilder {
287287
where
288288
Self: Sized,
289289
{
290-
let column_length: usize = converters::column_length(data, self.runes_in_column as i16);
290+
let column_length: usize = chunked::column_length(data, self.runes_in_column as i16);
291291
// Me dont like ... what is the cost ? Could it be done once for the whole chunk ?
292292
let text: &str = unsafe { from_utf8_unchecked(&data[..column_length]) };
293293

@@ -323,7 +323,7 @@ impl ColumnBuilder for HandlerBooleanBuilder {
323323
Self: Sized,
324324
{
325325
let (start, stop) =
326-
converters::column_length_char_rightaligned(data, self.runes_in_column as i16);
326+
chunked::column_length_char_rightaligned(data, self.runes_in_column as i16);
327327

328328
let text: &str = unsafe { from_utf8_unchecked(&data[start..stop]) };
329329

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,9 +31,9 @@ use std::{cmp, fs};
3131

3232
use log::info;
3333

34-
use crate::converters::Converter;
35-
use crate::slicers::{ChunkAndResidue, FnFindLastLineBreak, Slicer};
36-
use crate::slicers::{IterRevolver, Stats};
34+
use crate::chunked::Converter;
35+
use crate::chunked::{ChunkAndResidue, FnFindLastLineBreak, Slicer};
36+
use crate::chunked::{IterRevolver, Stats};
3737

3838
/**
3939
GOAL(s)
Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,8 @@
2525
* Last updated: 2023-12-14
2626
*/
2727

28-
use crate::converters::Converter;
29-
use crate::slicers::FnFindLastLineBreak;
28+
use crate::chunked::Converter;
29+
use crate::chunked::FnFindLastLineBreak;
3030
use rayon::iter::IntoParallelRefIterator;
3131
use rayon::prelude::*;
3232
use std::fs::File;

src/cli.rs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -29,14 +29,14 @@ use std::fs;
2929
use std::fs::File;
3030
use std::path::PathBuf;
3131

32-
// use crate::converters::arrow2_converter::{MasterBuilder, Slice2Arrow2};
33-
use crate::converters::arrow_converter::{MasterBuilders, Slice2Arrow};
34-
use crate::converters::self_converter::SampleSliceAggregator;
35-
use crate::converters::Converter;
32+
// use crate::chunked::arrow2_converter::{MasterBuilder, Slice2Arrow2};
33+
use crate::chunked::arrow_converter::{MasterBuilders, Slice2Arrow};
34+
use crate::chunked::self_converter::SampleSliceAggregator;
35+
use crate::chunked::Converter;
3636
use crate::dump::dump;
37-
use crate::slicers::chunked_slicer::{OldSlicer, IN_MAX_CHUNKS, SLICER_IN_CHUNK_SIZE};
38-
use crate::slicers::Slicer;
39-
use crate::slicers::{find_last_nl, line_break_len_cr, ChunkAndResidue};
37+
use crate::chunked::chunked_slicer::{OldSlicer, IN_MAX_CHUNKS, SLICER_IN_CHUNK_SIZE};
38+
use crate::chunked::Slicer;
39+
use crate::chunked::{find_last_nl, line_break_len_cr, ChunkAndResidue};
4040
use crate::{converter, error, mocker, schema};
4141
use clap::{value_parser, ArgAction, Parser, Subcommand};
4242
use log::info;

src/main.rs

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
* Last updated: 2024-02-28
2626
*/
2727

28+
use crate::cli::Cli;
2829
use clap::Parser;
2930
use log::{debug, error, info};
3031

@@ -34,17 +35,14 @@ mod error;
3435
mod logger;
3536
mod mocker;
3637
mod schema;
37-
use crate::slicers::chunked_slicer::{IN_MAX_CHUNKS, SLICER_IN_CHUNK_SIZE};
38-
use crate::slicers::ChunkAndResidue;
39-
use cli::Cli;
38+
4039
mod threads;
4140
mod writer;
4241
mod builder;
4342
mod parser;
4443

45-
mod converters;
44+
mod chunked;
4645
mod dump;
47-
mod slicers;
4846
mod mocking;
4947
mod converter;
5048
mod slicer;

src/slicers.rs

Lines changed: 0 additions & 138 deletions
This file was deleted.

0 commit comments

Comments
 (0)