Skip to content

Commit ecd9ca7

Browse files
committed
merged in single_threaded line based impl
1 parent f3f147a commit ecd9ca7

6 files changed

Lines changed: 232 additions & 9 deletions

File tree

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ serde_json = "1.0.117"
4646
threadpool = "1.8.1"
4747
substring = "1.4.5"
4848
tempfile = "3.10.1"
49+
libc = "0.2.154"
4950

5051
[dev-dependencies]
5152
glob = "0.3.1"

src/cli.rs

Lines changed: 75 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -34,10 +34,10 @@ use crate::converters::arrow_converter::{MasterBuilders, Slice2Arrow};
3434
use crate::converters::self_converter::SampleSliceAggregator;
3535
use crate::converters::Converter;
3636
use crate::dump::dump;
37-
use crate::slicers::old_slicer::{OldSlicer, IN_MAX_CHUNKS, SLICER_IN_CHUNK_SIZE};
37+
use crate::slicers::chunked_slicer::{OldSlicer, IN_MAX_CHUNKS, SLICER_IN_CHUNK_SIZE};
3838
use crate::slicers::Slicer;
3939
use crate::slicers::{find_last_nl, line_break_len_cr, ChunkAndResidue};
40-
use crate::{error, mocker, schema};
40+
use crate::{converter, error, mocker, schema};
4141
use clap::{value_parser, ArgAction, Parser, Subcommand};
4242
use log::info;
4343
use parquet::arrow::ArrowWriter;
@@ -71,8 +71,8 @@ enum Converters {
7171

7272
#[derive(clap::ValueEnum, Clone)]
7373
enum Slicers {
74-
Old,
75-
New,
74+
Chunks,
75+
Lines,
7676
}
7777

7878
#[derive(Subcommand)]
@@ -107,7 +107,7 @@ enum Commands {
107107
thread_channel_capacity: Option<usize>,
108108

109109
},
110-
Convert {
110+
ConvertChunked {
111111
/// Sets schema file
112112
113113
#[clap(value_enum, value_name = "CONVERTER")]
@@ -127,6 +127,54 @@ enum Commands {
127127
#[arg(short, long, value_name = "OUT-FILE")]
128128
out_file: PathBuf,
129129
},
130+
Convert {
131+
/// The fixed-length file to convert.
132+
#[arg(
133+
short = 'f',
134+
long = "file",
135+
value_name = "FILE",
136+
action = ArgAction::Set,
137+
)]
138+
file: PathBuf,
139+
140+
/// Specify output (target) file name.
141+
#[arg(
142+
short = 'o',
143+
long = "output-file",
144+
value_name = "OUTPUT-FILE",
145+
action = ArgAction::Set,
146+
required = false,
147+
)]
148+
output_file: PathBuf,
149+
150+
/// Specify the .json schema file to use when converting.
151+
#[arg(
152+
short = 's',
153+
long = "schema",
154+
value_name = "SCHEMA",
155+
action = ArgAction::Set,
156+
)]
157+
schema: PathBuf,
158+
159+
/// Set the size of the buffer (in bytes).
160+
#[arg(
161+
long = "buffer-size",
162+
value_name = "BUFFER-SIZE",
163+
action = ArgAction::Set,
164+
required = false,
165+
)]
166+
buffer_size: Option<usize>,
167+
168+
/// Set the capacity of the thread channel (number of messages).
169+
#[arg(
170+
long = "thread-channel-capacity",
171+
value_name = "THREAD-CHANNEL-CAPACITY",
172+
action = ArgAction::Set,
173+
required = false,
174+
)]
175+
thread_channel_capacity: Option<usize>,
176+
},
177+
130178
Dump {
131179
/// Sets schema file
132180
#[clap(value_enum, value_name = "CONVERTER")]
@@ -192,7 +240,28 @@ impl Cli {
192240

193241
Ok(())
194242
}
195-
Some(Commands::Convert {
243+
Some (Commands::Convert {
244+
file,
245+
output_file,
246+
schema,
247+
buffer_size,
248+
thread_channel_capacity,
249+
}) => {
250+
converter::Converter::builder()
251+
.target_file(file.to_owned())
252+
.output_file(output_file.to_owned())
253+
.schema(schema.to_owned())
254+
.num_threads(n_threads)
255+
.buffer_size(*buffer_size)
256+
.thread_channel_capacity(*thread_channel_capacity)
257+
.build()?
258+
.convert();
259+
Ok(())
260+
}
261+
262+
263+
264+
Some(Commands::ConvertChunked {
196265
converter,
197266
slicer: _,
198267
schema,

src/main.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ mod error;
3333
mod logger;
3434
mod mocker;
3535
mod schema;
36-
use crate::slicers::old_slicer::{IN_MAX_CHUNKS, SLICER_IN_CHUNK_SIZE};
36+
use crate::slicers::chunked_slicer::{IN_MAX_CHUNKS, SLICER_IN_CHUNK_SIZE};
3737
use crate::slicers::ChunkAndResidue;
3838
use cli::Cli;
3939
mod threads;
@@ -45,6 +45,9 @@ mod converters;
4545
mod dump;
4646
mod slicers;
4747
mod mocking;
48+
mod converter;
49+
mod slicer;
50+
4851
///
4952
fn main() {
5053
let cli = Cli::parse();

src/slicer.rs

Lines changed: 150 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,150 @@
1+
/*
2+
* MIT License
3+
*
4+
* Copyright (c) 2024 Firelink Data
5+
*
6+
* Permission is hereby granted, free of charge, to any person obtaining a copy
7+
* of this software and associated documentation files (the "Software"), to deal
8+
* in the Software without restriction, including without limitation the rights
9+
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
10+
* copies of the Software, and to permit persons to whom the Software is
11+
* furnished to do so, subject to the following conditions:
12+
*
13+
* The above copyright notice and this permission notice shall be included in all
14+
* copies or substantial portions of the Software.
15+
*
16+
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17+
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18+
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19+
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20+
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
21+
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
22+
* SOFTWARE.
23+
*
24+
* File created: 2023-12-11
25+
* Last updated: 2024-05-08
26+
*/
27+
28+
use std::default::Default;
29+
use std::slice::Iter;
30+
31+
use crate::error::Result;
32+
33+
#[derive(Debug)]
34+
pub(crate) struct Slicer {
35+
n_threads: usize,
36+
multithreading: bool,
37+
}
38+
39+
///
40+
impl Slicer {
41+
///
42+
pub fn builder() -> SlicerBuilder {
43+
SlicerBuilder {
44+
..Default::default()
45+
}
46+
}
47+
48+
/// Calculate how many bytes correspond to how many rune in the provided byte slice.
49+
/// See this link https://en.wikipedia.org/wiki/UTF-8 for details on how this works.
50+
///
51+
/// # Panics
52+
/// Iff the byte slice is not a valid utf-8 sequence.
53+
pub fn find_num_bytes_for_num_runes(&self, bytes: &[u8], num_runes: usize) -> usize {
54+
let mut found_runes: usize = 0;
55+
let mut num_bytes: usize = 0;
56+
let mut byte_units: usize = 1;
57+
58+
let mut iterator: Iter<u8> = bytes.iter();
59+
60+
while found_runes < num_runes {
61+
let byte = match iterator.nth(byte_units - 1) {
62+
Some(b) => *b,
63+
None => break,
64+
};
65+
66+
byte_units = match byte {
67+
byte if byte >> 7 == 0 => 1,
68+
byte if byte >> 5 == 0b110 => 2,
69+
byte if byte >> 4 == 0b1110 => 3,
70+
byte if byte >> 3 == 0b11110 => 4,
71+
_ => panic!("Invalid utf-8 sequence!"),
72+
};
73+
74+
found_runes += 1;
75+
num_bytes += byte_units;
76+
}
77+
78+
num_bytes
79+
}
80+
81+
/// Find all line breaks in a slice of bytes representing utf-8 encoded data.
82+
/// This method looks for a line-feed (LF) character, represented as `\n`.
83+
/// The hexadecimal code for `\n` is 0x0a.
84+
///
85+
/// # Panics
86+
/// Iff the byte slice is empty.
87+
#[cfg(not(target_os = "windows"))]
88+
pub fn find_line_breaks(&self, bytes: &[u8], buffer: &mut Vec<usize>) {
89+
if bytes.is_empty() {
90+
panic!("Byte slice was empty!");
91+
}
92+
93+
(0..bytes.len()).for_each(|idx| {
94+
if bytes[idx] == 0x0a {
95+
buffer.push(idx);
96+
}
97+
});
98+
}
99+
100+
/// Find all line breaks in a slice of bytes representing utf-8 encoded data.
101+
/// This method looks for windows OS specific line break characters called
102+
/// carriage-return (CR) and line-feed (LF). They are represented as the
103+
/// `\r` and `\n` utf-8 characters. The hexadecimal code for `\r` is 0x0d
104+
/// and `\n` is 0x0a.
105+
///
106+
/// # Panics
107+
/// Iff the byte slice is empty.
108+
#[cfg(target_os = "windows")]
109+
pub fn find_line_breaks(&self, bytes: &[u8], buffer: &mut Vec<usize>) {
110+
if bytes.is_empty() {
111+
panic!("Byte slice was empty!");
112+
}
113+
114+
(1..bytes.len()).for_each(|idx| {
115+
if (bytes[idx - 1] == 0x0d) && (bytes[idx] == 0x0a) {
116+
buffer.push(idx);
117+
}
118+
});
119+
}
120+
}
121+
122+
///
123+
#[derive(Debug, Default)]
124+
pub(crate) struct SlicerBuilder {
125+
n_threads: Option<usize>,
126+
}
127+
128+
///
129+
impl SlicerBuilder {
130+
///
131+
pub fn num_threads(mut self, n_threads: usize) -> Self {
132+
self.n_threads = Some(n_threads);
133+
self
134+
}
135+
136+
///
137+
pub fn build(self) -> Result<Slicer> {
138+
let n_threads = match self.n_threads {
139+
Some(n) => n,
140+
None => 1,
141+
};
142+
143+
let multithreading = n_threads > 1;
144+
145+
Ok(Slicer {
146+
n_threads,
147+
multithreading,
148+
})
149+
}
150+
}

src/slicers.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,11 +25,11 @@
2525
* Last updated: 2023-12-14
2626
*/
2727
use crate::converters::Converter;
28-
use crate::slicers::old_slicer::{IN_MAX_CHUNKS, SLICER_IN_CHUNK_SIZE};
28+
use crate::slicers::chunked_slicer::{IN_MAX_CHUNKS, SLICER_IN_CHUNK_SIZE};
2929
use std::fs;
3030

3131

32-
pub mod old_slicer;
32+
pub mod chunked_slicer;
3333
//pub mod new_slicer;
3434

3535
pub(crate) struct ChunkAndResidue {

0 commit comments

Comments
 (0)