Skip to content
This repository was archived by the owner on Jun 30, 2022. It is now read-only.

Commit e958bbb

Browse files
chamikaramjsilviulica
authored and committed
Updates and simplifies logic related to progress reporting.
Adds a new class, ProgressReporter, that encapsulates the logic related to progress reporting. Each progress reporter object will start a new thread. A progress reporter should be created for each work item processed by a worker. This simplifies the logic, since we can now cleanly start and stop progress reporting for a work item without having to worry about unnecessary progress reports being sent by a progress reporting thread. Updates ProgressReporter to shut down cleanly and send any unsent split requests. Significantly improves test coverage of BatchWorker by adding tests to batchworker_test.py. Updates apiclient so that its 'report_status' and 'lease_work' methods accept an object of type WorkerInfo that contains the necessary information about the worker instead of the worker object, preventing apiclient from accessing the internal state of the worker. Fixes a bug in CounterFactory: it is now thread safe, so that the progress reporting thread and the main thread can update the counters map simultaneously. ----Release Notes---- [] ------------- Created by MOE: https://github.com/google/moe MOE_MIGRATED_REVID=119423075
1 parent 11c9db6 commit e958bbb

5 files changed

Lines changed: 540 additions & 317 deletions

File tree

google/cloud/dataflow/internal/apiclient.py

Lines changed: 22 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -632,20 +632,20 @@ def __init__(self, worker, skip_get_credentials=False):
632632
get_credentials=(not skip_get_credentials)))
633633

634634
@retry.with_exponential_backoff() # Using retry defaults from utils/retry.py
635-
def lease_work(self, worker):
635+
def lease_work(self, worker_info, desired_lease_duration):
636636
"""Leases a work item from the service."""
637637
work_request = dataflow.LeaseWorkItemRequest()
638-
work_request.workerId = worker.worker_id
639-
work_request.requestedLeaseDuration = worker.desired_lease_duration()
640-
work_request.currentWorkerTime = worker.current_time
641-
work_request.workerCapabilities.append(worker.worker_id)
642-
for value in worker.capabilities:
638+
work_request.workerId = worker_info.worker_id
639+
work_request.requestedLeaseDuration = desired_lease_duration
640+
work_request.currentWorkerTime = worker_info.formatted_current_time
641+
work_request.workerCapabilities.append(worker_info.worker_id)
642+
for value in worker_info.capabilities:
643643
work_request.workerCapabilities.append(value)
644-
for value in worker.work_types:
644+
for value in worker_info.work_types:
645645
work_request.workItemTypes.append(value)
646646
request = dataflow.DataflowProjectsJobsWorkItemsLeaseRequest()
647-
request.jobId = worker.job_id
648-
request.projectId = worker.project_id
647+
request.jobId = worker_info.job_id
648+
request.projectId = worker_info.project_id
649649
try:
650650
request.leaseWorkItemRequest = work_request
651651
except AttributeError:
@@ -656,7 +656,8 @@ def lease_work(self, worker):
656656
return response
657657

