Ver código fonte

Extra modules

valapil 2 anos atrás
pai
commit
7a2f8ea2f6
2 arquivos alterados com 408 adições e 0 exclusões
  1. 61 0
      TSPerformanceMeasure.py
  2. 347 0
      TimeSeriesClass.py

+ 61 - 0
TSPerformanceMeasure.py

@@ -0,0 +1,61 @@
def overlapintervals(i1, i2):
    """
    Compute the pairwise intersections of two lists of intervals.

    Each interval is a (start, length) pair. Every pair of intervals
    (one from each list) that overlaps contributes one entry.

    :param i1: list of intervals
    :param i2: list of intervals

    :return: list of [start, length] intervals covering only the
        overlapping regions
    """
    overlaps = []
    for a in i1:
        a_start, a_end = a[0], a[0] + a[1]
        for b in i2:
            b_start, b_end = b[0], b[0] + b[1]
            # Half-open intervals overlap iff each one starts before
            # the other one ends.
            if b_start < a_end and b_end > a_start:
                lo = max(a_start, b_start)
                hi = min(a_end, b_end)
                overlaps.append([lo, hi - lo])
    return overlaps
+
+
+
def calcPrecisionRecall(detection, groundtruth, setlength):
    """
    Calculate precision and recall for detected intervals in a time series.

    Intervals are (start, length) pairs; the comparison is made point-wise
    on a binary grid of length ``setlength``.

    :param detection: list of detected intervals
    :param groundtruth: list of ground-truth intervals
    :param setlength: length of the time series

    :return: precision, recall
    """
    import numpy as np

    def _intervals_to_binary(intervals, length):
        # Mark every time point covered by some interval with 1.
        # Fixed: the original called an undefined module-level name
        # 'IntervalsToBinary' and raised a NameError at runtime.
        arr = np.zeros(length)
        for iv in intervals:
            arr[int(iv[0]):int(iv[0]) + int(iv[1])] = 1
        return arr

    if len(groundtruth) == 0 and len(detection) == 0:
        # Nothing to detect and nothing detected: perfect by convention.
        return 1, 1

    detArray = _intervals_to_binary(detection, setlength)
    gtArray = _intervals_to_binary(groundtruth, setlength)
    if np.sum(gtArray) == 0:
        # Empty ground truth: invert both arrays so that correctly
        # detecting "no interval" is counted as a true positive.
        gtArray = -1 * (gtArray - 1)
        detArray = -1 * (detArray - 1)

    # sum == 2  -> both arrays are 1 at that point (true positive)
    tp = int(np.sum((detArray + gtArray) == 2))
    # det - gt == 1 -> detected but not in ground truth (false positive)
    fp = int(np.sum((detArray - gtArray) == 1))
    # gt - det == 1 -> in ground truth but missed (false negative)
    fn = int(np.sum((gtArray - detArray) == 1))

    precision = tp / (tp + fp) if tp + fp > 0 else 0
    recall = tp / (fn + tp) if tp + fn > 0 else 0

    return precision, recall

+ 347 - 0
TimeSeriesClass.py

