Monday, April 18, 2022

Steady state detection

Steady state detection

Steady state detection in time series analysis

Introduction

Let (xt)tTRd(x_t)_{t\in T}\subset \mathbb R^d, where T={t0,...,tn}T=\{t_0, ...,t_n\} stands for time. (xt)(x_t) is called time series. Steady state of a time series is a period in which the value of xtx_t roughly a constant. Steady State Detection (SSD) aims to determine such periods. A simple method is to use weighted means and variances.

At time tt, the weighted mean is
xt=i=0tαixii=0tαi\overline{x}_{t}=\frac{\sum_{i=0}^t \alpha^i x_i }{\sum_{i=0}^t \alpha^i }
and the weighted variance is
st=i=0tαi(xixt)2i=0tαi.s_{t}=\frac{\sum_{i=0}^t \alpha^i (x_i-\overline{x}_t)^2 }{\sum_{i=0}^t \alpha^i }.
Now, time tt is said to be a steady state if st<εs_t<\varepsilon for some given ε>0\varepsilon>0.

Note 1. Above formulas can be computed recursively (see equation (18) in (Glass)) by using
Xt(m)=i=0tαtximX_t^{(m)} = \sum_{i=0}^t \alpha^tx_i^m
the mm-moment at time tt weighted by α>0\alpha>0. Then
xt=Xt(1)Xt(0)\overline{x}_{t}=\frac{X_t^{(1)}}{X^{(0)}_t}
and
st=Xt(2)Xt(0)(Xt(1)Xt(0))2.s_{t}=\frac{ X^{(2)}_t }{X^{(0)}_t } - \left( \frac{ X^{(1)}_t }{X^{(0)}_t } \right)^2.

As soon as the steady states are detected, we can use the Kmeans algorithms to figure out the ‘‘center’’ of steady states.

Note 2. To apply Kmeans, the data should be data = [t, x_t] of size (n, d+1), i.e., the time should be considered as a data feature. To ensure the good performance of Kmeans, the data should be normalized, for example, by max of each columns.

Reference.
(Glass). Glass, A. S., et al. “Qualitative model-based fault detection in air-handling units.” IEEE Control Systems Magazine_ 15.4 (1995): 11-22.

Demo example

Code

import numpy as np 
import matplotlib.pyplot as plt 
from sklearn.cluster import KMeans

def ss_sampling(centers, random_seed=12, n_width=20):
    np.random.seed(random_seed) 
    n = len(centers)

    x = np.linspace(0., 1., n_width)
    noise_level = 5*np.abs(x-0.5)**3
    noise = noise_level*np.random.rand(n, n_width) 

    centers_with_noise = centers.reshape(-1, 1) + noise
    feature = centers_with_noise.reshape(-1)    

    return feature


# Steady State Detection (SSD)
class SSD: 
    # based on paper: AS Glass: Qualitative model-based fault detection in air-handling units

    def __init__(self, data, alpha=0.5, eps=1e-4):
        self.data = data 
        self.eps = eps

        self.alpha = alpha 
        self.beta = self.alpha 

        self.values = None 
        self.positions = None 
        self.vars = None 
        
        self.cur_moments = {
            "moment_0_alpha": 0,
            "moment_1_alpha": 0,

            "moment_0_beta": 0,
            "moment_1_beta": 0,
            "moment_2_beta": 0,
        } 

        self._run() 

    def _compute_a_moment(self, x_new, pre_moment,  order, coeff):
        # moment = x_new^order + alpha * pre_moment (see eq. (17)) 
        # assert isinstance(x_new, float)
        # print("check:", x_new, pre_moment, order, coeff)
        a = x_new**order 
        b = coeff * pre_moment 
        return a+b 

    def _update_moments(self, x_new):
        pre_moments = self.cur_moments.copy()

        for key, val in pre_moments.items(): 
            a, b, c = key.split("_") 
            if c=="alpha":
                coeff = self.alpha 
            elif c=="beta":
                coeff = self.beta 
            else:
                raise ValueError(f"Please check keys of pre_moments: {pre_moments.keys()}")

            self.cur_moments[key] = self._compute_a_moment(x_new, pre_moment=val, order=int(b), coeff=coeff)
        
    def _compute_mean_var(self):
        ms = self.cur_moments 

        mean = ms["moment_1_alpha"]/ms["moment_0_alpha"]

        a = ms["moment_2_beta"]/ms["moment_0_beta"]
        b = (ms["moment_1_beta"]*ms["moment_1_alpha"])/(ms["moment_0_beta"]*ms["moment_0_alpha"]) 
        c = ms["moment_1_alpha"]/ms["moment_0_alpha"]
        var = a-2*b+c**2 

        return mean, var 

    def _run(self):
        data = self.data

        mean_list = [] 
        var_list = [] 

        # print(len(data))

        for i in range(len(data)):
            # print(f"iter={i}")
            # if i>10:
            #     break
            # if i%10==0:
            #     print(f"iter={i}")
            x_new = data[i, :]
            self._update_moments(x_new) 
            mean, var = self._compute_mean_var() 

            mean_list += [mean] 
            var_list += [var] 

        means, vars = np.array(mean_list), np.array(var_list) 

        self.values = means 
        self.vars = vars 
        self.positions = vars < self.eps

        # return means, vars


def demo():

    # 1. build data
    centers_0 = np.array([1,3,1, 4,7,5, 7,9,8])
    centers_1 = np.array([9,6,3, 9,6,3, 9,6,3])
    feature_0 = ss_sampling(centers_0, random_seed=12)
    feature_1 = ss_sampling(centers_1, random_seed=1)
    time = np.linspace(0., 1., len(feature_0)) 
    data = np.hstack([time.reshape(-1, 1), feature_0.reshape(-1, 1), feature_1.reshape(-1, 1)])


    # 2. steady state detection 
    ssd = SSD(data[:, [1, 2]], alpha=0.3, eps=1e-3) 
    indices = ssd.positions.all(axis=1)
    # print(indices)
    ss_dt = data[indices, :] #steady state data

    # 3. center of steady data
    data_max =ss_dt.max(axis=0)
    data_normal = ss_dt/data_max
    kmeans = KMeans(n_clusters=9, random_state=0).fit(data_normal) 
    ss_centers = kmeans.cluster_centers_ * data_max 



    # 4. plot
    f, ax = plt.subplots(figsize=(15, 7))
    ax.plot(data[:, 0], data[:, 1], label="feature 0", color="gray", alpha=0.3)
    ax.scatter(ss_dt[:, 0], ss_dt[:, 1], label="steady states", marker="o", color="orange")
    ax.scatter(ss_centers[:, 0], ss_centers[:, 1], label="centers of steady states", marker="x", s=100, color="blue")


    ax.plot(data[:, 0], data[:, 2], label="feature 1", color="black", alpha=0.5)
    ax.scatter(ss_dt[:, 0], ss_dt[:, 2], marker="o", color="orange")
    ax.scatter(ss_centers[:, 0], ss_centers[:, 2], marker="x", s=100, color="blue")

    ax.legend()
    ax.set_title("Steady Dtate Detection: Data with 2 features and 9 steady states")
    ax.set_xlabel("time")
    ax.set_ylabel("value of features")
    plt.show()

    return 

demo()

Popular Posts