@@ -286,32 +286,54 @@ class FileSink(iobase.Sink):
286286 """A sink to a GCS or local files.
287287
288288 To implement a file-based sink, extend this class and override
289- the open_file_writer method for writing a single shard.
289+ either ``write_record()`` or ``write_encoded_record()``.
290+
291+ If needed, also override ``open()`` and/or ``close()`` to customize the
292+ file handling or write headers and footers.
290293
291294 The output of this write is a PCollection of all written shards.
292295 """
293- mime_type = 'application/octet-stream'
294296
295- def __init__ (self , file_path_prefix , coder , file_name_suffix = '' ):
297+ def __init__ (self ,
298+ file_path_prefix ,
299+ coder ,
300+ file_name_suffix = '' ,
301+ mime_type = 'application/octet-stream' ):
296302 self .file_path_prefix = file_path_prefix
297303 self .file_name_suffix = file_name_suffix
298304 self .coder = coder
305+ self .mime_type = mime_type
299306
300307 def open (self , temp_path ):
308+ """Opens ``temp_path``, returning an opaque file handle object.
309+
310+ The returned file handle is passed to ``write_[encoded_]record`` and
311+ ``close``.
312+ """
301313 return ChannelFactory .open (temp_path , 'wb' , self .mime_type )
302314
303315 def write_record (self , file_handle , value ):
316+ """Writes a single record to the file handle returned by ``open()``.
317+
318+ By default, calls ``write_encoded_record`` after encoding the record with
319+ this sink's Coder.
320+ """
304321 self .write_encoded_record (file_handle , self .coder .encode (value ))
305322
306323 def write_encoded_record (self , file_handle , encoded_value ):
324+ """Writes a single encoded record to the file handle returned by ``open()``.
325+ """
307326 raise NotImplementedError
308327
309328 def close (self , file_handle ):
310- if file_handle :
311- file_handle .close ()
329+ """Finalize and close the file handle returned from ``open()``.
312330
313- def open_file_writer (self , temp_path ):
314- return FileSinkWriter (self , temp_path )
331+ Called after all records are written.
332+
333+ By default, calls ``file_handle.close()`` iff it is not None.
334+ """
335+ if file_handle is not None :
336+ file_handle .close ()
315337
316338 def initialize_write (self ):
317339 tmp_dir = self .file_path_prefix + self .file_name_suffix + time .strftime (
@@ -320,7 +342,7 @@ def initialize_write(self):
320342 return tmp_dir
321343
322344 def open_writer (self , init_result , uid ):
323- return self . open_file_writer ( os .path .join (init_result , uid ))
345+ return FileSinkWriter ( self , os .path .join (init_result , uid ))
324346
325347 def finalize_write (self , init_result , writer_results ):
326348 writer_results = sorted (writer_results )
@@ -351,7 +373,7 @@ def __eq__(self, other):
351373
352374
353375class FileSinkWriter (iobase .Writer ):
354- """A generic writer for FileSink.
376+ """The writer for FileSink.
355377 """
356378
357379 def __init__ (self , sink , temp_shard_path ):
@@ -369,7 +391,6 @@ def close(self):
369391
370392class PureTextFileSink (FileSink ):
371393 """A sink to a GCS or local text file or files."""
372- mime_type = 'text/plain'
373394
374395 def __init__ (self ,
375396 file_path_prefix ,
@@ -378,7 +399,8 @@ def __init__(self,
378399 append_trailing_newlines = True ):
379400 super (PureTextFileSink , self ).__init__ (file_path_prefix ,
380401 file_name_suffix = file_name_suffix ,
381- coder = coder )
402+ coder = coder ,
403+ mime_type = 'text/plain' )
382404 self .append_trailing_newlines = append_trailing_newlines
383405
384406 def write_encoded_record (self , file_handle , encoded_value ):
0 commit comments