Skip to content

Commit 36f3184

Browse files
committed
fix never ending program bug
1 parent 75159a0 commit 36f3184

2 files changed

Lines changed: 7 additions & 8 deletions

File tree

src/chunked/arrow_converter.rs

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -194,7 +194,7 @@ impl<'a> Converter<'a> for Slice2Arrow<'a> {
194194

195195
let start_parse = Instant::now();
196196
let offset: usize = self.consistent_counter.get();
197-
197+
let mut local_consistent_counter: ConsistentCounter = ConsistentCounter::new(0);
198198
let arc_slices = Arc::new(&slices);
199199
self.masterbuilders
200200
.builders
@@ -217,23 +217,21 @@ impl<'a> Converter<'a> for Slice2Arrow<'a> {
217217
br.push(bb.finish());
218218
}
219219
let record_batch = RecordBatch::try_from_iter(br).unwrap();
220-
221220
let _ = self
222221
.masterbuilders
223222
.sender
224223
.clone()
225224
.unwrap()
226225
.send(offset + i, record_batch);
226+
let offset: usize = local_consistent_counter.add(1);
227227
}
228228
}
229229
});
230230

231231
for ii in slices.iter() {
232232
bytes_in += ii.len();
233233
}
234-
let offset: usize = self
235-
.consistent_counter
236-
.add(self.masterbuilders.builders.len());
234+
let offset: usize = self.consistent_counter.add(local_consistent_counter.get());
237235

238236
parse_duration = start_parse.elapsed();
239237
(bytes_in, bytes_out, parse_duration, builder_write_duration)
@@ -261,7 +259,10 @@ impl<'a> Converter<'a> for Slice2Arrow<'a> {
261259
let emptyrb = arrow::record_batch::RecordBatch::new_empty(Arc::new(schema));
262260
let c = self.consistent_counter.get();
263261
let _ = &self.masterbuilders.sender.clone().unwrap().send(c, emptyrb);
264-
rt.spawn_blocking(|| async { jh.await.unwrap() });
262+
rt.spawn_blocking(|| async {
263+
info!("...spawned blockiiiiing");
264+
jh.await.unwrap()
265+
});
265266
}
266267
}
267268

src/chunked/recordbatch_output.rs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -297,7 +297,6 @@ impl ParquetFileOut {
297297

298298
'outer: loop {
299299
let mut message = receiver.recv();
300-
301300
match message {
302301
Ok(rb) => {
303302
if (rb.num_rows() == 0) {
@@ -306,7 +305,6 @@ impl ParquetFileOut {
306305
writer.write(&rb).expect("Error Writing batch");
307306
}
308307
Err(e) => {
309-
info!("got RecvError in channel , break to outer");
310308
break 'outer;
311309
}
312310
}

0 commit comments

Comments
 (0)