"""AWS Glue ETL job: clean LeetCode contest-ranking data.

Reads the raw ``contest_ranking.csv`` export from S3, renames/casts
columns, parses the JSON-encoded ``ranking`` and ``user`` columns,
derives per-user contest statistics and a normalized ``country`` value,
and writes the processed rows back to S3 as CSV.
"""
import sys

from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.dynamicframe import DynamicFrame
import gs_parse_json  # registers DynamicFrame.gs_parse_json (Glue Studio custom transform)
from pyspark.sql import functions as F

# Standard Glue job bootstrap: resolve arguments and initialize the job
# so bookmarks / commit semantics work.
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

# Script generated for node Amazon S3: read the raw CSV export.
AmazonS3_node1736996368406 = glueContext.create_dynamic_frame.from_options(
    format_options={
        "quoteChar": "\"",  # must be a single character
        "withHeader": True,
        "separator": ",",
        "optimizePerformance": False,
    },
    connection_type="s3",
    format="csv",
    connection_options={
        "paths": ["s3://leetcode-contest-analytics/raw/contest_ranking.csv"],
    },
    transformation_ctx="AmazonS3_node1736996368406",
)

# Script generated for node Change Schema: rename columns and cast the
# numeric ones (everything arrives as string from CSV).
ChangeSchema_node1736998468651 = ApplyMapping.apply(
    frame=AmazonS3_node1736996368406,
    mappings=[
        ("ranking", "string", "ranking", "string"),
        ("currentrating", "string", "rating", "float"),
        ("currentglobalranking", "string", "global_ranking", "int"),
        ("dataregion", "string", "data_region", "string"),
        ("user", "string", "user", "string"),
    ],
    transformation_ctx="ChangeSchema_node1736998468651",
)

# Script generated for node Parse JSON Column: turn the JSON string in
# ``user`` into a struct column.
ParseJSONColumn_node1736996392282 = ChangeSchema_node1736998468651.gs_parse_json(colName="user")

# Manual transformation: drop to a Spark DataFrame for column-level work.
df = ParseJSONColumn_node1736996392282.toDF()

# Process the ranking column — a JSON array of per-contest ranks.
df = df.withColumn("ranking", F.from_json("ranking", "array<int>"))
df = df.withColumn("contest_count", F.size(F.col("ranking")))  # number of contests attended
# 0D (a double literal) keeps the running sum — and hence the average —
# in floating point instead of integer arithmetic.
df = df.withColumn(
    "avg_ranking",
    F.expr("aggregate(ranking, 0D, (acc, x) -> acc + x) / size(ranking)"),
)

# Process the user struct column.
df = df.withColumn("username", F.col("user")["username"])  # extract username
df = df.withColumn("country", F.col("user")["profile"]["countryName"])  # extract country

# CN-region profiles don't carry a country name, so default them to
# "China"; then map empty strings and remaining nulls to "Unknown".
df = df.withColumn(
    "country",
    F.when(F.col("data_region") == "CN", "China").otherwise(F.col("country")),
)
df = df.withColumn(
    "country",
    F.when(F.col("country") == "", "Unknown").otherwise(F.col("country")),
)
df = df.fillna({"country": "Unknown"})
df = df.drop("ranking", "user", "data_region")  # unnecessary columns

# Convert back to DynamicFrame for the Glue sink.
df = DynamicFrame.fromDF(df, glueContext, "transformed_dynamic_frame")

# Script generated for node Amazon S3: write the processed CSV output.
AmazonS3_node1736996407493 = glueContext.write_dynamic_frame.from_options(
    frame=df,
    connection_type="s3",
    format="csv",
    connection_options={
        "path": "s3://leetcode-contest-analytics/processed/",
        "partitionKeys": [],
    },
    transformation_ctx="AmazonS3_node1736996407493",
)

job.commit()