Skip to content
This repository was archived by the owner on Jun 30, 2022. It is now read-only.

Commit 1d44fdb

Browse files
chamikaramjaaltay
authored andcommitted
Adds the base API for creating new sources.
Sources API consists of the classes 'BoundedSource' and 'RangeTracker'. Updated 'iobase.RangeTracker' and related classes to the new API. Adds a good amount of documentation along with the API classes. Adds support for reading sources using DirectRunner. ----Release Notes---- [] ------------- Created by MOE: https://github.com/google/moe MOE_MIGRATED_REVID=123057920
1 parent 0bee865 commit 1d44fdb

8 files changed

Lines changed: 575 additions & 301 deletions

File tree

google/cloud/dataflow/io/fileio.py

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -586,8 +586,7 @@ def __exit__(self, exception_type, exception_value, traceback):
586586

587587
def __iter__(self):
588588
while True:
589-
if not self.range_tracker.try_return_record_at(
590-
is_at_split_point=True,
589+
if not self.range_tracker.try_claim(
591590
record_start=self.current_offset):
592591
# Reader has completed reading the set of records in its range. Note
593592
# that the end offset of the range may be smaller than the original
@@ -620,16 +619,15 @@ def request_dynamic_split(self, dynamic_split_request):
620619
return
621620
split_position = iobase.ReaderPosition()
622621
split_position.byte_offset = (
623-
self.range_tracker.get_position_for_fraction_consumed(
624-
percent_complete))
622+
self.range_tracker.position_at_fraction(percent_complete))
625623
else:
626624
logging.warning(
627625
'TextReader requires either a position or a percentage of work to '
628626
'be complete to perform a dynamic split request. Requested: %r',
629627
dynamic_split_request)
630628
return
631629

632-
if self.range_tracker.try_split_at_position(split_position.byte_offset):
630+
if self.range_tracker.try_split(split_position.byte_offset):
633631
return iobase.DynamicSplitResultWithPosition(split_position)
634632
else:
635633
return

google/cloud/dataflow/io/iobase.py

Lines changed: 198 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -26,10 +26,13 @@
2626
the sink.
2727
"""
2828

29+
from collections import namedtuple
30+
2931
import logging
3032
import uuid
3133

3234
from google.cloud.dataflow import pvalue
35+
from google.cloud.dataflow.coders import PickleCoder
3336
from google.cloud.dataflow.pvalue import AsIter
3437
from google.cloud.dataflow.pvalue import AsSingleton
3538
from google.cloud.dataflow.transforms import core
@@ -295,17 +298,125 @@ def Write(self, o): # pylint: disable=invalid-name
295298
raise NotImplementedError
296299

297300

298-
class RangeTracker(object):
299-
"""A thread-safe helper object for implementing dynamic work rebalancing.
301+
# Encapsulates information about a bundle of a source generated when method
302+
# BoundedSource.split() is invoked.
303+
# This is a named 4-tuple that has following fields.
304+
# * weight - a number that represents the size of the bundle. This value will
305+
# be used to compare the relative sizes of bundles generated by the
306+
# current source.
307+
# The weight returned here could be specified using a unit of your
308+
# choice (for example, bundles of sizes 100MB, 200MB, and 700MB may
309+
# specify weights 100, 200, 700 or 1, 2, 7) but all bundles of a
310+
# source should specify the weight using the same unit.
311+
# * source - a BoundedSource object for the bundle.
312+
# * start_position - starting position of the bundle
313+
# * stop_position - ending position of the bundle.
314+
#
315+
# Type for start and stop positions are specific to the bounded source and must
316+
# be consistent throughout.
317+
SourceBundle = namedtuple(
318+
'SourceBundle',
319+
'weight source start_position stop_position')
320+
321+
322+
class BoundedSource(object):
323+
"""A Dataflow source that reads a finite amount of input records.
324+
325+
This class defines following operations which can be used to read the source
326+
efficiently.
327+
* Size estimation - method ``estimate_size()`` may return an accurate
328+
estimation in bytes for the size of the source.
329+
* Splitting into bundles of a given size - method ``split()`` can be used to
330+
split the source into a set of sub-sources (bundles) based on a desired
331+
bundle size.
332+
* Getting a RangeTracker - method ``get_range_tracker() should return a
333+
``RangeTracker`` object for a given position range for the position type
334+
of the records returned by the source.
335+
* Reading the data - method ``read()`` can be used to read data from the
336+
source while respecting the boundaries defined by a given
337+
``RangeTracker``.
338+
"""
339+
340+
def estimate_size(self):
341+
"""Estimates the size of source in bytes.
342+
343+
An estimate of the total size (in bytes) of the data that would be read
344+
from this source. This estimate is in terms of external storage size,
345+
before performing decompression or other processing.
346+
347+
Returns:
348+
estimated size of the source if the size can be determined, ``None``
349+
otherwise.
350+
"""
351+
raise NotImplementedError
352+
353+
def split(self, desired_bundle_size, start_position=None, stop_position=None):
354+
"""Splits the source into a set of bundles.
355+
356+
Bundles should be approximately of size ``desired_bundle_size`` bytes.
357+
358+
Args:
359+
desired_bundle_size: the desired size (in bytes) of the bundles returned.
360+
start_position: if specified the given position must be used as the
361+
starting position of the first bundle.
362+
stop_position: if specified the given position must be used as the ending
363+
position of the last bundle.
364+
Returns:
365+
an iterator of objects of type 'SourceBundle' that gives information about
366+
the generated bundles.
367+
"""
368+
raise NotImplementedError
369+
370+
def get_range_tracker(self, start_position, stop_position):
371+
"""Returns a RangeTracker for a given position range.
372+
373+
Framework may invoke ``read()`` method with the RangeTracker object returned
374+
here to read data from the source.
375+
Args:
376+
start_position: starting position of the range.
377+
stop_position: ending position of the range.
378+
Returns:
379+
a ``RangeTracker`` for the given position range.
380+
"""
381+
raise NotImplementedError
382+
383+
def read(self, range_tracker):
384+
"""Returns an iterator that reads data from the source.
385+
386+
The returned set of data must respect the boundaries defined by the given
387+
``RangeTracker`` object. For example:
388+
* Returned set of data must be for the range
389+
``[range_tracker.start_position, range_tracker.stop_position)``. Note
390+
that a source may decide to return records that start after
391+
``range_tracker.stop_position``. See documentation in class
392+
``RangeTracker`` for more details. Also, note that framework might
393+
invoke ``range_tracker.try_split()`` to perform dynamic split
394+
operations. range_tracker.stop_position may be updated
395+
dynamically due to successful dynamic split operations.
396+
* Method ``range_tracker.try_split()`` must be invoked for every record
397+
that starts at a split point.
398+
* Method ``range_tracker.record_current_position()`` may be invoked for
399+
records that do not start at split points.
400+
Args:
401+
range_tracker: a ``RangeTracker`` whose boundaries must be respected
402+
when reading data from the source. If 'None' all records
403+
represented by the current source should be read.
404+
Returns:
405+
an iterator of data read by the source.
406+
"""
407+
raise NotImplementedError
300408

