
I'm processing webhooks on Cloud Run (Django) that need async handling because processing takes 30+ seconds but the webhook provider times out at 30s.

Since Cloud Run is stateless and spins up per-request (no persistent background workers like Celery), I'm using this pattern:

# 1. Webhook endpoint
def receive_webhook(request):
    # Stash the raw payload in GCS so the task body stays small
    blob_name = f"webhooks/{uuid.uuid4()}.json"
    bucket.blob(blob_name).upload_from_string(json.dumps(request.data))

    # Record it and enqueue a Cloud Task pointing at the processing endpoint
    webhook = WebhookPayload.objects.create(gcs_path=blob_name)
    create_cloud_task(payload_id=webhook.id)

    return Response(status=200)  # Fast response, well under the 30s timeout
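
For context, create_cloud_task is just a thin wrapper around the google-cloud-tasks client, roughly like this (the project/queue/region names and PROCESS_URL are placeholders for my real config):

# Helper that enqueues an HTTP task targeting the processing endpoint
import json
from google.cloud import tasks_v2

tasks_client = tasks_v2.CloudTasksClient()

def create_cloud_task(payload_id):
    parent = tasks_client.queue_path("my-project", "us-central1", "webhook-queue")
    task = {
        "http_request": {
            "http_method": tasks_v2.HttpMethod.POST,
            "url": PROCESS_URL,  # e.g. the /process-webhook/ route on the Cloud Run service
            "headers": {"Content-Type": "application/json"},
            "body": json.dumps({"payload_id": payload_id}).encode(),
        }
    }
    tasks_client.create_task(request={"parent": parent, "task": task})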

The Cloud Task then calls the following endpoint with the payload ID, which is used to look up the GCS path saved by the webhook endpoint:

# 2. Task handler
def process_webhook(request):
    webhook = WebhookPayload.objects.get(id=request.data['payload_id'])
    payload = json.loads(bucket.blob(webhook.gcs_path).download_as_text())

    process_data(payload)  # 30+ seconds
    bucket.blob(webhook.gcs_path).delete()  # Clean up the blob once processed

My main questions:

  1. Is GCS + Cloud Tasks the right pattern for Cloud Run's model, or is temporarily storing the JSON directly in a Django model a better approach, since Cloud Tasks already handles the queueing?

  2. Should I be using Pub/Sub instead? My understanding is that Pub/Sub is more appropriate for broadcasting to numerous subscribers, and currently I only have the one Django monolith.

Thanks for any advice that comes my way.

asked Nov 10 at 0:05

1 Answer


I would say use Cloud Tasks to decouple the webhook response from the processing: it gives you retries, rate limiting, and OIDC auth to the Cloud Run handler. Then store the payload wherever it fits best; for example, put the JSON directly in the task body (or a database row) when it is under the ~1 MB task size limit, and fall back to GCS for larger blobs.
So Cloud Tasks plus a DB record, with GCS for the large blobs, is the right fit: the DB gives you transactions and status tracking, and GCS handles the big payloads.

# Webhook (fast)
def receive_webhook(request):
    payload = request.data  # DRF-parsed JSON body
    rec = WebhookPayload.objects.create(
        external_id=payload.get("event_id"),
        data=payload,  # if small, else store in GCS and save the path
        status="queued",
    )
    create_cloud_task(
        url=PROCESS_URL,
        body={"payload_id": rec.id},
        oidc_service_account=SA_EMAIL,
        retry={"maxAttempts": 10, "maxBackoff": "600s"},  # retry and rate limits actually live on the queue
        rate_limit=5,
    )
    return Response(status=200)

# Task handler
def process_webhook(request):
    rec = WebhookPayload.objects.get(id=request.data["payload_id"])
    if rec.status == "done":  # idempotency: Cloud Tasks may deliver more than once
        return Response(status=200)
    process_data(rec.data)  # or load from GCS if you stored a path
    rec.status = "done"
    rec.save()
    return Response(status=200)
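
If some payloads blow past the ~1 MB limit, a simple size check can decide where each one lives. A sketch of that branch (the threshold constant and helper names are illustrative, reusing the data/gcs_path/status fields from above):

import json
import uuid

MAX_INLINE_BYTES = 1_000_000  # assumed cutoff, roughly the Cloud Tasks body limit

def store_payload(payload):
    """Keep small payloads inline on the model; push big ones to GCS."""
    raw = json.dumps(payload)
    if len(raw.encode("utf-8")) <= MAX_INLINE_BYTES:
        return WebhookPayload.objects.create(data=payload, status="queued")
    blob_name = f"webhooks/{uuid.uuid4()}.json"
    bucket.blob(blob_name).upload_from_string(raw, content_type="application/json")
    return WebhookPayload.objects.create(gcs_path=blob_name, status="queued")

def load_payload(rec):
    """Inverse of store_payload, used by the task handler."""
    if rec.gcs_path:
        return json.loads(bucket.blob(rec.gcs_path).download_as_text())
    return rec.data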
answered Nov 13 at 20:47

1 Comment

Thank you - yes, I was initially just using Cloud Tasks, but some payloads were larger than 1 MB, so it's reassuring that using GCS for those larger blobs and then fetching them back by path is a reasonable approach.
