def cov(X, p): """vector autocovariance up to order p Parameters ---------- X : ndarray, shape (N, n) The N time series of length n Returns ------- R : ndarray, shape (p + 1, N, N) The autocovariance up to order p """ import numpy as np N, n = X.shape R = np.zeros((p + 1, N, N)) for k in range(p + 1): R[k] = (1. / float(n - k)) * np.dot(X[:, :n - k], X[:, k:].T) return R def mvar_fit(X, p): """Fit MVAR model of order p using Yule Walker Parameters ---------- X : ndarray, shape (N, n) The N time series of length n n_fft : int The length of the FFT Returns ------- A : ndarray, shape (p, N, N) The AR coefficients where N is the number of signals and p the order of the model. sigma : array, shape (N,) The noise for each time series """ import numpy as np N, n = X.shape gamma = cov(X, p) # gamma(r,i,j) cov between X_i(0) et X_j(r) G = np.zeros((p * N, p * N)) gamma2 = np.concatenate(gamma, axis=0) gamma2[:N, :N] /= 2. for i in range(p): G[N * i:, N * i:N * (i + 1)] = gamma2[:N * (p - i)] G = G + G.T # big block matrix gamma4 = np.concatenate(gamma[1:], axis=0) phi = np.linalg.solve(G, gamma4) # solve Yule Walker tmp = np.dot(gamma4[:N * p].T, phi) sigma = gamma[0] - tmp - tmp.T + np.dot(phi.T, np.dot(G, phi)) phi = np.reshape(phi, (p, N, N)) for k in range(p): phi[k] = phi[k].T return phi, sigma def compute_order_wrapper(X, p_max): """Estimate AR order with BIC Parameters ---------- X : ndarray, shape (N, n) The N time series of length n p_max : int The maximum model order to test Returns ------- p : int Estimated order bic : ndarray, shape (p_max + 1,) The BIC for the orders from 0 to p_max. """ import numpy as np import multiprocessing as mp num_cores = mp.cpu_count() pool = mp.Pool(num_cores) args = [(X, p) for p in range(1, p_max + 1)] bic = pool.starmap(compute_order, args) pool.close() bic = np.insert(bic, 0, np.nan) p = np.nanargmin(bic) return p def compute_order(X, p): """Estimate AR order with BIC Parameters ---------- X : ndarray, shape (N, n) The N time series of length n p_max : int The maximum model order to test Returns ------- p : int Estimated order bic : ndarray, shape (p_max + 1,) The BIC for the orders from 0 to p_max. """ from numpy import linalg import numpy as np import math import warnings with warnings.catch_warnings(): warnings.simplefilter("ignore") N, n = X.shape Y = X.T bic_p = np.nan try: A, sigma = mvar_fit(X, p) A_2d = np.concatenate(A, axis=1) n_samples = n - p bic_p = n_samples * N * math.log(2. * math.pi) bic_p += (n_samples * np.log(linalg.det(sigma))) bic_p += p * (N ** 2) * math.log(n_samples) sigma_inv = linalg.inv(sigma) S = 0. for i in range(p, n): res = Y[i] - np.dot(A_2d, Y[i - p:i][::-1, :].ravel()) S += np.dot(res, sigma_inv.dot(res)) bic_p += S except: pass return bic_p