class Intervallist(): """ This is a class for lists of intervals in form [(index,length), ...] """ def __init__(self, intervals): import numpy as np self.idx = np.array(intervals) self.binary = None self.medfiltidx = None self.lengthidx = None self.countidx = None self.lengthmedfilt = None self.countmedfilt = None self.LongestSet = None self.computelength() self.countelements() def computelength(self): """ Compute total length of intervals in list of intervals """ import numpy as np if len(self.idx) > 0: self.lengthidx = int(np.sum(np.array(self.idx)[:, 1])) else: self.lengthidx = 0 def countelements(self): """ Count intervals in list of intervals """ if len(self.idx) > 0: self.countidx = len(self.idx) else: self.countidx = 0 def intervalsToBinary(self, tslength): """ Transform list of intervals to an binary array of length tslength, where 1 is interval and 0 is no interval. E.g. self.idx=[(3,4)], tslength=10 returns [0,0,0,1,1,1,1,0,0,0]""" import numpy as np gt = np.zeros(tslength) if len(self.idx) > 0: for i in self.idx: gt[int(i[0]):int(i[0])+int(i[1])] = 1 self.binary = gt return gt def binaryToIntervals(self, array): """ Transform binary array, where 1 is interval and 0 is no interval, to list of intervals. E.g. array=[0,0,0,1,1,1,1,0,0,0] returns [(3,4)]""" import numpy as np intervals = [] start = np.where(array == 1)[0][0] current = np.where(array == 1)[0][0] count = 1 for val in np.where(array == 1)[0][1:]: if val-1 == current and val != len(array)-1: current = val count += 1 elif val-1 == current and val == len(array)-1: count += 1 intervals += [start, count] else: intervals += [start, count] start = val current = val count = 1 intervals = np.array(intervals).reshape(-1,2) return intervals def median_filter_intervals(self, kernel, tslength=None): """ Median filter intervals with kernel size kernel. """ from scipy.signal import medfilt import numpy as np # If no length provided, assume end of last interval to be length of # time series if tslength == None: tslength = int(np.sum(self.idx[-1])) if self.binary is None: self.intervalsToBinary(tslength) filteredintervals = medfilt(self.binary, kernel_size=kernel) # Keep arrays that were 1 filteredintervals = np.maximum.reduce([filteredintervals, self.binary]) self.medfiltidx = self.binaryToIntervals(filteredintervals) self.lengthmedfilt = np.sum(np.array(self.medfiltidx)[:,1]) self.countmedfilt = len(self.medfiltidx) def longestSet(self, on='idx'): """ Computes the longest set of intervals, where longest refers to the sum of lengths of each interval""" from RelevantIntervalSelection import LongestSet if on == 'idx': self.LongestSet = LongestSet(self.idx) elif on == 'medfilt': self.LongestSet = LongestSet(self.medfiltidx) class TimeSeries: """ This is a class for a time series""" def __init__(self, ts, varname=None): import numpy as np self.values = np.array(ts) self.varname = varname self.countTimePoints = self.values.shape[0] self.stationary = None self.concatTS = None self.concatIntervals = None def stationarity_tests(self, autolag='BIC', signiveau = 0.05): """ Test if time series is stationary using an adf-test""" from statsmodels.tsa.stattools import adfuller import numpy as np X = self.values[~np.isnan(self.values)] adf = adfuller(X, autolag=autolag) # Reject null hypothesis (X is not stationary) if p-val < signiveau testresult = True if adf[1] < signiveau else False self.stationary = testresult return testresult def get_stationartiy_test_result(self): """ Return if time series is stationary""" return self.stationary def concatRelevantIntervalsTS(self, intervals, addvalue=0): """ Concatenates intervals of a time series """ import numpy as np concatTS = [] self.concatIntervals = intervals # Add 'addvalue' on left and right side of interval addvalue = abs(addvalue) intervals = intervals + np.array([-addvalue,2*addvalue]) # Check if length of time series was exceeded intervals[0][0] = 0 if intervals[0][0] < 0 else intervals[0][0] endLI = intervals[-1][0] + intervals[-1][1] intervals[-1][1] = intervals[-1][1] - \ (endLI - self.countTimePoints - 1) \ if endLI > self.countTimePoints else intervals[-1][1] # concatenate intervals / remove irrelevant data for i in intervals: concatTS += list(self.values[int(i[0]):int(i[0]+i[1])]) self.concatTS = np.array(concatTS) class TimeSeriesPair(): """ Class for two time series """ def __init__(self, X, Y, intervals=None): self.ts1 = X self.ts2 = Y # Parameters for relevant interval selection self.relInt = {'intervals': None, 'parameters': None} self.intervals = intervals self.check_type() self.addvalue = None def check_type(self): """ Check if provided time series are TimeSeries obejcts""" if not isinstance(self.ts1, TimeSeries) or not isinstance(self.ts2, TimeSeries): raise TypeError('X and Y must be objects from class TimeSeries') if self.intervals is not None: if not isinstance(self.intervals, Interval): raise TypeError('Intervals must be in form [[index, length], ' '...] and of class type Interval') ############################################################################### # Compute Relevant Intervals of a pair of time series ############################################################################### def SetRelevantIntervals(self, intervallist): """ Add list of intervals """ self.intervals = intervallist def ComputeRelevantIntervals(self, shifts, threshold, minLenInts, maxLenInts, distancemeasure=None): """ Computes selected relevant intervals of TimeSeriesPair """ from RelevantIntervalSelection import selectRelevantIntervals relevantIntervals = selectRelevantIntervals(X=self.ts1.values, Y=self.ts2.values, shifts=shifts, threshold=threshold, minLenInts=minLenInts, maxLenInts=maxLenInts, distancemeasure=distancemeasure) self.relInt['intervals'] = relevantIntervals self.relInt['parameters'] = {'threshold': threshold, 'minlen': minLenInts, 'maxLenInts': maxLenInts, 'shifts': shifts, 'distancemeasure': distancemeasure} def concatRelevantIntervals(self, relInts, addvalue=0): """ Concatenate selected relevant intervals""" self.addvalue = addvalue if relInts is not None: self.intervals = relInts if self.intervals is not None: self.ts1.concatRelevantIntervalsTS(intervals=self.intervals.idx, addvalue=self.addvalue) self.ts2.concatRelevantIntervalsTS(intervals=self.intervals.idx, addvalue=self.addvalue) else: print('No intervals for concatenation provided') # Remove this function as it was not used in thesis def multivariateGrangerCausality(self, varsX, varsY, onConcat=False): from statsmodels.tsa.vector_ar.var_model import VARResults import numpy as np from statsmodels.tsa.api import VAR import scipy.stats if len(varsX) <= 1 or len(varsY) <= 1: raise TypeError('Multivariate means more than one variable') # if onConcat is True use concatenated time series if onConcat: X = self.ts1.concatTS Y = self.ts2.concatTS else: X = self.ts1.values Y = self.ts2.values n = self.ts1.values.shape[1] # Select variables from data index = [self.ts1.varNames[x] for x in varsX] X = np.column_stack(X[index,:]) index = [self.ts2.varNames[x] for x in varsY] Y = np.column_stack(Y[index,:]) # Compute order XY = np.column_stack((X, Y)) XY = XY[~np.isnan(XY).any(axis=1)] order = self.compute_order_wrapper(XY) # Build multivariate model with X modelX = VAR(X) resultX = modelX.fit(order) sigmaX = np.linalg.det(np.cov(resultX.resid.T)) # Enrich multivariate model with X with Y modelXY = VAR(XY) resultXY = modelXY.fit(order) sigmaXY = np.linalg.det(np.cov(resultXY.resid[:,np.arange(0,len(varsX))].T)) # Results Y granger causes X Fstats_YgcX = np.log(sigmaX)-np.log(sigmaXY) Pval_YgcX = scipy.stats.f.cdf(Fstats_YgcX, n, n) print(Fstats_YgcX, Pval_YgcX) # Build multivariate model with Y modelY = VAR(Y) resultY = modelY.fit(order) sigmaY = np.linalg.det(np.cov(resultY.resid.T)) # Enrich multivariate model with Y with X YX = np.column_stack((Y, X)) YX = YX[~np.isnan(YX).any(axis=1)] modelYX = VAR(YX) resultYX = modelYX.fit(order) sigmaYX = np.linalg.det(np.cov(resultYX.resid[:,np.arange(0+len(varsY))].T)) # Results X granger causes Y Fstats_XgcY = np.log(sigmaY)-np.log(sigmaYX) Pval_XgcY = scipy.stats.f.cdf(Fstats_XgcY, n, n) print(Fstats_XgcY, Pval_XgcY) return Pval_XgcY, Pval_YgcX def univariateGrangerCausality(self, onRelInts=False, signiveau=0.05, orderlimit=120): """ Test for Granger causality in both directions :param onRelInts: Bool, if true Granger causality is computed on concatenation of selected relevant intervals :param signiveau: Level of significance for Granger causality Test :return: Bool for X GC Y, bool for Y GC X """ from statsmodels.tsa.api import VAR from VARModels import compute_order_wrapper import numpy as np # if onConcat is True use concatenated time series. # Therefore TimeSeries object function # for interval concatenation needs to be called if onRelInts: X = self.ts1.concatTS Y = self.ts2.concatTS else: X = self.ts1.values Y = self.ts2.values # Compute model order X in column 0, Y in column 1 XY = np.column_stack((X,Y)) XY = XY[~np.isnan(XY).any(axis=1)] order = compute_order_wrapper(XY.T, min(orderlimit, len(X-10), len(Y-10))) # Test if X granger causes Y model = VAR(XY) result = model.fit(order) resultXcY = result.test_causality(1, 0, kind='f') # Test if Y granger causes X resultYcX = result.test_causality(0, 1, kind='f') XcY = resultXcY.pvalue < signiveau YcX = resultYcX.pvalue < signiveau return XcY, YcX, resultXcY.pvalue, resultYcX.pvalue, order