-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathapache-logs-stream.py
More file actions
executable file
·209 lines (180 loc) · 6.74 KB
/
apache-logs-stream.py
File metadata and controls
executable file
·209 lines (180 loc) · 6.74 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
#!/usr/bin/env -S uv --quiet run --no-project --script --
# https://peps.python.org/pep-0723/
# https://github.com/astral-sh/uv
# /// script
# # Docopt issues SyntaxWarning in Python 3.12
# requires-python = ">=3.11,<3.12"
# dependencies = [
# "docopt >=0.6.2",
# "geoip2 >=4.0.2",
# "dateparser >=0.7.6",
# ]
# ///
"""
Usage:
{prog} [options] --server-host=ADDRESS --log-file-path=PATH [--exclude-host=ADDRESS]...
Options:
--oneline Stream logs in a compact format.
--since=DATE, -s DATE Only show logs after specified date and time.
--exclude-host=ADDRESS, -x ADDRESS Do not show logs initiated from specified IP addresses.
--server-host=ADDRESS, -h ADDRESS The server hostname to get apache logs from.
--log-file-path=PATH, -p PATH Path to the access log file on the specified remote host.
"""
import asyncio
import datetime
import json
import locale
import os
import re
import socket
import subprocess
import sys

import dateparser
import docopt
import geoip2.database
import geoip2.errors
async def main(*, args, prog):
    """Parse the command line and stream a remote Apache access log to stdout.

    The log is followed with ``ssh SERVER tail --follow=name --lines=+0 PATH``;
    each line is parsed and filtered by process_apache_access_log() and
    written out by printer().

    Args:
        args: command-line arguments, without the program name.
        prog: program path; only its basename is used, in the usage text.

    Returns:
        The exit status of the ssh subprocess, so that smain()/sys.exit()
        report a failed connection or a failed remote tail.

    Raises:
        SystemExit: raised by docopt for --help/--version and invalid
            arguments, and (as docopt.DocoptExit) for a --since value that
            dateparser cannot interpret.
    """
    # Use the locale configured in the user's environment.
    locale.setlocale(locale.LC_ALL, "")
    params = docopt.docopt(
        __doc__.replace("\t", " " * 4).format(prog=os.path.basename(prog)),
        argv=args,
        help=True,
        version=True,
        options_first=False
    )
    server_host = params.pop("--server-host")
    log_file_path = params.pop("--log-file-path")
    stream_oneline = params.pop("--oneline")
    since_arg = params.pop("--since")
    exclude_hosts_addresses = params.pop("--exclude-host")
    # Every option declared in the usage text must have been consumed above.
    assert not params, params

    since_time = None
    if since_arg:
        since_time = dateparser.parse(since_arg, settings={"RETURN_AS_TIMEZONE_AWARE": True})
        if since_time is None:
            # dateparser reports "could not parse" by returning None; without
            # this check a typo would silently disable the --since filter.
            raise docopt.DocoptExit(f"Unrecognized --since date: {since_arg!r}")

    # gethostbyname_ex()[2] is the list of IPv4 addresses for the name, so a
    # host name excludes every address it resolves to.  A set keeps the
    # per-line membership test O(1).
    exclude_hosts = set()
    for address in exclude_hosts_addresses:
        exclude_hosts.update(socket.gethostbyname_ex(address)[2])

    log = asyncio.Queue()
    printer_task = asyncio.create_task(
        printer(queue=log, fo=sys.stdout)
    )

    cmd = [
        "ssh", server_host,
        "tail", "--follow=name", "--lines=+0",
        # NOTE(review): ssh passes this to the remote shell unquoted, so "~"
        # and "$VAR" expand remotely, but a path containing spaces will break.
        log_file_path,
    ]
    process = await asyncio.create_subprocess_exec(
        *cmd,
        stdout=subprocess.PIPE,
    )
    try:
        returncode, _ = await asyncio.gather(
            process.wait(),
            process_apache_access_log(
                input_stream=process.stdout,
                output_queue=log,
                exclude_hosts=exclude_hosts,
                oneline=stream_oneline,
                since=since_time,
            ),
        )
    finally:
        # On the normal path ssh has already exited.  Otherwise (log processing
        # raised, or this task was cancelled, e.g. by Ctrl-C) stop it so the
        # remote tail does not outlive us.
        if process.returncode is None:
            try:
                process.terminate()
            except ProcessLookupError:
                pass  # the process is already gone
            await process.wait()

    # Both the ssh process and the log processor have finished, so the printer
    # must be the only other task still alive.
    current_task = asyncio.current_task()
    all_tasks = asyncio.all_tasks()
    assert {current_task, printer_task} == all_tasks, (current_task, all_tasks)
    await log.put(None)  # sentinel: tells printer() to stop
    await printer_task
    return returncode