301-
**Usage of the RangeTracker class hierarchy**
409+
def default_output_coder(self):
410+
"""Coder that should be used for the records returned by the source."""
411+
return PickleCoder()
302412

303-
The ``RangeTracker`` class should not be used per se---all users should use
304-
its subclasses directly. We declare it here because all subclasses have
305-
roughly the same interface and the same properties, to centralize the
306-
documentation.
307413

308-
Currently we provide one implementation: ``iobase.OffsetRangeTracker``.
414+
class RangeTracker(object):
415+
"""A thread safe object used by Dataflow source framework.
416+
417+
A Dataflow source is defined using a ''BoundedSource'' and a ''RangeTracker''
418+
pair. A ''RangeTracker'' is used by Dataflow source framework to perform
419+
dynamic work rebalancing of position-based sources.
309420
310421
**Position-based sources**
311422
@@ -421,67 +532,110 @@ def stop_position(self):
421532
"""Returns the ending position of the current range, exclusive."""
422533
raise NotImplementedError
423534

424-
def try_return_record_at(self, is_at_split_point, record_start):
425-
"""Atomically determines if a record at the given position can be returned.
535+
def try_claim(self, position): # pylint: disable=unused-argument
536+
"""Atomically determines if a record at a split point is within the range.
426537
427-
Additionally, Updates the internal state of the ``RangeTracker``.
538+
This method should be called **if and only if** the record is at a split
539+
point. This method may modify the internal state of the ``RangeTracker`` by
540+
updating the last-consumed position to ``position``.
428541
429-
In particular:
542+
** Thread safety **
430543
431-
* If ``is_at_split_point`` is ``True``, and ``record_start`` is outside the
432-
current range, returns ``False``;
433-
* Otherwise, updates the last-consumed position to ``record_start`` and
434-
returns ``True``.
544+
This method along with several other methods of this class may be invoked by
545+
multiple threads, hence must be made thread-safe, e.g. by using a single
546+
lock object.
435547
436-
This method MUST be called on all split point records. It may be called on
437-
every record.
548+
Args:
549+
position: starting position of a record being read by a source.
438550
439-
Method ``try_return_record_at`` and method ``try_split_at_position`` will be
440-
accessed by different threads and implementor must ensure that only one of
441-
these methods is executed at a given time.
551+
Returns:
552+
``True``, if the given position falls within the current range, returns
553+
``False`` otherwise.
554+
"""
555+
raise NotImplementedError
556+
557+
def set_current_position(self, position):
558+
"""Updates the last-consumed position to the given position.
559+
560+
A source may invoke this method for records that do not start at split
561+
points. This may modify the internal state of the ``RangeTracker``. If the
562+
record starts at a split point, method ``try_claim()`` **must** be invoked
563+
instead of this method.
442564
443565
Args:
444-
is_at_split_point: ``True`` if record is at a split point, ``False``
445-
otherwise.
566+
position: starting position of a record being read by a source.
567+
"""
568+
raise NotImplementedError
569+
570+
def position_at_fraction(self, fraction):
571+
"""Returns the position at the given fraction.
572+
573+
Given a fraction within the range [0.0, 1.0) this method will return the
574+
position at the given fraction compared the the position range
575+
[self.start_position, self.stop_position).
576+
577+
** Thread safety **
446578
447-
record_start: starting position of the record.
579+
This method along with several other methods of this class may be invoked by
580+
multiple threads, hence must be made thread-safe, e.g. by using a single
581+
lock object.
582+
583+
Args:
584+
fraction: a float value within the range [0.0, 1.0).
585+
Returns:
586+
a position within the range [self.start_position, self.stop_position).
448587
"""
449588
raise NotImplementedError
450589

