diff --git a/lppls/lppls.py b/lppls/lppls.py
index 6302d28..c46668a 100644
--- a/lppls/lppls.py
+++ b/lppls/lppls.py
@@ -10,6 +10,7 @@
 from tqdm import tqdm
 import xarray as xr
 
+
 class LPPLS(object):
     def __init__(self, observations):
@@ -17,8 +18,9 @@ def __init__(self, observations):
         Args:
             observations (np.array,pd.DataFrame): 2xM matrix with timestamp and observed value.
         """
-        assert isinstance(observations, (np.ndarray, pd.DataFrame)), \
-            f'Expected observations to be <np.ndarray> or <pd.DataFrame>, got :{type(observations)}'
+        assert isinstance(
+            observations, (np.ndarray, pd.DataFrame)
+        ), f"Expected observations to be <np.ndarray> or <pd.DataFrame>, got :{type(observations)}"
         self.observations = observations
         self.coef_ = {}
@@ -28,7 +30,9 @@ def __init__(self, observations):
     @njit
     def lppls(t, tc, m, w, a, b, c1, c2):
         dt = np.abs(tc - t) + 1e-8
-        return a + np.power(dt, m) * (b + ((c1 * np.cos(w * np.log(dt))) + (c2 * np.sin(w * np.log(dt)))))
+        return a + np.power(dt, m) * (
+            b + ((c1 * np.cos(w * np.log(dt))) + (c2 * np.sin(w * np.log(dt))))
+        )
 
     def func_restricted(self, x, *args):
         """
@@ -85,25 +89,24 @@ def matrix_equation(observations, tc, m, w):
         yigi = np.multiply(yi, gi)
         yihi = np.multiply(yi, hi)
 
-        matrix_1 = np.array([
-            [N, np.sum(fi), np.sum(gi), np.sum(hi)],
-            [np.sum(fi), np.sum(fi_pow_2), np.sum(figi), np.sum(fihi)],
-            [np.sum(gi), np.sum(figi), np.sum(gi_pow_2), np.sum(gihi)],
-            [np.sum(hi), np.sum(fihi), np.sum(gihi), np.sum(hi_pow_2)]
-        ])
+        matrix_1 = np.array(
+            [
+                [N, np.sum(fi), np.sum(gi), np.sum(hi)],
+                [np.sum(fi), np.sum(fi_pow_2), np.sum(figi), np.sum(fihi)],
+                [np.sum(gi), np.sum(figi), np.sum(gi_pow_2), np.sum(gihi)],
+                [np.sum(hi), np.sum(fihi), np.sum(gihi), np.sum(hi_pow_2)],
+            ]
+        )
 
-        matrix_2 = np.array([
-            [np.sum(yi)],
-            [np.sum(yifi)],
-            [np.sum(yigi)],
-            [np.sum(yihi)]
-        ])
+        matrix_2 = np.array(
+            [[np.sum(yi)], [np.sum(yifi)], [np.sum(yigi)], [np.sum(yihi)]]
+        )
 
         matrix_1 += 1e-8 * np.eye(matrix_1.shape[0])
         return np.linalg.solve(matrix_1, matrix_2)
 
-    def fit(self, max_searches, minimizer='Nelder-Mead', obs=None):
+    def fit(self, max_searches, minimizer="Nelder-Mead", obs=None):
         """
         Args:
             max_searches (int): The maximum number of searches to perform before giving up. The literature suggests 25.
@@ -163,10 +166,7 @@ def estimate_params(self, observations, seed, minimizer):
         """
 
         cofs = minimize(
-            args=observations,
-            fun=self.func_restricted,
-            x0=seed,
-            method=minimizer
+            args=observations, fun=self.func_restricted, x0=seed, method=minimizer
         )
 
         if cofs.success:
@@ -183,7 +183,7 @@ def estimate_params(self, observations, seed, minimizer):
             # Use sklearn format for storing fit params
             # @TODO only save when running single fits.
-            for coef in ['tc', 'm', 'w', 'a', 'b', 'c', 'c1', 'c2']:
+            for coef in ["tc", "m", "w", "a", "b", "c", "c1", "c2"]:
                 self.coef_[coef] = eval(coef)
             return tc, m, w, a, b, c, c1, c2
         else:
@@ -197,7 +197,9 @@ def plot_fit(self, show_tc=False):
             nothing, should plot the fit
         """
         tc, m, w, a, b, c, c1, c2 = self.coef_.values()
-        time_ord = [pd.Timestamp.fromordinal(d) for d in self.observations[0, :].astype('int32')]
+        time_ord = [
+            pd.Timestamp.fromordinal(d) for d in self.observations[0, :].astype("int32")
+        ]
         t_obs = self.observations[0, :]
         # ts = pd.to_datetime(t_obs*10**9)
         # compatible_date = np.array(ts, dtype=np.datetime64)
@@ -208,7 +210,7 @@ def plot_fit(self, show_tc=False):
         first = t_obs[0]
         last = t_obs[-1]
 
-        O = ((w / (2.0 * np.pi)) * np.log((tc - first) / (tc - last)))
+        O = (w / (2.0 * np.pi)) * np.log((tc - first) / (tc - last))
         D = (m * np.abs(b)) / (w * np.abs(c))
 
         fig, (ax1) = plt.subplots(nrows=1, ncols=1, sharex=True, figsize=(14, 8))
@@ -216,14 +218,14 @@
         #     'Single Fit\ntc: {:.2f}, m: {:.2f}, w: {:.2f}, a: {:.2f}, b: {:.2f}, c: {:.2f}, O: {:.2f}, D: {:.2f}'.format(tc, m, w, a, b, c, O, D),
         #     fontsize=16)
 
-        ax1.plot(time_ord, price, label='price', color='black', linewidth=0.75)
-        ax1.plot(time_ord, lppls_fit, label='lppls fit', color='blue', alpha=0.5)
+        ax1.plot(time_ord, price, label="price", color="black", linewidth=0.75)
+        ax1.plot(time_ord, lppls_fit, label="lppls fit", color="blue", alpha=0.5)
         # if show_tc:
         #     ax1.axvline(x=np.array(tc_ts, dtype=np.datetime64), label='tc={}'.format(ts), color='red', alpha=0.5)
         # set grids
-        ax1.grid(which='major', axis='both', linestyle='--')
+        ax1.grid(which="major", axis="both", linestyle="--")
         # set labels
-        ax1.set_ylabel('ln(p)')
+        ax1.set_ylabel("ln(p)")
         ax1.legend(loc=2)
 
         plt.xticks(rotation=45)
@@ -252,24 +254,24 @@ def compute_indicators(self, res, filter_conditions_config=None):
             pass
 
         for r in res:
-            ts.append(r['t2'])
-            price.append(r['p2'])
+            ts.append(r["t2"])
+            price.append(r["p2"])
             pos_qual_count = 0
             neg_qual_count = 0
             pos_count = 0
             neg_count = 0
             # _fits.append(r['res'])
-            for idx, fits in enumerate(r['res']):
-                t1 = fits['t1']
-                t2 = fits['t2']
-                tc = fits['tc']
-                m = fits['m']
-                w = fits['w']
-                b = fits['b']
-                c = fits['c']
-                O = fits['O']
-                D = fits['D']
+            for idx, fits in enumerate(r["res"]):
+                t1 = fits["t1"]
+                t2 = fits["t2"]
+                tc = fits["tc"]
+                m = fits["m"]
+                w = fits["w"]
+                b = fits["b"]
+                c = fits["c"]
+                O = fits["O"]
+                D = fits["D"]
 
                 # t_delta = t2 - t1
                 # pct_delta_min = t_delta * 0.5
@@ -285,7 +287,11 @@
                 # print('{} < {} < {}'.format(max(t2 - 60, t2 - 0.5 * (t2 - t1)), tc, min(t2 + 252, t2 + 0.5 * (t2 - t1))))
                 # print('______________')
 
-                tc_in_range = max(t2 - 60, t2 - 0.5 * (t2 - t1)) < tc < min(t2 + 252, t2 + 0.5 * (t2 - t1))
+                tc_in_range = (
+                    max(t2 - 60, t2 - 0.5 * (t2 - t1))
+                    < tc
+                    < min(t2 + 252, t2 + 0.5 * (t2 - t1))
+                )
                 m_in_range = m_min < m < m_max
                 w_in_range = w_min < w < w_max
@@ -297,7 +303,13 @@
                 O_in_range = O > O_min
                 D_in_range = D > D_min  # if m > 0 and w > 0 else False
 
-                if tc_in_range and m_in_range and w_in_range and O_in_range and D_in_range:
+                if (
+                    tc_in_range
+                    and m_in_range
+                    and w_in_range
+                    and O_in_range
+                    and D_in_range
+                ):
                     is_qualified = True
                 else:
                     is_qualified = False
@@ -311,9 +323,9 @@
                 if is_qualified:
                     neg_qual_count += 1
                 # add this to res to make life easier
-                r['res'][idx]['is_qualified'] = is_qualified
+                r["res"][idx]["is_qualified"] = is_qualified
 
-            _fits.append(r['res'])
+            _fits.append(r["res"])
 
             pos_conf = pos_qual_count / pos_count if pos_count > 0 else 0
             neg_conf = neg_qual_count / neg_count if neg_count > 0 else 0
@@ -329,13 +341,15 @@
         # O_lst.append(O_cnt)
         # D_lst.append(D_cnt)
 
-        res_df = pd.DataFrame({
-            'time': ts,
-            'price': price,
-            'pos_conf': pos_conf_lst,
-            'neg_conf': neg_conf_lst,
-            '_fits': _fits,
-        })
+        res_df = pd.DataFrame(
+            {
+                "time": ts,
+                "price": price,
+                "pos_conf": pos_conf_lst,
+                "neg_conf": neg_conf_lst,
+                "_fits": _fits,
+            }
+        )
         return res_df
         # return ts, price, pos_lst, neg_lst, pos_conf_lst, neg_conf_lst, #tc_lst, m_lst, w_lst, O_lst, D_lst
@@ -351,20 +365,32 @@ def plot_confidence_indicators(self, res):
         res_df = self.compute_indicators(res)
 
         fig, (ax1, ax2) = plt.subplots(nrows=2, ncols=1, sharex=True, figsize=(18, 10))
 
-        ord = res_df['time'].astype('int32')
+        ord = res_df["time"].astype("int32")
         ts = [pd.Timestamp.fromordinal(d) for d in ord]
 
         # plot pos bubbles
         ax1_0 = ax1.twinx()
-        ax1.plot(ts, res_df['price'], color='black', linewidth=0.75)
+        ax1.plot(ts, res_df["price"], color="black", linewidth=0.75)
         # ax1_0.plot(compatible_date, pos_lst, label='pos bubbles', color='gray', alpha=0.5)
-        ax1_0.plot(ts, res_df['pos_conf'], label='bubble indicator (pos)', color='red', alpha=0.5)
+        ax1_0.plot(
+            ts,
+            res_df["pos_conf"],
+            label="bubble indicator (pos)",
+            color="red",
+            alpha=0.5,
+        )
 
         # plot neg bubbles
         ax2_0 = ax2.twinx()
-        ax2.plot(ts, res_df['price'], color='black', linewidth=0.75)
+        ax2.plot(ts, res_df["price"], color="black", linewidth=0.75)
         # ax2_0.plot(compatible_date, neg_lst, label='neg bubbles', color='gray', alpha=0.5)
-        ax2_0.plot(ts, res_df['neg_conf'], label='bubble indicator (neg)', color='green', alpha=0.5)
+        ax2_0.plot(
+            ts,
+            res_df["neg_conf"],
+            label="bubble indicator (neg)",
+            color="green",
+            alpha=0.5,
+        )
 
         # if debug:
         #     ax3.plot(ts, tc_lst, label='tc count')
@@ -374,15 +400,15 @@
         #     ax3.plot(ts, D_lst, label='D count')
 
         # set grids
-        ax1.grid(which='major', axis='both', linestyle='--')
-        ax2.grid(which='major', axis='both', linestyle='--')
+        ax1.grid(which="major", axis="both", linestyle="--")
+        ax2.grid(which="major", axis="both", linestyle="--")
 
         # set labels
-        ax1.set_ylabel('ln(p)')
-        ax2.set_ylabel('ln(p)')
+        ax1.set_ylabel("ln(p)")
+        ax2.set_ylabel("ln(p)")
 
-        ax1_0.set_ylabel('bubble indicator (pos)')
-        ax2_0.set_ylabel('bubble indicator (neg)')
+        ax1_0.set_ylabel("bubble indicator (pos)")
+        ax2_0.set_ylabel("bubble indicator (neg)")
 
         ax1_0.legend(loc=2)
         ax2_0.legend(loc=2)
@@ -400,7 +426,16 @@ def plot_confidence_indicators(self, res):
         # axes up to make room for them
         # fig.autofmt_xdate()
 
-    def mp_compute_nested_fits(self, workers, window_size=80, smallest_window_size=20, outer_increment=5, inner_increment=2, max_searches=25, filter_conditions_config={}):
+    def mp_compute_nested_fits(
+        self,
+        workers,
+        window_size=80,
+        smallest_window_size=20,
+        outer_increment=5,
+        inner_increment=2,
+        max_searches=25,
+        filter_conditions_config={},
+    ):
         obs_copy = self.observations
         obs_opy_len = len(obs_copy[0]) - window_size
         func = self._func_compute_nested_fits
@@ -408,23 +443,34 @@ def mp_compute_nested_fits(self, workers, window_size=80, smallest_window_size=2
         # print('obs_copy', obs_copy)
         # print('obs_opy_len', obs_opy_len)
 
-        func_arg_map = [(
-            obs_copy[:, i:window_size + i],
-            window_size,
-            i,
-            smallest_window_size,
-            outer_increment,
-            inner_increment,
-            max_searches,
-        ) for i in range(0, obs_opy_len+1, outer_increment)]
+        func_arg_map = [
+            (
+                obs_copy[:, i : window_size + i],
+                window_size,
+                i,
+                smallest_window_size,
+                outer_increment,
+                inner_increment,
+                max_searches,
+            )
+            for i in range(0, obs_opy_len + 1, outer_increment)
+        ]
 
         with Pool(processes=workers) as pool:
-            self.indicator_result = list(tqdm(pool.imap(func, func_arg_map), total=len(func_arg_map)))
+            self.indicator_result = list(
+                tqdm(pool.imap(func, func_arg_map), total=len(func_arg_map))
+            )
 
         return self.indicator_result
 
-    def compute_nested_fits(self, window_size=80, smallest_window_size=20, outer_increment=5, inner_increment=2,
-                            max_searches=25):
+    def compute_nested_fits(
+        self,
+        window_size=80,
+        smallest_window_size=20,
+        outer_increment=5,
+        inner_increment=2,
+        max_searches=25,
+    ):
         obs_copy = self.observations
         obs_copy_len = len(obs_copy[0]) - window_size
         window_delta = window_size - smallest_window_size
@@ -432,31 +478,41 @@ def compute_nested_fits(self, window_size=80, smallest_window_size=20, outer_inc
         i_idx = 0
         for i in range(0, obs_copy_len + 1, outer_increment):
             j_idx = 0
-            obs = obs_copy[:, i:window_size + i]
+            obs = obs_copy[:, i : window_size + i]
             t1 = obs[0][0]
             t2 = obs[0][-1]
             res.append([])
             i_idx += 1
             for j in range(0, window_delta, inner_increment):
                 obs_shrinking_slice = obs[:, j:window_size]
-                tc, m, w, a, b, c, c1, c2, O, D = self.fit(max_searches, obs=obs_shrinking_slice)
-                res[i_idx-1].append([])
+                tc, m, w, a, b, c, c1, c2, O, D = self.fit(
+                    max_searches, obs=obs_shrinking_slice
+                )
+                res[i_idx - 1].append([])
                 j_idx += 1
                 for k in [t2, t1, a, b, c, m, 0, tc]:
-                    res[i_idx-1][j_idx-1].append(k)
+                    res[i_idx - 1][j_idx - 1].append(k)
 
         return xr.DataArray(
             data=res,
-            dims=('t2', 'windowsizes', 'params'),
+            dims=("t2", "windowsizes", "params"),
            coords=dict(
-                t2=obs_copy[0][(window_size-1):],
-                windowsizes=range(smallest_window_size, window_size, inner_increment),
-                params=['t2', 't1', 'a', 'b', 'c', 'm', '0', 'tc'],
-            )
+                t2=obs_copy[0][(window_size - 1) :],
+                windowsizes=range(smallest_window_size, window_size, inner_increment),
+                params=["t2", "t1", "a", "b", "c", "m", "0", "tc"],
+            ),
        )
 
     def _func_compute_nested_fits(self, args):
-        obs, window_size, n_iter, smallest_window_size, outer_increment, inner_increment, max_searches = args
+        (
+            obs,
+            window_size,
+            n_iter,
+            smallest_window_size,
+            outer_increment,
+            inner_increment,
+            max_searches,
+        ) = args
 
         window_delta = window_size - smallest_window_size
@@ -474,26 +530,30 @@ def _func_compute_nested_fits(self, args):
         # p1 = self.inverse_transform_observations([[t1, p1]])[0, 1]
         # p2 = self.inverse_transform_observations([[t2, p2]])[0, 1]
 
-            # tc_init_min, tc_init_max = self._get_tc_bounds(obs_shrinking_slice, tc_min, tc_max)
-            #
-            # tc_in_range = last - tc_init_min < tc < last + tc_init_max
-            # m_in_range = m_min < m < m_max
-            # w_in_range = w_min < w < w_max
-            # O_in_range = self._is_O_in_range(tc, w, last, O_min)
-            # D_in_range = self._is_D_in_range(m, w, b, c, D_min)
-            #
-            # qualified[value] = tc_in_range and m_in_range and w_in_range and O_in_range and D_in_range
+        # tc_init_min, tc_init_max = self._get_tc_bounds(obs_shrinking_slice, tc_min, tc_max)
+        #
+        # tc_in_range = last - tc_init_min < tc < last + tc_init_max
+        # m_in_range = m_min < m < m_max
+        # w_in_range = w_min < w < w_max
+        # O_in_range = self._is_O_in_range(tc, w, last, O_min)
+        # D_in_range = self._is_D_in_range(m, w, b, c, D_min)
+        #
+        # qualified[value] = tc_in_range and m_in_range and w_in_range and O_in_range and D_in_range
 
         # run n fits on the observation slice.
         for j in range(0, window_delta, inner_increment):
             obs_shrinking_slice = obs[:, j:window_size]
 
             # fit the model to the data and get back the params
-            if self.__class__.__name__ == 'LPPLSCMAES':
+            if self.__class__.__name__ == "LPPLSCMAES":
                 # print('cmaes fit is running!')
-                tc, m, w, a, b, c, c1, c2, O, D = self.fit(max_iteration=2500, pop_size=4, obs=obs_shrinking_slice)
+                tc, m, w, a, b, c, c1, c2, O, D = self.fit(
+                    max_iteration=2500, pop_size=4, obs=obs_shrinking_slice
+                )
             else:
-                tc, m, w, a, b, c, c1, c2, O, D = self.fit(max_searches, obs=obs_shrinking_slice)
+                tc, m, w, a, b, c, c1, c2, O, D = self.fit(
+                    max_searches, obs=obs_shrinking_slice
+                )
 
             nested_t1 = obs_shrinking_slice[0][0]
             nested_t2 = obs_shrinking_slice[0][-1]
@@ -506,26 +566,28 @@ def _func_compute_nested_fits(self, args):
             # sub_t2 = self.inverse_transform_observations([[sub_t2, sub_p2]])[0, 0]
             # tc = self.inverse_transform_observations([[tc, 0]])[0, 0]
 
-            res.append({
-                'tc_d': self.ordinal_to_date(tc),
-                'tc': tc,
-                'm': m,
-                'w': w,
-                'a': a,
-                'b': b,
-                'c': c,
-                'c1': c1,
-                'c2': c2,
-                't1_d': self.ordinal_to_date(nested_t1),
-                't2_d': self.ordinal_to_date(nested_t2),
-                't1': nested_t1,
-                't2': nested_t2,
-                'O': O,
-                'D': D,
-            })
+            res.append(
+                {
+                    "tc_d": self.ordinal_to_date(tc),
+                    "tc": tc,
+                    "m": m,
+                    "w": w,
+                    "a": a,
+                    "b": b,
+                    "c": c,
+                    "c1": c1,
+                    "c2": c2,
+                    "t1_d": self.ordinal_to_date(nested_t1),
+                    "t2_d": self.ordinal_to_date(nested_t2),
+                    "t1": nested_t1,
+                    "t2": nested_t2,
+                    "O": O,
+                    "D": D,
+                }
+            )
 
         # return {'t1': self.ordinal_to_date(t1), 't2': self.ordinal_to_date(t2), 'p2': p2, 'res': res}
-        return {'t1': t1, 't2': t2, 'p2': p2, 'res': res}
+        return {"t1": t1, "t2": t2, "p2": p2, "res": res}
 
     def _get_tc_bounds(self, obs, lower_bound_pct, upper_bound_pct):
         """
@@ -552,8 +614,8 @@ def _is_D_in_range(self, m, w, b, c, D_min):
         return False if m <= 0 or w <= 0 else abs((m * b) / (w * c)) > D_min
 
     def get_oscillations(self, w, tc, t1, t2):
-            dt = np.abs(tc - t2)
-            return (w / (2.0 * np.pi)) * np.log((tc - t1) / dt)
+        dt = np.abs(tc - t2)
+        return (w / (2.0 * np.pi)) * np.log((tc - t1) / dt)
 
     def get_damping(self, m, w, b, c):
         return (m * np.abs(b)) / (w * np.abs(c))
@@ -570,6 +632,6 @@ def ordinal_to_date(self, ordinal):
         # the time span that can be represented using a 64-bit integer
        # is limited to approximately 584 years
         try:
-            return date.fromordinal(int(ordinal)).strftime('%Y-%m-%d')
+            return date.fromordinal(int(ordinal)).strftime("%Y-%m-%d")
         except (ValueError, OutOfBoundsDatetime):
             return str(pd.NaT)
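
Reviewer note (not part of the patch): for anyone checking that this mechanical reformat is behavior-preserving, the two numerical kernels it touches implement the standard LPPLS relations. A sketch of the algebra, assuming the `fi`, `gi`, `hi` arrays in `matrix_equation` hold dt^m, dt^m·cos(w·ln dt), and dt^m·sin(w·ln dt) (their definitions sit outside the hunk context shown here):

```latex
% Log-price model computed by LPPLS.lppls(), with dt = |tc - t| + 1e-8
% (the 1e-8 keeps np.power and np.log finite at t = tc):
\ln \mathbb{E}[p(t)] = a + dt^{m}\bigl(b + c_1\cos(\omega\ln dt) + c_2\sin(\omega\ln dt)\bigr)

% Linear subproblem solved by matrix_equation() for fixed (tc, m, w):
% matrix_1 x = matrix_2 are the least-squares normal equations for the
% linear parameters, with the 1e-8 * np.eye ridge added for stability.
\begin{pmatrix}
N & \sum f_i & \sum g_i & \sum h_i \\
\sum f_i & \sum f_i^2 & \sum f_i g_i & \sum f_i h_i \\
\sum g_i & \sum f_i g_i & \sum g_i^2 & \sum g_i h_i \\
\sum h_i & \sum f_i h_i & \sum g_i h_i & \sum h_i^2
\end{pmatrix}
\begin{pmatrix} a \\ b \\ c_1 \\ c_2 \end{pmatrix}
=
\begin{pmatrix} \sum y_i \\ \sum y_i f_i \\ \sum y_i g_i \\ \sum y_i h_i \end{pmatrix}

% Filter quantities used by plot_fit() and compute_indicators():
O = \frac{\omega}{2\pi}\,\ln\frac{t_c - t_1}{t_c - t_2},
\qquad
D = \frac{m\,|b|}{\omega\,|c|}
```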
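A minimal smoke test for the public entry points this diff touches (`fit`, `plot_fit`, `mp_compute_nested_fits`, `compute_indicators`, `plot_confidence_indicators`). The CSV name and its `Date`/`Adj Close` columns are illustrative assumptions, not part of this repo; the class only requires the 2xM `[ordinal time, log price]` layout that `plot_fit` and `ordinal_to_date` expect:

```python
# Usage sketch (hypothetical input file and column names).
import numpy as np
import pandas as pd
from lppls import lppls

data = pd.read_csv("data.csv")  # assumed CSV with 'Date' and 'Adj Close' columns
time = [pd.Timestamp(t).toordinal() for t in data["Date"]]
price = np.log(data["Adj Close"].values)
observations = np.array([time, price])  # 2xM: row 0 = ordinal time, row 1 = ln(p)

model = lppls.LPPLS(observations=observations)

# Single fit; the coefficients are also stored in model.coef_.
tc, m, w, a, b, c, c1, c2, O, D = model.fit(max_searches=25)
model.plot_fit()

# Rolling-window fits in a worker pool, then the pos/neg bubble indicators.
res = model.mp_compute_nested_fits(workers=8)
res_df = model.compute_indicators(res)
model.plot_confidence_indicators(res)
```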
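One non-mechanical observation while this signature is being touched: Black preserves the mutable default `filter_conditions_config={}` in `mp_compute_nested_fits`, and the parameter is not forwarded into `func_arg_map`. If a follow-up is welcome, the usual `None`-sentinel idiom avoids the shared-dict pitfall; a sketch, not part of this patch:

```python
class LPPLS:  # signature sketch only
    def mp_compute_nested_fits(
        self,
        workers,
        window_size=80,
        smallest_window_size=20,
        outer_increment=5,
        inner_increment=2,
        max_searches=25,
        filter_conditions_config=None,  # None sentinel instead of a mutable {}
    ):
        if filter_conditions_config is None:
            filter_conditions_config = {}
        ...
```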