Skip to content
This repository was archived by the owner on Jun 30, 2022. It is now read-only.

Commit 0228864

Browse files
charlesccychenaaltay
authored andcommitted
Store timestamps and time intervals with microsecond granularity
This change introduces the TimeInterval class used to store times and time intervals for timestamping and windowing as seconds with microsecond granularity and wires it through the built-in windowing primitives. ----Release Notes---- [] ------------- Created by MOE: https://github.com/google/moe MOE_MIGRATED_REVID=117743931
1 parent fc0e8cc commit 0228864

7 files changed

Lines changed: 416 additions & 58 deletions

File tree

google/cloud/dataflow/transforms/timeutil.py

Lines changed: 194 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,21 +19,211 @@
1919
from abc import ABCMeta
2020
from abc import abstractmethod
2121

22+
import datetime
23+
import sys
24+
25+
26+
class Timestamp(object):
27+
"""Represents a Unix second timestamp with microsecond granularity.
28+
29+
Can be treated in common timestamp arithmetic operations as a numeric type.
30+
31+
Internally stores a time interval as an int of microseconds. This strategy
32+
is necessary since floating point values lose precision when storing values,
33+
especially after arithmetic operations (for example, 10000000 % 0.1 evaluates
34+
to 0.0999999994448885).
35+
"""
36+
37+
def __init__(self, seconds=0, micros=0):
38+
self.micros = int(seconds * 1000000) + int(micros)
39+
40+
@staticmethod
41+
def of(seconds):
42+
"""Return the Timestamp for the given number of seconds.
43+
44+
If the input is already a Timestamp, the input itself will be returned.
45+
46+
Args:
47+
seconds: Number of seconds as int, float or Timestamp.
48+
49+
Returns:
50+
Corresponding Timestamp object.
51+
"""
52+
53+
if isinstance(seconds, Duration):
54+
raise TypeError('Can\'t interpret %s as Timestamp.' % seconds)
55+
if isinstance(seconds, Timestamp):
56+
return seconds
57+
return Timestamp(seconds)
58+
59+
def __repr__(self):
60+
micros = self.micros
61+
sign = ''
62+
if micros < 0:
63+
sign = '-'
64+
micros = -micros
65+
int_part = micros / 1000000
66+
frac_part = micros % 1000000
67+
if frac_part:
68+
return 'Timestamp(%s%d.%06d)' % (sign, int_part, frac_part)
69+
else:
70+
return 'Timestamp(%s%d)' % (sign, int_part)
71+
72+
def to_utc_datetime(self):
73+
epoch = datetime.datetime.utcfromtimestamp(0)
74+
# We can't easily construct a datetime object from microseconds, so we
75+
# create one at the epoch and add an appropriate timedelta interval.
76+
return epoch + datetime.timedelta(microseconds=self.micros)
77+
78+
def isoformat(self):
79+
# Append 'Z' for UTC timezone.
80+
return self.to_utc_datetime().isoformat() + 'Z'
81+
82+
def __float__(self):
83+
# Note that the returned value may have lost precision.
84+
return float(self.micros) / 1000000
85+
86+
def __int__(self):
87+
# Note that the returned value may have lost precision.
88+
return self.micros / 1000000
89+
90+
def __cmp__(self, other):
91+
# Allow comparisons between Duration and Timestamp values.
92+
if not isinstance(other, Duration):
93+
other = Timestamp.of(other)
94+
return cmp(self.micros, other.micros)
95+
96+
def __hash__(self):
97+
return hash(self.micros)
98+
99+
def __add__(self, other):
100+
other = Duration.of(other)
101+
return Timestamp(micros=self.micros + other.micros)
102+
103+
def __radd__(self, other):
104+
return self + other
105+
106+
def __sub__(self, other):
107+
other = Duration.of(other)
108+
return Timestamp(micros=self.micros - other.micros)
109+
110+
def __mod__(self, other):
111+
other = Duration.of(other)
112+
return Duration(micros=self.micros % other.micros)
113+
114+
115+
MIN_TIMESTAMP = Timestamp(micros=-sys.maxint - 1)
116+
MAX_TIMESTAMP = Timestamp(micros=sys.maxint)
117+
118+
119+
class Duration(object):
120+
"""Represents a second duration with microsecond granularity.
121+
122+
Can be treated in common arithmetic operations as a numeric type.
123+
124+
Internally stores a time interval as an int of microseconds. This strategy
125+
is necessary since floating point values lose precision when storing values,
126+
especially after arithmetic operations (for example, 10000000 % 0.1 evaluates
127+
to 0.0999999994448885).
128+
"""
129+
130+
def __init__(self, seconds=0, micros=0):
131+
self.micros = int(seconds * 1000000) + int(micros)
132+
133+
@staticmethod
134+
def of(seconds):
135+
"""Return the Duration for the given number of seconds since Unix epoch.
136+
137+
If the input is already a Duration, the input itself will be returned.
138+
139+
Args:
140+
seconds: Number of seconds as int, float or Duration.
141+
142+
Returns:
143+
Corresponding Duration object.
144+
"""
145+
146+
if isinstance(seconds, Timestamp):
147+
raise TypeError('Can\'t interpret %s as Duration.' % seconds)
148+
if isinstance(seconds, Duration):
149+
return seconds
150+
return Duration(seconds)
151+
152+
def __repr__(self):
153+
micros = self.micros
154+
sign = ''
155+
if micros < 0:
156+
sign = '-'
157+
micros = -micros
158+
int_part = micros / 1000000
159+
frac_part = micros % 1000000
160+
if frac_part:
161+
return 'Duration(%s%d.%06d)' % (sign, int_part, frac_part)
162+
else:
163+
return 'Duration(%s%d)' % (sign, int_part)
164+
165+
def __float__(self):
166+
# Note that the returned value may have lost precision.
167+
return float(self.micros) / 1000000
168+
169+
def __int__(self):
170+
# Note that the returned value may have lost precision.
171+
return self.micros / 1000000
172+
173+
def __cmp__(self, other):
174+
# Allow comparisons between Duration and Timestamp values.
175+
if not isinstance(other, Timestamp):
176+
other = Duration.of(other)
177+
return cmp(self.micros, other.micros)
178+
179+
def __hash__(self):
180+
return hash(self.micros)
181+
182+
def __neg__(self):
183+
return Duration(micros=-self.micros)
184+
185+
def __add__(self, other):
186+
if isinstance(other, Timestamp):
187+
return other + self
188+
other = Duration.of(other)
189+
return Duration(micros=self.micros + other.micros)
190+
191+
def __radd__(self, other):
192+
return self + other
193+
194+
def __sub__(self, other):
195+
other = Duration.of(other)
196+
return Duration(micros=self.micros - other.micros)
197+
198+
def __rsub__(self, other):
199+
return -(self - other)
200+
201+
def __mul__(self, other):
202+
other = Duration.of(other)
203+
return Duration(micros=self.micros * other.micros / 1000000)
204+
205+
def __rmul__(self, other):
206+
return self * other
207+
208+
def __mod__(self, other):
209+
other = Duration.of(other)
210+
return Duration(micros=self.micros % other.micros)
211+
22212

