Skip to content
This repository was archived by the owner on Jun 30, 2022. It is now read-only.

Commit 1389ded

Browse files
chamikaramjgildea
authored andcommitted
Updates MapTask._parse_avro_source() so that start and end position of sources are set properly.
Also does a small logging update. ----Release Notes---- [] ------------- Created by MOE: https://github.com/google/moe MOE_MIGRATED_REVID=117180516
1 parent b5b8508 commit 1389ded

2 files changed

Lines changed: 10 additions & 4 deletions

File tree

google/cloud/dataflow/io/range_trackers.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -219,8 +219,8 @@ def try_split_at_position(self, decoded_split_position):
219219
'of range', self, decoded_split_position)
220220
return False
221221

222-
logging.info('Agreeing to split %r at %r'
223-
, self, decoded_split_position)
222+
logging.debug('Agreeing to split %r at %r'
223+
, self, decoded_split_position)
224224
self._decoded_stop_pos = decoded_split_position
225225
return True
226226

google/cloud/dataflow/worker/maptask.py

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -678,10 +678,16 @@ def _parse_avro_source(specs, unused_codec_specs, unused_context):
678678
# Note that the worker does not really implement AVRO yet.It takes
679679
# advantage that both reading and writing is done through the worker to
680680
# choose a supported format (text files with one pickled object per line).
681+
start_offset = None
682+
if 'start_offset' in specs:
683+
start_offset = int(specs['start_offset']['value'])
684+
end_offset = None
685+
if 'end_offset' in specs:
686+
end_offset = int(specs['end_offset']['value'])
681687
return io.TextFileSource(
682688
file_path=specs['filename']['value'],
683-
start_offset=None,
684-
end_offset=None,
689+
start_offset=start_offset,
690+
end_offset=end_offset,
685691
strip_trailing_newlines=True,
686692
coder=coders.Base64PickleCoder())
687693

0 commit comments

Comments
 (0)