Skip to content
This repository was archived by the owner on Jun 30, 2022. It is now read-only.

Commit 2c4c414

Browse files
robertwbsilviulica
authored andcommitted
Cythonize Timestamp- and FloatCoder
Also add further utilities to our coder stream classes. ----Release Notes---- [] ------------- Created by MOE: https://github.com/google/moe MOE_MIGRATED_REVID=118068607
1 parent 7beec91 commit 2c4c414

7 files changed

Lines changed: 137 additions & 16 deletions

File tree

google/cloud/dataflow/coders/coder_impl.pxd

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,14 @@ cdef class BytesCoderImpl(CoderImpl):
5858
pass
5959

6060

61+
cdef class FloatCoderImpl(StreamCoderImpl):
62+
pass
63+
64+
65+
cdef class TimestampCoderImpl(StreamCoderImpl):
66+
cdef object timestamp_class
67+
68+
6169
cdef list small_ints
6270
cdef class VarIntCoderImpl(StreamCoderImpl):
6371
@cython.locals(ivalue=libc.stdint.int64_t)

google/cloud/dataflow/coders/coder_impl.py

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -162,6 +162,27 @@ def decode(self, encoded):
162162
return encoded
163163

164164

165+
class FloatCoderImpl(StreamCoderImpl):
166+
167+
def encode_to_stream(self, value, out, nested):
168+
out.write_bigendian_double(value)
169+
170+
def decode_from_stream(self, in_stream, nested):
171+
return in_stream.read_bigendian_double()
172+
173+
174+
class TimestampCoderImpl(StreamCoderImpl):
175+
176+
def __init__(self, timestamp_class):
177+
self.timestamp_class = timestamp_class
178+
179+
def encode_to_stream(self, value, out, nested):
180+
out.write_bigendian_int64(value.micros)
181+
182+
def decode_from_stream(self, in_stream, nested):
183+
return self.timestamp_class(micros=in_stream.read_bigendian_int64())
184+
185+
165186
small_ints = [chr(_) for _ in range(128)]
166187

167188

google/cloud/dataflow/coders/coders.py

Lines changed: 6 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717
import base64
1818
import collections
1919
import cPickle as pickle
20-
import struct
2120

2221
from google.cloud.dataflow.coders import coder_impl
2322

@@ -225,29 +224,21 @@ def is_deterministic(self):
225224
return True
226225

227226

228-
# TODO(ccy): Write a Cython implementation of FloatCoder.
229-
class FloatCoder(Coder):
227+
class FloatCoder(FastCoder):
230228
"""A coder used for floating-point values."""
231229

232-
def encode(self, value):
233-
return struct.pack('<d', value)
234-
235-
def decode(self, encoded):
236-
return struct.unpack('<d', encoded)[0]
230+
def _create_impl(self):
231+
return coder_impl.FloatCoderImpl()
237232

238233
def is_deterministic(self):
239234
return True
240235

241236

242-
# TODO(ccy): Write a Cython implementation of TimestampCoder.
243-
class TimestampCoder(Coder):
237+
class TimestampCoder(FastCoder):
244238
"""A coder used for timeutil.Timestamp values."""
245239

246-
def encode(self, value):
247-
return struct.pack('<q', value.micros)
248-
249-
def decode(self, encoded):
250-
return Timestamp(micros=struct.unpack('<q', encoded)[0])
240+
def _create_impl(self):
241+
return coder_impl.TimestampCoderImpl(Timestamp)
251242

252243
def is_deterministic(self):
253244
return True

google/cloud/dataflow/coders/slow_stream.py

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@
1414

1515
"""A pure Python implementation of stream.pyx."""
1616

17+
import struct
18+
1719

1820
class OutputStream(object):
1921
"""A pure Python implementation of stream.OutputStream."""
@@ -44,6 +46,15 @@ def write_var_int64(self, v):
4446
if not v:
4547
break
4648

49+
def write_bigendian_int64(self, v):
50+
self.write(struct.pack('>q', v))
51+
52+
def write_bigendian_int32(self, v):
53+
self.write(struct.pack('>i', v))
54+
55+
def write_bigendian_double(self, v):
56+
self.write(struct.pack('>d', v))
57+
4758
def get(self):
4859
return ''.join(self.data)
4960

@@ -87,3 +98,12 @@ def read_var_int64(self):
8798
if result >= 1 << 63:
8899
result -= 1 << 64
89100
return result
101+
102+
def read_bigendian_int64(self):
103+
return struct.unpack('>q', self.read(8))[0]
104+
105+
def read_bigendian_int32(self):
106+
return struct.unpack('>i', self.read(4))[0]
107+
108+
def read_bigendian_double(self):
109+
return struct.unpack('>d', self.read(8))[0]

google/cloud/dataflow/coders/stream.pxd

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,9 @@ cdef class OutputStream(object):
2323
cpdef write(self, bytes b, bint nested=*)
2424
cpdef write_byte(self, unsigned char val)
2525
cpdef write_var_int64(self, libc.stdint.int64_t v)
26+
cpdef write_bigendian_int64(self, libc.stdint.int64_t signed_v)
27+
cpdef write_bigendian_int32(self, libc.stdint.int32_t signed_v)
28+
cpdef write_bigendian_double(self, double d)
2629

2730
cpdef bytes get(self)
2831

@@ -38,4 +41,7 @@ cdef class InputStream(object):
3841
cpdef bytes read(self, size_t len)
3942
cpdef long read_byte(self) except? -1
4043
cpdef libc.stdint.int64_t read_var_int64(self) except? -1
44+
cpdef libc.stdint.int64_t read_bigendian_int64(self) except? -1
45+
cpdef libc.stdint.int32_t read_bigendian_int32(self) except? -1
46+
cpdef double read_bigendian_double(self) except? -1
4147
cpdef bytes read_all(self, bint nested=*)

