Skip to content

Commit 38e4f34

Browse files
author
Rickard Lundin
committed
fixed choice
1 parent 56ac779 commit 38e4f34

4 files changed

Lines changed: 39 additions & 3 deletions

File tree

src/chunked/arrow_converter.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,8 +76,10 @@ pub(crate) struct Slice2Arrow<'a> {
7676
pub(crate) fn_line_break_len: FnLineBreakLen,
7777
pub(crate) masterbuilders: MasterBuilders,
7878
pub(crate) consistent_counter: ConsistentCounter,
79+
pub(crate) threaded_write: (Sender<RecordBatch>,JoinHandle<Result<Stats>>)
7980
}
8081

82+
8183
pub(crate) struct MasterBuilders {
8284
builders: Vec<Vec<Box<dyn Sync + Send + ColumnBuilder>>>,
8385
outfile: PathBuf,

src/chunked/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ use parquet::errors::{ParquetError, Result};
4343
pub(crate) mod arrow_converter;
4444
pub(crate) mod residual_slicer;
4545
pub(crate) mod self_converter;
46-
mod threaded_file_output;
46+
pub(crate) mod threaded_file_output;
4747
mod trimmer;
4848

4949
pub(crate) struct ChunkAndResidue {

src/chunked/threaded_file_output.rs

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,11 +66,24 @@ use std::thread::JoinHandle;
6666
use std::time::{Duration, Instant};
6767
use thread::spawn;
6868
use Compression::SNAPPY;
69+
use crate::cli::Targets;
6970

7071
/// State for the Parquet flavour of the threaded file output.
///
/// `output_factory` builds this with `sender: None` and then calls `setup`
/// on it through the `arrow_file_output` trait.
pub(crate) struct parquet_file_out {
7172
/// Optional channel endpoint for `RecordBatch` values; `None` at construction.
// NOTE(review): presumably filled in by `setup` — confirm against the impl below.
pub(crate) sender: Option<Sender<RecordBatch>>,
7273
}
73-
74+
/// Selects the output implementation for `target` and starts it.
///
/// For `Targets::Parquet` and `Targets::IPC` a fresh output object
/// (`parquet_file_out` / `ipc_file_out`, both created with `sender: None`) is
/// boxed as `dyn arrow_file_output`, and the result of its
/// `setup(schema, _outfile)` call is returned unchanged.
///
/// # Parameters
/// * `target`   - which output encoding to use.
/// * `schema`   - Arrow schema forwarded to `setup`.
/// * `_outfile` - output path forwarded to `setup`; it IS used despite the
///   leading underscore in its name.
///
/// # Returns
/// The `(Sender<RecordBatch>, JoinHandle<Result<Stats>>)` pair produced by
/// `setup`.
///
/// # Panics
/// Panics via `todo!()` when `target` is `Targets::None`.
pub(crate) fn output_factory (target: Targets,schema: SchemaRef,_outfile:PathBuf) -> (Sender<RecordBatch>, JoinHandle<Result<Stats>>) {
75+
match target {
76+
Targets::Parquet => {
77+
let mut pfo: Box<dyn arrow_file_output> = Box::new(parquet_file_out { sender: None });
78+
pfo.setup(schema, _outfile)
79+
}
80+
Targets::IPC => {
81+
let mut pfo: Box<dyn arrow_file_output> = Box::new(ipc_file_out { sender: None });
82+
pfo.setup(schema, _outfile)
83+
}
84+
// NOTE(review): `Targets::None` is a selectable CLI value, so this arm turns
// a user choice into a "not yet implemented" panic. Returning an error would
// need a different return type.
Targets::None => {todo!()}
85+
}
86+
}
7487
impl arrow_file_output for parquet_file_out {
7588
fn setup(
7689
&mut self,

src/cli.rs

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,8 @@ use std::fs;
3838
#[cfg(feature = "rayon")]
3939
use std::fs::File;
4040
use std::path::PathBuf;
41+
use arrow::datatypes::SchemaRef;
42+
use atomic_counter::ConsistentCounter;
4143

4244
#[cfg(feature = "rayon")]
4345
use crate::chunked::arrow_converter::{MasterBuilders, Slice2Arrow};
@@ -47,6 +49,7 @@ use crate::chunked::residual_slicer::ResidualSlicer;
4749
use crate::chunked::self_converter::SampleSliceAggregator;
4850
#[cfg(feature = "rayon")]
4951
use crate::chunked::{find_last_nl, line_break_len_cr, Converter as ChunkedConverter, Slicer};
52+
use crate::chunked::threaded_file_output::output_factory;
5053
use crate::converter::Converter;
5154
use crate::error::Result;
5255
use crate::mocker::Mocker;
@@ -59,6 +62,14 @@ enum Converters {
5962
None,
6063
}
6164

65+
// Output encoding selectable on the command line (type of `Cli::target`) and
// consumed by `output_factory` to pick the file-output implementation.
// Plain `//` comments are used on purpose: `///` on a `clap::ValueEnum` item
// would feed into the generated help text.
// NOTE(review): the `ValueEnum` derive exposes variants in kebab-case by
// default ("parquet", "ipc", "none"); confirm that `default_value = "Parquet"`
// on `Cli::target` is accepted.
// NOTE(review): this enum is gated on the "rayon" feature, while the visible
// `use crate::cli::Targets` and the `Cli::target` field show no such gate —
// verify the crate builds without that feature.
#[cfg(feature = "rayon")]
66+
#[derive(clap::ValueEnum, Clone)]
67+
pub(crate) enum Targets {
68+
// `output_factory` maps this to `parquet_file_out`.
Parquet,
69+
// `output_factory` maps this to `ipc_file_out`.
IPC,
70+
// `output_factory` currently hits `todo!()` for this variant.
None,
71+
}
72+
6273
#[cfg(feature = "rayon")]
6374
#[derive(clap::ValueEnum, Clone)]
6475
enum Slicers {
@@ -77,6 +88,13 @@ enum Slicers {
7788
pub struct Cli {
7889
#[command(subcommand)]
7990
command: Commands,
91+
#[arg(
92+
long = "target",
93+
value_name = "TARGET-ENCODING",
94+
action = ArgAction::Set,
95+
default_value = "Parquet",
96+
)]
97+
target: Targets,
8098

8199
/// Set the number of threads (logical cores) to use when multi-threading.
82100
#[arg(
@@ -89,6 +107,7 @@ pub struct Cli {
89107
n_threads: usize,
90108
}
91109

110+
92111
#[derive(Subcommand)]
93112
enum Commands {
94113
/// Convert a fixed-length file (.flf) to parquet.
@@ -275,12 +294,14 @@ impl Cli {
275294
schema.to_path_buf(),
276295
n_threads as i16,
277296
);
278-
297+
let sc=master_builders.schema_factory();
298+
279299
let s2a: Box<Slice2Arrow> = Box::new(Slice2Arrow {
280300
fn_line_break: find_last_nl,
281301
fn_line_break_len: line_break_len_cr,
282302
masterbuilders: master_builders,
283303
consistent_counter: ConsistentCounter::new(0),
304+
threaded_write: output_factory(self.target.clone(),sc ,out_file.clone().to_path_buf()),
284305
});
285306
s2a
286307
}

0 commit comments

Comments
 (0)