@@ -0,0 +1,347 @@
class Intervallist():
    """
    A list of intervals in the form [(start, length), ...].

    Offers conversion between the interval representation and a binary
    array representation, plus total length, count and median filtering.
    """

    def __init__(self, intervals):
        import numpy as np

        self.idx = np.array(intervals)   # intervals as (start, length) rows
        self.binary = None               # cached binary representation
        self.medfiltidx = None           # intervals after median filtering
        self.lengthidx = None            # total covered length
        self.countidx = None             # number of intervals
        self.lengthmedfilt = None        # total length after filtering
        self.countmedfilt = None         # interval count after filtering
        self.LongestSet = None           # result of longestSet()
        self.computelength()
        self.countelements()

    def computelength(self):
        """ Compute total length of intervals in list of intervals """
        import numpy as np
        if len(self.idx) > 0:
            self.lengthidx = int(np.sum(np.array(self.idx)[:, 1]))
        else:
            self.lengthidx = 0

    def countelements(self):
        """ Count intervals in list of intervals """
        if len(self.idx) > 0:
            self.countidx = len(self.idx)
        else:
            self.countidx = 0

    def intervalsToBinary(self, tslength):
        """ Transform list of intervals to a binary array of length tslength,
        where 1 is interval and 0 is no interval.
        E.g. self.idx=[(3,4)], tslength=10 returns [0,0,0,1,1,1,1,0,0,0]"""
        import numpy as np

        gt = np.zeros(tslength)
        if len(self.idx) > 0:
            for i in self.idx:
                gt[int(i[0]):int(i[0]) + int(i[1])] = 1
        self.binary = gt

        return gt

    def binaryToIntervals(self, array):
        """ Transform a binary array, where 1 is interval and 0 is no
        interval, to a list of intervals.
        E.g. array=[0,0,0,1,1,1,1,0,0,0] returns [[3,4]]

        Fixed: the previous loop dropped the trailing run of ones unless it
        ended exactly at the last index (its own docstring example returned
        an empty result) and crashed on an all-zero array. Run boundaries
        are now detected on a zero-padded difference of the array.
        """
        import numpy as np

        mask = (np.asarray(array) == 1).astype(int)
        # Zero-pad both ends so runs touching an edge still produce a
        # rising/falling boundary.
        boundaries = np.diff(np.concatenate(([0], mask, [0])))
        starts = np.where(boundaries == 1)[0]
        ends = np.where(boundaries == -1)[0]
        intervals = np.column_stack((starts, ends - starts))

        return intervals

    def median_filter_intervals(self, kernel, tslength=None):
        """ Median filter intervals with kernel size kernel. """
        from scipy.signal import medfilt
        import numpy as np

        # If no length provided, assume end of last interval to be length of
        # time series
        if tslength is None:
            tslength = int(np.sum(self.idx[-1]))

        if self.binary is None:
            self.intervalsToBinary(tslength)
        filteredintervals = medfilt(self.binary, kernel_size=kernel)
        # Never erase points that belonged to an interval: keep every 1
        # from the unfiltered binary array.
        filteredintervals = np.maximum.reduce([filteredintervals, self.binary])
        self.medfiltidx = self.binaryToIntervals(filteredintervals)
        self.lengthmedfilt = np.sum(np.array(self.medfiltidx)[:, 1])
        self.countmedfilt = len(self.medfiltidx)

    def longestSet(self, on='idx'):
        """ Computes the longest set of intervals, where longest refers to
        the sum of lengths of each interval"""
        from RelevantIntervalSelection import LongestSet
        if on == 'idx':
            self.LongestSet = LongestSet(self.idx)
        elif on == 'medfilt':
            self.LongestSet = LongestSet(self.medfiltidx)
+
+
+
class TimeSeries:
    """ This is a class for a time series"""

    def __init__(self, ts, varname=None):
        import numpy as np

        # Raw observations and optional variable name.
        self.values = np.array(ts)
        self.varname = varname
        self.countTimePoints = self.values.shape[0]

        # Result of the ADF stationarity test (None until tested).
        self.stationary = None

        # Concatenation of selected relevant intervals.
        self.concatTS = None
        self.concatIntervals = None

    def stationarity_tests(self, autolag='BIC', signiveau=0.05):
        """ Test if time series is stationary using an adf-test"""
        from statsmodels.tsa.stattools import adfuller
        import numpy as np

        clean = self.values[~np.isnan(self.values)]
        adfresult = adfuller(clean, autolag=autolag)
        # Reject the null hypothesis (series is non-stationary) whenever
        # the p-value falls below the significance level.
        self.stationary = adfresult[1] < signiveau

        return self.stationary

    def get_stationartiy_test_result(self):
        """ Return if time series is stationary"""
        return self.stationary

    def concatRelevantIntervalsTS(self, intervals, addvalue=0):
        """ Concatenates intervals of a time series """
        import numpy as np

        self.concatIntervals = intervals

        # Widen every interval by 'addvalue' on both sides: the start moves
        # left by addvalue and the length grows by 2*addvalue.
        addvalue = abs(addvalue)
        padded = intervals + np.array([-addvalue, 2 * addvalue])

        # Clip the first interval at the start of the series.
        if padded[0][0] < 0:
            padded[0][0] = 0
        # Clip the last interval at the end of the series.
        # NOTE(review): the extra -1 lets the last interval end one point
        # past countTimePoints (harmless for numpy slicing) — confirm intended.
        lastend = padded[-1][0] + padded[-1][1]
        if lastend > self.countTimePoints:
            padded[-1][1] = padded[-1][1] - (lastend - self.countTimePoints - 1)

        # Stitch the selected slices together, dropping everything else.
        pieces = []
        for iv in padded:
            pieces += list(self.values[int(iv[0]):int(iv[0] + iv[1])])
        self.concatTS = np.array(pieces)
