Steady state detection in time series analysis
Introduction
Let , where stands for time. is called time series. Steady state of a time series is a period in which the value of roughly a constant. Steady State Detection (SSD) aims to determine such periods. A simple method is to use weighted means and variances.
At time , the weighted mean is
and the weighted variance is
Now, time is said to be a steady state if for some given .
Note 1. Above formulas can be computed recursively (see equation (18) in (Glass)) by using
the -moment at time weighted by . Then
and
As soon as the steady states are detected, we can use the Kmeans algorithms to figure out the ‘‘center’’ of steady states.
Note 2. To apply Kmeans, the data should be data = [t, x_t]
of size (n, d+1)
, i.e., the time should be considered as a data feature. To ensure the good performance of Kmeans, the data
should be normalized, for example, by max of each columns.
Reference.
(Glass). Glass, A. S., et al. “Qualitative model-based fault detection in air-handling units.” IEEE Control Systems Magazine_ 15.4 (1995): 11-22.
Demo example
Code
import numpy as np
import matplotlib.pyplot as plt
from sklearn.cluster import KMeans
def ss_sampling(centers, random_seed=12, n_width=20):
np.random.seed(random_seed)
n = len(centers)
x = np.linspace(0., 1., n_width)
noise_level = 5*np.abs(x-0.5)**3
noise = noise_level*np.random.rand(n, n_width)
centers_with_noise = centers.reshape(-1, 1) + noise
feature = centers_with_noise.reshape(-1)
return feature
# Steady State Detection (SSD)
class SSD:
# based on paper: AS Glass: Qualitative model-based fault detection in air-handling units
def __init__(self, data, alpha=0.5, eps=1e-4):
self.data = data
self.eps = eps
self.alpha = alpha
self.beta = self.alpha
self.values = None
self.positions = None
self.vars = None
self.cur_moments = {
"moment_0_alpha": 0,
"moment_1_alpha": 0,
"moment_0_beta": 0,
"moment_1_beta": 0,
"moment_2_beta": 0,
}
self._run()
def _compute_a_moment(self, x_new, pre_moment, order, coeff):
# moment = x_new^order + alpha * pre_moment (see eq. (17))
# assert isinstance(x_new, float)
# print("check:", x_new, pre_moment, order, coeff)
a = x_new**order
b = coeff * pre_moment
return a+b
def _update_moments(self, x_new):
pre_moments = self.cur_moments.copy()
for key, val in pre_moments.items():
a, b, c = key.split("_")
if c=="alpha":
coeff = self.alpha
elif c=="beta":
coeff = self.beta
else:
raise ValueError(f"Please check keys of pre_moments: {pre_moments.keys()}")
self.cur_moments[key] = self._compute_a_moment(x_new, pre_moment=val, order=int(b), coeff=coeff)
def _compute_mean_var(self):
ms = self.cur_moments
mean = ms["moment_1_alpha"]/ms["moment_0_alpha"]
a = ms["moment_2_beta"]/ms["moment_0_beta"]
b = (ms["moment_1_beta"]*ms["moment_1_alpha"])/(ms["moment_0_beta"]*ms["moment_0_alpha"])
c = ms["moment_1_alpha"]/ms["moment_0_alpha"]
var = a-2*b+c**2
return mean, var
def _run(self):
data = self.data
mean_list = []
var_list = []
# print(len(data))
for i in range(len(data)):
# print(f"iter={i}")
# if i>10:
# break
# if i%10==0:
# print(f"iter={i}")
x_new = data[i, :]
self._update_moments(x_new)
mean, var = self._compute_mean_var()
mean_list += [mean]
var_list += [var]
means, vars = np.array(mean_list), np.array(var_list)
self.values = means
self.vars = vars
self.positions = vars < self.eps
# return means, vars
def demo():
# 1. build data
centers_0 = np.array([1,3,1, 4,7,5, 7,9,8])
centers_1 = np.array([9,6,3, 9,6,3, 9,6,3])
feature_0 = ss_sampling(centers_0, random_seed=12)
feature_1 = ss_sampling(centers_1, random_seed=1)
time = np.linspace(0., 1., len(feature_0))
data = np.hstack([time.reshape(-1, 1), feature_0.reshape(-1, 1), feature_1.reshape(-1, 1)])
# 2. steady state detection
ssd = SSD(data[:, [1, 2]], alpha=0.3, eps=1e-3)
indices = ssd.positions.all(axis=1)
# print(indices)
ss_dt = data[indices, :] #steady state data
# 3. center of steady data
data_max =ss_dt.max(axis=0)
data_normal = ss_dt/data_max
kmeans = KMeans(n_clusters=9, random_state=0).fit(data_normal)
ss_centers = kmeans.cluster_centers_ * data_max
# 4. plot
f, ax = plt.subplots(figsize=(15, 7))
ax.plot(data[:, 0], data[:, 1], label="feature 0", color="gray", alpha=0.3)
ax.scatter(ss_dt[:, 0], ss_dt[:, 1], label="steady states", marker="o", color="orange")
ax.scatter(ss_centers[:, 0], ss_centers[:, 1], label="centers of steady states", marker="x", s=100, color="blue")
ax.plot(data[:, 0], data[:, 2], label="feature 1", color="black", alpha=0.5)
ax.scatter(ss_dt[:, 0], ss_dt[:, 2], marker="o", color="orange")
ax.scatter(ss_centers[:, 0], ss_centers[:, 2], marker="x", s=100, color="blue")
ax.legend()
ax.set_title("Steady Dtate Detection: Data with 2 features and 9 steady states")
ax.set_xlabel("time")
ax.set_ylabel("value of features")
plt.show()
return
demo()