0

I have a DAG running in Airflow, and one of the tasks is to make hundreds of requests to an external API. I did this asynchronously using HTTPX. In the middle of the task, it stops logging and seems frozen, without sending any logs. I’ve already set a timeout in HTTPX and a retry policy. When I run the same code on my local machine, it works perfectly.

Would you like help troubleshooting why it's freezing in Airflow?

class Client:
 async def __aenter__(self):
 retry = Retry(total=3, backoff_factor=0.5)
 xauth = httpx.BasicAuth(username=self.login, password=self.passwd)
 self._client = httpx.AsyncClient(
 base_url=f"https://{self.root_domain}",
 timeout=30,
 transport=RetryTransport(retry=retry),
 auth=xauth,
 )
 return self
 async def __aexit__(self, exc_type, exc_val, exc_tb):
 await self._client.aclose()
 async def get_agendas(
 self,
 sem: asyncio.Semaphore,
 id_produto: int,
 data: str,
 dias: int = 15,
 ) -> dict:

This is the service calling the Httpx Client

async def run():
 for produto in produtos:
 id_produto = int(produto["id_produto"])
 today_date = datetime.today().strftime("%Y-%m-%d")
 
 agendas = client.get_agendas(id_produto=id_produto, data=today_date, sem=sem)
 tasks.append(agendas)
 print(f"Sending in parallel: {task_count}")
 results = await asyncio.gather(*tasks)
 return results

The python operator has:

import asyncio
result = asyncio.run(run())

This is the log. The task really stopped, but it seens strange as its running on my computer smothly.

ERROR 2025年04月29日T10:34:39.460534199Z Detected zombie job: {'full_filepath': '/home/airflow/gcs/dags/load_healthcare_reference_data.py', 'processor_subdir': '/home/airflow/gcs/dags', 'msg': "{'DAG Id': 'load_healthcare_reference_data', 'Task Id': 'xxxx', 'Run Id': 'scheduled__2025年04月28日T00:00:00+00:00', 'Hostname': 'airflow-worker-ltg69', 'External Executor Id': '2b783354-951d-4351-8b72-33c10effd48e'}", 'simple_task_instance': <airflow.models.taskinstance.SimpleTaskInstance object at 0x7db8f0841790>, 'task_callback_type': None} (See https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/tasks.html#zombie-undead-tasks) { "textPayload": "Detected zombie job: {'full_filepath': '/home/airflow/gcs/dags/load_healthcare_reference_data.py', 'processor_subdir': '/home/airflow/gcs/dags', 'msg': "{'DAG Id': 'load_healthcare_reference_data', 'Task Id': 'xxxx', 'Run Id': 'scheduled__2025年04月28日T00:00:00+00:00', 'Hostname': 'airflow-worker-ltg69', 'External Executor Id': '2b783354-951d-4351-8b72-33c10effd48e'}", 'simple_task_instance': <airflow.models.taskinstance.SimpleTaskInstance object at 0x7db8f0841790>, 'task_callback_type': None} (See https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/tasks.html#zombie-undead-tasks)", "insertId": "lxd298f9jfilu", "resource": { "type": "cloud_composer_environment", "labels": { "location": "us-east1", "project_id": "primary-care-378721", "environment_name": "ai-agents" } }, "timestamp": "2025年04月29日T10:34:39.460534199Z", "severity": "ERROR", "labels": { "scheduler_id": "airflow-scheduler-7966ddd44b-4jfpm", "process": "scheduler_job_runner.py:2001" }, "logName": "projects/primary-care-378721/logs/airflow-scheduler", "receiveTimestamp": "2025年04月29日T10:34:43.179672035Z" }

asked Apr 29, 2025 at 11:08
1
  • 1
    So, basically, you shouldn't do long computing or heavy computing inside Airflow tasks. You should instead delegate it to another runner. Also, it would help us answer if we saw how you function is passed to Airflow. Each task has its own context and might not be thread-safe, so async might cause troubles here. Finally, each Airflow task should produce the same output if run once or multiple times. What happens when your task fails during a run and you retry it ? I guess you'll have dupplicates. Commented Apr 29, 2025 at 13:39

0

Know someone who can answer? Share a link to this question via email, Twitter, or Facebook.

Your Answer

Draft saved
Draft discarded

Sign up or log in

Sign up using Google
Sign up using Email and Password

Post as a guest

Required, but never shown

Post as a guest

Required, but never shown

By clicking "Post Your Answer", you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.