markets changes, bug fixes, tests (#188)
AnastasiyaB committed Aug 20, 2021
1 parent 3d96784 commit 967e8dd
Showing 53 changed files with 2,897 additions and 584 deletions.
38 changes: 30 additions & 8 deletions gs_quant/analytics/core/processor.py
@@ -14,6 +14,7 @@
under the License.
"""
import asyncio
import functools
import logging
import uuid
from abc import ABCMeta, abstractmethod
@@ -57,6 +58,13 @@ class DataQueryInfo:
data: Series = None


@dataclass
class MeasureQueryInfo:
attr: str
processor: 'BaseProcessor'
entity: Entity


DateOrDatetimeOrRDate = Union[DateOrDatetime, RelativeDate]


@@ -70,6 +78,7 @@ def __init__(self, **kwargs):
self.children_data: Dict[str, ProcessorResult] = {}
self.data_cell = None
self.last_value = kwargs.get('last_value', False)
self.measure_processor = kwargs.get('measure_processor', False)

@abstractmethod
def process(self, *args):
@@ -86,7 +95,7 @@ def __handle_date_range(self,
result,
rdate_entity_map: Dict[str, date]):
"""
Applies a date/datetime mask on the result using the start/end parameters on a processoor
Applies a date/datetime mask on the result using the start/end parameters on a processor
:param result:
:param rdate_entity_map: map of entity, rule, base_date to date value
:return: None
@@ -125,23 +134,32 @@ async def update(self,
attribute: str,
result: ProcessorResult,
rdate_entity_map: Dict[str, date],
pool: ProcessPoolExecutor = None):
pool: ProcessPoolExecutor = None,
query_info: Union[DataQueryInfo, MeasureQueryInfo] = None):
""" Handle the update of a single coordinate and recalculate the value
:param attribute: Attribute aligning to the data coordinate in the processor
:param result: Processor result including success and series from data query
"""
self.__handle_date_range(result, rdate_entity_map)
if not self.measure_processor:
self.__handle_date_range(result, rdate_entity_map)
self.children_data[attribute] = result

if isinstance(result, ProcessorResult):
if result.success:
try:
if pool:
value = await asyncio.get_running_loop().run_in_executor(pool, self.process)
if self.measure_processor:
value = await asyncio.get_running_loop()\
.run_in_executor(pool, functools.partial(self.process, query_info.entity))
else:
value = await asyncio.get_running_loop().run_in_executor(pool, self.process)
self.value = value
else:
self.process()
if self.measure_processor:
self.process(query_info.entity)
else:
self.process()
self.post_process()
except Exception as e:
self.value = ProcessorResult(False,
@@ -168,7 +186,7 @@ def __add_required_rdates(self, entity: Entity, rdate_entity_map: Dict[str, Set[
def build_graph(self,
entity: Entity,
cell,
queries: List[DataQueryInfo],
queries: List[Union[DataQueryInfo, MeasureQueryInfo]],
rdate_entity_map: Dict[str, Set[Tuple]],
overrides: Optional[List]):
""" Generates the nested cell graph and keeps a map of leaf data queries to processors"""
@@ -177,6 +195,9 @@ def build_graph(self,

attributes = self.__dict__

if self.measure_processor:
queries.append(MeasureQueryInfo(attr='a', processor=self, entity=entity))

for attr_name, child in self.children.items():
if isinstance(child, DataCoordinate):
# Override coordinate dimensions
@@ -212,14 +233,15 @@ async def calculate(self,
attribute: str,
result: ProcessorResult,
rdate_entity_map: Dict[str, date],
pool: ProcessPoolExecutor = None):
pool: ProcessPoolExecutor = None,
query_info: Union[DataQueryInfo, MeasureQueryInfo] = None):
""" Sets the result on the processor and recursively calls parent to set and calculate value
:param attribute: Attribute aligning to the data coordinate in the processor
:param result: Processor result including success and series from data query
"""
# update the result
await self.update(attribute, result, rdate_entity_map, pool)
await self.update(attribute, result, rdate_entity_map, pool, query_info)

# if there is a parent, traverse up and recompute
if self.parent:
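
Taken together, the processor.py changes add a second evaluation path: a processor constructed with measure_processor=True registers a MeasureQueryInfo during build_graph, skips the date-range mask in update, and is handed the row's entity when process runs. A sketch of the resulting flow (the helper below is illustrative, not part of this commit; proc and entity are assumed to exist, and a real run needs an authenticated session):

from gs_quant.analytics.core.processor import MeasureQueryInfo
from gs_quant.analytics.core.processor_result import ProcessorResult

async def evaluate_measure_column(proc, entity, rdate_entity_map):
    # Measure queries fetch no series up front: a successful, empty result is
    # passed in so that update() ends up calling proc.process(entity).
    info = MeasureQueryInfo(attr='a', processor=proc, entity=entity)
    await proc.calculate(info.attr, ProcessorResult(True, None),
                         rdate_entity_map, query_info=info)
    return proc.value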
3 changes: 3 additions & 0 deletions gs_quant/analytics/core/query_helpers.py
@@ -21,6 +21,7 @@

from pandas import DataFrame, to_datetime

from gs_quant.analytics.core.processor import MeasureQueryInfo
from gs_quant.analytics.core.processor_result import ProcessorResult
from gs_quant.data import DataFrequency
from gs_quant.session import GsSession
@@ -31,6 +32,8 @@
def aggregate_queries(query_infos):
mappings = defaultdict(dict) # DataSet -> start/end
for query_info in query_infos:
if isinstance(query_info, MeasureQueryInfo):
continue
query = query_info.query
coordinate = query.coordinate
dataset_id = coordinate.dataset_id
7 changes: 4 additions & 3 deletions gs_quant/analytics/datagrid/data_cell.py
@@ -16,13 +16,13 @@

import copy
import uuid
from typing import List, Optional, Dict, Set, Tuple
from typing import List, Optional, Dict, Set, Tuple, Union

from pandas import Series

from gs_quant.analytics.common import DATA_CELL_NOT_CALCULATED
from gs_quant.analytics.core import BaseProcessor
from gs_quant.analytics.core.processor import DataQueryInfo
from gs_quant.analytics.core.processor import DataQueryInfo, MeasureQueryInfo
from gs_quant.analytics.core.processor_result import ProcessorResult
from gs_quant.analytics.datagrid import Override
from gs_quant.analytics.datagrid.utils import get_utc_now
@@ -61,7 +61,8 @@ def __init__(self,
# Store the cell data queries
self.data_queries: List[DataQueryInfo] = []

def build_cell_graph(self, all_queries: List[DataQueryInfo], rdate_entity_map: Dict[str, Set[Tuple]]) -> None:
def build_cell_graph(self, all_queries: List[Union[DataQueryInfo, MeasureQueryInfo]],
rdate_entity_map: Dict[str, Set[Tuple]]) -> None:
""" Generate and store the cell graph and data queries
This can be modified to return the data queries rather than store on the cell
19 changes: 14 additions & 5 deletions gs_quant/analytics/datagrid/datagrid.py
@@ -29,7 +29,7 @@
from gs_quant.analytics.common import DATAGRID_HELP_MSG
from gs_quant.analytics.common.helpers import resolve_entities, get_entity_rdate_key, get_entity_rdate_key_from_rdate, \
get_rdate_cache_key
from gs_quant.analytics.core.processor import DataQueryInfo
from gs_quant.analytics.core.processor import DataQueryInfo, MeasureQueryInfo
from gs_quant.analytics.core.processor_result import ProcessorResult
from gs_quant.analytics.core.query_helpers import aggregate_queries, fetch_query, build_query_string, valid_dimensions
from gs_quant.analytics.datagrid.data_cell import DataCell
@@ -125,7 +125,7 @@ def __init__(self,
# store the graph, data queries to leaf processors and results
self._primary_column_index: int = kwargs.get('primary_column_index', 0)
self._cells: List[DataCell] = []
self._data_queries: List[DataQueryInfo] = []
self._data_queries: List[Union[DataQueryInfo, MeasureQueryInfo]] = []
self._entity_cells: List[DataCell] = []
self._coord_processor_cells: List[DataCell] = []
self._value_cells: List[DataCell] = []
@@ -152,7 +152,7 @@ def initialize(self) -> None:
Upon providing data to a leaf, the leaf processor is calculated and propagated up the graph to the cell level.
"""
all_queries: List[DataQueryInfo] = []
all_queries: List[Union[DataQueryInfo, MeasureQueryInfo]] = []
entity_cells: List[DataCell] = []
current_row_group = None

@@ -201,6 +201,8 @@ def initialize(self) -> None:
cell.processor.children['a'].set_dimensions(data_overrides[-1].dimensions)

self._coord_processor_cells.append(cell)
elif column_processor.measure_processor:
all_queries.append(MeasureQueryInfo(attr='', entity=entity, processor=column_processor))
else:
# append the required queries to the map
cell.build_cell_graph(all_queries, self.rdate_entity_map)
@@ -342,7 +344,8 @@ def _resolve_queries(self, availability_cache: Dict = None) -> None:

for query in self._data_queries:
entity = query.entity
if isinstance(entity, str): # If we were unable to fetch entity (404/403)
if isinstance(entity, str) or isinstance(query, MeasureQueryInfo):
# If we were unable to fetch entity (404/403) or if we're processing a measure processor
continue
query = query.query
coord = query.coordinate
@@ -397,7 +400,13 @@ def _fetch_queries(self):
query_info.data = Series()

for query_info in self._data_queries:
if query_info.data is None or len(query_info.data) == 0:
if isinstance(query_info, MeasureQueryInfo):
asyncio.get_event_loop().run_until_complete(
query_info.processor.calculate(query_info.attr,
ProcessorResult(True, None),
self.rule_cache,
query_info=query_info))
elif query_info.data is None or len(query_info.data) == 0:
asyncio.get_event_loop().run_until_complete(
query_info.processor.calculate(query_info.attr,
ProcessorResult(False,
4 changes: 2 additions & 2 deletions gs_quant/analytics/processors/__init__.py
@@ -16,9 +16,9 @@

from .analysis_processors import DiffProcessor
from .econometrics_processors import SharpeRatioProcessor, VolatilityProcessor, CorrelationProcessor, ChangeProcessor, \
ReturnsProcessor, BetaProcessor
ReturnsProcessor, BetaProcessor, FXImpliedCorrProcessor
from .special_processors import EntityProcessor, CoordinateProcessor
from .statistics_processors import PercentilesProcessor, PercentileProcessor, \
from .statistics_processors import PercentilesProcessor, PercentileProcessor, StdMoveProcessor, \
CovarianceProcessor, ZscoresProcessor, MeanProcessor, VarianceProcessor, SumProcessor, StdDevProcessor
from .utility_processors import LastProcessor, AppendProcessor, AdditionProcessor, SubtractionProcessor, \
MultiplicationProcessor, DivisionProcessor, MinProcessor, MaxProcessor, NthLastProcessor
50 changes: 48 additions & 2 deletions gs_quant/analytics/processors/econometrics_processors.py
@@ -21,12 +21,14 @@
from gs_quant.analytics.core.processor import BaseProcessor, DataCoordinateOrProcessor, DataQueryInfo, \
DateOrDatetimeOrRDate
from gs_quant.analytics.core.processor_result import ProcessorResult
from gs_quant.analytics.processors.special_processors import MeasureProcessor
from gs_quant.data.coordinate import DataCoordinate
from gs_quant.data.query import DataQuery
from gs_quant.entities.entity import Entity
from gs_quant.markets.securities import Stock
from gs_quant.markets.securities import Stock, Cross
from gs_quant.target.common import Currency
from gs_quant.timeseries import correlation, Window, SeriesType, DataMeasure, DataFrequency
from gs_quant.timeseries import correlation, Window, SeriesType, DataMeasure, DataFrequency, DataContext, \
fx_implied_correlation
from gs_quant.timeseries import excess_returns_pure
from gs_quant.timeseries.econometrics import get_ratio_pure, SharpeAssets, change, returns
from gs_quant.timeseries.econometrics import volatility, Returns, beta
@@ -349,3 +351,47 @@ def process(self):

def get_plot_expression(self):
pass


class FXImpliedCorrProcessor(MeasureProcessor):

def __init__(self,
*,
cross2: Entity = None,
tenor: str = '3m',
start: Optional[DateOrDatetimeOrRDate] = None,
end: Optional[DateOrDatetimeOrRDate] = None,
**kwargs):
""" FXImpliedCorrProcessor
:param cross2: second Cross entity used alongside the row's cross in the implied-correlation calculation
:param tenor: tenor of the implied volatility data used, e.g. '3m'
:param start: start date or time used in the underlying data query
:param end: end date or time used in the underlying data query
"""
super().__init__(**kwargs)
self.cross2: Entity = cross2
self.tenor: str = tenor
# date/datetime bounds for the underlying data query
self.start = start
self.end = end

def process(self, cross1: Entity) -> ProcessorResult:
if isinstance(cross1, Cross) and isinstance(self.cross2, Cross):
try:
with DataContext(self.start, self.end):
result = fx_implied_correlation(cross1, self.cross2, self.tenor)
self.value = ProcessorResult(True, result)
except Exception as e:
self.value = ProcessorResult(False, str(e))
else:
self.value = ProcessorResult(False, "Processor does not have valid crosses as inputs")

return self.value

def get_plot_expression(self):
pass
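
A usage sketch for the new FXImpliedCorrProcessor (illustrative only: the ticker lookups and date range are assumptions and an authenticated GsSession is required; inside a DataGrid, the row's cross is passed to process() automatically):

import datetime as dt
from gs_quant.analytics.processors import FXImpliedCorrProcessor
from gs_quant.markets.securities import SecurityMaster, AssetIdentifier

# hypothetical lookups; any two resolved Cross assets will do
eurusd = SecurityMaster.get_asset('EURUSD', AssetIdentifier.TICKER)
gbpusd = SecurityMaster.get_asset('GBPUSD', AssetIdentifier.TICKER)

proc = FXImpliedCorrProcessor(cross2=gbpusd, tenor='3m',
                              start=dt.date(2021, 1, 4), end=dt.date(2021, 8, 20))
result = proc.process(eurusd)
if result.success:
    print(result.data.tail())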
11 changes: 11 additions & 0 deletions gs_quant/analytics/processors/special_processors.py
@@ -95,3 +95,14 @@ def update(self, attribute: str, result: ProcessorResult):

def get_plot_expression(self):
pass


class MeasureProcessor(BaseProcessor):
def __init__(self, **kwargs):
super().__init__(**kwargs, measure_processor=True)

def process(self, *args):
pass

def get_plot_expression(self):
pass
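
MeasureProcessor itself only flips the measure_processor flag; subclasses implement process(entity), which the grid now calls with the row's entity. A minimal subclass sketch (hypothetical, not part of this commit; entity.get_marquee_id() is assumed purely for illustration):

from gs_quant.analytics.core.processor_result import ProcessorResult
from gs_quant.analytics.processors.special_processors import MeasureProcessor
from gs_quant.entities.entity import Entity

class EntityIdProcessor(MeasureProcessor):
    """Toy measure processor: resolves to the row entity's Marquee id."""

    def process(self, entity: Entity) -> ProcessorResult:
        try:
            self.value = ProcessorResult(True, entity.get_marquee_id())
        except Exception as e:
            self.value = ProcessorResult(False, str(e))
        return self.value

    def get_plot_expression(self):
        pass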
51 changes: 51 additions & 0 deletions gs_quant/analytics/processors/statistics_processors.py
@@ -20,6 +20,7 @@

from gs_quant.analytics.core.processor import BaseProcessor, DataCoordinateOrProcessor, DateOrDatetimeOrRDate
from gs_quant.analytics.core.processor_result import ProcessorResult
from gs_quant.timeseries import returns
from gs_quant.timeseries.statistics import percentiles, percentile, Window, mean, sum_, std, var, cov, zscores


@@ -440,3 +441,53 @@ def process(self):

def get_plot_expression(self):
pass


class StdMoveProcessor(BaseProcessor):
def __init__(self,
a: DataCoordinateOrProcessor,
*,
start: Optional[DateOrDatetimeOrRDate] = None,
end: Optional[DateOrDatetimeOrRDate] = None,
w: Union[Window, int] = None,
**kwargs):
""" StdMoveProcessor: latest return of series a normalized by the standard deviation of its prior returns
:param a: DataCoordinate or BaseProcessor for the first series
:param start: start date or time used in the underlying data query
:param end: end date or time used in the underlying data query
:param w: Window or int: size of window and ramp up to use, e.g. Window(22, 10) where 22 is the window
    size and 10 the ramp up value. Window size defaults to length of series.
"""
super().__init__(**kwargs)
self.children['a'] = a

self.start = start
self.end = end
self.w = w

def process(self):
a_data = self.children_data.get('a')
if isinstance(a_data, ProcessorResult):
if a_data.success:
data_series = a_data.data
change_pd = data_series.tail(2)
change = returns(change_pd).iloc[-1]

# Compute the historical return series from all values except the last observation
returns_series = returns(data_series.head(-1))
std_result = std(returns_series, w=Window(None, 0) if self.w is None else self.w).iloc[-1]

if change is not None and std_result != 0:
self.value = ProcessorResult(True, pd.Series([change / std_result]))
else:
self.value = ProcessorResult(False, "StdMoveProcessor returns a NaN")
else:
self.value = ProcessorResult(False, "StdMoveProcessor does not have 'a' series values yet")
else:
self.value = ProcessorResult(False, "StdMoveProcessor does not have 'a' series yet")
return self.value

def get_plot_expression(self):
pass
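
The processor above divides the most recent one-period return by the standard deviation of the preceding returns. A small self-contained check of that arithmetic on made-up prices (it mirrors the process() body and uses only gs_quant's pure timeseries functions):

import pandas as pd
from gs_quant.timeseries import returns
from gs_quant.timeseries.statistics import std, Window

prices = pd.Series([100.0, 101.0, 99.5, 100.5, 103.0])    # made-up data
latest_move = returns(prices.tail(2)).iloc[-1]            # return of the final observation
hist_returns = returns(prices.head(-1))                   # returns excluding the final observation
hist_std = std(hist_returns, w=Window(None, 0)).iloc[-1]  # std over all of those returns
print(latest_move / hist_std)                             # the reported "std move"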
2 changes: 2 additions & 0 deletions gs_quant/api/gs/data.py
@@ -135,6 +135,8 @@ class QueryType(Enum):
USD_OIS = "Usd Ois"
NON_USD_OIS = "Non Usd Ois"
SETTLEMENT_PRICE = "Settlement Price"
THEMATIC_EXPOSURE = "Thematic Exposure"
THEMATIC_BETA = "Thematic Beta"


class GsDataApi(DataApi):
