Skip to content

Commit 0c7227b

Browse files
author
Sam Clements
committed
Add support for querying the Riemann index
* transport: Use send and recv as method names, added recv * client: Added send_query, create_dict, query * client: Filter `None` values in create_dict * command: Added query subcommand
1 parent ebaae9c commit 0c7227b

5 files changed

Lines changed: 86 additions & 39 deletions

File tree

riemann/client.py

Lines changed: 29 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -31,22 +31,46 @@ def create_event(data):
3131
event = riemann.riemann_pb2.Event()
3232
event.tags.extend(data.pop('tags'))
3333
for name, value in data.items():
34-
setattr(event, name, value)
34+
if value is not None:
35+
setattr(event, name, value)
3536
return event
3637

3738
def send_event(self, event):
3839
"""Wraps an event in a message and sends it to Riemann"""
3940
message = riemann.riemann_pb2.Msg()
4041
message.events.extend([event])
41-
self.transport.write(message)
42+
self.transport.send(message)
4243
return event
4344

4445
def event(self, **data):
4546
"""Sends an event"""
4647
return self.send_event(self.create_event(data))
4748

48-
def query(self, *args, **kwargs):
49-
raise NotImplemented("Querying is not yet supported")
49+
@staticmethod
50+
def create_dict(event):
51+
"""Creates a dict from an Event"""
52+
return {
53+
'time': event.time,
54+
'state': event.state,
55+
'host': event.host,
56+
'description': event.description,
57+
'service': event.service,
58+
'tags': list(event.tags),
59+
'ttl': event.ttl,
60+
'metric_f': event.metric_f,
61+
'metric_d': event.metric_d,
62+
'metric_sint64': event.metric_sint64
63+
}
64+
65+
def send_query(self, query):
66+
message = riemann.riemann_pb2.Msg()
67+
message.query.string = query
68+
self.transport.send(message)
69+
return self.transport.recv()
70+
71+
def query(self, query):
72+
response = self.send_query(query)
73+
return [self.create_dict(e) for e in response.events]
5074

5175

5276
class QueuedClient(Client):
@@ -58,7 +82,7 @@ def __init__(self, *args, **kwargs):
5882

5983
def flush(self):
6084
"""Sends the waiting message to Riemann"""
61-
self.transport.write(self.queue)
85+
self.transport.send(self.queue)
6286
self.queue = riemann.riemann_pb2.Msg()
6387

6488
def send_event(self, event):

riemann/command.py

Lines changed: 22 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
from __future__ import absolute_import, print_function
44

55
import argparse
6+
import json
67

78
import riemann.client
89
import riemann.transport
@@ -60,29 +61,36 @@ def wide_formatter(*args, **kwargs):
6061
send.add_argument(
6162
*arg_names, metavar=arg_type.__name__, type=arg_type, help=arg_help)
6263

63-
64-
def filter_dict(function, dictionary):
65-
return dict((k, v) for k, v in dictionary.items() if function(k, v))
64+
query = subparsers.add_parser('query', help='Query the Riemann index')
65+
query.add_argument(
66+
'query', type=str,
67+
help="The query to send")
68+
query.add_argument(
69+
'-pp', '--pretty-print', action='store_true',
70+
help="Pretty print output")
6671

6772

6873
def send(args, client):
69-
event = client.event(**filter_dict(lambda k, v: v is not None, {
70-
"time": args.time,
71-
"state": args.state,
72-
"host": args.event_host,
73-
"description": args.description,
74-
"service": args.service,
75-
"tags": args.tags,
76-
"ttl": args.ttl,
77-
"metric_f": args.metric
78-
}))
74+
event = client.event(
75+
time=args.time,
76+
state=args.state,
77+
host=args.host,
78+
description=args.description,
79+
service=args.service,
80+
tags=args.tags,
81+
ttl=args.ttl,
82+
metric_f=args.metric)
7983

8084
if args.print_message:
8185
print(str(event).strip())
8286

8387

