Skip to content

Commit

Permalink
v4.0.7
Browse files Browse the repository at this point in the history
  • Loading branch information
chen-001 committed Jul 17, 2023
1 parent 0d8ffc8 commit 30f3c01
Show file tree
Hide file tree
Showing 3 changed files with 99 additions and 49 deletions.
4 changes: 2 additions & 2 deletions pure_ocean_breeze/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@
一个量化多因子研究的框架,包含数据、回测、因子加工等方面的功能
"""

__updated__ = "2023-07-13 08:36:22"
__version__ = "4.0.6"
__updated__ = "2023-07-17 14:36:04"
__version__ = "4.0.7"
__author__ = "chenzongwei"
__author_email__ = "winterwinter999@163.com"
__url__ = "https://github.com/chen-001/pure_ocean_breeze"
Expand Down
135 changes: 89 additions & 46 deletions pure_ocean_breeze/labor/process.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
__updated__ = "2023-07-13 12:17:42"
__updated__ = "2023-07-17 14:55:20"

import warnings

Expand Down Expand Up @@ -3363,6 +3363,7 @@ def select_many_calculate(
show_time: bool = 0,
many_days: int = 1,
n_jobs: int = 1,
use_mpire: bool = 0,
) -> None:
the_func = partial(func)
factor_new = []
Expand Down Expand Up @@ -3413,17 +3414,28 @@ def cal_one(date1, date2):
return df

if n_jobs > 1:
with concurrent.futures.ThreadPoolExecutor(
max_workers=n_jobs
) as executor:
factor_new_more = list(
tqdm.auto.tqdm(
executor.map(
cal_one, cut_points[:-many_days], cut_points[many_days:]
),
total=len(cut_points[many_days:]),
if use_mpire:
with WorkerPool(n_jobs=n_jobs) as pool:
factor_new_more = pool.map(
cal_one,
cut_points[:-many_days],
cut_points[many_days:],
progress_bar=True,
)
else:
with concurrent.futures.ThreadPoolExecutor(
max_workers=n_jobs
) as executor:
factor_new_more = list(
tqdm.auto.tqdm(
executor.map(
cal_one,
cut_points[:-many_days],
cut_points[many_days:],
),
total=len(cut_points[many_days:]),
)
)
)
factor_new = factor_new + factor_new_more
else:
# 开始计算因子值
Expand Down Expand Up @@ -3469,17 +3481,21 @@ def cal_two(date1, date2):
pairs = self.forward_dates(dates, many_days=many_days)
cuts2 = tuple(zip(pairs, dates))
if n_jobs > 1:
with concurrent.futures.ThreadPoolExecutor(
max_workers=n_jobs
) as executor:
factor_new_more = list(
tqdm.auto.tqdm(
executor.map(
cal_two, pairs, dates
),
total=len(pairs),
if use_mpire:
with WorkerPool(n_jobs=n_jobs) as pool:
factor_new_more = pool.map(
cal_two, pairs, dates, progress_bar=True
)
else:
with concurrent.futures.ThreadPoolExecutor(
max_workers=n_jobs
) as executor:
factor_new_more = list(
tqdm.auto.tqdm(
executor.map(cal_two, pairs, dates),
total=len(pairs),
)
)
)
factor_new = factor_new + factor_new_more
else:
# 开始计算因子值
Expand All @@ -3502,6 +3518,7 @@ def select_any_calculate(
show_time: bool = 0,
many_days: int = 1,
n_jobs: int = 1,
use_mpire: bool = 0,
) -> None:
if len(dates) == 1 and many_days == 1:
res = self.select_one_calculate(
Expand All @@ -3519,6 +3536,7 @@ def select_any_calculate(
show_time=show_time,
many_days=many_days,
n_jobs=n_jobs,
use_mpire=use_mpire,
)
if res is not None:
self.factor_new.append(res)
Expand Down Expand Up @@ -3593,6 +3611,7 @@ def get_daily_factors_one(
show_time: bool = 0,
many_days: int = 1,
n_jobs: int = 1,
use_mpire: bool = 0,
):
if len(self.dates_new) > 0:
for interval in self.dates_new_intervals:
Expand All @@ -3604,6 +3623,7 @@ def get_daily_factors_one(
show_time=show_time,
many_days=many_days,
n_jobs=n_jobs,
use_mpire=use_mpire,
)
if len(self.factor_new) > 0:
self.factor_new = pd.concat(self.factor_new)
Expand Down Expand Up @@ -5656,7 +5676,7 @@ def optimize_one_day(
else:
return None

def optimize_many_days(self, startdate: int = STATES['START']):
def optimize_many_days(self, startdate: int = STATES["START"]):
dates = [i for i in self.facs.index if i >= pd.Timestamp(str(startdate))]
for date in tqdm.auto.tqdm(dates):
fac = self.facs[self.facs.index == date].T.dropna()
Expand Down Expand Up @@ -5702,7 +5722,7 @@ def make_contrast(self, weight, index, name) -> list[pd.DataFrame]:
comments = comments_on_twins(rets[f"{name}增强组合超额净值"], abret.dropna())
return comments, rets

def run(self, startdate: int = STATES['START']) -> pd.DataFrame:
def run(self, startdate: int = STATES["START"]) -> pd.DataFrame:
"""运行规划求解
Parameters
Expand Down Expand Up @@ -6761,9 +6781,9 @@ def select_one_calculate(
0, "date", pd.Timestamp(year=date.year, month=date.month, day=date.day)
)
if (df is not None) and (df.shape[0] > 0):
df1 = df.pivot(columns="code", index="date", values="fac")
self.factor_steps.write_via_df(df, self.factor_file_pinyin, tuple_col="fac")
df = df.pivot(columns="code", index="date", values="fac")
return df
return df1

