Skip to content
This repository was archived by the owner on Jun 30, 2022. It is now read-only.

Commit 8e90369

Browse files
committed
Several refactorings in preparation for making the repo public.
1 parent 7ebad55 commit 8e90369

53 files changed

Lines changed: 1782 additions & 1384 deletions

Some content is hidden

Large commits have some content hidden by default. Use the search box below to find content that may be hidden.

README.md

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,6 @@ and streaming parallel data processing pipelines.
77
The Dataflow SDK for Python provides access to Dataflow capabilities
88
from the Python programming language.
99

10-
## Table of Contents
11-
[TOC]
12-
1310
## Status of this Release
1411

1512
This is the Google Cloud Dataflow SDK for Python version 0.2.0.

google/cloud/dataflow/coders/typecoders.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,7 @@ def register_standard_coders(self, fallback_coder):
7979
"""Register coders for all basic and composite types."""
8080
self._register_coder_internal(int, coders.VarIntCoder)
8181
self._register_coder_internal(long, coders.VarIntCoder)
82+
self._register_coder_internal(float, coders.FloatCoder)
8283
self._register_coder_internal(str, coders.BytesCoder)
8384
self._register_coder_internal(bytes, coders.BytesCoder)
8485
self._register_coder_internal(unicode, coders.StrUtf8Coder)

google/cloud/dataflow/dataflow_test.py

Lines changed: 77 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020
import re
2121
import unittest
2222

23-
from google.cloud.dataflow.error import PValueError
2423
from google.cloud.dataflow.pipeline import Pipeline
2524
from google.cloud.dataflow.pvalue import AsDict
2625
from google.cloud.dataflow.pvalue import AsIter as AllOf
@@ -35,6 +34,8 @@
3534
from google.cloud.dataflow.transforms import Map
3635
from google.cloud.dataflow.transforms import ParDo
3736
from google.cloud.dataflow.transforms import WindowInto
37+
from google.cloud.dataflow.transforms.util import assert_that
38+
from google.cloud.dataflow.transforms.util import equal_to
3839
from google.cloud.dataflow.transforms.window import IntervalWindow
3940
from google.cloud.dataflow.transforms.window import WindowFn
4041

@@ -43,7 +44,7 @@ class DataflowTest(unittest.TestCase):
4344
"""Dataflow integration tests."""
4445

4546
SAMPLE_DATA = 'aa bb cc aa bb aa \n' * 10
46-
SAMPLE_RESULT = {'aa': 30, 'bb': 20, 'cc': 10}
47+
SAMPLE_RESULT = [('cc', 10), ('bb', 20), ('aa', 30)]
4748

4849
# TODO(silviuc): Figure out a nice way to specify labels for stages so that
4950
# internal steps get prepended with surrounding stage names.
@@ -61,25 +62,26 @@ def test_word_count(self):
6162
result = (
6263
(lines | FlatMap('GetWords', lambda x: re.findall(r'\w+', x)))
6364
.apply('CountWords', DataflowTest.Count))
65+
assert_that(result, equal_to(DataflowTest.SAMPLE_RESULT))
6466
pipeline.run()
65-
self.assertEqual(DataflowTest.SAMPLE_RESULT, dict(result.get()))
6667

6768
def test_map(self):
6869
pipeline = Pipeline('DirectPipelineRunner')
6970
lines = pipeline | Create('input', ['a', 'b', 'c'])
7071
result = (lines
7172
| Map('upper', str.upper)
7273
| Map('prefix', lambda x, prefix: prefix + x, 'foo-'))
74+
assert_that(result, equal_to(['foo-A', 'foo-B', 'foo-C']))
7375
pipeline.run()
74-
self.assertEqual(['foo-A', 'foo-B', 'foo-C'], list(result.get()))
7576

