Steady state detection in time series analysis
Introduction
Let , where stands for time. is called time series. Steady state of a time series is a period in which the value of roughly a constant. Steady State Detection (SSD) aims to determine such periods. A simple method is to use weighted means and variances.
At time , the weighted mean is
and  the weighted variance is
Now, time  is said to be a steady state if  for some given .
Note 1. Above formulas can be computed recursively (see equation (18) in (Glass)) by using
the -moment at time  weighted by .  Then
and
As soon as the steady states are detected, we can use the Kmeans algorithms to figure out the ‘‘center’’ of steady states.
Note 2. To apply Kmeans, the data should be data = [t, x_t] of size (n, d+1), i.e., the time should be considered as a data feature. To ensure the good performance of Kmeans, the data should be normalized, for example, by max of each columns.
Reference.
(Glass). Glass, A. S., et al. “Qualitative model-based fault detection in air-handling units.” IEEE Control Systems Magazine_ 15.4 (1995): 11-22.
Demo example
Code
import numpy as np 
import matplotlib.pyplot as plt 
from sklearn.cluster import KMeans
def ss_sampling(centers, random_seed=12, n_width=20):
    np.random.seed(random_seed) 
    n = len(centers)
    x = np.linspace(0., 1., n_width)
    noise_level = 5*np.abs(x-0.5)**3
    noise = noise_level*np.random.rand(n, n_width) 
    centers_with_noise = centers.reshape(-1, 1) + noise
    feature = centers_with_noise.reshape(-1)    
    return feature
# Steady State Detection (SSD)
class SSD: 
    # based on paper: AS Glass: Qualitative model-based fault detection in air-handling units
    def __init__(self, data, alpha=0.5, eps=1e-4):
        self.data = data 
        self.eps = eps
        self.alpha = alpha 
        self.beta = self.alpha 
        self.values = None 
        self.positions = None 
        self.vars = None 
        
        self.cur_moments = {
            "moment_0_alpha": 0,
            "moment_1_alpha": 0,
            "moment_0_beta": 0,
            "moment_1_beta": 0,
            "moment_2_beta": 0,
        } 
        self._run() 
    def _compute_a_moment(self, x_new, pre_moment,  order, coeff):
        # moment = x_new^order + alpha * pre_moment (see eq. (17)) 
        # assert isinstance(x_new, float)
        # print("check:", x_new, pre_moment, order, coeff)
        a = x_new**order 
        b = coeff * pre_moment 
        return a+b 
    def _update_moments(self, x_new):
        pre_moments = self.cur_moments.copy()
        for key, val in pre_moments.items(): 
            a, b, c = key.split("_") 
            if c=="alpha":
                coeff = self.alpha 
            elif c=="beta":
                coeff = self.beta 
            else:
                raise ValueError(f"Please check keys of pre_moments: {pre_moments.keys()}")
            self.cur_moments[key] = self._compute_a_moment(x_new, pre_moment=val, order=int(b), coeff=coeff)
        
    def _compute_mean_var(self):
        ms = self.cur_moments 
        mean = ms["moment_1_alpha"]/ms["moment_0_alpha"]
        a = ms["moment_2_beta"]/ms["moment_0_beta"]
        b = (ms["moment_1_beta"]*ms["moment_1_alpha"])/(ms["moment_0_beta"]*ms["moment_0_alpha"]) 
        c = ms["moment_1_alpha"]/ms["moment_0_alpha"]
        var = a-2*b+c**2 
        return mean, var 
    def _run(self):
        data = self.data
        mean_list = [] 
        var_list = [] 
        # print(len(data))
        for i in range(len(data)):
            # print(f"iter={i}")
            # if i>10:
            #     break
            # if i%10==0:
            #     print(f"iter={i}")
            x_new = data[i, :]
            self._update_moments(x_new) 
            mean, var = self._compute_mean_var() 
            mean_list += [mean] 
            var_list += [var] 
        means, vars = np.array(mean_list), np.array(var_list) 
        self.values = means 
        self.vars = vars 
        self.positions = vars < self.eps
        # return means, vars
def demo():
    # 1. build data
    centers_0 = np.array([1,3,1, 4,7,5, 7,9,8])
    centers_1 = np.array([9,6,3, 9,6,3, 9,6,3])
    feature_0 = ss_sampling(centers_0, random_seed=12)
    feature_1 = ss_sampling(centers_1, random_seed=1)
    time = np.linspace(0., 1., len(feature_0)) 
    data = np.hstack([time.reshape(-1, 1), feature_0.reshape(-1, 1), feature_1.reshape(-1, 1)])
    # 2. steady state detection 
    ssd = SSD(data[:, [1, 2]], alpha=0.3, eps=1e-3) 
    indices = ssd.positions.all(axis=1)
    # print(indices)
    ss_dt = data[indices, :] #steady state data
    # 3. center of steady data
    data_max =ss_dt.max(axis=0)
    data_normal = ss_dt/data_max
    kmeans = KMeans(n_clusters=9, random_state=0).fit(data_normal) 
    ss_centers = kmeans.cluster_centers_ * data_max 
    # 4. plot
    f, ax = plt.subplots(figsize=(15, 7))
    ax.plot(data[:, 0], data[:, 1], label="feature 0", color="gray", alpha=0.3)
    ax.scatter(ss_dt[:, 0], ss_dt[:, 1], label="steady states", marker="o", color="orange")
    ax.scatter(ss_centers[:, 0], ss_centers[:, 1], label="centers of steady states", marker="x", s=100, color="blue")
    ax.plot(data[:, 0], data[:, 2], label="feature 1", color="black", alpha=0.5)
    ax.scatter(ss_dt[:, 0], ss_dt[:, 2], marker="o", color="orange")
    ax.scatter(ss_centers[:, 0], ss_centers[:, 2], marker="x", s=100, color="blue")
    ax.legend()
    ax.set_title("Steady Dtate Detection: Data with 2 features and 9 steady states")
    ax.set_xlabel("time")
    ax.set_ylabel("value of features")
    plt.show()
    return 
demo()
