Skip to content
This repository was archived by the owner on Jun 30, 2022. It is now read-only.

Commit 9bea52e

Browse files
chamikaramjgildea
authored and committed
Performs two small updates to progress reporting.
Updates BatchWorker so that the progress reporting thread only sends progress updates while a work item is actively being processed. Without this, the progress reporting thread may send progress reports for already-completed work items, resulting in lease expiration errors. Also updates BatchWorker so that the dynamic_split_result from a previous task gets cleared when it starts processing a new work item. ----Release Notes---- [] ------------- Created by MOE: https://github.com/google/moe MOE_MIGRATED_REVID=117054158
1 parent 30c74e5 commit 9bea52e

1 file changed

Lines changed: 20 additions & 1 deletion

File tree

google/cloud/dataflow/worker/batchworker.py

Lines changed: 20 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -105,6 +105,10 @@ def __init__(self, properties):
105105
self._current_executor = None
106106
self.environment = maptask.WorkerEnvironment()
107107

108+
# If 'True', progress_reporting_thread keeps sending progress updates for
109+
# the currently set work item; does not send progress updates otherwise.
110+
self.report_progress = False
111+
108112
@property
109113
def current_work_item(self):
110114
with self.lock:
@@ -339,6 +343,15 @@ def progress_reporting_thread(self):
339343
try:
340344
self.log_memory_usage_if_needed(force=False)
341345

346+
if not self.report_progress:
347+
logging.debug('Progress update thread was paused. '
348+
'Sleeping a bit...')
349+
if work_item is not None:
350+
logging.debug('Releasing current work item')
351+
work_item = None
352+
time.sleep(1.0)
353+
continue
354+
342355
# If thread does not work on something then try to get the current work
343356
# item from the worker object. This in turn can be None if there were
344357
# no work items to lease from the service.
@@ -378,18 +391,24 @@ def do_work(self, work_item):
378391
self.log_memory_usage_if_needed(force=True)
379392
try:
380393
with work_item.lock:
394+
# If we still have a split result from a previous work item (which must
395+
# have failed) we clear it here.
396+
self.dynamic_split_result_to_report = None
397+
381398
self.set_current_work_item_and_executor(work_item,
382399
executor.MapTaskExecutor())
383-
400+
self.report_progress = True
384401
self.current_executor.execute(work_item.map_task)
385402
except Exception: # pylint: disable=broad-except
403+
self.report_progress = False
386404
exception_details = traceback.format_exc()
387405
logging.error('Exception: %s', exception_details, exc_info=True)
388406
# Completed with errors means failed.
389407
with work_item.lock:
390408
self.report_completion_status(work_item,
391409
exception_details=exception_details)
392410
else:
411+
self.report_progress = False
393412
with work_item.lock:
394413
self.report_completion_status(work_item)
395414
with work_item.lock:

0 commit comments

Comments
 (0)