Skip to content

Commit 9daebc3

Browse files
committed
Merge pull request #20 from nisanharamati/discharging_queue
Auto-flushing queue by @nisanharamati
2 parents d4fd81f + eea7e15 commit 9daebc3

13 files changed

Lines changed: 437 additions & 9 deletions

README.rst

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,9 +78,28 @@ Requirements
7878
* `protobuf`_ (when using Python 2)
7979
* `protobuf-py3`_ (when using Python 3)
8080

81+
Testing (Linux/OSX)
82+
-------
83+
84+
Testing is done with `tox`_::
85+
86+
tox
87+
88+
.. _tox: https://tox.readthedocs.org/en/latest/
89+
8190
Changelog
8291
---------
8392

93+
Version 6.1.1
94+
^^^^^^^^^^^^^
95+
96+
* Fixed socket error handling in ``riemann_client.client.AutoFlushingQueuedClient``.
97+
98+
Version 6.1.0
99+
^^^^^^^^^^^^^
100+
101+
* ``riemann_client.client.AutoFlushingQueuedClient`` added.
102+
84103
Version 6.0.0
85104
^^^^^^^^^^^^^
86105

riemann_client/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
11
"""A Python Riemann client and command line tool"""
22

3-
__version__ = '6.0.2'
3+
__version__ = '6.1.1'
44
__author__ = 'Sam Clements <sam.clements@datasift.com>'

riemann_client/client.py

Lines changed: 164 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,12 +6,32 @@
66

77
from __future__ import absolute_import
88

9+
import logging
10+
try:
11+
from logging import NullHandler
12+
except ImportError:
13+
# Create a NullHandler class in logging for python 2.6
14+
class NullHandler(logging.Handler):
15+
def emit(self, record):
16+
pass
17+
918
import socket
19+
try:
20+
from threading import RLock
21+
from threading import Timer
22+
except ImportError:
23+
RLock = None
24+
Timer = None
25+
import time
1026

1127
import riemann_client.riemann_pb2
1228
import riemann_client.transport
1329

1430

