| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439 |
- import numpy as np
- import pandas as pd
- import math
- from scipy.signal import medfilt
- import matplotlib.pyplot as plt
- def standardize(x):
- """
- standardize array, x-mu / sigma (mu: mean, sigma: standard deviation)
- :param x: pandas series
- :return: standardized array or series
- """
- mu = np.mean(x, axis=0)
- sigma = np.std(x, axis=0)
- return (x - mu) / sigma
- def smoothing(x):
- df = pd.DataFrame(x)
- df = df.rolling(window=5, center=True).median()
- df = df.rolling(window=5, win_type='gaussian', center=True).mean(std=20)
- return df.to_numpy()
- def intervalsToBinary(a, tslength):
- """ Transform list of intervals to an binary array of length tslength,
- where 1 is interval and 0 is no interval.
- E.g. self.idx=[(3,4)], tslength=10 returns [0,0,0,1,1,1,1,0,0,0]"""
- gt = np.zeros(tslength)
- if len(a) > 0:
- for i in a:
- gt[int(i[0]):int(i[0]) + int(i[1])] = 1
- return gt
- def binaryToIntervals(array):
- """ Transform binary array, where 1 is interval and 0 is no interval,
- to list of intervals.
- E.g. array=[0,0,0,1,1,1,1,0,0,0] returns [(3,4)]"""
- intervals = []
- start = np.where(array == 1)[0][0]
- current = np.where(array == 1)[0][0]
- count = 1
- for val in np.where(array == 1)[0][1:]:
- if val-1 == current and val != len(array)-1:
- current = val
- count += 1
- elif val-1 == current and val == len(array)-1:
- count += 1
- intervals += [start, count]
- else:
- intervals += [start, count]
- start = val
- current = val
- count = 1
- intervals = np.array(intervals).reshape(-1,2)
- return intervals
- def median_filter_intervals(a, kernel):
- tslength = int(np.sum(a[-1]))
- binary = intervalsToBinary(a, tslength)
- filteredintervals = medfilt(binary, kernel_size=kernel)
- filteredintervals = np.maximum.reduce([filteredintervals, binary])
- medfiltidx = binaryToIntervals(filteredintervals)
- lengthmedfilt = np.sum(np.array(medfiltidx)[:,1])
- countmedfilt = len(medfiltidx)
- return medfiltidx
- def arguments(X, Y, thresCorrelation, minLenInt,
- maxLenInt, shiftTS, distancemeasure):
- # argss1 and argss2 are used to get all shifts (no-shift, forward and backward) and all minimum length combinations.
- argss1 = [(X[shift:], Y[:-shift], thresCorrelation, minLen, maxLenInt, distancemeasure) if shift != 0
- else (X, Y, thresCorrelation, minLen, maxLenInt, distancemeasure)
- for shift in shiftTS for minLen in minLenInt]
- argss2 = [(X[:-shift], Y[shift:], thresCorrelation, minLen, maxLenInt, distancemeasure)
- for shift in shiftTS for minLen in minLenInt if shift != 0]
- args = argss1 + argss2
- return args
- def mutual_information_short(hgram):
- s = float(np.sum(hgram))
- hgram = hgram / s
- px = np.sum(hgram, axis=1)
- py = np.sum(hgram, axis=0)
- px_py = px[:, None] * py[None, :]
- nzs = hgram > 0
- return np.sum(hgram[nzs] * np.log(hgram[nzs] / px_py[nzs]))
- def ts_distance(distancemeasure,X,Y,bins):
- """
- Similarity measures that can be called in relevant interval selection
- approach. Either Peasron correlation or Normalized mutual information.
- New similarity measures can be added with other elif statements.
- :param distancemeasure: 'pearson' or 'mutual'
- :param X: Time series
- :param Y: Time series
- :param bins: Bins for mutual information computation
- :return: returns similarity of X and Y
- """
- import warnings
- if distancemeasure == 'pearson':
- with warnings.catch_warnings():
- warnings.simplefilter("ignore")
- r = np.corrcoef(X, Y)[0, 1]
- # k, _ = pearsonr(X, Y)
- elif distancemeasure == 'mutual':
- #compute best number of bins
- #breaks program sometimes...
- #k = 10
- #sigma = np.corrcoef(X, Y)[0][1]
- #if not np.isnan(sigma) or sigma == 1:
- # N = len(X)
- # k = int(np.ceil(0.5 + 0.5 * np.sqrt(
- # 1 + 4 * np.sqrt((6 * N * sigma * sigma) / (
- # 1 - (sigma * sigma))))))
- # Calculate histograms
- concat = np.hstack((X, Y))
- l, h = np.nanmin(concat), np.nanmax(concat)
- hgram = np.histogram2d(X, Y, range=[[l, h], [l, h]], bins=bins)[0]
- r = mutual_information_short(hgram)
- # Normalize mutual information
- r = np.sqrt(1 - np.exp(-2 * r))
- return r
- def LongestSet(IntList):
- """
- Calculate longest set of maximal correlated intervals
- :param IntList: 2d list of intervals, e.g. [[0,2], [1,2], [1,4], [4,3],
- [7,2], [8,2], [4,3]]
- :return: numpy array as list of intervals where sum of length of
- intervals is largest regarding all possible combinations of
- non-overlapping intervals
- """
- if type(IntList) == np.ndarray:
- IntList = [(x[0], x[1]) for x in IntList]
- k = len(IntList)
- if k == 0:
- return 0
- IntList = np.array(sorted(IntList))
- NextInt = {}
- # Compute immediate starting intervals after i ends
- for i, Interval in enumerate(IntList):
- end = Interval[0] + Interval[1] # Interval[0] is the starting frame and Interval[1] is the duration
- nextidx = IntList[IntList[:, 0] >= end]
- if nextidx.size > 0:
- minstart = np.min(nextidx[:, 0])
- NextInt[i] = np.argwhere(IntList[:, 0] >= minstart)
- else:
- NextInt[i] = None
- # Calculate cumulated sum iterating by end of the list
- CumSum = np.zeros(k)
- SelInts = {}
- for idx, Interval in enumerate(reversed(IntList)):
- i = (k - 1) - idx
- b = NextInt[i]
- if np.all(b != None):
- # print(
- # CumSum[NextInt[i]] + Interval[1])
- bestfollower = int(NextInt[i][np.argmax(
- CumSum[NextInt[i]] + Interval[1])])
- CumSum[i] = int(Interval[1] + CumSum[bestfollower])
- if Interval[1] + CumSum[bestfollower] >= CumSum[i + 1]:
- SelInts[i] = bestfollower
- else:
- CumSum[i] = Interval[1]
- SelInts[i] = None
- # Loop forward
- Result = np.array([])
- current = np.where(CumSum == CumSum.max())[0][-1]
- while True:
- intval = IntList[current]
- Result = np.append(Result, [intval])
- current = SelInts[current]
- if current not in SelInts:
- break
- Result = Result.reshape(-1, 2)
- return Result
- def glamina(X: np.ndarray, Y: np.ndarray, threshold: float, l_min: list[int], l_max: int, col_x: list[int] | None =None, col_y: list[int] | None =None, shifts: list[int] | None =None, distancemeasure: str ='pearson', bidirectional: bool = False):
- """
- Finds the relevant intervals among multiple time series based on correlation between them.
- :param X: whole data of subject 1 with columns holding each time series. By default, initial time series
- :param Y: whole data of subject 2 with columns holding each time series. By default, following (reaction) time series
- :param threshold: threshold value starting from which the correlation is significant
- :param l_min: list of minimum length of continuous relevant intervals to be considered
- :param l_max: maximum length of continuous relevant intervals
- :param col_x: list of selected columns in subject 1. By default, takes in all columns.
- :param col_y: list of selected columns in subject 2 with respect to col_x. By default, takes in all columns.
- :param shifts: list of values with which X is shifted back and forth in time.
- :param distancemeasure: correlation metric. By default, Pearson's correlation
- :param bidirectional: True if the influence of both subjects on each other are relevant. By default, influence of subject 1 on 2 is only considered
- :return: List of lists containing selected relevant intervals as [starting time, duration].
- """
- # error handling
- if (len(col_x) != len(col_y)):
- raise ValueError("Missing respective column indices in function call. There is a shape mismatch.")
- if shifts == None:
- shifts = [0]
- # to select the columns
- if (col_x != None and col_y != None):
- X = X[:, col_x]
- Y = Y[:, col_y]
- interval = []
- Result = []
- laminaresult = []
- x = []
- y = []
- # loop for each minimum interval length
- for m in l_min:
- if bidirectional:
- d1 = [[X[s:, :], Y[:-s, :]] if s != 0 else [X, Y] for s in shifts]
- d2 = [[X[:-s, :], Y[s:, :]] for s in shifts if s != 0]
- d = d1 + d2
- elif not bidirectional:
- d = [[X[:-s, :], Y[s:, :]] for s in shifts if s != 0]
- for X, Y in d:
- indexlength = min(X.shape[0], Y.shape[0])
- IntMat = np.zeros([indexlength, l_max - m + 1], dtype=np.float64)
- # calculate correlation for minimum interval length
- winlen = m
- n_channel = X.shape[1]
- # j = 0
- # loop to average correlation for minimum interval length
- for col in range(0, n_channel):
- x = X[:, col]
- y = Y[:, col]
- for i in range(indexlength - m + 1):
- r = ts_distance(distancemeasure, x[i:i + winlen],
- y[i:i + winlen], bins=10)
- if not math.isnan(r):
- IntMat[i, 0] += r
- # j +=1
- IntMat = IntMat/n_channel
- # loop to average correlation for intervals above minimum interval
- for col in range(0, n_channel):
- for winlen in range(m + 1, l_max + 1):
- for i in range(indexlength - winlen + 1):
- if IntMat[i, winlen - 1 - m] >= threshold and \
- IntMat[i + 1, winlen - 1 - m] >= threshold:
- r = ts_distance(distancemeasure, x[i:i + winlen],
- y[i:i + winlen], bins=10)
- if not math.isnan(r):
- IntMat[i, winlen - m] += r
- IntMat = IntMat[:, 1:] / n_channel
- CorInts = np.where(IntMat >= threshold)
- del IntMat
- CorInts = list(zip(CorInts[0], CorInts[1] + m))
- # check if correlated intervals are maximal
- ResultInt = []
- for i, lenWin in CorInts:
- if ((i - 1, lenWin + 1) not in CorInts) and ((i, lenWin + 1) not in CorInts):
- ResultInt += [(i, lenWin)]
- del CorInts
- interval.append(ResultInt)
- if len(interval) > 0:
- Result = [int for intlist in interval for int in intlist] # flatten the array
- if len(Result) > 0:
- laminaresult = LongestSet(Result) # find the longest set of intervals from Result
- return laminaresult
- def gen_series(lenTS=1000, start=50, end=300, amplitude=50, noise=1, seed=10):
- # step_signal = np.zeros(lenTS)
- # # step_signal[start:end+1] = amplitude
- # step_signal[start:end+1] = np.linspace(5, amplitude, end-start+1)
- step_signal = np.linspace(5, amplitude, lenTS)
- step_signal[:start] = step_signal[end+1:] = 0
- if noise !=0:
- np.random.seed(seed)
- noise = noise * np.random.randn(lenTS)
- s1 = step_signal + noise
- return s1
- if __name__ == "__main__":
- # np.random.seed(10)
- # shifts = [0]
- # threshold = 0.99
- # minLenInts = [70]
- # maxLenInts = 200
- # distancemeasure = 'pearson'
- #
- # medfiltlength = 51
- #
- # path = '/home/datasets4/fNIRS/fNIRSData/MI-003/Sub1/Sub1_preprocessed.csv'
- # df = pd.read_csv(path)
- # data_1 = df.to_numpy()
- #
- # path = '/home/datasets4/fNIRS/fNIRSData/MI-003/Sub2/Sub2_preprocessed.csv'
- # df = pd.read_csv(path)
- # data_2 = df.to_numpy()
- #
- # data1 = standardize(data_1)
- # data2 = standardize(data_2)
- #
- # data1 = smoothing(data1)
- # data2 = smoothing(data2)
- shifts = [0] # shifts that must be considered
- threshold = 0.87 # (imp) depends on the correlation values
- minLenInts = [50] # (imp) depends on the minimum continuous interval that must be detected
- maxLenInts = 250 # (less relevant) depends on the max continuous interval
- distancemeasure = 'pearson'
- medfiltlength = 51
- S1 = gen_series(start=100, end=300, amplitude=20, noise=1, seed=10)
- R1 = gen_series(start=120, end=320, amplitude=20, noise=1, seed=11)
- S2 = gen_series(start=90, end=280, amplitude=20, noise=1, seed=12)
- R2 = gen_series(start=200, end=260, amplitude=20, noise=1, seed=13)
- data1 = np.column_stack((S1, S2))
- data2 = np.column_stack((R1, R2))
- result = glamina(data1, data2, threshold=threshold, l_min=minLenInts, l_max=maxLenInts, col_x=[0,1], col_y=[0,1], shifts=shifts, distancemeasure=distancemeasure)
- Result = median_filter_intervals(result, kernel=medfiltlength)
- # import matplotlib.pyplot as plt
- # plt.subplot(5, 1, 1)
- # plt.plot(np.arange(0, len(S1)), np.array(S1), label='x1', color='k')
- # plt.legend(loc="upper left")
- # plt.subplot(5, 1, 2)
- # plt.plot(np.arange(0, len(S2)), np.array(S2), label='x2', color='b')
- # plt.legend(loc="upper left")
- # plt.subplot(5, 1, 3)
- # plt.plot(np.arange(0, len(R1)), np.array(R1), label='y1', color='k')
- # plt.legend(loc="upper left")
- # plt.subplot(5, 1, 4)
- # plt.plot(np.arange(0, len(R2)), np.array(R2), label='y2', color='b')
- # plt.legend(loc="upper left")
- # plt.subplot(5, 1, 5)
- # plt.plot(np.arange(0, len(R2)), np.array(gen_series(start=200, end=260, amplitude=30, noise=0)),
- # label='ground truth', color='g')
- # if len(Result) > 0:
- # for idx, i in enumerate(Result):
- # plt.axvspan(i[0], (i[0] + i[1] - 1), facecolor='r', alpha=0.5,
- # label=f'detection\nth:{threshold}\nmin:{minLenInts}' if idx == 0 else "")
- # plt.legend(loc="upper left")
- # plt.show()
- # S1 = data1[:, 6]
- # S2 = data1[:, 7]
- # S3 = data1[:, 8]
- # R1 = data2[:, 6]
- # R2 = data2[:, 7]
- # R3 = data2[:, 8]
- #
- # S4 = data1[:, 12]
- # R4 = data2[:, 12]
- #
- # plt.subplot(7, 1, 1)
- # plt.plot(np.arange(0, len(S1)), np.array(S1), label='x1', color='k') # color = bgrcmykw
- # plt.legend(loc="upper left")
- # plt.subplot(7, 1, 2)
- # plt.plot(np.arange(0, len(S2)), np.array(S2), label='x2', color='b')
- # plt.legend(loc="upper left")
- # plt.subplot(7, 1, 3)
- # plt.plot(np.arange(0, len(S3)), np.array(S3), label='x3', color='m')
- # plt.legend(loc="upper left")
- # plt.subplot(7, 1, 4)
- # plt.plot(np.arange(0, len(R1)), np.array(R1), label='y1', color='k')
- # plt.legend(loc="upper left")
- # plt.subplot(7, 1, 5)
- # plt.plot(np.arange(0, len(R2)), np.array(R2), label='y2', color='b')
- # plt.legend(loc="upper left")
- # plt.subplot(7, 1, 6)
- # plt.plot(np.arange(0, len(R3)), np.array(R3), label='y3', color='m')
- # plt.legend(loc="upper left")
- #
- # path = '/home/valapil/Project/fnirs_file/annotations/MI-003_clean.csv'
- # df = pd.read_csv(path)
- # truth = df.to_numpy()
- # ind = np.where(truth[:, 1] != 10)[0]
- # sig = np.zeros(data1.shape[0])
- # a = truth[ind, :].astype(np.int16)
- # for s, e in a[:, [0, 2]]:
- # sig[s:e + 1] = 1
- # plt.subplot(7, 1, 7)
- # plt.plot(np.arange(0, len(sig)), sig, label='ground truth', color='g')
- #
- # if len(Result) > 0:
- # for idx, i in enumerate(Result):
- # plt.axvspan(i[0], (i[0] + i[1] - 1), facecolor='r', alpha=0.5,
- # label=f'detection\nth:{threshold}\nmin:{minLenInts}' if idx == 0 else "")
- # plt.legend(loc="upper left")
- plt.plot(np.arange(0, len(S1)), np.array(S1), label='x1', color='k')
- if len(Result) > 0:
- for idx, i in enumerate(Result):
- plt.axvspan(i[0], (i[0] + i[1] - 1), facecolor='r', alpha=0.5,
- label=f'detection\nth:{threshold}\nmin:{minLenInts}' if idx == 0 else "")
- plt.legend(loc="upper left")
- print(Result)
- plt.show()
|