I have many (4000+) CSVs of stock data (Date, Open, High, Low, Close) which I import into individual Pandas dataframes to perform analysis. I am new to Python and want to calculate a rolling 12-month beta for each stock. I found a post on calculating rolling beta (Python pandas calculate rolling stock beta using rolling apply to groupby object in vectorized fashion); however, when used in my code below it takes over 2.5 hours! Considering I can run the exact same calculations in SQL tables in under 3 minutes, this is too slow.
How can I improve the performance of the code below to match that of SQL? I understand Pandas/Python has that capability. My current method loops over each row, which I know slows performance, but I am unaware of any aggregate way to perform a rolling-window beta calculation on a dataframe.
Note: the first two steps, loading the CSVs into individual dataframes and calculating daily returns, only take ~20 seconds. All my CSV dataframes are stored in the dictionary called FilesLoaded, with names such as XAO.
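(For context, a minimal sketch of how such a dictionary might be built; the folder name and date-column handling here are assumptions, not from the original post:)

from pathlib import Path

import pandas as pd

FilesLoaded = {
    path.stem: pd.read_csv(path, index_col='Date', parse_dates=True)
    for path in Path('csv_data').glob('*.csv')  # 'csv_data' is a hypothetical folder
}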
import datetime
import ntpath

import numpy as np
import pandas as pd

pd.set_option('precision', 10)  # set the decimal-point precision to DISPLAY

start_time = datetime.datetime.now()
MarketIndex = 'XAO'
period = 250
MinBetaPeriod = period

# ***********************************************************************************************
# CALC RETURNS
# ***********************************************************************************************
for File in FilesLoaded:
    FilesLoaded[File]['Return'] = FilesLoaded[File]['Close'].pct_change()

# ***********************************************************************************************
# CALC BETA
# ***********************************************************************************************
def calc_beta(df):
    np_array = df.values
    m = np_array[:, 0]  # market returns are column zero of the numpy array
    s = np_array[:, 1]  # stock returns are column one of the numpy array
    covariance = np.cov(s, m)  # covariance between stock and market
    beta = covariance[0, 1] / covariance[1, 1]
    return beta

# Build a custom "rolling apply" function
def rolling_apply(df, period, func, min_periods=None):
    if min_periods is None:
        min_periods = period
    result = pd.Series(np.nan, index=df.index)
    for i in range(1, len(df) + 1):
        sub_df = df.iloc[max(i - period, 0):i, :]
        if len(sub_df) >= min_periods:
            idx = sub_df.index[-1]
            result[idx] = func(sub_df)
    return result

# Create an empty BETA dataframe with the same index as the RETURNS dataframe
df_join = pd.DataFrame(index=FilesLoaded[MarketIndex].index)
df_join['market'] = FilesLoaded[MarketIndex]['Return']
df_join['stock'] = np.nan

for File in FilesLoaded:
    df_join['stock'].update(FilesLoaded[File]['Return'])
    df_join = df_join.replace(np.inf, np.nan)   # get rid of infinite values "inf" (SQL won't take "Inf")
    df_join = df_join.replace(-np.inf, np.nan)  # get rid of infinite values "inf" (SQL won't take "Inf")
    df_join = df_join.fillna(0)                 # get rid of the NaNs in the return data
    FilesLoaded[File]['Beta'] = rolling_apply(
        df_join[['market', 'stock']], period, calc_beta, min_periods=MinBetaPeriod,
    )

# ***********************************************************************************************
# CLEAN-UP
# ***********************************************************************************************
print('Run-time: {0}'.format(datetime.datetime.now() - start_time))
1 Answer
Whenever you write section delimiters like
# ***********************************************************************************************
# CALC RETURNS
# ***********************************************************************************************
that's a red flag that you should instead write modules and functions, rather than headings.
rolling_apply needs to be deleted; that's not how Pandas rolling calculations are supposed to work.
Don't use np.cov; use the Pandas column-wise DataFrame.cov.
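For example (a small sketch with random data; either way, beta is cov(stock, market) / var(market)):

import numpy as np
import pandas as pd

rand = np.random.default_rng(seed=0)
returns = pd.DataFrame(rand.normal(size=(250, 2)), columns=['market', 'stock'])

# numpy: positional indexing into a bare 2x2 array
cov_np = np.cov(returns['stock'], returns['market'])
beta_np = cov_np[0, 1] / cov_np[1, 1]

# pandas: the covariance matrix keeps its column labels, and the
# same call composes with .rolling() as the rewrite below shows
cov_pd = returns.cov()
beta_pd = cov_pd.loc['market', 'stock'] / cov_pd.loc['market', 'market']

assert np.isclose(beta_np, beta_pd)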
Reduce these:
df_join = df_join.replace(np.inf, np.nan)   # get rid of infinite values "inf" (SQL won't take "Inf")
df_join = df_join.replace(-np.inf, np.nan)  # get rid of infinite values "inf" (SQL won't take "Inf")
df_join = df_join.fillna(0)                 # get rid of the NaNs in the return data
to a single call to ~np.isfinite.
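In the rewrite below, that becomes one line:

df_join[~np.isfinite(df_join)] = 0.  # zeroes NaN, inf and -inf in one pass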
For an outer loop, you can probably just use multiprocessing over your files; a rough, unbenchmarked sketch of that shape follows.
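The names here (beta_for_stock, all_betas) are hypothetical, and shipping each Series to a worker carries a pickling cost that would need measuring:

from functools import partial
from multiprocessing import Pool

import numpy as np
import pandas as pd


def beta_for_stock(stock_return: pd.Series, market_return: pd.Series, period: int) -> pd.Series:
    # Align one stock against the market index, zero out non-finite
    # values, and compute a rolling beta as rolling_cov() below does
    df = pd.DataFrame({'market': market_return, 'stock': stock_return})
    df[~np.isfinite(df)] = 0.
    cov = df.rolling(window=period).cov()
    market_cov = cov.loc[(slice(None), 'market'), :]
    return market_cov['stock'] / market_cov['market']


def all_betas(files_loaded: dict[str, pd.DataFrame],
              market_index: str = 'XAO', period: int = 250) -> dict[str, pd.Series]:
    # One worker per stock; call this under `if __name__ == '__main__':`
    # on platforms that spawn rather than fork worker processes
    market_return = files_loaded[market_index]['Close'].pct_change()
    returns = {name: df['Close'].pct_change() for name, df in files_loaded.items()}
    work = partial(beta_for_stock, market_return=market_return, period=period)
    with Pool() as pool:
        betas = pool.map(work, returns.values())
    return dict(zip(returns, betas))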
Add regression tests.
For testing, generate fake data if you don't have the real thing. I consider a Brownian integral pretty convincing:
import numpy as np
import pandas as pd


def rolling_cov(df: pd.DataFrame, period: int) -> pd.Series:
    # Rolling pairwise covariance: the result is indexed by (date, column)
    cov = df.rolling(window=period).cov()
    all_dates = slice(None)
    # Rows pairing each date with 'market' hold cov(market, stock) and var(market)
    market_cov = cov.loc[(all_dates, 'market'), :]
    return market_cov['stock'] / market_cov['market']


def run_op(files_loaded: dict[str, pd.DataFrame]) -> pd.DataFrame:
    market_index = 'XAO'
    period = 250

    for file in files_loaded:
        files_loaded[file]['Return'] = files_loaded[file]['Close'].pct_change()

    # Create empty BETA dataframe with same index as RETURNS dataframe
    df_join = pd.DataFrame(index=files_loaded[market_index].index)
    df_join['market'] = files_loaded[market_index]['Return']
    df_join['stock'] = np.nan

    for file in files_loaded:
        other_return = files_loaded[file]['Return']
        is_valid = np.isfinite(other_return)
        df_join.loc[is_valid, 'stock'] = other_return[is_valid]
        df_join[~np.isfinite(df_join)] = 0.  # zeroes NaN, inf and -inf in one pass
        files_loaded[file]['Beta'] = rolling_cov(
            df=df_join[['market', 'stock']], period=period,
        ).values

    return df_join


def fake_frame(
    rand: np.random.Generator,
    year: int = 2020,
    n: int = 1_500,
    scale: float = 0.01,
    stdev: float = 1.,
) -> pd.DataFrame:
    date = pd.date_range(
        name='Date', inclusive='left', freq='D', periods=n,
        start=pd.Timestamp(year=year, month=1, day=1),
    )
    # Brownian integral: cumulative sum of normal increments, shifted positive
    diff = rand.normal(loc=scale, scale=stdev, size=1 + n)
    series = diff.cumsum()
    series += scale - series.min()
    open_ = series[:-1]
    return pd.DataFrame(
        index=date,
        data={
            'Date': date,
            'Open': open_,
            'High': open_ + rand.uniform(low=0, high=scale*0.2, size=n),
            'Low': open_ - rand.uniform(low=0, high=scale*0.2, size=n),
            'Close': series[1:],
        },
    )


def test() -> None:
    rand = np.random.default_rng(seed=0)
    files_loaded = {
        'XAO': fake_frame(rand),
        'ABC': fake_frame(rand),
    }
    joined = run_op(files_loaded)

    # joined.to_csv('reference.csv')
    reference = pd.read_csv('reference.csv', usecols=['market', 'stock'])
    assert np.allclose(joined.values, reference.values, equal_nan=True, atol=0, rtol=1e-12)

    # for name, df in files_loaded.items():
    #     df.to_csv(f'ref_{name}.csv')
    for name, actual in files_loaded.items():
        expected = pd.read_csv(f'ref_{name}.csv', usecols=['Return', 'Beta'])
        assert np.allclose(
            expected.values, actual[['Return', 'Beta']].values,
            equal_nan=True, atol=0, rtol=1e-12,
        )


if __name__ == '__main__':
    test()
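To use the regression test: run once with the to_csv lines uncommented to capture reference.csv and the ref_*.csv files, then re-comment them; every subsequent run asserts agreement with those references to a relative tolerance of 1e-12.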
Comments:
Have you looked at the dask package? Rather than doing this individually for each DataFrame, dask will create a single "virtual" DataFrame out of your on-disk data and figure out itself how best to run it.
dask has rolling_apply: dask.pydata.org/en/latest/…. rolling_apply is also available for regular pandas, but dask can operate on your entire set of csv files at once.