# Default MaxMind GeoLite2 databases used for GeoIP enrichment.
# https://github.com/maxmind/GeoIP2-python
# https://dev.maxmind.com/#GeoIP
GEOIP_CITY_DB_PATH = "/Users/vruyr/.bin/geoip2/GeoLite2-City_20200804/GeoLite2-City.mmdb"
GEOIP_ASN_DB_PATH = "/Users/vruyr/.bin/geoip2/GeoLite2-ASN_20200811/GeoLite2-ASN.mmdb"

# Apache's %t always spells months in English ("10/Oct/2000:13:55:36 -0700"),
# but strptime's %b matches month names of the *current* LC_TIME locale, which
# main() sets from the environment.  Map the abbreviations explicitly so that
# parsing does not depend on the user's locale.
_APACHE_MONTHS = {
    abbreviation: number
    for number, abbreviation in enumerate(
        ("Jan", "Feb", "Mar", "Apr", "May", "Jun", "Jul", "Aug", "Sep", "Oct", "Nov", "Dec"),
        start=1,
    )
}

# LogFormat "%h %l %u %t \"%r\" %>s %O \"%{Referer}i\" \"%{User-Agent}i\"" combined
# http://httpd.apache.org/docs/current/mod/mod_log_config.html
# mod_log_config backslash-escapes '"' and '\' inside %r and %{...}i, so a
# quoted field is a run of ordinary characters or backslash-escaped pairs
# (possibly empty).
_COMBINED_LINE_RE = re.compile(
    r'^(?P<remote_hostname>\S+) (?P<remote_logname>\S+) (?P<remote_user>\S+) '
    r'\[(?P<time>[^]]+)\] '
    r'"(?P<request_first_line>(?:[^"\\]|\\.)*)" '
    r'(?P<final_status>\S+) (?P<bytes_sent>\d+) '
    r'"(?P<referrer>(?:[^"\\]|\\.)*)" '
    r'"(?P<useragent>(?:[^"\\]|\\.)*)"\s*$'
)
# Splits a request line such as "GET /index.html HTTP/1.1".
_REQUEST_LINE_RE = re.compile(r"""^(?P<method>\S+)\s+(?P<uri>.*)\s+(?P<httpversion>\S+)\s*$""")


def _parse_apache_time(value):
    """Parse the body of an Apache ``%t`` field, e.g. ``10/Oct/2000:13:55:36 -0700``.

    Returns a timezone-aware datetime.  Raises ValueError for malformed input,
    including an unknown month abbreviation.
    """
    try:
        day, month_name, rest = value.split("/", 2)
        month = _APACHE_MONTHS[month_name]
    except (ValueError, KeyError):
        raise ValueError(f"not an Apache %t timestamp: {value!r}") from None
    # Only numeric, locale-independent directives from here on.
    return datetime.datetime.strptime(f"{day}/{month:02d}/{rest}", "%d/%m/%Y:%H:%M:%S %z")


