I have a DAG running in Airflow, and one of its tasks makes hundreds of requests to an external API. I do this asynchronously with HTTPX. Partway through, the task stops emitting logs and appears frozen. I have already set a timeout in HTTPX and a retry policy. When I run the same code on my local machine, it works perfectly.
Can anyone help me figure out why it's freezing in Airflow?
import asyncio
import httpx
# Retry and RetryTransport are assumed to come from the httpx-retries package,
# based on the names used in the original snippet
from httpx_retries import Retry, RetryTransport


class Client:
    def __init__(self, login: str, passwd: str, root_domain: str):
        # Constructor not shown in the original post; minimal version assumed
        self.login = login
        self.passwd = passwd
        self.root_domain = root_domain

    async def __aenter__(self):
        # Up to 3 retries with exponential backoff, basic auth, 30 s timeout
        retry = Retry(total=3, backoff_factor=0.5)
        xauth = httpx.BasicAuth(username=self.login, password=self.passwd)
        self._client = httpx.AsyncClient(
            base_url=f"https://{self.root_domain}",
            timeout=30,
            transport=RetryTransport(retry=retry),
            auth=xauth,
        )
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        await self._client.aclose()

    async def get_agendas(
        self,
        sem: asyncio.Semaphore,
        id_produto: int,
        data: str,
        dias: int = 15,
    ) -> dict:
        ...  # body omitted here
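The body of get_agendas is omitted above. For reference, a minimal sketch of what it presumably looks like, assuming the semaphore is used to cap concurrent requests (the /agendas path and the parameter handling are guesses, not taken from the original code):

# Hypothetical method body for the Client class above -- only the
# semaphore-around-the-request pattern is the point here.
async def get_agendas(self, sem, id_produto, data, dias=15):
    async with sem:  # cap the number of requests in flight
        resp = await self._client.get(
            "/agendas",  # assumed endpoint path
            params={"id_produto": id_produto, "data": data, "dias": dias},
        )
        resp.raise_for_status()
        return resp.json()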
This is the service that calls the HTTPX client:
from datetime import datetime

# client, sem and produtos come from the enclosing scope (not shown in the post)
async def run():
    tasks = []
    for produto in produtos:
        id_produto = int(produto["id_produto"])
        today_date = datetime.today().strftime("%Y-%m-%d")
        # get_agendas returns a coroutine; nothing runs until asyncio.gather awaits them
        agendas = client.get_agendas(id_produto=id_produto, data=today_date, sem=sem)
        tasks.append(agendas)
    print(f"Sending in parallel: {len(tasks)}")
    results = await asyncio.gather(*tasks)
    return results
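Not part of the original code, but one way to see whether the requests are actually making progress is to gather the coroutines in bounded batches and log between them, so the task never goes silent for the whole gather:

import asyncio

# Sketch only -- splits the coroutines into batches and writes a progress
# line after each batch instead of awaiting everything in one silent gather.
async def run_in_batches(coros: list, batch_size: int = 50) -> list:
    results: list = []
    for i in range(0, len(coros), batch_size):
        batch = coros[i : i + batch_size]
        results.extend(await asyncio.gather(*batch))
        print(f"Finished {i + len(batch)} of {len(coros)} requests")
    return results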
The PythonOperator callable contains:
import asyncio
result = asyncio.run(run())
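For completeness, roughly how that callable would sit in the DAG file; the function and task names here are placeholders and the DAG context is omitted, since the real file isn't shown in the post:

import asyncio
from airflow.operators.python import PythonOperator

def fetch_agendas_callable(**context):
    # asyncio.run spins up a fresh event loop on the Airflow worker,
    # runs the coroutine to completion, then closes the loop.
    return asyncio.run(run())

# In the real file this sits inside the DAG definition (with DAG(...) as dag:).
fetch_agendas_task = PythonOperator(
    task_id="fetch_agendas",
    python_callable=fetch_agendas_callable,
)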
This is the log. The task really did stop, but it seems strange, since the same code runs smoothly on my machine.
{
  "textPayload": "Detected zombie job: {'full_filepath': '/home/airflow/gcs/dags/load_healthcare_reference_data.py', 'processor_subdir': '/home/airflow/gcs/dags', 'msg': \"{'DAG Id': 'load_healthcare_reference_data', 'Task Id': 'xxxx', 'Run Id': 'scheduled__2025-04-28T00:00:00+00:00', 'Hostname': 'airflow-worker-ltg69', 'External Executor Id': '2b783354-951d-4351-8b72-33c10effd48e'}\", 'simple_task_instance': <airflow.models.taskinstance.SimpleTaskInstance object at 0x7db8f0841790>, 'task_callback_type': None} (See https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/tasks.html#zombie-undead-tasks)",
  "insertId": "lxd298f9jfilu",
  "resource": {
    "type": "cloud_composer_environment",
    "labels": {
      "location": "us-east1",
      "project_id": "primary-care-378721",
      "environment_name": "ai-agents"
    }
  },
  "timestamp": "2025-04-29T10:34:39.460534199Z",
  "severity": "ERROR",
  "labels": {
    "scheduler_id": "airflow-scheduler-7966ddd44b-4jfpm",
    "process": "scheduler_job_runner.py:2001"
  },
  "logName": "projects/primary-care-378721/logs/airflow-scheduler",
  "receiveTimestamp": "2025-04-29T10:34:43.179672035Z"
}
async might cause trouble here. Finally, each Airflow task should be idempotent: it should produce the same output whether it runs once or multiple times. What happens when your task fails partway through a run and you retry it? I'd guess you'll end up with duplicates.
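To make the duplicate concern concrete: if the task appends results to its destination as it fetches them, a retry after a partial failure writes the same rows again. A minimal sketch of one way to keep the write idempotent, keying each record on (id_produto, date) so a rerun overwrites rather than appends; the in-memory dict and the save_agendas helper are stand-ins for whatever backend the real task writes to:

# Illustrative only: a dict stands in for the real storage layer, and each
# result is assumed to carry its id_produto.
store: dict[tuple[int, str], dict] = {}

def save_agendas(results: list[dict], run_date: str) -> None:
    for agenda in results:
        # Keyed write: rerunning the task for the same logical date replaces
        # the earlier record instead of adding a duplicate.
        store[(agenda["id_produto"], run_date)] = agenda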