I've come up with two methods of submitting tasks and gathering their results via polling. I'm wondering whether you'd recommend either of them, or have another suggestion.
The first uses Dramatiq. Run python3.12 test.py and in another terminal, run dramatiq --processes 2 --threads 2 test. This one works, although I'm not sure if the authors would recommend it. I'm currently asking them about this.
import asyncio
import time

import dramatiq
from dramatiq.brokers.redis import RedisBroker
from dramatiq.middleware import AsyncIO
from dramatiq.results import Results
from dramatiq.results.backend import DEFAULT_TIMEOUT
from dramatiq.results.backends.redis import RedisBackend
from dramatiq.results.errors import ResultMissing, ResultTimeout
from quart import Quart, jsonify


class RedisBackendExt(RedisBackend):
    def get_result_from_message_key(self, message_key, *, block=False, timeout=None):
        if timeout is None:
            timeout = DEFAULT_TIMEOUT

        if block:
            timeout = int(timeout / 1000)
            if timeout == 0:
                data = self.client.rpoplpush(message_key, message_key)
            else:
                data = self.client.brpoplpush(message_key, message_key, timeout)

            if data is None:
                raise ResultTimeout(message_key)
        else:
            data = self.client.lindex(message_key, 0)
            if data is None:
                raise ResultMissing(message_key)

        return self.unwrap_result(self.encoder.decode(data))


backend = RedisBackendExt()
broker = RedisBroker(host='localhost', port=6379)
broker.add_middleware(Results(backend=backend))
broker.add_middleware(AsyncIO())
dramatiq.set_broker(broker)

app = Quart(__name__)  # totally decoupled from dramatiq ! I was unable to achieve this with Flask + Celery so far.


@dramatiq.actor(store_results=True)
async def cut_granite_with_toothpick(enqueue_time: float):
    print(f'{enqueue_time=}')
    await asyncio.sleep(4)
    return 'finally, the granite is cut...'


@app.get('/message/<string:message_key>')
async def get_result(message_key):
    result = backend.get_result_from_message_key(message_key)
    return jsonify({'result': result}), 200


@app.get('/')
async def index():
    enqueue_time = time.time()
    message = cut_granite_with_toothpick.send(enqueue_time)
    message_key = backend.build_message_key(message)
    print(f'http://127.0.0.1:8000/message/{message_key}')
    return jsonify({'message_key': message_key}), 202


if __name__ == '__main__':
    app.run(host='127.0.0.1', port=8000, debug=True)
The second idea is to modify Quart.add_background_task so that it returns an auto-generated task_id, and to poll using that. Each background task function would need to write its task_id/result pair into Redis manually, and the polling endpoints would similarly query Redis for results.
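To make the second idea concrete, here is a rough sketch of the pattern using only the standard library. It is not Quart's API: `submit_task`, `poll_task`, `RESULTS`, and `demo` are hypothetical names, and a plain dict stands in for Redis. In a real app I'd expect `submit_task` to hand the wrapper coroutine to `app.add_background_task`, and the dict reads/writes to become Redis calls.

```python
import asyncio
import uuid

# In-memory stand-in for Redis (assumption): maps task_id -> outcome dict.
RESULTS: dict[str, dict] = {}
# Strong references so scheduled tasks are not garbage-collected mid-run.
_RUNNING: set[asyncio.Task] = set()


def submit_task(func, *args, **kwargs) -> str:
    """Schedule the coroutine function `func` and return a generated task_id."""
    task_id = uuid.uuid4().hex

    async def runner() -> None:
        # Record the outcome under task_id once the coroutine finishes.
        try:
            result = await func(*args, **kwargs)
            RESULTS[task_id] = {"status": "done", "result": result}
        except Exception as exc:
            RESULTS[task_id] = {"status": "failed", "error": repr(exc)}

    task = asyncio.get_running_loop().create_task(runner())
    _RUNNING.add(task)
    task.add_done_callback(_RUNNING.discard)
    return task_id


def poll_task(task_id: str) -> dict:
    """Return the stored outcome, or a pending marker if nothing is stored yet."""
    return RESULTS.get(task_id, {"status": "pending"})


async def cut_granite_with_toothpick(delay: float) -> str:
    await asyncio.sleep(delay)
    return "finally, the granite is cut..."


async def demo() -> tuple[dict, dict]:
    task_id = submit_task(cut_granite_with_toothpick, 0.05)
    before = poll_task(task_id)  # polled while the task is still sleeping
    await asyncio.sleep(0.2)     # give the task time to finish
    after = poll_task(task_id)
    return before, after
```

One caveat with this sketch: an unknown task_id also reports as pending, so a real version would probably record a "pending" entry at submit time and return 404 for ids it has never seen. Results also live only in this process, which is the main reason to put them in Redis once there is more than one worker.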