def _json_default(o):
    """json.dumps() fallback: serialise datetimes as ISO 8601 strings."""
    if isinstance(o, datetime.datetime):
        return o.isoformat()
    raise TypeError(f"Object of type {o.__class__.__name__!r} is not JSON serializable")


def _geoip_lookup(lookup, address):
    """Return ``lookup(address)``, or None if GeoIP has no answer for *address*.

    Only the expected "no answer" cases are absorbed: the address is absent
    from the database (AddressNotFoundError), or it is not a usable IP address,
    e.g. a host name logged under ``HostnameLookups On`` (ValueError).  Other
    errors, such as a corrupt database, propagate.
    """
    try:
        return lookup(address)
    except (geoip2.errors.AddressNotFoundError, ValueError):
        return None


def _format_oneline(linedict):
    """Render a parsed, GeoIP-enriched entry as one fixed-width line ending in a newline."""
    request = linedict.get("request") or {}
    geoip = linedict["geoip"]
    f_time = linedict["time"].isoformat()
    f_method = request.get("method", "<none>")
    f_status = linedict["final_status"]
    f_remote_host = linedict["remote_hostname"]
    f_country = geoip["country"]["iso_code"] or "-"
    f_state = geoip["state"]["iso_code"] or "-"
    f_city = geoip["city"] or "-"
    # Fall back to the raw (repr-quoted) request line when it did not parse.
    f_uri = request.get("uri", repr(linedict["request_first_line"]))
    return f"""{f_time} {f_method:8} {f_status} {f_remote_host:15} {f_country:2} {f_state:2} {f_city:20} {f_uri}\n"""


async def process_apache_access_log(
    *,
    input_stream,
    output_queue,
    exclude_hosts=None,
    oneline=False,
    since=None,
    geoip_city_db_path=GEOIP_CITY_DB_PATH,
    geoip_asn_db_path=GEOIP_ASN_DB_PATH,
):
    """Parse Apache "combined" log lines from *input_stream* and queue formatted output.

    Reads until ``readline()`` returns an empty bytes object.  A line that does
    not match the combined format (or has a malformed timestamp) is queued as
    ``"Unmatched: <line>"``.  Entries older than *since*, or whose remote host
    is in *exclude_hosts*, are dropped.  Every other entry gets a "request"
    breakdown and a "geoip" section (see format_geoip) and is queued either as
    a single fixed-width text line (*oneline*) or as tab-indented JSON
    followed by a separate "\\n" item.

    Args:
        input_stream: object with an awaitable ``readline()`` returning bytes,
            e.g. the asyncio.StreamReader of a subprocess's stdout.
        output_queue: queue with an awaitable ``put()``; receives str items.
        exclude_hosts: container of remote-host strings (as logged by %h) to skip.
        oneline: emit compact one-line entries instead of JSON.
        since: if set, a timezone-aware datetime; older entries are skipped.
            (Log timestamps are aware, and comparing them with a naive
            datetime raises TypeError.)
        geoip_city_db_path: path to a GeoLite2/GeoIP2 City .mmdb database.
        geoip_asn_db_path: path to a GeoLite2/GeoIP2 ASN .mmdb database.
    """
    with geoip2.database.Reader(geoip_city_db_path) as city_reader:
        with geoip2.database.Reader(geoip_asn_db_path) as asn_reader:
            while raw_line := await input_stream.readline():
                # TODO: Don't assume UTF-8 encoding.  Until then, escape
                # undecodable bytes instead of letting one bad line abort
                # the whole stream.
                line = raw_line.decode("UTF-8", errors="backslashreplace")
                line_m = _COMBINED_LINE_RE.match(line)
                if line_m is None:
                    await output_queue.put(f"Unmatched: {line}")
                    continue
                linedict = line_m.groupdict()
                try:
                    linedict["time"] = _parse_apache_time(linedict["time"])
                except ValueError:
                    await output_queue.put(f"Unmatched: {line}")
                    continue
                if since and linedict["time"] < since:
                    continue
                remote_hostname = linedict["remote_hostname"]
                if exclude_hosts and remote_hostname in exclude_hosts:
                    continue
                request_m = _REQUEST_LINE_RE.match(linedict["request_first_line"])
                linedict["request"] = request_m.groupdict() if request_m else None
                linedict["geoip"] = format_geoip(
                    _geoip_lookup(city_reader.city, remote_hostname),
                    _geoip_lookup(asn_reader.asn, remote_hostname),
                )
                if oneline:
                    await output_queue.put(_format_oneline(linedict))
                else:
                    await output_queue.put(json.dumps(linedict, indent="\t", default=_json_default))
                    await output_queue.put("\n")
