Skip to content
This repository was archived by the owner on Jun 30, 2022. It is now read-only.

Commit 4075ca9

Browse files
charlesccychenaaltay
authored andcommitted
Add TimestampCoder for timeutil.Timestamp objects
----Release Notes---- [] ------------- Created by MOE: https://github.com/google/moe MOE_MIGRATED_REVID=118044798
1 parent b2816c6 commit 4075ca9

2 files changed

Lines changed: 36 additions & 4 deletions

File tree

google/cloud/dataflow/coders/coders.py

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
"""Collection of useful coders."""
1616

1717
import base64
18+
import collections
1819
import cPickle as pickle
1920
import struct
2021

@@ -27,10 +28,12 @@
2728
# Import dill from the pickler module to make sure our monkey-patching of dill
2829
# occurs.
2930
from google.cloud.dataflow.internal.pickler import dill
31+
from google.cloud.dataflow.transforms.timeutil import Timestamp
3032
except ImportError:
3133
# We fall back to using the stock dill library in tests that don't use the
3234
# full Python SDK.
3335
import dill
36+
Timestamp = collections.namedtuple('Timestamp', 'micros')
3437

3538

3639
def serialize_coder(coder):
@@ -236,6 +239,20 @@ def is_deterministic(self):
236239
return True
237240

238241

242+
# TODO(ccy): Write a Cython implementation of TimestampCoder.
243+
class TimestampCoder(Coder):
244+
"""A coder used for timeutil.Timestamp values."""
245+
246+
def encode(self, value):
247+
return struct.pack('<q', value.micros)
248+
249+
def decode(self, encoded):
250+
return Timestamp(micros=struct.unpack('<q', encoded)[0])
251+
252+
def is_deterministic(self):
253+
return True
254+
255+
239256
def maybe_dill_dumps(o):
240257
"""Pickle using cPickle or the Dill pickler as a fallback."""
241258
# We need to use the dill pickler for objects of certain custom classes,
@@ -428,7 +445,7 @@ class WindowedValueCoder(FastCoder):
428445
def __init__(self, wrapped_value_coder, timestamp_coder=None,
429446
window_coder=None):
430447
if not timestamp_coder:
431-
timestamp_coder = FloatCoder()
448+
timestamp_coder = TimestampCoder()
432449
if not window_coder:
433450
window_coder = PickleCoder()
434451
self.wrapped_value_coder = wrapped_value_coder

google/cloud/dataflow/coders/coders_test_common.py

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -50,9 +50,14 @@ def tearDownClass(cls):
5050
for c in coders.__dict__.values()
5151
if isinstance(c, type) and issubclass(c, coders.Coder) and
5252
'Base' not in c.__name__)
53-
standard -= set([coders.Coder, coders.ToStringCoder, coders.FloatCoder,
54-
coders.Base64PickleCoder, coders.FastCoder,
55-
coders.WindowCoder, coders.WindowedValueCoder])
53+
standard -= set([coders.Coder,
54+
coders.FastCoder,
55+
coders.Base64PickleCoder,
56+
coders.FloatCoder,
57+
coders.TimestampCoder,
58+
coders.ToStringCoder,
59+
coders.WindowCoder,
60+
coders.WindowedValueCoder])
5661
assert not standard - cls.seen, standard - cls.seen
5762
assert not standard - cls.seen_nested, standard - cls.seen_nested
5863

@@ -124,6 +129,16 @@ def test_float_coder(self):
124129
*[float(2 ** (0.1 * x)) for x in range(-100, 100)])
125130
self.check_coder(coders.FloatCoder(), float('-Inf'), float('Inf'))
126131

132+
def test_timestamp_coder(self):
133+
self.check_coder(coders.TimestampCoder(),
134+
*[coders.Timestamp(micros=x) for x in range(-100, 100)])
135+
self.check_coder(coders.TimestampCoder(),
136+
coders.Timestamp(micros=-1234567890),
137+
coders.Timestamp(micros=1234567890))
138+
self.check_coder(coders.TimestampCoder(),
139+
coders.Timestamp(micros=-1234567890123456789),
140+
coders.Timestamp(micros=1234567890123456789))
141+
127142
def test_tuple_coder(self):
128143
self.check_coder(
129144
coders.TupleCoder((coders.VarIntCoder(), coders.BytesCoder())),

0 commit comments

Comments
 (0)