google/cloud/dataflow/coders/stream.pyx

Lines changed: 49 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,6 @@ cdef class OutputStream(object):
4040
self.pos += blen
4141

4242
cpdef write_byte(self, unsigned char val):
43-
assert 0 <= val <= 0xFF
4443
if self.size <= self.pos:
4544
self.extend(1)
4645
self.data[self.pos] = val
@@ -59,6 +58,33 @@ cdef class OutputStream(object):
5958
if not v:
6059
break
6160

61+
cpdef write_bigendian_int64(self, libc.stdint.int64_t signed_v):
62+
cdef libc.stdint.uint64_t v = signed_v
63+
if self.size < self.pos - 8:
64+
self.extend(8)
65+
self.data[self.pos ] = <unsigned char>(v >> 56)
66+
self.data[self.pos + 1] = <unsigned char>(v >> 48)
67+
self.data[self.pos + 2] = <unsigned char>(v >> 40)
68+
self.data[self.pos + 3] = <unsigned char>(v >> 32)
69+
self.data[self.pos + 4] = <unsigned char>(v >> 24)
70+
self.data[self.pos + 5] = <unsigned char>(v >> 16)
71+
self.data[self.pos + 6] = <unsigned char>(v >> 8)
72+
self.data[self.pos + 7] = <unsigned char>(v )
73+
self.pos += 8
74+
75+
cpdef write_bigendian_int32(self, libc.stdint.int32_t signed_v):
76+
cdef libc.stdint.uint32_t v = signed_v
77+
if self.size < self.pos - 4:
78+
self.extend(4)
79+
self.data[self.pos ] = <unsigned char>(v >> 24)
80+
self.data[self.pos + 1] = <unsigned char>(v >> 16)
81+
self.data[self.pos + 2] = <unsigned char>(v >> 8)
82+
self.data[self.pos + 3] = <unsigned char>(v )
83+
self.pos += 4
84+
85+
cpdef write_bigendian_double(self, double d):
86+
self.write_bigendian_int64((<libc.stdint.int64_t*><char*>&d)[0])
87+
6288
cpdef bytes get(self):
6389
return self.data[:self.pos]
6490

@@ -111,3 +137,25 @@ cdef class InputStream(object):
111137
if not (byte & 0x80):
112138
break
113139
return result
140+
141+
cpdef libc.stdint.int64_t read_bigendian_int64(self) except? -1:
142+
self.pos += 8
143+
return (<unsigned char>self.allc[self.pos - 1]
144+
| <libc.stdint.uint64_t><unsigned char>self.allc[self.pos - 2] << 8
145+
| <libc.stdint.uint64_t><unsigned char>self.allc[self.pos - 3] << 16
146+
| <libc.stdint.uint64_t><unsigned char>self.allc[self.pos - 4] << 24
147+
| <libc.stdint.uint64_t><unsigned char>self.allc[self.pos - 5] << 32
148+
| <libc.stdint.uint64_t><unsigned char>self.allc[self.pos - 6] << 40
149+
| <libc.stdint.uint64_t><unsigned char>self.allc[self.pos - 7] << 48
150+
| <libc.stdint.uint64_t><unsigned char>self.allc[self.pos - 8] << 56)
151+
152+
cpdef libc.stdint.int32_t read_bigendian_int32(self) except? -1:
153+
self.pos += 4
154+
return (<unsigned char>self.allc[self.pos - 1]
155+
| <libc.stdint.uint32_t><unsigned char>self.allc[self.pos - 2] << 8
156+
| <libc.stdint.uint32_t><unsigned char>self.allc[self.pos - 3] << 16
157+
| <libc.stdint.uint32_t><unsigned char>self.allc[self.pos - 4] << 24)
158+
159+
cpdef double read_bigendian_double(self) except? -1:
160+
cdef libc.stdint.int64_t as_long = self.read_bigendian_int64()
161+
return (<double*><char*>&as_long)[0]

google/cloud/dataflow/coders/stream_test.py

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,33 @@ def test_medium_var_int64(self):
7272
def test_large_var_int64(self):
7373
self.run_read_write_var_int64([0, 2**63 - 1, -2**63, 2**63 - 3])
7474

75+
def test_read_write_double(self):
76+
values = 0, 1, -1, 1e100, 1.0/3, math.pi, float('inf')
77+
out_s = self.OutputStream()
78+
for v in values:
79+
out_s.write_bigendian_double(v)
80+
in_s = self.InputStream(out_s.get())
81+
for v in values:
82+
self.assertEquals(v, in_s.read_bigendian_double())
83+
84+
def test_read_write_bigendian_int64(self):
85+
values = 0, 1, -1, 2**63-1, -2**63, int(2**61 * math.pi)
86+
out_s = self.OutputStream()
87+
for v in values:
88+
out_s.write_bigendian_int64(v)
89+
in_s = self.InputStream(out_s.get())
90+
for v in values:
91+
self.assertEquals(v, in_s.read_bigendian_int64())
92+
93+
def test_read_write_bigendian_int32(self):
94+
values = 0, 1, -1, 2**31-1, -2**31, int(2**29 * math.pi)
95+
out_s = self.OutputStream()
96+
for v in values:
97+
out_s.write_bigendian_int32(v)
98+
in_s = self.InputStream(out_s.get())
99+
for v in values:
100+
self.assertEquals(v, in_s.read_bigendian_int32())
101+
75102

76103
try:
77104
# pylint: disable=g-import-not-at-top

0 commit comments

Comments
 (0)