def format_geoip(geoip_city, geoip_asn):
    """Flatten GeoIP2 City and ASN lookup results into a JSON-friendly dict.

    Either argument may be None (no lookup result). Every field derived from a
    missing result is None, except "subdivisions", which becomes an empty list.
    Key order is fixed because it determines the order of the JSON output.
    """
    organization = geoip_asn.autonomous_system_organization if geoip_asn else None
    if not geoip_city:
        return {
            "autonomous_system_organization": organization,
            "city": None,
            "subdivisions": [],
            "postal": None,
            "country": {"name": None, "iso_code": None},
            "state": {"name": None, "iso_code": None},
            "continent": None,
            "location": {"latitude": None, "longitude": None, "accuracy_radius": None},
        }
    nation = geoip_city.country
    region = geoip_city.subdivisions.most_specific
    position = geoip_city.location
    return {
        "autonomous_system_organization": organization,
        "city": geoip_city.city.name,
        "subdivisions": [subdivision.name for subdivision in geoip_city.subdivisions],
        "postal": geoip_city.postal.code,
        "country": {"name": nation.name, "iso_code": nation.iso_code},
        "state": {"name": region.name, "iso_code": region.iso_code},
        "continent": geoip_city.continent.name,
        "location": {
            "latitude": position.latitude,
            "longitude": position.longitude,
            "accuracy_radius": position.accuracy_radius,
        },
    }
async def printer(*, queue, fo):
    """Write items from *queue* to the text stream *fo* until a None sentinel arrives.

    A list or tuple item is written as the concatenation of its elements; any
    other item is written via print().  No separator or line ending is added,
    so producers supply their own newlines.

    The stream is flushed whenever the queue has been drained, and once more
    on exit.  When stdout is not interactive (e.g. piped into grep) it is
    block-buffered.  Without these flushes, downstream consumers of this live
    stream would only see entries in delayed bursts.  Flushing only on an empty
    queue keeps bursts of entries batched into one write.
    """
    while (item := await queue.get()) is not None:
        parts = item if isinstance(item, (list, tuple)) else (item,)
        print(*parts, file=fo, sep="", end="")
        if queue.empty():
            fo.flush()
    fo.flush()
def smain(argv=None):
    """Synchronous entry point: run main() to completion on a fresh event loop.

    *argv* defaults to sys.argv.  argv[0] is the program path and argv[1:]
    are the arguments handed to main().  Returns main()'s result.  A Ctrl-C
    (KeyboardInterrupt) is swallowed after writing a newline to stderr, and
    None is returned in that case.
    """
    argv = sys.argv if argv is None else argv
    try:
        if sys.platform == "win32":
            # asyncio subprocess support on Windows requires the Proactor loop.
            asyncio.set_event_loop_policy(asyncio.WindowsProactorEventLoopPolicy())
        coroutine = main(args=argv[1:], prog=argv[0])
        return asyncio.run(coroutine, debug=False)
    except KeyboardInterrupt:
        # End the current stderr line (presumably after the terminal's echoed ^C).
        print(file=sys.stderr)
if __name__ == "__main__":
    # smain()'s return value becomes the process exit status.
    raise SystemExit(smain())