import pandas as pd
import numpy as np


def genSynchronizedTimeSeries(lenTS=1000, minlenI=3, maxlenI=100, sigma=0.01, seed=None):
    """
    Generates synthetic time series data, e.g. for method evaluation.

    Both series start as uniform noise on [0, 1) smoothed with a centered
    11-point rolling mean. The series is then cut into consecutive intervals
    of random length; every second interval (the 2nd, 4th, ...) is
    synchronized, i.e. s2 is set to s1 plus one constant random offset. In
    the remaining intervals s2 is replaced by its interval mean plus fresh
    uniform noise.

    :param lenTS: length of generated time series (s1 and s2 have equal length).
        Series shorter than 5 samples cannot satisfy the rolling window's
        min_periods and are all NaN.
    :param minlenI: minimum length of an interval
    :param maxlenI: upper bound (exclusive, as in numpy.random.randint) for
        the length of an interval
    :param sigma: standard deviation of the single normally distributed offset
        that is added to s1 within each synchronized interval
    :param seed: Can be set to reproduce results
    :return s1: First time series (pandas Series)
            s2: Second time series (pandas Series)
            syncI: List of (startindex, length) tuples, one per synchronized
                interval.
    """
    np.random.seed(seed=seed)

    # Generate two uniform distributed vectors
    s1 = pd.Series(np.random.uniform(0, 1, lenTS))
    s2 = pd.Series(np.random.uniform(0, 1, lenTS))

    # Calculate average over each 11 points. The result must be assigned:
    # rolling() alone only builds a window object and leaves the data untouched.
    s1 = s1.rolling(window=11, min_periods=5, center=True).mean()
    s2 = s2.rolling(window=11, min_periods=5, center=True).mean()

    # Sample synchronous intervals
    i = 0
    count = 0
    syncI = []
    while i < lenTS:
        lenI = np.random.randint(minlenI, maxlenI)
        if lenI + i >= lenTS:
            # Clip the last interval so that it ends exactly at the series end
            lenI = lenTS - i
        stop = i + lenI

        if count % 2 == 1:
            # Synchronized interval: s2 follows s1 up to one constant offset
            offset = np.random.normal(0, sigma, 1)
            s2.iloc[i:stop] = s1.iloc[i:stop].to_numpy() + offset
            syncI.append((i, lenI))
        else:
            # Unsynchronized interval: interval mean plus fresh uniform noise
            s2.iloc[i:stop] = s2.iloc[i:stop].mean() + np.random.uniform(0, 1, lenI)

        count += 1
        i += lenI

    return s1, s2, syncI


def glaette(s, window):
    """
    Smooth a time series with a trailing moving average ("glaette" is German
    for "smooth").

    Output element k is the mean of s[k:k + window + 1], i.e. of window + 1
    consecutive samples, so the result is `window` samples shorter than the
    input.

    :param s: time series (any sliceable sequence of numbers)
    :param window: number of preceding samples averaged together with the
        current one
    :return smoothed time series as numpy array of length len(s) - window
        (empty if the input has no more than `window` samples)
    """
    n_out = len(s[window:])
    return np.array([np.mean(s[k:k + window + 1]) for k in range(n_out)])


def genShiftedSynchronizedTimeSeries(lents=1000, lenints=(25, 125), lennoints=(25, 125),
                                     noiseSigma=0.5, shift=4, shiftbothsides=False,
                                     seed=None):
    """
    Generates synthetic time series data with one time series being shifted,
    e.g. for method evaluation.

    Both series are standard normal noise smoothed with glaette(..., 10), so
    the returned series contain lents - 10 samples. Starting at index `shift`,
    the series is cut into intervals that alternate between not synchronized
    and synchronized. In a synchronized interval s2 is overwritten with a
    time-shifted copy of s1. Generation stops at the first interval that
    would not fit completely (including the shifted source range) into the
    series. Finally Gaussian noise is added to all of s2.

    :param lents: number of raw samples to draw; the returned series are
        10 samples shorter because of the smoothing
    :param lenints: (minimum, exclusive maximum) length for intervals being
        synchronized
    :param lennoints: (minimum, exclusive maximum) length of sequences which
        are not synchronized
    :param noiseSigma: factor to multiply the standard normal noise by that is
        added to s2
    :param shift: shift value for synchronized intervals (expected to be
        non-negative)
    :param shiftbothsides: if true shift intervals alternately back and forth
        in time, if false intervals are shifted into one direction only. Note
        that in this mode the copied values are additionally negated.
        default: False
    :param seed: Can be set to reproduce results
    :return s1: First time series (numpy array)
            s2: Second time series (numpy array)
            syncI: List of (startindex, length, direction) tuples, one per
                synchronized interval. direction is 1 if s2 was copied from
                s1 starting at startindex - shift and -1 if it was copied
                from s1 starting at startindex + shift.
    """
    np.random.seed(seed=seed)

    # Generate two smoothed normal distributed vectors
    s1 = glaette(np.random.randn(lents), 10)
    s2 = glaette(np.random.randn(lents), 10)

    # Bound everything by the real series length: smoothing shortens the
    # series, and slices that run past the end are clipped differently for
    # source and destination, which makes the assignment fail.
    n = len(s1)
    # Number of samples the source slice may reach beyond the destination
    if shiftbothsides:
        reach = abs(shift)
    else:
        reach = max(-shift, 0)

    # Sample synchronous intervals
    i = shift
    count = 0
    syncI = []
    while i < n:
        synchronized = count % 2 == 1
        bounds = lenints if synchronized else lennoints
        lenI = np.random.randint(bounds[0], bounds[1])

        # stop if interval (plus shifted source range) exceeds the time series
        if i + lenI + reach > n:
            break

        if synchronized:
            if not shiftbothsides:
                s2[i:i + lenI] = s1[i - shift:i + lenI - shift]
                syncI.append((i, lenI, 1))
            elif count % 4 == 1:
                # alternate between shift back and shift forth
                s2[i:i + lenI] = -1 * s1[i - shift:i + lenI - shift]
                syncI.append((i, lenI, 1))
            else:
                s2[i:i + lenI] = -1 * s1[i + shift:i + lenI + shift]
                syncI.append((i, lenI, -1))

        count += 1
        i += lenI

    s2 += np.random.randn(len(s2)) * noiseSigma
    return s1, s2, syncI