Based on experience, I've come to think that logging plain messages isn't enough to monitor an application, so I built an abstraction layer on top of the built-in logging package.
NOTE: I have refactored virtually the entire previous code as I wasn't happy with it at all.
Basically, it automatically logs the begin and end of an activity. An activity is usually a function, but it can also be a custom scope. This lets me quickly find out whether something worked and, if not, how it failed and whether it actually started.
Example:
@wiretap.telemetry()
def example():
    pass

2023-10-25 11:38:02.710 _ example | begin | 0.000s | None | {'source': {'file': 'demo.py', 'line': 173}} | None
2023-10-25 11:38:02.723 _ example | end | 0.012s | None | {} | None
Where the formatter's pattern is defined as:
"fmt": "{asctime}.{msecs:03.0f} {indent} {activity} | {trace} | {elapsed:.3f}s | {message} | {details} | {attachment}"
Other examples:
@wiretap.telemetry(include_args=True, include_result=True, alias="example2")
def example(x: int, y: int):
    return x + y

@wiretap.telemetry(auto_begin=False, include_result=True)
def example(x: int, y: int, activity: wiretap.Activity = None):
    # With auto_begin=False the begin trace has to be logged manually.
    activity.start.trace_begin().with_message("This is the begin!").with_details(x=x).log()
    return x + y
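The formatter pattern shown earlier can be wired into the standard logging package roughly as follows. This is a minimal sketch, assuming the extra fields reach the record through the extra argument of Logger.log (which is what Activity._log does further down). The logger name demo, the in-memory stream, and the hard-coded values are assumptions made for the example and are not part of wiretap.

```python
import io
import logging

# Same pattern as above, using the "{" formatting style.
FMT = (
    "{asctime}.{msecs:03.0f} {indent} {activity} | {trace} | "
    "{elapsed:.3f}s | {message} | {details} | {attachment}"
)

stream = io.StringIO()  # stands in for the console or a file
handler = logging.StreamHandler(stream)
handler.setFormatter(logging.Formatter(FMT, datefmt="%Y-%m-%d %H:%M:%S", style="{"))

logger = logging.getLogger("demo")  # hypothetical logger name
logger.setLevel(logging.DEBUG)
logger.propagate = False
logger.addHandler(handler)

# Every placeholder that is not a built-in LogRecord attribute
# must be supplied via `extra`, otherwise formatting fails.
logger.log(
    logging.INFO,
    None,  # no message; str(None) is what ends up in {message}
    extra=dict(
        indent="_",
        activity="example",
        trace="begin",
        elapsed=0.0,
        details={"source": {"file": "demo.py", "line": 173}},
        attachment=None,
    ),
)

line = stream.getvalue().rstrip("\n")
print(line)
```

Apart from the timestamp, the printed line has the same shape as the begin record in the first example.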
Theory
The package (I call it wiretap) extends the LogRecord with such extras as:
- parent_id: optional UUID of the parent logger (this is the correlation-id of the parent activity)
- unique_id: UUID of the current logger (this is the correlation-id; always the same for a single activity)
- timestamp: date/time
- activity: the name of the activity (function or alias)
- level: the log-level
- trace: the name of the trace that an activity leaves
- elapsed: time in seconds elapsed since the logger has started
- details: json-details about the activity (args, source, result etc)
- attachment: blob/text to dump any additional data
- indent: used only for console or file; indicates the depth of the activity by injecting that number of _ or some other user-specified character
Each activity must leave at least two traces: begin & end, indicating that everything went as planned. The end can be replaced by:
- error: something unexpected occurred
- noop: no-op, meaning an activity didn't have to do anything (like removing columns from a DataFrame that already doesn't have the columns to remove)
- abort: an activity couldn't do its job due to some precondition not being met (like filtering a DataFrame that's empty)
begin & end can be logged only once, but between them other traces can provide any amount of additional debug information about an activity:
- info: arbitrary data
- item: info about one of many items being processed (like in a loop)
- skip: info about an item being skipped (like in a loop)
- metric: numeric info about some measurements
Implementation
The implementation is covered by the Activity class that internally redirects all calls to the logging package via the .log API. It has three attributes that group traces into categories:
- start: logs the beginning of an activity
- other: logs other things
- final: logs the final trace
Each of the three helpers contains methods to create a Trace of that type that can be further customized and finally logged with its specialized log method, which makes sure that each trace is used properly. The start and final traces are allowed only once, and logging any trace other than the start trace requires the start trace to have been logged already.
import contextlib
import dataclasses
import inspect
import logging
import re
from contextvars import ContextVar
from pathlib import Path
from typing import Callable, Any, Protocol, Iterator

from ..tools import Elapsed, Node
from .trace import Trace


class NewTrace(Protocol):
    def __call__(self, name: str | None = None) -> Trace: ...


OnError = Callable[[BaseException, "Activity"], None]
OnBegin = Callable[[Trace], Trace]

current_activity: ContextVar[Node["Activity"] | None] = ContextVar("current_activity", default=None)

class Activity:
    """
    This class represents an activity for which telemetry is collected.
    """

    def __init__(
            self,
            name: str,
            file: str,
            line: int,
            auto_begin: bool = True,
            on_begin: OnBegin | None = None,
            on_error: OnError | None = None
    ):
        self.name = name
        self.file = file
        self.line = line
        self.elapsed = Elapsed()

        self._auto_begin = auto_begin
        self._on_begin = on_begin or (lambda _t: _t)
        self._on_error = on_error or (lambda _exc, _logger: None)
        self._logger = logging.getLogger(name)
        self._started = ActivityStartLogged(name, file, line)
        self._tracing = PendingTrace(name, file, line)
        self._finalized = OneTimeFalse()

        self.start = StartTrace(self._trace(self._log_start))
        self.other = OtherTrace(self._trace(self._log_other))
        self.final = FinalTrace(self._trace(self._log_final))

    def _log(self, trace: Trace) -> None:
        self._logger.log(
            level=trace.level,
            msg=trace.message,
            exc_info=trace.exc_info,
            extra=dict(
                activity=self.name,
                trace=trace.name,
                elapsed=float(self.elapsed),
                details=trace.details,
                attachment=trace.attachment
            )
        )

    # This is how start traces are logged.
    def _log_start(self, trace: Trace):
        self._tracing.clear()
        self._started.yes_for(trace.name)
        # Source info is logged only here.
        self._log(trace.with_details(source=dict(file=self.file, line=self.line)))

    # This is how other traces are logged.
    def _log_other(self, trace: Trace):
        self._tracing.clear()
        self._started.require_for(trace.name)
        self._log(trace)

    # This is how final traces are logged.
    def _log_final(self, trace: Trace):
        self._tracing.clear()
        # Makes sure that final traces aren't logged multiple times.
        # This can happen when an error trace was logged, so the end trace is obsolete.
        if self._finalized:
            return

        self._started.require_for(trace.name)
        self._log(trace)

    # Creates a factory function for the Trace as some additional work is necessary.
    def _trace(self, log: Callable[[Trace], None]) -> NewTrace:
        def _factory(name: str | None = None) -> Trace:
            name = name or str(TraceNameByCaller(2))
            self._tracing.register(name)
            return Trace(name, log)

        return _factory

    def __enter__(self) -> "Activity":
        parent = current_activity.get()
        self._token = current_activity.set(Node(self, parent))
        if self._auto_begin:
            self.start.trace_begin().action(self._on_begin).log()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        if exc_val:
            if isinstance(exc_val, (ActivityStartMissing, ActivityAlreadyStarted, PreviousTraceNotLogged)):
                # Do nothing when these errors occur, otherwise the same exception will raise for the default handler.
                pass
            else:
                self._on_error(exc_val, self)
                self.final.trace_error(f"Unhandled <{exc_type.__name__}> has occurred: <{str(exc_val) or 'N/A'}>").log()
        else:
            self.final.trace_end().log()

        current_activity.reset(self._token)
        return False

@contextlib.contextmanager
def begin_activity(
        name: str,
        auto_begin=True,
        on_begin: OnBegin | None = None,
        on_error: OnError | None = None
) -> Iterator[Activity]:
    stack = inspect.stack()
    frame = stack[2]
    with Activity(
            name=name,
            file=Path(frame.filename).name,
            line=frame.lineno,
            auto_begin=auto_begin,
            on_begin=on_begin,
            on_error=on_error
    ) as activity:
        yield activity

@dataclasses.dataclass(frozen=True, slots=True)
class LogAbortWhen(OnError):
    exceptions: type[BaseException] | tuple[type[BaseException], ...]

    def __call__(self, exc: BaseException, activity: Activity) -> None:
        if isinstance(exc, self.exceptions):
            activity.final.trace_abort(f"Unable to complete due to <{type(exc).__name__}>: {str(exc) or '<N/A>'}").log()

class StartTrace:
    def __init__(self, trace: NewTrace):
        self._trace = trace

    def trace(self, name: str) -> Trace:
        return self._trace(name).as_info()

    def trace_begin(self) -> Trace:
        return self._trace().as_info()


class OtherTrace:
    def __init__(self, trace: NewTrace):
        self._trace = trace

    def trace(self, name: str) -> Trace:
        return self._trace(name).as_debug()

    def trace_info(self, message: str) -> Trace:
        return self._trace().with_message(message).as_debug()

    def trace_item(self, name: str, value: Any) -> Trace:
        return self._trace().with_details(**{name: value}).as_debug()

    def trace_skip(self, message: str) -> Trace:
        return self._trace().with_message(message).as_debug()

    def trace_metric(self, name: str, value: Any) -> Trace:
        return self._trace().with_details(**{name: value}).as_debug()


class FinalTrace:
    def __init__(self, trace: NewTrace):
        self._trace = trace

    def trace(self, name: str) -> Trace:
        return self._trace(name).as_info()

    def trace_noop(self, message: str) -> Trace:
        return self._trace().with_message(message).as_info()

    def trace_abort(self, message: str) -> Trace:
        return self._trace().with_message(message).as_warning()

    def trace_error(self, message: str) -> Trace:
        return self._trace().with_message(message).as_error().with_exc_info(True)

    def trace_end(self) -> Trace:
        return self._trace().as_info()

class OneTimeFalse:
    state = False

    def __bool__(self):
        try:
            return self.state
        finally:
            self.state = True


class TraceNameByCaller:
    def __init__(self, frame_index: int):
        caller = inspect.stack()[frame_index].function
        self.value = re.sub("^trace_", "", caller, flags=re.IGNORECASE)

    def __str__(self):
        return self.value

class ActivityAlreadyStarted(Exception):
    def __init__(self, activity: str, file: str, line: int, trace: str):
        super().__init__(
            f"Cannot trace <{trace}> for the <{activity}> activity in <{file}:{line}>. "
            f"You already did that. Did you mean to disable <auto_begin>?"
        )


class ActivityStartMissing(Exception):
    def __init__(self, activity: str, file: str, line: int, trace: str):
        super().__init__(
            f"Cannot trace <{trace}> for the <{activity}> activity in <{file}:{line}>. "
            f"You need to log a start trace first."
        )

@dataclasses.dataclass
class ActivityStartLogged:
    """
    This class provides a mechanism to ensure that a start trace is logged before any other trace is.
    This situation may occur when telemetry's auto_begin=False and the user forgets to log the begin trace
    via activity.start.trace_begin().
    """
    name: str
    file: str
    line: int
    _value = False

    def yes_for(self, trace: str):
        if self:
            raise ActivityAlreadyStarted(self.name, self.file, self.line, trace)
        self._value = True

    def require_for(self, trace: str):
        if not self:
            raise ActivityStartMissing(self.name, self.file, self.line, trace)

    def __bool__(self) -> bool:
        return self._value

class PreviousTraceNotLogged(Exception):
    def __init__(self, activity: str, file: str, line: int, trace: str, previous: str):
        super().__init__(
            f"Cannot create or log trace <{trace}> for the <{activity}> activity in <{file}:{line}>. "
            f"You need to log the previous <{previous}> trace first."
        )


class PendingTrace:
    """
    This class helps to make sure that pending traces are logged before the next one.
    """

    def __init__(self, activity: str, file: str, line: int):
        self.activity = activity
        self.file = file
        self.line = line
        self.name: str | None = None

    def register(self, name: str):
        if self.name:
            raise PreviousTraceNotLogged(self.activity, self.file, self.line, trace=name, previous=self.name)
        self.name = name

    def clear(self):
        self.name = None

The Trace class is a builder for a trace:
import logging
from typing import Any, Callable

from ..specs import ExcInfo


class Trace:
    """
    This class is a builder for a single activity trace.
    The process ends when log() is called and the trace is sent to the log.
    """

    def __init__(self, name: str, log: Callable[["Trace"], None] | None):
        self.name = name
        self.message: str | None = None
        self.details: dict[str, Any] = {}
        self.attachment: Any | None = None
        self.level: int = logging.INFO
        self.exc_info: ExcInfo | bool | None = None
        self.extra: dict[str, Any] = {}
        self._log = log or (lambda _: None)

    def with_message(self, value: str | None) -> "Trace":
        self.message = value
        return self

    def with_details(self, **kwargs) -> "Trace":
        self.details = self.details | kwargs
        return self

    def with_attachment(self, value: Any) -> "Trace":
        self.attachment = value
        return self

    def with_exc_info(self, value: ExcInfo | bool) -> "Trace":
        self.exc_info = value
        return self

    def with_level(self, value: int) -> "Trace":
        self.level = value
        return self

    def as_debug(self) -> "Trace":
        self.level = logging.DEBUG
        return self

    def as_info(self) -> "Trace":
        self.level = logging.INFO
        return self

    def as_warning(self) -> "Trace":
        self.level = logging.WARNING
        return self

    def as_error(self) -> "Trace":
        self.level = logging.ERROR
        return self

    def action(self, func: Callable[["Trace"], "Trace"]) -> "Trace":
        return func(self)

    def log(self):
        self._log(self)

The final core component is the telemetry decorator. It initializes the Activity and can pass it via dependency injection to the decorated function if the function declares a parameter such as activity: wiretap.Activity = None. It should also be able to handle async functions, hence the double implementation.
import asyncio
import functools
import inspect
from pathlib import Path
from typing import Any, Callable, TypeVar, Generic

from .tracing import Activity, OnBegin, OnError


def telemetry(
        alias: str | None = None,
        include_args: dict[str, str | Callable | None] | bool | None = False,
        include_result: str | Callable | bool | None = False,
        auto_begin=True,
        on_begin: OnBegin | None = None,
        on_error: OnError | None = None
):
    """Provides telemetry for the decorated function."""

    on_begin = on_begin or (lambda _t: _t)
    on_error = on_error or (lambda _exc, _logger: None)

    def factory(decoratee):
        stack = inspect.stack()
        frame = stack[1]

        kwargs_with_activity = KwargsWithActivity(decoratee)

        def activity(**kwargs) -> Activity:
            """
            Creates an activity with args to format because they are known later when the decorator is called.
            """
            return Activity(
                name=alias or decoratee.__name__,
                file=Path(frame.filename).name,
                line=frame.lineno,
                auto_begin=auto_begin,
                on_begin=lambda t: t.action(on_begin).with_details(**kwargs),
                on_error=on_error
            )

        if asyncio.iscoroutinefunction(decoratee):
            @functools.wraps(decoratee)
            async def decorator(*decoratee_args, **decoratee_kwargs):
                args = get_args(decoratee, *decoratee_args, **decoratee_kwargs)
                with activity(args_native=args, args_format=include_args) as act:
                    result = await decoratee(*decoratee_args, **kwargs_with_activity(decoratee_kwargs, act))
                    act.final.trace_end().with_details(result_native=result, result_format=include_result).log()
                    return result

            decorator.__signature__ = inspect.signature(decoratee)
            return decorator

        else:
            @functools.wraps(decoratee)
            def decorator(*decoratee_args, **decoratee_kwargs):
                args = get_args(decoratee, *decoratee_args, **decoratee_kwargs)
                with activity(args_native=args, args_format=include_args) as act:
                    result = decoratee(*decoratee_args, **kwargs_with_activity(decoratee_kwargs, act))
                    act.final.trace_end().with_details(result_native=result, result_format=include_result).log()
                    return result

            decorator.__signature__ = inspect.signature(decoratee)
            return decorator

    return factory

_Func = TypeVar("_Func", bound=Callable)


class KwargsWithActivity(Generic[_Func]):
    """
    This tool finds the parameter of type Activity and injects the instance of it.
    """

    def __init__(self, func: _Func):
        # Find the name of the activity argument if any...
        self.name = next((n for n, t in inspect.getfullargspec(func).annotations.items() if t is Activity), "")

    def __call__(self, kwargs: dict[str, Any], activity: Activity) -> dict[str, Any]:
        # If name exists, then the key definitely is there so no need to check twice.
        if self.name:
            kwargs[self.name] = activity
        return kwargs


def get_args(decoratee: object, *args, **kwargs) -> dict[str, Any]:
    # Zip arg names and their indexes up to the number of args of the decoratee_args.
    arg_pairs = zip(inspect.getfullargspec(decoratee).args, range(len(args)))
    # Turn arg_pairs into a dictionary and combine it with decoratee_kwargs.
    return {t[0]: args[t[1]] for t in arg_pairs} | kwargs

There are three more components used here: Elapsed, which tracks the elapsed time; Used, for the single-use trace; and TraceNameByCaller, which gets the name of the calling function automatically.
import inspect
import re
from timeit import default_timer as timer


class Elapsed:
    _start: float | None = None

    @property
    def current(self) -> float:
        """Gets the current elapsed time in seconds or 0 if called for the first time."""
        if self._start:
            return timer() - self._start
        else:
            self._start = timer()
            return .0

    def __float__(self):
        return self.current


class Used:
    state = False

    def __bool__(self):
        try:
            return self.state
        finally:
            self.state = True


class TraceNameByCaller:
    def __init__(self):
        caller = inspect.stack()[1][3]
        self.value = re.sub("^log_", "", caller, flags=re.IGNORECASE)

    def __str__(self):
        return self.value

Oh, there's one more class... the Node that tracks the hierarchy of the activities:
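import dataclasses
import uuid
from typing import Generic, Iterator, Optional, TypeVar

_T = TypeVar("_T")


@dataclasses.dataclass(frozen=True, slots=True)
class Node(Generic[_T]):
    value: _T
    parent: Optional["Node[_T]"]
    # default_factory creates a fresh UUID per node; a plain uuid.uuid4() default
    # would be evaluated once at class definition and shared by every node.
    id: uuid.UUID = dataclasses.field(default_factory=uuid.uuid4)

    @property
    def depth(self) -> int:
        return self.parent.depth + 1 if self.parent else 1

    def __iter__(self) -> Iterator["Node"]:
        current: Node[_T] | None = self
        while current:
            yield current
            current = current.parent
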
Review
Apart from this code, there are only a couple of custom logging.Filters that customize the LogRecord.
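The filters themselves are not shown in this post. To give an idea of what such a filter can look like, here is a minimal sketch that fills in the indent field from the current nesting depth. The names AddIndent and current_depth are hypothetical; in wiretap the depth would presumably come from the Node stored in current_activity.

```python
import io
import logging
from contextvars import ContextVar

# Hypothetical stand-in for wiretap's current_activity; it tracks only the depth.
current_depth: ContextVar[int] = ContextVar("current_depth", default=0)


class AddIndent(logging.Filter):
    """Adds an `indent` attribute to every record that passes through."""

    def __init__(self, char: str = "_"):
        super().__init__()
        self.char = char

    def filter(self, record: logging.LogRecord) -> bool:
        record.indent = self.char * current_depth.get()
        return True  # never drops a record, only enriches it


stream = io.StringIO()
handler = logging.StreamHandler(stream)
handler.setFormatter(logging.Formatter("{indent} {message}", style="{"))
handler.addFilter(AddIndent())

logger = logging.getLogger("indent_demo")  # hypothetical logger name
logger.setLevel(logging.INFO)
logger.propagate = False
logger.addHandler(handler)

token = current_depth.set(2)  # pretend we are two activities deep
logger.info("nested")
current_depth.reset(token)

output = stream.getvalue()
print(output, end="")
```

Because the filter is attached to the handler, the formatter can rely on indent being present without every call site passing it via extra.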
What do you think? Is this code fine or do you think something could be improved dramatically?
The full code is available here on GitHub.
1 Answer
Thank you for the theory-of-operation, that was a helpful introduction.
First impression from it was that, out of {message, details, attachment}, maybe we could delete one or two of them? And rely on a JSON attribute to represent it, when present in a log entry?
I appreciate the consistent elapsed column, even when BEGIN makes it zero.
I could definitely see myself using sort -n -k ... to pick out slow operations.
You have a very clear record format.
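For readers who would rather do that in Python than with sort, here is a minimal sketch. It assumes the pipe-separated layout from the question, with the elapsed time in the third column and a trailing s, and that the activity name itself contains no " | ". The function name slowest is made up for the example.

```python
def slowest(lines: list[str], top: int = 3) -> list[tuple[float, str]]:
    """Returns the `top` records with the largest elapsed time, slowest first."""
    records = []
    for line in lines:
        fields = [field.strip() for field in line.split(" | ")]
        # The third column looks like "0.012s"; skip anything else.
        if len(fields) < 3 or not fields[2].endswith("s"):
            continue
        try:
            elapsed = float(fields[2][:-1])
        except ValueError:
            continue
        records.append((elapsed, line))
    return sorted(records, key=lambda record: record[0], reverse=True)[:top]


sample = [
    "2023-10-25 11:38:02.710 _ example | begin | 0.000s | None | {'source': {'file': 'demo.py', 'line': 173}} | None",
    "2023-10-25 11:38:02.723 _ example | end | 0.012s | None | {} | None",
]
result = slowest(sample, top=1)
print(result[0][0])
```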
Maybe the motivation behind attachment is for logging big text, like an email?
In which case I'm going to propose sacrilege: intersperse such text with log records.
We can easily use grep "^2023-" to pick out structured records (or even "^2").
Log attachment lines with some distinctive prefix, perhaps just SPACE.
Anything that doesn't start with "2" would work.
Now we have a log stream and an attachment stream interspersed on stdout,
and we can separate them, and can recover the original attachment by stripping the initial prefix.
Just a thought.
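A minimal sketch of that separation step, assuming records start with "2" and every attachment line was written with a single leading SPACE. The function name split_streams and the sample lines are made up for illustration.

```python
def split_streams(
    lines: list[str],
    record_prefix: str = "2",
    attachment_prefix: str = " ",
) -> tuple[list[str], list[str]]:
    """Splits an interleaved stream into log records and attachment text."""
    records: list[str] = []
    attachments: list[str] = []
    for line in lines:
        if line.startswith(record_prefix):
            records.append(line)
        elif line.startswith(attachment_prefix):
            # Strip only the prefix so the original text is recovered verbatim.
            attachments.append(line[len(attachment_prefix):])
    return records, attachments


mixed = [
    "2023-10-25 11:38:02.710 _ send | begin | 0.000s | None | {} | None",
    " Subject: hello",
    " 2 apples, 3 pears",  # starts with SPACE, so it is not mistaken for a record
    "2023-10-25 11:38:02.723 _ send | end | 0.012s | None | {} | None",
]
records, attachments = split_streams(mixed)
print(attachments)
```

Since the prefix is added to every attachment line, even attachment text that itself begins with "2" stays on the attachment side.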
anonymous trace
class NewTrace(Protocol):
    def __call__(self, name: str | None = None) -> Trace: ...

It's not obvious to me why name=None would be Good.
If you were "trying to make mypy happy", consider revisiting this detail.
OIC, trace_begin preferred None over the empty string or a sentinel.
Still not exactly following why.
Maybe default to __name__?
Or inspect what's on the call stack to find a function name?
Hmmm, as I read through more code I think the trouble with my understanding
is I'm not yet getting what's good about calling .clear().
I bet the code is great,
and the thing we might improve is the introductory documentation.
I'm not sure if .with_message(None) relates to the same issue or not,
but I'm surprised it doesn't insist on being passed a str message.
BaseException
OnError = Callable[[BaseException, "Activity"], None]
Sorry, I'm not following why you needed that. Possibly for GeneratorExit?
When developers start messing around with KeyboardInterrupt and SystemExit it makes me nervous. I'm willing to believe there's a good reason for not using Exception here. Please write it down. In a """docstring""".
Similarly down in LogAbortWhen.
IDK, maybe we really do need to log quit() / sys.exit()?
Just offer a hint about such a use case.
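For context, here is a small sketch of which exceptions the distinction actually affects. A context manager's __exit__ is handed these exceptions regardless of how a handler is typed, and since Python 3.8 asyncio.CancelledError is among them, which may matter for the async code path. That is only one possible motivation and not one the author has stated. The Recorder class is made up for the demonstration.

```python
import asyncio


class Recorder:
    """Context manager that records the exception type seen by __exit__."""

    def __init__(self):
        self.seen = None

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.seen = exc_type
        return False  # never swallow the exception


recorder = Recorder()
try:
    with recorder:
        raise SystemExit(1)  # what sys.exit(1) raises
except SystemExit:
    pass  # caught here only so the demo keeps running

# These derive from BaseException but not from Exception,
# so a handler typed or filtered on Exception would not cover them.
candidates = (KeyboardInterrupt, SystemExit, GeneratorExit, asyncio.CancelledError)
not_exceptions = [t.__name__ for t in candidates if not issubclass(t, Exception)]
print(recorder.seen.__name__, not_exceptions)
```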
In the Activity constructor args, kudos on making many of your attributes _private.
And the or defaulting is very nice.
nice comment
Usually I grumble about too many comments, or that they're about the "how", or about comments that are inaccurate. And then we come to this...
# Do nothing when these errors occur, otherwise the same exception will raise for the default handler.
pass
Thank you for this helpful explanation.
magic number
Ok, I gave you a pass the first time, feeling it was "obvious":
def _trace(...
    name = name or str(TraceNameByCaller(2))

And besides, if a refactor introduced one more level on the call stack
so we need 3, I assume automated unit tests would immediately blow up
alerting us to the needed edit.
But now I see this and I start to get slightly nervous:

def begin_activity(
    ...
    frame = stack[2]

Not sure if that's the same 2 or not.
I'm not saying there's anything urgent to attend to here,
just encouraging you to think about it.
I worry there might be shallow + deep call paths to get here.
In contrast, down in telemetry factory the stack[1] is cool
because I can see, without scrolling, everything that could affect it.
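One way to make such a lookup independent of call depth is to walk the stack until the first frame that is not one of the library's own helpers, instead of indexing with a fixed number. This is a minimal sketch, assuming helpers can be recognized by function name. caller_name, INTERNAL, and the demo functions are hypothetical and not part of wiretap.

```python
import inspect

# Hypothetical names of internal helpers that should be skipped.
INTERNAL = frozenset({"caller_name", "_factory", "_trace"})


def caller_name(internal: frozenset[str] = INTERNAL) -> str:
    """Returns the name of the nearest calling function that is not internal."""
    frame = inspect.currentframe()
    try:
        # Step outwards past every frame that belongs to an internal helper.
        while frame is not None and frame.f_code.co_name in internal:
            frame = frame.f_back
        return frame.f_code.co_name if frame is not None else "<unknown>"
    finally:
        del frame  # avoid keeping a reference cycle alive


def _factory() -> str:
    return caller_name()


def _trace() -> str:
    return _factory()


def trace_begin_shallow() -> str:
    return _factory()  # one internal level


def trace_begin_deep() -> str:
    return _trace()  # two internal levels


print(trace_begin_shallow(), trace_begin_deep())
```

Both call paths resolve to the public function's own name, so adding or removing an internal layer would not require touching an index.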
severity level
class OtherTrace:
    ...
    def trace_info( ...
        return self._trace().with_message(message).as_debug()

Not .as_info()?
Sure, I get it, that's fine. Just registering some slight surprise.
trace_skip (and perhaps "_metric") seems a little bit redundant.
Maybe its special "value add" would be to simply prepend "skip " to the message?
Or maybe there's something planned for it in the roadmap,
which we'd like to mention in a docstring? To give guidance
to an app developer about which one is appropriate to call.
vague name
In ActivityStartLogged, _value is a bit vague.
Consider renaming to _initialized (or _is_initialized).
meaningful identifier
def get_args( ...
    return {t[0]: args[t[1]] for t in arg_pairs} | ...

Prefer tuple unpack over [0] / [1] indices:

    return {arg: args[i] for arg, i in arg_pairs} | ...
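As an aside, and as a different technique from the tuple unpacking suggested above, the standard library can do the name-to-value mapping itself via inspect.signature(...).bind(...). A minimal sketch follows; bound_args is a made-up name, and unlike get_args it raises TypeError when the arguments don't fit the signature.

```python
import inspect
from typing import Any, Callable


def bound_args(func: Callable, *args: Any, **kwargs: Any) -> dict[str, Any]:
    """Maps parameter names to the values passed in a call to `func`."""
    bound = inspect.signature(func).bind(*args, **kwargs)
    return dict(bound.arguments)


def example(x: int, y: int):
    return x + y


result = bound_args(example, 1, y=2)
print(result)
```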
This impresses me as a well-engineered module with the details
carefully thought out.
Code formatting was easy to read, with no 500-char lines.
I appreciate the mypy linting, it is helpful to the Gentle Reader
and instills confidence in code correctness.
This codebase achieves its design goals and appears ready for a pypi release.
I would be willing to delegate or accept maintenance tasks on it.
- I need to try harder to write more comments! I've recently opened an older project where I commented a couple of tricky lines in the past and it was such a good feeling not have to reverse engineer that thing LOL I like all the other suggestions too! Btw, I've been refactoring this code for weeks dozens of times haha and this is the first version that I'm actually happy with. – t3chb0t, Nov 1, 2023 at 7:50
- About the attachment. Your guess is correct. It's supposed to store some text like requests or responses etc, but only when logged into a database where all these properties actually make sense as there they can be easily queried. – t3chb0t, Nov 1, 2023 at 8:34
- … types - it shadows quite important standard library types module. Add at least some CI format-test-deploy pipeline (can suggest pre-commit for the lint step). You have type annotations, but mypy is not green - it's about CI again. Please wrap code lines (black can do this for you) - 100+ char/line is almost unreadable, esp. in complex editor setup with 2-3 panels stacked horizontally. I mean, even github page needs horiz. scrolling...
- … mypy happy, but that's the best I can do ;-] About long lines. They're actually an exception here as they're pretty much all the same and I don't like random line breaks so I instructed PyCharm to not interfere and set it to 500 heh. Either all params are each on a new line or none. There are no partial line-breaks with me ;P With these couple of APIs I chose to go with lengthy ones.
- … from . import types, a relative import, so wiretap.types cannot shadow types.
- … ./types overrides builtin types for absolute import).