2020import logging
2121import os
2222import re
23+ import shutil
2324import tempfile
25+ import time
2426
2527from google .cloud .dataflow import coders
2628from google .cloud .dataflow .io import iobase
@@ -147,7 +149,245 @@ def reader(self):
147149 return TextFileReader (self )
148150
149151
150- class TextFileSink (iobase .NativeSink ):
def TextFileSink(file_path_prefix,  # pylint: disable=invalid-name
                 append_trailing_newlines=True,
                 file_name_suffix='',
                 num_shards=0,
                 shard_name_template=None,
                 validate=True,
                 coder=coders.ToStringCoder()):
  """Factory for text-file sinks.

  Validates the arguments, then dispatches to one of two implementations:
  a NativeTextFileSink when a fixed shard count is requested, otherwise a
  PureTextFileSink that lets the runner choose the sharding.

  Args:
    file_path_prefix: The file path to write to. Written files begin with
      this prefix, followed by a shard identifier (see num_shards), and end
      in a common extension given by file_name_suffix. Usually only this
      argument is specified and the sharding arguments keep their defaults.
    append_trailing_newlines: Whether to write an additional newline char
      after each element.
    file_name_suffix: Suffix for the files written.
    num_shards: The number of files (shards) used for output. If not set,
      the service decides on the optimal number of shards. Constraining the
      shard count is likely to reduce pipeline performance; setting it is
      not recommended unless a specific number of output files is required.
    shard_name_template: A template string with placeholders for the shard
      number and shard count. Currently only '' and '-SSSSS-of-NNNNN' are
      accepted by the service. The upper-case letters 'S' and 'N' are
      replaced with the 0-padded shard number and shard count respectively.
      An empty string behaves as if num_shards was 1, producing one file.
      The default pattern is '-SSSSS-of-NNNNN'.
    validate: Enable path validation on pipeline creation.
    coder: Coder used to encode each line.

  Raises:
    TypeError: if file_path_prefix or file_name_suffix is not a string.
    ValueError: if shard_name_template is not of the expected format.

  Returns:
    A TextFileSink object usable for writing.
  """
  if not isinstance(file_path_prefix, basestring):
    raise TypeError(
        'TextFileSink: file_path_prefix must be a string; got %r instead' %
        file_path_prefix)
  if not isinstance(file_name_suffix, basestring):
    raise TypeError(
        'TextFileSink: file_name_suffix must be a string; got %r instead' %
        file_name_suffix)
  if shard_name_template not in (None, '', '-SSSSS-of-NNNNN'):
    raise ValueError(
        'The shard_name_template argument must be an empty string or the '
        'pattern -SSSSS-of-NNNNN instead of %s' % shard_name_template)

  # An empty template is shorthand for a single output file.
  if shard_name_template == '':  # pylint: disable=g-explicit-bool-comparison
    num_shards = 1

  if not num_shards:
    # No explicit shard count: the runner-agnostic sink picks the sharding.
    return PureTextFileSink(file_path_prefix,
                            append_trailing_newlines=append_trailing_newlines,
                            file_name_suffix=file_name_suffix,
                            coder=coder)
  return NativeTextFileSink(file_path_prefix,
                            append_trailing_newlines=append_trailing_newlines,
                            file_name_suffix=file_name_suffix,
                            num_shards=num_shards,
                            shard_name_template=shard_name_template,
                            validate=validate,
                            coder=coder)
