|
@@ -0,0 +1,140 @@
|
|
|
|
|
+
|
|
|
|
|
+def cov(X, p):
|
|
|
|
|
+ """vector autocovariance up to order p
|
|
|
|
|
+ Parameters
|
|
|
|
|
+ ----------
|
|
|
|
|
+ X : ndarray, shape (N, n)
|
|
|
|
|
+ The N time series of length n
|
|
|
|
|
+ Returns
|
|
|
|
|
+ -------
|
|
|
|
|
+ R : ndarray, shape (p + 1, N, N)
|
|
|
|
|
+ The autocovariance up to order p
|
|
|
|
|
+ """
|
|
|
|
|
+ import numpy as np
|
|
|
|
|
+
|
|
|
|
|
+ N, n = X.shape
|
|
|
|
|
+ R = np.zeros((p + 1, N, N))
|
|
|
|
|
+ for k in range(p + 1):
|
|
|
|
|
+ R[k] = (1. / float(n - k)) * np.dot(X[:, :n - k], X[:, k:].T)
|
|
|
|
|
+ return R
|
|
|
|
|
+
|
|
|
|
|
+def mvar_fit(X, p):
|
|
|
|
|
+ """Fit MVAR model of order p using Yule Walker
|
|
|
|
|
+ Parameters
|
|
|
|
|
+ ----------
|
|
|
|
|
+ X : ndarray, shape (N, n)
|
|
|
|
|
+ The N time series of length n
|
|
|
|
|
+ n_fft : int
|
|
|
|
|
+ The length of the FFT
|
|
|
|
|
+ Returns
|
|
|
|
|
+ -------
|
|
|
|
|
+ A : ndarray, shape (p, N, N)
|
|
|
|
|
+ The AR coefficients where N is the number of signals
|
|
|
|
|
+ and p the order of the model.
|
|
|
|
|
+ sigma : array, shape (N,)
|
|
|
|
|
+ The noise for each time series
|
|
|
|
|
+ """
|
|
|
|
|
+ import numpy as np
|
|
|
|
|
+
|
|
|
|
|
+ N, n = X.shape
|
|
|
|
|
+ gamma = cov(X, p) # gamma(r,i,j) cov between X_i(0) et X_j(r)
|
|
|
|
|
+ G = np.zeros((p * N, p * N))
|
|
|
|
|
+ gamma2 = np.concatenate(gamma, axis=0)
|
|
|
|
|
+ gamma2[:N, :N] /= 2.
|
|
|
|
|
+
|
|
|
|
|
+ for i in range(p):
|
|
|
|
|
+ G[N * i:, N * i:N * (i + 1)] = gamma2[:N * (p - i)]
|
|
|
|
|
+
|
|
|
|
|
+ G = G + G.T # big block matrix
|
|
|
|
|
+
|
|
|
|
|
+ gamma4 = np.concatenate(gamma[1:], axis=0)
|
|
|
|
|
+
|
|
|
|
|
+ phi = np.linalg.solve(G, gamma4) # solve Yule Walker
|
|
|
|
|
+
|
|
|
|
|
+ tmp = np.dot(gamma4[:N * p].T, phi)
|
|
|
|
|
+ sigma = gamma[0] - tmp - tmp.T + np.dot(phi.T, np.dot(G, phi))
|
|
|
|
|
+
|
|
|
|
|
+ phi = np.reshape(phi, (p, N, N))
|
|
|
|
|
+ for k in range(p):
|
|
|
|
|
+ phi[k] = phi[k].T
|
|
|
|
|
+
|
|
|
|
|
+ return phi, sigma
|
|
|
|
|
+
|
|
|
|
|
+def compute_order_wrapper(X, p_max):
|
|
|
|
|
+ """Estimate AR order with BIC
|
|
|
|
|
+ Parameters
|
|
|
|
|
+ ----------
|
|
|
|
|
+ X : ndarray, shape (N, n)
|
|
|
|
|
+ The N time series of length n
|
|
|
|
|
+ p_max : int
|
|
|
|
|
+ The maximum model order to test
|
|
|
|
|
+ Returns
|
|
|
|
|
+ -------
|
|
|
|
|
+ p : int
|
|
|
|
|
+ Estimated order
|
|
|
|
|
+ bic : ndarray, shape (p_max + 1,)
|
|
|
|
|
+ The BIC for the orders from 0 to p_max.
|
|
|
|
|
+ """
|
|
|
|
|
+ import numpy as np
|
|
|
|
|
+ import multiprocessing as mp
|
|
|
|
|
+
|
|
|
|
|
+ num_cores = mp.cpu_count()
|
|
|
|
|
+ pool = mp.Pool(num_cores)
|
|
|
|
|
+
|
|
|
|
|
+ args = [(X, p) for p in range(1, p_max + 1)]
|
|
|
|
|
+ bic = pool.starmap(compute_order, args)
|
|
|
|
|
+
|
|
|
|
|
+ pool.close()
|
|
|
|
|
+
|
|
|
|
|
+ bic = np.insert(bic, 0, np.nan)
|
|
|
|
|
+
|
|
|
|
|
+ p = np.nanargmin(bic)
|
|
|
|
|
+
|
|
|
|
|
+ return p
|
|
|
|
|
+
|
|
|
|
|
+def compute_order(X, p):
|
|
|
|
|
+ """Estimate AR order with BIC
|
|
|
|
|
+ Parameters
|
|
|
|
|
+ ----------
|
|
|
|
|
+ X : ndarray, shape (N, n)
|
|
|
|
|
+ The N time series of length n
|
|
|
|
|
+ p_max : int
|
|
|
|
|
+ The maximum model order to test
|
|
|
|
|
+ Returns
|
|
|
|
|
+ -------
|
|
|
|
|
+ p : int
|
|
|
|
|
+ Estimated order
|
|
|
|
|
+ bic : ndarray, shape (p_max + 1,)
|
|
|
|
|
+ The BIC for the orders from 0 to p_max.
|
|
|
|
|
+ """
|
|
|
|
|
+ from numpy import linalg
|
|
|
|
|
+ import numpy as np
|
|
|
|
|
+ import math
|
|
|
|
|
+ import warnings
|
|
|
|
|
+
|
|
|
|
|
+ with warnings.catch_warnings():
|
|
|
|
|
+ warnings.simplefilter("ignore")
|
|
|
|
|
+ N, n = X.shape
|
|
|
|
|
+ Y = X.T
|
|
|
|
|
+ bic_p = np.nan
|
|
|
|
|
+
|
|
|
|
|
+ try:
|
|
|
|
|
+ A, sigma = mvar_fit(X, p)
|
|
|
|
|
+ A_2d = np.concatenate(A, axis=1)
|
|
|
|
|
+
|
|
|
|
|
+ n_samples = n - p
|
|
|
|
|
+ bic_p = n_samples * N * math.log(2. * math.pi)
|
|
|
|
|
+ bic_p += (n_samples * np.log(linalg.det(sigma)))
|
|
|
|
|
+ bic_p += p * (N ** 2) * math.log(n_samples)
|
|
|
|
|
+
|
|
|
|
|
+ sigma_inv = linalg.inv(sigma)
|
|
|
|
|
+ S = 0.
|
|
|
|
|
+ for i in range(p, n):
|
|
|
|
|
+ res = Y[i] - np.dot(A_2d, Y[i - p:i][::-1, :].ravel())
|
|
|
|
|
+ S += np.dot(res, sigma_inv.dot(res))
|
|
|
|
|
+
|
|
|
|
|
+ bic_p += S
|
|
|
|
|
+ except:
|
|
|
|
|
+ pass
|
|
|
|
|
+
|
|
|
|
|
+ return bic_p
|