
I wrote the following scripts to extract data from BigQuery and InfluxDB using the BigQuery CLI and the Influx CLI. The code works as expected. I included all files for completeness, but I'm primarily interested in feedback on the bash scripts config_query.sh, extract.sh and run.sh. You can also comment on the Python file and on my queries if you want, but the focus should be on the bash scripts. The overall goal is to make the scripts as maintainable and reliable as possible; execution speed doesn't matter.

This is how the pipeline works at a high level: the queries are saved in text files in the folder /app/queries. First, config_query.sh configures these files by replacing the start and end date strings in the queries. Then extract.sh passes the configured queries to the respective CLIs and saves the data in /app/tmp/raw. The raw data is then processed and cleaned by transform.py and saved to /app/tmp/clean. Finally, run.sh runs all of these steps. If the range between the start and end date is longer than five days, run.sh splits the queries into chunks of five days to avoid a server timeout.
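For reference, the whole pipeline is started like this (dates in YYYY-mm-dd format, as run.sh expects; the values are just examples):

./run.sh 2022-01-01 2022-01-31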

This is an overview of the directory structure:

app/
├── queries/
│   ├── gads.sql
│   ├── leads.flux
│   ├── li.sql
│   ├── ms.sql
│   └── subs.flux
├── src/
│   ├── config_query.sh
│   ├── extract.sh
│   └── transform.py
└── tmp/
    ├── clean/
    │   ├── gads.csv
    │   ├── leads.csv
    │   ├── li.csv
    │   ├── ms.csv
    │   └── subs.csv
    ├── raw/
    │   ├── gads.csv
    │   ├── leads.csv
    │   ├── li.csv
    │   ├── ms.csv
    │   └── subs.csv
    ├── gads_startdate_enddate.csv
    ├── leads_startdate_enddate.csv
    ├── li_startdate_enddate.csv
    ├── ms_startdate_enddate.csv
    └── subs_startdate_enddate.csv

Example queries in /app/queries:

# BigQuery query:
WITH dates AS (
  SELECT
    *
  FROM
    UNNEST(GENERATE_DATE_ARRAY('2022-01-01', DATE_SUB('2022-01-01', INTERVAL 1 DAY), INTERVAL 1 DAY)) AS date
), gads_data AS (
  SELECT DATE(__hevo_report_date) AS date,
    campaign,
    ad_group,
    sum(impressions) AS impressions,
    sum(clicks) AS clicks,
    ROUND(sum(cost)/1000000, 2) AS spend
  FROM `example_table`
  WHERE DATE(__hevo_report_date) >= '2022-01-01' AND DATE(__hevo_report_date) < '2022-01-01'
  GROUP BY date, campaign, ad_group
)
SELECT *
FROM dates
LEFT OUTER JOIN gads_data
ON dates.date = gads_data.date

# Influx query:
from(bucket: "example_bucket")
  |> range(start: 2022-01-01T00:00:00.000Z, stop: 2022-01-01T00:00:00.000Z)
  |> aggregateWindow(every: 1d, fn: sum, column: "val")
  |> yield(name: "mean")

Here are the scripts:

config_query.sh:

#!/bin/bash
# Configures start date and end date of query files
START_DATE=1ドル
END_DATE=2ドル
# Setup directories
HOME="/app"
F1="${HOME}/queries/leads.flux"
F2="${HOME}/queries/subs.flux"
F3="${HOME}/queries/gads.sql"
F4="${HOME}/queries/ms.sql"
F5="${HOME}/queries/li.sql"
# Replaces 'start' and 'stop' in Influx queries:
for f in $F1 $F2
do
    sed -i -E "s/start: .{4}-.{2}-.{2}T00:00:00.000Z, stop: .{4}-.{2}-.{2}/start: ${START_DATE}T00:00:00.000Z, stop: ${END_DATE}/g" $f
done
# Replaces 'START_DATE' and 'END_DATE' in Bigquery queries:
for f in $F3 $F4 $F5
do
    sed -i -E "s/\('.{4}-.{2}-.{2}', DATE_SUB\('.{4}-.{2}-.{2}',/\('$START_DATE', DATE_SUB\('$END_DATE',/g" $f;
    sed -i -E "s/>= '.{4}-.{2}-.{2}'/>= '$START_DATE'/g" $f;
    sed -i -E "s/< '.{4}-.{2}-.{2}'/< '$END_DATE'/g" $f;
done

extract.sh:

#!/bin/bash
# Saves query output to csv-files in folder tmp/raw
# Setup filepaths:
ROOT="/app"
QUERIES="${ROOT}/queries"
RAW="${ROOT}/tmp/raw"
# Setup query paths:
Q1="${QUERIES}/leads.flux"
Q2="${QUERIES}/subs.flux"
Q3="${QUERIES}/gads.sql"
Q4="${QUERIES}/ms.sql"
Q5="${QUERIES}/li.sql"
# Setup target paths:
F1="${RAW}/leads.csv"
F2="${RAW}/subs.csv"
F3="${RAW}/gads.csv"
F4="${RAW}/ms.csv"
F5="${RAW}/li.csv"
# Check if connection to BigQuery works:
bq query --dry_run --use_legacy_sql=false < $Q3 > $F3
if [ $? -ne 0 ]; then
    echo "No connection to BigQuery possible. Program exiting."
    exit $?;
fi
# Run queries:
for ((i=1;i<=5;i++))
do
    if (( $i <= 2 )); then
        # run influx
        eval "influx query --file \$Q${i} -r > \$F${i} && echo '$(date): Extracted \$Q${i} to \$F${i}...'";
    else
        # run bq
        eval "(bq query -q --use_legacy_sql=false --format=csv --max_rows=100000 < \$Q${i}) > \$F${i} && echo '$(date): Extracted \$Q${i} to \$F${i}...'";
    fi
    if [ $? -ne 0 ]; then
        echo "An error occurred. Program exiting."
        exit $?;
    fi
done

transform.py:

import pandas as pd

# Setup directories:
ROOT_DIR = '/app'
RAW_DIR = ROOT_DIR + '/tmp/raw'
CLEAN_DIR = ROOT_DIR + '/tmp/clean'

# Setup Schemas:
SCHEMA = ['date', 'campaign', 'ad_group', 'impressions', 'clicks', 'spend']
SCHEMA_LI = ['date', 'campaign', 'impressions', 'clicks', 'spend']
SCHEMA_LEADS = ['date', 'path', 'leads', 'registrations', 'trials',
                'basic-trial', 'business-trial', 'education-trial', 'enterprise-trial']
SCHEMA_SUBS = ['New_ID', 'subscription', 'date', 'total_subscriptions',
               'new_subscriptions', 'isTrial', 'offer', 'segment']


def transform_li():
    try:
        # Read Data
        file = RAW_DIR + '/li.csv'
        df = pd.read_csv(file, header=0)
        df['date'] = pd.to_datetime(df['date'], format='%Y-%m-%d %H:%M:%S')
        df.insert(0, 'New_ID', range(0, 0 + len(df)))
    except KeyError:
        raise KeyError("Your query output does not conform to the specified schema."
                       "Have you perhaps used 'bq query --dry_run'?")
    except pd.errors.EmptyDataError:
        df = pd.DataFrame(columns=SCHEMA_LI.insert(0, 'NEW_ID'))
    finally:
        # Save Data
        path_to = CLEAN_DIR + '/li.csv'
        df.to_csv(path_to, index=False)


def transform_ms():
    try:
        # Read Data
        file = RAW_DIR + '/ms.csv'
        df = pd.read_csv(file, header=0)
        df['date'] = pd.to_datetime(df['date'], format='%Y-%m-%d %H:%M:%S')
        df.insert(0, 'New_ID', range(0, 0 + len(df)))
    except KeyError:
        raise KeyError("Your query output does not conform to the specified schema."
                       "Have you perhaps used 'bq query --dry_run'?")
    except pd.errors.EmptyDataError:
        df = pd.DataFrame(columns=SCHEMA.insert(0, 'NEW_ID'))
    finally:
        # Save Data
        path_to = CLEAN_DIR + '/ms.csv'
        df.to_csv(path_to, index=False)


def transform_gads():
    try:
        file = RAW_DIR + '/gads.csv'
        df = pd.read_csv(file, sep=',', header=0)
        df['date'] = pd.to_datetime(df['date'], format='%Y-%m-%d %H:%M:%S')
        df.insert(0, 'New_ID', range(0, 0 + len(df)))
    except KeyError:
        raise KeyError("Your query output does not conform to the specified schema."
                       "Have you perhaps used 'bq query --dry_run'?")
    except pd.errors.EmptyDataError:
        df = pd.DataFrame(columns=SCHEMA.insert(0, 'NEW_ID'))
    finally:
        # Save Data
        path_to = CLEAN_DIR + '/gads.csv'
        df.to_csv(path_to, index=False)


def transform_subs():
    try:
        # Read Data
        file = RAW_DIR + '/subs.csv'
        df = pd.read_csv(file,
                         header=3,
                         usecols=['_time', '_value', 'added_values', '_field',
                                  'isTrial', 'offer', 'segment', 'sub_values'])
        df.rename(columns={'_field': 'subscription',
                           '_time': 'date',
                           '_value': 'total_subscriptions',
                           'added_values': 'new_subscriptions',
                           'sub_values': 'lost_subscriptions'}, inplace=True)
        df['date'] = pd.to_datetime(df['date'], format='%Y-%m-%dT%H:%M:%S.%fZ')
        df['date'] = df['date'] - pd.Timedelta(days=1)
        df.insert(0, 'New_ID', range(0, 0 + len(df)))
    except pd.errors.EmptyDataError:
        df = pd.DataFrame(columns=SCHEMA_SUBS.insert(0, 'NEW_ID'))
    except KeyError:
        raise KeyError("Your query output does not conform to the specified schema."
                       "Have you perhaps used 'bq query --dry_run'?")
    finally:
        # Save Data
        path_to = CLEAN_DIR + '/subs.csv'
        df.to_csv(path_to, index=False)


def transform_leads():
    try:
        # Read Data
        file = RAW_DIR + '/leads.csv'
        df = pd.read_csv(file, header=3, usecols=['_time', 'val', 'path', '_field'])
        # Transform Data
        df = pd.pivot_table(df, values='val', index=['_time', 'path'], columns='_field').reset_index()
        df['trials'] = df['basic-trial'] + df['education-trial'] + df['business-trial'] + df['enterprise-trial']
        df.rename(columns={'_time': 'date',
                           'registration': 'leads',
                           'registration_completed': 'registrations'}, inplace=True)
        df = df.astype({'leads': int,
                        'registrations': int,
                        'trials': int,
                        'basic-trial': int,
                        'business-trial': int,
                        'education-trial': int,
                        'enterprise-trial': int})
        df['date'] = pd.to_datetime(df['date'], format='%Y-%m-%dT%H:%M:%SZ')
        df['date'] = df['date'] - pd.Timedelta(days=1)
        cols = SCHEMA_LEADS
        df = df[cols]
        df.insert(0, 'New_ID', range(0, 0 + len(df)))
    except pd.errors.EmptyDataError:
        df = pd.DataFrame(columns=SCHEMA_LEADS.insert(0, 'NEW_ID'))
    except KeyError:
        raise KeyError("Your query output does not conform to the specified schema."
                       "Have you perhaps used 'bq query --dry_run'?")
    finally:
        # Save Data
        path_to = CLEAN_DIR + '/leads.csv'
        df.to_csv(path_to, index=False)


if __name__ == '__main__':
    # Execute all functions in transform.py
    import transform
    for i in dir(transform):
        item = getattr(transform, i)
        if callable(item):
            item()

run.sh:

#!/bin/bash
# Run config_query.sh, extract.sh and transform.py.
# Split up jobs in chunks of 5 days if start date and end date are far apart.
# Example usage: ./run.sh start_date end_date
# pass date in format YYYY-mm-dd:
start=1ドル
end=2ドル
# Check if start and end are valid dates
for date in $start $end
do
    # Check format
    format="^[0-9]{4}-[0-9]{2}-[0-9]{2}$"
    if [[ $start =~ $format && $end =~ $format ]]
    then
        echo "Date 1ドル is in valid format (YYYY-MM-DD)"
    else
        echo "Date 1ドル is in an invalid format (not YYYY-mm-dd)."
        exit $?
    fi
    # Check if date value is valid
    date -d $date
    if [ $? -ne 0 ]; then
        echo "${date} is not a valid date. Enter a valid date."
        exit $?
    fi
done
# Setup root directory path
ROOT="/app"
# Setup clean files directory paths
CLEAN_GADS="${ROOT}/tmp/clean/gads.csv"
CLEAN_LEADS="${ROOT}/tmp/clean/leads.csv"
CLEAN_LI="${ROOT}/tmp/clean/li.csv"
CLEAN_MS="${ROOT}/tmp/clean/ms.csv"
CLEAN_SUBS="${ROOT}/tmp/clean/subs.csv"
# Setup output paths
GADS="${ROOT}/tmp/gads_${start}_${end}.csv"
LEADS="${ROOT}/tmp/leads_${start}_${end}.csv"
LI="${ROOT}/tmp/li_${start}_${end}.csv"
MS="${ROOT}/tmp/ms_${start}_${end}.csv"
SUBS="${ROOT}/tmp/subs_${start}_${end}.csv"
# Delete previously created reports
find $ROOT/tmp -maxdepth 1 -type f -delete
# Setup schema for output paths
echo "New_ID,date,campaign,ad_group,impressions,clicks,spend" > "$GADS"
echo "New_ID,date,path,leads,registrations,trials,basic-trial,business-trial,education-trial,enterprise-trial" > "$LEADS"
echo "New_ID,date,campaign,impressions,clicks,spend" > "$LI"
echo "New_ID,date,campaign,ad_group,impressions,clicks,spend" > "$MS"
echo "New_ID,subscription,date,total_subscriptions,new_subscriptions,isTrial,offer,segment,lost_subscriptions" > "$SUBS"
# Download data in 5-day chunks:
while [[ "$start" < "$end" ]]; do
 temp=$(date -I -d "$start + 5 day")
 if [[ "$temp" < "$end" ]]; then
 ./src/config_query.sh $start $temp && ./src/extract.sh && python ./src/transform.py
 else
 ./src/config_query.sh $start $end && ./src/extract.sh && python ./src/transform.py
 fi
 # Append data to files
 tail -n +2 "$CLEAN_GADS" >> "$GADS"
 tail -n +2 "$CLEAN_LEADS" >> "$LEADS"
 tail -n +2 "$CLEAN_LI" >> "$LI"
 tail -n +2 "$CLEAN_MS" >> "$MS"
 tail -n +2 "$CLEAN_SUBS" >> "$SUBS"
 start=$(date -I -d "$start + 5 day")
done
Comment: You could start with shellcheck.

1 Answer

I see that some of this code is an earlier version of the code in your similar question. Most of the points in the answers to that question are relevant here too, so let me just call out the highlights:

  • For robustness and simplicity, consider using template files with placeholders to generate the query files with computed values (see the sketch after this list)
  • Make sure exit $? comes right after the relevant command, or save $? in a variable to exit with later.
  • Always enclose variables used on the command line in double quotes
  • To give a name to the logic in a block of code, consider defining a function
  • Avoid repeated sed -i on the same file with different commands by using a single command with multiple -e expressions
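To make the first point concrete, here is a rough sketch (the .template suffix and the __START__/__END__ placeholder names are made up for illustration; START_DATE and END_DATE are the variables already used in config_query.sh). The committed template is never edited; config_query.sh generates the actual query from it, and a single sed call with several -e expressions does all the substitutions:

# queries/leads.flux.template contains, among other lines:
#   |> range(start: __START__T00:00:00.000Z, stop: __END__T00:00:00.000Z)

# config_query.sh writes the concrete query instead of rewriting it in place:
sed -e "s/__START__/${START_DATE}/g" \
    -e "s/__END__/${END_DATE}/g" \
    /app/queries/leads.flux.template > /app/queries/leads.flux

This removes the brittle date-matching regexes, and an interrupted run can never leave a half-edited query file behind.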

In addition to the above, I have a few more ideas about the different parts of the code here.

Use arrays instead of numbered variables and eval

I'm referring to this part:

eval "influx query --file \$Q${i} -r > \$F${i} && echo '$(date): Extracted \$Q${i} to \$F${i}...'";

eval is strongly discouraged; it seems it was used here to be able to loop over the numbered variables. A better alternative is an array, which replaces eval with plain indexing, as shown below. You already did that in the later version of the script, and that's great.
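For example, at the top of extract.sh the numbered variables could become two parallel arrays (reusing the QUERIES and RAW variables from extract.sh; the loops further down index into Q and F):

# Parallel arrays: Q[i] is a query file and F[i] is the CSV it is extracted to
Q=("$QUERIES/leads.flux" "$QUERIES/subs.flux" "$QUERIES/gads.sql" "$QUERIES/ms.sql" "$QUERIES/li.sql")
F=("$RAW/leads.csv" "$RAW/subs.csv" "$RAW/gads.csv" "$RAW/ms.csv" "$RAW/li.csv")

# Plain indexing, no eval-constructed variable names:
influx query --file "${Q[0]}" -r > "${F[0]}"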

Reconsider a loop when it does different things depending on the counter variable

The loop in extract.sh has a condition on the counter variable and does something different when the counter is <= 2 than when it is above. This would be better written as two loops without any condition on the counter.

Consider something like:

die() {
    local exit_code=$?
    echo "An error occurred. Program exiting."
    exit "$exit_code"
}

influx_query() {
    local query=1ドル
    local out=2ドル
    influx query --file "$query" -r > "$out" && echo "$(date): Extracted $query to $out..."
}

# bq_query wraps the bq invocation from extract.sh in the same way:
bq_query() {
    local query=1ドル
    local out=2ドル
    bq query -q --use_legacy_sql=false --format=csv --max_rows=100000 < "$query" > "$out" \
        && echo "$(date): Extracted $query to $out..."
}

for ((i = 0; i < 2; i++)); do
    influx_query "${Q[i]}" "${F[i]}" || die
done

for ((i = 2; i < ${#Q[@]}; i++)); do
    bq_query "${Q[i]}" "${F[i]}" || die
done

Use functions generously

Here:

if [[ "$temp" < "$end" ]]; then
 ./src/config_query.sh $start $temp && ./src/extract.sh && python ./src/transform.py
else
 ./src/config_query.sh $start $end && ./src/extract.sh && python ./src/transform.py
fi

Instead of writing the long ... && ... && ... chain twice, I would create a function that takes start and end date parameters.

run_pipeline() {
    local start=1ドル
    local end=2ドル
    ./src/config_query.sh "$start" "$end" && ./src/extract.sh && python ./src/transform.py
}

if [[ "$temp" < "$end" ]]; then
    run_pipeline "$start" "$temp"
else
    run_pipeline "$start" "$end"
fi

Making scripts more reliable

Since you seem to be interested in making scripts more reliable, I recommend running shellcheck on your scripts and fixing what you find.

Also, to mitigate the impact of certain mistakes, I start all my Bash scripts with this:

set -euo pipefail

This does a bunch of helpful things:

  • -e makes the script abort on the first failing command, which can help in many ways
    • avoid disasters by failing fast
    • highlight that something unexpected happened
    • make the failure easy to see, at the end of the script output
    • encourages implementing solid error handling
  • -u makes it an error when referencing an undefined variable; together with -e, this again can help avoid disasters
  • -o pipefail makes the exit code of a pipeline success only when all commands exit with success (you can read more about it in man bash, search for "pipefail")
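As a minimal sketch of what this looks like in run.sh (same script layout as in the question; the ${1:?...} expansions are just one way to fail loudly on missing arguments):

#!/bin/bash
set -euo pipefail

start=${1:?usage: run.sh START_DATE END_DATE}
end=${2:?usage: run.sh START_DATE END_DATE}

./src/config_query.sh "$start" "$end"   # any failure here aborts the run (-e) ...
./src/extract.sh                        # ... instead of letting the later tail -n +2
python ./src/transform.py               # appends pick up stale CSVs from a previous chunk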

Use lowercase variable names in scripts

The SHOUT_CASE naming convention is meant for exported variables consumed by the shell and lower-level system commands, such as PATH, HOME, EDITOR, and so on. Using lowercase names for your own variables avoids accidentally clobbering them.
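For example, config_query.sh assigns HOME="/app", which overrides the real home directory for the rest of that script and any command it starts. A lowercase name for your own variable avoids that kind of collision:

# Hypothetical replacement for the HOME assignment in config_query.sh:
root="/app"
f1="${root}/queries/leads.flux"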
