|
|
@@ -0,0 +1,258 @@
|
|
|
+import numpy as np
|
|
|
+from TSGeneration import genSynchronizedTimeSeries
|
|
|
+# from TSPlot import plotTS
|
|
|
+
|
|
|
+def mutual_information_short(hgram):
|
|
|
+ s = float(np.sum(hgram))
|
|
|
+ hgram = hgram / s
|
|
|
+ px = np.sum(hgram, axis=1)
|
|
|
+ py = np.sum(hgram, axis=0)
|
|
|
+ px_py = px[:, None] * py[None, :]
|
|
|
+ nzs = hgram > 0
|
|
|
+ return np.sum(hgram[nzs] * np.log(hgram[nzs] / px_py[nzs]))
|
|
|
+
|
|
|
+
|
|
|
+def ts_distance(distancemeasure,X,Y,bins):
|
|
|
+ """
|
|
|
+ Similarity measures that can be called in relevant interval selection
|
|
|
+ approach. Either Peasron correlation or Normalized mutual information.
|
|
|
+ New similarity measures can be added with other elif statements.
|
|
|
+
|
|
|
+ :param distancemeasure: 'pearson' or 'mutual'
|
|
|
+ :param X: Time series
|
|
|
+ :param Y: Time series
|
|
|
+ :param bins: Bins for mutual information computation
|
|
|
+
|
|
|
+ :return: returns similarity of X and Y
|
|
|
+ """
|
|
|
+ import warnings
|
|
|
+ if distancemeasure == 'pearson':
|
|
|
+ with warnings.catch_warnings():
|
|
|
+ warnings.simplefilter("ignore")
|
|
|
+ r = np.corrcoef(X, Y)[0, 1]
|
|
|
+
|
|
|
+ elif distancemeasure == 'mutual':
|
|
|
+
|
|
|
+ #compute best number of bins
|
|
|
+ #breaks program sometimes...
|
|
|
+ #k = 10
|
|
|
+ #sigma = np.corrcoef(X, Y)[0][1]
|
|
|
+ #if not np.isnan(sigma) or sigma == 1:
|
|
|
+ # N = len(X)
|
|
|
+ # k = int(np.ceil(0.5 + 0.5 * np.sqrt(
|
|
|
+ # 1 + 4 * np.sqrt((6 * N * sigma * sigma) / (
|
|
|
+ # 1 - (sigma * sigma))))))
|
|
|
+
|
|
|
+ # Calculate histograms
|
|
|
+ concat = np.hstack((X, Y))
|
|
|
+ l, h = np.nanmin(concat), np.nanmax(concat)
|
|
|
+ hgram = np.histogram2d(X, Y,
|
|
|
+ range=[[l, h], [l, h]],
|
|
|
+ bins=bins)[0]
|
|
|
+ r = mutual_information_short(hgram)
|
|
|
+ # Normalize mutual information
|
|
|
+ r = np.sqrt(1 - np.exp(-2 * r))
|
|
|
+ return r
|
|
|
+
|
|
|
+
|
|
|
+def LongestSet(IntList):
|
|
|
+ """
|
|
|
+ Calculate longest set of maximal correlated intervals
|
|
|
+
|
|
|
+ :param IntList: 2d list of intervals, e.g. [[0,2], [1,2], [1,4], [4,3],
|
|
|
+ [7,2], [8,2], [4,3]]
|
|
|
+
|
|
|
+ :return: numpy array as list of intervals where sum of length of
|
|
|
+ intervals is largest regarding all possible combinations of
|
|
|
+ non-overlapping intervals
|
|
|
+ """
|
|
|
+ if type(IntList) == np.ndarray:
|
|
|
+ IntList = [(x[0], x[1]) for x in IntList]
|
|
|
+
|
|
|
+ k = len(IntList)
|
|
|
+ if k == 0:
|
|
|
+ return 0
|
|
|
+ IntList = np.array(sorted(IntList))
|
|
|
+ NextInt = {}
|
|
|
+
|
|
|
+ # Compute immediate starting intervals after i ends
|
|
|
+ for i, Interval in enumerate(IntList):
|
|
|
+ end = Interval[0] + Interval[1] # Interval[0] is the starting frame and Interval[1] is the duration
|
|
|
+ nextidx = IntList[IntList[:, 0] >= end]
|
|
|
+ if nextidx.size > 0:
|
|
|
+ minstart = np.min(nextidx[:, 0])
|
|
|
+ NextInt[i] = np.argwhere(IntList[:, 0] >= minstart)
|
|
|
+ else:
|
|
|
+ NextInt[i] = None
|
|
|
+
|
|
|
+ # Calculate cumulated sum iterating by end of the list
|
|
|
+ CumSum = np.zeros(k)
|
|
|
+ SelInts = {}
|
|
|
+ for idx, Interval in enumerate(reversed(IntList)):
|
|
|
+ i = (k - 1) - idx
|
|
|
+ b = NextInt[i]
|
|
|
+ if np.all(b != None):
|
|
|
+ # print(
|
|
|
+ # CumSum[NextInt[i]] + Interval[1])
|
|
|
+ bestfollower = int(NextInt[i][np.argmax(
|
|
|
+ CumSum[NextInt[i]] + Interval[1])])
|
|
|
+ CumSum[i] = int(Interval[1] + CumSum[bestfollower])
|
|
|
+ if Interval[1] + CumSum[bestfollower] >= CumSum[i + 1]:
|
|
|
+ SelInts[i] = bestfollower
|
|
|
+ else:
|
|
|
+ CumSum[i] = Interval[1]
|
|
|
+ SelInts[i] = None
|
|
|
+
|
|
|
+ # Loop forward
|
|
|
+ Result = np.array([])
|
|
|
+ current = np.where(CumSum == CumSum.max())[0][-1]
|
|
|
+ while True:
|
|
|
+ intval = IntList[current]
|
|
|
+ Result = np.append(Result, [intval])
|
|
|
+ current = SelInts[current]
|
|
|
+ if current not in SelInts:
|
|
|
+ break
|
|
|
+
|
|
|
+ Result = Result.reshape(-1, 2)
|
|
|
+ return Result
|
|
|
+
|
|
|
+
|
|
|
+def LAMINABO(X, Y, threshold, minlen, maxlen, distancemeasure=None):
|
|
|
+ """
|
|
|
+ Computes longest Set of maximal correlated intervals, as described in
|
|
|
+ Atluri et al. paper.
|
|
|
+
|
|
|
+ :param X: time series
|
|
|
+ :param Y: times series
|
|
|
+ :param threshold: threshold for interval correlation
|
|
|
+ :param minlen: minimum length of interval
|
|
|
+ :param maxlen: maximum length of interval
|
|
|
+
|
|
|
+ :return: returns list of intervals in form [start_index, length] of
|
|
|
+ correlated, maximal, distinct intervals
|
|
|
+ """
|
|
|
+
|
|
|
+ indexlength = min(len(X), len(Y))
|
|
|
+ IntMat = np.zeros([indexlength, maxlen-minlen+1], dtype=np.int8)
|
|
|
+
|
|
|
+ # calculate correlation for minimum interval length
|
|
|
+ winlen = minlen
|
|
|
+ for i in range(indexlength-minlen+1):
|
|
|
+ r = ts_distance(distancemeasure, X[i:i + winlen],
|
|
|
+ Y[i:i + winlen], bins=10)
|
|
|
+ if r >= threshold:
|
|
|
+ IntMat[i, 0] = 1
|
|
|
+
|
|
|
+ # calculate correlation for all intervallength > minimumlength
|
|
|
+ for winlen in range(minlen+1, maxlen+1):
|
|
|
+ for i in range(indexlength-winlen+1):
|
|
|
+ if IntMat[i, winlen-1-minlen] == 1 and \
|
|
|
+ IntMat[i+1, winlen-1-minlen] == 1:
|
|
|
+ r = ts_distance(distancemeasure, X[i:i + winlen],
|
|
|
+ Y[i:i + winlen], bins=10)
|
|
|
+ if r >= threshold:
|
|
|
+ IntMat[i, winlen-minlen] = 1
|
|
|
+
|
|
|
+ CorInts = np.where(IntMat == 1)
|
|
|
+ del IntMat
|
|
|
+ CorInts = list(zip(CorInts[0], CorInts[1]+minlen))
|
|
|
+
|
|
|
+ # check if correlated intervals are maximal
|
|
|
+ ResultInt = []
|
|
|
+ for i, lenWin in CorInts:
|
|
|
+ if ((i - 1, lenWin + 1) not in CorInts) and ((i, lenWin + 1) not in CorInts):
|
|
|
+ ResultInt += [(i, lenWin)]
|
|
|
+
|
|
|
+ del CorInts
|
|
|
+
|
|
|
+ return ResultInt
|
|
|
+
|
|
|
+
|
|
|
+def CalcLAMINA(X, Y, thresCorrelation, minLenInt,
|
|
|
+ maxLenInt, shiftTS, distancemeasure):
|
|
|
+ """
|
|
|
+ Shifts time series to left an right and computes Longest set of maximal
|
|
|
+ correlated intervals for each shift
|
|
|
+
|
|
|
+ :param X: Time series
|
|
|
+ :param Y: Time series
|
|
|
+ :param thresCorrelation: Threshold for similarity measure
|
|
|
+ :param minLenInt: Minimum length of intervals
|
|
|
+ :param maxLenInt: Maximum length of intervals
|
|
|
+ :param shiftTS: Shift time series
|
|
|
+ :param distancemeasure:
|
|
|
+
|
|
|
+ :return: List of lists. Each list contains maximal correlated
|
|
|
+ intervals of each shift.
|
|
|
+ """
|
|
|
+ import multiprocessing as mp
|
|
|
+
|
|
|
+ IntervalsEACH = []
|
|
|
+
|
|
|
+ # Outer loop through pairs and conditions
|
|
|
+ num_cores = mp.cpu_count()
|
|
|
+ pool = mp.Pool(num_cores)
|
|
|
+
|
|
|
+ # argss1 and argss2 are used to get all shifts (no-shift, forward and backward) and all minimum length combinations.
|
|
|
+ argss1 = [(X[shift:], Y[:-shift], thresCorrelation, minLen, maxLenInt, distancemeasure) if shift != 0
|
|
|
+ else (X, Y, thresCorrelation, minLen, maxLenInt, distancemeasure)
|
|
|
+ for shift in shiftTS for minLen in minLenInt]
|
|
|
+ argss2 = [(X[:-shift], Y[shift:], thresCorrelation, minLen, maxLenInt, distancemeasure)
|
|
|
+ for shift in shiftTS for minLen in minLenInt if shift != 0]
|
|
|
+ args = argss1 + argss2
|
|
|
+ IntervalsEACH = pool.starmap(LAMINABO, args)
|
|
|
+ pool.close()
|
|
|
+
|
|
|
+ return IntervalsEACH
|
|
|
+
|
|
|
+
|
|
|
+def selectRelevantIntervals(X, Y, shifts, threshold, minLenInts, maxLenInts,
|
|
|
+ distancemeasure=None):
|
|
|
+ """
|
|
|
+ Compute selected relevant intervals of two time series
|
|
|
+
|
|
|
+ :param X: array with time series values
|
|
|
+ :param Y: array with time series values
|
|
|
+ :param shifts: list of values with which X is shifted back and forth in
|
|
|
+ time.
|
|
|
+ :param threshold: Threshold for similarity measures
|
|
|
+ :param minLenInt: Minimum length of intervals
|
|
|
+ :param maxLenInt: Maximum length of intervals
|
|
|
+ :param shiftTS: Shift time series
|
|
|
+
|
|
|
+ :return: Selected relevant intervals
|
|
|
+ """
|
|
|
+
|
|
|
+ IntervalsEACH = CalcLAMINA(X, Y, threshold, minLenInts, maxLenInts, shifts, distancemeasure)
|
|
|
+ if len(IntervalsEACH) > 0:
|
|
|
+ Intervals = [int for intlist in IntervalsEACH for int in intlist] # flatten the array
|
|
|
+
|
|
|
+ if len(Intervals) > 0:
|
|
|
+ relevant_Intervals = LongestSet(Intervals)
|
|
|
+
|
|
|
+
|
|
|
+ return relevant_Intervals
|
|
|
+
|
|
|
+if __name__ == '__main__':
|
|
|
+ """
|
|
|
+ This is an example for relevant interval computation
|
|
|
+ """
|
|
|
+
|
|
|
+ # parameters
|
|
|
+ distancemeasure = 'pearson'
|
|
|
+ shifts = [0, 4, 8, 12]
|
|
|
+ threshold = 0.8
|
|
|
+ minLenInts = [75, 80]
|
|
|
+ maxLenInts = 400
|
|
|
+
|
|
|
+ # generate synchronised time series
|
|
|
+ X, Y, syncI = genSynchronizedTimeSeries(lenTS=500)
|
|
|
+
|
|
|
+ expressions_x = '/home/valapil/Project/ForkCausal_Adithya/test/'
|
|
|
+
|
|
|
+ Windows = selectRelevantIntervals(X, Y, shifts, threshold, minLenInts,
|
|
|
+ maxLenInts, distancemeasure=distancemeasure)
|
|
|
+
|
|
|
+ print('Adaptable Windows: \n', Windows)
|
|
|
+ print('Ground truth: \n', syncI)
|
|
|
+ # plotTS(X,Y,CI=Windows,groundtruth=syncI)
|