Skip to content

Commit 026cd13

Browse files
authored
Merge branch 'main' into merge/master-to-wilhelm
2 parents 3df0a8a + 7b81d27 commit 026cd13

6 files changed

Lines changed: 881 additions & 10 deletions

File tree

src/chunked.rs

Lines changed: 272 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,272 @@
1+
/*
2+
* MIT License
3+
*
4+
* Copyright (c) 2024 Firelink Data
5+
*
6+
* Permission is hereby granted, free of charge, to any person obtaining a copy
7+
* of this software and associated documentation files (the "Software"), to deal
8+
* in the Software without restriction, including without limitation the rights
9+
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
10+
* copies of the Software, and to permit persons to whom the Software is
11+
* furnished to do so, subject to the following conditions:
12+
*
13+
* The above copyright notice and this permission notice shall be included in all
14+
* copies or substantial portions of the Software.
15+
*
16+
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17+
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18+
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19+
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20+
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
21+
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
22+
* SOFTWARE.
23+
*
24+
* File created: 2023-11-21
25+
* Last updated: 2023-11-21
26+
*/
27+
28+
use arrow_array::array::ArrayRef;
29+
use parquet::format;
30+
use std::cmp::min;
31+
use std::fs;
32+
use crate::chunked::chunked_slicer::SLICER_IN_CHUNK_SIZE;
33+
34+
35+
// pub(crate) mod arrow2_converter;
36+
pub(crate) mod self_converter;
37+
38+
pub mod arrow_converter;
39+
pub mod chunked_slicer;
40+
41+
pub(crate) trait Converter<'a> {
42+
fn set_line_break_handler(&mut self, fn_line_break: FnFindLastLineBreak<'a>);
43+
fn get_line_break_handler(&self) -> FnFindLastLineBreak<'a>;
44+
45+
// fn process(& mut self, slices: Vec< &'a[u8]>) -> usize;
46+
fn process(&mut self, slices: Vec<&'a [u8]>) -> (usize, usize);
47+
fn finish(&mut self) -> parquet::errors::Result<format::FileMetaData>;
48+
fn get_finish_bytes_written(&mut self) -> usize;
49+
}
50+
51+
pub trait ColumnBuilder {
52+
fn parse_value(&mut self, name: &[u8]) -> usize;
53+
fn finish(&mut self) -> (&str, ArrayRef);
54+
// fn name(& self) -> &String;
55+
}
56+
57+
fn column_length_num_rightaligned(data: &[u8], runes: i16) -> (usize, usize) {
58+
let mut eat = data.iter();
59+
let mut counted_runes = 0;
60+
let mut start: usize = 0;
61+
let stop: usize = min(data.len(), runes as usize);
62+
63+
while counted_runes < runes as usize {
64+
let byten = eat.nth(0);
65+
let bb: u8 = match byten {
66+
None => {
67+
//TODO we ran out of data,this is an error, fix later.
68+
return (start, stop);
69+
}
70+
Some(b) => *b,
71+
};
72+
73+
match bb {
74+
48..=57 => return (start, stop),
75+
_ => {}
76+
};
77+
start += 1;
78+
counted_runes += 1;
79+
}
80+
81+
(start, stop)
82+
}
83+
84+
fn column_length_char_rightaligned(data: &[u8], runes: i16) -> (usize, usize) {
85+
let mut eat = data.iter();
86+
let mut counted_runes = 0;
87+
let mut start: usize = 0;
88+
let stop: usize = min(data.len(), runes as usize);
89+
90+
while counted_runes < runes as usize {
91+
let byten = eat.nth(0);
92+
let bb: u8 = match byten {
93+
None => {
94+
//TODO we ran out of data,this is an error, fix later.
95+
return (start, stop);
96+
}
97+
Some(b) => *b,
98+
};
99+
100+
match bb {
101+
101..=132 => return (start, stop),
102+
141..=172 => return (start, stop),
103+
_ => {}
104+
};
105+
start += 1;
106+
counted_runes += 1;
107+
}
108+
109+
(start, stop)
110+
}
111+
112+
fn column_length(data: &[u8], runes: i16) -> usize {
113+
let mut eat = data.iter();
114+
let mut counted_runes = 0;
115+
let mut len: usize = 0;
116+
let mut units = 1;
117+
118+
while counted_runes < runes as usize {
119+
let byten = eat.nth(units - 1);
120+
121+
let bb: u8 = match byten {
122+
None => {
123+
return len;
124+
}
125+
Some(b) => *b,
126+
};
127+
128+
units = match bb {
129+
bb if bb >> 7 == 0 => 1,
130+
bb if bb >> 5 == 0b110 => 2,
131+
bb if bb >> 4 == 0b1110 => 3,
132+
bb if bb >> 3 == 0b11110 => 4,
133+
_bb => {
134+
// TODO BAD ERROR HANDL
135+
panic!("Incorrect UTF-8 sequence");
136+
#[allow(unreachable_code)]
137+
0
138+
}
139+
};
140+
141+
len += units;
142+
counted_runes += 1;
143+
}
144+
145+
len
146+
}
147+
148+
#[cfg(test)]
149+
mod tests {
150+
// this brings everything from parent's scope into this scope
151+
use super::*;
152+
153+
#[test]
154+
fn test_column_length() {
155+
let data = b"abcd";
156+
assert_eq!(column_length(data, 4), 4);
157+
assert_ne!(column_length(b"\xE5abc", 4), 3);
158+
assert_eq!(column_length(b"\xE5abc", 4), 4);
159+
assert_ne!(column_length(b"\xE5abc", 4), 5);
160+
}
161+
162+
#[test]
163+
fn test_column_length_num_rightaligned() {
164+
assert_eq!(column_length_num_rightaligned(b"123", 3), (0, 2));
165+
}
166+
}
167+
168+
169+
pub(crate) struct ChunkAndResidue {
170+
pub(crate) chunk: Box<[u8; SLICER_IN_CHUNK_SIZE]>,
171+
}
172+
pub(crate) trait Slicer<'a> {
173+
fn slice_and_convert(
174+
&mut self,
175+
converter: Box<dyn 'a + Converter<'a>>,
176+
infile: fs::File,
177+
n_threads: usize,
178+
) -> Result<crate::chunked::Stats, &str>;
179+
}
180+
pub(crate) struct Stats {
181+
pub(crate) bytes_in: usize,
182+
pub(crate) bytes_out: usize,
183+
184+
pub(crate) num_rows: i64,
185+
}
186+
187+
pub(crate) type FnLineBreakLen = fn() -> usize;
188+
#[allow(dead_code)]
189+
pub(crate) fn line_break_len_cr() -> usize {
190+
1 as usize
191+
}
192+
#[allow(dead_code)]
193+
pub(crate) fn line_break_len_crlf() -> usize {
194+
2 as usize
195+
}
196+
197+
pub(crate) type FnFindLastLineBreak<'a> = fn(bytes: &'a [u8]) -> (bool, usize);
198+
#[allow(dead_code)]
199+
pub(crate) fn find_last_nlcr(bytes: &[u8]) -> (bool, usize) {
200+
if bytes.is_empty() {
201+
return (false, 0); // TODO should report err ...
202+
}
203+
204+
let mut p2 = bytes.len() - 1;
205+
206+
if 0 == p2 {
207+
return (false, 0); // hmm
208+
}
209+
210+
loop {
211+
if bytes[p2 - 1] == 0x0d && bytes[p2] == 0x0a {
212+
return (true, p2 + 1);
213+
}
214+
if 0 == p2 {
215+
return (false, 0); // indicate we didnt find nl
216+
}
217+
218+
p2 -= 1;
219+
}
220+
}
221+
222+
#[allow(dead_code)]
223+
pub(crate) fn find_last_nl(bytes: &[u8]) -> (bool, usize) {
224+
if bytes.is_empty() {
225+
return (false, 0); // Indicate we didnt found nl.
226+
}
227+
228+
let mut p2 = bytes.len() - 1;
229+
230+
if 0 == p2 {
231+
return (false, 0); // hmm
232+
}
233+
234+
loop {
235+
if bytes[p2] == 0x0a {
236+
return (true, p2);
237+
}
238+
if 0 == p2 {
239+
return (false, 0); // indicate we didnt find nl
240+
}
241+
p2 -= 1;
242+
}
243+
}
244+
pub(crate) struct IterRevolver<'a, T> {
245+
shards: *mut T,
246+
next: usize,
247+
len: usize,
248+
phantom: std::marker::PhantomData<&'a mut [T]>,
249+
}
250+
251+
impl<'a, T> From<&'a mut [T]> for crate::chunked::IterRevolver<'a, T> {
252+
fn from(shards: &'a mut [T]) -> crate::chunked::IterRevolver<'a, T> {
253+
IterRevolver {
254+
next: 0,
255+
len: shards.len(),
256+
shards: shards.as_mut_ptr(),
257+
phantom: std::marker::PhantomData,
258+
}
259+
}
260+
}
261+
262+
impl<'a, T> Iterator for IterRevolver<'a, T> {
263+
type Item = &'a mut T;
264+
fn next(&mut self) -> Option<Self::Item> {
265+
if self.next < self.len {
266+
self.next += 1;
267+
} else {
268+
self.next = 1;
269+
}
270+
unsafe { Some(&mut *self.shards.offset(self.next as isize - 1)) }
271+
}
272+
}

0 commit comments

Comments
 (0)