I have a time series dataset with plots like the one shown below. I am trying to find the best way to segment the time series. I need each series divided into three regions - 'RampUp', 'Plateau' and 'CoolDown' - for the initial upward slope, the approximately constant middle part and the final cooldown part, respectively.
What is the best way to do this segmentation on the future series that I will get? That is, if my model is given such a time series as input, it should be able to output the indices of the region boundaries. Also, is it possible to do this unsupervised?
Thank you for the time. Any suggestions are greatly appreciated.
5 Answers
This sounds like a problem that could be solved with changepoint detection.
The ruptures library would be a good place to start for offline methods.
You will find methods for both known and unknown numbers of changepoints.
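For instance, when the number of changepoints is not known in advance, a minimal sketch using Pelt might look like the following (the penalty value and min_size here are assumptions to be tuned on your data):
import ruptures as rpt
# toy piecewise-constant signal; bkps holds the true breakpoint indices
signal, bkps = rpt.pw_constant(500, 1, 3, noise_std=1)
# Pelt does not need the number of changepoints up front; the penalty
# ('pen') trades off goodness of fit against the number of segments
algo = rpt.Pelt(model="l2", min_size=10).fit(signal)
print(bkps, algo.predict(pen=10))  # true vs detected boundary indices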
As @mdgorgan suggests, you could use this signal-processing-based approach, besides others (ML-based methods), for segmentation tasks. I tried it based on this document:
#install library
#!python -m pip install ruptures
#import libraries
import matplotlib.pyplot as plt
import ruptures as rpt
#generate sample data in the form of a signal
n_samples, n_dims, sigma = 1000, 1, 1
n_bkps = 2 # number of breakpoints
signal, bkps = rpt.pw_constant(n_samples, n_dims, n_bkps, noise_std=sigma)
print(bkps) #[340, 671, 1000]
# detection
algo = rpt.Dynp(model="l2").fit(signal)
result = algo.predict(n_bkps=2)
print(result)
#[340, 670, 1000]
# display segmentation
rpt.display(signal, bkps, result)
plt.show()
It does the segmentation job via changepoint detection, as the resulting plot shows.
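To map the detected boundary indices back to the question's three labels, one rough approach (my own sketch, assuming segments are large enough to fit a line and that flat_tol is a threshold you tune) is to label each segment by the sign of its fitted slope:
import numpy as np
def label_segments(signal, boundaries, flat_tol=0.01):
    """Label each segment RampUp/Plateau/CoolDown by its fitted slope.
    flat_tol is an assumed threshold below which a slope counts as flat."""
    labels, start = [], 0
    for end in boundaries:
        seg = np.asarray(signal[start:end]).ravel()
        slope = np.polyfit(np.arange(len(seg)), seg, 1)[0]
        if abs(slope) < flat_tol:
            labels.append(('Plateau', start, end))
        elif slope > 0:
            labels.append(('RampUp', start, end))
        else:
            labels.append(('CoolDown', start, end))
        start = end
    return labels
print(label_segments(signal, result))  # using the arrays from above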
I am new to Stack Overflow. I had a problem very similar to yours and solved it with dynamic programming (DP), but I then needed to run it in real time. The problem with DP is that it becomes very slow as the dataset grows: on one dataset of over 20,000 values the algorithm took about 7.5 hours to complete. But it works. Here is the code I use for the segmentation:
import csv
import numpy as np
import matplotlib.pyplot as plt

class sls:
    """
    Find the best least-squares fit of multiple segments to the input data.
    """
    def __init__(self, filename):
        """
        Inputs:
            filename - text file of pairs of x-y points, one pair per row,
                       separated by a tab
        """
        self.get_data(filename)

    def get_data(self, filename):
        """
        Read the data and find the least-squares coefficients and errors
        for all possible pairs of start and end points.
        """
        with open(filename) as csvfile:
            r = csv.reader(csvfile, delimiter='\t')
            data = np.array([[float(row[0]), float(row[1])] for row in r])
        x = data[:, 0]
        y = data[:, 1]
        v = np.std(y) ** 2  # variance of y for calculating the penalty
        n = len(x)
        err_arr = np.zeros((n, n))
        a_arr = np.zeros((n, n))
        b_arr = np.zeros((n, n))
        # for j, end indices of the data, 0 to n-1
        for j in range(n):
            # for i, start indices of this data, 0 to j
            for i in range(j + 1):
                # for this segment, i to j, get coefficients and fit error
                a, b, e2 = self.lscoef(x[i:j + 1], y[i:j + 1])
                # store error and coefficients for segment i to j at [i, j]
                err_arr[i, j] = e2
                a_arr[i, j] = a
                b_arr[i, j] = b
        self.x = x
        self.y = y
        self.err_arr = err_arr
        self.a_arr = a_arr
        self.b_arr = b_arr
        self.v = v

    def lscoef(self, x, y):
        """
        Least-squares fit of a single line segment.
        """
        n = len(x)
        if n == 1:
            return (0.0, y[0], 0.0)
        sx = np.sum(x)
        sy = np.sum(y)
        sx2 = np.sum(x ** 2)
        sxy = np.sum(x * y)
        a = (n * sxy - sx * sy) / (n * sx2 - sx * sx)
        b = (sy - a * sx) / n
        e2 = np.sum((y - a * x - b) ** 2)
        return (a, b, e2)

    def find_segments(self, max_num_seg=None, desired_penalty=30.0,
                      penalty_start=0.15, penalty_inc=0.15):
        """
        Find the segments and least-squares segment coefficients.
        If desired_penalty is used, find_opt() is called directly (so it is
        faster); otherwise there is a loop over penalty values to find a
        solution with no more than the maximum number of segments.
        With no parameters, defaults to a desired_penalty of 30.0.
        To set a desired_penalty, leave max_num_seg as None.
        To limit the number of segments, set max_num_seg (desired_penalty is
        then ignored), and to control the search for the penalty that yields
        that number of segments, set penalty_start and penalty_inc. Suitable
        values may depend on the data.
        Inputs:
            max_num_seg - set if a limit on the number of segments is desired
            desired_penalty - penalty term added to the error in the
                              optimization to allow for noise
            penalty_start, penalty_inc - parameters for searching for the
                                         penalty term when limiting the
                                         number of segments
        Returns:
            penalty value used
            number of segments found
        """
        if max_num_seg is not None:
            # max_num_seg is set: ignore desired_penalty, check other params
            desired_penalty = None
            assert max_num_seg >= 1, 'max_num_seg must be at least 1'
            assert penalty_start is not None and penalty_start > 0, \
                'penalty_start must be not None and > 0'
            assert penalty_inc is not None and penalty_inc > 0, \
                'penalty_inc must be not None and > 0'
        else:
            # using desired_penalty, check it
            assert desired_penalty is not None and desired_penalty > 0, \
                'desired_penalty must be not None and > 0'
        if desired_penalty:
            # call find_opt directly
            penalty = desired_penalty
            self.find_opt(penalty_factor=penalty)
            num_seg = self.get_num_segments()
        else:
            # loop over penalties to find one that gives at most
            # max_num_seg segments
            penalty = penalty_start - penalty_inc
            num_seg = np.inf
            while num_seg > max_num_seg:
                penalty += penalty_inc
                self.find_opt(penalty_factor=penalty)
                num_seg = self.get_num_segments()
        return penalty, num_seg

    def find_opt(self, penalty_factor):
        """
        Dynamic-programming segmented least squares.
        """
        penalty = self.v * penalty_factor
        n = self.err_arr.shape[0]
        # for each end index, get the minimum error over possible start indices
        opt_arr = np.zeros(n)
        for j in range(n):
            # for this end index, use the min error for the previous end index
            # plus the current segment error for all possible start indices
            tmp_opt = np.zeros(j + 1)
            tmp_opt[0] = self.err_arr[0, j] + penalty
            for i in range(1, j + 1):
                tmp_opt[i] = opt_arr[i - 1] + self.err_arr[i, j] + penalty
            opt_arr[j] = np.amin(tmp_opt)
        # backtrack from opt_arr[n-1] to get the segments and coefficients
        opt_coefs = []
        j = n - 1
        while j >= 0:
            tmp_opt = np.zeros(j + 1)
            tmp_opt[0] = self.err_arr[0, j] + penalty
            for i in range(1, j + 1):
                tmp_opt[i] = opt_arr[i - 1] + self.err_arr[i, j] + penalty
            i_opt = np.argmin(tmp_opt)
            a_opt = self.a_arr[i_opt, j]
            b_opt = self.b_arr[i_opt, j]
            # set boundaries of the interval for these coefficients in terms of x
            if i_opt <= 0:
                xmin = -np.inf
            else:
                xmin = (self.x[i_opt - 1] + self.x[i_opt]) / 2
            if j >= n - 1:
                xmax = np.inf
            else:
                xmax = (self.x[j] + self.x[j + 1]) / 2
            opt_coefs.insert(0, (xmin, xmax, a_opt, b_opt))
            j = i_opt - 1
        self.opt_coefs = opt_coefs

    def get_num_segments(self):
        return len(self.opt_coefs)

    def get_fit(self, x):
        # evaluate the piecewise-linear fit using the stored coefficients
        n = len(x)
        yfit = np.zeros(n)
        for opt_coef in self.opt_coefs:
            ind = [i for i, elem in enumerate(x)
                   if opt_coef[0] <= elem <= opt_coef[1]]
            yfit[ind] = x[ind] * opt_coef[2] + opt_coef[3]
        return yfit

    def plot_fit(self, xplt):
        yfit = self.get_fit(xplt)
        plt.plot(self.x, self.y, '.')
        plt.plot(xplt, yfit)
        plt.show()
Then you must use this module from another one to run the segmentation:
from sls_class import sls
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt

# the data format is: index (0, 1, 2, ...) <tab> value (y1, y2, y3, ...)
data = pd.read_csv("norm1Ac2.txt", sep="\t")
x = data.index
y1 = pd.DataFrame(data.values)[1]

s = sls("norm1Ac2.txt")
penalty, num_segments = s.find_segments()
print("penalty = {0}, num segments = {1}".format(penalty, num_segments))

# build a denser x grid for plotting the piecewise fit
n = len(s.x)
minx = np.amin(s.x)
maxx = np.amax(s.x)
dx = 0.33 * (maxx - minx) / (n - 1)
xplt = np.arange(minx, maxx + dx, dx)
s.plot_fit(xplt)

# overlay the fit on the raw series
plt.plot(x, data["y1"], label='Gas')
plt.plot(xplt, s.get_fit(xplt), "r")
plt.xticks(rotation=90)
plt.legend(loc='upper left')
plt.tight_layout()
plt.show()
I hope this helps. I would also like to know if anyone has a solution for what to do after getting the segments: how can I store them? For example, cutting the dataset off at a detected boundary and re-running from the point where it was cut (imagine it running in real time). I am using matplotlib's animation to represent the real-time stream, but I am stuck at that point. Thank you for reading.
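Regarding the storage question above, a minimal sketch (my own idea, not part of the original code) that reuses the sls results: the interior xmax values in s.opt_coefs are the finite segment boundaries, so they can be persisted and used to slice the incoming data:
import json
# s.opt_coefs holds one (xmin, xmax, slope, intercept) tuple per segment;
# the interior xmax values are the finite segment boundaries
breakpoints = [c[1] for c in s.opt_coefs[:-1]]
coefs = [(c[2], c[3]) for c in s.opt_coefs]
with open("segments.json", "w") as f:
    json.dump({"breakpoints": breakpoints, "coefs": coefs}, f)
# for a (pseudo) real-time loop one could drop everything before the last
# confirmed breakpoint and re-run sls on the remaining tail only; sls reads
# from a file, so the tail would have to be written back out (or the class
# adapted to accept arrays directly - an assumption, not shown here)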
If anybody wants a Bayesian alternative, one possibility in Python is my package for Bayesian changepoint detection and time series segmentation/decomposition: https://pypi.org/project/Rbeast. Unlike non-Bayesian methods, it tells you not just when and how many changepoints/segments there are, but also the probabilities associated with changepoint occurrence (e.g., the probability of having 3 changepoints). Statistically, it fits the model Y = seasonal + trend + error (the seasonal component is optional), where piecewise models are used for the seasonal and trend components, assuming i.i.d. normal errors. It can be installed via pip install Rbeast.
Below is a quick example:
import Rbeast as rb
nile, year = rb.load_example('nile') # an annual time series of streamflow of the River Nile
o = rb.beast( nile, start=1871, season='none')
rb.plot(o)
rb.print(o)
o # see a list of output fields in the output variable o
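If you need the boundary indices programmatically rather than from the plot, the output object exposes the estimated trend changepoints. The field names below are from my reading of the Rbeast docs and may differ between versions, so treat this as an assumption:
# hedged sketch: field names may vary across Rbeast versions
ncp  = o.trend.ncp    # estimated number of trend changepoints
cp   = o.trend.cp     # candidate changepoint times
cpPr = o.trend.cpPr   # occurrence probability of each candidate
print(ncp, cp, cpPr)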
Here's one; the code is in a Google Colab notebook at the bottom of the article. Segments of variable length are classified as:
a. Sideway
b. Rising|Falling converging
c. Rising|Falling diverging
The core segmentation logic is encapsulated in the standalone function "partition_sliding_window" under "Helper Functions" of the notebook. The code is reusable and works across higher and lower timeframes.
If you go through the code, you'll see there is no AI involved. It uses simple moving-average crossovers as a first pass to detect segment edges (see the sketch below). Then it uses linear regression, whose standard error tells whether a segment can/should be further segmented recursively.
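As a rough illustration of that first pass (my own sketch, not the article's code; the window lengths are assumptions to tune per timeframe), candidate segment edges can be taken where a fast moving average crosses a slow one:
import numpy as np
import pandas as pd

def sma_crossover_edges(y, fast=5, slow=20):
    """Indices where the fast SMA crosses the slow SMA (assumed windows)."""
    s = pd.Series(y)
    diff = (s.rolling(fast).mean() - s.rolling(slow).mean()).dropna()
    sign = np.sign(diff.to_numpy())
    cross = np.where(np.diff(sign) != 0)[0] + 1  # sign changes = crossovers
    return diff.index[cross].to_numpy()          # map back to original index

# toy ramp-up / plateau / cool-down series
rng = np.random.default_rng(0)
y = np.concatenate([np.linspace(0, 5, 80), np.full(80, 5.0),
                    np.linspace(5, 1, 80)]) + rng.normal(0, 0.2, 240)
print(sma_crossover_edges(y))  # candidate segment edges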