From 7b8d0c4ef5eb8b4c17ea2526253c65c790181bcc Mon Sep 17 00:00:00 2001 From: Akash Desarda Date: Sun, 27 Sep 2020 18:51:35 +0530 Subject: [PATCH] segregate helper function to make api more focused --- stock_analysis/indicator.py | 155 +++--------------------- stock_analysis/unit_strategy.py | 98 ++------------- stock_analysis/utils/formula_helpers.py | 105 ++++++++++++++++ stock_analysis/utils/helpers.py | 110 +++++++++++++++++ 4 files changed, 244 insertions(+), 224 deletions(-) create mode 100644 stock_analysis/utils/formula_helpers.py create mode 100644 stock_analysis/utils/helpers.py diff --git a/stock_analysis/indicator.py b/stock_analysis/indicator.py index 100f59a..ad021ee 100644 --- a/stock_analysis/indicator.py +++ b/stock_analysis/indicator.py @@ -1,17 +1,16 @@ -import os import yaml -import datetime import dateutil +import datetime import pandas as pd -import yfinance as yf import multiprocessing from dataclasses import dataclass -from stock_analysis.utils.logger import logger -from stock_analysis.data_retrive import DataRetrive from typing import Dict, List, Tuple, Union +from stock_analysis.data_retrive import DataRetrive +from stock_analysis.utils.logger import logger +from stock_analysis.utils.formula_helpers import exponential_moving_avarage,\ + percentage_diff_analysis, outcome_analysis now_strting = datetime.datetime.now().strftime('%d-%m-%Y') -yf.pdr_override() logger = logger() pd.options.display.float_format = '{:,.2f}'.format @@ -117,13 +116,13 @@ def ema_indicator(self, ema_canditate: Tuple[int, int] = (50, 200), ema_indicator_df = pd.DataFrame(result) ema_indicator_df.dropna(inplace=True) ema_indicator_df['percentage_diff'] = ema_indicator_df.apply( - lambda x: self._percentage_diff_analysis( + lambda x: percentage_diff_analysis( x[f'ema{str(ema_canditate[0])}'], x[f'ema{str(ema_canditate[1])}']), axis=1 ) ema_indicator_df['outcome'] = ema_indicator_df.apply( - lambda x: self._outcome_analysis(x['percentage_diff']), + lambda x: 
outcome_analysis(x['percentage_diff']), axis=1 ) @@ -293,9 +292,11 @@ def ema_crossover_indicator_detail(self, else: return ema_quote - def _ema_indicator_n3(self, ema_canditate: Tuple[int, int] = (5, 13, 26), + + def _ema_indicator_n3(self, + ema_canditate: Tuple[int, int] = (5, 13, 26), cutoff_date: Union[str, datetime.datetime] = 'today', - verbosity: int = 1): + verbosity: int = 1)->pd.DataFrame: with multiprocessing.Pool(multiprocessing.cpu_count() - 1) as pool: result = pool.starmap(self._parallel_ema_indicator_n3, @@ -308,124 +309,6 @@ def _ema_indicator_n3(self, ema_canditate: Tuple[int, int] = (5, 13, 26), f"Here are sample 5 company\n{ema_indicator_df.head()}") return ema_indicator_df - def _exponential_moving_avarage(self, - data_df: Union[pd.Series, List], - period: int, - cutoff_date: Union[str, datetime.datetime] = 'today', - smoothing_factor: int = 2, - verbosity: int = 1) -> float: - """Calculate exponential moving avarage based on given period - - Parameters - ---------- - data : Union[pd.Series,List] - Data to calculate ema - period : int - Period for which ema has to be calculated - smoothing_factor : int, optional - Smoothing factor which will be used to calculate - Multiplying factor, by default 2 - - Returns - ------- - float - ema value - """ - ema_list = [] - # Calculating multiplying factor - mf = smoothing_factor/(1 + period) - - # Calculating first SMA - sma0 = (sum(data_df['Close'][:period])) / period - - # Calculating first EMA - ema0 = (data_df['Close'][period] * mf) + (sma0 * (1 - mf)) - - # Calculating latest EMA - ema_pre = ema0 - - for idx in range(1, len(data_df)-50): - ema = (data_df['Close'][idx + 50] * mf) + (ema_pre * (1 - mf)) - ema_pre = ema - ema_list.append(ema) - # if cutoff_date is not None: - if idx == (len(data_df) - 50): - break - data_df['ema'] = [pd.NA] * (len(data_df) - len(ema_list)) + ema_list - if cutoff_date == 'today': - date = data_df.index[-1] - else: - date = self._get_appropriate_date( - 
company_df=data_df, - desired_date=cutoff_date, - verbosity=verbosity - ) - - return float(data_df[data_df.index == date]['ema']) - - def _get_appropriate_date(self, - company_df: pd.DataFrame, - desired_date: datetime.datetime, - verbosity: int = 1) -> Tuple[datetime.datetime, float]: - """ - Return appropriate date which is present in data record. - - Parameters - ---------- - company_df : pd.DataFrame - Company dataframe - duration : datetime.datetime - Desired date cut-off to calculate ema - verbosity : int, optional - Level of detail logging, by default 1 - - Returns - ------- - Tuple[datetime.datetime,float] - Date,Close value on date retrived - - Raises - ------ - ValueError - If desired old is older than first record - """ - if desired_date < company_df.index[0]: - logger.error( - f"Given desired date {desired_date.strftime('%d-%m-%Y')} is older than first recorded date {company_df.index[0].strftime('%d-%m-%Y')}") - - if verbosity > 0: - logger.debug( - f"Your desired EMA cut-off date is {desired_date.strftime('%d-%m-%Y')}") - - for day_idx in range(1, 100): - if desired_date not in company_df.index: - date = desired_date - \ - dateutil.relativedelta.relativedelta(days=day_idx) - else: - date = desired_date - if date in company_df.index: - break - if verbosity > 0 and desired_date != date: - logger.warning( - f"Desired date: {desired_date.strftime('%d-%m-%Y')} not found going for next possible date: {date.strftime('%d-%m-%Y')}") - - return date - - def _percentage_diff_analysis(self, - ema_a, - ema_b): - """ - Used to calculate Percentage difference - """ - return abs((ema_b - ema_a)/((ema_a + ema_b) / 2) * 100) - - def _outcome_analysis(self, percentage_diff): - if 5 < percentage_diff < 5: - outcome = 'close by' - else: - outcome = 'far away' - return outcome - # TODO: Add all parallel executor function here def _parallel_vol_indicator_n_days(self, company: str = None, @@ -463,13 +346,13 @@ def _parallel_ema_indicator(self, logger.warning(f"{company} have 
some missing value, fixing it") company_df.dropna(inplace=True) try: - EMA_A = self._exponential_moving_avarage( + EMA_A = exponential_moving_avarage( data_df=company_df, cutoff_date=cutoff_date, period=ema_canditate[0], verbosity=verbosity ) - EMA_B = self._exponential_moving_avarage( + EMA_B = exponential_moving_avarage( data_df=company_df, cutoff_date=cutoff_date, period=ema_canditate[1], @@ -505,28 +388,28 @@ def _parallel_ema_indicator_n3(self, company: str, logger.warning(f"{company} have some missing value, fixing it") company_df.dropna(inplace=True) try: - EMA_A = self._exponential_moving_avarage( + EMA_A = exponential_moving_avarage( data_df=company_df, cutoff_date=cutoff_date, period=ema_canditate[0], verbosity=verbosity ) - EMA_B = self._exponential_moving_avarage( + EMA_B = exponential_moving_avarage( data_df=company_df, cutoff_date=cutoff_date, period=ema_canditate[1], verbosity=verbosity ) - EMA_C = self._exponential_moving_avarage( + EMA_C = exponential_moving_avarage( data_df=company_df, cutoff_date=cutoff_date, period=ema_canditate[2], verbosity=verbosity ) - percentage_diff_cb = self._percentage_diff_analysis(EMA_C, EMA_B) - percentage_diff_ca = self._percentage_diff_analysis(EMA_C, EMA_A) - percentage_diff_ba = self._percentage_diff_analysis(EMA_B, EMA_A) + percentage_diff_cb = percentage_diff_analysis(EMA_C, EMA_B) + percentage_diff_ca = percentage_diff_analysis(EMA_C, EMA_A) + percentage_diff_ba = percentage_diff_analysis(EMA_B, EMA_A) if (percentage_diff_cb < 1) and (percentage_diff_ca < 1) and (percentage_diff_ba < 1): action = 'buy' diff --git a/stock_analysis/unit_strategy.py b/stock_analysis/unit_strategy.py index c509ff7..e1c137b 100644 --- a/stock_analysis/unit_strategy.py +++ b/stock_analysis/unit_strategy.py @@ -4,11 +4,13 @@ import pandas as pd import yfinance as yf import multiprocessing -from dataclasses import dataclass from typing import List, Tuple +from dataclasses import dataclass from stock_analysis.indicator import Indicator 
-from stock_analysis.utils.logger import logger from stock_analysis.data_retrive import DataRetrive +from stock_analysis.utils.logger import logger +from stock_analysis.utils.helpers import get_appropriate_date_momentum +from stock_analysis.utils.formula_helpers import annualized_rate_of_return yf.pdr_override() logger = logger() @@ -188,88 +190,8 @@ def momentum_with_ema_strategy(self, else: return momentum_ema_df - @staticmethod - def _annualized_rate_of_return(end_date: int, - start_date: int, - duration: float) -> float: - """ - Calculate annulized rate of return - - Parameters - ---------- - end_date : int - Close value Current date or most present date. - Consider it as going from bottom to top. - start_date : int - Close value on Start date or first record. - Consider it as going from bottom to top. - duration : float - Total duration wrt to year - - Returns - ------- - float - Annulized return - """ - return (((end_date / start_date) ** (1/duration)) - 1) * 100 - - @staticmethod - def _get_appropriate_date(company_df: pd.DataFrame, - company, - duration: Tuple[int, int] = (0, 1), - verbosity: int = 1) -> Tuple[datetime.datetime, float]: - """ - Return appropriate date which is present in data record. 
- - Parameters - ---------- - company_df : pd.DataFrame - Company dataframe - duration : Tuple[year,month], optional - Desired duration to go back to retrive record, by default (0,1) - verbosity : int, optional - Level of detail logging,1=< Deatil, 0=Less detail , by default 1 - - Returns - ------- - Tuple[datetime.datetime,float] - Date,Close value on date retrived - - Raises - ------ - ValueError - If desired old is older than first record - """ - - current_date = company_df.iloc[-1].Date - desired_date = current_date - \ - dateutil.relativedelta.relativedelta( - years=duration[0], months=duration[1]) - if desired_date < company_df.iloc[0].Date: - logger.error( - f"Given desired date {desired_date.strftime('%d-%m-%Y')} is older than first recorded date {company_df.iloc[0].Date.strftime('%d-%m-%Y')}") - raise ValueError - dd_copy = desired_date - - if verbosity > 0: - logger.debug( - f"Your desired date for monthly return for {company} is {desired_date.strftime('%d-%m-%Y')}") - - if len(company_df.loc[company_df['Date'] == desired_date]) != 0: - desired_close = company_df.loc[company_df['Date'] == desired_date] - else: - for i in range(1, 100): - if len(company_df.loc[company_df['Date'] == desired_date]) == 0: - desired_date = desired_date - \ - dateutil.relativedelta.relativedelta(days=i) - desired_close = company_df.loc[company_df['Date'] == desired_date] - break - if verbosity > 0: - logger.warning( - f"Desired date: {dd_copy.strftime('%d-%m-%Y')} not found going for next possible date: {desired_date.strftime('%d-%m-%Y')}") - return desired_date, desired_close.iloc[-1].Close - - def _parallel_momentum(self, company: str, + def _parallel_momentum(self, + company: str, start, end, verbosity: int = 1): @@ -280,18 +202,18 @@ def _parallel_momentum(self, company: str, company_df = DataRetrive.single_company_specific( company_name=f"{company}.NS", start_date=start, end_date=end) company_df.reset_index(inplace=True) - ar_yearly = self._annualized_rate_of_return( + 
# ---------------------------------------------------------------------------
# stock_analysis/utils/formula_helpers.py
# Uses these module-level names: datetime, pd, Union and
# get_appropriate_date_ema (imported from stock_analysis.utils.helpers).
# ---------------------------------------------------------------------------


def annualized_rate_of_return(end_date: float,
                              start_date: float,
                              duration: float) -> float:
    """
    Calculate the annualized rate of return, in percent.

    Parameters
    ----------
    end_date : float
        Close value on the most recent date (despite the name this is a
        price, not a date).
    start_date : float
        Close value on the start date / first record (again a price).
    duration : float
        Length of the holding period; the result is the compounded return
        per unit of this duration.

    Returns
    -------
    float
        ``((end / start) ** (1 / duration) - 1) * 100``
    """
    growth = end_date / start_date
    return ((growth ** (1 / duration)) - 1) * 100


def exponential_moving_avarage(data_df: pd.DataFrame,
                               period: int,
                               cutoff_date: Union[str,
                                                  datetime.datetime] = 'today',
                               smoothing_factor: int = 2,
                               verbosity: int = 1) -> float:
    """Calculate the exponential moving average (EMA) for a given period.

    The EMA is seeded with the simple moving average of the first
    ``period`` closes; the first EMA value therefore belongs to row
    ``period`` and every later row is derived from the previous EMA.

    Parameters
    ----------
    data_df : pd.DataFrame
        Date-indexed frame with a ``Close`` column. As a side effect an
        ``ema`` column is (over)written on this frame; the first
        ``period`` rows hold ``pd.NA``.
    period : int
        Period for which the EMA has to be calculated (must be >= 1).
    cutoff_date : Union[str, datetime.datetime], optional
        ``'today'`` returns the EMA of the last row, otherwise the EMA on
        the given date (or the nearest earlier recorded date),
        by default 'today'
    smoothing_factor : int, optional
        Smoothing factor used to derive the multiplying factor
        ``smoothing_factor / (1 + period)``, by default 2
    verbosity : int, optional
        Level of detail logging, forwarded to the date lookup, by default 1

    Returns
    -------
    float
        EMA value on the selected date

    Raises
    ------
    ValueError
        If ``period`` is smaller than 1 or the selected date has no record.
    IndexError
        If the frame has fewer than ``period + 1`` rows.
    TypeError
        If the selected date lies inside the first ``period`` rows, where
        no EMA exists yet (the stored value is ``pd.NA``).
    """
    if period < 1:
        raise ValueError(f"period must be >= 1, got {period}")

    # Positional access: the frame is date-indexed, so integer ``[]``
    # lookups would rely on a deprecated positional fallback.
    close = data_df['Close']

    # Calculating multiplying factor
    mf = smoothing_factor / (1 + period)

    # Seed: simple moving average of the first ``period`` closes
    sma0 = close.iloc[:period].sum() / period

    # First EMA belongs to row ``period``
    ema_values = [(close.iloc[period] * mf) + (sma0 * (1 - mf))]

    # Continue right after the seed row. The offset has to follow
    # ``period``; a fixed offset of 50 is only correct for a 50-day EMA.
    for price in close.iloc[period + 1:]:
        ema_values.append((price * mf) + (ema_values[-1] * (1 - mf)))

    data_df['ema'] = [pd.NA] * period + ema_values

    if isinstance(cutoff_date, str) and cutoff_date == 'today':
        date = data_df.index[-1]
    else:
        date = get_appropriate_date_ema(
            company_df=data_df,
            desired_date=cutoff_date,
            verbosity=verbosity
        )

    selected = data_df.loc[data_df.index == date, 'ema']
    if selected.empty:
        raise ValueError(f"No record found for cut-off date {date}")
    # float() on a scalar, not on a one-element Series (deprecated).
    return float(selected.iloc[-1])


def percentage_diff_analysis(value_a: float,
                             value_b: float) -> float:
    """
    Calculate the symmetric percentage difference between two values.

    The absolute difference is expressed relative to the mean of both
    values, so swapping the arguments gives the same result.

    Parameters
    ----------
    value_a : float
        First value
    value_b : float
        Second value

    Returns
    -------
    float
        ``abs(b - a) / ((a + b) / 2) * 100``
    """
    midpoint = (value_a + value_b) / 2
    return abs((value_b - value_a) / midpoint * 100)


def outcome_analysis(ratio: float, threshold: float = 5) -> str:
    """
    Classify closeness based on a ratio such as a percentage difference.

    Parameters
    ----------
    ratio : float
        Value to classify, e.g. the output of ``percentage_diff_analysis``
    threshold : float, optional
        Magnitude below which the ratio counts as close, by default 5

    Returns
    -------
    str
        ``'close by'`` when ``abs(ratio) < threshold`` else ``'far away'``
    """
    # The band has to be symmetric around zero; ``5 < ratio < 5`` can
    # never be true and would label every input 'far away'.
    if abs(ratio) < threshold:
        return 'close by'
    return 'far away'


# ---------------------------------------------------------------------------
# stock_analysis/utils/helpers.py
# Uses these module-level names: datetime, pd, Tuple and logger.
# ---------------------------------------------------------------------------

# How many calendar days both date helpers walk back while looking for a
# date that is present in the data.
_MAX_LOOKBACK_DAYS = 99


def get_appropriate_date_ema(company_df: pd.DataFrame,
                             desired_date: datetime.datetime,
                             verbosity: int = 1) -> datetime.datetime:
    """
    Return the desired date, or the nearest earlier date, present in the
    index of the data record.

    Parameters
    ----------
    company_df : pd.DataFrame
        Company dataframe indexed by date
    desired_date : datetime.datetime
        Desired cut-off date to calculate ema
    verbosity : int, optional
        Level of detail logging, by default 1

    Returns
    -------
    datetime.datetime
        Date that exists in ``company_df.index``

    Raises
    ------
    ValueError
        If the desired date is older than the first record, or no recorded
        date exists within 99 days before it
    """
    if desired_date < company_df.index[0]:
        logger.error(
            f"Given desired date {desired_date.strftime('%d-%m-%Y')} is older than first recorded date {company_df.index[0].strftime('%d-%m-%Y')}")
        # Nothing earlier can match, so fail as documented instead of
        # handing back a date that is not in the data.
        raise ValueError(
            "Desired date is older than the first recorded date")

    if verbosity > 0:
        logger.debug(
            f"Your desired EMA cut-off date is {desired_date.strftime('%d-%m-%Y')}")

    if desired_date in company_df.index:
        return desired_date

    # Walk back one calendar day at a time (weekends, holidays).
    for days_back in range(1, _MAX_LOOKBACK_DAYS + 1):
        date = desired_date - datetime.timedelta(days=days_back)
        if date in company_df.index:
            if verbosity > 0:
                logger.warning(
                    f"Desired date: {desired_date.strftime('%d-%m-%Y')} not found going for next possible date: {date.strftime('%d-%m-%Y')}")
            return date

    raise ValueError(
        f"No record within {_MAX_LOOKBACK_DAYS} days before "
        f"{desired_date.strftime('%d-%m-%Y')}")


def get_appropriate_date_momentum(company_df: pd.DataFrame,
                                  company,
                                  duration: Tuple[int, int] = (0, 1),
                                  verbosity: int = 1) -> Tuple[datetime.datetime, float]:
    """
    Return the date lying ``duration`` before the last record (or the
    nearest earlier recorded date) together with its close value.

    Parameters
    ----------
    company_df : pd.DataFrame
        Company dataframe with ``Date`` and ``Close`` columns
    company :
        Company name, only used in log messages
    duration : Tuple[year,month], optional
        Desired duration to go back to retrieve the record, by default (0,1)
    verbosity : int, optional
        Level of detail logging, 1 = detailed, 0 = less detail, by default 1

    Returns
    -------
    Tuple[datetime.datetime,float]
        Date, Close value on the date retrieved

    Raises
    ------
    ValueError
        If the desired date is older than the first record, or no recorded
        date exists within 99 days before it
    """
    # Import the submodule explicitly: a bare ``import dateutil`` does not
    # guarantee that ``dateutil.relativedelta`` has been loaded.
    from dateutil.relativedelta import relativedelta

    dates = company_df['Date']
    first_recorded = dates.iloc[0]
    desired_date = dates.iloc[-1] - \
        relativedelta(years=duration[0], months=duration[1])

    if desired_date < first_recorded:
        logger.error(
            f"Given desired date {desired_date.strftime('%d-%m-%Y')} is older than first recorded date {first_recorded.strftime('%d-%m-%Y')}")
        raise ValueError(
            "Desired date is older than the first recorded date")

    if verbosity > 0:
        logger.debug(
            f"Your desired date for monthly return for {company} is {desired_date.strftime('%d-%m-%Y')}")

    # Offset 0 is the desired date itself; afterwards step back exactly one
    # calendar day per attempt so a weekend or holiday gap of any length
    # (up to the look-back limit) is bridged.
    for days_back in range(_MAX_LOOKBACK_DAYS + 1):
        candidate = desired_date - datetime.timedelta(days=days_back)
        rows = company_df.loc[dates == candidate]
        if len(rows) == 0:
            continue
        if days_back > 0 and verbosity > 0:
            logger.warning(
                f"Desired date: {desired_date.strftime('%d-%m-%Y')} not found going for next possible date: {candidate.strftime('%d-%m-%Y')}")
        return candidate, rows['Close'].iloc[-1]

    raise ValueError(
        f"No record within {_MAX_LOOKBACK_DAYS} days before "
        f"{desired_date.strftime('%d-%m-%Y')}")