From d7949e067539e3a45e8bf8622598ab4a49770737 Mon Sep 17 00:00:00 2001
From: chenzongwei <17695480342@163.com>
Date: Sat, 13 Apr 2024 09:20:48 +0800
Subject: [PATCH] v4.2.1

---
 pure_ocean_breeze/data/tools.py          |  30 ++--
 pure_ocean_breeze/jason/data/tools.py    | 178 +++++++++++++++--------
 pure_ocean_breeze/jason/labor/process.py |   1 +
 3 files changed, 140 insertions(+), 69 deletions(-)

diff --git a/pure_ocean_breeze/data/tools.py b/pure_ocean_breeze/data/tools.py
index fdab2a3..2547a1c 100644
--- a/pure_ocean_breeze/data/tools.py
+++ b/pure_ocean_breeze/data/tools.py
@@ -99,7 +99,7 @@ def convert_code(x: str) -> Tuple[str, str]:
 
 
 @do_on_dfs
-def add_suffix(code:str)->str:
+def add_suffix(code: str) -> str:
     """给股票代码加上后缀
 
     Parameters
@@ -111,7 +111,7 @@ def add_suffix(code:str)->str:
     -------
     str
         添加完后缀后的股票代码,如000001.SZ
-    """    
+    """
     if not isinstance(code, str):
         code = str(code)
     if len(code) < 6:
@@ -290,8 +290,6 @@ def add_suffix(code: str) -> str:
     return code
 
 
-
-
 @do_on_dfs
 def 生成每日分类表(
     df: pd.DataFrame, code: str, entry: str, exit: str, kind: str
@@ -537,7 +535,7 @@ def get_monthly_factor(
             old_date = old.index.max()
             if old_date == self.fac.date.max():
                 logger.info(f"本地文件已经是最新的了,无需计算")
-                self.fac=old
+                self.fac = old
             else:
                 try:
                     new_date = self.find_begin(
@@ -624,9 +622,13 @@ def get_monthly_factor(
                 self.fac.to_parquet(homeplace.update_data_file + history_file)
                 logger.success(f"本地文件已经写入完成")
         else:
-            logger.warning("您本次计算没有指定任何本地文件路径,这很可能会导致大量的重复计算和不必要的时间浪费,请注意!")
+            logger.warning(
+                "您本次计算没有指定任何本地文件路径,这很可能会导致大量的重复计算和不必要的时间浪费,请注意!"
+            )
             if daily:
-                logger.warning("您指定的是日频计算,非月频计算,因此强烈建议您指定history_file参数!!")
+                logger.warning(
+                    "您指定的是日频计算,非月频计算,因此强烈建议您指定history_file参数!!"
+                )
             if whole_cross:
                 for end_date in tqdm.auto.tqdm(iter_item):
                     start_date = self.find_begin(self.tradedays, end_date, self.backsee)
@@ -930,7 +932,9 @@ def func_rolling(df):
             logger.info(f"已经更新至{new_end}")
             return cors
         else:
-            logger.warning("您本次计算没有指定任何本地文件路径,这很可能会导致大量的重复计算和不必要的时间浪费,请注意!")
+            logger.warning(
+                "您本次计算没有指定任何本地文件路径,这很可能会导致大量的重复计算和不必要的时间浪费,请注意!"
+            )
             twins = merge_many([df1, df2])
             tqdm.auto.tqdm.pandas()
             corrs = twins.groupby(["code"]).progress_apply(func_rolling)
@@ -1098,7 +1102,7 @@ def detect_nan(df: pd.DataFrame) -> bool:
 
 
 @do_on_dfs
-def get_abs(df: pd.DataFrame, quantile: float=None, square: bool = 0) -> pd.DataFrame:
+def get_abs(df: pd.DataFrame, quantile: float = None, square: bool = 0) -> pd.DataFrame:
     """均值距离化:计算因子与截面均值的距离
 
     Parameters
@@ -1452,7 +1456,9 @@ def feather_to_parquet_all():
     feather_to_parquet(homeplace.final_factor_file)
     feather_to_parquet(homeplace.update_data_file)
     feather_to_parquet(homeplace.factor_data_file)
-    logger.success("数据库中的feather文件全部被转化为了parquet文件,您可以手动删除所有的feather文件了")
+    logger.success(
+        "数据库中的feather文件全部被转化为了parquet文件,您可以手动删除所有的feather文件了"
+    )
 
 
 def zip_many_dfs(dfs: List[pd.DataFrame]) -> pd.DataFrame:
@@ -1886,7 +1892,9 @@ def clip_sing(x: pd.DataFrame, n: float = 3):
                 df = df.reset_index()
             except Exception:
                 ...
-            df = df.drop_duplicates(subset=['date','code']).pivot(index="date", columns="code", values="fac")
+            df = df.drop_duplicates(subset=["date", "code"]).pivot(
+                index="date", columns="code", values="fac"
+            )
             return df
     elif replace:
diff --git a/pure_ocean_breeze/jason/data/tools.py b/pure_ocean_breeze/jason/data/tools.py
index dd7cc07..b53b7ba 100644
--- a/pure_ocean_breeze/jason/data/tools.py
+++ b/pure_ocean_breeze/jason/data/tools.py
@@ -948,15 +948,18 @@ def judge_factor_by_third(
 
 @do_on_dfs
 def jason_to_wind(df: pd.DataFrame):
-    df.index = pd.to_datetime(df.index.astype(str))
-    df.columns = [add_suffix(i) for i in df.columns]
-    return df
+    df1 = df.copy()
+    df1.index = pd.to_datetime(df1.index.astype(str))
+    df1.columns = [add_suffix(i) for i in df1.columns]
+    return df1
 
+
 @do_on_dfs
 def wind_to_jason(df: pd.DataFrame):
-    df.columns = [i[:6] for i in df.columns]
-    df.index = df.index.strftime("%Y%m%d").astype(int)
-    return df
+    df1 = df.copy()
+    df1.columns = [i[:6] for i in df1.columns]
+    df1.index = df1.index.strftime("%Y%m%d").astype(int)
+    return df1
 
 
 @do_on_dfs
@@ -968,7 +971,7 @@ def lu计算连续期数2(
     """
     <<注意!使用此函数时,目标df的值必须全为1或nan!!!>>
     """
-    
+
     # 将Series中的值转换为布尔值,1为True,其余为False
     is_one = s == judge_number
 
@@ -983,18 +986,25 @@
     return continuous_ones.replace(0, nan_value)
 
 
-def lu计算连续期数2片段递增(s:Union[pd.Series,pd.DataFrame])->Union[pd.Series,pd.DataFrame]:
-    return lu计算连续期数2(s)+lu标记连续片段(s)
+def lu计算连续期数2片段递增(
+    s: Union[pd.Series, pd.DataFrame]
+) -> Union[pd.Series, pd.DataFrame]:
+    return lu计算连续期数2(s) + lu标记连续片段(s)
+
 
-def lu计算连续期数2片段递减(s:Union[pd.Series,pd.DataFrame])->Union[pd.Series,pd.DataFrame]:
-    return lu计算连续期数2(s)-lu标记连续片段(s)
+def lu计算连续期数2片段递减(
+    s: Union[pd.Series, pd.DataFrame]
+) -> Union[pd.Series, pd.DataFrame]:
+    return lu计算连续期数2(s) - lu标记连续片段(s)
 
 
-def lu计算连续期数奇正偶反(s:Union[pd.Series,pd.DataFrame])->Union[pd.Series,pd.DataFrame]:
-    if isinstance(s,pd.DataFrame):
+def lu计算连续期数奇正偶反(
+    s: Union[pd.Series, pd.DataFrame]
+) -> Union[pd.Series, pd.DataFrame]:
+    if isinstance(s, pd.DataFrame):
         return s.apply(lu计算连续期数奇正偶反)
     else:
-        s=s.reset_index(drop=True)
+        s = s.reset_index(drop=True)
         # 用于标记每个连续非NaN片段
         s_diff = (~s.isna()).astype(int).diff().fillna(1).cumsum()
 
@@ -1008,17 +1018,22 @@ def lu计算连续期数奇正偶反(s:Union[pd.
         order = s.groupby(s_mapped).cumcount() + 1
 
         # 计算每个连续非NaN片段的长度
-        segment_lengths = s.groupby(s_mapped).transform('count')
+        segment_lengths = s.groupby(s_mapped).transform("count")
 
         # 对偶数编号的片段进行逆序排列
-        s[s.notna()] = np.where(s_mapped % 2 == 1, order, segment_lengths[s_mapped == s_mapped] - order + 1)
+        s[s.notna()] = np.where(
+            s_mapped % 2 == 1, order, segment_lengths[s_mapped == s_mapped] - order + 1
+        )
         return s.values
-    
-def lu计算连续期数偶正奇反(s:Union[pd.Series,pd.DataFrame])->Union[pd.Series,pd.DataFrame]:
-    if isinstance(s,pd.DataFrame):
+
+
+def lu计算连续期数偶正奇反(
+    s: Union[pd.Series, pd.DataFrame]
+) -> Union[pd.Series, pd.DataFrame]:
+    if isinstance(s, pd.DataFrame):
         return s.apply(lu计算连续期数偶正奇反)
     else:
-        s=s.reset_index(drop=True)
+        s = s.reset_index(drop=True)
         # 用于标记每个连续非NaN片段
         s_diff = (~s.isna()).astype(int).diff().fillna(1).cumsum()
 
@@ -1032,16 +1047,20 @@
         order = s.groupby(s_mapped).cumcount() + 1
 
         # 计算每个连续非NaN片段的长度
-        segment_lengths = s.groupby(s_mapped).transform('count')
+        segment_lengths = s.groupby(s_mapped).transform("count")
 
         # 对偶数编号的片段进行逆序排列
-        s[s.notna()] = np.where(s_mapped % 2 == 1, segment_lengths[s_mapped == s_mapped] - order + 1, order)
+        s[s.notna()] = np.where(
+            s_mapped % 2 == 1, segment_lengths[s_mapped == s_mapped] - order + 1, order
+        )
         return s.values
 
 
-def lu计算连续期数长度(s:Union[pd.Series,pd.DataFrame],final_mean=1)->Union[float,pd.Series,pd.DataFrame]:
-    if isinstance(s,pd.DataFrame):
-        return s.apply(lambda x:lu计算连续期数长度(x,final_mean))
+def lu计算连续期数长度(
+    s: Union[pd.Series, pd.DataFrame], final_mean=1
+) -> Union[float, pd.Series, pd.DataFrame]:
+    if isinstance(s, pd.DataFrame):
+        return s.apply(lambda x: lu计算连续期数长度(x, final_mean))
     else:
         # 标识非 NaN 值
         not_nan = s.notnull()
@@ -1060,64 +1079,82 @@ def lu计算连续期数长度(s:Union[pd.Series,pd.DataFrame],final_mean=1)->Un
         return segment_lengths.mean()
 
 
-def lu标记连续片段(s:Union[pd.Series,pd.DataFrame],label_nan=0,number_continuous=1)->Union[pd.Series,pd.DataFrame]:
+def lu标记连续片段(
+    s: Union[pd.Series, pd.DataFrame], label_nan=0, number_continuous=1
+) -> Union[pd.Series, pd.DataFrame]:
     not_nan = ~s.isna()
-    segment_starts = not_nan.diff().fillna(True)  # 对序列首个元素填充True,因为diff会产生NaN
+    segment_starts = not_nan.diff().fillna(
+        True
+    )  # 对序列首个元素填充True,因为diff会产生NaN
 
     # 为每个连续片段分配一个唯一标识符
     segments = segment_starts.cumsum()
 
     # 仅对非NaN片段应用标识符,NaN值保持不变
     if not label_nan:
-        segments=segments*np.sign(s.abs()+1)
+        segments = segments * np.sign(s.abs() + 1)
     if number_continuous:
-        segments=(segments+1)//2
+        segments = (segments + 1) // 2
     return segments
-    
-    
-def lu删去连续片段中的最大值(s:Union[pd.Series,pd.DataFrame])->Union[pd.Series,pd.DataFrame]:
-    if isinstance(s,pd.DataFrame):
+
+
+def lu删去连续片段中的最大值(
+    s: Union[pd.Series, pd.DataFrame]
+) -> Union[pd.Series, pd.DataFrame]:
+    if isinstance(s, pd.DataFrame):
         return s.apply(lu删去连续片段中的最大值)
     else:
         # 生成连续片段的标识符
         s_diff = s.isna().astype(int).diff().fillna(0).ne(0).cumsum()
 
         # 对每个片段使用transform找到最大值
-        max_vals = s.groupby(s_diff).transform('max')
+        max_vals = s.groupby(s_diff).transform("max")
 
         # 将原始序列中等于最大值的元素替换为NaN
         s[s == max_vals] = np.nan
         return s
 
 
-def lu删去连续片段中的最小值(s:Union[pd.Series,pd.DataFrame])->Union[pd.Series,pd.DataFrame]:
+def lu删去连续片段中的最小值(
+    s: Union[pd.Series, pd.DataFrame]
+) -> Union[pd.Series, pd.DataFrame]:
     return -lu删去连续片段中的最大值(-s)
 
 
-def lu仅保留连续片段中的最大值(s:Union[pd.Series,pd.DataFrame])->Union[pd.Series,pd.DataFrame]:
-    if isinstance(s,pd.DataFrame):
-        return s.apply(lu删去连续片段中的最大值)
+def lu仅保留连续片段中的最大值(
+    s: Union[pd.Series, pd.DataFrame]
+) -> Union[pd.Series, pd.DataFrame]:
+    if isinstance(s, pd.DataFrame):
+        return s.apply(lu仅保留连续片段中的最大值)
     else:
         # 生成连续片段的标识符
         s_diff = s.isna().astype(int).diff().fillna(0).ne(0).cumsum()
 
         # 对每个片段使用transform找到最大值
-        max_vals = s.groupby(s_diff).transform('max')
+        max_vals = s.groupby(s_diff).transform("max")
 
-        # 将原始序列中等于最大值的元素替换为NaN
+        # 将原始序列中不等于最大值的元素替换为NaN
         s[s != max_vals] = np.nan
         return s
 
 
-def lu仅保留连续片段中的最小值(s:Union[pd.Series,pd.DataFrame])->Union[pd.Series,pd.DataFrame]:
+def lu仅保留连续片段中的最小值(
+    s: Union[pd.Series, pd.DataFrame]
+) -> Union[pd.Series, pd.DataFrame]:
     return -lu仅保留连续片段中的最大值(-s)
 
 
-def lu删去连续片段中的最大值及其后面的值(s:Union[pd.Series,pd.DataFrame])->Union[pd.Series,pd.DataFrame]:
-    if isinstance(s,pd.DataFrame):
+def lu删去连续片段中的最大值及其后面的值(
+    s: Union[pd.Series, pd.DataFrame]
+) -> Union[pd.Series, pd.DataFrame]:
+    if isinstance(s, pd.DataFrame):
         return s.apply(lu删去连续片段中的最大值及其后面的值)
     else:
         # 生成连续片段的标识符
         s_diff = s.isna().astype(int).diff().fillna(0).ne(0).cumsum()
 
         # 对每个片段使用transform找到最大值
-        max_vals = s.groupby(s_diff).transform('max')
+        max_vals = s.groupby(s_diff).transform("max")
 
         # 使用cummax标记最大值及其之后的值
         max_flag = (s.groupby(s_diff).cummax() == max_vals).astype(int)
@@ -1128,39 +1165,64 @@ def lu删去连续片段中的最大值及其后面的值(s:Union[pd.Series,pd.D
         # 将标记的值替换为NaN
         s[max_flag_cum > 0] = np.nan
         return s
-    
-    
-def lu删去连续片段中的最小值及其后面的值(s:Union[pd.Series,pd.DataFrame])->Union[pd.Series,pd.DataFrame]:
+
+
+def lu删去连续片段中的最小值及其后面的值(
+    s: Union[pd.Series, pd.DataFrame]
+) -> Union[pd.Series, pd.DataFrame]:
     return -lu删去连续片段中的最大值及其后面的值(-s)
 
 
-def lu删去连续片段中的最大值及其前面的值(s:Union[pd.Series,pd.DataFrame])->Union[pd.Series,pd.DataFrame]:
+def lu删去连续片段中的最大值及其前面的值(
+    s: Union[pd.Series, pd.DataFrame]
+) -> Union[pd.Series, pd.DataFrame]:
     return lu删去连续片段中的最大值及其后面的值(s[::-1])[::-1]
 
 
-def lu删去连续片段中的最小值及其前面的值(s:Union[pd.Series,pd.DataFrame])->Union[pd.Series,pd.DataFrame]:
+def lu删去连续片段中的最小值及其前面的值(
+    s: Union[pd.Series, pd.DataFrame]
+) -> Union[pd.Series, pd.DataFrame]:
     return -lu删去连续片段中的最大值及其前面的值(-s)
 
 
 @do_on_dfs
-def is_pos(s: Union[pd.Series, pd.DataFrame],zero_as_pos:bool=1) -> Union[pd.Series, pd.DataFrame]:
+def is_pos(
+    s: Union[pd.Series, pd.DataFrame], zero_as_pos: bool = 1
+) -> Union[pd.Series, pd.DataFrame]:
     if zero_as_pos:
-        return np.sign(s).replace(0,1).replace(-1,np.nan)
+        return np.sign(s).replace(0, 1).replace(-1, np.nan)
     else:
-        return np.sign(s).replace(0,np.nan).replace(-1,np.nan)
-    
+        return np.sign(s).replace(0, np.nan).replace(-1, np.nan)
+
 
 @do_on_dfs
-def is_neg(s: Union[pd.Series, pd.DataFrame],zero_as_neg:bool=1) -> Union[pd.Series, pd.DataFrame]:
+def is_neg(
+    s: Union[pd.Series, pd.DataFrame], zero_as_neg: bool = 1
+) -> Union[pd.Series, pd.DataFrame]:
     if zero_as_neg:
-        return np.sign(s).replace(0,-1).replace(1,np.nan).replace(-1,1)
+        return np.sign(s).replace(0, -1).replace(1, np.nan).replace(-1, 1)
     else:
-        return np.sign(s).replace(0,np.nan).replace(1,np.nan).replace(-1,1)
-    
+        return np.sign(s).replace(0, np.nan).replace(1, np.nan).replace(-1, 1)
+
 
 @do_on_dfs
-def get_pos_value(s: Union[pd.Series, pd.DataFrame],judge_sign:Union[float,pd.Series, pd.DataFrame],zero_as_pos:bool=1) -> Union[pd.Series, pd.DataFrame]:
-    return s * is_pos(s-judge_sign,zero_as_pos)
+def get_pos_value(
+    s: Union[pd.Series, pd.DataFrame],
+    judge_sign: Union[float, pd.Series, pd.DataFrame],
+    zero_as_pos: bool = 1,
+) -> Union[pd.Series, pd.DataFrame]:
+    return s * is_pos(s - judge_sign, zero_as_pos)
+
 
 @do_on_dfs
-def get_neg_value(s: Union[pd.Series, pd.DataFrame],judge_sign:Union[float,pd.Series, pd.DataFrame],zero_as_neg:bool=1) -> Union[pd.Series, pd.DataFrame]:
-    return s * is_neg(s-judge_sign,zero_as_neg)
+def get_neg_value(
+    s: Union[pd.Series, pd.DataFrame],
+    judge_sign: Union[float, pd.Series, pd.DataFrame],
+    zero_as_neg: bool = 1,
+) -> Union[pd.Series, pd.DataFrame]:
+    return s * is_neg(s - judge_sign, zero_as_neg)
+
 
 @do_on_dfs
-def count_pos_neg(s:Union[pd.Series,pd.DataFrame]):
-    print("正数个数:",is_pos(s).sum().sum(),"负数个数:",is_neg(s).sum().sum())
\ No newline at end of file
+def count_pos_neg(s: Union[pd.Series, pd.DataFrame]):
+    print("正数个数:", is_pos(s).sum().sum(), "负数个数:", is_neg(s).sum().sum())
diff --git a/pure_ocean_breeze/jason/labor/process.py b/pure_ocean_breeze/jason/labor/process.py
index cfc0292..34b1c0d 100644
--- a/pure_ocean_breeze/jason/labor/process.py
+++ b/pure_ocean_breeze/jason/labor/process.py
@@ -498,6 +498,7 @@ def show_corrs(
         factor_names = [f"fac{i}" for i in list(range(1, len(factors) + 1))]
     corrs = pd.DataFrame(corrs, columns=factor_names, index=factor_names)
     np.fill_diagonal(corrs.to_numpy(), 1)
+    corrs = pd.DataFrame(corrs.fillna(0).to_numpy() + corrs.fillna(0).to_numpy().T - np.diag(np.diag(corrs)), index=corrs.index, columns=corrs.columns)
     if show_percent:
         pcorrs = corrs.applymap(to_percent)
     else:
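
A minimal round-trip sketch of the patched jason_to_wind / wind_to_jason behavior: both now
mutate a copy (df1) and return it, leaving the caller's frame untouched. The add_suffix stub
below is a simplified stand-in for the library's own helper (which handles more code prefixes),
so this is an illustration under that assumption, not the library implementation:

    import pandas as pd

    def add_suffix(code: str) -> str:
        # Hypothetical stand-in: codes starting with 6 -> .SH, everything else -> .SZ.
        return code + (".SH" if code.startswith("6") else ".SZ")

    # "jason" layout: int yyyymmdd index, bare 6-digit codes.
    df = pd.DataFrame([[1.0, 2.0]], index=[20240412], columns=["000001", "600000"])

    # jason_to_wind: work on a copy, convert index to datetimes and add exchange suffixes.
    df1 = df.copy()
    df1.index = pd.to_datetime(df1.index.astype(str))
    df1.columns = [add_suffix(i) for i in df1.columns]

    # wind_to_jason inverts both conversions.
    df2 = df1.copy()
    df2.columns = [i[:6] for i in df2.columns]
    df2.index = df2.index.strftime("%Y%m%d").astype(int)
    assert list(df2.columns) == list(df.columns) and list(df2.index) == list(df.index)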
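The lu计算连续期数* helpers are only partly visible in these hunks, but they build on the usual
pandas run-length idiom: a cumulative counter that resets at every break numbers each position
within its consecutive run. A toy sketch of that idiom (illustrative, not the library code
itself; lu计算连续期数2 additionally takes judge_number and nan_value parameters):

    import numpy as np
    import pandas as pd

    s = pd.Series([1, 1, np.nan, 1, np.nan, 1, 1, 1])
    is_one = s == 1
    groups = (~is_one).cumsum()  # every NaN opens a new group
    run_no = is_one.astype(int).groupby(groups).cumsum()  # rank inside each run, 0 at NaN
    print(run_no.replace(0, np.nan).tolist())  # [1.0, 2.0, nan, 1.0, nan, 1.0, 2.0, 3.0]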
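On the show_corrs change in process.py: the correlation table appears to be filled on one
triangle only, and the new line mirrors it across the diagonal. A toy demonstration of just
that expression (names and values here are illustrative, not from the patch):

    import numpy as np
    import pandas as pd

    names = ["fac1", "fac2", "fac3"]
    corrs = pd.DataFrame(
        [[1.0, np.nan, np.nan], [0.3, 1.0, np.nan], [0.5, 0.2, 1.0]],
        index=names,
        columns=names,
    )
    m = corrs.fillna(0).to_numpy()
    # m + m.T counts the diagonal twice, so diag(diag(corrs)) is subtracted once.
    full = pd.DataFrame(
        m + m.T - np.diag(np.diag(corrs)), index=corrs.index, columns=corrs.columns
    )
    assert (full.to_numpy() == full.to_numpy().T).all() and (np.diag(full) == 1).all()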