This is my first attempt at Python. I wanted to parse some Log4J logs, so I thought it would be a good opportunity to write my first Python program.
The logs I deal with look like [INFO BLA 08:00:00] Blabla. Most of the time an entry is a single line like the one above, but sometimes a stack trace is logged, so a logical "line" spans several physical lines. Each logical line starts with the character [.
So I came up with:
#!/usr/bin/env python
import sys
from collections import namedtuple

Logline = namedtuple("Logline", "level timestamp message")

def apply_filter(current_log_line, filtered_lines):
    parsed_line = parse_line(current_log_line)
    if filter(parsed_line):
        filtered_lines.append(parsed_line)

def get_filtered_lines(source):
    filtered_lines = []
    line_buffer = ""
    for line in source:
        if line.startswith('[') and line_buffer:
            apply_filter(line_buffer, filtered_lines)
            line_buffer = ""
        line_buffer += line
    apply_filter(line_buffer, filtered_lines)
    return filtered_lines

def parse_line(log_line):
    prefix, message = log_line.split("]", 1)
    level, timestamp = parse_prefix(prefix)
    return Logline(level, timestamp, message)

def parse_prefix(prefix):
    prefix = prefix.strip("[]")
    tokens = prefix.split(" ")
    level = tokens[0]
    timestamp = tokens[-1]
    return level, timestamp

def filter(log_line):
    return log_line.level == "ERROR" and log_line.timestamp > "07:00:00"

def console_out(lines):
    for line in lines:
        print("{}{}".format(line.timestamp, line.message.rstrip("\n")))

def main():
    if len(sys.argv) != 2:
        print("Usage: {0} $logfile".format(sys.argv[0]))
    else:
        file_name = sys.argv[1]
        with open(file_name) as file:
            console_out(get_filtered_lines(file))

if __name__ == '__main__':
    main()
Feel free to comment.
One thing that bugs me is the apply_filter function, which does too many things at once, but I have no solution for that.
1 Answer
Let's see if I understand what's going on with get_filtered_lines() and apply_filter(). First, get_filtered_lines() reads physical lines from the file and strings them together in line_buffer. When we find the beginning of the next logical line ('['), we pass line_buffer off to apply_filter() so the material that has been collected gets processed. Then we empty out line_buffer and start over with the beginning of the logical line just read.
Here's an approach I think is simpler and clearer:
def apply_filter(lbuf):
    parsed = parse_line(lbuf)
    if filter(parsed):
        return [parsed]
    else:
        return []

def get_filtered_lines(source):
    filtered_lines = []
    collected = ""
    for line in source:
        if line.startswith('[') and collected:
            filtered_lines.extend(apply_filter(collected))
            collected = ""
        collected += line
    filtered_lines.extend(apply_filter(collected))
    return filtered_lines
That way, apply_filter doesn't have to be aware of the list being built. It just returns a list of what it finds: either a one-element list holding the parsed line that passed the filter, or an empty list. Passing an empty list to extend is a no-op.
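To check the behaviour without a real log file, here is a minimal self-contained sketch of the same idea run against a few made-up lines. Some things in it are my own assumptions rather than part of the original code: the SAMPLE data is invented, filter() is renamed to is_wanted() (a hypothetical name, chosen only so the builtin filter is not shadowed), parse_prefix is folded into parse_line for brevity, and the `if collected` guard is added so that empty input does not reach parse_line.

```python
from collections import namedtuple

Logline = namedtuple("Logline", "level timestamp message")


def parse_line(log_line):
    # Split "[LEVEL ... HH:MM:SS] message" at the first "]".
    prefix, message = log_line.split("]", 1)
    tokens = prefix.strip("[]").split(" ")
    return Logline(tokens[0], tokens[-1], message)


def is_wanted(log_line):
    # Hypothetical rename of filter(); same condition as in the question.
    return log_line.level == "ERROR" and log_line.timestamp > "07:00:00"


def apply_filter(lbuf):
    # Return a one-element list on a match, an empty list otherwise.
    parsed = parse_line(lbuf)
    return [parsed] if is_wanted(parsed) else []


def get_filtered_lines(source):
    filtered_lines = []
    collected = ""
    for line in source:
        # A "[" at the start of a physical line begins a new logical line.
        if line.startswith("[") and collected:
            filtered_lines.extend(apply_filter(collected))
            collected = ""
        collected += line
    if collected:  # assumption: skip parsing when the input was empty
        filtered_lines.extend(apply_filter(collected))
    return filtered_lines


# Invented sample data: one INFO line, one ERROR with a stack trace,
# and one ERROR that is too early to pass the timestamp check.
SAMPLE = [
    "[INFO BLA 08:00:00] Blabla\n",
    "[ERROR BLA 08:00:01] Something broke\n",
    "java.lang.IllegalStateException: boom\n",
    "    at com.example.Foo.bar(Foo.java:42)\n",
    "[ERROR BLA 06:59:59] Too early\n",
]

result = get_filtered_lines(SAMPLE)
```

Only the second logical line should survive, and its message should still carry the stack trace lines that followed it.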