7677
def test_word_count_using_get(self):
7778
pipeline = Pipeline('DirectPipelineRunner')
7879
lines = pipeline | Create('SomeWords', [DataflowTest.SAMPLE_DATA])
7980
result = (
8081
(lines | FlatMap('GetWords', lambda x: re.findall(r'\w+', x)))
8182
.apply('CountWords', DataflowTest.Count))
82-
self.assertEqual(DataflowTest.SAMPLE_RESULT, dict(result.get()))
83+
assert_that(result, equal_to(DataflowTest.SAMPLE_RESULT))
84+
pipeline.run()
8385

8486
def test_par_do_with_side_input_as_arg(self):
8587
pipeline = Pipeline('DirectPipelineRunner')
@@ -91,9 +93,8 @@ def test_par_do_with_side_input_as_arg(self):
9193
'DecorateWords',
9294
lambda x, pfx, sfx: ['%s-%s-%s' % (pfx, x, sfx)],
9395
AsSingleton(prefix), suffix)
94-
self.assertEquals(
95-
['%s-%s-%s' % (prefix.get().next(), x, suffix) for x in words_list],
96-
list(result.get()))
96+
assert_that(result, equal_to(['xyz-%s-zyx' % x for x in words_list]))
97+
pipeline.run()
9798

9899
def test_par_do_with_side_input_as_keyword_arg(self):
99100
pipeline = Pipeline('DirectPipelineRunner')
@@ -105,9 +106,8 @@ def test_par_do_with_side_input_as_keyword_arg(self):
105106
'DecorateWords',
106107
lambda x, pfx, sfx: ['%s-%s-%s' % (pfx, x, sfx)],
107108
prefix, sfx=AsSingleton(suffix))
108-
self.assertEquals(
109-
['%s-%s-%s' % (prefix, x, suffix.get().next()) for x in words_list],
110-
list(result.get()))
109+
assert_that(result, equal_to(['zyx-%s-xyz' % x for x in words_list]))
110+
pipeline.run()
111111

112112
def test_par_do_with_do_fn_object(self):
113113
class SomeDoFn(DoFn):
@@ -123,9 +123,8 @@ def process(self, context, prefix, suffix):
123123
suffix = pipeline | Create('SomeString', ['xyz']) # side in
124124
result = words | ParDo('DecorateWordsDoFn', SomeDoFn(), prefix,
125125
suffix=AsSingleton(suffix))
126-
self.assertEquals(
127-
['%s-%s-%s' % (prefix, x, suffix.get().next()) for x in words_list],
128-
list(result.get()))
126+
assert_that(result, equal_to(['zyx-%s-xyz' % x for x in words_list]))
127+
pipeline.run()
129128

130129
def test_par_do_with_multiple_outputs_and_using_yield(self):
131130
class SomeDoFn(DoFn):
@@ -142,9 +141,10 @@ def process(self, context):
142141
nums = pipeline | Create('Some Numbers', [1, 2, 3, 4])
143142
results = nums | ParDo(
144143
'ClassifyNumbers', SomeDoFn()).with_outputs('odd', 'even', main='main')
145-
self.assertEquals([1, 2, 3, 4], list(results.main.get()))
146-
self.assertEquals([1, 3], list(results.odd.get()))
147-
self.assertEquals([2, 4], list(results.even.get()))
144+
assert_that(results.main, equal_to([1, 2, 3, 4]))
145+
assert_that(results.odd, equal_to([1, 3]), label='assert:odd')
146+
assert_that(results.even, equal_to([2, 4]), label='assert:even')
147+
pipeline.run()
148148

149149
def test_par_do_with_multiple_outputs_and_using_return(self):
150150
def some_fn(v):
@@ -157,9 +157,10 @@ def some_fn(v):
157157
nums = pipeline | Create('Some Numbers', [1, 2, 3, 4])
158158
results = nums | FlatMap(
159159
'ClassifyNumbers', some_fn).with_outputs('odd', 'even', main='main')
160-
self.assertEquals([1, 2, 3, 4], list(results.main.get()))
161-
self.assertEquals([1, 3], list(results.odd.get()))
162-
self.assertEquals([2, 4], list(results.even.get()))
160+
assert_that(results.main, equal_to([1, 2, 3, 4]))
161+
assert_that(results.odd, equal_to([1, 3]), label='assert:odd')
162+
assert_that(results.even, equal_to([2, 4]), label='assert:even')
163+
pipeline.run()
163164

