Skip to content

Commit 1d743d5

Browse files
authored
fix(mock): add channel bound to avoid io bottleneck (#27)
1 parent 7753348 commit 1d743d5

1 file changed

Lines changed: 10 additions & 4 deletions

File tree

src/mock.rs

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -37,9 +37,15 @@ use std::sync::Arc;
3737
use std::thread::JoinHandle;
3838
use std::{thread, usize};
3939

40-
pub(crate) static MINIMUM_N_ROWS_FOR_MULTITHREADING: usize = 1000;
40+
// This default value should depend on the memory capacity of the system
41+
// running the program. Because the workers produce buffers faster than
42+
// the master can write them to disk we need to bound the worker/master
43+
// channel. If you have a lot of system memory, you can increase this value,
44+
// but if you increase it too much the program will run out of system memory.
45+
pub(crate) static DEFAULT_THREAD_CHANNEL_CAPACITY: usize = 128;
46+
pub(crate) static DEFAULT_MIN_N_ROWS_FOR_MULTITHREADING: usize = 1000;
4147
pub(crate) static DEFAULT_MOCKED_FILENAME_LEN: usize = 8;
42-
pub(crate) static DEFAULT_ROW_BUFFER_LEN: usize = 1024 * 1024;
48+
pub(crate) static DEFAULT_ROW_BUFFER_LEN: usize = 1048 * 1024;
4349

4450
///
4551
pub struct Mocker {
@@ -67,7 +73,7 @@ impl Mocker {
6773

6874
///
6975
pub fn generate(&self, n_rows: usize) {
70-
if self.multithreaded && n_rows > MINIMUM_N_ROWS_FOR_MULTITHREADING {
76+
if self.multithreaded && n_rows > DEFAULT_MIN_N_ROWS_FOR_MULTITHREADING {
7177
self.generate_multithreaded(n_rows);
7278
} else {
7379
if self.multithreaded {
@@ -221,7 +227,7 @@ pub fn spawn_workers(
221227
info!("Remaining rows to handle: {}", remaining_rows);
222228

223229
let arc_schema = Arc::new(schema);
224-
let (sender, receiver) = channel::unbounded();
230+
let (sender, receiver) = channel::bounded(DEFAULT_THREAD_CHANNEL_CAPACITY);
225231

226232
let threads: Vec<JoinHandle<()>> = (0..n_threads)
227233
.map(|t| {

0 commit comments

Comments
 (0)