Skip to content
This repository was archived by the owner on Jun 30, 2022. It is now read-only.

Commit c271699

Browse files
gildeasilviulica
authored andcommitted
Add class ObservableMixin
This new class will be used to observe iterables over elements read in the worker, so that we can size them. ----Release Notes---- [] ------------- Created by MOE: https://github.com/google/moe MOE_MIGRATED_REVID=119882940
1 parent 2bbd273 commit c271699

2 files changed

Lines changed: 87 additions & 0 deletions

File tree

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
# Copyright 2016 Google Inc. All Rights Reserved.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
16+
"""Observable base class for iterables."""
17+
18+
19+
class ObservableMixin(object):
20+
"""An observable iterable.
21+
22+
Subclasses need to call self.notify_observers with any object yielded.
23+
"""
24+
25+
def __init__(self):
26+
self.observers = []
27+
28+
def register_observer(self, callback):
29+
self.observers.append(callback)
30+
31+
def notify_observers(self, value, **kwargs):
32+
for o in self.observers:
33+
o(value, **kwargs)
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
# Copyright 2016 Google Inc. All Rights Reserved.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
"""Tests for the Observable mixin class."""
16+
17+
import logging
18+
import unittest
19+
20+
21+
from google.cloud.dataflow.coders import observable
22+
23+
24+
class ObservableMixinTest(unittest.TestCase):
25+
observed_count = 0
26+
observed_sum = 0
27+
observed_keys = []
28+
29+
def observer(self, value, key=None):
30+
self.observed_count += 1
31+
self.observed_sum += value
32+
self.observed_keys.append(key)
33+
34+
def test_observable(self):
35+
class Watched(observable.ObservableMixin):
36+
37+
def __iter__(self):
38+
for i in (1, 4, 3):
39+
self.notify_observers(i, key='a%d' % i)
40+
yield i
41+
42+
watched = Watched()
43+
watched.register_observer(lambda v, key: self.observer(v, key=key))
44+
for _ in watched:
45+
pass
46+
47+
self.assertEquals(3, self.observed_count)
48+
self.assertEquals(8, self.observed_sum)
49+
self.assertEquals(['a1', 'a3', 'a4'], sorted(self.observed_keys))
50+
51+
52+
if __name__ == '__main__':
53+
logging.getLogger().setLevel(logging.INFO)
54+
unittest.main()

0 commit comments

Comments
 (0)