451-
def try_split_at_position(self, split_position):
590+
def try_split(self, position):
452591
"""Atomically splits the current range.
453592
454-
Splits the current range '[get_start_position(), get_stop_position())'
455-
into a "primary" part '[get_start_position(), split_position())' and a
456-
"residual" part '[split_position(), get_stop_position())', assuming the
593+
Determines a position to split the current range, split_position, based on
594+
the given position. In most cases split_position and position will be the
595+
same.
596+
597+
Splits the current range '[self.start_position, self.stop_position)'
598+
into a "primary" part '[self.start_position, split_position)' and a
599+
"residual" part '[split_position, self.stop_position)', assuming the
457600
current last-consumed position is within
458-
'[get_start_position(), split_position())' (i.e., 'split_position()'
459-
has not been consumed yet).
601+
'[self.start_position, split_position)' (i.e., split_position has not been
602+
consumed yet).
603+
604+
If successful, updates the current range to be the primary and returns a
605+
tuple (split_position, split_fraction). split_fraction should be the
606+
fraction of size of range '[self.start_position, split_position)' compared
607+
to the original (before split) range
608+
'[self.start_position, self.stop_position)'.
460609
461-
Updates the current range to be the primary and returns ``True``. This
462-
means that all further calls on the current object will interpret their
463-
arguments relative to the primary range.
610+
If the split_position has already been consumed, returns ``None``.
464611
465-
If the split position has already been consumed, or if no
466-
``try_return_record_at`` call was made yet, returns ``False``. The
467-
second condition is to prevent dynamic splitting during reader start-up.
612+
** Thread safety **
468613
469-
Method ``try_return_record_at`` and method ``try_split_at_position`` will be
470-
accessed by different threads and implementor must ensure that only one of
471-
these methods is executed at a given time.
614+
This method along with several other methods of this class may be invoked by
615+
multiple threads, hence must be made thread-safe, e.g. by using a single
616+
lock object.
472617
473618
Args:
474-
split_position: an instance of ReaderPosition that gives the position
475-
where the current range should be split at.
619+
position: suggested position where the current range should try to
620+
be split at.
621+
Returns:
622+
a tuple containing the split position and split fraction.
476623
"""
477624
raise NotImplementedError
478625

479626
def fraction_consumed(self):
480627
"""Returns the approximate fraction of consumed positions in the source.
481628
482-
Returns the approximate fraction of positions that have been consumed by
483-
successful 'try_return_record_at()' calls, or 0.0 if no such calls have
484-
happened.
629+
** Thread safety **
630+
631+
This method along with several other methods of this class may be invoked by
632+
multiple threads, hence must be made thread-safe, e.g. by using a single
633+
lock object.
634+
635+
Returns:
636+
the approximate fraction of positions that have been consumed by
637+
successful 'try_split()' and 'report_current_position()' calls, or
638+
0.0 if no such calls have happened.
485639
"""
486640
raise NotImplementedError
487641

0 commit comments

Comments
 (0)