| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347 |
- class Intervallist():
- """
- This is a class for lists of intervals in form [(index,length), ...]
- """
- def __init__(self, intervals):
- import numpy as np
- self.idx = np.array(intervals)
- self.binary = None
- self.medfiltidx = None
- self.lengthidx = None
- self.countidx = None
- self.lengthmedfilt = None
- self.countmedfilt = None
- self.LongestSet = None
- self.computelength()
- self.countelements()
- def computelength(self):
- """ Compute total length of intervals in list of intervals """
- import numpy as np
- if len(self.idx) > 0:
- self.lengthidx = int(np.sum(np.array(self.idx)[:, 1]))
- else:
- self.lengthidx = 0
- def countelements(self):
- """ Count intervals in list of intervals """
- if len(self.idx) > 0:
- self.countidx = len(self.idx)
- else:
- self.countidx = 0
- def intervalsToBinary(self, tslength):
- """ Transform list of intervals to an binary array of length tslength,
- where 1 is interval and 0 is no interval.
- E.g. self.idx=[(3,4)], tslength=10 returns [0,0,0,1,1,1,1,0,0,0]"""
- import numpy as np
- gt = np.zeros(tslength)
- if len(self.idx) > 0:
- for i in self.idx:
- gt[int(i[0]):int(i[0])+int(i[1])] = 1
- self.binary = gt
- return gt
- def binaryToIntervals(self, array):
- """ Transform binary array, where 1 is interval and 0 is no interval,
- to list of intervals.
- E.g. array=[0,0,0,1,1,1,1,0,0,0] returns [(3,4)]"""
- import numpy as np
- intervals = []
- start = np.where(array == 1)[0][0]
- current = np.where(array == 1)[0][0]
- count = 1
- for val in np.where(array == 1)[0][1:]:
- if val-1 == current and val != len(array)-1:
- current = val
- count += 1
- elif val-1 == current and val == len(array)-1:
- count += 1
- intervals += [start, count]
- else:
- intervals += [start, count]
- start = val
- current = val
- count = 1
- intervals = np.array(intervals).reshape(-1,2)
- return intervals
- def median_filter_intervals(self, kernel, tslength=None):
- """ Median filter intervals with kernel size kernel. """
- from scipy.signal import medfilt
- import numpy as np
- # If no length provided, assume end of last interval to be length of
- # time series
- if tslength == None:
- tslength = int(np.sum(self.idx[-1]))
- if self.binary is None:
- self.intervalsToBinary(tslength)
- filteredintervals = medfilt(self.binary, kernel_size=kernel)
- # Keep arrays that were 1
- filteredintervals = np.maximum.reduce([filteredintervals, self.binary])
- self.medfiltidx = self.binaryToIntervals(filteredintervals)
- self.lengthmedfilt = np.sum(np.array(self.medfiltidx)[:,1])
- self.countmedfilt = len(self.medfiltidx)
- def longestSet(self, on='idx'):
- """ Computes the longest set of intervals, where longest refers to
- the sum of lengths of each interval"""
- from RelevantIntervalSelection import LongestSet
- if on == 'idx':
- self.LongestSet = LongestSet(self.idx)
- elif on == 'medfilt':
- self.LongestSet = LongestSet(self.medfiltidx)
- class TimeSeries:
- """ This is a class for a time series"""
- def __init__(self, ts, varname=None):
- import numpy as np
- self.values = np.array(ts)
- self.varname = varname
- self.countTimePoints = self.values.shape[0]
- self.stationary = None
- self.concatTS = None
- self.concatIntervals = None
- def stationarity_tests(self, autolag='BIC', signiveau = 0.05):
- """ Test if time series is stationary using an adf-test"""
- from statsmodels.tsa.stattools import adfuller
- import numpy as np
- X = self.values[~np.isnan(self.values)]
- adf = adfuller(X, autolag=autolag)
- # Reject null hypothesis (X is not stationary) if p-val < signiveau
- testresult = True if adf[1] < signiveau else False
- self.stationary = testresult
- return testresult
- def get_stationartiy_test_result(self):
- """ Return if time series is stationary"""
- return self.stationary
- def concatRelevantIntervalsTS(self, intervals, addvalue=0):
- """ Concatenates intervals of a time series """
- import numpy as np
- concatTS = []
- self.concatIntervals = intervals
- # Add 'addvalue' on left and right side of interval
- addvalue = abs(addvalue)
- intervals = intervals + np.array([-addvalue,2*addvalue])
- # Check if length of time series was exceeded
- intervals[0][0] = 0 if intervals[0][0] < 0 else intervals[0][0]
- endLI = intervals[-1][0] + intervals[-1][1]
- intervals[-1][1] = intervals[-1][1] - \
- (endLI - self.countTimePoints - 1) \
- if endLI > self.countTimePoints else intervals[-1][1]
- # concatenate intervals / remove irrelevant data
- for i in intervals:
- concatTS += list(self.values[int(i[0]):int(i[0]+i[1])])
- self.concatTS = np.array(concatTS)
- class TimeSeriesPair():
- """
- Class for two time series
- """
- def __init__(self, X, Y, intervals=None):
- self.ts1 = X
- self.ts2 = Y
- # Parameters for relevant interval selection
- self.relInt = {'intervals': None,
- 'parameters': None}
- self.intervals = intervals
- self.check_type()
- self.addvalue = None
- def check_type(self):
- """ Check if provided time series are TimeSeries obejcts"""
- if not isinstance(self.ts1, TimeSeries) or not isinstance(self.ts2,
- TimeSeries):
- raise TypeError('X and Y must be objects from class TimeSeries')
- if self.intervals is not None:
- if not isinstance(self.intervals, Interval):
- raise TypeError('Intervals must be in form [[index, length], '
- '...] and of class type Interval')
- ###############################################################################
- # Compute Relevant Intervals of a pair of time series
- ###############################################################################
- def SetRelevantIntervals(self, intervallist):
- """ Add list of intervals """
- self.intervals = intervallist
- def ComputeRelevantIntervals(self, shifts, threshold, minLenInts,
- maxLenInts, distancemeasure=None):
- """ Computes selected relevant intervals of TimeSeriesPair """
- from RelevantIntervalSelection import selectRelevantIntervals
- relevantIntervals = selectRelevantIntervals(X=self.ts1.values,
- Y=self.ts2.values,
- shifts=shifts,
- threshold=threshold,
- minLenInts=minLenInts,
- maxLenInts=maxLenInts,
- distancemeasure=distancemeasure)
- self.relInt['intervals'] = relevantIntervals
- self.relInt['parameters'] = {'threshold': threshold,
- 'minlen': minLenInts,
- 'maxLenInts': maxLenInts,
- 'shifts': shifts,
- 'distancemeasure': distancemeasure}
- def concatRelevantIntervals(self, relInts, addvalue=0):
- """ Concatenate selected relevant intervals"""
- self.addvalue = addvalue
- if relInts is not None:
- self.intervals = relInts
- if self.intervals is not None:
- self.ts1.concatRelevantIntervalsTS(intervals=self.intervals.idx,
- addvalue=self.addvalue)
- self.ts2.concatRelevantIntervalsTS(intervals=self.intervals.idx,
- addvalue=self.addvalue)
- else:
- print('No intervals for concatenation provided')
- # Remove this function as it was not used in thesis
- def multivariateGrangerCausality(self, varsX, varsY, onConcat=False):
- from statsmodels.tsa.vector_ar.var_model import VARResults
- import numpy as np
- from statsmodels.tsa.api import VAR
- import scipy.stats
- if len(varsX) <= 1 or len(varsY) <= 1:
- raise TypeError('Multivariate means more than one variable')
- # if onConcat is True use concatenated time series
- if onConcat:
- X = self.ts1.concatTS
- Y = self.ts2.concatTS
- else:
- X = self.ts1.values
- Y = self.ts2.values
- n = self.ts1.values.shape[1]
- # Select variables from data
- index = [self.ts1.varNames[x] for x in varsX]
- X = np.column_stack(X[index,:])
- index = [self.ts2.varNames[x] for x in varsY]
- Y = np.column_stack(Y[index,:])
- # Compute order
- XY = np.column_stack((X, Y))
- XY = XY[~np.isnan(XY).any(axis=1)]
- order = self.compute_order_wrapper(XY)
- # Build multivariate model with X
- modelX = VAR(X)
- resultX = modelX.fit(order)
- sigmaX = np.linalg.det(np.cov(resultX.resid.T))
- # Enrich multivariate model with X with Y
- modelXY = VAR(XY)
- resultXY = modelXY.fit(order)
- sigmaXY = np.linalg.det(np.cov(resultXY.resid[:,np.arange(0,len(varsX))].T))
- # Results Y granger causes X
- Fstats_YgcX = np.log(sigmaX)-np.log(sigmaXY)
- Pval_YgcX = scipy.stats.f.cdf(Fstats_YgcX, n, n)
- print(Fstats_YgcX, Pval_YgcX)
- # Build multivariate model with Y
- modelY = VAR(Y)
- resultY = modelY.fit(order)
- sigmaY = np.linalg.det(np.cov(resultY.resid.T))
- # Enrich multivariate model with Y with X
- YX = np.column_stack((Y, X))
- YX = YX[~np.isnan(YX).any(axis=1)]
- modelYX = VAR(YX)
- resultYX = modelYX.fit(order)
- sigmaYX = np.linalg.det(np.cov(resultYX.resid[:,np.arange(0+len(varsY))].T))
- # Results X granger causes Y
- Fstats_XgcY = np.log(sigmaY)-np.log(sigmaYX)
- Pval_XgcY = scipy.stats.f.cdf(Fstats_XgcY, n, n)
- print(Fstats_XgcY, Pval_XgcY)
- return Pval_XgcY, Pval_YgcX
- def univariateGrangerCausality(self, onRelInts=False,
- signiveau=0.05, orderlimit=120):
- """
- Test for Granger causality in both directions
- :param onRelInts: Bool, if true Granger causality is computed on
- concatenation of selected relevant intervals
- :param signiveau: Level of significance for Granger causality Test
- :return: Bool for X GC Y, bool for Y GC X
- """
- from statsmodels.tsa.api import VAR
- from VARModels import compute_order_wrapper
- import numpy as np
- # if onConcat is True use concatenated time series.
- # Therefore TimeSeries object function
- # for interval concatenation needs to be called
- if onRelInts:
- X = self.ts1.concatTS
- Y = self.ts2.concatTS
- else:
- X = self.ts1.values
- Y = self.ts2.values
- # Compute model order X in column 0, Y in column 1
- XY = np.column_stack((X,Y))
- XY = XY[~np.isnan(XY).any(axis=1)]
- order = compute_order_wrapper(XY.T, min(orderlimit, len(X-10), len(Y-10)))
- # Test if X granger causes Y
- model = VAR(XY)
- result = model.fit(order)
- resultXcY = result.test_causality(1, 0, kind='f')
- # Test if Y granger causes X
- resultYcX = result.test_causality(0, 1, kind='f')
- XcY = resultXcY.pvalue < signiveau
- YcX = resultYcX.pvalue < signiveau
- return XcY, YcX, resultXcY.pvalue, resultYcX.pvalue, order
|