
The following code works and will be run from cron on an AWS server (I'm not sure of its specifications: cores, kernel, etc.) every minute.

However, it's my first time putting a script into production, so I'd like some help reviewing this code. Please look it over and point out any hidden mistakes, anything risky to deploy, or plain bad design. As it stands, it finishes within a minute because I've limited the design to optimizing 100 rows of data at a time.

Could this be made faster, so that I can optimize even more rows? Hypothetically the row count can reach the tens of thousands. Since I sort the rows by cost, the ones further down are less valuable, but if it can be done it will still help later on, once the product scales.

Stack: MySQL is where the main data is stored and fetched from; Redis is the database the changes need to go to. This script will run from a one-minute cron on a separate server; all of them are independent AWS servers. I haven't applied any Redis-specific optimizations while building this, and I'm not sure how much of a difference they could make.
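
For reference, the cron entry I have in mind looks something like this (the interpreter and script paths are placeholders, not the real ones):

# m h dom mon dow  command -- runs every minute
* * * * * /usr/bin/python /home/ubuntu/optimizer/optimize.py >> /var/log/optimizer.log 2>&1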

import MySQLdb
import pandas as pd
import datetime
import redis
from decimal import Decimal, DecimalException
import math
import cPickle as pickle
import re
from pandas.util.testing import assert_frame_equal

conn = MySQLdb.connect(host="AWS_HOST", user="root", passwd="pasword",db="db_name", local_infile=1)
cursor = conn.cursor(MySQLdb.cursors.DictCursor)
cursor.execute("SET NAMES utf8")

now = datetime.datetime.now()+datetime.timedelta(minutes=330)
subtract = now.minute if now.minute < 30 else now.minute-30
now-=datetime.timedelta(minutes=subtract)

read_query = """
select group, url, sum(acost)/1000000 cost, sum(rpm-acost)/1000000 pft,
sum(lg) imps, sum(acost)/1000000/sum(lg)*1000 cpm, sum(rpm)/1000000/sum(lg)*1000 rpm,
sum(rpm-acost)/1000000/sum(lg)*1000 ppm
from db_name.db
where dh = '{}'
group by group, url having pft < 0
order by sum(acost) desc;""".format(now.strftime('%Y-%m-%d %H:%M:00'))

cursor.execute(read_query)
cost_data_new = pd.DataFrame(list(cursor.fetchall()))
cost_data_old = pickle.load( open( "cost_data_old.p", "rb" ))

## manage structure
global changed,r
r = redis.Redis(host='AWS2_HOST', port=PORT, db=0)
changed=True
try:
    if assert_frame_equal(cost_data_new, cost_data_old[['group','url','cost','pft','imps','cpm','rpm','ppm']], check_dtype=False):
        changed=False
except:
    changed=True

def rename_x(col_name):
    if re.match(r".*\_(x)$", col_name):
        return (col_name[:-2])
    else:
        return col_name

def cal_rpm(rw):
    try:
        return (rw['pft_diff']+rw['cost_diff'])/rw['imps_diff']*1000
    except DecimalException:
        return 0
    except Exception as e:
        raise e
        #print rw['group'], rw['url'], rw['pft_diff'], rw['cost_diff'], rw['imps_diff']
        print "proceeding with 0"
        return 0

def cal_cpm(rw):
    try:
        return rw['cost_diff']/rw['imps_diff']*1000
    except DecimalException:
        return 0
    except Exception as e:
        raise e
        #print rw['group'], rw['url'], rw['cost_diff'], rw['imps_diff']
        print "proceeding with 0"
        return 0

def cal_ppm(rw):
    try:
        return rw['pft_diff']/rw['imps_diff']*1000
    except DecimalException:
        return 0
    except Exception as e:
        raise e
        print "proceeding with 0"
        return 0

def get_diff(new,old):
    diff = pd.merge(new,old,how='outer',on=['group','url'])
    diff.fillna(0,inplace=True)
    diff['pft_diff']=diff['pft_x']-diff['pft_y']
    diff['cost_diff']=diff['cost_x']-diff['cost_y']
    diff['imps_diff']=diff['imps_x']-diff['imps_y']
    diff['diff_rpm']=diff[['group','url','pft_diff','cost_diff','imps_diff']].apply(cal_rpm,axis=1)
    diff['diff_cpm']=diff[['group','url','cost_diff','imps_diff']].apply(cal_cpm,axis=1)
    diff['diff_ppm']=diff[['group','url','pft_diff','imps_diff']].apply(cal_ppm,axis=1)
    diff=diff.rename(columns=rename_x)
    diff.drop(list(diff.filter(regex = '_y')), axis = 1, inplace = True)
    try:
        del diff['optimized']
    except:
        pass
    return diff

def calc_bid_prob(lgpm):
    beta = 0.01
    alpha = 1
    infl = 13
    slope = 1
    prob=int(100 * (beta + (alpha - beta) / (1 + math.pow((lgpm / infl), slope))))
    return prob

def optimize_val(rw):
    ppm = rw['diff_ppm']
    lg = rw['imps_diff']
    rpm = rw['diff_rpm']
    cpm = rw['diff_cpm']
    bid = r.hget("plcmnt:{}".format(rw['group']),"{}".format(rw['url']))
    try:
        bid=int(bid)
    except:
        bid=20
    b_prob=r.hget("url:prob:{}".format(rw['group']),"{}".format(rw['url']))
    try:
        if ppm < -1:
            if rpm >= 2:
                new_bid = min((1-0.3)*rpm,bid)
                new_bid_prob = min(calc_bid_prob(lg),b_prob)
                r.hset("plcmnt:{}".format(rw['group']),rw['url'],new_bid)
                r.hset("url:prob:{}".format(rw['group']),rw['url'],new_bid)
            else:
                new_bid = min((1-0.5)*rpm,bid)
                new_bid_prob = min(calc_bid_prob(lg),b_prob)
                r.hset("plcmnt:{}".format(rw['group']),rw['url'],new_bid)
                r.hset("url:prob:{}".format(rw['group']),rw['url'],new_bid_prob)
        elif (ppm >= -1) & (ppm < 0):
            if rpm >= 2:
                new_bid_prob = min(calc_bid_prob(lg),b_prob)
                r.hset("url:prob:{}".format(rw['group']),rw['url'],new_bid_prob)
            else:
                new_bid = min((1-0.3)*rpm,bid)
                new_bid_prob = min(calc_bid_prob(lg),b_prob)
                r.hset("plcmnt:{}".format(rw['group']),rw['url'],new_bid)
                r.hset("url:prob:{}".format(rw['group']),rw['url'],new_bid_prob)
        else:
            pass
        return 1
    except Exception as e:
        #log exception
        return 0

## if not changed, proceed with old, optimize unoptimized top 100 and store as new pickle
if changed:
    ## optimize diff_data
    cost_data_diff=get_diff(cost_data_new,cost_data_old[['group','url','cost','pft','imps','cpm','rpm','ppm']])
    cost_data_diff['optimized']=0
    cost_data_diff.sort_values(by=['cost_diff'],ascending=False,inplace=True)
    optimize=cost_data_diff.head(100)
    optimize['optimized']=optimize.apply(optimize_val,axis=1)
    pickle.dump(cost_data_diff,open("cost_data_old.p","wb"))
else:
    cost_data_old.sort_values(by=['cost'],ascending=False,inplace=True)
    optimize = cost_data_old[cost_data_old['optimized']==0].head(100)
    optimize['optimized']=optimize.apply(optimize_val,axis=1)
    pickle.dump(cost_data_old,open("cost_data_old.p","wb"))

import sys
sys.exit()

Example of cost_data_old:

 group acost_x cpm_x imps_x pft_x ppm_x rpm_x \
0 6841 0.0002 0.12150000 2 -0.0002 -0.12150000 0E-8 
1 6891 0.0002 0.19900000 1 -0.0002 -0.19900000 0E-8 
2 7174 0.0001 0.14900000 1 -0.0001 -0.14900000 0E-8 
3 6732 0.0001 0.14600000 1 -0.0001 -0.14600000 0E-8 
4 6882 0.0001 0.13500000 1 -0.0001 -0.13500000 0E-8 
5 6856 0.0001 0.10700000 1 -0.0001 -0.10700000 0E-8 
6 6838 0.0001 0.08700000 1 -0.0001 -0.08700000 0E-8 
7 6838 0.0001 0.08600000 1 -0.0001 -0.08600000 0E-8 
8 6838 0.0001 0.08600000 1 -0.0001 -0.08600000 0E-8 
 url cost_y pft_y imps_y \
0 url1.org 0.0002 -0.0002 2 
1 url2.com 0.0002 -0.0002 1 
2 url3.com 0.0001 -0.0001 1 
3 url4.tv 0.0001 -0.0001 1 
4 url5.com 0.0001 -0.0001 1 
5 url6.com 0.0001 -0.0001 1 
6 url7.com 0.0001 -0.0001 1 
7 url8.com 0.0001 -0.0001 1 
8 url9.com 0.0001 -0.0001 1 
 cpm_y rpm_y ppm_y pft_diff cost_diff imps_diff 
0 0.12150000 0E-8 -0.12150000 0.0000 0.0000 0 
1 0.19900000 0E-8 -0.19900000 0.0000 0.0000 0 
2 0.14900000 0E-8 -0.14900000 0.0000 0.0000 0 
3 0.14600000 0E-8 -0.14600000 0.0000 0.0000 0 
4 0.13500000 0E-8 -0.13500000 0.0000 0.0000 0 
5 0.10700000 0E-8 -0.10700000 0.0000 0.0000 0 
6 0.08700000 0E-8 -0.08700000 0.0000 0.0000 0 
7 0.08600000 0E-8 -0.08600000 0.0000 0.0000 0 
8 0.08600000 0E-8 -0.08600000 0.0000 0.0000 0 

Shape of cost_data_new:

(1587, 8)
asked Apr 29, 2018 at 9:44

1 Answer

In no particular order:

  • I have a hard time understanding why the get_diff function uses vectorized operations some of the time and apply at other times. If the aim is to avoid NaNs in the dataframe, you can simply use fillna afterwards. If you want to avoid the inf that would come from a division by 0, you can replace it with zeros or NaNs depending on your use-case. In any case, the cal_xxx functions are better replaced by vectorized operations (see the sketch just after this list).
  • Your lack of whitespace and your use of acronyms as variable names make your code hard to read and understand.
  • You open things that you never close, including files and database connections. The with statement and the closing utility are your friends here.
  • assert_frame_equal can safely be replaced by equals, which behaves more nicely: it only returns True or False. The changed variable can then be safely eliminated.
  • Your top-level code is better put into one or several functions to improve maintainability and testing. In the same vein, the global keyword is to be avoided, and having functions (such as optimize_val) rely on the external initialisation of an object they need to use is prone to errors. Instead, pass the object as a parameter; and if you can't control the calling point (such as with apply, which calls the function with a single argument), you can make use of functools.partial to bind some parameters beforehand.
  • You should avoid putting credentials in your script; pass them on the command line instead (and use argparse, for instance, to retrieve them). A sketch follows the proposed improvements below.
  • Don't fall into the habit of preparing SQL statements with format. Even though in this instance it sounds rather safe, you'd better train yourself to use parametrized statements.
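
To illustrate the first point, the three cal_xxx helpers reduce to plain column arithmetic followed by a single cleanup pass. A minimal sketch on a toy frame (it assumes float columns; the Decimal values coming back from MySQLdb would first need an astype(float)):

import numpy as np
import pandas as pd

# Toy frame standing in for the merged diff (hypothetical values).
diff = pd.DataFrame({
    'pft_diff':  [1.0, 0.0, -2.0],
    'cost_diff': [2.0, 0.0,  1.0],
    'imps_diff': [4.0, 0.0,  2.0],
})

# Vectorized equivalents of cal_rpm, cal_cpm and cal_ppm.
diff['diff_rpm'] = (diff['pft_diff'] + diff['cost_diff']) / diff['imps_diff'] * 1000
diff['diff_cpm'] = diff['cost_diff'] / diff['imps_diff'] * 1000
diff['diff_ppm'] = diff['pft_diff'] / diff['imps_diff'] * 1000

# Division by zero yields inf (or NaN for 0/0); one cleanup pass replaces
# the per-row DecimalException handling.
diff.replace([np.inf, -np.inf], 0, inplace=True)
diff.fillna(0, inplace=True)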

Proposed improvements:

import re
import math
import datetime
import cPickle as pickle
from functools import partial
from contextlib import closing

import redis
import MySQLdb
import pandas as pd

READ_QUERY = """\
SELECT group, url,
       sum(lg) imps,
       sum(acost)/1000000 cost,
       sum(rpm-acost)/1000000 pft,
       sum(rpm)/1000000/sum(lg)*1000 rpm,
       sum(acost)/1000000/sum(lg)*1000 cpm,
       sum(rpm-acost)/1000000/sum(lg)*1000 ppm
FROM db_name.db
WHERE dh = %s
GROUP BY group, url HAVING pft < 0
ORDER BY sum(acost) DESC;"""


def read_database(host, user, password, database, local_infile=True):
    now = datetime.datetime.now() + datetime.timedelta(minutes=330)
    minutes = 0 if now.minute < 30 else 30
    date = now.replace(minute=minutes, second=0, microsecond=0)
    with closing(MySQLdb.connect(host=host, user=user, passwd=password, db=database, local_infile=int(local_infile))) as connection:
        cursor = connection.cursor(MySQLdb.cursors.DictCursor)
        cursor.execute("SET NAMES utf8")
        cursor.execute(READ_QUERY, (date,))
        return pd.DataFrame(list(cursor.fetchall()))


def rename_x(col_name):
    if re.match(r'.*\_(x)$', col_name):
        return col_name[:-2]
    else:
        return col_name


def get_diff(new, old):
    diff = pd.merge(new, old, how='outer', on=['group', 'url'])
    diff.fillna(0, inplace=True)
    diff['pft_diff'] = diff['pft_x'] - diff['pft_y']
    diff['cost_diff'] = diff['cost_x'] - diff['cost_y']
    diff['imps_diff'] = diff['imps_x'] - diff['imps_y']
    diff['diff_rpm'] = (diff['pft_diff'] + diff['cost_diff']) / diff['imps_diff'] * 1000
    diff['diff_cpm'] = diff['cost_diff'] / diff['imps_diff'] * 1000
    diff['diff_ppm'] = diff['pft_diff'] / diff['imps_diff'] * 1000
    diff = diff.rename(columns=rename_x)
    diff.drop(list(diff.filter(regex='_y')), axis=1, inplace=True)
    diff.replace(pd.np.inf, 0, inplace=True)
    diff['optimized'] = 0
    return diff


def calc_bid_prob(lgpm, alpha=1, beta=0.01, infl=13, slope=1):
    return int(100 * (beta + (alpha - beta) / (1 + math.pow((lgpm / infl), slope))))


def optimize_val(rw, redis):
    ppm = rw['diff_ppm']
    lg = rw['imps_diff']
    rpm = rw['diff_rpm']
    redis_group_plcmnt = 'plcmnt:{}'.format(rw['group'])
    redis_group_url = 'url:prob:{}'.format(rw['group'])
    redis_url = rw['url']
    bid = redis.hget(redis_group_plcmnt, redis_url)
    try:
        bid = int(bid)
    except (TypeError, ValueError):  # hget returns None when the field is missing
        bid = 20
    b_prob = redis.hget(redis_group_url, redis_url)
    try:
        if ppm < 0:
            new_bid = None
            if ppm < -1:
                if rpm >= 2:
                    new_bid = min((1 - 0.3) * rpm, bid)
                else:
                    new_bid = min((1 - 0.5) * rpm, bid)
            elif rpm < 2:
                new_bid = min((1 - 0.3) * rpm, bid)
            new_bid_prob = min(calc_bid_prob(lg), b_prob)
            if new_bid is not None:  # only update the bid when this branch computed one
                redis.hset(redis_group_plcmnt, redis_url, new_bid)
            redis.hset(redis_group_url, redis_url, new_bid_prob)
        return 1
    except Exception:
        # log exception
        return 0


def compare_and_optimize(cost_data, optimizer, filename='cost_data_old.p'):
    with open(filename, 'rb') as pickled_file:
        cost_data_old = pickle.load(pickled_file)
    if cost_data.equals(cost_data_old):
        # If not changed, proceed with old, optimize unoptimized top 100 and store as new pickle
        cost_data_result = cost_data_old
        cost_data_old.sort_values(by=['cost'], ascending=False, inplace=True)
        optimize = cost_data_old[cost_data_old['optimized'] == 0].head(100)
    else:
        # Optimize diff_data
        cost_data_result = get_diff(cost_data, cost_data_old[['group', 'url', 'cost', 'pft', 'imps', 'cpm', 'rpm', 'ppm']])
        cost_data_result.sort_values(by=['cost_diff'], ascending=False, inplace=True)
        optimize = cost_data_result.head(100)
    optimize['optimized'] = optimize.apply(optimizer, axis=1)
    with open(filename, 'wb') as pickle_file:
        pickle.dump(cost_data_result, pickle_file)


if __name__ == '__main__':
    cost_data = read_database('AWS_HOST', 'root', 'password', 'db_name', True)
    redis_db = redis.Redis(host='AWS2_HOST', port=PORT, db=0)
    compare_and_optimize(cost_data, partial(optimize_val, redis=redis_db))
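
On the credentials point, the entry point could take them from the command line instead of hardcoding them. A minimal sketch with argparse (the option names here are my own, not anything prescribed):

import argparse

def parse_args():
    # Hypothetical option names; adapt to your deployment.
    parser = argparse.ArgumentParser(description='Optimize bids from MySQL into Redis.')
    parser.add_argument('--mysql-host', required=True)
    parser.add_argument('--mysql-user', required=True)
    parser.add_argument('--mysql-password', required=True)
    parser.add_argument('--mysql-db', required=True)
    parser.add_argument('--redis-host', required=True)
    parser.add_argument('--redis-port', type=int, default=6379)
    return parser.parse_args()

if __name__ == '__main__':
    args = parse_args()
    cost_data = read_database(args.mysql_host, args.mysql_user, args.mysql_password, args.mysql_db, True)
    redis_db = redis.Redis(host=args.redis_host, port=args.redis_port, db=0)
    compare_and_optimize(cost_data, partial(optimize_val, redis=redis_db))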
answered Apr 30, 2018 at 13:40
  • Thank you for the suggestions. Let me test this and get back to you. – Commented May 1, 2018 at 9:10
