I wrote the following scripts to extract data from BigQuery and InfluxDB using the bigquery cli and the influx cli. The code works as expected. I included all files for completeness, but I'm primarily interested in feedback on the bash scripts config_query.sh, extract.sh and run.sh. You can also comment on the python file and on my queries if you want, but the focus should be on my bash scripts. The overall goal is to make the scripts as maintainable and reliable as possible; execution speed doesn't matter.
This is how the pipeline works at a high level: the queries are saved as text files in the folder /app/queries. First these text files are configured via config_query.sh, which replaces the start and end strings of the queries. Then extract.sh passes the configured queries to the respective CLIs and saves the data in /app/tmp/raw. The raw data is then processed and cleaned via transform.py and saved to /app/tmp/clean. Finally, all these steps are run via run.sh. If config_query.sh is given a range between start and end date that is longer than five days, run.sh splits the queries into chunks of five days in order to avoid a timeout on the server.
This is an overview of the directory structure:
app/
├── queries/
│ └── gads.sql
│ └── leads.flux
│ └── li.sql
│ └── ms.sql
│ └── subs.flux
├── src/
│ └── config_query.sh
│ └── extract.sh
│ └── transform.py
└── tmp/
├── clean/
│ └── gads.csv
│ └── leads.csv
│ └── li.csv
│ └── ms.csv
│ └── subs.csv
├── raw/
│ └── gads.csv
│ └── leads.csv
│ └── li.csv
│ └── ms.csv
│ └── subs.csv
└── gads_startdate_enddate.csv
└── leads_startdate_enddate.csv
└── li_startdate_enddate.csv
└── ms_startdate_enddate.csv
└── subs_startdate_enddate.csv
Example queries in /app/queries:
# BigQuery query:
WITH dates AS (
SELECT
*
FROM
UNNEST(GENERATE_DATE_ARRAY('2022-01-01', DATE_SUB('2022-01-01', INTERVAL 1 DAY), INTERVAL 1 DAY)) AS date
), gads_data AS (
SELECT DATE(__hevo_report_date) AS date,
campaign,
ad_group,
sum(impressions) AS impressions,
sum(clicks) AS clicks,
ROUND(sum(cost)/1000000, 2) AS spend
FROM `example_table`
WHERE DATE(__hevo_report_date) >= '2022-01-01' AND DATE(__hevo_report_date) < '2022-01-01'
GROUP BY date, campaign, ad_group
)
SELECT *
FROM dates
LEFT OUTER JOIN gads_data
ON dates.date = gads_data.date
# Influx query:
from(bucket: "example_bucket")
|> range(start: 2022-01-01T00:00:00.000Z, stop: 2022-01-01T00:00:00.000Z)
|> aggregateWindow(every: 1d, fn: sum, column: "val")
|> yield(name: "mean")
Here are the scripts:
config_query.sh:
#!/bin/bash
# Configures start date and end date of query files
START_DATE=1ドル
END_DATE=2ドル
# Setup directories
HOME="/app"
F1="${HOME}/queries/leads.flux"
F2="${HOME}/queries/subs.flux"
F3="${HOME}/queries/gads.sql"
F4="${HOME}/queries/ms.sql"
F5="${HOME}/queries/li.sql"
# Replaces 'start' and 'stop' in Influx queries:
for f in $F1 $F2
do
sed -i -E "s/start: .{4}-.{2}-.{2}T00:00:00.000Z, stop: .{4}-.{2}-.{2}/start: ${START_DATE}T00:00:00.000Z, stop: ${END_DATE}/g" $f
done
# Replaces 'START_DATE' and 'END_DATE' in Bigquery queries:
for f in $F3 $F4 $F5
do
sed -i -E "s/\('.{4}-.{2}-.{2}', DATE_SUB\('.{4}-.{2}-.{2}',/\('$START_DATE', DATE_SUB\('$END_DATE',/g" $f;
sed -i -E "s/>= '.{4}-.{2}-.{2}'/>= '$START_DATE'/g" $f;
sed -i -E "s/< '.{4}-.{2}-.{2}'/< '$END_DATE'/g" $f;
done
extract.sh:
#!/bin/bash
# Saves query output to csv-files in folder tmp/raw
# Setup filepaths:
ROOT="/app"
QUERIES="${ROOT}/queries"
RAW="${ROOT}/tmp/raw"
# Setup query paths:
Q1="${QUERIES}/leads.flux"
Q2="${QUERIES}/subs.flux"
Q3="${QUERIES}/gads.sql"
Q4="${QUERIES}/ms.sql"
Q5="${QUERIES}/li.sql"
# Setup target paths:
F1="${RAW}/leads.csv"
F2="${RAW}/subs.csv"
F3="${RAW}/gads.csv"
F4="${RAW}/ms.csv"
F5="${RAW}/li.csv"
# Check if connection to BigQuery works:
bq query --dry_run --use_legacy_sql=false < $Q3 > $F3
if [ $? -ne 0 ]; then
echo "No connection to BigQuery possible. Program exiting."
exit $?;
fi
# Run queries:
for ((i=1;i<=5;i++))
do
if (( $i <= 2 )); then
# run influx
eval "influx query --file \$Q${i} -r > \$F${i} && echo '$(date): Extracted \$Q${i} to \$F${i}...'";
else
# run bq
eval "(bq query -q --use_legacy_sql=false --format=csv --max_rows=100000 < \$Q${i}) > \$F${i} && echo '$(date): Extracted \$Q${i} to \$F${i}...'";
fi
if [ $? -ne 0 ]; then
echo "An error occurred. Program exiting."
exit $?;
fi
done
transform.py:
import pandas as pd
# Setup directories:
ROOT_DIR = '/app'
RAW_DIR = ROOT_DIR + '/tmp/raw'
CLEAN_DIR = ROOT_DIR + '/tmp/clean'
# Setup Schemas:
SCHEMA = ['date', 'campaign', 'ad_group', 'impressions', 'clicks', 'spend']
SCHEMA_LI = ['date', 'campaign', 'impressions', 'clicks', 'spend']
SCHEMA_LEADS = ['date', 'path', 'leads', 'registrations', 'trials',
'basic-trial', 'business-trial', 'education-trial', 'enterprise-trial']
SCHEMA_SUBS = ['New_ID', 'subscription', 'date', 'total_subscriptions',
'new_subscriptions', 'isTrial', 'offer', 'segment']
def transform_li():
try:
# Read Data
file = RAW_DIR + '/li.csv'
df = pd.read_csv(file, header=0)
df['date'] = pd.to_datetime(df['date'], format='%Y-%m-%d %H:%M:%S')
df.insert(0, 'New_ID', range(0, 0 + len(df)))
except KeyError:
raise KeyError("Your query output does not conform to the specified schema."
"Have you perhaps used 'bq query --dry_run'?")
except pd.errors.EmptyDataError:
df = pd.DataFrame(columns=SCHEMA_LI.insert(0, 'NEW_ID'))
finally:
# Save Data
path_to = CLEAN_DIR + '/li.csv'
df.to_csv(path_to, index=False)
def transform_ms():
try:
# Read Data
file = RAW_DIR + '/ms.csv'
df = pd.read_csv(file, header=0)
df['date'] = pd.to_datetime(df['date'], format='%Y-%m-%d %H:%M:%S')
df.insert(0, 'New_ID', range(0, 0 + len(df)))
except KeyError:
raise KeyError("Your query output does not conform to the specified schema."
"Have you perhaps used 'bq query --dry_run'?")
except pd.errors.EmptyDataError:
df = pd.DataFrame(columns=SCHEMA.insert(0, 'NEW_ID'))
finally:
# Save Data
path_to = CLEAN_DIR + '/ms.csv'
df.to_csv(path_to, index=False)
def transform_gads():
try:
file = RAW_DIR + '/gads.csv'
df = pd.read_csv(file, sep=',', header=0)
df['date'] = pd.to_datetime(df['date'], format='%Y-%m-%d %H:%M:%S')
df.insert(0, 'New_ID', range(0, 0 + len(df)))
except KeyError:
raise KeyError("Your query output does not conform to the specified schema."
"Have you perhaps used 'bq query --dry_run'?")
except pd.errors.EmptyDataError:
df = pd.DataFrame(columns=SCHEMA.insert(0, 'NEW_ID'))
finally:
# Save Data
path_to = CLEAN_DIR + '/gads.csv'
df.to_csv(path_to, index=False)
def transform_subs():
try:
# Read Data
file = RAW_DIR + '/subs.csv'
df = pd.read_csv(file,
header=3,
usecols=['_time', '_value', 'added_values', '_field',
'isTrial', 'offer', 'segment', 'sub_values'])
df.rename(columns={'_field': 'subscription',
'_time': 'date',
'_value': 'total_subscriptions',
'added_values': 'new_subscriptions',
'sub_values': 'lost_subscriptions'}, inplace=True)
df['date'] = pd.to_datetime(df['date'], format='%Y-%m-%dT%H:%M:%S.%fZ')
df['date'] = df['date'] - pd.Timedelta(days=1)
df.insert(0, 'New_ID', range(0, 0 + len(df)))
except pd.errors.EmptyDataError:
df = pd.DataFrame(columns=SCHEMA_SUBS.insert(0, 'NEW_ID'))
except KeyError:
raise KeyError("Your query output does not conform to the specified schema."
"Have you perhaps used 'bq query --dry_run'?")
finally:
# Save Data
path_to = CLEAN_DIR + '/subs.csv'
df.to_csv(path_to, index=False)
def transform_leads():
try:
# Read Data
file = RAW_DIR + '/leads.csv'
df = pd.read_csv(file, header=3, usecols=['_time', 'val', 'path', '_field'])
# Transform Data
df = pd.pivot_table(df, values='val', index=['_time', 'path'], columns='_field').reset_index()
df['trials'] = df['basic-trial'] + df['education-trial'] + df['business-trial'] + df['enterprise-trial']
df.rename(columns={'_time': 'date',
'registration': 'leads',
'registration_completed': 'registrations'}, inplace=True)
df = df.astype({'leads': int,
'registrations': int,
'trials': int,
'basic-trial': int,
'business-trial': int,
'education-trial': int,
'enterprise-trial': int})
df['date'] = pd.to_datetime(df['date'], format='%Y-%m-%dT%H:%M:%SZ')
df['date'] = df['date'] - pd.Timedelta(days=1)
cols = SCHEMA_LEADS
df = df[cols]
df.insert(0, 'New_ID', range(0, 0 + len(df)))
except pd.errors.EmptyDataError:
df = pd.DataFrame(columns=SCHEMA_LEADS.insert(0, 'NEW_ID'))
except KeyError:
raise KeyError("Your query output does not conform to the specified schema."
"Have you perhaps used 'bq query --dry_run'?")
finally:
# Save Data
path_to = CLEAN_DIR + '/leads.csv'
df.to_csv(path_to, index=False)
if __name__ == '__main__':
# Execute all functions in transform.py
import transform
for i in dir(transform):
item = getattr(transform, i)
if callable(item):
item()
run.sh:
#!/bin/bash
# Run config_query.sh, extract.sh and transform.py.
# Split up jobs in chunks of 5 days if start date and end date are far apart.
# Example usage: ./run.sh start_date end_date
# pass date in format YYYY-mm-dd:
start=1ドル
end=2ドル
# Check if start and end are valid dates
for date in $start $end
do
# Check format
format="^[0-9]{4}-[0-9]{2}-[0-9]{2}$"
if [[ $start =~ $format && $end =~ $format ]]
then
echo "Date 1ドル is in valid format (YYYY-MM-DD)"
else
echo "Date 1ドル is in an invalid format (not YYYY-mm-dd)."
exit $?
fi
# Check if date value is valid
date -d $date
if [ $? -ne 0 ]; then
echo "${date} is not a valid date. Enter a valid date."
exit $?
fi
done
# Setup root directory path
ROOT="/app"
# Setup clean files directory paths
CLEAN_GADS="${ROOT}/tmp/clean/gads.csv"
CLEAN_LEADS="${ROOT}/tmp/clean/leads.csv"
CLEAN_LI="${ROOT}/tmp/clean/li.csv"
CLEAN_MS="${ROOT}/tmp/clean/ms.csv"
CLEAN_SUBS="${ROOT}/tmp/clean/subs.csv"
# Setup output paths
GADS="${ROOT}/tmp/gads_${start}_${end}.csv"
LEADS="${ROOT}/tmp/leads_${start}_${end}.csv"
LI="${ROOT}/tmp/li_${start}_${end}.csv"
MS="${ROOT}/tmp/ms_${start}_${end}.csv"
SUBS="${ROOT}/tmp/subs_${start}_${end}.csv"
# Delete previously created reports
find $ROOT/tmp -maxdepth 1 -type f -delete
# Setup schema for output paths
echo "New_ID,date,campaign,ad_group,impressions,clicks,spend" > "$GADS"
echo "New_ID,date,path,leads,registrations,trials,basic-trial,business-trial,education-trial,enterprise-trial" > "$LEADS"
echo "New_ID,date,campaign,impressions,clicks,spend" > "$LI"
echo "New_ID,date,campaign,ad_group,impressions,clicks,spend" > "$MS"
echo "New_ID,subscription,date,total_subscriptions,new_subscriptions,isTrial,offer,segment,lost_subscriptions" > "$SUBS"
# Download data in 5-day chunks:
while [[ "$start" < "$end" ]]; do
temp=$(date -I -d "$start + 5 day")
if [[ "$temp" < "$end" ]]; then
./src/config_query.sh $start $temp && ./src/extract.sh && python ./src/transform.py
else
./src/config_query.sh $start $end && ./src/extract.sh && python ./src/transform.py
fi
# Append data to files
tail -n +2 "$CLEAN_GADS" >> "$GADS"
tail -n +2 "$CLEAN_LEADS" >> "$LEADS"
tail -n +2 "$CLEAN_LI" >> "$LI"
tail -n +2 "$CLEAN_MS" >> "$MS"
tail -n +2 "$CLEAN_SUBS" >> "$SUBS"
start=$(date -I -d "$start + 5 day")
done
Comment: You could start with shellcheck. – Richard Neumann, Mar 14, 2022 at 16:18
1 Answer
I see some of this code is an earlier version of your similar question. Most of the points in the answers to that question are relevant here too; let me just call out the highlights here:
- For robustness and simplicity, consider using template files with placeholders to generate scripts with computed values
- Make sure exit $? comes right after the relevant command, or save $? in a variable to exit later.
- Always enclose variables used on the command line in double quotes
- To give a name to logic in a block of code, consider defining a function
- Avoid repeated sed -i on the same file with different commands by using a single command with multiple -e expressions (see the sketch after this list)
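To illustrate the last point, here is a minimal sketch of the BigQuery part of config_query.sh with the three sed -i runs per file collapsed into one invocation (same regular expressions as in the original):

# One sed pass per file instead of three; each -e expression is applied in turn.
for f in "$F3" "$F4" "$F5"; do
    sed -i -E \
        -e "s/\('.{4}-.{2}-.{2}', DATE_SUB\('.{4}-.{2}-.{2}',/\('$START_DATE', DATE_SUB\('$END_DATE',/g" \
        -e "s/>= '.{4}-.{2}-.{2}'/>= '$START_DATE'/g" \
        -e "s/< '.{4}-.{2}-.{2}'/< '$END_DATE'/g" \
        "$f"
done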
In addition to the above, I have a few more ideas about the different parts of the code here.
Use arrays instead of numbered variables and eval
I'm referring to this part:
eval "influx query --file \$Q${i} -r > \$F${i} && echo '$(date): Extracted \$Q${i} to \$F${i}...'";
eval is strongly discouraged; it seems it was used here to be able to loop over numbered variables. A better alternative is using an array, replacing eval with array indexing. You already did that in the later version of the script, and that's great.
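A minimal sketch, reusing the paths from extract.sh (the QUERIES and RAW variables are assumed to be set as in the original script):

# Parallel arrays instead of Q1..Q5 and F1..F5; the order pairs each query with its output file.
Q=("$QUERIES/leads.flux" "$QUERIES/subs.flux" "$QUERIES/gads.sql" "$QUERIES/ms.sql" "$QUERIES/li.sql")
F=("$RAW/leads.csv" "$RAW/subs.csv" "$RAW/gads.csv" "$RAW/ms.csv" "$RAW/li.csv")

# Plain indexing replaces eval; no dynamic variable names are needed.
influx query --file "${Q[0]}" -r > "${F[0]}" && echo "$(date): Extracted ${Q[0]} to ${F[0]}..."

These Q and F arrays are also what the loops in the next section rely on.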
Reconsider a loop when it does different things depending on the counter variable
The loop in extract.sh has a condition on the counter variable, and does something different when the counter is at most 2 and when it's above. This would be better as two loops without a condition on the counter.
Consider something like:
die() {
local exit_code=$?
echo "An error occurred. Program exiting."
exit "$exit_code"
}
influx_query() {
local query=1ドル
local out=2ドル
influx query --file "$query" -r > "$out" && echo "$(date): Extracted $query to $out..."
}
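# bq_query is assumed to be defined analogously to influx_query,
# wrapping the bq invocation from extract.sh:
bq_query() {
    local query=1ドル
    local out=2ドル
    bq query -q --use_legacy_sql=false --format=csv --max_rows=100000 < "$query" > "$out" \
        && echo "$(date): Extracted $query to $out..."
}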
for ((i = 0; i < 2; i++)); do
influx_query "${Q[i]}" "${F[i]}" || die
done
for ((i = 2; i < ${#Q[@]}; i++)); do
bq_query "${Q[i]}" "${F[i]}" || die
done
Use functions generously
Here:
if [[ "$temp" < "$end" ]]; then ./src/config_query.sh $start $temp && ./src/extract.sh && python ./src/transform.py else ./src/config_query.sh $start $end && ./src/extract.sh && python ./src/transform.py fi
Instead of duplicating the long ... && ... && ... twice, I would create a function that takes start and end date parameters.
config_query() {
    local start=1ドル
    local end=2ドル
    ./src/config_query.sh "$start" "$end" && ./src/extract.sh && python ./src/transform.py
}
if [[ "$temp" < "$end" ]]; then
config_query "$start" "$temp"
else
config_query "$start" "$end"
fi
Making scripts more reliable
Since you seem to be interested in making scripts more reliable,
I recommend running shellcheck
on your scripts and fixing what you find.
Also, to mitigate the impact of certain mistakes, I start all my Bash scripts with this:
set -euo pipefail
This does a bunch of helpful things:
- The -e option makes the script abort on the first failing command, which can help in many ways:
  - avoid disasters by failing fast
  - highlight that something unexpected happened
  - make the failure easy to see, at the end of the script output
  - encourage implementing solid error handling
- The -u option makes it an error to reference an undefined variable; together with -e, this again can help avoid disasters
- The -o pipefail option makes the exit code of a pipeline a success only when all commands exit with success (you can read more about it in man bash, search for "pipefail")
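A minimal sketch of what these options buy you in this pipeline (commands borrowed from the scripts above; the comments describe the changed behaviour):

#!/bin/bash
set -euo pipefail

start=1ドル   # -u: calling the script without arguments aborts with "1ドル: unbound variable"
end=2ドル     # instead of silently running with empty dates

# -e: if either script fails, execution stops right here instead of
# appending incomplete data further down.
./src/config_query.sh "$start" "$end"
./src/extract.sh

# -o pipefail: if influx fails, the pipeline's exit status is non-zero even though
# wc succeeds, so -e stops the script; without pipefail the failure would be masked.
influx query --file queries/leads.flux -r | wc -l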
Use lowercase variable names in scripts
The SHOUT_CASE naming convention is recommended for exported variables used by lower-level system commands, such as PATH, HOME, EDITOR, and so on.
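A small, hypothetical example of the distinction (INFLUX_TOKEN is only an illustration of an exported variable, not something your scripts necessarily need):

# Lowercase for ordinary script-local variables...
root="/app"
raw_dir="$root/tmp/raw"

# ...SHOUT_CASE reserved for variables exported into the environment of child processes.
export INFLUX_TOKEN="..."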