Skip to content
This repository was archived by the owner on Jun 30, 2022. It is now read-only.

Commit 9e84c97

Browse files
robertwbgildea
authored and committed
Optimize shuffle writing
* Cache the key and value coder impls on the shuffle readers and writers themselves.
* Don't re-encode the key in the secondary key slot. Instead, let it be empty, as we're not using it now.

----Release Notes----
[]
-------------
Created by MOE: https://github.com/google/moe
MOE_MIGRATED_REVID=117183412
1 parent 1389ded commit 9e84c97

4 files changed

Lines changed: 19 additions & 15 deletions

File tree

google/cloud/dataflow/worker/executor.py

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -328,10 +328,9 @@ def process(self, o):
328328
k, v = str(random.getrandbits(64)), o.value
329329
else:
330330
k, v = o.value
331-
# TODO(silviuc): Figure out what is the proper value for the secondary key.
332-
# For now the secondary key is a duplicate of the primary key just because
333-
# they both use the same coder.
334-
self.writer.Write(k, k, v)
331+
# TODO(silviuc): Use timestamps for the secondary key to get values in
332+
# time-sorted order.
333+
self.writer.Write(k, '', v)
335334

336335

337336
class DoOperation(Operation):

google/cloud/dataflow/worker/executor_test.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -186,8 +186,8 @@ def test_read_do_shuffle_write(self):
186186
test_shuffle_sink=shuffle_sink_mock)
187187
# Make sure we have seen all the (k, v) writes.
188188
shuffle_sink_mock.writer().Write.assert_has_calls(
189-
[mock.call('a', 'a', 1), mock.call('b', 'b', 1),
190-
mock.call('c', 'c', 1), mock.call('d', 'd', 1)])
189+
[mock.call('a', '', 1), mock.call('b', '', 1),
190+
mock.call('c', '', 1), mock.call('d', '', 1)])
191191

192192
def test_shuffle_read_do_write(self):
193193
output_path = self.create_temp_file('n/a')

google/cloud/dataflow/worker/shuffle.py

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -309,6 +309,8 @@ def __init__(self, shuffle_source, reader=None):
309309
self.source = shuffle_source
310310
self.reader = reader
311311
self.entries_iterable = None
312+
self.key_coder = self.source.key_coder.get_impl()
313+
self.value_coder = self.source.value_coder.get_impl()
312314

313315
def __enter__(self):
314316
if self.reader is None:
@@ -340,7 +342,7 @@ def __iter__(self):
340342
entries_iterator.push_back(entry)
341343
key_values = ShuffleKeyValuesIterable(
342344
entries_iterator,
343-
entry.key, self.source.value_coder, entry.position)
345+
entry.key, self.value_coder, entry.position)
344346
group_start = entry.position
345347

346348
last_group_start = self._range_tracker.last_group_start
@@ -354,7 +356,7 @@ def __iter__(self):
354356
# source.
355357
return
356358

357-
yield (self.source.key_coder.decode(entry.key), key_values)
359+
yield (self.key_coder.decode(entry.key), key_values)
358360
# We need to drain the iterator returned just in case this
359361
# was not done by the caller. Otherwise we will not properly advance
360362
# to the next key but rather return the next entry for the current
@@ -406,7 +408,7 @@ def __init__(self, shuffle_source, reader=None):
406408

407409
def __iter__(self):
408410
for entry in self.entries_iterable:
409-
yield self.source.value_coder.decode(entry.value)
411+
yield self.value_coder.decode(entry.value)
410412

411413

412414
class ShuffleSourceBase(iobase.Source):
@@ -451,6 +453,8 @@ def __init__(self, shuffle_sink, writer=None):
451453
self.writer = writer
452454
self.stream = StringIO.StringIO()
453455
self.bytes_buffered = 0
456+
self.key_coder = self.sink.key_coder.get_impl()
457+
self.value_coder = self.sink.value_coder.get_impl()
454458

455459
def __enter__(self):
456460
if self.writer is None:
@@ -468,9 +472,9 @@ def __exit__(self, exception_type, exception_value, traceback):
468472

469473
def Write(self, key, secondary_key, value):
470474
entry = ShuffleEntry(
471-
self.sink.key_coder.encode(key),
472-
self.sink.key_coder.encode(secondary_key),
473-
self.sink.value_coder.encode(value),
475+
self.key_coder.encode(key),
476+
secondary_key,
477+
self.value_coder.encode(value),
474478
position=None)
475479
entry.to_bytes(self.stream, with_position=False)
476480
self.bytes_buffered += entry.size

google/cloud/dataflow/worker/shuffle_test.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,14 +19,15 @@
1919
import logging
2020
import unittest
2121

22+
from google.cloud.dataflow import coders
2223
from google.cloud.dataflow.io import iobase
2324
from google.cloud.dataflow.worker.shuffle import GroupedShuffleSource
2425
from google.cloud.dataflow.worker.shuffle import ShuffleEntry
2526
from google.cloud.dataflow.worker.shuffle import ShuffleSink
2627
from google.cloud.dataflow.worker.shuffle import UngroupedShuffleSource
2728

2829

29-
class Base64Coder(object):
30+
class Base64Coder(coders.Coder):
3031
"""Simple base64 coder used throughout the tests."""
3132

3233
def decode(self, o):
@@ -67,7 +68,7 @@ def _make_chunk(self, descriptor, start_index):
6768
for key, value in descriptor:
6869
ShuffleEntry(
6970
coder.encode(key),
70-
coder.encode('2nd-%s' % key),
71+
'',
7172
coder.encode(value),
7273
position=str(position)).to_bytes(stream)
7374
position += 1
@@ -348,7 +349,7 @@ class TestShuffleSink(unittest.TestCase):
348349

349350
def test_basics(self):
350351
source = ShuffleSink(config_bytes='not used', coder=Base64Coder())
351-
entries = [('a', '2nd-a', '1'), ('b', '2nd-b', '0'), ('b', '2nd-b', '1')]
352+
entries = [('a', '', '1'), ('b', '', '0'), ('b', '', '1')]
352353
fake_writer = FakeShuffleWriter()
353354
with source.writer(test_writer=fake_writer) as writer:
354355
for entry in entries:

0 commit comments

Comments
 (0)