import numpy as np from TSGeneration import genSynchronizedTimeSeries # from TSPlot import plotTS def mutual_information_short(hgram): s = float(np.sum(hgram)) hgram = hgram / s px = np.sum(hgram, axis=1) py = np.sum(hgram, axis=0) px_py = px[:, None] * py[None, :] nzs = hgram > 0 return np.sum(hgram[nzs] * np.log(hgram[nzs] / px_py[nzs])) def ts_distance(distancemeasure,X,Y,bins): """ Similarity measures that can be called in relevant interval selection approach. Either Peasron correlation or Normalized mutual information. New similarity measures can be added with other elif statements. :param distancemeasure: 'pearson' or 'mutual' :param X: Time series :param Y: Time series :param bins: Bins for mutual information computation :return: returns similarity of X and Y """ import warnings if distancemeasure == 'pearson': with warnings.catch_warnings(): warnings.simplefilter("ignore") r = np.corrcoef(X, Y)[0, 1] elif distancemeasure == 'mutual': #compute best number of bins #breaks program sometimes... #k = 10 #sigma = np.corrcoef(X, Y)[0][1] #if not np.isnan(sigma) or sigma == 1: # N = len(X) # k = int(np.ceil(0.5 + 0.5 * np.sqrt( # 1 + 4 * np.sqrt((6 * N * sigma * sigma) / ( # 1 - (sigma * sigma)))))) # Calculate histograms concat = np.hstack((X, Y)) l, h = np.nanmin(concat), np.nanmax(concat) hgram = np.histogram2d(X, Y, range=[[l, h], [l, h]], bins=bins)[0] r = mutual_information_short(hgram) # Normalize mutual information r = np.sqrt(1 - np.exp(-2 * r)) return r def LongestSet(IntList): """ Calculate longest set of maximal correlated intervals :param IntList: 2d list of intervals, e.g. [[0,2], [1,2], [1,4], [4,3], [7,2], [8,2], [4,3]] :return: numpy array as list of intervals where sum of length of intervals is largest regarding all possible combinations of non-overlapping intervals """ if type(IntList) == np.ndarray: IntList = [(x[0], x[1]) for x in IntList] k = len(IntList) if k == 0: return 0 IntList = np.array(sorted(IntList)) NextInt = {} # Compute immediate starting intervals after i ends for i, Interval in enumerate(IntList): end = Interval[0] + Interval[1] # Interval[0] is the starting frame and Interval[1] is the duration nextidx = IntList[IntList[:, 0] >= end] if nextidx.size > 0: minstart = np.min(nextidx[:, 0]) NextInt[i] = np.argwhere(IntList[:, 0] >= minstart) else: NextInt[i] = None # Calculate cumulated sum iterating by end of the list CumSum = np.zeros(k) SelInts = {} for idx, Interval in enumerate(reversed(IntList)): i = (k - 1) - idx b = NextInt[i] if np.all(b != None): # print( # CumSum[NextInt[i]] + Interval[1]) bestfollower = int(NextInt[i][np.argmax( CumSum[NextInt[i]] + Interval[1])]) CumSum[i] = int(Interval[1] + CumSum[bestfollower]) if Interval[1] + CumSum[bestfollower] >= CumSum[i + 1]: SelInts[i] = bestfollower else: CumSum[i] = Interval[1] SelInts[i] = None # Loop forward Result = np.array([]) current = np.where(CumSum == CumSum.max())[0][-1] while True: intval = IntList[current] Result = np.append(Result, [intval]) current = SelInts[current] if current not in SelInts: break Result = Result.reshape(-1, 2) return Result def LAMINABO(X, Y, threshold, minlen, maxlen, distancemeasure=None): """ Computes longest Set of maximal correlated intervals, as described in Atluri et al. paper. :param X: time series :param Y: times series :param threshold: threshold for interval correlation :param minlen: minimum length of interval :param maxlen: maximum length of interval :return: returns list of intervals in form [start_index, length] of correlated, maximal, distinct intervals """ indexlength = min(len(X), len(Y)) IntMat = np.zeros([indexlength, maxlen-minlen+1], dtype=np.int8) # calculate correlation for minimum interval length winlen = minlen for i in range(indexlength-minlen+1): r = ts_distance(distancemeasure, X[i:i + winlen], Y[i:i + winlen], bins=10) if r >= threshold: IntMat[i, 0] = 1 # calculate correlation for all intervallength > minimumlength for winlen in range(minlen+1, maxlen+1): for i in range(indexlength-winlen+1): if IntMat[i, winlen-1-minlen] == 1 and \ IntMat[i+1, winlen-1-minlen] == 1: r = ts_distance(distancemeasure, X[i:i + winlen], Y[i:i + winlen], bins=10) if r >= threshold: IntMat[i, winlen-minlen] = 1 CorInts = np.where(IntMat == 1) del IntMat CorInts = list(zip(CorInts[0], CorInts[1]+minlen)) # check if correlated intervals are maximal ResultInt = [] for i, lenWin in CorInts: if ((i - 1, lenWin + 1) not in CorInts) and ((i, lenWin + 1) not in CorInts): ResultInt += [(i, lenWin)] del CorInts return ResultInt def CalcLAMINA(X, Y, thresCorrelation, minLenInt, maxLenInt, shiftTS, distancemeasure): """ Shifts time series to left an right and computes Longest set of maximal correlated intervals for each shift :param X: Time series :param Y: Time series :param thresCorrelation: Threshold for similarity measure :param minLenInt: Minimum length of intervals :param maxLenInt: Maximum length of intervals :param shiftTS: Shift time series :param distancemeasure: :return: List of lists. Each list contains maximal correlated intervals of each shift. """ import multiprocessing as mp IntervalsEACH = [] # Outer loop through pairs and conditions num_cores = mp.cpu_count() pool = mp.Pool(num_cores) # argss1 and argss2 are used to get all shifts (no-shift, forward and backward) and all minimum length combinations. argss1 = [(X[shift:], Y[:-shift], thresCorrelation, minLen, maxLenInt, distancemeasure) if shift != 0 else (X, Y, thresCorrelation, minLen, maxLenInt, distancemeasure) for shift in shiftTS for minLen in minLenInt] argss2 = [(X[:-shift], Y[shift:], thresCorrelation, minLen, maxLenInt, distancemeasure) for shift in shiftTS for minLen in minLenInt if shift != 0] args = argss1 + argss2 IntervalsEACH = pool.starmap(LAMINABO, args) pool.close() return IntervalsEACH def selectRelevantIntervals(X, Y, shifts, threshold, minLenInts, maxLenInts, distancemeasure=None): """ Compute selected relevant intervals of two time series :param X: array with time series values :param Y: array with time series values :param shifts: list of values with which X is shifted back and forth in time. :param threshold: Threshold for similarity measures :param minLenInt: Minimum length of intervals :param maxLenInt: Maximum length of intervals :param shiftTS: Shift time series :return: Selected relevant intervals """ IntervalsEACH = CalcLAMINA(X, Y, threshold, minLenInts, maxLenInts, shifts, distancemeasure) if len(IntervalsEACH) > 0: Intervals = [int for intlist in IntervalsEACH for int in intlist] # flatten the array if len(Intervals) > 0: relevant_Intervals = LongestSet(Intervals) return relevant_Intervals if __name__ == '__main__': """ This is an example for relevant interval computation """ # parameters distancemeasure = 'pearson' shifts = [0, 4, 8, 12] threshold = 0.8 minLenInts = [75, 80] maxLenInts = 400 # generate synchronised time series X, Y, syncI = genSynchronizedTimeSeries(lenTS=500) expressions_x = '/home/valapil/Project/ForkCausal_Adithya/test/' Windows = selectRelevantIntervals(X, Y, shifts, threshold, minLenInts, maxLenInts, distancemeasure=distancemeasure) print('Adaptable Windows: \n', Windows) print('Ground truth: \n', syncI) # plotTS(X,Y,CI=Windows,groundtruth=syncI)