@@ -58,10 +58,12 @@ use atomic_counter::{AtomicCounter, ConsistentCounter};
5858use crossbeam:: atomic:: AtomicConsume ;
5959use libc:: { bsearch, send} ;
6060use ordered_channel:: Sender ;
61+ use ordered_channel:: Receiver ;
62+
6163use parquet:: errors:: { ParquetError , Result } ;
6264use parquet:: file:: metadata:: FileMetaData ;
6365use std:: sync:: atomic:: { AtomicUsize , Ordering , ATOMIC_USIZE_INIT } ;
64- use std:: sync:: mpsc:: { sync_channel, Receiver , RecvError , SyncSender } ;
66+ // use std::sync::mpsc::{sync_channel, Receiver, RecvError, SyncSender};
6567use std:: thread;
6668//use std::thread::JoinHandle;
6769use tokio:: task:: JoinHandle ;
@@ -205,7 +207,7 @@ pub(crate) struct ParquetFileOut {
205207 pub ( crate ) sender : Option < Sender < RecordBatch > > ,
206208}
207209impl ParquetFileOut {
208- pub ( crate ) async fn myParquet ( schema : SchemaRef , fixed_schema : FixedSchema , outfile : PathBuf , rt : tokio :: runtime :: Runtime ) ->( Result < Stats > ) {
210+ pub ( crate ) async fn myParquet ( sender : Sender < RecordBatch > , mut receiver : Receiver < RecordBatch > , schema : SchemaRef , fixed_schema : FixedSchema , outfile : PathBuf ) ->( Result < Stats > ) {
209211// Self::deltasetup(fixed_schema).await.unwrap();
210212 let _out_file = fs:: OpenOptions :: new ( )
211213 . create ( true )
@@ -217,11 +219,7 @@ impl ParquetFileOut {
217219
218220 let mut writer: ArrowWriter < File > =
219221 ArrowWriter :: try_new ( _out_file, schema, Some ( props. clone ( ) ) ) . unwrap ( ) ;
220-
221- let ( sender, mut receiver) = bounded :: < RecordBatch > ( 100 ) ;
222-
223- self . sender = Some ( sender. clone ( ) ) ;
224-
222+
225223 ' outer: loop {
226224 let mut message = receiver. recv ( ) ;
227225
@@ -254,8 +252,11 @@ impl ParquetFileOut {
254252}
255253impl RecordBatchOutput for ParquetFileOut {
256254 fn setup ( & mut self , schema : SchemaRef , fixed_schema : FixedSchema , outfile : PathBuf , rt : tokio:: runtime:: Runtime ) -> ( Sender < RecordBatch > , JoinHandle < Result < Stats > > ) {
255+ let ( sender, mut receiver) = bounded :: < RecordBatch > ( 100 ) ;
256+
257+ self . sender = Some ( sender. clone ( ) ) ;
257258
258- let j: JoinHandle < Result < Stats > > =rt. spawn ( Self :: myParquet ( schema, fixed_schema, outfile, rt ) ) ;
259+ let j: JoinHandle < Result < Stats > > =rt. spawn ( Self :: myParquet ( sender , receiver , schema, fixed_schema, outfile) ) ;
259260
260261// let j:JoinHandle<Result<Stats>>=rt.spawn(Self::myDelta(schema,fixed_schema,outfile));
261262
0 commit comments