Skip to content
This repository was archived by the owner on Jun 30, 2022. It is now read-only.

Commit c79ccc5

Browse files
jkff authored and silviulica committed
Fixes a bug in progress reporting in TextFileReader
It was 1) thread-unsafe (it accessed a reader's variable from a different thread) and 2) incorrect (it did not return the position of the last returned record). Because of this, the reader could end up in a situation where it would keep returning a position past the end of the range, confusing dynamic work rebalancing.

----Release Notes----
[]
-------------
Created by MOE: https://github.com/google/moe
MOE_MIGRATED_REVID=119983487
1 parent c271699 commit c79ccc5

3 files changed

Lines changed: 7 additions & 10 deletions

File tree

google/cloud/dataflow/internal/windmill_pb2.py

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -1292,7 +1292,7 @@
1292 1292
_descriptor.FieldDescriptor(
1293 1293
name='double_scalar', full_name='windmill.Counter.double_scalar', index=2,
1294 1294
number=3, type=1, cpp_type=5, label=1,
1295-
has_default_value=False, default_value=0,
1295+
has_default_value=False, default_value=float(0),
1296 1296
message_type=None, enum_type=None, containing_type=None,
1297 1297
is_extension=False, extension_scope=None,
1298 1298
options=None),

google/cloud/dataflow/io/fileio.py

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -313,8 +313,8 @@ def __iter__(self):
313 313
yield self.source.coder.decode(line)
314 314

315 315
def get_progress(self):
316-
return iobase.ReaderProgress(
317-
position=iobase.ReaderPosition(byte_offset=self.current_offset))
316+
return iobase.ReaderProgress(position=iobase.ReaderPosition(
317+
byte_offset=self.range_tracker.last_record_start))
318 318

319 319
def request_dynamic_split(self, dynamic_split_request):
320 320
assert dynamic_split_request is not None

google/cloud/dataflow/io/fileio_test.py

Lines changed: 4 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -49,10 +49,7 @@ def progress_with_offsets(self, input_lines,
49 49
start_offset=start_offset, end_offset=end_offset)
50 50
progress_record = []
51 51
with source.reader() as reader:
52-
# Starting value of percent_complete might be larger than zero since we
53-
# will ignore the first line of a record if the starting position of the
54-
# line is smaller than the start offset of the source.
55-
self.assertGreaterEqual(reader.get_progress().position.byte_offset, 0)
52+
self.assertEqual(reader.get_progress().position.byte_offset, -1)
56 53
for line in reader:
57 54
self.assertIsNotNone(line)
58 55
progress_record.append(reader.get_progress().position.byte_offset)
@@ -78,14 +75,14 @@ def test_progress_entire_file(self):
78 75
file_path=self.create_temp_file('\n'.join(lines)))
79 76
progress_record = []
80 77
with source.reader() as reader:
81-
self.assertEqual(0, reader.get_progress().position.byte_offset)
78+
self.assertEqual(-1, reader.get_progress().position.byte_offset)
82 79
for line in reader:
83 80
self.assertIsNotNone(line)
84 81
progress_record.append(reader.get_progress().position.byte_offset)
85-
self.assertEqual(18, reader.get_progress().position.byte_offset)
82+
self.assertEqual(13, reader.get_progress().position.byte_offset)
86 83

87 84
self.assertEqual(len(progress_record), 3)
88-
self.assertEqual(progress_record, [6, 13, 18])
85+
self.assertEqual(progress_record, [0, 6, 13])
89 86

90 87
def try_splitting_reader_at(self, reader, split_request, expected_response):
91 88
actual_response = reader.request_dynamic_split(split_request)

0 commit comments

Comments
 (0)