Skip to content
This repository was archived by the owner on Jun 30, 2022. It is now read-only.

Commit 4f7605b

Browse files
silviulicaaaltay
authored andcommitted
Fix issue in cache trimming logic for combiner lifting
----Release Notes---- [] ------------- Created by MOE: https://github.com/google/moe MOE_MIGRATED_REVID=121688910
1 parent a113e83 commit 4f7605b

1 file changed

Lines changed: 4 additions & 2 deletions

File tree

google/cloud/dataflow/worker/executor.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -594,15 +594,17 @@ def process(self, wkv):
594594
target = self.key_count * 9 // 10
595595
old_wkeys = []
596596
# TODO(robertwb): Use an LRU cache?
597-
for old_wkey, old_wvalue in enumerate(self.table.iterkeys()):
597+
for old_wkey, old_wvalue in self.table.iteritems():
598598
old_wkeys.append(old_wkey) # Can't mutate while iterating.
599-
self.output_key(old_wkey, old_wvalue)
599+
self.output_key(old_wkey, old_wvalue[0])
600600
self.key_count -= 1
601601
if self.key_count <= target:
602602
break
603603
for old_wkey in reversed(old_wkeys):
604604
del self.table[old_wkey]
605605
self.key_count += 1
606+
# We save the accumulator as a one element list so we can efficiently
607+
# mutate when new values are added without searching the cache again.
606608
entry = self.table[wkey] = [self.combine_fn.create_accumulator()]
607609
entry[0] = self.combine_fn.add_inputs(entry[0], [value])
608610

0 commit comments

Comments
 (0)