23213
class TimeDomain(object):
24214
"""Time domain for streaming timers."""
25215

26-
WATERMARK = "WATERMARK"
27-
REAL_TIME = "REAL_TIME"
28-
DEPENDENT_REAL_TIME = "DEPENDENT_REAL_TIME"
216+
WATERMARK = 'WATERMARK'
217+
REAL_TIME = 'REAL_TIME'
218+
DEPENDENT_REAL_TIME = 'DEPENDENT_REAL_TIME'
29219

30220
@staticmethod
31221
def from_string(domain):
32222
if domain in (TimeDomain.WATERMARK,
33223
TimeDomain.REAL_TIME,
34224
TimeDomain.DEPENDENT_REAL_TIME):
35225
return domain
36-
raise ValueError("Unknown time domain: %s" % domain)
226+
raise ValueError('Unknown time domain: %s' % domain)
37227

38228

39229
class OutputTimeFnImpl(object):
Lines changed: 165 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,165 @@
1+
# Copyright 2016 Google Inc. All Rights Reserved.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
"""Unit tests for time utilities."""
16+
17+
from __future__ import absolute_import
18+
19+
import unittest
20+
21+
from google.cloud.dataflow.transforms.timeutil import Duration
22+
from google.cloud.dataflow.transforms.timeutil import Timestamp
23+
24+
25+
class TimestampTest(unittest.TestCase):
26+
27+
def test_of(self):
28+
interval = Timestamp(123)
29+
self.assertEqual(id(interval), id(Timestamp.of(interval)))
30+
self.assertEqual(interval, Timestamp.of(123.0))
31+
with self.assertRaises(TypeError):
32+
Timestamp.of(Duration(10))
33+
34+
def test_precision(self):
35+
self.assertEqual(Timestamp(10000000) % 0.1, 0)
36+
self.assertEqual(Timestamp(10000000) % 0.05, 0)
37+
self.assertEqual(Timestamp(10000000) % 0.000005, 0)
38+
self.assertEqual(Timestamp(10000000) % Duration(0.1), 0)
39+
self.assertEqual(Timestamp(10000000) % Duration(0.05), 0)
40+
self.assertEqual(Timestamp(10000000) % Duration(0.000005), 0)
41+
42+
def test_utc_timestamp(self):
43+
self.assertEqual(Timestamp(10000000).isoformat(),
44+
'1970-04-26T17:46:40Z')
45+
self.assertEqual(Timestamp(10000000.000001).isoformat(),
46+
'1970-04-26T17:46:40.000001Z')
47+
self.assertEqual(Timestamp(1458343379.123456).isoformat(),
48+
'2016-03-18T23:22:59.123456Z')
49+
50+
def test_arithmetic(self):
51+
# Supported operations.
52+
self.assertEqual(Timestamp(123) + 456, 579)
53+
self.assertEqual(Timestamp(123) + Duration(456), 579)
54+
self.assertEqual(456 + Timestamp(123), 579)
55+
self.assertEqual(Duration(456) + Timestamp(123), 579)
56+
self.assertEqual(Timestamp(123) - 456, -333)
57+
self.assertEqual(Timestamp(123) - Duration(456), -333)
58+
self.assertEqual(Timestamp(1230) % 456, 318)
59+
self.assertEqual(Timestamp(1230) % Duration(456), 318)
60+
61+
# Check that direct comparison of Timestamp and Duration is allowed.
62+
self.assertTrue(Duration(123) == Timestamp(123))
63+
self.assertTrue(Timestamp(123) == Duration(123))
64+
self.assertFalse(Duration(123) == Timestamp(1230))
65+
self.assertFalse(Timestamp(123) == Duration(1230))
66+
67+
# Check return types.
68+
self.assertEqual((Timestamp(123) + 456).__class__, Timestamp)
69+
self.assertEqual((Timestamp(123) + Duration(456)).__class__, Timestamp)
70+
self.assertEqual((456 + Timestamp(123)).__class__, Timestamp)
71+
self.assertEqual((Duration(456) + Timestamp(123)).__class__, Timestamp)
72+
self.assertEqual((Timestamp(123) - 456).__class__, Timestamp)
73+
self.assertEqual((Timestamp(123) - Duration(456)).__class__, Timestamp)
74+
self.assertEqual((Timestamp(1230) % 456).__class__, Duration)
75+
self.assertEqual((Timestamp(1230) % Duration(456)).__class__, Duration)
76+
77+
# Unsupported operations.
78+
with self.assertRaises(TypeError):
79+
self.assertEqual(Timestamp(123) * 456, 56088)
80+
with self.assertRaises(TypeError):
81+
self.assertEqual(Timestamp(123) * Duration(456), 56088)
82+
with self.assertRaises(TypeError):
83+
self.assertEqual(456 * Timestamp(123), 56088)
84+
with self.assertRaises(TypeError):
85+
self.assertEqual(Duration(456) * Timestamp(123), 56088)
86+
with self.assertRaises(TypeError):
87+
self.assertEqual(456 - Timestamp(123), 333)
88+
with self.assertRaises(TypeError):
89+
self.assertEqual(Duration(456) - Timestamp(123), 333)
90+
with self.assertRaises(TypeError):
91+
self.assertEqual(-Timestamp(123), -123)
92+
with self.assertRaises(TypeError):
93+
self.assertEqual(-Timestamp(123), -Duration(123))
94+
with self.assertRaises(TypeError):
95+
self.assertEqual(1230 % Timestamp(456), 318)
96+
with self.assertRaises(TypeError):
97+
self.assertEqual(Duration(1230) % Timestamp(456), 318)
98+
99+
def test_sort_order(self):
100+
self.assertEqual(
101+
[-63, Timestamp(-3), 2, 9, Timestamp(292.3), 500],
102+
sorted([9, 2, Timestamp(-3), Timestamp(292.3), -63, 500]))
103+
self.assertEqual(
104+
[4, 5, Timestamp(6), Timestamp(7), 8, 9],
105+
sorted([9, 8, Timestamp(7), Timestamp(6), 5, 4]))
106+
107+
def test_str(self):
108+
self.assertEqual('Timestamp(1.234567)',
109+
str(Timestamp(1.234567)))
110+
self.assertEqual('Timestamp(-1.234567)',
111+
str(Timestamp(-1.234567)))
112+
self.assertEqual('Timestamp(-999999999.900000)',
113+
str(Timestamp(-999999999.9)))
114+
self.assertEqual('Timestamp(999999999)',
115+
str(Timestamp(999999999)))
116+
self.assertEqual('Timestamp(-999999999)',
117+
str(Timestamp(-999999999)))
118+
119+
120+
class DurationTest(unittest.TestCase):
121+
122+
def test_of(self):
123+
interval = Duration(123)
124+
self.assertEqual(id(interval), id(Duration.of(interval)))
125+
self.assertEqual(interval, Duration.of(123.0))
126+
with self.assertRaises(TypeError):
127+
Duration.of(Timestamp(10))
128+
129+
def test_precision(self):
130+
self.assertEqual(Duration(10000000) % 0.1, 0)
131+
self.assertEqual(Duration(10000000) % 0.05, 0)
132+
self.assertEqual(Duration(10000000) % 0.000005, 0)
133+
134+
def test_arithmetic(self):
135+
self.assertEqual(Duration(123) + 456, 579)
136+
self.assertEqual(456 + Duration(123), 579)
137+
self.assertEqual(Duration(123) * 456, 56088)
138+
self.assertEqual(456 * Duration(123), 56088)
139+
self.assertEqual(Duration(123) - 456, -333)
140+
self.assertEqual(456 - Duration(123), 333)
141+
self.assertEqual(-Duration(123), -123)
142+
143+
def test_sort_order(self):
144+
self.assertEqual(
145+
[-63, Duration(-3), 2, 9, Duration(292.3), 500],
146+
sorted([9, 2, Duration(-3), Duration(292.3), -63, 500]))
147+
self.assertEqual(
148+
[4, 5, Duration(6), Duration(7), 8, 9],
149+
sorted([9, 8, Duration(7), Duration(6), 5, 4]))
150+
151+
def test_str(self):
152+
self.assertEqual('Duration(1.234567)',
153+
str(Duration(1.234567)))
154+
self.assertEqual('Duration(-1.234567)',
155+
str(Duration(-1.234567)))
156+
self.assertEqual('Duration(-999999999.900000)',
157+
str(Duration(-999999999.9)))
158+
self.assertEqual('Duration(999999999)',
159+
str(Duration(999999999)))
160+
self.assertEqual('Duration(-999999999)',
161+
str(Duration(-999999999)))
162+
163+
164+
if __name__ == '__main__':
165+
unittest.main()

0 commit comments

Comments
 (0)