Open
@Bechamelle
Description
In which file did you encounter the issue?
composer/workflows/airflow_db_cleanup.py
Did you change the file? If so, how?
  dags = session.query(airflow_db_model.dag_id).distinct()
  session.commit()
- list_dags = [str(list(dag)[0]) for dag in dags] + [None]
+ list_dags = [str(list(dag)[0]) for dag in dags]
  for dag_id in list_dags:
      query = build_query(
          session=session,
Describe the issue
Since this commit, which removed the check on whether the dag_id argument is None, running the airflow_db_cleanup DAG deletes all DAG runs older than max_db_entry_age_in_days across all DAG IDs, without keeping at least the latest run for each DAG.
As shown in the logs, the final iteration (dag_id is None) generates the following query:
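To make the effect concrete, here is a minimal, self-contained sketch of the behavior described above. It does not use the real build_query or SQLAlchemy; the in-memory RUNS list, the select_for_deletion helper, and the keep_last flag are hypothetical stand-ins chosen for illustration, and the 30-day cutoff is an assumed value for max_db_entry_age_in_days.

```python
from datetime import datetime, timedelta

# Hypothetical in-memory stand-in for the dag_run table: (dag_id, execution_date).
NOW = datetime(2025, 9, 19)
RUNS = [
    ("daily_dag", NOW - timedelta(days=1)),
    ("daily_dag", NOW - timedelta(days=40)),
    ("yearly_dag", NOW - timedelta(days=200)),  # latest run of this DAG
    ("yearly_dag", NOW - timedelta(days=565)),
]


def select_for_deletion(runs, cutoff, dag_id, keep_last=True):
    """Return the runs one loop iteration of the cleanup would delete.

    dag_id=None models the final iteration of `list_dags + [None]`:
    no dag_id filter and no keep-latest guard are applied (assumption
    based on the query shown in the logs).
    """
    if dag_id is None:
        # Unfiltered: mirrors `WHERE execution_date <= cutoff` alone.
        return [r for r in runs if r[1] <= cutoff]
    own = [r for r in runs if r[0] == dag_id]
    candidates = [r for r in own if r[1] <= cutoff]
    if keep_last and own:
        # Protect the most recent run of this DAG, however old it is.
        latest = max(own, key=lambda r: r[1])
        candidates = [r for r in candidates if r != latest]
    return candidates


cutoff = NOW - timedelta(days=30)  # assumed max_db_entry_age_in_days = 30

# Per-DAG iterations keep the latest run of each DAG.
per_dag = []
for dag in ["daily_dag", "yearly_dag"]:
    per_dag += select_for_deletion(RUNS, cutoff, dag)

# The extra None iteration ignores that guard.
with_none = select_for_deletion(RUNS, cutoff, None)

print(len(per_dag), len(with_none))
```

In this sketch the per-DAG iterations leave the 200-day-old run of yearly_dag alone because it is that DAG's latest run, while the None iteration selects it for deletion along with every other run older than the cutoff.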
[2025-09-19T13:24:14.981+0000] {airflow_db_cleanup.py:378} INFO - Query: SELECT dag_run.state AS dag_run_state, dag_run.id AS dag_run_id, dag_run.dag_id AS dag_run_dag_id, dag_run.queued_at AS dag_run_queued_at,
dag_run.execution_date AS dag_run_execution_date, dag_run.start_date AS dag_run_start_date, dag_run.end_date AS dag_run_end_date, dag_run.run_id AS dag_run_run_id, dag_run.creating_job_id AS dag_run_creating_job_id,
dag_run.external_trigger AS dag_run_external_trigger, dag_run.run_type AS dag_run_run_type, dag_run.conf AS dag_run_conf, dag_run.data_interval_start AS dag_run_data_interval_start,
dag_run.data_interval_end AS dag_run_data_interval_end, dag_run.last_scheduling_decision AS dag_run_last_scheduling_decision, dag_run.dag_hash AS dag_run_dag_hash,
dag_run.log_template_id AS dag_run_log_template_id, dag_run.updated_at AS dag_run_updated_at, dag_run.clear_number AS dag_run_clear_number
FROM dag_run
WHERE dag_run.execution_date <= %(execution_date_1)s