+
+
+
class TimeSeriesPair():
    """
    Class for a pair of time series.

    Supports selection/concatenation of relevant intervals and Granger
    causality tests between the two series.
    """

    def __init__(self, X, Y, intervals=None):
        """
        :param X: TimeSeries object
        :param Y: TimeSeries object
        :param intervals: optional Intervallist of relevant intervals
        """
        self.ts1 = X
        self.ts2 = Y

        # Parameters for relevant interval selection
        self.relInt = {'intervals': None,
                       'parameters': None}

        self.intervals = intervals

        self.check_type()

        self.addvalue = None

    def check_type(self):
        """ Check if provided time series are TimeSeries objects"""
        if not isinstance(self.ts1, TimeSeries) or not isinstance(self.ts2,
                                                                  TimeSeries):
            raise TypeError('X and Y must be objects from class TimeSeries')
        if self.intervals is not None:
            # Fixed: the interval container class is Intervallist; the
            # previous check referenced the undefined name 'Interval' and
            # raised a NameError whenever intervals were supplied.
            if not isinstance(self.intervals, Intervallist):
                raise TypeError('Intervals must be in form [[index, length], '
                                '...] and of class type Intervallist')

###############################################################################
# Compute Relevant Intervals of a pair of time series
###############################################################################

    def SetRelevantIntervals(self, intervallist):
        """ Add list of intervals """
        self.intervals = intervallist

    def ComputeRelevantIntervals(self, shifts, threshold, minLenInts,
                                 maxLenInts, distancemeasure=None):
        """ Computes selected relevant intervals of TimeSeriesPair """

        from RelevantIntervalSelection import selectRelevantIntervals

        relevantIntervals = selectRelevantIntervals(
            X=self.ts1.values,
            Y=self.ts2.values,
            shifts=shifts,
            threshold=threshold,
            minLenInts=minLenInts,
            maxLenInts=maxLenInts,
            distancemeasure=distancemeasure)

        self.relInt['intervals'] = relevantIntervals
        self.relInt['parameters'] = {'threshold': threshold,
                                     'minlen': minLenInts,
                                     'maxLenInts': maxLenInts,
                                     'shifts': shifts,
                                     'distancemeasure': distancemeasure}

    def concatRelevantIntervals(self, relInts, addvalue=0):
        """ Concatenate selected relevant intervals"""

        self.addvalue = addvalue
        if relInts is not None:
            self.intervals = relInts

        if self.intervals is not None:
            self.ts1.concatRelevantIntervalsTS(intervals=self.intervals.idx,
                                               addvalue=self.addvalue)
            self.ts2.concatRelevantIntervalsTS(intervals=self.intervals.idx,
                                               addvalue=self.addvalue)
        else:
            print('No intervals for concatenation provided')

    # Remove this function as it was not used in thesis
    def multivariateGrangerCausality(self, varsX, varsY, onConcat=False):
        """ Multivariate Granger causality test in both directions.

        NOTE(review): marked for removal and apparently never exercised —
        self.ts1.varNames is not set anywhere visible (TimeSeries defines
        'varname'), and np.column_stack(X[index, :]) looks suspect; verify
        before relying on this method.
        """
        from statsmodels.tsa.vector_ar.var_model import VARResults
        import numpy as np
        from statsmodels.tsa.api import VAR
        import scipy.stats

        if len(varsX) <= 1 or len(varsY) <= 1:
            raise TypeError('Multivariate means more than one variable')

        # if onConcat is True use concatenated time series
        if onConcat:
            X = self.ts1.concatTS
            Y = self.ts2.concatTS
        else:
            X = self.ts1.values
            Y = self.ts2.values

        n = self.ts1.values.shape[1]

        # Select variables from data
        index = [self.ts1.varNames[x] for x in varsX]
        X = np.column_stack(X[index, :])
        index = [self.ts2.varNames[x] for x in varsY]
        Y = np.column_stack(Y[index, :])

        # Compute order on the joint series (NaN rows removed)
        XY = np.column_stack((X, Y))
        XY = XY[~np.isnan(XY).any(axis=1)]
        order = self.compute_order_wrapper(XY)

        # Build multivariate model with X
        modelX = VAR(X)
        resultX = modelX.fit(order)
        sigmaX = np.linalg.det(np.cov(resultX.resid.T))
        # Enrich multivariate model with X with Y
        modelXY = VAR(XY)
        resultXY = modelXY.fit(order)
        sigmaXY = np.linalg.det(
            np.cov(resultXY.resid[:, np.arange(0, len(varsX))].T))
        # Results Y granger causes X
        Fstats_YgcX = np.log(sigmaX) - np.log(sigmaXY)
        Pval_YgcX = scipy.stats.f.cdf(Fstats_YgcX, n, n)
        print(Fstats_YgcX, Pval_YgcX)

        # Build multivariate model with Y
        modelY = VAR(Y)
        resultY = modelY.fit(order)
        sigmaY = np.linalg.det(np.cov(resultY.resid.T))
        # Enrich multivariate model with Y with X
        YX = np.column_stack((Y, X))
        YX = YX[~np.isnan(YX).any(axis=1)]
        modelYX = VAR(YX)
        resultYX = modelYX.fit(order)
        sigmaYX = np.linalg.det(
            np.cov(resultYX.resid[:, np.arange(0 + len(varsY))].T))
        # Results X granger causes Y
        Fstats_XgcY = np.log(sigmaY) - np.log(sigmaYX)
        Pval_XgcY = scipy.stats.f.cdf(Fstats_XgcY, n, n)
        print(Fstats_XgcY, Pval_XgcY)

        return Pval_XgcY, Pval_YgcX

    def univariateGrangerCausality(self, onRelInts=False,
                                   signiveau=0.05, orderlimit=120):
        """
        Test for Granger causality in both directions

        :param onRelInts: Bool, if true Granger causality is computed on
        concatenation of selected relevant intervals
        :param signiveau: Level of significance for Granger causality Test
        :param orderlimit: upper bound for the VAR model order
        :return: Bool for X GC Y, bool for Y GC X, both p-values, order
        """

        from statsmodels.tsa.api import VAR
        from VARModels import compute_order_wrapper
        import numpy as np
        # if onRelInts is True use concatenated time series.
        # Therefore TimeSeries object function
        # for interval concatenation needs to be called
        if onRelInts:
            X = self.ts1.concatTS
            Y = self.ts2.concatTS
        else:
            X = self.ts1.values
            Y = self.ts2.values

        # Compute model order; X in column 0, Y in column 1.
        XY = np.column_stack((X, Y))
        XY = XY[~np.isnan(XY).any(axis=1)]
        # Fixed: len(X-10) subtracted 10 from every element (length
        # unchanged); the intended bound is the series length minus 10.
        order = compute_order_wrapper(XY.T,
                                      min(orderlimit,
                                          len(X) - 10, len(Y) - 10))

        # Test if X granger causes Y
        model = VAR(XY)
        result = model.fit(order)
        resultXcY = result.test_causality(1, 0, kind='f')

        # Test if Y granger causes X
        resultYcX = result.test_causality(0, 1, kind='f')

        XcY = resultXcY.pvalue < signiveau
        YcX = resultYcX.pvalue < signiveau

        return XcY, YcX, resultXcY.pvalue, resultYcX.pvalue, order