Skip to content

Commit 79097cc

Browse files
authored
feat: efficient chunked buffer slicing for multithreading (#74)
* build: add bytesize deps for printing file size
* feat: chunked buffer slicing for threads
* build: bump crate version 1.2.0 → 1.3.0
* fix: cargo fmt --all -- --check
* ci/cd: update deploy workflow to only publish evolution full example
1 parent 3936321 commit 79097cc

5 files changed

Lines changed: 88 additions & 73 deletions

File tree

.github/workflows/cd.yml

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@ jobs:
2525
with:
2626
command: check
2727
- name: Cargo publish
28-
run: cargo publish --token ${CRATES_TOKEN}
28+
run: |
29+
cd examples/full
30+
cargo publish --token ${CRATES_TOKEN}
2931
env:
3032
CRATES_TOKEN: ${{ secrets.CRATES_TOKEN }}

Cargo.toml

Lines changed: 14 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ authors = [
1515
"Wilhelm Ågren <wilhelmagren98@gmail.com>",
1616
]
1717
license = "MIT"
18-
version = "1.2.0"
18+
version = "1.3.0"
1919
readme = "README.md"
2020
categories = [
2121
"science",
@@ -26,16 +26,19 @@ categories = [
2626
keywords = [
2727
"arrow",
2828
"parquet",
29+
"concurrency",
30+
"data-engineering",
31+
"ETL",
2932
]
30-
description = "Efficiently evolve your old fixed-length data files into modern file formats. "
33+
description = "Efficiently evolve your old fixed-length data files into modern file formats."
3134

3235
[workspace.dependencies]
33-
evolution-builder = { path = "crates/evolution-builder", version = "1.2.0" }
34-
evolution-common = { path = "crates/evolution-common", version = "1.2.0" }
35-
evolution-converter = { path = "crates/evolution-converter", version = "1.2.0" }
36-
evolution-mocker = { path = "crates/evolution-mocker", version = "1.2.0" }
37-
evolution-parser = { path = "crates/evolution-parser", version = "1.2.0" }
38-
evolution-schema = { path = "crates/evolution-schema", version = "1.2.0" }
39-
evolution-slicer = { path = "crates/evolution-slicer", version = "1.2.0" }
40-
evolution-target = { path = "crates/evolution-target", version = "1.2.0" }
41-
evolution-writer = { path = "crates/evolution-writer", version = "1.2.0" }
36+
evolution-builder = { path = "crates/evolution-builder", version = "1.3.0" }
37+
evolution-common = { path = "crates/evolution-common", version = "1.3.0" }
38+
evolution-converter = { path = "crates/evolution-converter", version = "1.3.0" }
39+
evolution-mocker = { path = "crates/evolution-mocker", version = "1.3.0" }
40+
evolution-parser = { path = "crates/evolution-parser", version = "1.3.0" }
41+
evolution-schema = { path = "crates/evolution-schema", version = "1.3.0" }
42+
evolution-slicer = { path = "crates/evolution-slicer", version = "1.3.0" }
43+
evolution-target = { path = "crates/evolution-target", version = "1.3.0" }
44+
evolution-writer = { path = "crates/evolution-writer", version = "1.3.0" }

crates/evolution-converter/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ bench = false
1616

1717
[dependencies]
1818
arrow = "51.0.0"
19+
bytesize = "1.3.0"
1920
crossbeam = "0.8.4"
2021
evolution-builder = { workspace = true }
2122
evolution-common = { workspace = true }

crates/evolution-converter/src/converter.rs

Lines changed: 17 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -22,10 +22,11 @@
2222
// SOFTWARE.
2323
//
2424
// File created: 2024-02-17
25-
// Last updated: 2024-10-11
25+
// Last updated: 2024-10-14
2626
//
2727

2828
use arrow::datatypes::SchemaRef as ArrowSchemaRef;
29+
use bytesize::ByteSize;
2930
use crossbeam::channel;
3031
use crossbeam::thread::scope;
3132
use crossbeam::thread::ScopedJoinHandle;
@@ -106,11 +107,12 @@ impl ParquetConverter {
106107
pub fn try_convert_multithreaded(&mut self) -> Result<()> {
107108
let mut buffer_capacity = self.read_buffer_size;
108109
let n_worker_threads: usize = self.n_threads - 1;
110+
let mut ratio_processed: f32;
109111

110112
info!("Converting flf to parquet in multithreaded mode.");
111113
info!(
112-
"The file to convert is {} bytes in total.",
113-
self.slicer.bytes_to_read(),
114+
"The file to convert is ~{} in total.",
115+
ByteSize::gb((self.slicer.bytes_to_read() / 1_000_000_000) as u64),
114116
);
115117

116118
let mut line_break_indices: Vec<usize> = Vec::with_capacity(buffer_capacity);
@@ -132,29 +134,27 @@ impl ParquetConverter {
132134
let mut buffer: Vec<u8> = vec![0u8; buffer_capacity];
133135
self.slicer.try_read_to_buffer(&mut buffer)?;
134136
self.slicer
135-
.try_find_line_breaks(&buffer, &mut line_break_indices, true)?;
136-
137-
let byte_idx_last_line_break: usize = self.slicer.try_find_last_line_break(&buffer)?;
138-
let n_bytes_left_after_last_line_break: usize =
139-
buffer_capacity - byte_idx_last_line_break - NUM_BYTES_FOR_NEWLINE;
140-
141-
self.distribute_worker_thread_workloads(&line_break_indices, &mut thread_workloads);
142-
137+
.try_distribute_buffer_chunks_on_workers(&buffer, &mut thread_workloads)?;
143138
self.spawn_converter_threads(&buffer, &thread_workloads)?;
144139

145-
line_break_indices.clear();
146-
thread_workloads.clear();
147-
140+
let n_bytes_left_after_last_line_break: usize =
141+
buffer_capacity - thread_workloads[n_worker_threads - 1].1 - NUM_BYTES_FOR_NEWLINE;
148142
self.slicer
149143
.try_seek_relative(-(n_bytes_left_after_last_line_break as i64))?;
150144

151145
bytes_processed += buffer_capacity - n_bytes_left_after_last_line_break;
152146
bytes_overlapped += n_bytes_left_after_last_line_break;
153147
remaining_bytes -= buffer_capacity - n_bytes_left_after_last_line_break;
148+
ratio_processed = 100.0 * bytes_processed as f32 / self.slicer.bytes_to_read() as f32;
154149

155150
self.slicer.set_remaining_bytes(remaining_bytes);
156151
self.slicer.set_bytes_processed(bytes_processed);
157152
self.slicer.set_bytes_overlapped(bytes_overlapped);
153+
154+
line_break_indices.clear();
155+
thread_workloads.clear();
156+
157+
info!("Estimated progress: {:.2}%", ratio_processed);
158158
}
159159

160160
#[cfg(debug_assertions)]
@@ -208,6 +208,8 @@ impl ParquetConverter {
208208
// requires that the buffer contains some values, it can't be empty
209209
// but with large enough capacity, really dumb. Look into smarter
210210
// way to do this, because it really bothers me!!!!!
211+
//
212+
// But maybe this is not too bad, the allocations don't take that long compared to the I/O...
211213
let mut buffer: Vec<u8> = vec![0u8; buffer_capacity];
212214
self.slicer.try_read_to_buffer(&mut buffer)?;
213215

@@ -244,33 +246,6 @@ impl ParquetConverter {
244246
Ok(())
245247
}
246248

247-
/// Divide the current buffer into chunks for each worker thread based on the line breaks.
248-
///
249-
/// # Note
250-
/// The workload will attempt to be uniform on each worker, however, the worker with the last index might get some
251-
/// extra lines to process due to number of rows not being divisible by the estimated number of rows per thread.
252-
fn distribute_worker_thread_workloads(
253-
&self,
254-
line_break_indices: &[usize],
255-
thread_workloads: &mut Vec<(usize, usize)>,
256-
) {
257-
let n_line_break_indices: usize = line_break_indices.len();
258-
let n_rows_per_thread: usize = n_line_break_indices / thread_workloads.capacity(); // usize division will floor the result
259-
260-
let mut prev_line_break_byte_idx: usize = 0;
261-
for worker_idx in 1..(thread_workloads.capacity()) {
262-
let next_line_break_byte_idx: usize =
263-
line_break_indices[n_rows_per_thread * worker_idx];
264-
thread_workloads.push((prev_line_break_byte_idx, next_line_break_byte_idx));
265-
prev_line_break_byte_idx = next_line_break_byte_idx + NUM_BYTES_FOR_NEWLINE;
266-
}
267-
268-
thread_workloads.push((
269-
prev_line_break_byte_idx,
270-
line_break_indices[n_line_break_indices - 1],
271-
));
272-
}
273-
274249
/// Spawn the threads which perform the conversion, specifically, n-1 threads will work on converting and 1
275250
/// thread will collect and buffer the converted results and write those to parquet file, where n was the
276251
/// specified number of threads for the program.
@@ -297,16 +272,9 @@ impl ParquetConverter {
297272
let arc_buffer: Arc<&Vec<u8>> = Arc::new(buffer);
298273

299274
let thread_result: thread::Result<()> = scope(|s| {
300-
#[cfg(debug_assertions)]
301-
debug!(
302-
"Starting {} worker threads for conversion.",
303-
thread_workloads.len()
304-
);
305-
306275
let threads = thread_workloads
307276
.iter()
308-
.enumerate()
309-
.map(|(t_idx, (from, to))| {
277+
.map(|(from, to)| {
310278
let t_sender: channel::Sender<ParquetBuilder> = sender.clone();
311279
// Can we do this in another way? So we don't have to allocate a bunch of stuff in our loop...
312280
// TODO: pull this out and create them as part of the ParquetConverter struct?..
@@ -316,30 +284,19 @@ impl ParquetConverter {
316284
let t_buffer_slice: &[u8] = &t_buffer[*from..*to];
317285

318286
s.spawn(move |_| {
319-
#[cfg(debug_assertions)]
320-
debug!("Thread {} working...", t_idx + 1);
321-
322287
t_builder.try_build_from_slice(t_buffer_slice).unwrap();
323288
t_sender.send(t_builder).unwrap();
324289
drop(t_sender);
325-
#[cfg(debug_assertions)]
326-
debug!("Thread {} done!", t_idx + 1);
327290
})
328291
})
329292
.collect::<Vec<ScopedJoinHandle<()>>>();
330293

331294
drop(sender);
332-
333-
#[cfg(debug_assertions)]
334-
debug!("Thread 0 waiting for batches to write to buffer...");
335295
for mut builder in receiver {
336296
self.writer.try_write_from_builder(&mut builder).unwrap();
337297
drop(builder);
338298
}
339299

340-
#[cfg(debug_assertions)]
341-
debug!("Writer thread done!");
342-
343300
for handle in threads {
344301
handle.join().expect("Could not join worker thread handle!");
345302
}

crates/evolution-slicer/src/slicer.rs

Lines changed: 53 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,10 +22,11 @@
2222
// SOFTWARE.
2323
//
2424
// File created: 2023-12-11
25-
// Last updated: 2024-05-31
25+
// Last updated: 2024-10-13
2626
//
2727

2828
use evolution_common::error::{ExecutionError, Result};
29+
use evolution_common::NUM_BYTES_FOR_NEWLINE;
2930
use log::warn;
3031

3132
use std::fs::{File, OpenOptions};
@@ -142,6 +143,57 @@ impl FileSlicer {
142143
}
143144
}
144145

146+
/// Try and evenly distribute the buffer into uniformly sized chunks for each worker thread.
147+
/// This function expects a [`Vec`] of usize tuples, representing the start and end byte
148+
/// indices for each worker threads chunk.
149+
///
150+
/// # Note
151+
/// This function is optimized to spend as little time as possible looking for valid chunks, i.e.,
152+
/// where there are line breaks, and will not look through the entire buffer. This can have an
153+
/// effect on the CPU cache hit-rate, however, this depends on the size of the buffer.
154+
///
155+
/// # Errors
156+
/// This function might return an error for the following reasons:
157+
/// * If the buffer was empty.
158+
/// * If there were no line breaks in the buffer.
159+
pub fn try_distribute_buffer_chunks_on_workers(
160+
&self,
161+
buffer: &[u8],
162+
thread_workloads: &mut Vec<(usize, usize)>,
163+
) -> Result<()> {
164+
let n_bytes_total: usize = buffer.len();
165+
let n_worker_threads: usize = thread_workloads.capacity();
166+
167+
let n_bytes_per_thread: usize = n_bytes_total / n_worker_threads;
168+
let n_bytes_remaining: usize = n_bytes_total - n_bytes_per_thread * n_worker_threads;
169+
170+
let mut prev_byte_idx: usize = 0;
171+
for _ in 0..(n_worker_threads - 1) {
172+
let next_byte_idx: usize = n_bytes_per_thread + prev_byte_idx;
173+
thread_workloads.push((prev_byte_idx, next_byte_idx));
174+
prev_byte_idx = next_byte_idx;
175+
}
176+
177+
thread_workloads.push((
178+
prev_byte_idx,
179+
prev_byte_idx + n_bytes_per_thread + n_bytes_remaining,
180+
));
181+
182+
let mut n_bytes_to_offset_start: usize = 0;
183+
for t_idx in 0..n_worker_threads {
184+
let (mut start_byte_idx, mut end_byte_idx) = thread_workloads[t_idx];
185+
start_byte_idx -= n_bytes_to_offset_start;
186+
let n_bytes_to_offset_end: usize = (end_byte_idx - start_byte_idx)
187+
- self.try_find_last_line_break(&buffer[start_byte_idx..end_byte_idx])?;
188+
end_byte_idx -= n_bytes_to_offset_end;
189+
thread_workloads[t_idx].0 = start_byte_idx;
190+
thread_workloads[t_idx].1 = end_byte_idx;
191+
n_bytes_to_offset_start = n_bytes_to_offset_end - NUM_BYTES_FOR_NEWLINE;
192+
}
193+
194+
Ok(())
195+
}
196+
145197
/// Read from the buffered reader into the provided buffer. This function reads
146198
/// enough bytes to fill the buffer, hence, it is up to the caller to ensure that
147199
/// that buffer has the correct and/or wanted capacity.

0 commit comments

Comments (0)