164165
def test_empty_singleton_side_input(self):
165166
pipeline = Pipeline('DirectPipelineRunner')
@@ -170,53 +171,61 @@ def my_fn(k, s):
170171
v = ('empty' if isinstance(s, EmptySideInput) else 'full')
171172
return [(k, v)]
172173
result = pcol | FlatMap('compute', my_fn, AsSingleton(side))
173-
self.assertEquals([(1, 'empty'), (2, 'empty')], sorted(result.get()))
174+
assert_that(result, equal_to([(1, 'empty'), (2, 'empty')]))
175+
pipeline.run()
174176

175177
def test_multi_valued_singleton_side_input(self):
176178
pipeline = Pipeline('DirectPipelineRunner')
177179
pcol = pipeline | Create('start', [1, 2])
178180
side = pipeline | Create('side', [3, 4]) # 2 values in side input.
179-
result = pcol | FlatMap('compute', lambda x, s: [x * s], AsSingleton(side))
181+
pcol | FlatMap('compute', lambda x, s: [x * s], AsSingleton(side))
180182
with self.assertRaises(ValueError) as e:
181-
result.get()
183+
pipeline.run()
182184

183185
def test_default_value_singleton_side_input(self):
184186
pipeline = Pipeline('DirectPipelineRunner')
185187
pcol = pipeline | Create('start', [1, 2])
186188
side = pipeline | Create('side', []) # 0 values in side input.
187189
result = (
188190
pcol | FlatMap('compute', lambda x, s: [x * s], AsSingleton(side, 10)))
189-
self.assertEquals([10, 20], sorted(result.get()))
191+
assert_that(result, equal_to([10, 20]))
192+
pipeline.run()
190193

191194
def test_iterable_side_input(self):
192195
pipeline = Pipeline('DirectPipelineRunner')
193196
pcol = pipeline | Create('start', [1, 2])
194197
side = pipeline | Create('side', [3, 4]) # 2 values in side input.
195198
result = pcol | FlatMap('compute',
196199
lambda x, s: [x * y for y in s], AllOf(side))
197-
self.assertEquals([3, 4, 6, 8], sorted(result.get()))
200+
assert_that(result, equal_to([3, 4, 6, 8]))
201+
pipeline.run()
198202

199203
def test_undeclared_side_outputs(self):
200204
pipeline = Pipeline('DirectPipelineRunner')
201205
nums = pipeline | Create('Some Numbers', [1, 2, 3, 4])
202206
results = nums | FlatMap(
203207
'ClassifyNumbers',
204208
lambda x: [x, SideOutputValue('even' if x % 2 == 0 else 'odd', x)]
205-
).with_outputs()
206-
self.assertEquals([1, 2, 3, 4], list(results[None].get()))
207-
self.assertEquals([1, 3], list(results.odd.get()))
208-
self.assertEquals([2, 4], list(results.even.get()))
209+
).with_outputs('odd', 'even', main='main')
210+
# TODO(silviuc): Revisit this test to check for undeclared side outputs.
211+
# This should work with .with_outputs() without any tags declared and
212+
# the results[None] should work also.
213+
assert_that(results.main, equal_to([1, 2, 3, 4]))
214+
assert_that(results.odd, equal_to([1, 3]), label='assert:odd')
215+
assert_that(results.even, equal_to([2, 4]), label='assert:even')
216+
pipeline.run()
209217

