# See the License for the specific language governing permissions and
# limitations under the License.
1414
15+ # cython: profile=True
16+
"""Counters collect the progress of the Worker for reporting to the service."""
1618
1719from __future__ import absolute_import
20+ import math
21+ import random
1822
1923from google .cloud .dataflow .utils .counters import Counter
2024
@@ -23,32 +27,87 @@ class OperationCounters(object):
2327 """The set of basic counters to attach to an Operation."""
2428
def __init__(self, counter_factory, step_name, coder, output_index):
  """Creates the element-count and mean-byte-size counters for one output.

  Args:
    counter_factory: object whose get_counter(name, kind) returns the
      counter to update; it is also retained on the instance.
    step_name: name of the step, used as the prefix of both counter names.
    coder: coder stored on the instance as self.coder.
    output_index: integer index of the output, embedded in the counter
      names.
  """
  self._counter_factory = counter_factory
  self.element_counter = counter_factory.get_counter(
      '%s-out%d-ElementCount' % (step_name, output_index), Counter.SUM)
  self.mean_byte_counter = counter_factory.get_counter(
      '%s-out%d-MeanByteCount' % (step_name, output_index), Counter.MEAN)
  self.coder = coder
  # Accumulators whose value() is folded into mean_byte_counter by
  # update_collect().
  self._active_accumulators = []
  # Number of elements seen so far by should_sample().
  self._sample_counter = 0
  # Element index of the next scheduled sample; 0 means no gap has been
  # scheduled yet and should_sample() decides element by element.
  self._next_sample = 0
3139
def update_from(self, windowed_value, coder=None):
  """Add one value to this counter.

  The element count is always incremented. The size of the value is only
  sampled when should_sample() says so.

  Args:
    windowed_value: the value that was just produced.
    coder: optional coder, forwarded unchanged to do_sample().
  """
  self.element_counter.update(1)
  if self.should_sample():
    self.do_sample(windowed_value, coder)
45+
def do_sample(self, windowed_value, coder):
  """Placeholder for recording the size of a sampled value.

  Currently does nothing; both arguments are ignored.

  Args:
    windowed_value: the value selected for sampling.
    coder: the coder passed through by update_from(); may be None.
  """
  # TODO(ccy): implement in an efficient way.
  pass
4349
def update_collect(self):
  """Collects the accumulated size estimates.

  Now that the element has been processed, we ask each pending accumulator
  for its total, store the result in the mean byte counter, and then
  forget the accumulators so they are not counted twice.
  """
  for pending in self._active_accumulators:
    self.mean_byte_counter.update(pending.value())
  # Rebind to a fresh list rather than clearing in place.
  self._active_accumulators = []
59+
def _compute_next_sample(self, i):
  """Returns the element index at which the next sample should be taken.

  Draws a random gap for a per-element sampling probability of 10/i, as in
  https://en.wikipedia.org/wiki/Reservoir_sampling#Fast_Approximation

  Args:
    i: number of elements seen so far.

  Returns:
    An integer index >= i. When i <= 10 the sampling probability is at
    least 1, so the gap is zero and i itself is returned.
  """
  # For i <= 10 the expression log(1 - 10/i) is undefined (log of a
  # non-positive number, or division by zero at i == 0).
  if i <= 10:
    return i
  # random.random() is in [0.0, 1.0), so 1.0 - random.random() is in
  # (0.0, 1.0] and the numerator's log is always defined.
  gap = math.log(1.0 - random.random()) / math.log(1.0 - 10.0 / i)
  return i + int(math.floor(gap))
64+
def should_sample(self):
  """Determines whether to sample the next element.

  Size calculation can be expensive, so we don't do it for each element.
  Because we need only an estimate of average size, we sample.

  We always sample the first 10 elements, then the sampling rate
  is approximately 10/N. After reading N elements, of the next N,
  we will sample approximately 10*ln(2) (about 7) elements.

  This algorithm samples at the same rate as Reservoir Sampling, but
  it never throws away early results. (Because we keep only a
  running accumulation, storage is not a problem, so there is no
  need to discard earlier calculations.)

  Because we accumulate and do not replace, our statistics are
  biased toward early data. If the data are distributed uniformly,
  this is not a problem. If the data change over time (i.e., the
  element size tends to grow or shrink over time), our estimate will
  show the bias. We could correct this by giving weight N to each
  sample, since each sample is a stand-in for the N/(10*ln(2))
  samples around it, which is proportional to N. Since we do not
  expect biased data, for efficiency we omit the extra multiplication.
  We could reduce the early-data bias by putting a lower bound on
  the sampling rate.

  Computing random.randint(1, self._sample_counter) for each element
  is too slow, so when the sample size is big enough (we estimate 30
  is big enough), we estimate the size of the gap after each sample.
  This estimation allows us to call random much less often.

  Returns:
    True if it is time to compute another element's size.
  """
  self._sample_counter += 1

  if self._next_sample == 0:
    # No gap scheduled yet: decide element by element with probability
    # min(1, 10/N). For N <= 10 the draw can never exceed 10.
    if random.randint(1, self._sample_counter) > 10:
      return False
    # Past 30 elements, switch to scheduling the next sample by its gap.
    if self._sample_counter > 30:
      self._next_sample = self._compute_next_sample(self._sample_counter)
    return True

  if self._sample_counter >= self._next_sample:
    # Reached the scheduled element: sample it and schedule the next one.
    self._next_sample = self._compute_next_sample(self._sample_counter)
    return True
  return False
52111
53112 def __str__ (self ):
54113 return '<%s [%s]>' % (self .__class__ .__name__ ,
0 commit comments