I'm still a beginner, so any pointers to make this code more professional/production-ready are appreciated. The tokens and database connections have been anonymized. The script works; its goal is to copy prices from one database and push them via API into another database. The data is monthly price data that usually arrives early in the month. So March prices arrive somewhere in early March, but usually not on the first. They are then supposed to be pushed to the destination database via REST API.
The catch is that we never know when the prices arrive in the source database, so the idea is to run this script daily via Task Scheduler and push the data to the destination database only once, when it arrives, while making sure the data is not pushed twice and no duplicates are created. I achieved this by pushing only when the data is in the source database but not yet in the destination database, via an if check.
I feel like this code is not very Pythonic and is lacking some error handling for it to run automatically on a daily basis. Any pointers are appreciated.
import json
import pandas as pd
import pyodbc
import sqlalchemy
import datetime
import sys
import requests
import pdb
import pymysql
#RPA API Connector
target_url = "exampleurl"
target_token = "exampletoken"
target_headers = {"AUTH-TOKEN": target_token, "Content-Type": "application/json"}
#Store today's date in variable
now = datetime.datetime.now()
#connecting to source DB
def connect_mssqlserver():
    engine_stmt = "mssql+pyodbc://@server/database?driver=SQL+Server"
    engine = sqlalchemy.create_engine(engine_stmt)
    engine.connect()
    return engine
#connect to destination DB
def connect_mysqlserver():
    engine_stmt = "mysql+pymysql://user:password@host:port/database"
    engine = sqlalchemy.create_engine(engine_stmt)
    engine.connect()
    return engine
def create_df(query, engine):
    #save table to df
    df = pd.read_sql(query, engine)
    return df
#store source data in df
source_df = create_df("SELECT * FROM PriceMonthly", connect_mssqlserver())
#Some dataframe modification to fit destination format
source_df = source_df[source_df.Year == now.year]
source_df['Tax'].fillna(0, inplace=True)
source_df['Year'] = source_df['ImportFileName'].str[0:4].astype(int)
source_df['Month'] = source_df['ImportFileName'].str[4:6].astype(int)
#store destination table in dataframe
dest_df = create_df("SELECT * FROM prices", connect_mysqlserver())
#Filter both dataframes by current year and month
source_df = source_df[(source_df.Year == now.year) & (source_df.Month == now.month)]
dest_df = dest_df[(dest_df.Year == now.year) & (dest_df.Month == now.month)]
#Logic to check if data is in source and not yet in destination, only then push via api
if source_df.empty == False and dest_df.empty == True:
    print('Data will be pushed')
    data = source_df.to_json(orient="records")
    data = json.loads(data)
    #This loop is used for chunking as API times out when pushing a lot of data at the same time
    url = 'price'
    j = 0
    i = 400
    length = len(data)
    while j <= length:
        print(j, i)
        data2 = data[j:i]
        target_response = requests.put(target_url + url + "/json", json=data2, headers=target_headers)
        assert target_response.status_code == 200, f"{target_url + url} -> Expected 200, but received {target_response.status_code}"
        j = i
        i = i + 400
elif dest_df.empty == False:
    print("Data Already At Destination")
elif source_df.empty == True:
    print("No Data In Source")
else:
    print("issue")
1 Answer
A few notes on your code above:
- connect_mssqlserver and connect_mysqlserver could be condensed to a single connect function that takes a conn_str arg to be used in create_engine (see the sketch after this list).
- I believe create_df is unnecessary at this point. If the process for creating the df involves more than just calling read_sql in the future, then I would opt for moving that logic into a separate function.
- nitpick: I would change source_df.empty == False and the following == True check to just not source_df.empty and dest_df.empty.
- pdb import at the top: I don't think you need that.
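A minimal sketch of the condensed connection helper from the first point, assuming the same anonymized connection strings used in the question:

import sqlalchemy

def connect(conn_str):
    #Build an engine for the given connection string and verify it can connect.
    #Note: the connection opened here is discarded, mirroring the original code;
    #you could close it explicitly or use it as a context manager instead.
    engine = sqlalchemy.create_engine(conn_str)
    engine.connect()
    return engine

source_engine = connect("mssql+pyodbc://@server/database?driver=SQL+Server")
dest_engine = connect("mysql+pymysql://user:password@host:port/database")

And the nitpicked condition, rewritten:

if not source_df.empty and dest_df.empty:
    print('Data will be pushed')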