210218
def test_empty_side_outputs(self):
211219
pipeline = Pipeline('DirectPipelineRunner')
212220
nums = pipeline | Create('Some Numbers', [1, 3, 5])
213221
results = nums | FlatMap(
214222
'ClassifyNumbers',
215223
lambda x: [x, SideOutputValue('even' if x % 2 == 0 else 'odd', x)]
216-
).with_outputs()
217-
self.assertEquals([1, 3, 5], list(results[None].get()))
218-
self.assertEquals([1, 3, 5], list(results.odd.get()))
219-
self.assertEquals([], list(results.even.get()))
224+
).with_outputs('odd', 'even', main='main')
225+
assert_that(results.main, equal_to([1, 3, 5]))
226+
assert_that(results.odd, equal_to([1, 3, 5]), label='assert:odd')
227+
assert_that(results.even, equal_to([]), label='assert:even')
228+
pipeline.run()
220229

221230
def test_as_list_and_as_dict_side_inputs(self):
222231
a_list = [5, 1, 3, 2, 9]
@@ -229,10 +238,17 @@ def test_as_list_and_as_dict_side_inputs(self):
229238
'concatenate',
230239
lambda x, the_list, the_dict: [[x, the_list, the_dict]],
231240
AsList(side_list), AsDict(side_pairs))
232-
[[result_elem, result_list, result_dict]] = results.get()
233-
self.assertEquals(1, result_elem)
234-
self.assertEquals(sorted(a_list), sorted(result_list))
235-
self.assertEquals(sorted(some_pairs), sorted(result_dict.iteritems()))
241+
242+
def matcher(expected_elem, expected_list, expected_pairs):
243+
def match(actual):
244+
[[actual_elem, actual_list, actual_dict]] = actual
245+
equal_to([expected_elem])([actual_elem])
246+
equal_to(expected_list)(actual_list)
247+
equal_to(expected_pairs)(actual_dict.iteritems())
248+
return match
249+
250+
assert_that(results, matcher(1, a_list, some_pairs))
251+
pipeline.run()
236252

237253
def test_as_list_without_unique_labels(self):
238254
a_list = [1, 2, 3]
@@ -258,10 +274,16 @@ def test_as_list_with_unique_labels(self):
258274
lambda x, ls1, ls2: [[x, ls1, ls2]],
259275
AsList(side_list), AsList(side_list, label='label'))
260276

261-
[[result_elem, result_ls1, result_ls2]] = results.get()
262-
self.assertEquals(1, result_elem)
263-
self.assertEquals(sorted(a_list), sorted(result_ls1))
264-
self.assertEquals(sorted(a_list), sorted(result_ls2))
277+
def matcher(expected_elem, expected_list):
278+
def match(actual):
279+
[[actual_elem, actual_list1, actual_list2]] = actual
280+
equal_to([expected_elem])([actual_elem])
281+
equal_to(expected_list)(actual_list1)
282+
equal_to(expected_list)(actual_list2)
283+
return match
284+
285+
assert_that(results, matcher(1, a_list))
286+
pipeline.run()
265287

266288
def test_as_dict_without_unique_labels(self):
267289
some_kvs = [('a', 1), ('b', 2)]
@@ -286,30 +308,17 @@ def test_as_dict_with_unique_labels(self):
286308
'test',
287309
lambda x, dct1, dct2: [[x, dct1, dct2]],
288310
AsDict(side_kvs), AsDict(side_kvs, label='label'))
289-
[[result_elem, result_dict1, result_dict2]] = results.get()
290-
self.assertEquals(1, result_elem)
291-
self.assertEquals(sorted(some_kvs), sorted(result_dict1.iteritems()))
292-
self.assertEquals(sorted(some_kvs), sorted(result_dict2.iteritems()))
293-
294-
def test_runner_clear(self):
295-
"""Tests for Runner.clear() method.
296-
297-
Note that it is not expected that users of the SDK will call this directly.
298-
More likely intermediate layers will call this to control the amount of
299-
caching for computed values.
300-
"""
301-
pipeline = Pipeline('DirectPipelineRunner')
302-
words_list = ['aa', 'bb', 'cc']
303-
words = pipeline | Create('SomeWords', words_list)
304-
result = words | FlatMap('DecorateWords', lambda x: ['x-%s' % x])
305-
self.assertEquals(['x-%s' % x for x in words_list], list(result.get()))
306-
# Now clear the entire pipeline.
307-
pipeline.runner.clear(pipeline)
308-
self.assertRaises(PValueError, pipeline.runner.get_pvalue, result)
309-
# Recompute and clear the pvalue node.
310-
result.get()
311-
pipeline.runner.clear(pipeline, node=result)
312-
self.assertRaises(PValueError, pipeline.runner.get_pvalue, result)
311+
312+
def matcher(expected_elem, expected_kvs):
313+
def match(actual):
314+
[[actual_elem, actual_dict1, actual_dict2]] = actual
315+
equal_to([expected_elem])([actual_elem])
316+
equal_to(expected_kvs)(actual_dict1.iteritems())
317+
equal_to(expected_kvs)(actual_dict2.iteritems())
318+
return match
319+
320+
assert_that(results, matcher(1, some_kvs))
321+
pipeline.run()
313322

