This repository contains an Apache Airflow DAG to synchronize the ODS (Opendatasoft) monitoring datasets with a local database. This is useful if the usage of the Opendatasoft-Portal should be analyzed and backtraceable beyond the timerange limit of the ODS-contract.
-
Setting Port: Start an Airflow service, e.g. by using a Docker image. The current DAG runs with the image
apache/airflow:2.8.1andpostgres:14-alpine. Also set up a (postgres) database to store the tables and install the python libraries in therequirements.txtfile. -
Environment Variables: Rename
.env-exampleto.envand set the parameters. -
Hardcoded Links:
ogd_web_analytics_utilities.pycontains two constantsUSER_DATA_API_URLandDATASETS_DATA_API_URLthat are hardcoded to point towards the monitoring endpoints of data.bl.ch. They have to be switched with the correct urls for your portal.
ogd_web_analytics_etl.py
The DAG is documented inline using md docs. Utility functions are loaded from ogd_web_analytics_utilities.py.
The tables that need to exist are the following two, where user_actions contains data from the endpoint [portal]/api/explore/v2.1/monitoring/datasets/ods-api-monitoring and datasets contains data from the endpoint [portal]/api/explore/v2.1/monitoring/datasets/ods-datasets-monitoring
table ogd_analytics.user_actions ( timestamp TIMESTAMP, user_ip_addr VARCHAR, user_id VARCHAR, dataset_id VARCHAR, api_type VARCHAR, mobile VARCHAR, action VARCHAR, attributes VARCHAR, filename VARCHAR, query_text VARCHAR );
table ogd_analytics.datasets ( timestamp TIMESTAMP, dataset_id VARCHAR, title VARCHAR, modified VARCHAR, publisher VARCHAR, license VARCHAR, keyword VARCHAR, theme VARCHAR, api_call_count INTEGER, download_count INTEGER, records_count INTEGER, visibility VARCHAR );
It is advisable to cleanse the monitoring data before publishing as bots are not filtered out automatically and internal users may should be omitted. data.bl.ch uses the following two views to cleanse the raw data in the above tables. The views filter out usage from bots and logged in users, and create continuous date series with missing values if for some days no data is available.
CREATE OR REPLACE VIEW ogd_analytics.daily_external_user_dataset_interactions AS WITH date_range AS ( SELECT generate_series(min(date_trunc('day'::text, user_actions."timestamp")), max(date_trunc('day'::text, user_actions."timestamp")), '1 day'::interval) AS date FROM ogd_analytics.user_actions ), interaction_counts AS ( SELECT date_trunc('day'::text, user_actions."timestamp") AS date, count(user_actions."timestamp") AS dataset_interactions FROM ogd_analytics.user_actions WHERE user_actions.user_id::text = 'anonymous'::text AND user_actions.dataset_id::text !~~ '%NULL%'::text AND (user_actions.user_agent IS NULL OR user_actions.user_agent !~~* '%bot%'::text) GROUP BY (date_trunc('day'::text, user_actions."timestamp")) ) SELECT dr.date, ac.dataset_interactions FROM date_range dr LEFT JOIN interaction_counts ac ON dr.date = ac.date ORDER BY dr.date;
CREATE OR REPLACE VIEW ogd_analytics.daily_unique_external_user_ip_count AS WITH date_range AS ( SELECT generate_series(min(date_trunc('day'::text, user_actions."timestamp")), max(date_trunc('day'::text, user_actions."timestamp")), '1 day'::interval) AS date FROM ogd_analytics.user_actions ), action_counts AS ( SELECT date_trunc('day'::text, user_actions."timestamp") AS date, count(DISTINCT user_actions.user_ip_addr) AS unique_ip_count FROM ogd_analytics.user_actions WHERE user_actions.user_id::text = 'anonymous'::text AND (user_actions.user_agent IS NULL OR user_actions.user_agent !~~* '%bot%'::text) GROUP BY (date_trunc('day'::text, user_actions."timestamp")) ) SELECT dr.date, ac.unique_ip_count FROM date_range dr LEFT JOIN action_counts ac ON dr.date = ac.date ORDER BY dr.date;
The sql script in dags>ogd>sql shows how the views can be accessed for synchronization with the portal.
If you have troubles or questions please don't hesitate to contact us at ogd@bl.ch