Commit v4.2.1

chen-001 committed Apr 13, 2024
1 parent 7d9ace9 commit d7949e0
Showing 3 changed files with 140 additions and 69 deletions.
30 changes: 19 additions & 11 deletions pure_ocean_breeze/data/tools.py
@@ -99,7 +99,7 @@ def convert_code(x: str) -> Tuple[str, str]:


@do_on_dfs
def add_suffix(code:str)->str:
def add_suffix(code: str) -> str:
"""给股票代码加上后缀
Parameters
@@ -111,7 +111,7 @@ def add_suffix(code:str)->str:
-------
str
The stock code with the suffix appended, e.g. 000001.SZ
"""
"""
if not isinstance(code, str):
code = str(code)
if len(code) < 6:
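As a reading aid, a minimal usage sketch of `add_suffix`; the mapping rule itself is folded out of this hunk, so the outputs below follow the usual A-share convention and are an assumption rather than taken from the diff:

```python
# Hypothetical usage; expected outputs assume the conventional exchange mapping.
from pure_ocean_breeze.data.tools import add_suffix

add_suffix("000001")  # -> "000001.SZ" (Shenzhen, assumed)
add_suffix("600519")  # -> "600519.SH" (Shanghai, assumed)
add_suffix(600519)    # non-str input is first converted with str()
```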
@@ -290,8 +290,6 @@ def add_suffix(code: str) -> str:
return code




@do_on_dfs
def 生成每日分类表(
df: pd.DataFrame, code: str, entry: str, exit: str, kind: str
@@ -537,7 +535,7 @@ def get_monthly_factor(
old_date = old.index.max()
if old_date == self.fac.date.max():
logger.info(f"本地文件已经是最新的了,无需计算")
self.fac=old
self.fac = old
else:
try:
new_date = self.find_begin(
@@ -624,9 +622,13 @@ def get_monthly_factor(
self.fac.to_parquet(homeplace.update_data_file + history_file)
logger.success(f"本地文件已经写入完成")
else:
logger.warning("您本次计算没有指定任何本地文件路径,这很可能会导致大量的重复计算和不必要的时间浪费,请注意!")
logger.warning(
"您本次计算没有指定任何本地文件路径,这很可能会导致大量的重复计算和不必要的时间浪费,请注意!"
)
if daily:
logger.warning("您指定的是日频计算,非月频计算,因此强烈建议您指定history_file参数!!")
logger.warning(
"您指定的是日频计算,非月频计算,因此强烈建议您指定history_file参数!!"
)
if whole_cross:
for end_date in tqdm.auto.tqdm(iter_item):
start_date = self.find_begin(self.tradedays, end_date, self.backsee)
@@ -930,7 +932,9 @@ def func_rolling(df):
logger.info(f"已经更新至{new_end}")
return cors
else:
logger.warning(
    "No local file path was specified for this run; this is very likely to cause a large amount of repeated computation and wasted time. Please take note!"
)
twins = merge_many([df1, df2])
tqdm.auto.tqdm.pandas()
corrs = twins.groupby(["code"]).progress_apply(func_rolling)
@@ -1098,7 +1102,7 @@ def detect_nan(df: pd.DataFrame) -> bool:


@do_on_dfs
def get_abs(df: pd.DataFrame, quantile: float=None, square: bool = 0) -> pd.DataFrame:
def get_abs(df: pd.DataFrame, quantile: float = None, square: bool = 0) -> pd.DataFrame:
"""均值距离化:计算因子与截面均值的距离
Parameters
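The hunk folds the body of `get_abs` away; here is a sketch of the transform the docstring describes (distance from the cross-sectional mean), assuming the usual date × code wide layout and ignoring the `quantile` and `square` options:

```python
import pandas as pd

def get_abs_sketch(df: pd.DataFrame) -> pd.DataFrame:
    # Absolute distance of each value from its row (cross-sectional) mean.
    return df.sub(df.mean(axis=1), axis=0).abs()
```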
@@ -1452,7 +1456,7 @@ def feather_to_parquet_all():
feather_to_parquet(homeplace.final_factor_file)
feather_to_parquet(homeplace.update_data_file)
feather_to_parquet(homeplace.factor_data_file)
logger.success("数据库中的feather文件全部被转化为了parquet文件,您可以手动删除所有的feather文件了")
logger.success(
"数据库中的feather文件全部被转化为了parquet文件,您可以手动删除所有的feather文件了"
)


def zip_many_dfs(dfs: List[pd.DataFrame]) -> pd.DataFrame:
@@ -1886,7 +1892,9 @@ def clip_sing(x: pd.DataFrame, n: float = 3):
df = df.reset_index()
except Exception:
...
df = df.drop_duplicates(subset=['date','code']).pivot(index="date", columns="code", values="fac")
df = df.drop_duplicates(subset=["date", "code"]).pivot(
index="date", columns="code", values="fac"
)
return df
elif replace:

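The de-duplicate-then-pivot pattern from the hunk above, in isolation on illustrative data; without `drop_duplicates`, `pivot` raises `ValueError` on the repeated (date, code) pair, which is exactly what the added call guards against:

```python
import pandas as pd

long = pd.DataFrame(
    {
        "date": [20240101, 20240101, 20240102, 20240101],
        "code": ["000001", "600519", "000001", "000001"],  # last row repeats the first pair
        "fac": [0.5, 1.2, 0.7, 0.5],
    }
)
wide = long.drop_duplicates(subset=["date", "code"]).pivot(
    index="date", columns="code", values="fac"
)
```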
178 changes: 120 additions & 58 deletions pure_ocean_breeze/jason/data/tools.py
@@ -948,15 +948,18 @@ def judge_factor_by_third(

@do_on_dfs
def jason_to_wind(df: pd.DataFrame):
df.index = pd.to_datetime(df.index.astype(str))
df.columns = [add_suffix(i) for i in df.columns]
df1 = df.copy()
df1.index = pd.to_datetime(df1.index.astype(str))
df1.columns = [add_suffix(i) for i in df1.columns]
return df1


@do_on_dfs
def wind_to_jason(df: pd.DataFrame):
df.columns = [i[:6] for i in df.columns]
df.index = df.index.strftime("%Y%m%d").astype(int)
return df
df1 = df.copy()
df1.columns = [i[:6] for i in df1.columns]
df1.index = df1.index.strftime("%Y%m%d").astype(int)
return df1
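A quick round-trip sketch for the two converters, on an illustrative one-row frame (assuming the corrected `return df1` in `jason_to_wind`):

```python
import pandas as pd

df = pd.DataFrame(
    [[1.0, 2.0]],
    index=pd.Index([20240413], name="date"),  # int yyyymmdd dates, "jason" style
    columns=["000001", "600519"],
)
wind_style = jason_to_wind(df)    # DatetimeIndex, columns like "000001.SZ"
back = wind_to_jason(wind_style)  # int dates and bare six-digit codes again
```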


@do_on_dfs
@@ -968,7 +971,7 @@ def lu计算连续期数2(
"""
<<Note! When using this function, the values of the target df must be all 1 or nan!!!>>
"""

# Convert the Series values to booleans: 1 -> True, everything else -> False
is_one = s == judge_number

@@ -983,18 +986,25 @@
return continuous_ones.replace(0, nan_value)
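The folded middle of `lu计算连续期数2` is the standard cumsum run-numbering trick; a self-contained reconstruction of the same idea (a sketch, not the exact body):

```python
import numpy as np
import pandas as pd

def count_consecutive_sketch(s: pd.Series, judge_number=1, nan_value=np.nan) -> pd.Series:
    is_one = s == judge_number
    groups = (~is_one).cumsum()  # a new group starts wherever a run breaks
    runs = is_one.astype(int).groupby(groups).cumsum()
    return runs.replace(0, nan_value)

count_consecutive_sketch(pd.Series([1, 1, np.nan, 1, 1, 1]))
# -> 1.0, 2.0, NaN, 1.0, 2.0, 3.0
```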


def lu计算连续期数2片段递增(s:Union[pd.Series,pd.DataFrame])->Union[pd.Series,pd.DataFrame]:
return lu计算连续期数2(s)+lu标记连续片段(s)
def lu计算连续期数2片段递增(
s: Union[pd.Series, pd.DataFrame]
) -> Union[pd.Series, pd.DataFrame]:
return lu计算连续期数2(s) + lu标记连续片段(s)


def lu计算连续期数2片段递减(s:Union[pd.Series,pd.DataFrame])->Union[pd.Series,pd.DataFrame]:
return lu计算连续期数2(s)-lu标记连续片段(s)
def lu计算连续期数2片段递减(
s: Union[pd.Series, pd.DataFrame]
) -> Union[pd.Series, pd.DataFrame]:
return lu计算连续期数2(s) - lu标记连续片段(s)


def lu计算连续期数奇正偶反(s:Union[pd.Series,pd.DataFrame])->Union[pd.Series,pd.DataFrame]:
if isinstance(s,pd.DataFrame):
def lu计算连续期数奇正偶反(
s: Union[pd.Series, pd.DataFrame]
) -> Union[pd.Series, pd.DataFrame]:
if isinstance(s, pd.DataFrame):
return s.apply(lu计算连续期数奇正偶反)
else:
s=s.reset_index(drop=True)
s = s.reset_index(drop=True)
# Used to label each contiguous non-NaN segment
s_diff = (~s.isna()).astype(int).diff().fillna(1).cumsum()

@@ -1008,17 +1018,22 @@ def lu计算连续期数奇正偶反(s:Union[pd.Series,pd.
order = s.groupby(s_mapped).cumcount() + 1

# Compute the length of each contiguous non-NaN segment
segment_lengths = s.groupby(s_mapped).transform('count')
segment_lengths = s.groupby(s_mapped).transform("count")

# Number even-numbered segments in reverse order
s[s.notna()] = np.where(s_mapped % 2 == 1, order, segment_lengths[s_mapped == s_mapped] - order + 1)
s[s.notna()] = np.where(
s_mapped % 2 == 1, order, segment_lengths[s_mapped == s_mapped] - order + 1
)
return s.values

def lu计算连续期数偶正奇反(s:Union[pd.Series,pd.DataFrame])->Union[pd.Series,pd.DataFrame]:
if isinstance(s,pd.DataFrame):


def lu计算连续期数偶正奇反(
s: Union[pd.Series, pd.DataFrame]
) -> Union[pd.Series, pd.DataFrame]:
if isinstance(s, pd.DataFrame):
return s.apply(lu计算连续期数偶正奇反)
else:
s=s.reset_index(drop=True)
s = s.reset_index(drop=True)
# Used to label each contiguous non-NaN segment
s_diff = (~s.isna()).astype(int).diff().fillna(1).cumsum()

@@ -1032,16 +1047,20 @@ def lu计算连续期数偶正奇反(s:Union[pd.
order = s.groupby(s_mapped).cumcount() + 1

# Compute the length of each contiguous non-NaN segment
segment_lengths = s.groupby(s_mapped).transform('count')
segment_lengths = s.groupby(s_mapped).transform("count")

# Number odd-numbered segments in reverse order
s[s.notna()] = np.where(s_mapped % 2 == 1, segment_lengths[s_mapped == s_mapped] - order + 1, order)
s[s.notna()] = np.where(
s_mapped % 2 == 1, segment_lengths[s_mapped == s_mapped] - order + 1, order
)
return s.values
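To make the mirrored pair concrete, a hypothetical trace of the intended semantics; part of the segment-mapping logic is folded out of the hunk, so these outputs are inferred from the names and comments rather than executed:

```python
import numpy as np
import pandas as pd

s = pd.Series([7.0, 8.0, 9.0, np.nan, 5.0, 6.0])  # two non-NaN runs, lengths 3 and 2
lu计算连续期数奇正偶反(s)  # run 1 forward, run 2 reversed -> 1, 2, 3, NaN, 2, 1
lu计算连续期数偶正奇反(s)  # run 1 reversed, run 2 forward -> 3, 2, 1, NaN, 1, 2
```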


def lu计算连续期数长度(s:Union[pd.Series,pd.DataFrame],final_mean=1)->Union[float,pd.Series,pd.DataFrame]:
if isinstance(s,pd.DataFrame):
return s.apply(lambda x:lu计算连续期数长度(x,final_mean))
def lu计算连续期数长度(
s: Union[pd.Series, pd.DataFrame], final_mean=1
) -> Union[float, pd.Series, pd.DataFrame]:
if isinstance(s, pd.DataFrame):
return s.apply(lambda x: lu计算连续期数长度(x, final_mean))
else:
# Identify non-NaN values
not_nan = s.notnull()
@@ -1060,64 +1079,82 @@ def lu计算连续期数长度(s:Union[pd.Series,pd.DataFrame],final_mean=1)->Un
return segment_lengths.mean()
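An illustrative call for `lu计算连续期数长度`; the hunk hides how `segment_lengths` is built, so whether the average weights segments equally or per position is not visible here:

```python
import numpy as np
import pandas as pd

s = pd.Series([1, 1, np.nan, 1, 1, 1])  # non-NaN runs of length 2 and 3
lu计算连续期数长度(s)  # with final_mean=1 -> a single number, the average run length
```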


def lu标记连续片段(s:Union[pd.Series,pd.DataFrame],label_nan=0,number_continuous=1)->Union[pd.Series,pd.DataFrame]:
def lu标记连续片段(
s: Union[pd.Series, pd.DataFrame], label_nan=0, number_continuous=1
) -> Union[pd.Series, pd.DataFrame]:
not_nan = ~s.isna()
segment_starts = not_nan.diff().fillna(True)  # fill True for the first element, since diff yields NaN there
segment_starts = not_nan.diff().fillna(
    True
)  # fill True for the first element, since diff yields NaN there

# Assign a unique identifier to each contiguous segment
segments = segment_starts.cumsum()

# Apply identifiers only to non-NaN segments; NaN values stay unchanged
if not label_nan:
segments=segments*np.sign(s.abs()+1)
segments = segments * np.sign(s.abs() + 1)
if number_continuous:
segments=(segments+1)//2
segments = (segments + 1) // 2
return segments
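`lu标记连续片段` is fully visible, so this small trace can be checked against the code: contiguous non-NaN runs are numbered 1, 2, ... and NaN gaps are preserved (with the defaults label_nan=0, number_continuous=1):

```python
import numpy as np
import pandas as pd

s = pd.Series([5.0, 6.0, np.nan, 7.0])
lu标记连续片段(s)  # -> 1.0, 1.0, NaN, 2.0
```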

def lu删去连续片段中的最大值(s:Union[pd.Series,pd.DataFrame])->Union[pd.Series,pd.DataFrame]:
if isinstance(s,pd.DataFrame):


def lu删去连续片段中的最大值(
s: Union[pd.Series, pd.DataFrame]
) -> Union[pd.Series, pd.DataFrame]:
if isinstance(s, pd.DataFrame):
return s.apply(lu删去连续片段中的最大值)
else:
# Generate identifiers for the contiguous segments
s_diff = s.isna().astype(int).diff().fillna(0).ne(0).cumsum()

# Use transform to find each segment's maximum
max_vals = s.groupby(s_diff).transform('max')
max_vals = s.groupby(s_diff).transform("max")

# Replace elements equal to the segment maximum with NaN
s[s == max_vals] = np.nan
return s

def lu删去连续片段中的最小值(s:Union[pd.Series,pd.DataFrame])->Union[pd.Series,pd.DataFrame]:

def lu删去连续片段中的最小值(
s: Union[pd.Series, pd.DataFrame]
) -> Union[pd.Series, pd.DataFrame]:
return -lu删去连续片段中的最大值(-s)
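A worked trace for the delete-the-extremum pair; note that `lu删去连续片段中的最大值` writes NaN into the Series passed in (`s[s == max_vals] = np.nan`), hence the `.copy()` calls:

```python
import numpy as np
import pandas as pd

s = pd.Series([3.0, 9.0, np.nan, 4.0, 8.0])  # runs [3, 9] and [4, 8]
lu删去连续片段中的最大值(s.copy())  # -> 3.0, NaN, NaN, 4.0, NaN
lu删去连续片段中的最小值(s.copy())  # -> NaN, 9.0, NaN, NaN, 8.0  (via the -max(-s) trick)
```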


def lu仅保留连续片段中的最大值(s:Union[pd.Series,pd.DataFrame])->Union[pd.Series,pd.DataFrame]:
if isinstance(s,pd.DataFrame):
def lu仅保留连续片段中的最大值(
s: Union[pd.Series, pd.DataFrame]
) -> Union[pd.Series, pd.DataFrame]:
if isinstance(s, pd.DataFrame):
return s.apply(lu仅保留连续片段中的最大值)
else:
# Generate identifiers for the contiguous segments
s_diff = s.isna().astype(int).diff().fillna(0).ne(0).cumsum()

# Use transform to find each segment's maximum
max_vals = s.groupby(s_diff).transform('max')
max_vals = s.groupby(s_diff).transform("max")

# Keep only the segment maximum: replace everything else with NaN
s[s != max_vals] = np.nan
return s

def lu仅保留连续片段中的最小值(s:Union[pd.Series,pd.DataFrame])->Union[pd.Series,pd.DataFrame]:

def lu仅保留连续片段中的最小值(
s: Union[pd.Series, pd.DataFrame]
) -> Union[pd.Series, pd.DataFrame]:
return -lu仅保留连续片段中的最大值(-s)
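And the complementary keep-only-the-extremum pair; this trace assumes the corrected recursive call in the DataFrame branch above:

```python
import numpy as np
import pandas as pd

s = pd.Series([3.0, 9.0, 5.0, np.nan, 4.0, 8.0])  # runs [3, 9, 5] and [4, 8]
lu仅保留连续片段中的最大值(s.copy())  # -> NaN, 9.0, NaN, NaN, NaN, 8.0
lu仅保留连续片段中的最小值(s.copy())  # -> 3.0, NaN, NaN, NaN, 4.0, NaN
```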

def lu删去连续片段中的最大值及其后面的值(s:Union[pd.Series,pd.DataFrame])->Union[pd.Series,pd.DataFrame]:
if isinstance(s,pd.DataFrame):

def lu删去连续片段中的最大值及其后面的值(
s: Union[pd.Series, pd.DataFrame]
) -> Union[pd.Series, pd.DataFrame]:
if isinstance(s, pd.DataFrame):
return s.apply(lu删去连续片段中的最大值及其后面的值)
else:
# Generate identifiers for the contiguous segments
s_diff = s.isna().astype(int).diff().fillna(0).ne(0).cumsum()

# Use transform to find each segment's maximum
max_vals = s.groupby(s_diff).transform('max')
max_vals = s.groupby(s_diff).transform("max")

# Use cummax to flag the maximum and every value after it
max_flag = (s.groupby(s_diff).cummax() == max_vals).astype(int)
Expand All @@ -1128,39 +1165,64 @@ def lu删去连续片段中的最大值及其后面的值(s:Union[pd.Series,pd.D
# Replace the flagged values with NaN
s[max_flag_cum > 0] = np.nan
return s

def lu删去连续片段中的最小值及其后面的值(s:Union[pd.Series,pd.DataFrame])->Union[pd.Series,pd.DataFrame]:


def lu删去连续片段中的最小值及其后面的值(
s: Union[pd.Series, pd.DataFrame]
) -> Union[pd.Series, pd.DataFrame]:
return -lu删去连续片段中的最大值及其后面的值(-s)

def lu删去连续片段中的最大值及其前面的值(s:Union[pd.Series,pd.DataFrame])->Union[pd.Series,pd.DataFrame]:

def lu删去连续片段中的最大值及其前面的值(
s: Union[pd.Series, pd.DataFrame]
) -> Union[pd.Series, pd.DataFrame]:
return lu删去连续片段中的最大值及其后面的值(s[::-1])[::-1]

def lu删去连续片段中的最小值及其前面的值(s:Union[pd.Series,pd.DataFrame])->Union[pd.Series,pd.DataFrame]:

def lu删去连续片段中的最小值及其前面的值(
s: Union[pd.Series, pd.DataFrame]
) -> Union[pd.Series, pd.DataFrame]:
return -lu删去连续片段中的最大值及其前面的值(-s)
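A hypothetical trace for the cummax-flagging family; the cumulative-flag step (`max_flag_cum`) is folded out of the hunk, so these outputs assume the flag is cumulated within each segment, as the comments suggest:

```python
import numpy as np
import pandas as pd

s = pd.Series([3.0, 9.0, 5.0, np.nan, 4.0, 8.0])
lu删去连续片段中的最大值及其后面的值(s.copy())  # -> 3.0, NaN, NaN, NaN, 4.0, NaN
lu删去连续片段中的最大值及其前面的值(s.copy())  # -> NaN, NaN, 5.0, NaN, NaN, NaN
```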


@do_on_dfs
def is_pos(s: Union[pd.Series, pd.DataFrame],zero_as_pos:bool=1) -> Union[pd.Series, pd.DataFrame]:
def is_pos(
s: Union[pd.Series, pd.DataFrame], zero_as_pos: bool = 1
) -> Union[pd.Series, pd.DataFrame]:
if zero_as_pos:
return np.sign(s).replace(0,1).replace(-1,np.nan)
return np.sign(s).replace(0, 1).replace(-1, np.nan)
else:
return np.sign(s).replace(0,np.nan).replace(-1,np.nan)

return np.sign(s).replace(0, np.nan).replace(-1, np.nan)


@do_on_dfs
def is_neg(s: Union[pd.Series, pd.DataFrame],zero_as_neg:bool=1) -> Union[pd.Series, pd.DataFrame]:
def is_neg(
s: Union[pd.Series, pd.DataFrame], zero_as_neg: bool = 1
) -> Union[pd.Series, pd.DataFrame]:
if zero_as_neg:
return np.sign(s).replace(0,-1).replace(1,np.nan).replace(-1,1)
return np.sign(s).replace(0, -1).replace(1, np.nan).replace(-1, 1)
else:
return np.sign(s).replace(0,np.nan).replace(1,np.nan).replace(-1,1)

return np.sign(s).replace(0, np.nan).replace(1, np.nan).replace(-1, 1)


@do_on_dfs
def get_pos_value(s: Union[pd.Series, pd.DataFrame],judge_sign:Union[float,pd.Series, pd.DataFrame],zero_as_pos:bool=1) -> Union[pd.Series, pd.DataFrame]:
return s * is_pos(s-judge_sign,zero_as_pos)
def get_pos_value(
s: Union[pd.Series, pd.DataFrame],
judge_sign: Union[float, pd.Series, pd.DataFrame],
zero_as_pos: bool = 1,
) -> Union[pd.Series, pd.DataFrame]:
return s * is_pos(s - judge_sign, zero_as_pos)


@do_on_dfs
def get_neg_value(s: Union[pd.Series, pd.DataFrame],judge_sign:Union[float,pd.Series, pd.DataFrame],zero_as_neg:bool=1) -> Union[pd.Series, pd.DataFrame]:
return s * is_neg(s-judge_sign,zero_as_neg)
def get_neg_value(
s: Union[pd.Series, pd.DataFrame],
judge_sign: Union[float, pd.Series, pd.DataFrame],
zero_as_neg: bool = 1,
) -> Union[pd.Series, pd.DataFrame]:
return s * is_neg(s - judge_sign, zero_as_neg)
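The sign helpers compose as threshold filters; a small trace that can be checked against the visible code (zero counts as positive/negative by default, per the flags):

```python
import numpy as np
import pandas as pd

s = pd.Series([-2.0, 0.0, 1.5, 3.0])
is_pos(s)              # -> NaN, 1.0, 1.0, 1.0
get_pos_value(s, 1.0)  # -> NaN, NaN, 1.5, 3.0   (keeps values >= 1.0)
get_neg_value(s, 1.0)  # -> -2.0, 0.0, NaN, NaN  (keeps values <= 1.0)
```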


@do_on_dfs
def count_pos_neg(s:Union[pd.Series,pd.DataFrame]):
print("正数个数:",is_pos(s).sum().sum(),"负数个数:",is_neg(s).sum().sum())
def count_pos_neg(s: Union[pd.Series, pd.DataFrame]):
print("正数个数:", is_pos(s).sum().sum(), "负数个数:", is_neg(s).sum().sum())
