| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140 |
- def cov(X, p):
- """vector autocovariance up to order p
- Parameters
- ----------
- X : ndarray, shape (N, n)
- The N time series of length n
- Returns
- -------
- R : ndarray, shape (p + 1, N, N)
- The autocovariance up to order p
- """
- import numpy as np
- N, n = X.shape
- R = np.zeros((p + 1, N, N))
- for k in range(p + 1):
- R[k] = (1. / float(n - k)) * np.dot(X[:, :n - k], X[:, k:].T)
- return R
- def mvar_fit(X, p):
- """Fit MVAR model of order p using Yule Walker
- Parameters
- ----------
- X : ndarray, shape (N, n)
- The N time series of length n
- n_fft : int
- The length of the FFT
- Returns
- -------
- A : ndarray, shape (p, N, N)
- The AR coefficients where N is the number of signals
- and p the order of the model.
- sigma : array, shape (N,)
- The noise for each time series
- """
- import numpy as np
- N, n = X.shape
- gamma = cov(X, p) # gamma(r,i,j) cov between X_i(0) et X_j(r)
- G = np.zeros((p * N, p * N))
- gamma2 = np.concatenate(gamma, axis=0)
- gamma2[:N, :N] /= 2.
- for i in range(p):
- G[N * i:, N * i:N * (i + 1)] = gamma2[:N * (p - i)]
- G = G + G.T # big block matrix
- gamma4 = np.concatenate(gamma[1:], axis=0)
- phi = np.linalg.solve(G, gamma4) # solve Yule Walker
- tmp = np.dot(gamma4[:N * p].T, phi)
- sigma = gamma[0] - tmp - tmp.T + np.dot(phi.T, np.dot(G, phi))
- phi = np.reshape(phi, (p, N, N))
- for k in range(p):
- phi[k] = phi[k].T
- return phi, sigma
- def compute_order_wrapper(X, p_max):
- """Estimate AR order with BIC
- Parameters
- ----------
- X : ndarray, shape (N, n)
- The N time series of length n
- p_max : int
- The maximum model order to test
- Returns
- -------
- p : int
- Estimated order
- bic : ndarray, shape (p_max + 1,)
- The BIC for the orders from 0 to p_max.
- """
- import numpy as np
- import multiprocessing as mp
- num_cores = mp.cpu_count()
- pool = mp.Pool(num_cores)
- args = [(X, p) for p in range(1, p_max + 1)]
- bic = pool.starmap(compute_order, args)
- pool.close()
- bic = np.insert(bic, 0, np.nan)
- p = np.nanargmin(bic)
- return p
- def compute_order(X, p):
- """Estimate AR order with BIC
- Parameters
- ----------
- X : ndarray, shape (N, n)
- The N time series of length n
- p_max : int
- The maximum model order to test
- Returns
- -------
- p : int
- Estimated order
- bic : ndarray, shape (p_max + 1,)
- The BIC for the orders from 0 to p_max.
- """
- from numpy import linalg
- import numpy as np
- import math
- import warnings
- with warnings.catch_warnings():
- warnings.simplefilter("ignore")
- N, n = X.shape
- Y = X.T
- bic_p = np.nan
- try:
- A, sigma = mvar_fit(X, p)
- A_2d = np.concatenate(A, axis=1)
- n_samples = n - p
- bic_p = n_samples * N * math.log(2. * math.pi)
- bic_p += (n_samples * np.log(linalg.det(sigma)))
- bic_p += p * (N ** 2) * math.log(n_samples)
- sigma_inv = linalg.inv(sigma)
- S = 0.
- for i in range(p, n):
- res = Y[i] - np.dot(A_2d, Y[i - p:i][::-1, :].ravel())
- S += np.dot(res, sigma_inv.dot(res))
- bic_p += S
- except:
- pass
- return bic_p
|