from statsmodels.compat.python import range, lrange, lzip, long
import numpy as np
import numpy.lib.recfunctions as nprf
import pandas as pd
from pandas import DataFrame
from pandas.tseries import offsets
from pandas.tseries.frequencies import to_offset
from statsmodels.tools.sm_exceptions import ValueWarning
from statsmodels.tools.data import _is_using_pandas, _is_recarray
[docs]def add_trend(x, trend="c", prepend=False, has_constant='skip'):
"""
Adds a trend and/or constant to an array.
Parameters
----------
X : array-like
Original array of data.
trend : str {"c","t","ct","ctt"}
"c" add constant only
"t" add trend only
"ct" add constant and linear trend
"ctt" add constant and linear and quadratic trend.
prepend : bool
If True, prepends the new data to the columns of X.
has_constant : str {'raise', 'add', 'skip'}
Controls what happens when trend is 'c' and a constant already
exists in X. 'raise' will raise an error. 'add' will duplicate a
constant. 'skip' will return the data without change. 'skip' is the
default.
Returns
-------
y : array, recarray or DataFrame
The original data with the additional trend columns. If x is a
recarray or pandas Series or DataFrame, then the trend column names
are 'const', 'trend' and 'trend_squared'.
Notes
-----
Returns columns as ["ctt","ct","c"] whenever applicable. There is currently
no checking for an existing trend.
See also
--------
statsmodels.tools.tools.add_constant
"""
# TODO: could be generalized for trend of aribitrary order
trend = trend.lower()
columns = ['const', 'trend', 'trend_squared']
if trend == "c": # handles structured arrays
columns = columns[:1]
trendorder = 0
elif trend == "ct" or trend == "t":
columns = columns[:2]
if trend == "t":
columns = columns[1:2]
trendorder = 1
elif trend == "ctt":
trendorder = 2
else:
raise ValueError("trend %s not understood" % trend)
is_recarray = _is_recarray(x)
is_pandas = _is_using_pandas(x, None) or is_recarray
if is_pandas or is_recarray:
if is_recarray:
descr = x.dtype.descr
x = pd.DataFrame.from_records(x)
elif isinstance(x, pd.Series):
x = pd.DataFrame(x)
else:
x = x.copy()
else:
x = np.asanyarray(x)
nobs = len(x)
trendarr = np.vander(np.arange(1, nobs + 1, dtype=np.float64), trendorder + 1)
# put in order ctt
trendarr = np.fliplr(trendarr)
if trend == "t":
trendarr = trendarr[:, 1]
if "c" in trend:
if is_pandas or is_recarray:
# Mixed type protection
def safe_is_const(s):
try:
return np.ptp(s) == 0.0 and np.any(s != 0.0)
except:
return False
col_const = x.apply(safe_is_const, 0)
else:
col_const = np.logical_and(np.any(np.ptp(np.asanyarray(x), axis=0) == 0, axis=0),
np.all(x != 0.0, axis=0))
if np.any(col_const):
if has_constant == 'raise':
raise ValueError("x already contains a constant")
elif has_constant == 'skip':
columns = columns[1:]
trendarr = trendarr[:, 1:]
order = 1 if prepend else -1
if is_recarray or is_pandas:
trendarr = pd.DataFrame(trendarr, index=x.index, columns=columns)
x = [trendarr, x]
x = pd.concat(x[::order], 1)
else:
x = [trendarr, x]
x = np.column_stack(x[::order])
if is_recarray:
x = x.to_records(index=False, convert_datetime64=False)
new_descr = x.dtype.descr
extra_col = len(new_descr) - len(descr)
descr = new_descr[:extra_col] + descr if prepend else descr + new_descr[-extra_col:]
x = x.astype(np.dtype(descr))
return x
def add_lag(x, col=None, lags=1, drop=False, insert=True):
"""
Returns an array with lags included given an array.
Parameters
----------
x : array
An array or NumPy ndarray subclass. Can be either a 1d or 2d array with
observations in columns.
col : 'string', int, or None
If data is a structured array or a recarray, `col` can be a string
that is the name of the column containing the variable. Or `col` can
be an int of the zero-based column index. If it's a 1d array `col`
can be None.
lags : int
The number of lags desired.
drop : bool
Whether to keep the contemporaneous variable for the data.
insert : bool or int
If True, inserts the lagged values after `col`. If False, appends
the data. If int inserts the lags at int.
Returns
-------
array : ndarray
Array with lags
Examples
--------
>>> import statsmodels.api as sm
>>> data = sm.datasets.macrodata.load()
>>> data = data.data[['year','quarter','realgdp','cpi']]
>>> data = sm.tsa.add_lag(data, 'realgdp', lags=2)
Notes
-----
Trims the array both forward and backward, so that the array returned
so that the length of the returned array is len(`X`) - lags. The lags are
returned in increasing order, ie., t-1,t-2,...,t-lags
"""
if x.dtype.names:
names = x.dtype.names
if not col and np.squeeze(x).ndim > 1:
raise IndexError("col is None and the input array is not 1d")
elif len(names) == 1:
col = names[0]
if isinstance(col, (int, long)):
col = x.dtype.names[col]
contemp = x[col]
# make names for lags
tmp_names = [col + '_'+'L(%i)' % i for i in range(1,lags+1)]
ndlags = lagmat(contemp, maxlag=lags, trim='Both')
# get index for return
if insert is True:
ins_idx = list(names).index(col) + 1
elif insert is False:
ins_idx = len(names) + 1
else: # insert is an int
if insert > len(names):
import warnings
warnings.warn("insert > number of variables, inserting at the"
" last position", ValueWarning)
ins_idx = insert
first_names = list(names[:ins_idx])
last_names = list(names[ins_idx:])
if drop:
if col in first_names:
first_names.pop(first_names.index(col))
else:
last_names.pop(last_names.index(col))
if first_names: # only do this if x isn't "empty"
first_arr = nprf.append_fields(x[first_names][lags:],tmp_names,
ndlags.T, usemask=False)
else:
first_arr = np.zeros(len(x)-lags, dtype=lzip(tmp_names,
(x[col].dtype,)*lags))
for i,name in enumerate(tmp_names):
first_arr[name] = ndlags[:,i]
if last_names:
return nprf.append_fields(first_arr, last_names,
[x[name][lags:] for name in last_names], usemask=False)
else: # lags for last variable
return first_arr
else: # we have an ndarray
if x.ndim == 1: # make 2d if 1d
x = x[:,None]
if col is None:
col = 0
# handle negative index
if col < 0:
col = x.shape[1] + col
contemp = x[:,col]
if insert is True:
ins_idx = col + 1
elif insert is False:
ins_idx = x.shape[1]
else:
if insert < 0: # handle negative index
insert = x.shape[1] + insert + 1
if insert > x.shape[1]:
insert = x.shape[1]
import warnings
warnings.warn("insert > number of variables, inserting at the"
" last position", ValueWarning)
ins_idx = insert
ndlags = lagmat(contemp, lags, trim='Both')
first_cols = lrange(ins_idx)
last_cols = lrange(ins_idx,x.shape[1])
if drop:
if col in first_cols:
first_cols.pop(first_cols.index(col))
else:
last_cols.pop(last_cols.index(col))
return np.column_stack((x[lags:,first_cols],ndlags,
x[lags:,last_cols]))
[docs]def detrend(x, order=1, axis=0):
"""
Detrend an array with a trend of given order along axis 0 or 1
Parameters
----------
x : array_like, 1d or 2d
data, if 2d, then each row or column is independently detrended with the
same trendorder, but independent trend estimates
order : int
specifies the polynomial order of the trend, zero is constant, one is
linear trend, two is quadratic trend
axis : int
axis can be either 0, observations by rows,
or 1, observations by columns
Returns
-------
detrended data series : ndarray
The detrended series is the residual of the linear regression of the
data on the trend of given order.
"""
if x.ndim == 2 and int(axis) == 1:
x = x.T
elif x.ndim > 2:
raise NotImplementedError('x.ndim > 2 is not implemented until it is needed')
nobs = x.shape[0]
if order == 0:
# Special case demean
resid = x - x.mean(axis=0)
else:
trends = np.vander(np.arange(float(nobs)), N=order + 1)
beta = np.linalg.pinv(trends).dot(x)
resid = x - np.dot(trends, beta)
if x.ndim == 2 and int(axis) == 1:
resid = resid.T
return resid
[docs]def lagmat(x, maxlag, trim='forward', original='ex', use_pandas=False):
"""
Create 2d array of lags
Parameters
----------
x : array_like, 1d or 2d
data; if 2d, observation in rows and variables in columns
maxlag : int
all lags from zero to maxlag are included
trim : str {'forward', 'backward', 'both', 'none'} or None
* 'forward' : trim invalid observations in front
* 'backward' : trim invalid initial observations
* 'both' : trim invalid observations on both sides
* 'none', None : no trimming of observations
original : str {'ex','sep','in'}
* 'ex' : drops the original array returning only the lagged values.
* 'in' : returns the original array and the lagged values as a single
array.
* 'sep' : returns a tuple (original array, lagged values). The original
array is truncated to have the same number of rows as
the returned lagmat.
use_pandas : bool, optional
If true, returns a DataFrame when the input is a pandas
Series or DataFrame. If false, return numpy ndarrays.
Returns
-------
lagmat : 2d array
array with lagged observations
y : 2d array, optional
Only returned if original == 'sep'
Examples
--------
>>> from statsmodels.tsa.tsatools import lagmat
>>> import numpy as np
>>> X = np.arange(1,7).reshape(-1,2)
>>> lagmat(X, maxlag=2, trim="forward", original='in')
array([[ 1., 2., 0., 0., 0., 0.],
[ 3., 4., 1., 2., 0., 0.],
[ 5., 6., 3., 4., 1., 2.]])
>>> lagmat(X, maxlag=2, trim="backward", original='in')
array([[ 5., 6., 3., 4., 1., 2.],
[ 0., 0., 5., 6., 3., 4.],
[ 0., 0., 0., 0., 5., 6.]])
>>> lagmat(X, maxlag=2, trim="both", original='in')
array([[ 5., 6., 3., 4., 1., 2.]])
>>> lagmat(X, maxlag=2, trim="none", original='in')
array([[ 1., 2., 0., 0., 0., 0.],
[ 3., 4., 1., 2., 0., 0.],
[ 5., 6., 3., 4., 1., 2.],
[ 0., 0., 5., 6., 3., 4.],
[ 0., 0., 0., 0., 5., 6.]])
Notes
-----
When using a pandas DataFrame or Series with use_pandas=True, trim can only
be 'forward' or 'both' since it is not possible to consistently extend index
values.
"""
# TODO: allow list of lags additional to maxlag
is_pandas = _is_using_pandas(x, None) and use_pandas
trim = 'none' if trim is None else trim
trim = trim.lower()
if is_pandas and trim in ('none', 'backward'):
raise ValueError("trim cannot be 'none' or 'forward' when used on "
"Series or DataFrames")
xa = np.asarray(x)
dropidx = 0
if xa.ndim == 1:
xa = xa[:, None]
nobs, nvar = xa.shape
if original in ['ex', 'sep']:
dropidx = nvar
if maxlag >= nobs:
raise ValueError("maxlag should be < nobs")
lm = np.zeros((nobs + maxlag, nvar * (maxlag + 1)))
for k in range(0, int(maxlag + 1)):
lm[maxlag - k:nobs + maxlag - k,
nvar * (maxlag - k):nvar * (maxlag - k + 1)] = xa
if trim in ('none', 'forward'):
startobs = 0
elif trim in ('backward', 'both'):
startobs = maxlag
else:
raise ValueError('trim option not valid')
if trim in ('none', 'backward'):
stopobs = len(lm)
else:
stopobs = nobs
if is_pandas:
x_columns = x.columns if isinstance(x, DataFrame) else [x.name]
columns = [str(col) for col in x_columns]
for lag in range(maxlag):
lag_str = str(lag + 1)
columns.extend([str(col) + '.L.' + lag_str for col in x_columns])
lm = DataFrame(lm[:stopobs], index=x.index, columns=columns)
lags = lm.iloc[startobs:]
if original in ('sep', 'ex'):
leads = lags[x_columns]
lags = lags.drop(x_columns, 1)
else:
lags = lm[startobs:stopobs, dropidx:]
if original == 'sep':
leads = lm[startobs:stopobs, :dropidx]
if original == 'sep':
return lags, leads
else:
return lags
[docs]def lagmat2ds(x, maxlag0, maxlagex=None, dropex=0, trim='forward',
use_pandas=False):
"""
Generate lagmatrix for 2d array, columns arranged by variables
Parameters
----------
x : array_like, 2d
2d data, observation in rows and variables in columns
maxlag0 : int
for first variable all lags from zero to maxlag are included
maxlagex : None or int
max lag for all other variables all lags from zero to maxlag are included
dropex : int (default is 0)
exclude first dropex lags from other variables
for all variables, except the first, lags from dropex to maxlagex are
included
trim : string
* 'forward' : trim invalid observations in front
* 'backward' : trim invalid initial observations
* 'both' : trim invalid observations on both sides
* 'none' : no trimming of observations
use_pandas : bool, optional
If true, returns a DataFrame when the input is a pandas
Series or DataFrame. If false, return numpy ndarrays.
Returns
-------
lagmat : 2d array
array with lagged observations, columns ordered by variable
Notes
-----
Inefficient implementation for unequal lags, implemented for convenience
"""
if maxlagex is None:
maxlagex = maxlag0
maxlag = max(maxlag0, maxlagex)
is_pandas = _is_using_pandas(x, None)
if x.ndim == 1:
if is_pandas:
x = pd.DataFrame(x)
else:
x = x[:, None]
elif x.ndim == 0 or x.ndim > 2:
raise TypeError('Only supports 1 and 2-dimensional data.')
nobs, nvar = x.shape
if is_pandas and use_pandas:
lags = lagmat(x.iloc[:, 0], maxlag, trim=trim,
original='in', use_pandas=True)
lagsli = [lags.iloc[:, :maxlag0 + 1]]
for k in range(1, nvar):
lags = lagmat(x.iloc[:, k], maxlag, trim=trim,
original='in', use_pandas=True)
lagsli.append(lags.iloc[:, dropex:maxlagex + 1])
return pd.concat(lagsli, axis=1)
elif is_pandas:
x = np.asanyarray(x)
lagsli = [lagmat(x[:, 0], maxlag, trim=trim, original='in')[:, :maxlag0 + 1]]
for k in range(1, nvar):
lagsli.append(lagmat(x[:, k], maxlag, trim=trim, original='in')[:, dropex:maxlagex + 1])
return np.column_stack(lagsli)
def vec(mat):
return mat.ravel('F')
def vech(mat):
# Gets Fortran-order
return mat.T.take(_triu_indices(len(mat)))
# tril/triu/diag, suitable for ndarray.take
def _tril_indices(n):
rows, cols = np.tril_indices(n)
return rows * n + cols
def _triu_indices(n):
rows, cols = np.triu_indices(n)
return rows * n + cols
def _diag_indices(n):
rows, cols = np.diag_indices(n)
return rows * n + cols
def unvec(v):
k = int(np.sqrt(len(v)))
assert(k * k == len(v))
return v.reshape((k, k), order='F')
def unvech(v):
# quadratic formula, correct fp error
rows = .5 * (-1 + np.sqrt(1 + 8 * len(v)))
rows = int(np.round(rows))
result = np.zeros((rows, rows))
result[np.triu_indices(rows)] = v
result = result + result.T
# divide diagonal elements by 2
result[np.diag_indices(rows)] /= 2
return result
def duplication_matrix(n):
"""
Create duplication matrix D_n which satisfies vec(S) = D_n vech(S) for
symmetric matrix S
Returns
-------
D_n : ndarray
"""
tmp = np.eye(n * (n + 1) // 2)
return np.array([unvech(x).ravel() for x in tmp]).T
def elimination_matrix(n):
"""
Create the elimination matrix L_n which satisfies vech(M) = L_n vec(M) for
any matrix M
Parameters
----------
Returns
-------
"""
vech_indices = vec(np.tril(np.ones((n, n))))
return np.eye(n * n)[vech_indices != 0]
def commutation_matrix(p, q):
"""
Create the commutation matrix K_{p,q} satisfying vec(A') = K_{p,q} vec(A)
Parameters
----------
p : int
q : int
Returns
-------
K : ndarray (pq x pq)
"""
K = np.eye(p * q)
indices = np.arange(p * q).reshape((p, q), order='F')
return K.take(indices.ravel(), axis=0)
def _ar_transparams(params):
"""
Transforms params to induce stationarity/invertability.
Parameters
----------
params : array
The AR coefficients
Reference
---------
Jones(1980)
"""
newparams = ((1-np.exp(-params))/
(1+np.exp(-params))).copy()
tmp = ((1-np.exp(-params))/
(1+np.exp(-params))).copy()
for j in range(1,len(params)):
a = newparams[j]
for kiter in range(j):
tmp[kiter] -= a * newparams[j-kiter-1]
newparams[:j] = tmp[:j]
return newparams
def _ar_invtransparams(params):
"""
Inverse of the Jones reparameterization
Parameters
----------
params : array
The transformed AR coefficients
"""
# AR coeffs
tmp = params.copy()
for j in range(len(params)-1,0,-1):
a = params[j]
for kiter in range(j):
tmp[kiter] = (params[kiter] + a * params[j-kiter-1])/\
(1-a**2)
params[:j] = tmp[:j]
invarcoefs = -np.log((1-params)/(1+params))
return invarcoefs
def _ma_transparams(params):
"""
Transforms params to induce stationarity/invertability.
Parameters
----------
params : array
The ma coeffecients of an (AR)MA model.
Reference
---------
Jones(1980)
"""
newparams = ((1-np.exp(-params))/(1+np.exp(-params))).copy()
tmp = ((1-np.exp(-params))/(1+np.exp(-params))).copy()
# levinson-durbin to get macf
for j in range(1,len(params)):
b = newparams[j]
for kiter in range(j):
tmp[kiter] += b * newparams[j-kiter-1]
newparams[:j] = tmp[:j]
return newparams
def _ma_invtransparams(macoefs):
"""
Inverse of the Jones reparameterization
Parameters
----------
params : array
The transformed MA coefficients
"""
tmp = macoefs.copy()
for j in range(len(macoefs)-1,0,-1):
b = macoefs[j]
for kiter in range(j):
tmp[kiter] = (macoefs[kiter]-b *macoefs[j-kiter-1])/(1-b**2)
macoefs[:j] = tmp[:j]
invmacoefs = -np.log((1-macoefs)/(1+macoefs))
return invmacoefs
def unintegrate_levels(x, d):
"""
Returns the successive differences needed to unintegrate the series.
Parameters
----------
x : array-like
The original series
d : int
The number of differences of the differenced series.
Returns
-------
y : array-like
The increasing differences from 0 to d-1 of the first d elements
of x.
See Also
--------
unintegrate
"""
x = x[:d]
return np.asarray([np.diff(x, d - i)[0] for i in range(d, 0, -1)])
def unintegrate(x, levels):
"""
After taking n-differences of a series, return the original series
Parameters
----------
x : array-like
The n-th differenced series
levels : list
A list of the first-value in each differenced series, for
[first-difference, second-difference, ..., n-th difference]
Returns
-------
y : array-like
The original series de-differenced
Examples
--------
>>> x = np.array([1, 3, 9., 19, 8.])
>>> levels = unintegrate_levels(x, 2)
>>> levels
array([ 1., 2.])
>>> unintegrate(np.diff(x, 2), levels)
array([ 1., 3., 9., 19., 8.])
"""
levels = list(levels)[:] # copy
if len(levels) > 1:
x0 = levels.pop(-1)
return unintegrate(np.cumsum(np.r_[x0, x]), levels)
x0 = levels[0]
return np.cumsum(np.r_[x0, x])
def freq_to_period(freq):
"""
Convert a pandas frequency to a periodicity
Parameters
----------
freq : str or offset
Frequency to convert
Returns
-------
period : int
Periodicity of freq
Notes
-----
Annual maps to 1, quarterly maps to 4, monthly to 12, weekly to 52.
"""
if not isinstance(freq, offsets.DateOffset):
freq = to_offset(freq) # go ahead and standardize
freq = freq.rule_code.upper()
if freq == 'A' or freq.startswith(('A-', 'AS-')):
return 1
elif freq == 'Q' or freq.startswith(('Q-', 'QS-')):
return 4
elif freq == 'M' or freq.startswith(('M-', 'MS')):
return 12
elif freq == 'W' or freq.startswith('W-'):
return 52
elif freq == 'D':
return 7
elif freq == 'B':
return 5
elif freq == 'H':
return 24
else: # pragma : no cover
raise ValueError("freq {} not understood. Please report if you "
"think this in error.".format(freq))
__all__ = ['lagmat', 'lagmat2ds','add_trend', 'duplication_matrix',
'elimination_matrix', 'commutation_matrix',
'vec', 'vech', 'unvec', 'unvech']