| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258 |
- import numpy as np
- from TSGeneration import genSynchronizedTimeSeries
- # from TSPlot import plotTS
- def mutual_information_short(hgram):
- s = float(np.sum(hgram))
- hgram = hgram / s
- px = np.sum(hgram, axis=1)
- py = np.sum(hgram, axis=0)
- px_py = px[:, None] * py[None, :]
- nzs = hgram > 0
- return np.sum(hgram[nzs] * np.log(hgram[nzs] / px_py[nzs]))
- def ts_distance(distancemeasure,X,Y,bins):
- """
- Similarity measures that can be called in relevant interval selection
- approach. Either Peasron correlation or Normalized mutual information.
- New similarity measures can be added with other elif statements.
- :param distancemeasure: 'pearson' or 'mutual'
- :param X: Time series
- :param Y: Time series
- :param bins: Bins for mutual information computation
- :return: returns similarity of X and Y
- """
- import warnings
- if distancemeasure == 'pearson':
- with warnings.catch_warnings():
- warnings.simplefilter("ignore")
- r = np.corrcoef(X, Y)[0, 1]
- elif distancemeasure == 'mutual':
- #compute best number of bins
- #breaks program sometimes...
- #k = 10
- #sigma = np.corrcoef(X, Y)[0][1]
- #if not np.isnan(sigma) or sigma == 1:
- # N = len(X)
- # k = int(np.ceil(0.5 + 0.5 * np.sqrt(
- # 1 + 4 * np.sqrt((6 * N * sigma * sigma) / (
- # 1 - (sigma * sigma))))))
- # Calculate histograms
- concat = np.hstack((X, Y))
- l, h = np.nanmin(concat), np.nanmax(concat)
- hgram = np.histogram2d(X, Y,
- range=[[l, h], [l, h]],
- bins=bins)[0]
- r = mutual_information_short(hgram)
- # Normalize mutual information
- r = np.sqrt(1 - np.exp(-2 * r))
- return r
- def LongestSet(IntList):
- """
- Calculate longest set of maximal correlated intervals
- :param IntList: 2d list of intervals, e.g. [[0,2], [1,2], [1,4], [4,3],
- [7,2], [8,2], [4,3]]
- :return: numpy array as list of intervals where sum of length of
- intervals is largest regarding all possible combinations of
- non-overlapping intervals
- """
- if type(IntList) == np.ndarray:
- IntList = [(x[0], x[1]) for x in IntList]
- k = len(IntList)
- if k == 0:
- return 0
- IntList = np.array(sorted(IntList))
- NextInt = {}
- # Compute immediate starting intervals after i ends
- for i, Interval in enumerate(IntList):
- end = Interval[0] + Interval[1] # Interval[0] is the starting frame and Interval[1] is the duration
- nextidx = IntList[IntList[:, 0] >= end]
- if nextidx.size > 0:
- minstart = np.min(nextidx[:, 0])
- NextInt[i] = np.argwhere(IntList[:, 0] >= minstart)
- else:
- NextInt[i] = None
- # Calculate cumulated sum iterating by end of the list
- CumSum = np.zeros(k)
- SelInts = {}
- for idx, Interval in enumerate(reversed(IntList)):
- i = (k - 1) - idx
- b = NextInt[i]
- if np.all(b != None):
- # print(
- # CumSum[NextInt[i]] + Interval[1])
- bestfollower = int(NextInt[i][np.argmax(
- CumSum[NextInt[i]] + Interval[1])])
- CumSum[i] = int(Interval[1] + CumSum[bestfollower])
- if Interval[1] + CumSum[bestfollower] >= CumSum[i + 1]:
- SelInts[i] = bestfollower
- else:
- CumSum[i] = Interval[1]
- SelInts[i] = None
- # Loop forward
- Result = np.array([])
- current = np.where(CumSum == CumSum.max())[0][-1]
- while True:
- intval = IntList[current]
- Result = np.append(Result, [intval])
- current = SelInts[current]
- if current not in SelInts:
- break
- Result = Result.reshape(-1, 2)
- return Result
- def LAMINABO(X, Y, threshold, minlen, maxlen, distancemeasure=None):
- """
- Computes longest Set of maximal correlated intervals, as described in
- Atluri et al. paper.
- :param X: time series
- :param Y: times series
- :param threshold: threshold for interval correlation
- :param minlen: minimum length of interval
- :param maxlen: maximum length of interval
- :return: returns list of intervals in form [start_index, length] of
- correlated, maximal, distinct intervals
- """
- indexlength = min(len(X), len(Y))
- IntMat = np.zeros([indexlength, maxlen-minlen+1], dtype=np.int8)
- # calculate correlation for minimum interval length
- winlen = minlen
- for i in range(indexlength-minlen+1):
- r = ts_distance(distancemeasure, X[i:i + winlen],
- Y[i:i + winlen], bins=10)
- if r >= threshold:
- IntMat[i, 0] = 1
- # calculate correlation for all intervallength > minimumlength
- for winlen in range(minlen+1, maxlen+1):
- for i in range(indexlength-winlen+1):
- if IntMat[i, winlen-1-minlen] == 1 and \
- IntMat[i+1, winlen-1-minlen] == 1:
- r = ts_distance(distancemeasure, X[i:i + winlen],
- Y[i:i + winlen], bins=10)
- if r >= threshold:
- IntMat[i, winlen-minlen] = 1
- CorInts = np.where(IntMat == 1)
- del IntMat
- CorInts = list(zip(CorInts[0], CorInts[1]+minlen))
- # check if correlated intervals are maximal
- ResultInt = []
- for i, lenWin in CorInts:
- if ((i - 1, lenWin + 1) not in CorInts) and ((i, lenWin + 1) not in CorInts):
- ResultInt += [(i, lenWin)]
- del CorInts
- return ResultInt
- def CalcLAMINA(X, Y, thresCorrelation, minLenInt,
- maxLenInt, shiftTS, distancemeasure):
- """
- Shifts time series to left an right and computes Longest set of maximal
- correlated intervals for each shift
- :param X: Time series
- :param Y: Time series
- :param thresCorrelation: Threshold for similarity measure
- :param minLenInt: Minimum length of intervals
- :param maxLenInt: Maximum length of intervals
- :param shiftTS: Shift time series
- :param distancemeasure:
- :return: List of lists. Each list contains maximal correlated
- intervals of each shift.
- """
- import multiprocessing as mp
- IntervalsEACH = []
- # Outer loop through pairs and conditions
- num_cores = mp.cpu_count()
- pool = mp.Pool(num_cores)
- # argss1 and argss2 are used to get all shifts (no-shift, forward and backward) and all minimum length combinations.
- argss1 = [(X[shift:], Y[:-shift], thresCorrelation, minLen, maxLenInt, distancemeasure) if shift != 0
- else (X, Y, thresCorrelation, minLen, maxLenInt, distancemeasure)
- for shift in shiftTS for minLen in minLenInt]
- argss2 = [(X[:-shift], Y[shift:], thresCorrelation, minLen, maxLenInt, distancemeasure)
- for shift in shiftTS for minLen in minLenInt if shift != 0]
- args = argss1 + argss2
- IntervalsEACH = pool.starmap(LAMINABO, args)
- pool.close()
- return IntervalsEACH
- def selectRelevantIntervals(X, Y, shifts, threshold, minLenInts, maxLenInts,
- distancemeasure=None):
- """
- Compute selected relevant intervals of two time series
- :param X: array with time series values
- :param Y: array with time series values
- :param shifts: list of values with which X is shifted back and forth in
- time.
- :param threshold: Threshold for similarity measures
- :param minLenInt: Minimum length of intervals
- :param maxLenInt: Maximum length of intervals
- :param shiftTS: Shift time series
- :return: Selected relevant intervals
- """
- IntervalsEACH = CalcLAMINA(X, Y, threshold, minLenInts, maxLenInts, shifts, distancemeasure)
- if len(IntervalsEACH) > 0:
- Intervals = [int for intlist in IntervalsEACH for int in intlist] # flatten the array
- if len(Intervals) > 0:
- relevant_Intervals = LongestSet(Intervals)
- return relevant_Intervals
- if __name__ == '__main__':
- """
- This is an example for relevant interval computation
- """
- # parameters
- distancemeasure = 'pearson'
- shifts = [0, 4, 8, 12]
- threshold = 0.8
- minLenInts = [75, 80]
- maxLenInts = 400
- # generate synchronised time series
- X, Y, syncI = genSynchronizedTimeSeries(lenTS=500)
- expressions_x = '/home/valapil/Project/ForkCausal_Adithya/test/'
- Windows = selectRelevantIntervals(X, Y, shifts, threshold, minLenInts,
- maxLenInts, distancemeasure=distancemeasure)
- print('Adaptable Windows: \n', Windows)
- print('Ground truth: \n', syncI)
- # plotTS(X,Y,CI=Windows,groundtruth=syncI)
|