Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

How to submit background tasks and poll results #400

Unanswered
skwzrd asked this question in Q&A
Discussion options

I've sorta come up with two methods of submitting tasks, and gathering their results via polling. I am wondering if you'd recommend either of these methods, or have another suggestion.

The first uses Dramatiq. Run python3.12 test.py and in another terminal, run dramatiq --processes 2 --threads 2 test. This one works, although I'm not sure if the authors would recommend it. I'm currently asking them about this.

import asyncio
import time
import dramatiq
from dramatiq.brokers.redis import RedisBroker
from dramatiq.middleware import AsyncIO
from dramatiq.results import Results
from dramatiq.results.backend import DEFAULT_TIMEOUT
from dramatiq.results.backends.redis import RedisBackend
from dramatiq.results.errors import ResultMissing, ResultTimeout
from quart import Quart, jsonify
class RedisBackendExt(RedisBackend):
 def get_result_from_message_key(self, message_key, *, block=False, timeout=None):
 if timeout is None:
 timeout = DEFAULT_TIMEOUT
 if block:
 timeout = int(timeout / 1000)
 if timeout == 0:
 data = self.client.rpoplpush(message_key, message_key)
 else:
 data = self.client.brpoplpush(message_key, message_key, timeout)
 if data is None:
 raise ResultTimeout(message_key)
 else:
 data = self.client.lindex(message_key, 0)
 if data is None:
 raise ResultMissing(message_key)
 return self.unwrap_result(self.encoder.decode(data))
backend = RedisBackendExt()
broker = RedisBroker(host='localhost', port=6379)
broker.add_middleware(Results(backend=backend))
broker.add_middleware(AsyncIO())
dramatiq.set_broker(broker)
app = Quart(__name__) # totally decoupled from dramatiq ! I was unable to achieve this with Flask + Celery so far.
@dramatiq.actor(store_results=True)
async def cut_granite_with_toothpick(enqueue_time: float):
 print(f'{enqueue_time=}')
 await asyncio.sleep(4)
 return 'finally, the granite is cut...'
@app.get('/message/<string:message_key>')
async def get_result(message_key):
 result = backend.get_result_from_message_key(message_key)
 return jsonify({'result': result}), 200
@app.get('/')
async def index():
 enqueue_time = time.time()
 message = cut_granite_with_toothpick.send(enqueue_time)
 message_key = backend.build_message_key(message)
 print(f'http://127.0.0.1:8000/message/{message_key}')
 return jsonify({'message_key': message_key}), 202
if __name__ == '__main__':
 app.run(host='127.0.0.1', port=8000, debug=True)

The second is an idea is to modify Quart.add_background_task, and return an auto-generated task_id, and poll using that. Each background task function would need to insert a task_id - result into Redis manually. Polling endpoints would need to similarly query Redis for results.

You must be logged in to vote

Replies: 0 comments

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Category
Q&A
Labels
None yet
1 participant

AltStyle によって変換されたページ (->オリジナル) /