I'm reimplementing functionality originally written in Kotlin. It uses Dispatchers.Default, which is recommended for CPU-intensive tasks like JSON parsing, to start coroutines listening for Kafka events like so:
val scope = CoroutineScope(Dispatchers.Default) + CoroutineName("KongKafkaBinding")
val jobs =
    List(settings.incoming().partitions()) {
        scope.async {
            val receiver = KafkaReceiver.create(receiverOptions)
            listen(receiver)
        }
    }
scope.launch {
    jobs.forEach { it.await() }
}
The closest thing in Python, as I see it, is to start a process for each partition and then launch an event loop inside each process to handle incoming records asynchronously.
Simplified version of the resulting Python code (also available on GitHub as a FastAPI service):
import asyncio
import os
from datetime import timedelta
from multiprocessing import Process, Event
from signal import signal, SIGINT, SIG_IGN
from aiokafka import AIOKafkaConsumer, ConsumerRecord, TopicPartition
from aiokafka import AIOKafkaProducer
from config import KafkaBindingConfig
class KafkaBindingProcesses:
    def __init__(self, config: KafkaBindingConfig):
        super().__init__()
        self.__config = config
        # keeping references for started tasks: https://github.com/python/cpython/issues/88831
        self.__running_tasks = set()
        self.__shutdown_event = Event()

    async def on_startup(self):
        self.processes = [
            Process(
                target=self._process_event_loop,
                args=(
                    partition,
                    self.__shutdown_event,
                ),
            )
            for partition in range(self.__config.incoming.partitions)
        ]
        for process in self.processes:
            process.start()

    def _process_event_loop(
        self,
        partition: int,
        shutdown_event: Event,
    ):
        print(f"{os.getpid()} starting process event loop")
        # ignore Ctrl + C so we could stop consumer inside a coroutine
        signal(SIGINT, SIG_IGN)
        asyncio.run(self._listen(partition, shutdown_event))

    async def _listen(self, partition: int, shutdown_event: Event):
        print(f"{os.getpid()} listening")
        self._consumer = AIOKafkaConsumer(
            bootstrap_servers=self.__config.bootstrap.servers,
            group_id=self.__config.incoming.group.id,
            enable_auto_commit=False,
        )
        self._consumer.assign([TopicPartition(self.__config.incoming.topic, partition)])
        await self._consumer.start()
        print(f"{os.getpid()} consumer started")
        while True:
            if shutdown_event.is_set():
                await asyncio.gather(*self.__running_tasks)
                await self._consumer.stop()
                print(f"{os.getpid()} consumer stopped")
                break
            # only `getmany` supports pausing, not `getone` nor `__aiter__`
            # code pausing event is omitted for simplicity
            msgs = await self._consumer.getmany(
                timeout_ms=int(
                    self.__config.incoming.poll.timeout / timedelta(milliseconds=1)
                )
            )
            for tp, messages in msgs.items():
                for msg in messages:
                    task = asyncio.create_task(self._on_event(msg))
                    self.__running_tasks.add(task)
                    task.add_done_callback(lambda t: self.__running_tasks.remove(t))

    async def _on_event(self, record: ConsumerRecord):
        try:
            await self._dummy_handler(record)
            await self._ack(record)
        except Exception as ex:
            await self._dlq(record, ex)

    async def _dummy_handler(self, record: ConsumerRecord):
        print(f"{os.getpid()} dummy_handler", record)

    async def on_shutdown(self):
        self.__shutdown_event.set()
        for process in self.processes:
            process.join()

    async def _ack(self, record: ConsumerRecord):
        task = asyncio.create_task(
            self._consumer.commit(
                {TopicPartition(record.topic, record.partition): record.offset}
            )
        )
        self.__running_tasks.add(task)
        task.add_done_callback(lambda t: (self.__running_tasks.remove(t)))

    async def _dlq(self, record: ConsumerRecord, ex: Exception):
        async with AIOKafkaProducer(
            bootstrap_servers=self.__config.bootstrap.servers,
        ) as producer:
            await producer.send_and_wait(
                "dead-letter-topic-" + self.__config.incoming.topic,
                key=record.key,
                value=record.value,
                headers=list(record.headers),
            )
My points of concern:
- Graceful shutdown routine: first wait for tasks to complete, only then join the processes.
- Adding signal(SIGINT, SIG_IGN) before asyncio.run so that I can close the consumer; otherwise the coroutine is killed. Catching KeyboardInterrupt inside __listen doesn't solve the problem.
- Since I'm ignoring SIG_IGN, I have to use Event() to trigger graceful shutdown inside each process.
- I'm creating and starting plain processes, since multiprocessing.Pool just doesn't fit the scenario (I have a small, constant number of tasks, no need for chunks). And ProcessPoolExecutor doesn't seem to start submitted tasks at all if I make it a class attribute (and I need a reference to wait for processes to finish on shutdown):
Logs without self:
async def on_startup(self):
    __pool = Pool(self._config.incoming.partitions)
    __pool.map_async(self._process_event_loop, range(self._config.incoming.partitions))
    for partition in range(self._config.incoming.partitions):
        __pool.submit(self._process_event_loop, partition)
INFO: Started server process [41720]
INFO: Waiting for application startup.
lifespan startup
INFO: Application startup complete.
INFO: Uvicorn running on http://0.0.0.0:8080 (Press CTRL+C to quit)
starting process event loop
listening
consumer started
Logs with self:
async def on_startup(self):
    self.__exec = ProcessPoolExecutor(self._config.incoming.partitions)
    for partition in range(self._config.incoming.partitions):
        self.__exec.submit(self._process_event_loop, partition)
INFO: Started server process [40340]
INFO: Waiting for application startup.
lifespan startup
INFO: Application startup complete.
INFO: Uvicorn running on http://0.0.0.0:8080 (Press CTRL+C to quit)
Finally, my question: is this the right way to solve this problem with the given requirements, or am I overcomplicating?
- process for each partition
- keeping the Python implementation as close to Kotlin as possible (keeping the KafkaBinding class, for example)
1 Answer
Thank you for using isort, it looks great!
I recommend $ echo .idea/ >> .gitignore, so spurious PyCharm details don't wind up in the repo.
Certainly I use PyCharm myself, but not everyone makes that their IDE.
class hierarchy
super()
I don't understand what's going on here.
class KafkaBindingProcesses:
    def __init__(self, ... ):
        super().__init__()
        ...
Given that we didn't specify some cool class to inherit from, why are we examining the MRO and explicitly invoking object's __init__() constructor?
There's nothing wrong with doing that, it's not harmful.
But it's not idiomatic Python source code.
Maybe we used to have a fancier inheritance setup, and this is just leftover?
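To make that concrete, here is a minimal sketch. The class name Plain is hypothetical and stands in for KafkaBindingProcesses. It shows that a class declared without bases has only object behind it in the MRO, so leaving out the super().__init__() call loses nothing:

```python
class Plain:
    def __init__(self, config):
        # No super().__init__() here: the only ancestor is object,
        # whose __init__ takes no arguments and does nothing useful.
        self.config = config


# A class with no explicit bases still inherits from object.
mro_names = [cls.__name__ for cls in Plain.__mro__]
print(mro_names)
```

If a real base class shows up later, that is the moment to add the super().__init__() call back.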
extra abstractions
We may have gotten a little carried away, here.
class KafkaGroupConfig(BaseModel):
    id: str

class KafkaPollConfig(BaseModel):
    timeout: timedelta

class KafkaIncomingConfig(BaseModel):
    poll: KafkaPollConfig
    topic: str
    group: KafkaGroupConfig
    ...
Those first two classes don't seem to be pulling their weight. Consider rephrasing the third class as just
poll_timeout: timedelta
topic: str
group_id: str
...
Similarly with the KafkaBootstrapConfig class and a bootstrap_servers attribute.
These tiny classes are not reused to DRY up code. And they don't appear to be modeling an interesting Business Domain concept. A helpful """docstring""" on one of them might make it "worth it", but they are so self-explanatory that I can't imagine we'd need more docs.
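A rough sketch of what the flattened shape could look like. To keep it runnable without third-party packages it uses a standard-library dataclass as a stand-in for pydantic's BaseModel, which is an assumption on my part; the field values are made up for illustration:

```python
from dataclasses import dataclass
from datetime import timedelta


@dataclass(frozen=True)
class KafkaIncomingConfig:
    """Settings for the incoming topic, flattened (no nested group/poll classes)."""

    topic: str
    group_id: str
    poll_timeout: timedelta
    partitions: int


# Hypothetical values, for illustration only.
incoming = KafkaIncomingConfig(
    topic="events",
    group_id="kong-kafka-binding",
    poll_timeout=timedelta(seconds=1),
    partitions=3,
)
print(incoming.group_id, incoming.poll_timeout)
```

Call sites then read config.incoming.group_id instead of config.incoming.group.id, which is one less hop and one less class to maintain.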
tuple vs list
In settings_customise_sources() we see a literal ellipsis in the return type.
) -> Tuple[PydanticBaseSettingsSource, ...]:
I can't imagine that's a good thing.
Maybe you wanted a list instead?
Also, in modern source code prefer to annotate with tuple rather than the deprecated typing.Tuple.
background
Python seems like it's open ended on which container type to choose, tuple vs list. There are some practical aspects that can play into it, like hashable vs not, which stems from immutable vs mutable. But philosophically the difference is this:
- We use a list for an arbitrary number of "the same thing", e.g. ages: list[int].
- We use a tuple for a fixed number of items, where position determines semantics, e.g. tuple[str, int, str], or a 2-D point of tuple[float, float].
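A small sketch of those two conventions with the built-in generics (Python 3.9+). All names and values are illustrative. For completeness, the typing rules do define tuple[T, ...] as a variable-length tuple of one element type, so the ellipsis form is valid syntax; whether it is the best fit is the judgment call discussed above.

```python
import math

# Arbitrary number of "the same thing": a list.
ages: list[int] = [31, 27, 45]

# Fixed number of items where position carries meaning: a tuple.
Point = tuple[float, float]
person: tuple[str, int, str] = ("Ada", 36, "London")

# Variable-length, homogeneous tuple: the ellipsis spelling.
source_names: tuple[str, ...] = ("env", "dotenv", "secrets")


def distance(a: Point, b: Point) -> float:
    """Euclidean distance between two 2-D points."""
    return math.hypot(b[0] - a[0], b[1] - a[1])


print(distance((0.0, 0.0), (3.0, 4.0)))
```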
name mangling
self.__running_tasks = set()
self.__shutdown_event = Event()
I can't imagine why you're using the leading __ dunder to request name mangling.
It is seldom that you will find it helpful.
And the current codebase includes no inheriting classes.
Prefer to just create _private attributes:
self._running_tasks = set()
self._shutdown_event = Event()
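In case the effect of mangling isn't obvious, here is a tiny demonstration. The class name Binding is hypothetical. It shows what actually gets stored on the instance for each spelling:

```python
class Binding:
    def __init__(self):
        self.__mangled = "double leading underscore"
        self._private = "single leading underscore"


b = Binding()
# Inside the class body, __mangled was rewritten to _Binding__mangled.
attribute_names = sorted(vars(b))
print(attribute_names)
```

The mangled name mostly matters when subclasses might accidentally reuse an attribute name, which is not a concern in this codebase.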
logging
Clearly these work just fine:
print(f"{os.getpid()} starting process event loop")
...
print(f"{os.getpid()} listening")
Consider relying on import logging instead.
It offers the usual benefits, including timestamping and adjustable verbosity.
You could definitely arrange for PID to appear as part of the standard prefix for each logged message.
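Something along these lines would do it. This is a sketch: the format string and the logger name "kafka_binding" are my own choices, not anything from the OP's repo. The %(process)d field is a standard LogRecord attribute holding the PID.

```python
import logging

# %(process)d is filled in by the logging module with the current PID.
LOG_FORMAT = "%(asctime)s %(process)d %(levelname)s %(name)s: %(message)s"

logging.basicConfig(level=logging.INFO, format=LOG_FORMAT)
logger = logging.getLogger("kafka_binding")  # hypothetical logger name

logger.info("starting process event loop")
logger.debug("hidden unless the level is lowered to DEBUG")
```

With that in place the os.getpid() prefix can be dropped from every message.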
SI units
Nice conversion; I thank you for its clarity.
timeout_ms=int(
    self.__config.incoming.poll.timeout / timedelta(milliseconds=1)
)
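For readers who haven't seen the idiom: dividing one timedelta by another yields a plain float, so dividing by a one-millisecond timedelta gives the duration in milliseconds. A standalone sketch, with a made-up timeout value:

```python
from datetime import timedelta

poll_timeout = timedelta(seconds=1.5)  # hypothetical setting value

# timedelta / timedelta is a float: how many milliseconds fit in the timeout.
timeout_ms = int(poll_timeout / timedelta(milliseconds=1))
print(timeout_ms)
```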
callback
task.add_done_callback(lambda t: self.__running_tasks.remove(t))
No need to invent a new anonymous dispatching function in _listen and _ack.
I think this would suffice:
task.add_done_callback(self.__running_tasks.remove)
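Here is a self-contained sketch of that pattern, with a hypothetical handle() coroutine standing in for _on_event. The done-callback is called with the finished task as its only argument, which is exactly what a bound set.remove expects:

```python
import asyncio


async def handle(n: int) -> int:
    """Stand-in for real record handling."""
    await asyncio.sleep(0)
    return n * 2


async def main() -> tuple[list[int], int]:
    running_tasks: set[asyncio.Task] = set()
    tasks = []
    for n in range(3):
        task = asyncio.create_task(handle(n))
        running_tasks.add(task)  # keep a strong reference while it runs
        # The bound method receives the finished task; no lambda needed.
        task.add_done_callback(running_tasks.remove)
        tasks.append(task)
    results = await asyncio.gather(*tasks)
    await asyncio.sleep(0)  # give any pending done-callbacks a chance to run
    return list(results), len(running_tasks)


results, remaining = asyncio.run(main())
print(results, remaining)
```

If there is any chance a task gets removed twice, running_tasks.discard is the more forgiving choice, since remove raises KeyError for a missing element.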
cryptic abbrev
The identifier is fine:
async def _dlq( ...
But it really needs a """dead letter queue""" docstring.
- Graceful shutdown routine: first wait for tasks to complete, only then join the processes.
The OP code looks good to me as written. Upon completing its current tasks, a worker will notice that the shutdown event is set and terminate gracefully, at which point the parent's .join() returns.
- Catching KeyboardInterrupt ...
It's possible you're getting hung up on this detail:
The exception inherits from BaseException so as to not be accidentally caught by code that catches Exception and thus prevent the interpreter from exiting.
It's also possible that you'd prefer to have CTRL/C simply set a flag, which causes termination to happen "soon". Writing a tiny CLI script which queues up a "please terminate!" message might be another attractive option.
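A quick way to see the hierarchy point in action. The helper name which_handler is mine, purely for illustration. An except Exception clause, like the one in _on_event, lets KeyboardInterrupt pass straight through:

```python
def which_handler(exc: BaseException) -> str:
    """Report which except clause ends up handling the raised exception."""
    try:
        try:
            raise exc
        except Exception:
            return "except Exception"
    except BaseException:
        return "except BaseException"


print(which_handler(ValueError("bad record")))
print(which_handler(KeyboardInterrupt()))
```

So a handler meant to react to CTRL/C has to name KeyboardInterrupt (or BaseException) explicitly.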
- Since I'm ignoring SIG_IGN
Ummm, typo nit: "ignoring SIGINT" -- the SIG_IGN handler is a "do nothing" routine.
design
- process for each partition
- keeping Python implementation as close to Kotlin as possible (keeping KafkaBinding class for example)
Well, a given process might plausibly handle multiple partitions or topics. It's a load thing. The central reason for forking a new process is to get a new GIL, so you can max out another core.
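If you ever want fewer processes than partitions, a round-robin split is one simple option. This is only a sketch; the function name and the numbers are hypothetical. Each worker would then build one TopicPartition per entry in its list and pass them all to the assign() call the OP already uses.

```python
def partitions_for_worker(worker: int, workers: int, partitions: int) -> list[int]:
    """Round-robin: worker w gets partitions w, w + workers, w + 2 * workers, ..."""
    return list(range(worker, partitions, workers))


# Example: 8 partitions spread over 3 worker processes.
assignment = {w: partitions_for_worker(w, workers=3, partitions=8) for w in range(3)}
print(assignment)
```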
Sure, keeping the two implementations as similar as feasible seems a good practice. Especially if you have an automated e2e system test which can elicit similar results from both implementations. Presumably at some point you'll add a feature to one, and then be faced with a desire to port that feature to the other.
This codebase is clear and achieves its design goals.
I would be willing to delegate or accept maintenance tasks on it.