Asyncio daemon tutorial

This tutorial shows how to build an asyncio daemon following the dependency injection principle.

In this tutorial we will use:

  • Python 3

  • Docker

  • Docker Compose

You can find the complete project on GitHub.

What are we going to build?

We will build a monitoring daemon that monitors web services availability.

The daemon will send requests to example.com and httpbin.org every couple of seconds. For each successfully completed request it will log:

  • The response code

  • The amount of bytes in the response

  • The time it took to complete the request

../_images/diagram.png

Prerequisites

We will use docker compose in this tutorial. Let’s check the versions:

docker --version
docker compose version

The output should look something like:

Docker version 27.3.1, build ce12230
Docker Compose version v2.29.7

Note

If you don’t have Docker or Docker Compose, install them before proceeding by following their official installation guides.

The prerequisites are satisfied. Let’s get started with the project layout.

Project layout

Create the project root folder and set it as a working directory:

mkdir asyncio-daemon-tutorial
cd asyncio-daemon-tutorial

Now we need to create the initial project structure. Create the files and folders following the layout below. All files should be empty for now. We will fill them in later.

Initial project layout:

./
├── monitoringdaemon/
│   ├── __init__.py
│   ├── __main__.py
│   └── containers.py
├── config.yml
├── docker-compose.yml
├── Dockerfile
└── requirements.txt

The initial project layout is ready. We will extend it in the next sections.

Let’s proceed to the environment preparation.

Prepare the environment

In this section we are going to prepare the environment for running our daemon.

First we need to specify the project requirements. We will use the following packages:

  • dependency-injector - the dependency injection framework

  • aiohttp - the web framework (we need only http client)

  • pyyaml - the YAML parsing library, used for reading the configuration file

  • pytest - the test framework

  • pytest-asyncio - the helper library for testing asyncio applications

  • pytest-cov - the helper library for measuring the test coverage

Put the following lines into the requirements.txt file:

dependency-injector
aiohttp
pyyaml
pytest
pytest-asyncio
pytest-cov

Second, we need to create the Dockerfile. It will describe the daemon’s build process and specify how to run it. We will use python:3.13-bookworm as a base image.

Put the following lines into the Dockerfile file:

FROM python:3.13-bookworm

ENV PYTHONUNBUFFERED=1

WORKDIR /code
COPY . /code/

RUN apt-get install openssl \
 && pip install --upgrade pip \
 && pip install -r requirements.txt \
 && rm -rf ~/.cache

CMD ["python", "-m", "monitoringdaemon"]

Third, we need to define the container in the docker-compose configuration.

Put the following lines into the docker-compose.yml file:

services:

  monitor:
    build: ./
    image: monitoring-daemon
    volumes:
      - "./:/code"

Everything is ready. Let’s check that the environment is set up properly.

Run in the terminal:

docker compose build

The build process may take a couple of minutes. You should see something like this in the end:

Successfully built 5b4ee5e76e35
Successfully tagged monitoring-daemon:latest

After the build is done run the container:

docker compose up

The output should look like:

Creating network "asyncio-daemon-tutorial_default" with the default driver
Creating asyncio-daemon-tutorial_monitor_1 ... done
Attaching to asyncio-daemon-tutorial_monitor_1
asyncio-daemon-tutorial_monitor_1 exited with code 0

The environment is ready. The application does not do any work yet and just exits with code 0.

The next step is to configure the logging and configuration file parsing.

Logging and configuration

In this section we will configure the logging and configuration file parsing.

Let’s start with the main part of our application: the container. The container will keep all of the application components and their dependencies.

The first two components that we’re going to add are the configuration provider and the resource provider for configuring the logging.

Put the following lines into the containers.py file:

"""Containers module."""

import logging
import sys

from dependency_injector import containers, providers


class Container(containers.DeclarativeContainer):

    config = providers.Configuration(yaml_files=["config.yml"])

    logging = providers.Resource(
        logging.basicConfig,
        stream=sys.stdout,
        level=config.log.level,
        format=config.log.format,
    )

The configuration file will keep the logging settings. Put the following lines into the config.yml file:

log:
  level: "INFO"
  format: "[%(asctime)s] [%(levelname)s] [%(name)s]: %(message)s"
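
As a rough illustration of what these two settings control, the sketch below passes the same level and format straight to logging.basicConfig, without the container. The configure_logging() helper, the in-memory stream, and the logger name "Demo" are assumptions made for the example; they are not part of the tutorial project.

```python
import io
import logging

LOG_FORMAT = "[%(asctime)s] [%(levelname)s] [%(name)s]: %(message)s"


def configure_logging(stream, level: str = "INFO", fmt: str = LOG_FORMAT) -> None:
    """Configure the root logger with the values kept in config.yml."""
    # force=True replaces handlers installed by any earlier basicConfig call.
    logging.basicConfig(stream=stream, level=level, format=fmt, force=True)


# Write to an in-memory buffer so the result can be inspected.
buffer = io.StringIO()
configure_logging(buffer)

logger = logging.getLogger("Demo")  # hypothetical logger name
logger.info("Starting up")
logger.debug("hidden at INFO level")  # filtered out by the INFO level

output = buffer.getvalue()
print(output, end="")
```

Changing level to "DEBUG" in this sketch would let the second message through, which is the same effect as editing log.level in config.yml.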

Now let’s create the function that will run our daemon. It’s traditionally called main(). The main() function will start the dispatcher, but we will keep it empty for now. We will create the container instance before calling main() in the if __name__ == "__main__" block. The container instance will parse config.yml, and then we will call the logging configuration provider.

Put the following lines into the __main__.py file:

"""Main module."""

from .containers import Container


def main() -> None:
    ...


if __name__ == "__main__":
    container = Container()
    container.init_resources()

    main()

Note

Container is the first object in the application.

The logging and configuration parsing part is done. In the next section we will create the monitoring checks dispatcher.

Dispatcher

Now let’s add the monitoring checks dispatcher.

The dispatcher will control a list of the monitoring tasks. It will execute each task according to the configured schedule. The Monitor class is the base class for all the monitors. You can create different monitors by subclassing it and implementing the check() method.

../_images/classes-01.png
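
To make the subclassing idea concrete, here is a minimal sketch of a custom monitor. The Monitor class is a stand-in with the same shape as the base class created later in this section, and CounterMonitor is a hypothetical subclass invented for the example.

```python
import asyncio
import logging


class Monitor:
    """Stand-in for the tutorial's base class, redefined to keep this runnable."""

    def __init__(self, check_every: int) -> None:
        self.check_every = check_every
        self.logger = logging.getLogger(self.__class__.__name__)

    async def check(self) -> None:
        raise NotImplementedError()


class CounterMonitor(Monitor):
    """Hypothetical monitor that only counts how many times it was checked."""

    def __init__(self, check_every: int) -> None:
        super().__init__(check_every)
        self.checks = 0

    async def check(self) -> None:
        self.checks += 1
        self.logger.info("Check number %s", self.checks)


monitor = CounterMonitor(check_every=5)
asyncio.run(monitor.check())
asyncio.run(monitor.check())
print(monitor.checks)
```

The logger is named after the concrete class, so log lines from different monitor types can be told apart.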

Let’s create the dispatcher and the monitor base classes.

Create dispatcher.py and monitors.py in the monitoringdaemon package:

./
├── monitoringdaemon/
│   ├── __init__.py
│   ├── __main__.py
│   ├── containers.py
│   ├── dispatcher.py
│   └── monitors.py
├── config.yml
├── docker-compose.yml
├── Dockerfile
└── requirements.txt

Put the following into monitors.py:

"""Monitors module."""

import logging


class Monitor:

    def __init__(self, check_every: int) -> None:
        self.check_every = check_every
        self.logger = logging.getLogger(self.__class__.__name__)

    async def check(self) -> None:
        raise NotImplementedError()

and the following into dispatcher.py:

"""Dispatcher module."""

import asyncio
import logging
import signal
import time
from typing import List

from .monitors import Monitor


class Dispatcher:

    def __init__(self, monitors: List[Monitor]) -> None:
        self._monitors = monitors
        self._monitor_tasks: List[asyncio.Task] = []
        self._logger = logging.getLogger(self.__class__.__name__)
        self._stopping = False

    def run(self) -> None:
        asyncio.run(self.start())

    async def start(self) -> None:
        self._logger.info("Starting up")

        for monitor in self._monitors:
            self._monitor_tasks.append(
                asyncio.create_task(self._run_monitor(monitor)),
            )

        asyncio.get_event_loop().add_signal_handler(signal.SIGTERM, self.stop)
        asyncio.get_event_loop().add_signal_handler(signal.SIGINT, self.stop)

        await asyncio.gather(*self._monitor_tasks, return_exceptions=True)

        self.stop()

    def stop(self) -> None:
        if self._stopping:
            return

        self._stopping = True

        self._logger.info("Shutting down")
        for task, monitor in zip(self._monitor_tasks, self._monitors):
            task.cancel()
        self._monitor_tasks.clear()
        self._logger.info("Shutdown finished successfully")

    @staticmethod
    async def _run_monitor(monitor: Monitor) -> None:
        def _until_next(last: float) -> float:
            time_took = time.time() - last
            return monitor.check_every - time_took

        while True:
            time_start = time.time()

            try:
                await monitor.check()
            except asyncio.CancelledError:
                break
            except Exception:
                monitor.logger.exception("Error executing monitor check")

            await asyncio.sleep(_until_next(last=time_start))
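
The scheduling idea in _run_monitor() is that the sleep is shortened by the time the check itself took, so checks start roughly check_every seconds apart. The standalone sketch below illustrates that idea under a few assumptions: the function names until_next(), run_periodically() and slow_check() are made up for the example, it uses time.monotonic() instead of the time.time() call used in the dispatcher, and it clamps negative delays to zero explicitly.

```python
import asyncio
import time


def until_next(check_every: float, started_at: float, now: float) -> float:
    """Seconds left in the current period; negative if the check overran."""
    return check_every - (now - started_at)


async def run_periodically(check, check_every: float, rounds: int) -> list:
    """Call check() `rounds` times, starting each call about one period apart."""
    starts = []
    for _ in range(rounds):
        started_at = time.monotonic()
        starts.append(started_at)
        await check()
        delay = until_next(check_every, started_at, time.monotonic())
        # Sleep only for the remainder of the period, never a negative time.
        await asyncio.sleep(max(delay, 0))
    return starts


async def slow_check() -> None:
    await asyncio.sleep(0.02)  # pretend the check takes about 20 ms


starts = asyncio.run(run_periodically(slow_check, check_every=0.05, rounds=3))
gaps = [later - earlier for earlier, later in zip(starts, starts[1:])]
print([round(gap, 2) for gap in gaps])
```

Without the subtraction, each period would grow by the duration of the check, and the schedule would drift.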

Now we need to add the dispatcher to the container.

Edit containers.py:

"""Containers module."""

import logging
import sys

from dependency_injector import containers, providers

from . import dispatcher


class Container(containers.DeclarativeContainer):

    config = providers.Configuration(yaml_files=["config.yml"])

    logging = providers.Resource(
        logging.basicConfig,
        stream=sys.stdout,
        level=config.log.level,
        format=config.log.format,
    )

    dispatcher = providers.Factory(
        dispatcher.Dispatcher,
        monitors=providers.List(
            # TODO: add monitors
        ),
    )

Finally, we will inject the dispatcher into the main() function and call its run() method. We will use the wiring feature for that.

Edit __main__.py:

"""Main module."""

from dependency_injector.wiring import Provide, inject

from .dispatcher import Dispatcher
from .containers import Container


@inject
def main(dispatcher: Dispatcher = Provide[Container.dispatcher]) -> None:
    dispatcher.run()


if __name__ == "__main__":
    container = Container()
    container.init_resources()
    container.wire(modules=[__name__])

    main()

Finally let’s start the daemon to check that all works.

Run in the terminal:

docker compose up

The output should look like:

Starting asyncio-daemon-tutorial_monitor_1 ... done
Attaching to asyncio-daemon-tutorial_monitor_1
monitor_1  | [2020-08-08 16:12:35,772] [INFO] [Dispatcher]: Starting up
monitor_1  | [2020-08-08 16:12:35,774] [INFO] [Dispatcher]: Shutting down
monitor_1  | [2020-08-08 16:12:35,774] [INFO] [Dispatcher]: Shutdown finished successfully
asyncio-daemon-tutorial_monitor_1 exited with code 0

Everything works properly. Dispatcher starts up and exits because there are no monitoring tasks.
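
The immediate exit follows from how asyncio.gather() behaves when it is given nothing to wait for. The sketch below shows this in isolation, and also shows what return_exceptions=True does with a cancelled task. The helper names wait_for_all() and cancel_one() are made up for the example.

```python
import asyncio


async def wait_for_all(tasks: list) -> list:
    # Same call shape as in Dispatcher.start().
    return await asyncio.gather(*tasks, return_exceptions=True)


async def cancel_one() -> list:
    async def forever() -> None:
        while True:
            await asyncio.sleep(3600)

    task = asyncio.create_task(forever())
    await asyncio.sleep(0)  # give the task a chance to start
    task.cancel()
    return await wait_for_all([task])


# With no tasks, gather() completes right away with an empty result list.
no_tasks = asyncio.run(wait_for_all([]))

# A cancelled task is reported as an exception object instead of being raised.
cancelled = asyncio.run(cancel_one())

print(no_tasks, cancelled)
```

In the dispatcher, _run_monitor() catches CancelledError itself and leaves its loop, so its tasks finish normally once stop() cancels them.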

By the end of this section we have the application skeleton ready. In the next section we will add the first monitoring task.

Example.com monitor

In this section we will add a monitoring task that will check the availability of http://example.com.

We will start by extending our class model with a new type of monitoring check, the HttpMonitor.

The HttpMonitor is a subclass of the Monitor. We will implement the check() method so that it sends an HTTP request to the specified URL. Sending the HTTP request will be delegated to the HttpClient.

../_images/classes-02.png

First we need to create the HttpClient.

Create http.py in the monitoringdaemon package:

./
├── monitoringdaemon/
│   ├── __init__.py
│   ├── __main__.py
│   ├── containers.py
│   ├── dispatcher.py
│   ├── http.py
│   └── monitors.py
├── config.yml
├── docker-compose.yml
├── Dockerfile
└── requirements.txt

and put the following into it:

"""Http client module."""

from aiohttp import ClientSession, ClientTimeout, ClientResponse


class HttpClient:

    async def request(self, method: str, url: str, timeout: int) -> ClientResponse:
        async with ClientSession(timeout=ClientTimeout(timeout)) as session:
            async with session.request(method, url) as response:
                return response

Now we need to add the HttpClient to the container.

Edit containers.py:

"""Containers module."""

import logging
import sys

from dependency_injector import containers, providers

from . import http, dispatcher


class Container(containers.DeclarativeContainer):

    config = providers.Configuration(yaml_files=["config.yml"])

    logging = providers.Resource(
        logging.basicConfig,
        stream=sys.stdout,
        level=config.log.level,
        format=config.log.format,
    )

    http_client = providers.Factory(http.HttpClient)

    dispatcher = providers.Factory(
        dispatcher.Dispatcher,
        monitors=providers.List(
            # TODO: add monitors
        ),
    )

Now we’re ready to add the HttpMonitor. We will add it to the monitors module.

Edit monitors.py:

"""Monitors module."""

import logging
import time
from typing import Dict, Any

from .http import HttpClient


class Monitor:

    def __init__(self, check_every: int) -> None:
        self.check_every = check_every
        self.logger = logging.getLogger(self.__class__.__name__)

    async def check(self) -> None:
        raise NotImplementedError()


class HttpMonitor(Monitor):

    def __init__(
            self,
            http_client: HttpClient,
            options: Dict[str, Any],
    ) -> None:
        self._client = http_client
        self._method = options.pop("method")
        self._url = options.pop("url")
        self._timeout = options.pop("timeout")
        super().__init__(check_every=options.pop("check_every"))

    async def check(self) -> None:
        time_start = time.time()

        response = await self._client.request(
            method=self._method,
            url=self._url,
            timeout=self._timeout,
        )

        time_end = time.time()
        time_took = time_end - time_start

        self.logger.info(
            "Check\n"
            "    %s %s\n"
            "    response code: %s\n"
            "    content length: %s\n"
            "    request took: %s seconds",
            self._method,
            self._url,
            response.status,
            response.content_length,
            round(time_took, 3)
        )

We have everything ready to add the http://example.com monitoring check. We make two changes in the container:

  • Add the factory provider example_monitor.

  • Inject the example_monitor into the dispatcher.

Edit containers.py:

"""Containers module."""

import logging
import sys

from dependency_injector import containers, providers

from . import http, monitors, dispatcher


class Container(containers.DeclarativeContainer):

    config = providers.Configuration(yaml_files=["config.yml"])

    logging = providers.Resource(
        logging.basicConfig,
        stream=sys.stdout,
        level=config.log.level,
        format=config.log.format,
    )

    http_client = providers.Factory(http.HttpClient)

    example_monitor = providers.Factory(
        monitors.HttpMonitor,
        http_client=http_client,
        options=config.monitors.example,
    )

    dispatcher = providers.Factory(
        dispatcher.Dispatcher,
        monitors=providers.List(
            example_monitor,
        ),
    )

The example_monitor provider depends on the configuration options. Let’s define these options.

Edit config.yml:

log:
  level: "INFO"
  format: "[%(asctime)s] [%(levelname)s] [%(name)s]: %(message)s"

monitors:

  example:
    method: "GET"
    url: "http://example.com"
    timeout: 5
    check_every: 5

All set. Start the daemon to check that all works.

Run in the terminal:

docker compose up

You should see:

Starting asyncio-daemon-tutorial_monitor_1 ... done
Attaching to asyncio-daemon-tutorial_monitor_1
monitor_1  | [2020-08-08 17:06:41,965] [INFO] [Dispatcher]: Starting up
monitor_1  | [2020-08-08 17:06:42,033] [INFO] [HttpMonitor]: Check
monitor_1  |     GET http://example.com
monitor_1  |     response code: 200
monitor_1  |     content length: 648
monitor_1  |     request took: 0.067 seconds
monitor_1  | [2020-08-08 17:06:47,040] [INFO] [HttpMonitor]: Check
monitor_1  |     GET http://example.com
monitor_1  |     response code: 200
monitor_1  |     content length: 648
monitor_1  |     request took: 0.073 seconds

Our daemon can now monitor the availability of http://example.com.

Let’s add a monitor for https://httpbin.org.

Httpbin.org monitor

Adding a monitor for https://httpbin.org will be much easier because we have all the components ready. We just need to create a new provider in the container and update the configuration.

Edit containers.py:

"""Containers module."""

import logging
import sys

from dependency_injector import containers, providers

from . import http, monitors, dispatcher


class Container(containers.DeclarativeContainer):

    config = providers.Configuration(yaml_files=["config.yml"])

    logging = providers.Resource(
        logging.basicConfig,
        stream=sys.stdout,
        level=config.log.level,
        format=config.log.format,
    )

    http_client = providers.Factory(http.HttpClient)

    example_monitor = providers.Factory(
        monitors.HttpMonitor,
        http_client=http_client,
        options=config.monitors.example,
    )

    httpbin_monitor = providers.Factory(
        monitors.HttpMonitor,
        http_client=http_client,
        options=config.monitors.httpbin,
    )

    dispatcher = providers.Factory(
        dispatcher.Dispatcher,
        monitors=providers.List(
            example_monitor,
            httpbin_monitor,
        ),
    )

Edit config.yml:

log:
  level: "INFO"
  format: "[%(asctime)s] [%(levelname)s] [%(name)s]: %(message)s"

monitors:

  example:
    method: "GET"
    url: "http://example.com"
    timeout: 5
    check_every: 5

  httpbin:
    method: "GET"
    url: "https://httpbin.org/get"
    timeout: 5
    check_every: 5
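
Both monitors now share one set of option names, and HttpMonitor reads each of them with options.pop(), so a mistyped key in config.yml only surfaces as a KeyError when the monitor is created. As an optional illustration, the sketch below checks the option names up front with a dataclass. It is not part of the tutorial project: MonitorOptions, load_monitor_options() and the inline CONFIG dictionary are assumptions made for the example.

```python
from dataclasses import dataclass
from typing import Any, Dict


@dataclass
class MonitorOptions:
    """Hypothetical typed view of one entry under `monitors` in config.yml."""

    method: str
    url: str
    timeout: int
    check_every: int


# Inline copy of the `monitors` section, standing in for the parsed YAML file.
CONFIG: Dict[str, Any] = {
    "monitors": {
        "example": {
            "method": "GET",
            "url": "http://example.com",
            "timeout": 5,
            "check_every": 5,
        },
        "httpbin": {
            "method": "GET",
            "url": "https://httpbin.org/get",
            "timeout": 5,
            "check_every": 5,
        },
    },
}


def load_monitor_options(config: Dict[str, Any]) -> Dict[str, MonitorOptions]:
    """Build one MonitorOptions per monitor; unknown or missing keys raise TypeError."""
    return {
        name: MonitorOptions(**options)
        for name, options in config["monitors"].items()
    }


options = load_monitor_options(CONFIG)
print(sorted(options))
print(options["httpbin"].url)
```

A misspelled key such as check_evry would make MonitorOptions(**options) raise TypeError in this sketch, which points at the configuration rather than at the monitor.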

Let’s start the daemon and check the logs.

Run in the terminal:

docker compose up

You should see:

Starting asyncio-daemon-tutorial_monitor_1 ... done
Attaching to asyncio-daemon-tutorial_monitor_1
monitor_1  | [2020-08-08 18:09:08,540] [INFO] [Dispatcher]: Starting up
monitor_1  | [2020-08-08 18:09:08,618] [INFO] [HttpMonitor]: Check
monitor_1  |     GET http://example.com
monitor_1  |     response code: 200
monitor_1  |     content length: 648
monitor_1  |     request took: 0.077 seconds
monitor_1  | [2020-08-08 18:09:08,722] [INFO] [HttpMonitor]: Check
monitor_1  |     GET https://httpbin.org/get
monitor_1  |     response code: 200
monitor_1  |     content length: 310
monitor_1  |     request took: 0.18 seconds
monitor_1  | [2020-08-08 18:09:13,619] [INFO] [HttpMonitor]: Check
monitor_1  |     GET http://example.com
monitor_1  |     response code: 200
monitor_1  |     content length: 648
monitor_1  |     request took: 0.066 seconds
monitor_1  | [2020-08-08 18:09:13,681] [INFO] [HttpMonitor]: Check
monitor_1  |     GET https://httpbin.org/get
monitor_1  |     response code: 200
monitor_1  |     content length: 310
monitor_1  |     request took: 0.126 seconds

The functional part is done. The daemon monitors http://example.com and https://httpbin.org.

In the next section we will add some tests.

Tests

In this section we will add some tests.

We will use pytest together with pytest-cov for measuring coverage.

Create tests.py in the monitoringdaemon package:

./
├── monitoringdaemon/
│   ├── __init__.py
│   ├── __main__.py
│   ├── containers.py
│   ├── dispatcher.py
│   ├── http.py
│   ├── monitors.py
│   └── tests.py
├── config.yml
├── docker-compose.yml
├── Dockerfile
└── requirements.txt

and put the following into it:

"""Tests module."""

import asyncio
import dataclasses
from unittest import mock

import pytest

from .containers import Container


@dataclasses.dataclass
class RequestStub:
    status: int
    content_length: int


@pytest.fixture
def container():
    return Container(
        config={
            "log": {
                "level": "INFO",
                "format": "[%(asctime)s] [%(levelname)s] [%(name)s]: %(message)s",
            },
            "monitors": {
                "example": {
                    "method": "GET",
                    "url": "http://fake-example.com",
                    "timeout": 1,
                    "check_every": 1,
                },
                "httpbin": {
                    "method": "GET",
                    "url": "https://fake-httpbin.org/get",
                    "timeout": 1,
                    "check_every": 1,
                },
            },
        }
    )


@pytest.mark.asyncio
async def test_example_monitor(container, caplog):
    caplog.set_level("INFO")

    http_client_mock = mock.AsyncMock()
    http_client_mock.request.return_value = RequestStub(
        status=200,
        content_length=635,
    )

    with container.http_client.override(http_client_mock):
        example_monitor = container.example_monitor()
        await example_monitor.check()

    assert "http://fake-example.com" in caplog.text
    assert "response code: 200" in caplog.text
    assert "content length: 635" in caplog.text


@pytest.mark.asyncio
async def test_dispatcher(container, caplog, event_loop):
    caplog.set_level("INFO")

    example_monitor_mock = mock.AsyncMock()
    httpbin_monitor_mock = mock.AsyncMock()

    with container.override_providers(
            example_monitor=example_monitor_mock,
            httpbin_monitor=httpbin_monitor_mock,
    ):
        dispatcher = container.dispatcher()
        event_loop.create_task(dispatcher.start())
        await asyncio.sleep(0.1)
        dispatcher.stop()

    assert example_monitor_mock.check.called
    assert httpbin_monitor_mock.check.called

Run in the terminal:

docker compose run --rm monitor py.test monitoringdaemon/tests.py --cov=monitoringdaemon

You should see:

platform linux -- Python 3.13.1, pytest-8.3.4, pluggy-1.5.0
rootdir: /code
plugins: cov-6.0.0, asyncio-0.24.0
asyncio: mode=Mode.STRICT, default_loop_scope=None
collected 2 items

monitoringdaemon/tests.py ..                                    [100%]

---------- coverage: platform linux, python 3.10.0-final-0 -----------
Name                             Stmts   Miss  Cover
----------------------------------------------------
monitoringdaemon/__init__.py         0      0   100%
monitoringdaemon/__main__.py        11     11     0%
monitoringdaemon/containers.py      11      0   100%
monitoringdaemon/dispatcher.py      45      5    89%
monitoringdaemon/http.py             6      3    50%
monitoringdaemon/monitors.py        23      1    96%
monitoringdaemon/tests.py           35      0   100%
----------------------------------------------------
TOTAL                              131     20    85%

Note

Take a look at how tests.py uses provider overriding.

In test_example_monitor the HttpClient provider is overridden with a mock, so no real HTTP calls are made.

In test_dispatcher we override both monitors with mocks.
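
The mocking part of these tests can also be tried on its own, without the container or pytest. The following sketch uses only the standard library; ResponseStub and the describe() coroutine are hypothetical stand-ins for the response object and for a check that delegates to an HTTP client.

```python
import asyncio
import dataclasses
from unittest import mock


@dataclasses.dataclass
class ResponseStub:
    """Carries only the two response attributes the check reads."""

    status: int
    content_length: int


async def describe(client, url: str) -> str:
    """Hypothetical check: delegate the request to the client and format the result."""
    response = await client.request(method="GET", url=url, timeout=1)
    return f"{url} -> {response.status} ({response.content_length} bytes)"


# AsyncMock makes client.request(...) awaitable and records how it was called.
client_mock = mock.AsyncMock()
client_mock.request.return_value = ResponseStub(status=200, content_length=635)

summary = asyncio.run(describe(client_mock, "http://fake-example.com"))
print(summary)
```

Because the client is passed in rather than created inside describe(), swapping it for a mock needs no patching. Overriding the http_client provider in the container achieves the same substitution for HttpMonitor.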

Conclusion

In this tutorial we’ve built an asyncio monitoring daemon following the dependency injection principle. We’ve used Dependency Injector as the dependency injection framework.

With the help of containers and providers we have defined how to assemble the application components.

The List provider helped to inject a list of monitors into the dispatcher. The Configuration provider helped with reading the YAML file.

We used the wiring feature to inject the dispatcher into the main() function. The provider overriding feature helped in testing.

We kept all the dependencies injected explicitly. This will help when you need to add or change something in the future.

You can find the complete project on GitHub.
