Skip to content
This repository was archived by the owner on Jun 30, 2022. It is now read-only.

Commit 2e69a49

Browse files
gildeasilviulica
authored andcommitted
Add class ByteCountingOutputStream
ByteCountingOutputStream is an output string stream implementation that only counts the bytes. This is useful for sizing an encoding. ----Release Notes---- [] ------------- Created by MOE: https://github.com/google/moe MOE_MIGRATED_REVID=119995430
1 parent 6084ff5 commit 2e69a49

4 files changed

Lines changed: 106 additions & 0 deletions

File tree

google/cloud/dataflow/coders/slow_stream.py

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,33 @@ def get(self):
5959
return ''.join(self.data)
6060

6161

62+
class ByteCountingOutputStream(OutputStream):
63+
"""A pure Python implementation of stream.ByteCountingOutputStream."""
64+
65+
def __init__(self):
66+
# Note that we don't actually use any of the data initialized by our super.
67+
super(ByteCountingOutputStream, self).__init__()
68+
self.count = 0
69+
70+
def write(self, byte_array, nested=False):
71+
blen = len(byte_array)
72+
if nested:
73+
self.write_var_int64(blen)
74+
self.count += blen
75+
76+
def write_byte(self, _):
77+
self.count += 1
78+
79+
def get_count(self):
80+
return self.count
81+
82+
def get(self):
83+
raise NotImplementedError
84+
85+
def __str__(self):
86+
return '<%s %s>' % (self.__class__.__name__, self.count)
87+
88+
6289
class InputStream(object):
6390
"""A pure Python implementation of stream.InputStream."""
6491

google/cloud/dataflow/coders/stream.pxd

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,17 @@ cdef class OutputStream(object):
3232
cdef extend(self, size_t missing)
3333

3434

35+
cdef class ByteCountingOutputStream(OutputStream):
36+
cdef size_t count
37+
38+
cpdef write(self, bytes b, bint nested=*)
39+
cpdef write_byte(self, unsigned char val)
40+
cpdef write_bigendian_int64(self, libc.stdint.int64_t val)
41+
cpdef write_bigendian_int32(self, libc.stdint.int32_t val)
42+
cpdef size_t get_count(self)
43+
cpdef bytes get(self)
44+
45+
3546
cdef class InputStream(object):
3647
cdef size_t pos
3748
cdef bytes all

google/cloud/dataflow/coders/stream.pyx

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,44 @@ cdef class OutputStream(object):
9494
self.data = <char*>libc.stdlib.realloc(self.data, self.size)
9595

9696

97+
cdef class ByteCountingOutputStream(OutputStream):
98+
"""An output string stream implementation that only counts the bytes.
99+
100+
This implementation counts the number of bytes it "writes" but
101+
doesn't actually write them anyway. Thus it has write() but not
102+
get(). get_count() returns how many bytes were written.
103+
104+
This is useful for sizing an encoding.
105+
"""
106+
107+
def __cinit__(self):
108+
self.count = 0
109+
110+
cpdef write(self, bytes b, bint nested=False):
111+
cdef size_t blen = len(b)
112+
if nested:
113+
self.write_var_int64(blen)
114+
self.count += blen
115+
116+
cpdef write_byte(self, unsigned char _):
117+
self.count += 1
118+
119+
cpdef write_bigendian_int64(self, libc.stdint.int64_t _):
120+
self.count += 8
121+
122+
cpdef write_bigendian_int32(self, libc.stdint.int32_t _):
123+
self.count += 4
124+
125+
cpdef size_t get_count(self):
126+
return self.count
127+
128+
cpdef bytes get(self):
129+
raise NotImplementedError
130+
131+
def __str__(self):
132+
return '<%s %s>' % (self.__class__.__name__, self.count)
133+
134+
97135
cdef class InputStream(object):
98136
"""An input string stream implementation supporting read() and size()."""
99137

google/cloud/dataflow/coders/stream_test.py

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414

1515
"""Tests for the stream implementations."""
1616

17+
import logging
1718
import math
1819
import unittest
1920

@@ -22,8 +23,11 @@
2223

2324

2425
class StreamTest(unittest.TestCase):
26+
# pylint: disable=invalid-name
2527
InputStream = slow_stream.InputStream
2628
OutputStream = slow_stream.OutputStream
29+
ByteCountingOutputStream = slow_stream.ByteCountingOutputStream
30+
# pylint: enable=invalid-name
2731

2832
def test_read_write(self):
2933
out_s = self.OutputStream()
@@ -99,6 +103,28 @@ def test_read_write_bigendian_int32(self):
99103
for v in values:
100104
self.assertEquals(v, in_s.read_bigendian_int32())
101105

106+
def test_byte_counting(self):
107+
bc_s = self.ByteCountingOutputStream()
108+
self.assertEquals(0, bc_s.get_count())
109+
bc_s.write('def')
110+
self.assertEquals(3, bc_s.get_count())
111+
bc_s.write('')
112+
self.assertEquals(3, bc_s.get_count())
113+
bc_s.write_byte(10)
114+
self.assertEquals(4, bc_s.get_count())
115+
# "nested" also writes the length of the string, which should
116+
# cause 1 extra byte to be counted.
117+
bc_s.write('2345', nested=True)
118+
self.assertEquals(9, bc_s.get_count())
119+
bc_s.write_var_int64(63)
120+
self.assertEquals(10, bc_s.get_count())
121+
bc_s.write_bigendian_int64(42)
122+
self.assertEquals(18, bc_s.get_count())
123+
bc_s.write_bigendian_int32(36)
124+
self.assertEquals(22, bc_s.get_count())
125+
bc_s.write_bigendian_double(6.25)
126+
self.assertEquals(30, bc_s.get_count())
127+
102128

103129
try:
104130
# pylint: disable=g-import-not-at-top
@@ -108,22 +134,26 @@ class FastStreamTest(StreamTest):
108134
"""Runs the test with the compiled stream classes."""
109135
InputStream = stream.InputStream
110136
OutputStream = stream.OutputStream
137+
ByteCountingOutputStream = stream.ByteCountingOutputStream
111138

112139

113140
class SlowFastStreamTest(StreamTest):
114141
"""Runs the test with compiled and uncompiled stream classes."""
115142
InputStream = stream.InputStream
116143
OutputStream = slow_stream.OutputStream
144+
ByteCountingOutputStream = slow_stream.ByteCountingOutputStream
117145

118146

119147
class FastSlowStreamTest(StreamTest):
120148
"""Runs the test with uncompiled and compiled stream classes."""
121149
InputStream = slow_stream.InputStream
122150
OutputStream = stream.OutputStream
151+
ByteCountingOutputStream = stream.ByteCountingOutputStream
123152

124153
except ImportError:
125154
pass
126155

127156

128157
if __name__ == '__main__':
158+
logging.getLogger().setLevel(logging.INFO)
129159
unittest.main()

0 commit comments

Comments
 (0)