I am having a problem trying to do resource cleanup for a background thread. The code running in the thread is meant to run indefinitely, or until it is stopped, and then the cleanup is done. I could use atexit to stop the thread and do the cleanup. I need the thread to not end abruptly, so making it non-daemonic makes sense, but then I cannot use atexit: as long as the thread is running, atexit won't execute its callbacks, so depending on atexit to stop the thread makes the program run 'forever' and no cleanup is ever done.
The other way is to make the thread daemonic and use atexit to join the thread and then do the cleanup. This works great, but I hear there is no guarantee the thread will still exist by the time atexit runs its code. I can't tell, because in every test I have done the thread has not been killed, probably because I do not know at what point in the program's execution atexit does its thing; I could not find anything in the docs.
Note: I have no control over the indefinitely running code, so I cannot change its internals to add things like flags that check whether the main thread is_alive.
The only thing I could think of is to use another thread to monitor the main thread's exit, then stop the 'forever' running code, end the thread, and do the cleanup. It looks ugly.
My code looks something like this:
import atexit
import threading
import time

_control = {"stopFlag": False}

def forever_running_task():
    while not _control["stopFlag"]:
        "do stuff here"

def stop_that_task():
    _control["stopFlag"] = True
    # run those cleanups here
This will make the program run forever, since the atexit callback never runs while the non-daemon thread is alive:
t = threading.Thread(target=forever_running_task, daemon=False)
t.start()
atexit.register(stop_that_task) # `stop_that_task` will never be called
This works great, but there are unconfirmed doubts:
t = threading.Thread(target=forever_running_task, daemon=True)
t.start()
atexit.register(stop_that_task)
atexit.register(t.join) # apparently `t` might have been killed before `t.join` and/or `stop_that_task` is called
My workaround, which I hate:
def watch_main():
    mt = threading.main_thread()
    while mt.is_alive():
        time.sleep(1)
    stop_that_task()
    # do cleanups
t1 = threading.Thread(target=forever_running_task, daemon=False)
t1.start()
t2 = threading.Thread(target=watch_main, daemon=False)
t2.start()
So the question is: how do I end a non-daemonic thread in a case like mine? And can someone confirm whether daemon threads can be killed before atexit gets to execute?
Using Python 3.10, Windows 10.
Update
My actual code is as follows:
import asyncio
from collections.abc import Coroutine
import threading
import time
import aiohttp
import atexit
from typing import Any, TypeVar

from app import SpaLogger

_logger = SpaLogger(name="session-manager")
_T = TypeVar("_T")


class SessionManager:
    _base_url: str = "https://www.example.com"
    _aclient_session: aiohttp.ClientSession = None
    _aevent_loop: asyncio.BaseEventLoop = None
    _asession_thread: threading.Thread = None
    _started: bool = False
    _lock = threading.Lock()

    @classmethod
    def get_session(cls):
        cls._start()
        return cls._aclient_session

    @classmethod
    async def aget_session(cls):
        return cls.get_session()

    @classmethod
    def run_coroutine(cls, coro: Coroutine[Any, Any, _T]):
        cls._start()
        return asyncio.run_coroutine_threadsafe(coro, cls._aevent_loop).result()

    @classmethod
    def _start(cls):
        with cls._lock:
            if cls._started:
                if any((cls._aevent_loop.is_closed(), cls._aclient_session.closed, not cls._asession_thread.is_alive())):
                    raise RuntimeError("Session Manager is corrupted")
                return
            if not all((cls._aevent_loop is None, cls._aclient_session is None, cls._asession_thread is None)):
                raise RuntimeError("Session Manager is corrupted")

            async def create_session():
                return aiohttp.ClientSession(base_url=cls._base_url)

            def start_in_thread():
                cls._aevent_loop = el = asyncio.new_event_loop()
                asyncio.set_event_loop(el)
                cls._aclient_session = el.run_until_complete(create_session())
                cls._started = True
                el.run_forever()

            cls._asession_thread = threading.Thread(target=start_in_thread, name="SessionManagerThread", daemon=True)
            cls._asession_thread.start()
            atexit.register(cls._shutdown)
            while not cls._started:
                time.sleep(0.1)
            _logger.log_info("Session Manager Started")

    @classmethod
    def _shutdown(cls):
        with cls._lock:
            if not cls._started:
                return

            async def close_session():
                if cls._aclient_session is not None and not cls._aclient_session.closed:
                    await cls._aclient_session.close()
                    await asyncio.sleep(.500)
                    _logger.log_info("Client Session Closed")

            async def close_all_pending_tasks():
                if cls._aevent_loop is not None and not cls._aevent_loop.is_closed():
                    pending_tasks = [t for t in asyncio.all_tasks(cls._aevent_loop) if t is not asyncio.current_task()]
                    for task in pending_tasks:
                        task.cancel()
                    await asyncio.gather(*pending_tasks, return_exceptions=True)
                    _logger.log_info(f"{len(pending_tasks)} Pending tasks cancelled")

            if cls._aevent_loop is not None and not cls._aevent_loop.is_closed():
                try:
                    asyncio.run_coroutine_threadsafe(close_session(), cls._aevent_loop).result()
                    asyncio.run_coroutine_threadsafe(close_all_pending_tasks(), cls._aevent_loop).result()
                    cls._aevent_loop.call_soon_threadsafe(cls._aevent_loop.stop)
                    cls._asession_thread.join()
                    _logger.log_info("Session Thread Exited")
                finally:
                    cls._aevent_loop.close()

            cls._aclient_session = cls._aevent_loop = cls._asession_thread = None
            cls._started = False
            _logger.log_info("Session Manager Closed")
This works fine; my problem is that _asession_thread is a daemon thread. I want to make it non-daemon so that I can be confident it won't be terminated before _aclient_session, _aevent_loop, and any pending tasks are properly closed.
1 Answer
Your first approach is half right: the Python interpreter will keep running while there is at least one non-daemon thread alive. Since the interpreter will not exit while your child thread is running, it never reaches the point of running your atexit callbacks.
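You can see that ordering for yourself with a small sketch (the three-second worker is just a stand-in for a task that eventually finishes):
import atexit
import threading
import time

def worker():
    # Stand-in for the long-running task; it finishes after a few seconds
    # so the interpreter can actually reach its shutdown sequence.
    time.sleep(3)
    print("worker finished")

atexit.register(lambda: print("atexit callback ran"))

t = threading.Thread(target=worker, daemon=False)
t.start()
print("main thread done")
# Prints "main thread done" immediately, "worker finished" about three
# seconds later, and only then "atexit callback ran": the interpreter waits
# for non-daemon threads before it runs atexit callbacks.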
Do not use daemon threads, at least not for anything you mind being killed at an arbitrary point during its execution. For instance, your program could be halfway through flushing some writes to disk when it is halted, and so end up with corrupted or truncated data on a "normal" program exit.
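As a concrete illustration (the file name and record format are made up, and line buffering is used only so the partial output is visible on disk):
import threading
import time

def flush_records():
    # Pretend this is flushing important state to disk.
    with open("data.txt", "w", buffering=1) as f:  # line-buffered text file
        for i in range(10):
            f.write(f"record {i}\n")
            time.sleep(0.1)
        print("all records written")  # never reached

t = threading.Thread(target=flush_records, daemon=True)
t.start()
time.sleep(0.35)
# The main thread ends here and the daemon thread is killed mid-loop:
# "all records written" never prints and data.txt is left with only the
# first few records.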
Instead, you could use contextlib.ExitStack to make sure your child threads are told to shut down when your main thread exits:
import threading
import time
import contextlib

def child(event):
    while True:
        if event.is_set():
            print('child done')
            return
        print('in child')
        time.sleep(0.25)

def parent():
    event = threading.Event()
    t = threading.Thread(target=child, args=[event])

    def halt_child():
        print('halting child')
        event.set()
        t.join()
        print('child halted')

    with contextlib.ExitStack() as cm:
        cm.callback(halt_child)
        t.start()
        # do whatever you need to do here
        time.sleep(1)
        print('last print before end of exit stack block')

    assert not t.is_alive()
    print('exiting')

parent()
Running the above will produce this output:
in child
in child
in child
in child
last print before end of exit stack block
halting child
child done
child halted
exiting
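One way to adapt this to your SessionManager (a sketch only, assuming your program has a single main() entry point and that the class is defined or imported in the same file) is to make _asession_thread non-daemon, drop the atexit.register(cls._shutdown) line, and tie SessionManager._shutdown to an ExitStack in main() instead:
import contextlib

# SessionManager as defined in the question, but with daemon=False on
# _asession_thread and without the atexit.register(cls._shutdown) call.

def main():
    with contextlib.ExitStack() as stack:
        # On the way out of this block, _shutdown closes the session, stops
        # the loop and joins the (now non-daemon) session thread, so the
        # interpreter has nothing left to wait on before it exits.
        stack.callback(SessionManager._shutdown)
        SessionManager.get_session()  # or run_coroutine(...), as before
        # ... the rest of the program ...

if __name__ == "__main__":
    main()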