TimeSeriesClass.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347
  1. class Intervallist():
  2. """
  3. This is a class for lists of intervals in form [(index,length), ...]
  4. """
  5. def __init__(self, intervals):
  6. import numpy as np
  7. self.idx = np.array(intervals)
  8. self.binary = None
  9. self.medfiltidx = None
  10. self.lengthidx = None
  11. self.countidx = None
  12. self.lengthmedfilt = None
  13. self.countmedfilt = None
  14. self.LongestSet = None
  15. self.computelength()
  16. self.countelements()
  17. def computelength(self):
  18. """ Compute total length of intervals in list of intervals """
  19. import numpy as np
  20. if len(self.idx) > 0:
  21. self.lengthidx = int(np.sum(np.array(self.idx)[:, 1]))
  22. else:
  23. self.lengthidx = 0
  24. def countelements(self):
  25. """ Count intervals in list of intervals """
  26. if len(self.idx) > 0:
  27. self.countidx = len(self.idx)
  28. else:
  29. self.countidx = 0
  30. def intervalsToBinary(self, tslength):
  31. """ Transform list of intervals to an binary array of length tslength,
  32. where 1 is interval and 0 is no interval.
  33. E.g. self.idx=[(3,4)], tslength=10 returns [0,0,0,1,1,1,1,0,0,0]"""
  34. import numpy as np
  35. gt = np.zeros(tslength)
  36. if len(self.idx) > 0:
  37. for i in self.idx:
  38. gt[int(i[0]):int(i[0])+int(i[1])] = 1
  39. self.binary = gt
  40. return gt
  41. def binaryToIntervals(self, array):
  42. """ Transform binary array, where 1 is interval and 0 is no interval,
  43. to list of intervals.
  44. E.g. array=[0,0,0,1,1,1,1,0,0,0] returns [(3,4)]"""
  45. import numpy as np
  46. intervals = []
  47. start = np.where(array == 1)[0][0]
  48. current = np.where(array == 1)[0][0]
  49. count = 1
  50. for val in np.where(array == 1)[0][1:]:
  51. if val-1 == current and val != len(array)-1:
  52. current = val
  53. count += 1
  54. elif val-1 == current and val == len(array)-1:
  55. count += 1
  56. intervals += [start, count]
  57. else:
  58. intervals += [start, count]
  59. start = val
  60. current = val
  61. count = 1
  62. intervals = np.array(intervals).reshape(-1,2)
  63. return intervals
  64. def median_filter_intervals(self, kernel, tslength=None):
  65. """ Median filter intervals with kernel size kernel. """
  66. from scipy.signal import medfilt
  67. import numpy as np
  68. # If no length provided, assume end of last interval to be length of
  69. # time series
  70. if tslength == None:
  71. tslength = int(np.sum(self.idx[-1]))
  72. if self.binary is None:
  73. self.intervalsToBinary(tslength)
  74. filteredintervals = medfilt(self.binary, kernel_size=kernel)
  75. # Keep arrays that were 1
  76. filteredintervals = np.maximum.reduce([filteredintervals, self.binary])
  77. self.medfiltidx = self.binaryToIntervals(filteredintervals)
  78. self.lengthmedfilt = np.sum(np.array(self.medfiltidx)[:,1])
  79. self.countmedfilt = len(self.medfiltidx)
  80. def longestSet(self, on='idx'):
  81. """ Computes the longest set of intervals, where longest refers to
  82. the sum of lengths of each interval"""
  83. from RelevantIntervalSelection import LongestSet
  84. if on == 'idx':
  85. self.LongestSet = LongestSet(self.idx)
  86. elif on == 'medfilt':
  87. self.LongestSet = LongestSet(self.medfiltidx)
  88. class TimeSeries:
  89. """ This is a class for a time series"""
  90. def __init__(self, ts, varname=None):
  91. import numpy as np
  92. self.values = np.array(ts)
  93. self.varname = varname
  94. self.countTimePoints = self.values.shape[0]
  95. self.stationary = None
  96. self.concatTS = None
  97. self.concatIntervals = None
  98. def stationarity_tests(self, autolag='BIC', signiveau = 0.05):
  99. """ Test if time series is stationary using an adf-test"""
  100. from statsmodels.tsa.stattools import adfuller
  101. import numpy as np
  102. X = self.values[~np.isnan(self.values)]
  103. adf = adfuller(X, autolag=autolag)
  104. # Reject null hypothesis (X is not stationary) if p-val < signiveau
  105. testresult = True if adf[1] < signiveau else False
  106. self.stationary = testresult
  107. return testresult
  108. def get_stationartiy_test_result(self):
  109. """ Return if time series is stationary"""
  110. return self.stationary
  111. def concatRelevantIntervalsTS(self, intervals, addvalue=0):
  112. """ Concatenates intervals of a time series """
  113. import numpy as np
  114. concatTS = []
  115. self.concatIntervals = intervals
  116. # Add 'addvalue' on left and right side of interval
  117. addvalue = abs(addvalue)
  118. intervals = intervals + np.array([-addvalue,2*addvalue])
  119. # Check if length of time series was exceeded
  120. intervals[0][0] = 0 if intervals[0][0] < 0 else intervals[0][0]
  121. endLI = intervals[-1][0] + intervals[-1][1]
  122. intervals[-1][1] = intervals[-1][1] - \
  123. (endLI - self.countTimePoints - 1) \
  124. if endLI > self.countTimePoints else intervals[-1][1]
  125. # concatenate intervals / remove irrelevant data
  126. for i in intervals:
  127. concatTS += list(self.values[int(i[0]):int(i[0]+i[1])])
  128. self.concatTS = np.array(concatTS)
  129. class TimeSeriesPair():
  130. """
  131. Class for two time series
  132. """
  133. def __init__(self, X, Y, intervals=None):
  134. self.ts1 = X
  135. self.ts2 = Y
  136. # Parameters for relevant interval selection
  137. self.relInt = {'intervals': None,
  138. 'parameters': None}
  139. self.intervals = intervals
  140. self.check_type()
  141. self.addvalue = None
  142. def check_type(self):
  143. """ Check if provided time series are TimeSeries obejcts"""
  144. if not isinstance(self.ts1, TimeSeries) or not isinstance(self.ts2,
  145. TimeSeries):
  146. raise TypeError('X and Y must be objects from class TimeSeries')
  147. if self.intervals is not None:
  148. if not isinstance(self.intervals, Interval):
  149. raise TypeError('Intervals must be in form [[index, length], '
  150. '...] and of class type Interval')
  151. ###############################################################################
  152. # Compute Relevant Intervals of a pair of time series
  153. ###############################################################################
  154. def SetRelevantIntervals(self, intervallist):
  155. """ Add list of intervals """
  156. self.intervals = intervallist
  157. def ComputeRelevantIntervals(self, shifts, threshold, minLenInts,
  158. maxLenInts, distancemeasure=None):
  159. """ Computes selected relevant intervals of TimeSeriesPair """
  160. from RelevantIntervalSelection import selectRelevantIntervals
  161. relevantIntervals = selectRelevantIntervals(X=self.ts1.values,
  162. Y=self.ts2.values,
  163. shifts=shifts,
  164. threshold=threshold,
  165. minLenInts=minLenInts,
  166. maxLenInts=maxLenInts,
  167. distancemeasure=distancemeasure)
  168. self.relInt['intervals'] = relevantIntervals
  169. self.relInt['parameters'] = {'threshold': threshold,
  170. 'minlen': minLenInts,
  171. 'maxLenInts': maxLenInts,
  172. 'shifts': shifts,
  173. 'distancemeasure': distancemeasure}
  174. def concatRelevantIntervals(self, relInts, addvalue=0):
  175. """ Concatenate selected relevant intervals"""
  176. self.addvalue = addvalue
  177. if relInts is not None:
  178. self.intervals = relInts
  179. if self.intervals is not None:
  180. self.ts1.concatRelevantIntervalsTS(intervals=self.intervals.idx,
  181. addvalue=self.addvalue)
  182. self.ts2.concatRelevantIntervalsTS(intervals=self.intervals.idx,
  183. addvalue=self.addvalue)
  184. else:
  185. print('No intervals for concatenation provided')
  186. # Remove this function as it was not used in thesis
  187. def multivariateGrangerCausality(self, varsX, varsY, onConcat=False):
  188. from statsmodels.tsa.vector_ar.var_model import VARResults
  189. import numpy as np
  190. from statsmodels.tsa.api import VAR
  191. import scipy.stats
  192. if len(varsX) <= 1 or len(varsY) <= 1:
  193. raise TypeError('Multivariate means more than one variable')
  194. # if onConcat is True use concatenated time series
  195. if onConcat:
  196. X = self.ts1.concatTS
  197. Y = self.ts2.concatTS
  198. else:
  199. X = self.ts1.values
  200. Y = self.ts2.values
  201. n = self.ts1.values.shape[1]
  202. # Select variables from data
  203. index = [self.ts1.varNames[x] for x in varsX]
  204. X = np.column_stack(X[index,:])
  205. index = [self.ts2.varNames[x] for x in varsY]
  206. Y = np.column_stack(Y[index,:])
  207. # Compute order
  208. XY = np.column_stack((X, Y))
  209. XY = XY[~np.isnan(XY).any(axis=1)]
  210. order = self.compute_order_wrapper(XY)
  211. # Build multivariate model with X
  212. modelX = VAR(X)
  213. resultX = modelX.fit(order)
  214. sigmaX = np.linalg.det(np.cov(resultX.resid.T))
  215. # Enrich multivariate model with X with Y
  216. modelXY = VAR(XY)
  217. resultXY = modelXY.fit(order)
  218. sigmaXY = np.linalg.det(np.cov(resultXY.resid[:,np.arange(0,len(varsX))].T))
  219. # Results Y granger causes X
  220. Fstats_YgcX = np.log(sigmaX)-np.log(sigmaXY)
  221. Pval_YgcX = scipy.stats.f.cdf(Fstats_YgcX, n, n)
  222. print(Fstats_YgcX, Pval_YgcX)
  223. # Build multivariate model with Y
  224. modelY = VAR(Y)
  225. resultY = modelY.fit(order)
  226. sigmaY = np.linalg.det(np.cov(resultY.resid.T))
  227. # Enrich multivariate model with Y with X
  228. YX = np.column_stack((Y, X))
  229. YX = YX[~np.isnan(YX).any(axis=1)]
  230. modelYX = VAR(YX)
  231. resultYX = modelYX.fit(order)
  232. sigmaYX = np.linalg.det(np.cov(resultYX.resid[:,np.arange(0+len(varsY))].T))
  233. # Results X granger causes Y
  234. Fstats_XgcY = np.log(sigmaY)-np.log(sigmaYX)
  235. Pval_XgcY = scipy.stats.f.cdf(Fstats_XgcY, n, n)
  236. print(Fstats_XgcY, Pval_XgcY)
  237. return Pval_XgcY, Pval_YgcX
  238. def univariateGrangerCausality(self, onRelInts=False,
  239. signiveau=0.05, orderlimit=120):
  240. """
  241. Test for Granger causality in both directions
  242. :param onRelInts: Bool, if true Granger causality is computed on
  243. concatenation of selected relevant intervals
  244. :param signiveau: Level of significance for Granger causality Test
  245. :return: Bool for X GC Y, bool for Y GC X
  246. """
  247. from statsmodels.tsa.api import VAR
  248. from VARModels import compute_order_wrapper
  249. import numpy as np
  250. # if onConcat is True use concatenated time series.
  251. # Therefore TimeSeries object function
  252. # for interval concatenation needs to be called
  253. if onRelInts:
  254. X = self.ts1.concatTS
  255. Y = self.ts2.concatTS
  256. else:
  257. X = self.ts1.values
  258. Y = self.ts2.values
  259. # Compute model order X in column 0, Y in column 1
  260. XY = np.column_stack((X,Y))
  261. XY = XY[~np.isnan(XY).any(axis=1)]
  262. order = compute_order_wrapper(XY.T, min(orderlimit, len(X-10), len(Y-10)))
  263. # Test if X granger causes Y
  264. model = VAR(XY)
  265. result = model.fit(order)
  266. resultXcY = result.test_causality(1, 0, kind='f')
  267. # Test if Y granger causes X
  268. resultYcX = result.test_causality(0, 1, kind='f')
  269. XcY = resultXcY.pvalue < signiveau
  270. YcX = resultYcX.pvalue < signiveau
  271. return XcY, YcX, resultXcY.pvalue, resultYcX.pvalue, order