I'm processing webhooks on Cloud Run (Django) that need async handling because processing takes 30+ seconds but the webhook provider times out at 30s.
Since Cloud Run is stateless and scales instances with request traffic (so there are no persistent background workers like Celery), I'm using this pattern:
import json
import uuid

# 1. Webhook endpoint
def receive_webhook(request):
    # Dump the raw payload to GCS so the task handler can fetch it later
    blob_name = f"webhooks/{uuid.uuid4()}.json"
    bucket.blob(blob_name).upload_from_string(json.dumps(request.data))
    webhook = WebhookPayload.objects.create(gcs_path=blob_name)
    create_cloud_task(payload_id=webhook.id)
    return Response(status=200)  # Fast response, well inside the provider's 30s timeout
The Cloud Task then calls the following endpoint, passing the ID of the record that stores the GCS path written by the original webhook endpoint:
def process_webhook(request):
    webhook = WebhookPayload.objects.get(id=request.data['payload_id'])
    # Load the stored payload back from GCS, process it, then clean up
    payload = json.loads(bucket.blob(webhook.gcs_path).download_as_text())
    process_data(payload)  # 30+ seconds
    bucket.blob(webhook.gcs_path).delete()
    return Response(status=200)  # a 2xx tells Cloud Tasks the task succeeded
My main questions:
Is GCS + Cloud Tasks the right pattern for Cloud Run's model, or would temporarily storing the JSON directly in a Django model be a better approach, since Cloud Tasks already handles the queueing?
Should I be using Pub/Sub instead? My understanding is that Pub/Sub is more appropriate for broadcasting to numerous subscribers, and currently I only have the one Django monolith.
Thanks for any advice that comes my way.
1 Answer
I would use Cloud Tasks to decouple the webhook response from the processing: it gives you retries, rate limiting, and OIDC authentication to your Cloud Run service. Store the payload wherever it fits best; for example, put the JSON directly in the task body if it's under the ~1 MB task size limit, keep it in a database row when you want transactional handling, and fall back to GCS only for large payloads.
So Cloud Tasks plus the DB (or GCS for big blobs) is the right fit for Cloud Run's model; just make sure the task's target URL points at your processing endpoint.
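The snippets below assume a WebhookPayload model roughly like this (a minimal sketch; the field names and the use of Django 3.1+ JSONField are my assumptions, not something from your code):

from django.db import models

class WebhookPayload(models.Model):
    external_id = models.CharField(max_length=255, blank=True, db_index=True)  # provider event ID, useful for dedupe
    data = models.JSONField(null=True, blank=True)              # small payloads stored inline
    gcs_path = models.CharField(max_length=512, blank=True)     # set instead of data for large payloads
    status = models.CharField(max_length=20, default="queued")  # queued / done
    created_at = models.DateTimeField(auto_now_add=True)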
# Webhook (fast)
def receive_webhook(request):
    payload = request.data  # DRF request, matching your existing endpoint
    rec = WebhookPayload.objects.create(
        external_id=payload.get("event_id"),
        data=payload,  # if small; else store in GCS and save the path
        status="queued",
    )
    create_cloud_task(
        url=PROCESS_URL,
        body={"payload_id": rec.id},
        oidc_service_account=SA_EMAIL,
        # retry policy and rate limits are queue-level settings in Cloud Tasks
        retry={"maxAttempts": 10, "maxBackoff": "600s"},
        rate_limit=5,
    )
    return Response(status=200)

# Task handler
def process_webhook(request):
    rec = WebhookPayload.objects.get(id=request.data["payload_id"])
    if rec.status == "done":  # idempotency: Cloud Tasks may deliver more than once
        return Response(status=200)
    process_data(rec.data)  # or load from GCS if you stored a path
    rec.status = "done"
    rec.save()
    return Response(status=200)
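The create_cloud_task helper is left abstract above; here is a minimal sketch of what it could look like with the google-cloud-tasks client. The project, location, and queue name are placeholders I made up, and note that retry policy and rate limits actually live on the queue configuration in Cloud Tasks, so the sketch only builds the per-task parts (target URL, JSON body, OIDC token) and ignores the rest:

import json
from google.cloud import tasks_v2

client = tasks_v2.CloudTasksClient()

def create_cloud_task(url, body, oidc_service_account,
                      project="my-project", location="us-central1", queue="webhooks",
                      **_queue_opts):
    # _queue_opts swallows retry/rate_limit from the call above; in Cloud Tasks
    # those map to queue configuration, not per-task fields.
    task = {
        "http_request": {
            "http_method": tasks_v2.HttpMethod.POST,
            "url": url,
            "headers": {"Content-Type": "application/json"},
            "body": json.dumps(body).encode(),
            # OIDC token so the Cloud Run processing endpoint can require authentication
            "oidc_token": {"service_account_email": oidc_service_account},
        }
    }
    parent = client.queue_path(project, location, queue)
    return client.create_task(request={"parent": parent, "task": task})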