Skip to content
This repository was archived by the owner on Jun 30, 2022. It is now read-only.

Commit 4784ea5

Browse files
charlesccychenaaltay
authored andcommitted
Rename GlobalWindows.WindowedValue to GlobalWindows.windowed_value
----Release Notes---- [] ------------- Created by MOE: https://github.com/google/moe MOE_MIGRATED_REVID=121691420
1 parent 4f7605b commit 4784ea5

5 files changed

Lines changed: 15 additions & 15 deletions

File tree

google/cloud/dataflow/runners/direct_runner.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -200,7 +200,7 @@ def run_GroupByKeyOnly(self, transform_node):
200200
% wv)
201201

202202
gbk_result = map(
203-
GlobalWindows.WindowedValue,
203+
GlobalWindows.windowed_value,
204204
((key_coder.decode(k), v) for k, v in result_dict.iteritems()))
205205
self.debug_counters['element_counts'][
206206
transform_node.full_label] += len(gbk_result)
@@ -209,7 +209,7 @@ def run_GroupByKeyOnly(self, transform_node):
209209
@skip_if_cached
210210
def run_Create(self, transform_node):
211211
transform = transform_node.transform
212-
create_result = [GlobalWindows.WindowedValue(v) for v in transform.value]
212+
create_result = [GlobalWindows.windowed_value(v) for v in transform.value]
213213
self.debug_counters['element_counts'][
214214
transform_node.full_label] += len(create_result)
215215
self._cache.cache_output(transform_node, create_result)
@@ -230,7 +230,7 @@ def run_Read(self, transform_node):
230230
source = transform_node.transform.source
231231
source.pipeline_options = transform_node.inputs[0].pipeline.options
232232
with source.reader() as reader:
233-
read_result = [GlobalWindows.WindowedValue(e) for e in reader]
233+
read_result = [GlobalWindows.windowed_value(e) for e in reader]
234234
self.debug_counters['element_counts'][
235235
transform_node.full_label] += len(read_result)
236236
self._cache.cache_output(transform_node, read_result)

google/cloud/dataflow/transforms/window.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -259,7 +259,7 @@ class GlobalWindows(WindowFn):
259259
"""A windowing function that assigns everything to one global window."""
260260

261261
@classmethod
262-
def WindowedValue(cls, value, timestamp=MIN_TIMESTAMP): # pylint: disable=invalid-name
262+
def windowed_value(cls, value, timestamp=MIN_TIMESTAMP):
263263
return WindowedValue(value, timestamp, [GlobalWindow()])
264264

265265
def assign(self, assign_context):

google/cloud/dataflow/worker/executor.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -199,7 +199,7 @@ def start(self):
199199
if reader.returns_windowed_values:
200200
windowed_value = value
201201
else:
202-
windowed_value = GlobalWindows.WindowedValue(value)
202+
windowed_value = GlobalWindows.windowed_value(value)
203203
self.output(windowed_value)
204204

205205
def request_dynamic_split(self, dynamic_split_request):
@@ -289,7 +289,7 @@ def start(self):
289289
with self.shuffle_source.reader() as reader:
290290
for key, key_values in reader:
291291
self._reader = reader
292-
windowed_value = GlobalWindows.WindowedValue((key, key_values))
292+
windowed_value = GlobalWindows.windowed_value((key, key_values))
293293
self.output(windowed_value, coder=write_coder)
294294

295295
def get_progress(self):
@@ -322,7 +322,7 @@ def start(self):
322322
with self.shuffle_source.reader() as reader:
323323
for value in reader:
324324
self._reader = reader
325-
windowed_value = GlobalWindows.WindowedValue(value)
325+
windowed_value = GlobalWindows.windowed_value(value)
326326
self.output(windowed_value, coder=write_coder)
327327

328328
def get_progress(self):

google/cloud/dataflow/worker/opcounters_test.py

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -48,15 +48,15 @@ def test_update_int(self):
4848
opcounts = OperationCounters(CounterFactory(), 'some-name',
4949
coders.PickleCoder(), 0)
5050
self.verify_counters(opcounts, 0)
51-
opcounts.update_from(GlobalWindows.WindowedValue(1))
51+
opcounts.update_from(GlobalWindows.windowed_value(1))
5252
opcounts.update_collect()
5353
self.verify_counters(opcounts, 1)
5454

5555
def test_update_str(self):
5656
opcounts = OperationCounters(CounterFactory(), 'some-name',
5757
coders.PickleCoder(), 0)
5858
self.verify_counters(opcounts, 0)
59-
opcounts.update_from(GlobalWindows.WindowedValue('abcde'))
59+
opcounts.update_from(GlobalWindows.windowed_value('abcde'))
6060
opcounts.update_collect()
6161
self.verify_counters(opcounts, 1)
6262

@@ -65,7 +65,7 @@ def test_update_old_object(self):
6565
coders.PickleCoder(), 0)
6666
self.verify_counters(opcounts, 0)
6767
obj = OldClassThatDoesNotImplementLen()
68-
opcounts.update_from(GlobalWindows.WindowedValue(obj))
68+
opcounts.update_from(GlobalWindows.windowed_value(obj))
6969
opcounts.update_collect()
7070
self.verify_counters(opcounts, 1)
7171

@@ -75,19 +75,19 @@ def test_update_new_object(self):
7575
self.verify_counters(opcounts, 0)
7676

7777
obj = ObjectThatDoesNotImplementLen()
78-
opcounts.update_from(GlobalWindows.WindowedValue(obj))
78+
opcounts.update_from(GlobalWindows.windowed_value(obj))
7979
opcounts.update_collect()
8080
self.verify_counters(opcounts, 1)
8181

8282
def test_update_multiple(self):
8383
opcounts = OperationCounters(CounterFactory(), 'some-name',
8484
coders.PickleCoder(), 0)
8585
self.verify_counters(opcounts, 0)
86-
opcounts.update_from(GlobalWindows.WindowedValue('abcde'))
87-
opcounts.update_from(GlobalWindows.WindowedValue('defghij'))
86+
opcounts.update_from(GlobalWindows.windowed_value('abcde'))
87+
opcounts.update_from(GlobalWindows.windowed_value('defghij'))
8888
opcounts.update_collect()
8989
self.verify_counters(opcounts, 2)
90-
opcounts.update_from(GlobalWindows.WindowedValue('klmnop'))
90+
opcounts.update_from(GlobalWindows.windowed_value('klmnop'))
9191
opcounts.update_collect()
9292
self.verify_counters(opcounts, 3)
9393

google/cloud/dataflow/worker/windmillio.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ def __init__(self, source):
5858
def __iter__(self):
5959
for bundle in self.source.context.work_item.message_bundles:
6060
for message in bundle.messages:
61-
yield GlobalWindows.WindowedValue(
61+
yield GlobalWindows.windowed_value(
6262
self.source.coder.decode(message.data),
6363
timestamp=windmill_to_harness_timestamp(message.timestamp))
6464

0 commit comments

Comments
 (0)