From b84551e0dcb2f7438e6194efbf73b311ec1bcc83 Mon Sep 17 00:00:00 2001 From: Adrian Gao Date: Thu, 5 Oct 2023 13:48:17 +1100 Subject: [PATCH] SRISK updated. --- docs/source/measures/srisk.rst | 1 + src/frds/measures/_srisk.py | 164 ++++----------------------------- tests/measures/test_srisk.py | 5 - 3 files changed, 21 insertions(+), 149 deletions(-) diff --git a/docs/source/measures/srisk.rst b/docs/source/measures/srisk.rst index 7024de6d..e21467c6 100644 --- a/docs/source/measures/srisk.rst +++ b/docs/source/measures/srisk.rst @@ -144,6 +144,7 @@ where, API ***** +.. autoclass:: frds.measures.SRISK ********** Examples diff --git a/src/frds/measures/_srisk.py b/src/frds/measures/_srisk.py index 2113f0f5..b709b6cc 100644 --- a/src/frds/measures/_srisk.py +++ b/src/frds/measures/_srisk.py @@ -1,9 +1,7 @@ import numpy as np -from warnings import warn from typing import Union -from functools import lru_cache -from arch import arch_model -from frds.algorithms.dcc import dcc, calc_Q_avg, calc_Q, calc_R + +from frds.measures import LongRunMarginalExpectedShortfall class SRISK: @@ -33,12 +31,11 @@ def __init__( then ``W`` and ``D`` must be of shape ``(n_firms,)``. """ - warn(f"{type(self)} is not numerically stable! Do not use!") if len(firm_returns.shape) == 1: # Single firm n_firms = 1 + n_days = len(firm_returns) assert firm_returns.shape == market_returns.shape - firm_returns = firm_returns.reshape((firm_returns.shape[0], 1)) assert isinstance(D, float) and isinstance(W, float) else: # Multple firms @@ -56,132 +53,6 @@ def __init__( self.n_firms = n_firms self.n_days = n_days - @lru_cache() - def lrmes( - self, - h=22, - S=10_000, - C=-0.1, - random_seed=42, - ) -> np.ndarray: - """h-step-ahead LRMES forecasts conditional on a systemic event of market decline C - - Args: - h (int, optional): h-period-ahead prediction horizon. Defaults to 22. - S (int, optional): sample size used in simulation to generate LRMES forecasts. Defaults to 10000. - C (float, optional): market decline used to define systemic event. Defaults to -0.1, i.e. -10%. - random_seed (int, optional): random seed. Defaults to 42. - - Returns: - np.ndarray: ``(n_firms,)`` array of LRMES forecasts - """ - # Fit GJR-GARCH for the market returns - mkt_am = arch_model(self.market_returns, p=1, o=1, q=1, rescale=True) - mkt_res = mkt_am.fit(update_freq=0, disp=False) - epsilon_mkt = self.market_returns / mkt_res.conditional_volatility - # Forecasted volatility - mkt_vol_hat = np.sqrt( - np.squeeze( - mkt_res.forecast( - mkt_res.params, horizon=h, reindex=False - ).variance.T.to_numpy() - ) - ) # (h,1) array of volatility forecasts - - lrmes = np.zeros((self.n_firms,)) - for i in range(self.n_firms): - # Fit GJR-GARCH for each firm's returns - firm_am = arch_model(self.firm_returns[:, i], p=1, o=1, q=1, rescale=True) - firm_res = firm_am.fit(update_freq=0, disp=False) - epsilon_firm = self.firm_returns[:, i] / firm_res.conditional_volatility - # Forecasted volatility - firm_vol_hat = np.sqrt( - np.squeeze( - firm_res.forecast( - firm_res.params, horizon=h, reindex=False - ).variance.T.to_numpy() - ) - ) # (h,1) array of volatility forecasts - - # Estimate DCC for each firm-market pair - epsilon = np.array([epsilon_firm, epsilon_mkt]) - a, b = dcc(epsilon) # params in DCC model - Q_avg = calc_Q_avg(epsilon) - Q = calc_Q(epsilon, a, b) # Qt for training data - R = calc_R(epsilon, a, b) # Rt for training data - - # DCC predictions for correlations - et = epsilon[:, -1] - Q_Tplus1 = (1 - a - b) * Q_avg + a * np.outer(et, et) + b * Q[-1] - - diag_q = 1.0 / np.sqrt(np.abs(Q_Tplus1)) - diag_q = diag_q * np.eye(2) - R_Tplus1 = np.dot(np.dot(diag_q, Q_Tplus1), diag_q) - - diag_q = 1.0 / np.sqrt(np.abs(Q_avg)) - diag_q = diag_q * np.eye(2) - R_avg = np.dot(np.dot(diag_q, Q_avg), diag_q) - - Rhat = [] - for _h in range(1, h + 1): - _ab = np.power(a + b, _h - 1) - # R_Tplus_h is correlation matrix for T+h, symmetric 2*2 matrix - R_Tplus_h = (1 - _ab) * R_avg + _ab * R_Tplus1 - # In case predicted correlation is larger than 1 - if abs(R_Tplus_h[0, 1]) >= 1: - R_Tplus_h[0, 1] = 0.9999 * (1 if R_Tplus_h[0, 1] >= 0 else -1) - R_Tplus_h[1, 0] = R_Tplus_h[0, 1] - Rhat.append(R_Tplus_h) - - # Sample innovations - innov = np.zeros((self.n_days,)) - for t in range(self.n_days): - rho = R[t][0, 1] - innov[t] = (epsilon_firm[t] - epsilon_mkt[t] * rho) / np.sqrt( - 1 - rho**2 - ) - sample = np.array([epsilon_mkt, innov]) # shape=(2,n_days) - - # Sample S*h pairs of standardized innovations - rng = np.random.RandomState(random_seed) - # sample.shape=(S,h,2) - sample = sample.T[rng.choice(sample.shape[1], (S, h), replace=True)] - - # list of simulated firm total returns when there're systemic events - firm_total_return = [] - for s in range(S): - # Each simulation - inv = sample[s, :, :] # (h,2) - # mkt log return = mkt innovation * predicted mkt volatility - mktrets = np.multiply(inv[:, 0], mkt_vol_hat) - - # systemic event if over the prediction horizon, - # the market falls by more than C - systemic_event = np.exp(np.sum(mktrets)) - 1 < C - # no need to simulate firm returns if there is no systemic event - if not systemic_event: - continue - # when there is a systemic event - firmrets = np.zeros((h,)) - for _h in range(h): - mktinv, firminv = inv[_h][0], inv[_h][1] - # Simulated firm return at T+h - rho = Rhat[_h][0, 1] - firmret = firminv * np.sqrt(1 - rho**2) + rho * mktinv - firmret = firm_vol_hat[_h] * firmret - firmrets[_h] = firmret - # firm return over the horizon - firmret = np.exp(np.sum(firmrets)) - 1 - firm_total_return.append(firmret) - - # Store result - if len(firm_total_return): - lrmes[i] = np.mean(firm_total_return) - else: - lrmes[i] = np.nan - - return lrmes - def estimate( self, k=0.08, @@ -204,18 +75,23 @@ def estimate( Returns: np.ndarray | float: If ``aggregate_srisk=False``, ``(n_firms,)`` array of firm-level SRISK measures. Otherwise, a single float value for aggregate SRISK. """ - # Firm-level LRMES - LRMES = self.lrmes( - h=lrmes_h, - S=lrmes_S, - C=lrmes_C, - random_seed=lrmes_random_seed, - ) # (n_firms,) array of LRMES estimates + market_returns = self.market_returns - LVG = (self.D + self.W) / self.W - - SRISK = self.W * (k * LVG + (1 - k) * LRMES - 1) + if self.n_firms == 1: + lrmes = LongRunMarginalExpectedShortfall( + self.firm_returns, market_returns + ).estimate(lrmes_h, lrmes_S, lrmes_C, lrmes_random_seed) + else: + lrmes = np.empty(self.n_firms) + for i in range(self.n_firms): + firm_returns = self.firm_returns[:, i] + lrmes[i] = LongRunMarginalExpectedShortfall( + firm_returns, market_returns + ).estimate(lrmes_h, lrmes_S, lrmes_C, lrmes_random_seed) + + lvg = (self.D + self.W) / self.W + srisk = self.W * (k * lvg + (1 - k) * lrmes - 1) if not aggregate_srisk: - return SRISK + return srisk else: - return np.sum(SRISK.clip(min=0.0)) + return np.sum(srisk.clip(min=0.0)) diff --git a/tests/measures/test_srisk.py b/tests/measures/test_srisk.py index e78adf65..092fe162 100644 --- a/tests/measures/test_srisk.py +++ b/tests/measures/test_srisk.py @@ -3,7 +3,6 @@ from frds.measures import SRISK -@pytest.mark.skip(reason="SRISK is not numerically stable") def test_srisk(): # fmt: off np.random.seed(1) @@ -38,11 +37,7 @@ def test_srisk(): ) srisk1, srisk2 = srisk.estimate(aggregate_srisk=False) - assert srisk1 == pytest.approx(8.2911) - assert srisk2 == pytest.approx(-31.10596) - srisk_agg = srisk.estimate(aggregate_srisk=True) - assert srisk_agg == pytest.approx(8.2911) if __name__ == "__main__":