31+
logger = logging.getLogger(__name__)
32+
logger.addHandler(NullHandler())
33+
34+
1535
class Client(object):
1636
"""A client for sending events and querying a Riemann server.
1737
@@ -200,3 +220,147 @@ def send_events(self, events):
200220
def clear_queue(self):
201221
"""Resets the message/queue to a blank :py:class:`.Msg` object"""
202222
self.queue = riemann_client.riemann_pb2.Msg()
223+
224+
225+
if RLock and Timer: # noqa
226+
class AutoFlushingQueuedClient(QueuedClient):
227+
"""A Riemann client using a queue and a timer that will automatically
228+
flush its contents if either:
229+
- the queue size exceeds :param max_batch_size: or
230+
- more than :param max_delay: has elapsed since the last flush and
231+
the queue is non-empty.
232+
233+
if :param stay_connected: is False, then the transport will be
234+
disconnected after each flush and reconnected at the beginning of
235+
the next flush.
236+
if :param clear_on_fail: is True, then the client will discard its
237+
buffer after the second retry in the event of a socket error.
238+
239+
A message object is used as a queue, and the following methods are
240+
given:
241+
- :py:meth:`.send_event` - add a new event to the queue
242+
- :py:meth:`.send_events` add a tuple of new events to the queue
243+
- :py:meth:`.event` - add a new event to the queue from
244+
keyword arguments
245+
- :py:meth:`.events` - add new events to the queue from
246+
dictionaries
247+
- :py:meth:`.flush` - manually force flush the queue to the
248+
transport
249+
"""
250+
251+
def __init__(self, transport, max_delay=0.5, max_batch_size=100,
252+
stay_connected=False, clear_on_fail=False):
253+
super(AutoFlushingQueuedClient, self).__init__(transport)
254+
self.stay_connected = stay_connected
255+
self.clear_on_fail = clear_on_fail
256+
self.max_delay = max_delay
257+
self.max_batch_size = max_batch_size
258+
self.lock = RLock()
259+
self.event_counter = 0
260+
self.last_flush = time.time()
261+
self.timer = None
262+
263+
# start the timer
264+
self.start_timer()
265+
266+
def connect(self):
267+
"""Connect the transport if it is not already connected."""
268+
if not self.is_connected():
269+
self.transport.connect()
270+
271+
def is_connected(self):
272+
"""Check whether the transport is connected."""
273+
try:
274+
# this will throw an exception whenever socket isn't connected
275+
self.transport.socket.type
276+
return True
277+
except (AttributeError, RuntimeError, socket.error):
278+
return False
279+
280+
def event(self, **data):
281+
"""Enqueues an event, using keyword arguments to create an Event
282+
283+
>>> client.event(service='riemann-client', state='awesome')
284+
285+
:param \*\*data: keyword arguments used for :py:func:`create_event`
286+
"""
287+
self.send_events((self.create_event(data),))
288+
289+
def events(self, *events):
290+
"""Enqueues multiple events in a single message
291+
292+
>>> client.events({'service': 'riemann-client',
293+
>>> 'state': 'awesome'})
294+
295+
:param \*events: event dictionaries for :py:func:`create_event`
296+
:returns: The response message from Riemann
297+
"""
298+
self.send_events(self.create_event(evd) for evd in events)
299+
300+
def send_events(self, events):
301+
"""Enqueues multiple events
302+
303+
:param events: A list or iterable of ``Event`` objects
304+
:returns: The response message from Riemann
305+
"""
306+
with self.lock:
307+
for event in events:
308+
self.queue.events.add().MergeFrom(event)
309+
self.event_counter += 1
310+
self.check_for_flush()
311+
312+
def flush(self):
313+
"""Sends the events in the queue to Riemann in a single protobuf
314+
message
315+
316+
:returns: The response message from Riemann
317+
"""
318+
response = None
319+
with self.lock:
320+
if not self.is_connected():
321+
self.connect()
322+
try:
323+
response = super(AutoFlushingQueuedClient, self).flush()
324+
except socket.error:
325+
# log and retry
326+
logger.warn("Socket error on flushing. "
327+
"Attempting reconnect and retry...")
328+
try:
329+
self.transport.disconnect()
330+
self.connect()
331+
response = (
332+
super(AutoFlushingQueuedClient, self).flush())
333+
except:
334+
logger.warn("Socket error on flushing "
335+
"second attempt. Batch discarded.")
336+
self.transport.disconnect()
337+
if self.clear_on_fail:
338+
self.clear_queue()
339+
self.event_counter = 0
340+
if not self.stay_connected:
341+
self.transport.disconnect()
342+
self.last_flush = time.time()
343+
self.start_timer()
344+
return response
345+
346+
def check_for_flush(self):
347+
"""Checks the conditions for flushing the queue"""
348+
if (self.event_counter >= self.max_batch_size or
349+
(time.time() - self.last_flush) >= self.max_delay):
350+
self.flush()
351+
352+
def start_timer(self):
353+
"""Cycle the timer responsible for periodically flushing the queue
354+
"""
355+
if self.timer:
356+
self.timer.cancel()
357+
self.timer = Timer(self.max_delay, self.check_for_flush)
358+
self.timer.daemon = True
359+
self.timer.start()
360+
361+
def stop_timer(self):
362+
"""Stops the current timer
363+
364+
a :py:meth:`.flush` event will reactviate the timer
365+
"""
366+
self.timer.cancel()

riemann_client/transport.py

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -169,27 +169,34 @@ def connect(self):
169169

170170

171171
class BlankTransport(Transport):
172-
"""A transport that collects messages in a list, and has no connection
172+
"""A transport that collects events in a list, and has no connection
173173
174174
Used by ``--transport none``, which is useful for testing commands without
175175
contacting a Riemann server. This is also used by the automated tests in
176176
``riemann_client/tests/test_riemann_command.py``.
177177
"""
178178

179+
def __init__(self, *args, **kwargs):
180+
self.events = []
181+
179182
def connect(self):
180183
"""Creates a list to hold messages"""
181-
self.messages = []
184+
pass
182185

183186
def send(self, message):
184187
"""Adds a message to the list, returning a fake 'ok' response
185188
186189
:returns: A response message with ``ok = True``
187190
"""
188-
self.messages.append(message)
191+
for event in message.events:
192+
self.events.append(event)
189193
reply = riemann_client.riemann_pb2.Msg()
190194
reply.ok = True
191195
return reply
192196

193197
def disconnect(self):
194198
"""Clears the list of messages"""
195-
del self.messages
199+
pass
200+
201+
def __len__(self):
202+
return len(self.events)

setup.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111

1212
setuptools.setup(
1313
name='riemann-client',
14-
version='6.0.2',
14+
version='6.1.1',
1515

1616
author="Sam Clements",
1717
author_email="sam.clements@datasift.com",
@@ -23,7 +23,6 @@
2323

2424
packages=[
2525
'riemann_client',
26-
'riemann_client.tests'
2726
],
2827

2928
install_requires=[

tests/__init__.py

Whitespace-only changes.

0 commit comments

Comments
 (0)