Skip to content
This repository was archived by the owner on Jun 30, 2022. It is now read-only.

Commit 0edd4bf

Browse files
robertwbsilviulica
authored andcommitted
Consistently apply sharding suffix to TextFileSink
Previously, the default behavior varied between the Direct and Dataflow pipeline runner, with only the latter adding -SSSSS-of-NNNNN suffixes. ----Release Notes---- Consistently apply sharding suffix to TextFileSink [] ------------- Created by MOE: https://github.com/google/moe MOE_MIGRATED_REVID=120885157
1 parent 517d355 commit 0edd4bf

14 files changed

Lines changed: 33 additions & 23 deletions

google/cloud/dataflow/examples/complete/estimate_pi_test.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,11 +30,11 @@ def create_temp_file(self, contents):
3030
return f.name
3131

3232
def test_basics(self):
33-
temp_path = self.create_temp_file('not used')
33+
temp_path = self.create_temp_file('result')
3434
estimate_pi.run([
3535
'--output=%s' % temp_path])
3636
# Parse result file and compare.
37-
with open(temp_path) as result_file:
37+
with open(temp_path + '-00000-of-00001') as result_file:
3838
estimated_pi = json.loads(result_file.readline())[2]
3939
# Note: Probabilistically speaking this test can fail with a probability
4040
# that is very small (VERY) given that we run at least 10 million trials.

google/cloud/dataflow/examples/complete/juliaset/juliaset/juliaset_test.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,8 @@ def test_output_file_format(self):
5757

5858
# Parse the results from the file, and ensure it was written in the proper
5959
# format.
60-
with open(self.test_files['output_coord_file_name']) as result_file:
60+
with open(self.test_files['output_coord_file_name'] +
61+
'-00000-of-00001') as result_file:
6162
output_lines = result_file.readlines()
6263

6364
# Should have a line for each x-coordinate.

google/cloud/dataflow/examples/complete/tfidf_test.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,8 @@ def test_basics(self):
7171
'--output', os.path.join(temp_folder, 'result')])
7272
# Parse result file and compare.
7373
results = []
74-
with open(os.path.join(temp_folder, 'result')) as result_file:
74+
with open(os.path.join(temp_folder,
75+
'result-00000-of-00001')) as result_file:
7576
for line in result_file:
7677
match = re.search(EXPECTED_LINE_RE, line)
7778
logging.info('Result line: %s', line)

google/cloud/dataflow/examples/cookbook/bigshuffle_test.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -39,17 +39,18 @@ def test_basics(self):
3939
'--checksum_output=%s.checksum' % temp_path])
4040
# Parse result file and compare.
4141
results = []
42-
with open(temp_path + '.result') as result_file:
42+
with open(temp_path + '.result-00000-of-00001') as result_file:
4343
for line in result_file:
4444
results.append(line.strip())
4545
expected = self.SAMPLE_TEXT.split('\n')
4646
self.assertEqual(sorted(results), sorted(expected))
4747
# Check the checksums
4848
input_csum = ''
49-
with open(temp_path + '.checksum-input') as input_csum_file:
49+
with open(temp_path + '.checksum-input-00000-of-00001') as input_csum_file:
5050
input_csum = input_csum_file.read().strip()
5151
output_csum = ''
52-
with open(temp_path + '.checksum-output') as output_csum_file:
52+
with open(temp_path +
53+
'.checksum-output-00000-of-00001') as output_csum_file:
5354
output_csum = output_csum_file.read().strip()
5455
expected_csum = 'd629c1f6'
5556
self.assertEqual(input_csum, expected_csum)

google/cloud/dataflow/examples/cookbook/coders_test.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ def test_basics(self):
4242
'--output=%s.result' % temp_path])
4343
# Parse result file and compare.
4444
results = []
45-
with open(temp_path + '.result') as result_file:
45+
with open(temp_path + '.result-00000-of-00001') as result_file:
4646
for line in result_file:
4747
results.append(json.loads(line))
4848
logging.info('result: %s', results)

google/cloud/dataflow/examples/cookbook/custom_ptransform_test.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -42,9 +42,9 @@ def run_pipeline(self, count_implementation, factor=1):
4242
])
4343