88+
def query(args, client):
89+
print(json.dumps(client.query(args.query), sort_keys=True, indent=2))
90+
91+
8492
def main():
8593
args = parser.parse_args()
8694
transport = TRANSPORT_CLASSES[args.transport](args.host, args.port)
8795
with riemann.client.Client(transport=transport) as client:
88-
{'send': send}[args.subparser](args, client)
96+
{'send': send, 'query': query}[args.subparser](args, client)

riemann/tests/test_riemann.py

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6,16 +6,22 @@
66
import py.test
77

88
import riemann.client
9+
import riemann.riemann_pb2
910
import riemann.transport
1011

1112

1213
class StringTransport(riemann.transport.Transport):
1314
def connect(self):
1415
self.string = StringIO.StringIO()
1516

16-
def write(self, message):
17+
def send(self, message):
1718
self.string.write(message.SerializeToString())
1819

20+
def recv(self):
21+
message = riemann.riemann_pb2.Msg()
22+
message.ok = True
23+
return message
24+
1925
def disconnect(self):
2026
self.string.close()
2127

@@ -33,8 +39,6 @@ def disconnect():
3339

3440

3541
class TestClient(object):
36-
"""Tests Client.create_event()"""
37-
3842
def test_service(self, client):
3943
client.event(service='test event')
4044
assert 'test event' in client.transport.string.getvalue()
@@ -57,9 +61,8 @@ def test_event_cls(self, client):
5761
assert isinstance(client.create_event({}), riemann.riemann_pb2.Event)
5862
assert isinstance(client.event(), riemann.riemann_pb2.Event)
5963

60-
@py.test.mark.xfail
6164
def test_query(self, client):
62-
client.query("true")
65+
assert client.query("true") == []
6366

6467

6568
@py.test.fixture

riemann/transport.py

Lines changed: 26 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,10 @@ def __enter__(self):
2929
def __exit__(self, exc_type, exc_value, traceback):
3030
self.disconnect()
3131

32+
@property
33+
def address(self):
34+
return self.host, self.port
35+
3236
@abc.abstractmethod
3337
def connect(self):
3438
pass
@@ -38,12 +42,12 @@ def disconnect(self):
3842
pass
3943

4044
@abc.abstractmethod
41-
def write(self):
45+
def send(self):
4246
pass
4347

44-
@property
45-
def address(self):
46-
return self.host, self.port
48+
@abc.abstractmethod
49+
def recv(self):
50+
pass
4751

4852

4953
class UDPTransport(Transport):
@@ -53,29 +57,37 @@ def connect(self):
5357
def disconnect(self):
5458
self.socket.close()
5559

56-
def write(self, message):
60+
def send(self, message):
5761
self.socket.sendto(message.SerializeToString(), self.address)
5862

63+
def recv(self):
64+
raise NotImplementedError
65+
5966

6067
class TCPTransport(Transport):
6168
def connect(self):
6269
self.socket = socket.create_connection(self.address)
70+
self.socket.setblocking(True)
6371

6472
def disconnect(self):
6573
self.socket.close()
6674

67-
def write(self, message):
68-
# Send the message to the server
69-
data = message.SerializeToString()
70-
self.socket.send(struct.pack('!I', len(data)) + data)
75+
def send(self, message):
76+
message = message.SerializeToString()
77+
self.socket.sendall(struct.pack('!I', len(message)) + message)
78+
79+
def recv(self):
80+
length = struct.unpack('!I', self.socket.recv(4))[0]
7181

72-
# Return the server's response
73-
response_len = struct.unpack('!I', self.socket.recv(4))[0]
7482
response = riemann.riemann_pb2.Msg()
75-
response.ParseFromString(self.socket.recv(response_len))
83+
response.ParseFromString(self.recvall(length))
7684

77-
# Handle error messages
7885
if not response.ok:
7986
raise RiemannError(response.error)
80-
8187
return response
88+
89+
def recvall(self, length, bufsize=4096):
90+
data = ""
91+
while len(data) < length:
92+
data += self.socket.recv(bufsize)
93+
return data

setup.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55

66
setuptools.setup(
77
name = "riemann-client",
8-
version = '2.1.0',
8+
version = '3.0.0-beta',
99

1010
author = "Sam Clements",
1111
author_email = "sam.clements@datasift.com",

0 commit comments

Comments
 (0)