7,946 questions
- Bountied 0
- Unanswered
- Frequent
- Score
- Trending
- Week
- Month
- Unanswered (my tags)
-3
votes
2
answers
72
views
"Event loop closed" with asyncio/asyncpg
I clearly have a fundamental misunderstanding of how async works. In Python 3.14, this snipped:
import asyncio
import asyncpg
def adapted_async():
conn = asyncio.run(asyncpg.connect(...
1
vote
2
answers
52
views
Is it okay to use a synchronous decorator on a FastAPI route?
I want to add a decorator on a FastAPI route like this:
/admin/verification.py
from functools import wraps
import json
def admin_required(admin_function):
@wraps(admin_function)
def ...
-1
votes
0
answers
39
views
asyncio signal handler task is terminated before completing await during shutdown
I am writing a Python consumer for RabbitMQ Streams using an asynchronous library. I want to save the message offset to the stream server when the script is stopped (via SIGINT/Ctrl+C).
However, the ...
2
votes
2
answers
70
views
What causes memory leaks when using Python asyncio tasks in a long-running service?
I’m building a long-running background service in Python using asyncio, and I’m facing a memory usage issue that keeps growing over time. The service schedules multiple async tasks every few seconds, ...
Best practices
0
votes
2
replies
92
views
How to program equivalent of javascript event handler in python using asyncio?
Context
I have been programming in javascript/typescript a lot and now I have to write a program that opens a TCP connection and performs some data exchange tasks initiated by incoming messages over ...
1
vote
2
answers
93
views
FastAPI: Not able to use AsyncMock in "async with connection.execute()" context for aiosqlite (async sqlite3 library) while testing the endpoint
I am using Python 3.13.2
I have async with connection.execute(query) as cursor: block in my endpoint, which I want to mock (connection is an object generated by await aiosqlite.connection(':memory:').
...
-3
votes
1
answer
74
views
How can i stop debug output of asyncserialserver? [closed]
I run StartAsyncSerialServer from pymodbus and get a lot of debug output into my logfiles like:
>>>>> recv: 0x11 0xc1 extra data: 0xfd 0xed 0xff 0x13 0x24 0x44 0x64 0xeb 0xa
>>>...
0
votes
1
answer
101
views
My API returns 500 on successful run, but not when run manually in debug
When I run my code in normal way, my API endpoint returns 500. The problem is I cannot debug, as when I am running in debug mode and going line-by-line, it actually runs OK. The suspected piece of ...
0
votes
1
answer
79
views
Why does my websocket connection get blocked?
I have a simple FastAPI server with just one WebSocket endpoint. It receives a terminal command, executes it locally, and broadcasts the stdout to the client every second:
from fastapi import FastAPI, ...
-1
votes
0
answers
33
views
Why does asyncio.gather() not run my coroutines concurrently when one of them contains a blocking function, and what is the correct way to fix it? [duplicate]
I have following code:
import asyncio
import time
async def task1():
print("task1 start")
time.sleep(2) # intentionally blocking
print("task1 end")
async def task2():...
0
votes
1
answer
50
views
FastAPI: Not able to use AsyncMock to fetch mock rows from aiosqlite (async sqlite3 library) while testing the endpoint
I am using Python 3.13.2
I am trying a sample use case of using AsyncMock to simulate fetching of mock rows from DB to test my endpoint (using pytest-asyncio).
I have also included a non-mock endpoint ...
1
vote
1
answer
94
views
Why does asyncio.gather() not cancel pending tasks when one task fails?
I'm using asyncio.gather to run multiple coroutines in parallel. When one coroutine raises an exception, I expect the remaining tasks to be cancelled. However, they continue running.
import asyncio
...
2
votes
1
answer
77
views
asyncio freezes when spawning multiprocessing process on Windows but works fine on Linux
I’m experimenting with mixing asyncio and multiprocessing in Python 3.12.
On Linux, the following code works as expected — the event loop stays responsive and the child process prints normally.
On ...
2
votes
1
answer
131
views
How can I create a tool for langgaph's agent to save data in db?
I created an agent using Langgraph in Python, and I developed a tool for them to save a todo in a database. But that tool doesn't work and raises an error!
I use SQLAlchemy to connect to the DB.
This ...
0
votes
1
answer
62
views
Monitor `asyncio.create_subprocess_exec` pipes for errors
I am trying to pipe multiple Linux commands and abort if there is an error. With Popen the communicate() method waits for all commands to finish. This is why I am trying asyncio now.
I have the ...