4444
count_implementation(known_args, PipelineOptions(pipeline_args))
45-
self.assertEqual(["""(u'CAT', %d)""" % (3 * factor),
46-
"""(u'DOG', %d)""" % (2 * factor)],
47-
self.get_output(output_path))
45+
self.assertEqual(["(u'CAT', %d)" % (3 * factor),
46+
"(u'DOG', %d)" % (2 * factor)],
47+
self.get_output(output_path + '-00000-of-00001'))
4848

4949
def create_temp_file(self, contents=''):
5050
with tempfile.NamedTemporaryFile(delete=False) as f:

google/cloud/dataflow/examples/cookbook/group_with_coder_test.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ def test_basics_with_type_check(self):
5151
'--output=%s.result' % temp_path])
5252
# Parse result file and compare.
5353
results = []
54-
with open(temp_path + '.result') as result_file:
54+
with open(temp_path + '.result-00000-of-00001') as result_file:
5555
for line in result_file:
5656
name, points = line.split(',')
5757
results.append((name, int(points)))
@@ -72,7 +72,7 @@ def test_basics_without_type_check(self):
7272
'--output=%s.result' % temp_path])
7373
# Parse result file and compare.
7474
results = []
75-
with open(temp_path + '.result') as result_file:
75+
with open(temp_path + '.result-00000-of-00001') as result_file:
7676
for line in result_file:
7777
name, points = line.split(',')
7878
results.append((name, int(points)))

google/cloud/dataflow/examples/cookbook/mergecontacts_test.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,7 @@ def test_mergecontacts(self):
111111
'--output_tsv=%s.tsv' % result_prefix,
112112
'--output_stats=%s.stats' % result_prefix], assert_results=(2, 1, 3))
113113

114-
with open('%s.tsv' % result_prefix) as f:
114+
with open('%s.tsv-00000-of-00001' % result_prefix) as f:
115115
contents = f.read()
116116
self.assertEqual(self.EXPECTED_TSV, self.normalize_tsv_results(contents))
117117

google/cloud/dataflow/examples/cookbook/multiple_output_pardo_test.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -52,14 +52,15 @@ def test_multiple_output_pardo(self):
5252
'--output=%s' % result_prefix])
5353

5454
expected_char_count = len(''.join(self.SAMPLE_TEXT.split('\n')))
55-
with open(result_prefix + '-chars') as f:
55+
with open(result_prefix + '-chars-00000-of-00001') as f:
5656
contents = f.read()
5757
self.assertEqual(expected_char_count, int(contents))
5858

59-
short_words = self.get_wordcount_results(result_prefix + '-short-words')
59+
short_words = self.get_wordcount_results(
60+
result_prefix + '-short-words-00000-of-00001')
6061
self.assertEqual(sorted(short_words), sorted(self.EXPECTED_SHORT_WORDS))
6162

62-
words = self.get_wordcount_results(result_prefix + '-words')
63+
words = self.get_wordcount_results(result_prefix + '-words-00000-of-00001')
6364
self.assertEqual(sorted(words), sorted(self.EXPECTED_WORDS))
6465

6566

google/cloud/dataflow/examples/snippets/snippets_test.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -322,9 +322,9 @@ def create_temp_file(self, contents=''):
322322
f.write(contents)
323323
return f.name
324324

325-
def get_output(self, path, sorted_output=True):
325+
def get_output(self, path, sorted_output=True, suffix=''):
326326
lines = []
327-
with open(path) as f:
327+
with open(path + '-00000-of-00001' + suffix) as f:
328328
lines = f.readlines()
329329
if sorted_output:
330330
return sorted(s.rstrip('\n') for s in lines)
@@ -365,7 +365,7 @@ def test_model_textio(self):
365365
snippets.model_textio({'read': temp_path, 'write': result_path})
366366
self.assertEqual(
367367
['aa', 'bb', 'bb', 'cc', 'cc', 'cc'],
368-
self.get_output(result_path))
368+
self.get_output(result_path, suffix='.csv'))
369369

370370
def test_model_bigqueryio(self):
371371
# We cannot test BigQueryIO functionality in unit tests therefore we limit

0 commit comments

Comments
 (0)