221+
222+
class ChannelFactory(object):
  """Dispatches basic file operations to local disk or GCS based on path.

  Paths starting with 'gs://' are handled through the lazily-imported GCS
  client; all other paths use the local filesystem. OSErrors from local
  operations are re-raised as IOError so callers handle one exception type.
  """
  # TODO(robertwb): Generalize into extensible framework.

  @staticmethod
  def mkdir(path):
    """Creates a directory (including parents). A no-op for GCS paths."""
    if path.startswith('gs://'):
      # GCS has no real directories; there is nothing to create.
      return
    else:
      try:
        os.makedirs(path)
      except OSError as err:
        raise IOError(err)

  @staticmethod
  def open(path, mode, mime_type):
    """Opens path and returns a file-like object; mime_type is GCS-only."""
    if path.startswith('gs://'):
      # pylint: disable=g-import-not-at-top
      from google.cloud.dataflow.io import gcsio
      return gcsio.GcsIO().open(path, mode, mime_type)
    else:
      # Resolves to the builtin open (the staticmethod does not shadow it
      # inside the method body).
      return open(path, mode)

  @staticmethod
  def rename(src, dst):
    """Renames src to dst; both must be local or both must be GCS paths."""
    if src.startswith('gs://'):
      assert dst.startswith('gs://'), dst
      # pylint: disable=g-import-not-at-top
      from google.cloud.dataflow.io import gcsio
      gcsio.GcsIO().rename(src, dst)
    else:
      try:
        os.rename(src, dst)
      except OSError as err:
        raise IOError(err)

  @staticmethod
  def exists(path):
    """Returns True iff path exists."""
    if path.startswith('gs://'):
      # pylint: disable=g-import-not-at-top
      from google.cloud.dataflow.io import gcsio
      # Bug fix: the path argument was previously dropped
      # (GcsIO().exists() was called with no arguments).
      return gcsio.GcsIO().exists(path)
    else:
      return os.path.exists(path)

  @staticmethod
  def rmdir(path):
    """Recursively removes a directory (or all objects under a GCS prefix)."""
    if path.startswith('gs://'):
      # pylint: disable=g-import-not-at-top
      from google.cloud.dataflow.io import gcsio
      gcs = gcsio.GcsIO()
      if not path.endswith('/'):
        path += '/'
      # TODO(robertwb): Threadpool?
      for entry in gcs.glob(path + '*'):
        gcs.delete(entry)
    else:
      try:
        shutil.rmtree(path)
      except OSError as err:
        raise IOError(err)
283+
284+
class FileSink(iobase.Sink):
  """A sink to GCS or local files.

  To implement a file-based sink, extend this class and override the
  write_encoded_record method (and optionally open_file_writer) for writing
  a single shard.

  Writing happens in three phases: initialize_write creates a timestamped
  temporary directory, each bundle writes its records to a uniquely named
  shard file there, and finalize_write renames the shards to their final
  names and removes the temporary directory.

  The output of this write is a PCollection of all written shards.
  """
  # Default MIME type used when opening shard files; subclasses may override.
  mime_type = 'application/octet-stream'

  def __init__(self, file_path_prefix, coder, file_name_suffix=''):
    self.file_path_prefix = file_path_prefix
    self.file_name_suffix = file_name_suffix
    self.coder = coder

  def open(self, temp_path):
    """Opens temp_path for binary writing and returns the file handle."""
    return ChannelFactory.open(temp_path, 'wb', self.mime_type)

  def write_record(self, file_handle, value):
    """Encodes value with self.coder and writes it to file_handle."""
    self.write_encoded_record(file_handle, self.coder.encode(value))

  def write_encoded_record(self, file_handle, encoded_value):
    """Writes one already-encoded record; subclasses must override."""
    raise NotImplementedError

  def close(self, file_handle):
    """Closes file_handle if one was opened."""
    if file_handle:
      file_handle.close()

  def open_file_writer(self, temp_path):
    """Returns the writer used for a single shard file at temp_path."""
    return FileSinkWriter(self, temp_path)

  def initialize_write(self):
    """Creates a timestamped temp directory next to the final output path."""
    tmp_dir = self.file_path_prefix + self.file_name_suffix + time.strftime(
        '-temp-%Y-%m-%d_%H-%M-%S')
    ChannelFactory().mkdir(tmp_dir)
    return tmp_dir

  def open_writer(self, init_result, uid):
    """Opens a writer for one shard; uid keeps shard file names unique."""
    return self.open_file_writer(os.path.join(init_result, uid))

  def finalize_write(self, init_result, writer_results):
    """Renames temporary shards to final names, yielding each final name."""
    writer_results = sorted(writer_results)
    num_shards = len(writer_results)
    # TODO(robertwb): Threadpool?
    channel_factory = ChannelFactory()
    for shard_num, shard in enumerate(writer_results):
      final_name = '%s-%05d-of-%05d%s' % (self.file_path_prefix, shard_num,
                                          num_shards, self.file_name_suffix)
      try:
        channel_factory.rename(shard, final_name)
      except IOError:
        # The shard may have already been moved (e.g. by a retried attempt).
        # Only re-raise if the final destination is still missing.
        # (Replaces a stray debug print statement left in this branch.)
        logging.warning('Rename of %s to %s failed; final file exists: %s',
                        shard, final_name,
                        channel_factory.exists(final_name))
        if not channel_factory.exists(final_name):
          raise
      yield final_name
    try:
      channel_factory.rmdir(init_result)
    except IOError:
      # The temporary directory may have already been removed.
      pass

  def __eq__(self, other):
    # TODO(robertwb): Clean up workitem_test which uses this.
    # pylint: disable=unidiomatic-typecheck
    return type(self) == type(other) and self.__dict__ == other.__dict__
