Commit 914f422

finalize the data pipeline
1 parent f07e2bb commit 914f422

2 files changed: 4 additions & 19 deletions

‎dags/contest_ranking_dag.py‎

Lines changed: 3 additions & 18 deletions
@@ -23,30 +23,15 @@
     catchup=False
 )
 
-"""
-Approach 1: ETL pipeline
-- Extract raw data
-- Load into local storage/Amazon S3 (Optional)
-- Transform locally with pandas
-- Reload into local storage/Amazon S3
-- Load from S3 to Amazon Redshift
-
-Approach 2: ELT pipeline
-- Extract raw data
-- Load into Amazon S3
-- Transform with AWS Glue
-- Load the transformed data to Amazon Redshift
-"""
-
 # Extract raw data directly from API and store in local/cloud storage
 extract = PythonOperator(
     task_id="extract_contest_ranking",
     python_callable=extract_contest_ranking,
-    op_args=[2],
+    op_args=[4],
     dag=dag
 )
 
-# Approach 1: Transform the data and reload into local/cloud storage
+# Approach 1: Transform the data locally using pandas and reload into local/cloud storage
 transform = PythonOperator(
     task_id="transform_contest_ranking",
     python_callable=transform_contest_ranking,
@@ -55,4 +40,4 @@
 
 extract >> transform
 
-# Approach 2 is done with AWS
+# Approach 2: Transform the data using AWS Glue and reload into cloud storage
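
The `op_args` change above is the only behavioral change in this file. Airflow's `PythonOperator` passes `op_args` to its callable as positional arguments, so `op_args=[4]` results in a call equivalent to `extract_contest_ranking(4)`. The sketch below imitates that unpacking without Airflow; `run_callable` and `pages_requested` are hypothetical stand-ins written for illustration, and the assumption that the extractor walks pages 1 through `num_pages` is inferred from the `i + 1` in the operator code, not confirmed by the diff.

```python
from typing import Any, Callable, List


def run_callable(python_callable: Callable[..., Any], op_args: List[Any]) -> Any:
    """Hypothetical stand-in: invoke the callable with op_args unpacked positionally."""
    return python_callable(*op_args)


def pages_requested(num_pages: int) -> List[int]:
    """Hypothetical stand-in for the extractor: list the 1-indexed pages it would fetch."""
    return [i + 1 for i in range(num_pages)]


# Before the commit the task asked for 2 pages; after it, 4.
before = run_callable(pages_requested, [2])
after = run_callable(pages_requested, [4])
print(before, after)
```

Under that assumption, the commit doubles the number of ranking pages pulled per run and leaves the task wiring (`extract >> transform`) unchanged.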

‎operators/contest_ranking_ops.py‎

Lines changed: 1 addition & 1 deletion
@@ -14,7 +14,7 @@ def extract_contest_ranking(num_pages):
         response = requests.post(URL, json=contest_ranking_query(i + 1)).json()["data"]["globalRanking"]["rankingNodes"]
         responses.extend(response)
     # output_path = f"{OUTPUT_PATH}/raw/sample_contest_ranking.csv"  # Local file path for sample output data
-    output_path = f"s3://{BUCKET_NAME}/raw_contest_ranking.csv"  # Amazon S3 storage path
+    output_path = f"s3://{BUCKET_NAME}/raw/contest_ranking.csv"  # Amazon S3 storage path
     pd.DataFrame(responses).to_csv(output_path, index=False)
     return output_path
 
