Just for the sake of practice, I've decided to write some code for polynomial regression with Gradient Descent.
Code:
import numpy as np
from matplotlib import pyplot as plt
from scipy.optimize import approx_fprime as gradient
class polynomial_regression():
    def __init__(self,degrees):
        self.degree = degrees
        self.weights = np.random.randn(degrees+1)
        self.training_loss = []
        self.loss_type = None

    def predict(self, x: float):
        output = 0
        for i in range(len(self.weights)-1):
            output += (x**(i+1) * self.weights[i])
        return output + self.weights[-1]

    def fit(self,X: '1 dim array',y: '1 dim array',epochs,lr,loss,ridge=False,reg_rate=0.1):
        training_loss = []
        if loss == 'MSE':
            loss = MSE
            self.loss_type = 'MSE'
        elif loss == 'RMSE':
            loss = RMSE
            self.loss_type = 'RMSE'
        else:
            raise Exception('Undefined loss function')
        for epoch in range(epochs):
            self.weights -= lr*gradient(self.weights,
                                        MSE,
                                        [np.sqrt(np.finfo(float).eps)/100]*len(self.weights),
                                        X,
                                        y,
                                        ridge,
                                        reg_rate
                                        )
            training_loss.append(MSE(self.weights,X,y))
        self.training_loss = training_loss

def MSE(w,X,y,ridge=False,reg_rate=0.1):
    total = 0
    weights = w[:-1]
    bias = w[-1]
    for i in range(len(X)):
        total += (np.dot(weights,[X[i]**(p+1) for p in range(len(weights))]) + bias - y[i])**2
    if ridge == False:
        return (1/len(X)) * total
    else:
        return (1/len(X)) * total + reg_rate*((w**2).sum())

def RMSE(w,X,y,ridge=False, reg_rate = 0.1):
    total = 0
    weights = w[:-1]
    bias = w[-1]
    for i in range(len(X)):
        total += (np.dot(weights,[X[i]**(p+1) for p in range(len(weights))]) + bias - y[i])**2
    if ridge == False:
        return np.sqrt((1/len(X)) * total)
    else:
        return np.sqrt((1/len(X)) * total) + reg_rate*((w**2).sum())

def build_graph(X,y,model):
    plt.figure(figsize=(20,8))
    #Scatter plot of the dataset and the plot of the model's predictions
    plt.subplot(1,2,1)
    plt.scatter(X,y)
    X.sort()
    plt.plot(X,model.predict(X),c='red')
    plt.title('Model',size=20)
    #Curve of the training loss
    plt.subplot(1,2,2)
    plt.plot(np.arange(len(model.training_loss)),model.training_loss,label=f'{model.loss_type} loss')
    plt.legend(prop={'size': 20})
    plt.title('Training loss',size=20)
Several tests
rng = np.random.RandomState( 1)
x = (np.linspace(1,5,100))
y = 3*x + 10 + rng.rand(100)
x = x/10
y = y/10
degree = 1
epochs = 120
learning_rate = 0.9
model = polynomial_regression(degree)
model.fit(x,
          y,
          epochs,
          learning_rate,
          loss='MSE',
          ridge=False,)
build_graph(x,y,model)
Output: [plots omitted: "Model" fit and "Training loss" curve]
And now with a more complex dataset:
rng = np.random.RandomState( 1)
x = (np.linspace(1,5,100))
y = (10*np.cos(x) + rng.rand(100))
x = x/10
y = y/10
degree = 3
epochs = 8*10**3
learning_rate = 0.9
model = polynomial_regression(degree)
model.fit(x,
          y,
          epochs,
          learning_rate,
          loss='MSE',
          ridge=False,)
build_graph(x,y,model)
Output: [plots omitted: "Model" fit and "Training loss" curve]
Notes:
You might wonder why I moved the functions for MSE and RMSE out of the class. The main reason is that approx_fprime (I renamed it to gradient, for clarity) requires the loss function to take, as its first argument, the array of variables with respect to which we calculate the gradient (see the documentation). If I moved MSE and RMSE into the class, the first argument would, of course, be self. A minimal sketch of the expected call shape follows these notes.

Admittedly, Gradient Descent is not the best choice for optimizing polynomial functions. However, I would still prefer to use it here, just for the sake of solidifying my understanding of how GD works.
For more complex datasets (where we would need higher polynomial degrees), the model converges very slowly (see the training loss for the second dataset). If possible, I would like you to elaborate a bit on what the reason might be.
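For reference, here is a minimal, self-contained sketch of the call shape approx_fprime expects: the point to differentiate at comes first, and any extra data is forwarded through *args. The function toy_loss and the data below are made up purely for illustration.

import numpy as np
from scipy.optimize import approx_fprime

# the point w must be the FIRST argument of the callable;
# the remaining data (X, y) is passed through *args
def toy_loss(w, X, y):
    return ((X.dot(w) - y)**2).mean()

X = np.array([[1.0, 1.0], [2.0, 1.0], [3.0, 1.0]])
y = np.array([2.0, 4.0, 6.0])
w = np.zeros(2)
eps = np.sqrt(np.finfo(float).eps)
grad = approx_fprime(w, toy_loss, eps, X, y)  # numerical gradient of toy_loss at w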
What can be improved?
Any suggestions will be welcome: algorithm efficiency/code style/naming conventions, or anything else you can come up with. Thanks!
Encoding polynomials
According to your code, you represent a polynomial
$$\sum\limits_{k=0}^{n} a_kx^k$$ as [a_1, ..., a_n, a_0], which is odd to my eyes.
The most common way to represent a polynomial is probably [a_n, ..., a_1, a_0]. Then, for example, your predict function becomes
def predict(self, x: float):
    return np.vander([x], len(self.weights)).dot(self.weights)
which is vectorised (by using .dot), so it should be a bit faster.
On the other hand, we can vectorise it further by allowing vectorial inputs:
def predict(self, x):
    return np.vander(x, len(self.weights)).dot(self.weights)
This allows us to evaluate things like predict(np.array([-1, 0, 1])).
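To make the [a_n, ..., a_1, a_0] ordering concrete, here is a tiny example (with arbitrary numbers) of what np.vander produces and how the dot product evaluates the polynomial:

import numpy as np

x = np.array([2.0, 3.0])
print(np.vander(x, 3))     # rows are [x**2, x**1, x**0]:
# [[4. 2. 1.]
#  [9. 3. 1.]]

weights = np.array([1.0, 0.0, -1.0])   # encodes the polynomial x**2 - 1
print(np.vander(x, 3).dot(weights))    # [3. 8.]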
One consequence is that in your error calculation code you can write something like
mean_sq_error = ((predict(X) - y)**2).mean()
which is vectorised and easy to read.
Calculating the gradients
In terms of the Euclidean norm norm_2, polynomial fitting reduces to finding weights such that the value of
norm_2(vander(x).dot(weights) - y)
is minimal. The minimum point doesn't change if we compose norm_2 with a non-decreasing function from the left, so e.g. using any of
norm_2_sq = (^ 2) . norm_2
mse_norm = (* 1/n) . (^ 2) . norm_2
rmse_norm = (^ 1/2) . mse_norm
will result in the same minimum points. The most natural of these is arguably norm_2_sq.
Let us generalize using this norm. Given a matrix A and a vector b, we'd like to find $$\operatorname{argmin}_x \| Ax - b \|_2^2,$$
but $$\| Ax - b \|_2^2 = (Ax -b)^T (Ax-b) = x^TA^TAx - x^TA^Tb - b^TAx + b^Tb,$$
so its gradient is
$$
2x^T A^T A - 2b^TA.
$$
If you want, you can use this to calculate the gradients of mse, rmse using the chain rule. You don't need to use approx_fprime.
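For instance, a hand-written MSE gradient based on the formula above could look roughly like the sketch below, written in column-vector form as (2/n) A^T(Aw - y); here A stands for the Vandermonde matrix and the names are illustrative:

import numpy as np

def mse(w, A, y):
    return ((A.dot(w) - y)**2).mean()

def mse_grad(w, A, y):
    # gradient of (1/n)*||A w - y||^2, i.e. the transpose of (2/n)(w^T A^T A - y^T A)
    return 2.0 * A.T.dot(A.dot(w) - y) / len(y)

# a single plain gradient-descent step would then be
# w -= lr * mse_grad(w, A, y)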
On the other hand, since its second derivative \$2A^TA \succeq 0\$, our functional is convex, so it attains its global minimum where its gradient vanishes, i.e. at the solution of the so-called normal equation:
$$
x^T A^TA = b^TA \quad \Leftrightarrow \quad A^TA x = A^T b.
$$
As a practice problem, you can solve this equation using some iterative method (e.g. the conjugate gradient method).
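One possible sketch of that exercise, solving the normal equations both directly and with scipy's conjugate gradient routine; the data here is random and only meant as a placeholder:

import numpy as np
from scipy.sparse.linalg import cg

rng = np.random.default_rng(0)
A = np.vander(rng.random(20), 4)                 # design matrix for a cubic fit
b = rng.random(20)

w_direct = np.linalg.solve(A.T @ A, A.T @ b)     # normal equations, direct solve
w_cg, info = cg(A.T @ A, A.T @ b)                # normal equations, conjugate gradient
w_lstsq, *_ = np.linalg.lstsq(A, b, rcond=None)  # library baseline for comparison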
General comments
The general consensus is that function names should be written in snake_case and class names in CamelCase.
There are a few unnecessary spaces (RandomState( 1)) and parentheses (x = (np.linspace(1,5,100))). Also, class PolynomialRegression: is sufficient; no () is needed.
Given the ML context, I would reserve weights and bias for denoting the weights of an (affine) linear mapping.
Despite writing
if loss == 'MSE':
    loss = MSE
    self.loss_type = 'MSE'
elif loss == 'RMSE':
    loss = RMSE
    self.loss_type = 'RMSE'
you hard-coded MSE a few lines later.
Tangentially related: globals()[loss] would give you the function named by the value of loss, assuming that function is defined globally.
MSE and RMSE are suspiciously similar. Shouldn't

def RMSE(w, X, y, ridge=False, reg_rate=0.1):
    return np.sqrt(MSE(w, X, y, ridge, reg_rate))

suffice?

MSE could be streamlined:

    ...
    retval = (1/len(X)) * total
    if ridge:
        retval += reg_rate*((w**2).sum())
    return retval

The loop

for i in range(len(X)):
    total += (np.dot(weights,[X[i]**(p+1) for p in range(len(weights))]) + bias - y[i])**2

raised some eyebrows on the first reading. Indexing y with i, which enumerates X, is very suspicious. Is there any precondition on the lengths of X and y? Looking at your tests it appears that \$y_i = f(X_i)\$, so they are of the same length indeed. I strongly recommend making this explicit, maybe by passing a list of tuples (x, y) rather than two disconnected lists. I am not a numpy expert, so I can't say how it may affect performance.

Similarly, I am not sure that len(weights) is a good choice for the inner comprehension; degrees seems much more natural.

In any case, indexing here is anti-Pythonic. Consider

for arg, val in zip(X, y):
    total += (np.dot(weights,[arg**(p+1) for p in range(degrees + 1)]) + bias - val)**2

A combined sketch of these suggestions appears after these comments.

I would like to see tests exercising the ridge=True case. If you don't expect to ever use this case, do not pass the ridge argument to fit at all.
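Putting the last few points together, a possible rewrite of the two loss functions could look like the sketch below. The signatures mirror the question; note that with ridge=True the penalty now sits under the square root in RMSE, which differs slightly from the original.

import numpy as np

def MSE(w, X, y, ridge=False, reg_rate=0.1):
    weights, bias = w[:-1], w[-1]
    total = 0.0
    for arg, val in zip(X, y):
        total += (np.dot(weights, [arg**(p+1) for p in range(len(weights))]) + bias - val)**2
    retval = total / len(X)
    if ridge:
        retval += reg_rate * (w**2).sum()
    return retval

def RMSE(w, X, y, ridge=False, reg_rate=0.1):
    # the ridge penalty is inside the square root here
    return np.sqrt(MSE(w, X, y, ridge, reg_rate))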