From 30f3c014d891b5838ef4d13c889c63c5706330e5 Mon Sep 17 00:00:00 2001
From: chenzongwei <63836858+chen-001@users.noreply.github.com>
Date: Mon, 17 Jul 2023 14:59:45 +0800
Subject: [PATCH] v4.0.7

---
 pure_ocean_breeze/__init__.py      |   4 +-
 pure_ocean_breeze/labor/process.py | 135 ++++++++++++------
 .../version4.md"                   |   9 +-
 3 files changed, 99 insertions(+), 49 deletions(-)

diff --git a/pure_ocean_breeze/__init__.py b/pure_ocean_breeze/__init__.py
index 2bed341..bfbe8c9 100644
--- a/pure_ocean_breeze/__init__.py
+++ b/pure_ocean_breeze/__init__.py
@@ -2,8 +2,8 @@
 A framework for quantitative multi-factor research, covering data, backtesting, factor engineering, and more
 """

-__updated__ = "2023-07-13 08:36:22"
-__version__ = "4.0.6"
+__updated__ = "2023-07-17 14:36:04"
+__version__ = "4.0.7"
 __author__ = "chenzongwei"
 __author_email__ = "winterwinter999@163.com"
 __url__ = "https://github.com/chen-001/pure_ocean_breeze"
diff --git a/pure_ocean_breeze/labor/process.py b/pure_ocean_breeze/labor/process.py
index 5ea2224..38fea10 100644
--- a/pure_ocean_breeze/labor/process.py
+++ b/pure_ocean_breeze/labor/process.py
@@ -1,4 +1,4 @@
-__updated__ = "2023-07-13 12:17:42"
+__updated__ = "2023-07-17 14:55:20"

 import warnings
@@ -3363,6 +3363,7 @@ def select_many_calculate(
         show_time: bool = 0,
         many_days: int = 1,
         n_jobs: int = 1,
+        use_mpire: bool = 0,
     ) -> None:
         the_func = partial(func)
         factor_new = []
@@ -3413,17 +3414,28 @@ def cal_one(date1, date2):
                 return df

             if n_jobs > 1:
-                with concurrent.futures.ThreadPoolExecutor(
-                    max_workers=n_jobs
-                ) as executor:
-                    factor_new_more = list(
-                        tqdm.auto.tqdm(
-                            executor.map(
-                                cal_one, cut_points[:-many_days], cut_points[many_days:]
-                            ),
-                            total=len(cut_points[many_days:]),
-                        )
-                    )
+                if use_mpire:
+                    with WorkerPool(n_jobs=n_jobs) as pool:
+                        factor_new_more = pool.map(
+                            cal_one,
+                            list(zip(cut_points[:-many_days],
+                                     cut_points[many_days:])),
+                            progress_bar=True,
+                        )
+                else:
+                    with concurrent.futures.ThreadPoolExecutor(
+                        max_workers=n_jobs
+                    ) as executor:
+                        factor_new_more = list(
+                            tqdm.auto.tqdm(
+                                executor.map(
+                                    cal_one,
+                                    cut_points[:-many_days],
+                                    cut_points[many_days:],
+                                ),
+                                total=len(cut_points[many_days:]),
+                            )
+                        )
                 factor_new = factor_new + factor_new_more
             else:
                 # start computing the factor values
@@ -3469,17 +3481,21 @@ def cal_two(date1, date2):
             pairs = self.forward_dates(dates, many_days=many_days)
             cuts2 = tuple(zip(pairs, dates))
             if n_jobs > 1:
-                with concurrent.futures.ThreadPoolExecutor(
-                    max_workers=n_jobs
-                ) as executor:
-                    factor_new_more = list(
-                        tqdm.auto.tqdm(
-                            executor.map(
-                                cal_two, pairs, dates
-                            ),
-                            total=len(pairs),
-                        )
-                    )
+                if use_mpire:
+                    with WorkerPool(n_jobs=n_jobs) as pool:
+                        factor_new_more = pool.map(
+                            cal_two, cuts2, progress_bar=True
+                        )
+                else:
+                    with concurrent.futures.ThreadPoolExecutor(
+                        max_workers=n_jobs
+                    ) as executor:
+                        factor_new_more = list(
+                            tqdm.auto.tqdm(
+                                executor.map(cal_two, pairs, dates),
+                                total=len(pairs),
+                            )
+                        )
                 factor_new = factor_new + factor_new_more
             else:
                 # start computing the factor values
@@ -3502,6 +3518,7 @@ def select_any_calculate(
         show_time: bool = 0,
         many_days: int = 1,
         n_jobs: int = 1,
+        use_mpire: bool = 0,
     ) -> None:
         if len(dates) == 1 and many_days == 1:
             res = self.select_one_calculate(
@@ -3519,6 +3536,7 @@ def select_any_calculate(
                 show_time=show_time,
                 many_days=many_days,
                 n_jobs=n_jobs,
+                use_mpire=use_mpire,
             )
             if res is not None:
                 self.factor_new.append(res)
@@ -3593,6 +3611,7 @@ def get_daily_factors_one(
         show_time: bool = 0,
         many_days: int = 1,
         n_jobs: int = 1,
+        use_mpire: bool = 0,
     ):
         if len(self.dates_new) > 0:
             for interval in self.dates_new_intervals:
                 self.select_any_calculate(
                     show_time=show_time,
                     many_days=many_days,
                     n_jobs=n_jobs,
+                    use_mpire=use_mpire,
                 )
         if len(self.factor_new) > 0:
             self.factor_new = pd.concat(self.factor_new)
@@ -5656,7 +5676,7 @@ def optimize_one_day(
         else:
             return None

-    def optimize_many_days(self, startdate: int = STATES['START']):
+    def optimize_many_days(self, startdate: int = STATES["START"]):
         dates = [i for i in self.facs.index if i >= pd.Timestamp(str(startdate))]
         for date in tqdm.auto.tqdm(dates):
             fac = self.facs[self.facs.index == date].T.dropna()
@@ -5702,7 +5722,7 @@ def make_contrast(self, weight, index, name) -> list[pd.DataFrame]:
         comments = comments_on_twins(rets[f"{name}增强组合超额净值"], abret.dropna())
         return comments, rets

-    def run(self, startdate: int = STATES['START']) -> pd.DataFrame:
+    def run(self, startdate: int = STATES["START"]) -> pd.DataFrame:
         """Run the optimization solver

         Parameters
         ----------
@@ -6761,9 +6781,9 @@ def select_one_calculate(
                 0, "date", pd.Timestamp(year=date.year, month=date.month, day=date.day)
             )
         if (df is not None) and (df.shape[0] > 0):
+            df1 = df.pivot(columns="code", index="date", values="fac")
             self.factor_steps.write_via_df(df, self.factor_file_pinyin, tuple_col="fac")
-            df = df.pivot(columns="code", index="date", values="fac")
-            return df
+            return df1

     def get_daily_factors(
         self,
@@ -6776,6 +6796,7 @@ def get_daily_factors(
         lows_in: bool = 0,
         amounts_in: bool = 0,
         merge_them: bool = 0,
+        use_mpire: bool = 0,
     ) -> None:
         """Each pass pulls the cross-sectional minute data of all stocks for chunksize days
         and computes factor values on each day's data
@@ -6786,6 +6807,8 @@ def get_daily_factors(
             Function used to compute the factor values
         n_jobs : int, optional
             Number of parallel workers, by default 1
+        fields : str, optional
+            Fields to read; may include `date,code,price,amount,saleamount,buyamount,action,saleid,saleprice,buyid,buyprice`, and date and code must be included, by default `'*'`
         resample_frequency : str, optional
             Convert the tick data to second- or minute-frequency data by giving the target frequency, e.g. '3s' (3-second bars) or '1m' (1-minute bars);
             once specified, a close-price matrix self.closes is generated automatically (index is the timestamp, columns are the stock codes, values are close prices),
@@ -6804,31 +6827,51 @@ def get_daily_factors(
             the precomputed values can then be accessed as `self.amounts` inside the per-day function, by default 0
         merge_them : bool, optional
             when resample_frequency is not None, this merges the computed factor values together into an SQL-like layout similar to minute data, by default 0
+        use_mpire : bool, optional
+            whether the parallel run uses mpire; concurrent.futures is used by default, by default 0
         """
         if len(self.dates_new) > 0:
             if n_jobs > 1:
-                with concurrent.futures.ThreadPoolExecutor(
-                    max_workers=n_jobs
-                ) as executor:
-                    self.factor_new = list(
-                        tqdm.auto.tqdm(
-                            executor.map(
-                                lambda x: self.select_one_calculate(
-                                    date=x,
-                                    func=func,
-                                    fields=fields,
-                                    resample_frequency=resample_frequency,
-                                    opens_in=opens_in,
-                                    highs_in=highs_in,
-                                    lows_in=lows_in,
-                                    amounts_in=amounts_in,
-                                    merge_them=merge_them,
-                                ),
-                                self.dates_new,
-                            ),
-                            total=len(self.dates_new),
-                        )
-                    )
+                if use_mpire:
+                    with WorkerPool(n_jobs=n_jobs) as pool:
+                        self.factor_new = pool.map(
+                            lambda x: self.select_one_calculate(
+                                date=x,
+                                func=func,
+                                fields=fields,
+                                resample_frequency=resample_frequency,
+                                opens_in=opens_in,
+                                highs_in=highs_in,
+                                lows_in=lows_in,
+                                amounts_in=amounts_in,
+                                merge_them=merge_them,
+                            ),
+                            self.dates_new,
+                            progress_bar=True,
+                        )
+                else:
+                    with concurrent.futures.ThreadPoolExecutor(
+                        max_workers=n_jobs
+                    ) as executor:
+                        self.factor_new = list(
+                            tqdm.auto.tqdm(
+                                executor.map(
+                                    lambda x: self.select_one_calculate(
+                                        date=x,
+                                        func=func,
+                                        fields=fields,
+                                        resample_frequency=resample_frequency,
+                                        opens_in=opens_in,
+                                        highs_in=highs_in,
+                                        lows_in=lows_in,
+                                        amounts_in=amounts_in,
+                                        merge_them=merge_them,
+                                    ),
+                                    self.dates_new,
+                                ),
+                                total=len(self.dates_new),
+                            )
+                        )
             else:
                 for date in tqdm.auto.tqdm(self.dates_new, "You are running in single-core mode; single-core is recommended only for debugging"):
                     df = self.select_one_calculate(
@@ -6848,7 @@ def get_daily_factors(
                 [self.factor_old] + self.factor_new
             ).sort_index()
         else:
-            self.factor = self.factor_new.sort_index()
+            self.factor = pd.concat(self.factor_new).sort_index()
         self.factor = drop_duplicates_index(self.factor.dropna(how="all"))
         new_end_date = datetime.datetime.strftime(self.factor.index.max(), "%Y%m%d")
         # save to local files
diff --git "a/\346\233\264\346\226\260\346\227\245\345\277\227/version4.md" "b/\346\233\264\346\226\260\346\227\245\345\277\227/version4.md"
index a33d3b4..8677fce 100644
--- "a/\346\233\264\346\226\260\346\227\245\345\277\227/version4.md"
+++ "b/\346\233\264\346\226\260\346\227\245\345\277\227/version4.md"
@@ -1,6 +1,13 @@
 ## Changelog 🗓 — v4

+* v4.0.7 — 2023.7.17
+
+> 1. Added a use_mpire parameter to pure_fall_frequent and pure_fall_nature, so the parallel run can use the mpire library
+> 2. Fixed an issue in pure_fall_frequent's select_one_calculate method where the returned value could end up being a string
+> 3. Fixed a bug in pure_fall_nature when concatenating the results of a brand-new factor after computation
+
+
 * v4.0.6 — 2023.7.14

 > 1. Fixed an issue where pure_dawn's `__call__` method returned wrong results when reading existing factor values
@@ -58,7 +65,7 @@
 >
 > ```python
 > import pure_ocean_breeze as p
-> 
+>
 > p.ini()
 > ```
 > 2. Added a path for storing tick-by-tick data to the initialization function and the `Homeplace` parameters
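For reference, below is a minimal, self-contained sketch of the `use_mpire` dispatch pattern this patch introduces, assuming `mpire` and `tqdm` are installed and `from mpire import WorkerPool` is available; `run_parallel`, `compute_pair`, and the toy date lists are hypothetical stand-ins for `cal_one`/`cal_two` and `cut_points`, not part of pure_ocean_breeze. Note that `mpire.WorkerPool.map` takes a single iterable of argument tuples and unpacks each tuple into positional arguments (hence the `zip` here and in the patched calls above), whereas `concurrent.futures.Executor.map` accepts parallel iterables.

```python
# Hypothetical sketch of the v4.0.7 dispatch between mpire and concurrent.futures.
# run_parallel / compute_pair are illustrative stand-ins, not library code.
import concurrent.futures

import tqdm.auto
from mpire import WorkerPool


def compute_pair(date1, date2):
    # placeholder for the per-interval factor computation
    return (date1, date2)


def run_parallel(starts, ends, n_jobs=2, use_mpire=False):
    if use_mpire:
        # WorkerPool.map expects one iterable of argument tuples and
        # unpacks each tuple into the worker's positional arguments.
        with WorkerPool(n_jobs=n_jobs) as pool:
            return pool.map(
                compute_pair,
                list(zip(starts, ends)),
                progress_bar=True,
            )
    # thread-based fallback, wrapped in tqdm for the progress bar
    with concurrent.futures.ThreadPoolExecutor(max_workers=n_jobs) as executor:
        return list(
            tqdm.auto.tqdm(
                executor.map(compute_pair, starts, ends),
                total=len(ends),
            )
        )


if __name__ == "__main__":
    print(run_parallel([20230701, 20230702], [20230703, 20230704], use_mpire=False))
```

The trade-off between the two paths: mpire runs separate worker processes, so the mapped function and its arguments generally must be picklable, while the thread-pool path shares memory and mainly helps when the per-day work releases the GIL (I/O or numpy-heavy code).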