diff --git a/pure_ocean_breeze/__init__.py b/pure_ocean_breeze/__init__.py index c5acedf..f188117 100644 --- a/pure_ocean_breeze/__init__.py +++ b/pure_ocean_breeze/__init__.py @@ -2,8 +2,8 @@ 一个量化多因子研究的框架,包含数据、回测、因子加工等方面的功能 """ -__updated__ = "2023-07-03 20:59:40" -__version__ = "4.0.3" +__updated__ = "2023-07-04 09:39:33" +__version__ = "4.0.4" __author__ = "chenzongwei" __author_email__ = "winterwinter999@163.com" __url__ = "https://github.com/chen-001/pure_ocean_breeze" diff --git a/pure_ocean_breeze/labor/process.py b/pure_ocean_breeze/labor/process.py index 2b5ca9f..d4ae156 100644 --- a/pure_ocean_breeze/labor/process.py +++ b/pure_ocean_breeze/labor/process.py @@ -1,4 +1,4 @@ -__updated__ = "2023-07-03 21:16:22" +__updated__ = "2023-07-04 15:30:48" import warnings @@ -32,6 +32,8 @@ import tradetime as tt import cufflinks as cf import deprecation +import duckdb +import concurrent.futures from mpire import WorkerPool from scipy.optimize import minimize from pure_ocean_breeze import __version__ @@ -1359,7 +1361,7 @@ def show_corrs_with_old( corrs = show_corrs(olds, old_orders, method=method) else: corrs = show_corrs(olds, old_orders, method=method) - return corrs + return corrs.sort_index() @do_on_dfs @@ -6570,26 +6572,23 @@ def select_one_calculate( self, date: pd.Timestamp, func: Callable, + fields: str='*', resample_frequency: str = None, opens_in: bool = 0, highs_in: bool = 0, lows_in: bool = 0, - moneys_in: bool = 0, + amounts_in: bool = 0, merge_them: bool = 0, ) -> None: the_func = partial(func) if not isinstance(date, int): date = int(datetime.datetime.strftime(date, "%Y%m%d")) + parquet_name=homeplace.tick_by_tick_data+ str(date)[:4]+ "-"+ str(date)[4:6]+ "-"+ str(date)[6:]+ ".parquet" + if resample_frequency is not None: + fields='date,code,price,amount' # 开始计算因子值 - df = pd.read_parquet( - homeplace.tick_by_tick_data - + str(date)[:4] - + "-" - + str(date)[4:6] - + "-" - + str(date)[6:] - + ".parquet" - ) + cursor=duckdb.connect() + df=cursor.execute(f"select 
{fields} from '{parquet_name}';").arrow().to_pandas() date = df.date.iloc[0] date0 = pd.Timestamp(year=date.year, month=date.month, day=date.day) age_here = self.age.loc[pd.Timestamp(pd.Timestamp(df.date.iloc[0]).date())] @@ -6658,14 +6657,14 @@ def select_one_calculate( self.low = None names.append("close") - if moneys_in: - moneys = df.groupby(["code", "date"]).money.sum().reset_index() - moneys = moneys.pivot(index="date", columns="code", values="money") - moneys = moneys.resample(resample_frequency).sum().fillna(0) - self.moneys = moneys - names.append("money") + if amounts_in: + amounts = df.groupby(["code", "date"]).amount.sum().reset_index() + amounts = amounts.pivot(index="date", columns="code", values="amount") + amounts = amounts.resample(resample_frequency).sum().fillna(0) + self.amounts = amounts + names.append("amount") else: - self.moneys = None + self.amounts = None if merge_them: self.data = merge_many( @@ -6676,7 +6675,7 @@ def select_one_calculate( self.highs, self.lows, self.closes, - self.moneys, + self.amounts, ] if i is not None ], @@ -6715,7 +6714,7 @@ def get_daily_factors( opens_in: bool = 0, highs_in: bool = 0, lows_in: bool = 0, - moneys_in: bool = 0, + amounts_in: bool = 0, merge_them: bool = 0, ) -> None: """每次抽取chunksize天的截面上全部股票的分钟数据 @@ -6740,16 +6739,16 @@ def get_daily_factors( lows_in : bool, optional 在resample_frequency不为None的情况下,可以使用此参数,提前计算好最低价矩阵(index为时间,columns为股票代码,values为最低价), 可在循环计算的函数中使用`self.lows`来调用计算好的值,by default 0 - moneys_in : bool, optional - 在resample_frequency不为None的情况下,可以使用此参数,提前计算好成交额矩阵(index为时间,columns为股票代码,values为成交额), - 可在循环计算的函数中使用`self.moneys`来调用计算好的值,by default 0 + amounts_in : bool, optional + 在resample_frequency不为None的情况下,可以使用此参数,提前计算好成交量矩阵(index为时间,columns为股票代码,values为成交量), + 可在循环计算的函数中使用`self.amounts`来调用计算好的值,by default 0 merge_them : bool, optional 在resample_frequency不为None的情况下,可以使用此参数,将计算好的因子值合并到一起,生成类似于分钟数据的sql形式,by default 0 """ if len(self.dates_new) > 0: if n_jobs > 1: - with 
WorkerPool(n_jobs=n_jobs) as pool: - self.factor_new = pool.map( + with concurrent.futures.ThreadPoolExecutor(max_workers=n_jobs) as executor: + self.factor_new=list(tqdm.auto.tqdm(executor.map( lambda x: self.select_one_calculate( date=x, func=func, @@ -6757,12 +6756,9 @@ def get_daily_factors( opens_in=opens_in, highs_in=highs_in, lows_in=lows_in, - moneys_in=moneys_in, + amounts_in=amounts_in, merge_them=merge_them, - ), - self.dates_new, - progress_bar=True, - ) + ),self.dates_new),total=len(self.dates_new))) else: for date in tqdm.auto.tqdm(self.dates_new, "您现在处于单核运算状态,建议仅在调试时使用单核"): df = self.select_one_calculate( @@ -6772,7 +6768,7 @@ def get_daily_factors( opens_in=opens_in, highs_in=highs_in, lows_in=lows_in, - moneys_in=moneys_in, + amounts_in=amounts_in, merge_them=merge_them, ) self.factor_new.append(df) diff --git "a/\346\233\264\346\226\260\346\227\245\345\277\227/version4.md" "b/\346\233\264\346\226\260\346\227\245\345\277\227/version4.md" index 0f70942..75e3ccb 100644 --- "a/\346\233\264\346\226\260\346\227\245\345\277\227/version4.md" +++ "b/\346\233\264\346\226\260\346\227\245\345\277\227/version4.md" @@ -1,6 +1,14 @@ ## 更新日志🗓 — v4 +* v4.0.4 — 2023.7.4 + +> 1. 优化了show_corrs_with_old的结果展示排序 +> 2. 修复了pure_fall_nature中关于money与amount的错误 +> 3. 给pure_fall_nature的get_daily_factors方法增加了fields参数,用于指定要读取的字段,以节约内存(使用duckdb实现) +> 4. 将pure_fall_nature中的并行方法改为使用concurrent + + * v4.0.3 — 2023.7.4 > 1. 新增了全新的因子成果数据库FactorDone,每个最终复合因子,都附带其细分因子