def get_daily_factors(
self,
Expand All @@ -6776,6 +6796,7 @@ def get_daily_factors(
lows_in: bool = 0,
amounts_in: bool = 0,
merge_them: bool = 0,
use_mpire: bool = 0,
) -> None:
"""每次抽取chunksize天的截面上全部股票的分钟数据
对每天的股票的数据计算因子值
Expand All @@ -6786,6 +6807,8 @@ def get_daily_factors(
用于计算因子值的函数
n_jobs : int, optional
并行数量, by default 1
fields : str, optional
要读取的字段,可选包含`date,code,price,amount,saleamount,buyamount,action,saleid,saleprice,buyid,buyprice`,其中date,code必须包含, by default `'*'`
resample_frequency : str, optional
将逐笔数据转化为秒级或分钟频数据,可以填写要转化的频率,如'3s'(3秒数据),'1m'(1分钟数据),
指定此参数后,将自动生成一个self.closes的收盘价矩阵(index为时间,columns为股票代码,values为收盘价),
Expand All @@ -6804,31 +6827,51 @@ def get_daily_factors(
可在循环计算的函数中使用`self.amounts`来调用计算好的值,by default 0
merge_them : bool, optional
在resample_frequency不为None的情况下,可以使用此参数,将计算好的因子值合并到一起,生成类似于分钟数据的sql形式,by default 0
use_mpire : bool, optional
并行是否使用mpire,默认使用concurrent,by default 0
"""
if len(self.dates_new) > 0:
if n_jobs > 1:
with concurrent.futures.ThreadPoolExecutor(
max_workers=n_jobs
) as executor:
self.factor_new = list(
tqdm.auto.tqdm(
executor.map(
lambda x: self.select_one_calculate(
date=x,
func=func,
fields=fields,
resample_frequency=resample_frequency,
opens_in=opens_in,
highs_in=highs_in,
lows_in=lows_in,
amounts_in=amounts_in,
merge_them=merge_them,
),
self.dates_new,
if use_mpire:
with WorkerPool(n_jobs=n_jobs) as pool:
self.factor_new = pool.map(
lambda x: self.select_one_calculate(
date=x,
func=func,
fields=fields,
resample_frequency=resample_frequency,
opens_in=opens_in,
highs_in=highs_in,
lows_in=lows_in,
amounts_in=amounts_in,
merge_them=merge_them,
),
total=len(self.dates_new),
self.dates_new,
progress_bar=True,
)
else:
with concurrent.futures.ThreadPoolExecutor(
max_workers=n_jobs
) as executor:
self.factor_new = list(
tqdm.auto.tqdm(
executor.map(
lambda x: self.select_one_calculate(
date=x,
func=func,
fields=fields,
resample_frequency=resample_frequency,
opens_in=opens_in,
highs_in=highs_in,
lows_in=lows_in,
amounts_in=amounts_in,
merge_them=merge_them,
),
self.dates_new,
),
total=len(self.dates_new),
)
)
)
else:
for date in tqdm.auto.tqdm(self.dates_new, "您现在处于单核运算状态,建议仅在调试时使用单核"):
df = self.select_one_calculate(
Expand All @@ -6848,7 +6891,7 @@ def get_daily_factors(
[self.factor_old] + self.factor_new
).sort_index()
else:
self.factor = self.factor_new.sort_index()
self.factor = pd.concat(self.factor_new).sort_index()
self.factor = drop_duplicates_index(self.factor.dropna(how="all"))
new_end_date = datetime.datetime.strftime(self.factor.index.max(), "%Y%m%d")
# 存入本地
Expand Down
9 changes: 8 additions & 1 deletion 更新日志/version4.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,13 @@
## 更新日志🗓 — v4


* v4.0.7 — 2023.7.17

> 1. 给pure_fall_frequent和pure_fall_nature新增了use_mpire参数,可以使用mpire库开启并行
> 2. 修复了pure_fall_frequent的select_one_calculate方法中,可能存在的返回类型为字符串的问题
> 3. 修复了pure_fall_nature中全新因子运算完拼接时的bug

* v4.0.6 — 2023.7.14

> 1. 修复了pure_dawn读取已有因子值时`__call__`方法返回错误的问题
Expand Down Expand Up @@ -58,7 +65,7 @@
>
> ```python
> import pure_ocean_breeze as p
>
>
> p.ini()
> ```
> 2. 初始化函数与`Homeplace`参数新增了存储逐笔数据的路径
Expand Down

0 comments on commit 30f3c01

Please sign in to comment.