314323
def test_window_transform(self):
315324
class TestWindowFn(WindowFn):
@@ -327,10 +336,10 @@ def merge(self, existing_windows):
327336
result = (numbers
328337
| WindowInto('W', windowfn=TestWindowFn())
329338
| GroupByKey('G'))
339+
assert_that(
340+
result, equal_to([(1, [10]), (1, [10]), (2, [20]),
341+
(2, [20]), (3, [30]), (3, [30])]))
330342
pipeline.run()
331-
self.assertEqual(
332-
[(1, [10]), (1, [10]), (2, [20]), (2, [20]), (3, [30]), (3, [30])],
333-
sorted(result.get(), key=lambda x: x[0]))
334343

335344

336345
if __name__ == '__main__':

google/cloud/dataflow/examples/complete/autocomplete.py

Lines changed: 16 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -16,24 +16,34 @@
1616

1717
from __future__ import absolute_import
1818

19+
import argparse
1920
import logging
2021
import re
22+
import sys
2123

2224
import google.cloud.dataflow as df
23-
from google.cloud.dataflow.utils.options import add_option
24-
from google.cloud.dataflow.utils.options import get_options
2525

2626

27-
def run(options=None):
28-
p = df.Pipeline(options=get_options(options))
27+
def run(argv=sys.argv[1:]):
28+
29+
parser = argparse.ArgumentParser()
30+
parser.add_argument('--input',
31+
required=True,
32+
help='Input file to process.')
33+
parser.add_argument('--output',
34+
required=True,
35+
help='Output file to write results to.')
36+
known_args, pipeline_args = parser.parse_known_args(argv)
37+
38+
p = df.Pipeline(argv=pipeline_args)
2939

3040
(p # pylint: disable=expression-not-assigned
31-
| df.io.Read('read', df.io.TextFileSource(p.options.input))
41+
| df.io.Read('read', df.io.TextFileSource(known_args.input))
3242
| df.FlatMap('split', lambda x: re.findall(r'[A-Za-z\']+', x))
3343
| TopPerPrefix('TopPerPrefix', 5)
3444
| df.Map('format',
3545
lambda (prefix, candidates): '%s: %s' % (prefix, candidates))
36-
| df.io.Write('write', df.io.TextFileSink(p.options.output)))
46+
| df.io.Write('write', df.io.TextFileSink(known_args.output)))
3747
p.run()
3848

3949

@@ -65,14 +75,6 @@ def extract_prefixes((word, count)):
6575
yield prefix, (count, word)
6676

6777

68-
add_option(
69-
'--input', dest='input', required=True,
70-
help='Input file to process.')
71-
add_option(
72-
'--output', dest='output', required=True,
73-
help='Output file to write results to.')
74-
75-
7678
if __name__ == '__main__':
7779
logging.getLogger().setLevel(logging.INFO)
7880
run()

0 commit comments

Comments (0)