351+
352+
class FileSinkWriter(iobase.Writer):
  """Generic writer for a FileSink: streams records into one temp shard."""

  def __init__(self, sink, temp_shard_path):
    self.sink = sink
    self.temp_shard_path = temp_shard_path
    # Open the shard eagerly so write() can append records immediately.
    self.temp_handle = sink.open(temp_shard_path)

  def write(self, value):
    """Writes a single value through the owning sink's coder."""
    self.sink.write_record(self.temp_handle, value)

  def close(self):
    """Closes the shard and returns its temporary path for finalization."""
    self.sink.close(self.temp_handle)
    return self.temp_shard_path
368+
369+
class PureTextFileSink(FileSink):
  """A sink to a GCS or local text file or files."""
  # Shard files are written as plain text.
  mime_type = 'text/plain'

  def __init__(self,
               file_path_prefix,
               file_name_suffix='',
               coder=coders.ToStringCoder(),
               append_trailing_newlines=True):
    self.append_trailing_newlines = append_trailing_newlines
    super(PureTextFileSink, self).__init__(file_path_prefix,
                                           file_name_suffix=file_name_suffix,
                                           coder=coder)

  def write_encoded_record(self, file_handle, encoded_value):
    """Writes one encoded line, optionally followed by a newline."""
    file_handle.write(encoded_value)
    if not self.append_trailing_newlines:
      return
    file_handle.write('\n')
388+
389+
390+ class NativeTextFileSink (iobase .NativeSink ):
151391 """A sink to a GCS or local text file or files."""
152392
153393 def __init__ (self , file_path_prefix ,
@@ -157,46 +397,6 @@ def __init__(self, file_path_prefix,
157397 shard_name_template = None ,
158398 validate = True ,
159399 coder = coders .ToStringCoder ()):
160- """Initialize a TextSink.
161-
162- Args:
163- file_path_prefix: The file path to write to. The files written will begin
164- with this prefix, followed by a shard identifier (see num_shards), and
165- end in a common extension, if given by file_name_suffix. In most cases,
166- only this argument is specified and num_shards, shard_name_template, and
167- file_name_suffix use default values.
168- append_trailing_newlines: indicate whether this sink should write an
169- additional newline char after writing each element.
170- file_name_suffix: Suffix for the files written.
171- num_shards: The number of files (shards) used for output. If not set, the
172- service will decide on the optimal number of shards.
173- Constraining the number of shards is likely to reduce
174- the performance of a pipeline. Setting this value is not recommended
175- unless you require a specific number of output files.
176- shard_name_template: A template string containing placeholders for
177- the shard number and shard count. Currently only '' and
178- '-SSSSS-of-NNNNN' are patterns accepted by the service.
179- When constructing a filename for a particular shard number, the
180- upper-case letters 'S' and 'N' are replaced with the 0-padded shard
181- number and shard count respectively. This argument can be '' in which
182- case it behaves as if num_shards was set to 1 and only one file will be
183- generated. The default pattern used is '-SSSSS-of-NNNNN'.
184- validate: Enable path validation on pipeline creation.
185- coder: Coder used to encode each line.
186-
187- Raises:
188- TypeError: if file_path is not a string.
189- ValueError: if shard_name_template is not of expected format.
190- """
191- if not isinstance (file_path_prefix , basestring ):
192- raise TypeError (
193- '%s: file_path_prefix must be a string; got %r instead' %
194- (self .__class__ .__name__ , file_path_prefix ))
195- if not isinstance (file_name_suffix , basestring ):
196- raise TypeError (
197- '%s: file_name_suffix must be a string; got %r instead' %
198- (self .__class__ .__name__ , file_name_suffix ))
199-
200400 # We initialize a file_path attribute containing just the prefix part for
201401 # local runner environment. For now, sharding is not supported in the local
202402 # runner and sharding options (template, num, suffix) are ignored.
@@ -213,13 +413,8 @@ def __init__(self, file_path_prefix,
213413 self .file_name_suffix = file_name_suffix
214414 self .num_shards = num_shards
215415 # TODO(silviuc): Update this when the service supports more patterns.
216- if shard_name_template not in (None , '' , '-SSSSS-of-NNNNN' ):
217- raise ValueError (
218- 'The shard_name_template argument must be an empty string or the '
219- 'pattern -SSSSS-of-NNNNN instead of %s' % shard_name_template )
220- self .shard_name_template = (
221- shard_name_template if shard_name_template is not None
222- else '-SSSSS-of-NNNNN' )
416+ self .shard_name_template = ('-SSSSS-of-NNNNN' if shard_name_template is None
417+ else shard_name_template )
223418 # TODO(silviuc): Implement sink validation.
224419 self .validate = validate
225420
0 commit comments