658658
def report_status(self,
659-
worker,
659+
worker_info,
660+
desired_lease_duration,
660661
work_item,
661662
completed,
662663
progress,
@@ -670,7 +671,12 @@ def report_status(self,
670671
work item.
671672
672673
Args:
673-
worker: The Worker instance executing the work item.
674+
worker_info: A batchworker.BatchWorkerInfo that contains
675+
information about the Worker instance executing the work
676+
item.
677+
desired_lease_duration: The duration for which the worker would like to
678+
extend the lease of the work item. Should be in seconds formatted as a
679+
string.
674680
work_item: The work item for which to report status.
675681
completed: True if there is no further work to be done on this work item
676682
either because it succeeded or because it failed. False if this is a
@@ -695,7 +701,7 @@ def report_status(self,
695701
work_item_status.completed = completed
696702

697703
if not completed:
698-
work_item_status.requestedLeaseDuration = worker.desired_lease_duration()
704+
work_item_status.requestedLeaseDuration = desired_lease_duration
699705

700706
if progress is not None:
701707
work_item_progress = dataflow.ApproximateProgress()
@@ -742,13 +748,13 @@ def report_status(self,
742748
append_counter(work_item_status, counter, tentative=not completed)
743749

744750
report_request = dataflow.ReportWorkItemStatusRequest()
745-
report_request.currentWorkerTime = worker.current_time
746-
report_request.workerId = worker.worker_id
751+
report_request.currentWorkerTime = worker_info.formatted_current_time
752+
report_request.workerId = worker_info.worker_id
747753
report_request.workItemStatuses.append(work_item_status)
748754

749755
request = dataflow.DataflowProjectsJobsWorkItemsReportStatusRequest()
750-
request.jobId = worker.job_id
751-
request.projectId = worker.project_id
756+
request.jobId = worker_info.job_id
757+
request.projectId = worker_info.project_id
752758
try:
753759
request.reportWorkItemStatusRequest = report_request
754760
except AttributeError:

google/cloud/dataflow/utils/counters.py

Lines changed: 36 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717

1818
"""Counters collect the progress of the Worker for reporting to the service."""
1919

20+
import threading
21+
2022

2123
class Counter(object):
2224
"""A counter aggregates a series of values.
@@ -109,6 +111,9 @@ class CounterFactory(object):
109111
def __init__(self):
110112
self.counters = {}
111113

114+
# Lock to be acquired when accessing the counters map.
115+
self._lock = threading.Lock()
116+
112117
def get_counter(self, name, aggregation_kind):
113118
"""Returns a counter with the requested name.
114119
@@ -122,13 +127,14 @@ def get_counter(self, name, aggregation_kind):
122127
Returns:
123128
A new or existing counter with the requested name.
124129
"""
125-
counter = self.counters.get(name, None)
126-
if counter:
127-
assert counter.aggregation_kind == aggregation_kind
128-
else:
129-
counter = Counter(name, aggregation_kind)
130-
self.counters[name] = counter
131-
return counter
130+
with self._lock:
131+
counter = self.counters.get(name, None)
132+
if counter:
133+
assert counter.aggregation_kind == aggregation_kind
134+
else:
135+
counter = Counter(name, aggregation_kind)
136+
self.counters[name] = counter
137+
return counter
132138

133139
def get_aggregator_counter(self, step_name, aggregator):
134140
"""Returns an AggregationCounter for this step's aggregator.
@@ -141,16 +147,26 @@ def get_aggregator_counter(self, step_name, aggregator):
141147
Returns:
142148
A new or existing counter.
143149
"""
144-
name = 'user-%s-%s' % (step_name, aggregator.name)
145-
aggregation_kind = aggregator.aggregation_kind
146-
counter = self.counters.get(name, None)
147-
if counter:
148-
assert isinstance(counter, AggregatorCounter)
149-
assert counter.aggregation_kind == aggregation_kind
150-
else:
151-
counter = AggregatorCounter(name, aggregation_kind)
152-
self.counters[name] = counter
153-
return counter
154-
155-
def itercounters(self):
156-
return self.counters.itervalues()
150+
with self._lock:
151+
name = 'user-%s-%s' % (step_name, aggregator.name)
152+
aggregation_kind = aggregator.aggregation_kind
153+
counter = self.counters.get(name, None)
154+
if counter:
155+
assert isinstance(counter, AggregatorCounter)
156+
assert counter.aggregation_kind == aggregation_kind
157+
else:
158+
counter = AggregatorCounter(name, aggregation_kind)
159+
self.counters[name] = counter
160+
return counter
161+
162+
def get_counters(self):
163+
"""Returns the current set of counters.
164+
165+
Returns:
166+
An iterable that contains the current set of counters. To make sure that
167+
multiple threads can iterate over the set of counters, we return a new
168+
iterable here. Note that the actual set of counters may get modified after
169+
this method returns hence the returned iterable may be stale.
170+
"""
171+
with self._lock:
172+
return self.counters.values()

0 commit comments

Comments
 (0)