Skip to content

Commit

Permalink
refactor observer (#58)
Browse files Browse the repository at this point in the history
* refactor observer

* fix pre-commit

* remove duplicate code

* bump
  • Loading branch information
cctdaniel authored May 6, 2024
1 parent 608e906 commit 195e81a
Show file tree
Hide file tree
Showing 7 changed files with 1,151 additions and 1,013 deletions.
2,018 changes: 1,116 additions & 902 deletions poetry.lock

Large diffs are not rendered by default.

5 changes: 2 additions & 3 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ ignore_missing_imports = true

[tool.poetry]
name = "pyth-observer"
version = "0.1.12"
version = "0.1.13"
description = "Alerts and stuff"
authors = []
readme = "README.md"
Expand All @@ -20,9 +20,8 @@ datadog-api-client = { extras = ["async"], version = "^2.5.0" }
loguru = "^0.6.0"
more-itertools = "^9.0.0"
prometheus-client = "0.15.0"
pytz = "^2022.4"
pycoingecko = "2.2.0"
pythclient = "0.1.4"
pythclient = "^0.1.24"
pyyaml = "^6.0"
throttler = "1.2.1"
types-pyyaml = "^6.0.12"
Expand Down
23 changes: 16 additions & 7 deletions pyth_observer/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,25 +94,34 @@ async def run(self):
)

for _, price_account in price_accounts.items():
# Handle potential None for min_publishers
if (
price_account.min_publishers is None
# When min_publishers is high it means that the price is not production-ready
# yet and it is still being tested. We need no alerting for these prices.
or price_account.min_publishers >= 10
):
continue

# Ensure latest_block_slot is not None or provide a default value
latest_block_slot = (
price_account.slot if price_account.slot is not None else -1
)

if not price_account.aggregate_price_status:
raise RuntimeError("Price account status is missing")

if not price_account.aggregate_price_info:
raise RuntimeError("Aggregate price info is missing")

# When min_publishers is high it means that the price is not production-ready
# yet and it is still being tested. We need no alerting for these prices.
if price_account.min_publishers >= 10:
continue

states.append(
PriceFeedState(
symbol=product.attrs["symbol"],
asset_type=product.attrs["asset_type"],
public_key=price_account.key,
status=price_account.aggregate_price_status,
# this is the solana block slot when price account was fetched
latest_block_slot=price_account.slot,
latest_block_slot=latest_block_slot,
latest_trading_slot=price_account.last_slot,
price_aggregate=price_account.aggregate_price_info.price,
confidence_interval_aggregate=price_account.aggregate_price_info.confidence_interval,
Expand Down Expand Up @@ -141,7 +150,7 @@ async def run(self):
slot=component.latest_price_info.pub_slot,
aggregate_slot=price_account.last_slot,
# this is the solana block slot when price account was fetched
latest_block_slot=price_account.slot,
latest_block_slot=latest_block_slot,
status=component.latest_price_info.price_status,
aggregate_status=price_account.aggregate_price_status,
)
Expand Down
68 changes: 0 additions & 68 deletions pyth_observer/calendar.py

This file was deleted.

24 changes: 12 additions & 12 deletions pyth_observer/check/price_feed.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
import datetime
import time
from dataclasses import dataclass
from datetime import datetime
from textwrap import dedent
from typing import Dict, Optional, Protocol, runtime_checkable
from zoneinfo import ZoneInfo

import arrow
import pytz
from pythclient.calendar import is_market_open
from pythclient.pythaccounts import PythPriceStatus
from pythclient.solana import SolanaPublicKey

from pyth_observer.calendar import HolidayCalendar
from pyth_observer.crosschain import CrosschainPrice


Expand Down Expand Up @@ -56,13 +56,13 @@ def state(self) -> PriceFeedState:
return self.__state

def run(self) -> bool:
is_market_open = HolidayCalendar().is_market_open(
market_open = is_market_open(
self.__state.asset_type,
datetime.datetime.now(tz=pytz.timezone("America/New_York")),
datetime.now(ZoneInfo("America/New_York")),
)

# Skip if market is not open
if not is_market_open:
if not market_open:
return True

distance = abs(
Expand Down Expand Up @@ -181,13 +181,13 @@ def run(self) -> bool:
if self.__state.status != PythPriceStatus.TRADING:
return True

is_market_open = HolidayCalendar().is_market_open(
market_open = is_market_open(
self.__state.asset_type,
datetime.datetime.now(tz=pytz.timezone("America/New_York")),
datetime.now(ZoneInfo("America/New_York")),
)

# Skip if not trading hours (for equities)
if not is_market_open:
if not market_open:
return True

# Price should exist, it fails otherwise
Expand Down Expand Up @@ -243,13 +243,13 @@ def run(self) -> bool:
if self.__state.status != PythPriceStatus.TRADING:
return True

is_market_open = HolidayCalendar().is_market_open(
market_open = is_market_open(
self.__state.asset_type,
datetime.datetime.now(tz=pytz.timezone("America/New_York")),
datetime.now(ZoneInfo("America/New_York")),
)

# Skip if not trading hours (for equities)
if not is_market_open:
if not market_open:
return True

staleness = int(time.time()) - self.__state.crosschain_price["publish_time"]
Expand Down
9 changes: 5 additions & 4 deletions pyth_observer/event.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,7 @@
from datadog_api_client.configuration import Configuration as DatadogConfig
from datadog_api_client.v1.api.events_api import EventsApi as DatadogEventAPI
from datadog_api_client.v1.model.event_alert_type import EventAlertType
from datadog_api_client.v1.model.event_create_request import (
EventCreateRequest as DatadogAPIEvent,
)
from datadog_api_client.v1.model.event_create_request import EventCreateRequest
from loguru import logger

from pyth_observer.check import Check
Expand Down Expand Up @@ -45,7 +43,7 @@ async def send(self):
# An example would be: PublisherPriceCheck-Crypto.AAVE/USD-9TvAYCUkGajRXs....
aggregation_key += "-" + self.check.state().public_key.key

event = DatadogAPIEvent(
event = EventCreateRequest(
aggregation_key=aggregation_key,
title=text.split("\n")[0],
text=text,
Expand All @@ -59,6 +57,9 @@ async def send(self):
source_type_name="my_apps",
)

# Cast the event to EventCreateRequest explicitly because pyright complains that the previous line returns UnparsedObject | Unknown | None
event = cast(EventCreateRequest, event)

# This assumes that DD_API_KEY and DD_SITE env. variables are set. Also,
# using the async API makes the events api return a coroutine, so we
# ignore the pyright warning.
Expand Down
17 changes: 0 additions & 17 deletions requirements.txt

This file was deleted.

0 comments on commit 195e81a

Please sign in to comment.