Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat!: add data acquisition to recorders #64

Merged
merged 28 commits into from
May 2, 2024
Merged
Show file tree
Hide file tree
Changes from 19 commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
acf866f
refactor!: add data acquisition models to recorder
fubuloubu Apr 2, 2024
baf6f3c
refactor: reformat metrics a bit
fubuloubu Apr 17, 2024
55d23e2
refactor: change how state annotations work a bit
fubuloubu Apr 17, 2024
0980bd6
refactor!: remove SQLRecorder, replace with JSONLineRecorder
fubuloubu Apr 17, 2024
9233839
refactor: migrate CircuitBreaker exception to subclass of Halt
fubuloubu Apr 17, 2024
5cc0d0d
refactor!: migrate recorder config to CLI callback
fubuloubu Apr 17, 2024
cc3cfff
refactor!: clean up startup process significantly
fubuloubu Apr 17, 2024
1f86d98
fix: don't use py 3.10 unions yet
fubuloubu Apr 17, 2024
acccdbb
fix: missing Annotated from py 3.8
fubuloubu Apr 17, 2024
086df4e
refactor: fix rebase misses
fubuloubu May 2, 2024
a0ac645
refactor: use a more recent block number
fubuloubu May 2, 2024
2acfcc0
fix: do not check `len(ContractEvent)` for performance reasons
fubuloubu May 2, 2024
1416b3d
fix: display event signatures as strings
fubuloubu May 2, 2024
152876d
fix: ensure that task name ends up in labels
fubuloubu May 2, 2024
f5e5d2f
fix: feedback from peer review
fubuloubu May 2, 2024
bc9146b
refactor: shorten name of startup/shutdown tags
fubuloubu May 2, 2024
8ef802e
fix: wrong label selected to pull block number
fubuloubu May 2, 2024
71eda73
refactor: ensure all tasks have task name (not just silverback)
fubuloubu May 2, 2024
029df43
feat: store startup and shutdown result via recorder
fubuloubu May 2, 2024
8cd2771
refactor!: move `.identifier` from runner to app
fubuloubu May 2, 2024
9173f53
refactor!: remove WorkerState, suggest using TaskiqState
fubuloubu May 2, 2024
a0e2a3e
refactor!: move application state from Recorder to new state datastore
fubuloubu May 2, 2024
143fad2
fix: revert back to .pop bug fix
fubuloubu May 2, 2024
8ae71df
fix: move extra quotes for event signatures to middleware
fubuloubu May 2, 2024
b4d7970
refactor: rename variable for clarity
fubuloubu May 2, 2024
b6468bf
fix: unused import
fubuloubu May 2, 2024
f8d3713
fix: was using app.state on shutdown
fubuloubu May 2, 2024
17302cb
fix: constrain integer values to support maximum parquet type
fubuloubu Apr 17, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ version.py

# Ape stuff
.build/
.silverback-sessions/

**/.DS_Store
*.swp
Expand Down
70 changes: 41 additions & 29 deletions example.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,11 @@
from typing import Annotated

from ape import chain
from ape.api import BlockAPI
from ape.types import ContractLog
from ape_tokens import tokens # type: ignore[import]
from taskiq import Context, TaskiqDepends, TaskiqState

from silverback import CircuitBreaker, SilverbackApp, SilverbackStartupState
from silverback import CircuitBreaker, SilverbackApp, WorkerState

# Do this to initialize your app
# Do this first to initialize your app
app = SilverbackApp()

# NOTE: Don't do any networking until after initializing app
Expand All @@ -17,53 +14,68 @@


@app.on_startup()
def app_startup(startup_state: SilverbackStartupState):
return {"message": "Starting...", "block_number": startup_state.last_block_seen}
def app_startup():
# NOTE: This is called just as the app is put into "run" state,
# and handled by the first available worker
# raise Exception # NOTE: Any exception raised on startup aborts immediately
return {"block_number": app.state.last_block_seen}


# Can handle some resource initialization for each worker, like LLMs or database connections
class MyDB:
    """Stand-in for a per-worker resource such as a database connection."""

    def execute(self, query: str) -> None:
        """Accept a query string and do nothing (placeholder for a real call)."""
        return None


# Can handle some initialization on startup, like models or network connections
@app.on_worker_startup()
def worker_startup(state: TaskiqState):
def worker_startup(state: WorkerState): # NOTE: You need the type hint here
# NOTE: Can put anything here, any python object works
state.db = MyDB()
state.block_count = 0
# state.db = MyDB()
return {"message": "Worker started."}
# raise Exception # NOTE: Any exception raised on worker startup aborts immediately


# This is how we trigger off of new blocks
@app.on_(chain.blocks)
# context must be a type annotated kwarg to be provided to the task
def exec_block(block: BlockAPI, context: Annotated[Context, TaskiqDepends()]):
context.state.block_count += 1
# NOTE: The type hint for block is `BlockAPI`, but we parse it using `EcosystemAPI`
# NOTE: If you need something from worker state, you have to use the type hint
def exec_block(block: BlockAPI, state: WorkerState):
state.db.execute(f"some query {block.number}")
return len(block.transactions)


# This is how we trigger off of events
# Set new_block_timeout to adjust the expected block time.
@app.on_(USDC.Transfer, start_block=18588777, new_block_timeout=25)
# NOTE: Typing isn't required
@app.on_(USDC.Transfer, start_block=19784367, new_block_timeout=25)
# NOTE: Typing isn't required, it will still be an Ape `ContractLog` type
def exec_event1(log):
if log.log_index % 7 == 3:
# If you ever want the app to shutdown under some scenario, call this exception
raise CircuitBreaker("Oopsie!")
# If you raise any exception, Silverback will track the failure and keep running
# NOTE: By default, if you have 3 tasks fail in a row, the app will shut itself down
raise ValueError("I don't like the number 3.")

return {"amount": log.amount}


@app.on_(YFI.Approval)
# Any handler function can be async too
async def exec_event2(log: ContractLog):
return log.amount

if log.log_index % 7 == 6:
# If you ever want the app to shut down immediately under some scenario, raise this exception
raise CircuitBreaker("Oopsie!")

# Just in case you need to release some resources or something
@app.on_worker_shutdown()
def worker_shutdown(state):
return {
"message": f"Worker stopped after handling {state.block_count} blocks.",
"block_count": state.block_count,
}
return log.amount


# A final job to execute on Silverback shutdown
@app.on_shutdown()
def app_shutdown(state):
return {"message": "Stopping..."}
def app_shutdown():
# raise Exception # NOTE: Any exception raised on shutdown is ignored
return {"block_number": app.state.last_block_processed}


# Just in case you need to release some resources or something inside each worker
@app.on_worker_shutdown()
def worker_shutdown(state):
    """Clear the `db` attribute on the worker state when the worker shuts down."""
    # raise Exception  # NOTE: Any exception raised on worker shutdown is ignored
    setattr(state, "db", None)
4 changes: 2 additions & 2 deletions silverback/__init__.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
from .application import SilverbackApp
from .exceptions import CircuitBreaker, SilverbackException
from .types import SilverbackStartupState
from .types import WorkerState

__all__ = [
"CircuitBreaker",
"SilverbackApp",
"SilverbackException",
"SilverbackStartupState",
"WorkerState",
]
19 changes: 17 additions & 2 deletions silverback/_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,16 @@ def _runner_callback(ctx, param, val):
raise ValueError(f"Failed to import runner '{val}'.")


def _recorder_callback(ctx, param, val):
    """Click option callback that turns an import string into a recorder instance.

    Args:
        ctx: Callback context (unused here).
        param: The option being processed (unused here).
        val: Import string naming the recorder, or a falsy value when the
            option was not supplied.

    Returns:
        ``None`` when ``val`` is falsy; otherwise the result of calling the
        imported object with no arguments.

    Raises:
        ValueError: If ``import_from_string`` yields a falsy result for ``val``.
    """
    if not val:
        return None

    recorder_cls = import_from_string(val)
    if not recorder_cls:
        raise ValueError(f"Failed to import recorder '{val}'.")

    return recorder_cls()


def _account_callback(ctx, param, val):
if val:
val = val.alias.replace("dev_", "TEST::")
Expand Down Expand Up @@ -92,11 +102,16 @@ async def run_worker(broker: AsyncBroker, worker_count=2, shutdown_timeout=90):
help="An import str in format '<module>:<CustomRunner>'",
callback=_runner_callback,
)
@click.option(
"--recorder",
help="An import string in format '<module>:<CustomRecorder>'",
callback=_recorder_callback,
)
@click.option("-x", "--max-exceptions", type=int, default=3)
@click.argument("path")
def run(cli_ctx, account, runner, max_exceptions, path):
def run(cli_ctx, account, runner, recorder, max_exceptions, path):
app = import_from_string(path)
runner = runner(app, max_exceptions=max_exceptions)
runner = runner(app, recorder=recorder, max_exceptions=max_exceptions)
asyncio.run(runner.run())


Expand Down
22 changes: 15 additions & 7 deletions silverback/application.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,11 @@ def __init__(self, settings: Settings | None = None):
if not settings:
settings = Settings()

self.network = settings.get_provider_context()
self.name = settings.APP_NAME

network = settings.get_provider_context()
# NOTE: This allows using connected ape methods e.g. `Contract`
provider = self.network.__enter__()
provider = network.__enter__()

# Adjust defaults from connection
if settings.NEW_BLOCK_TIMEOUT is None and (
Expand All @@ -64,23 +66,25 @@ def __init__(self, settings: Settings | None = None):
self.tasks: defaultdict[TaskType, list[TaskData]] = defaultdict(list)
self.poll_settings: dict[str, dict] = {}

atexit.register(self.network.__exit__, None, None, None)
atexit.register(network.__exit__, None, None, None)

self.signer = settings.get_signer()
self.new_block_timeout = settings.NEW_BLOCK_TIMEOUT
self.start_block = settings.START_BLOCK

network_str = f'\n NETWORK="{provider.network.ecosystem.name}:{provider.network.name}"'
self.network_choice = f"{provider.network.ecosystem.name}:{provider.network.name}"
signer_str = f"\n SIGNER={repr(self.signer)}"
start_block_str = f"\n START_BLOCK={self.start_block}" if self.start_block else ""
new_block_timeout_str = (
f"\n NEW_BLOCK_TIMEOUT={self.new_block_timeout}" if self.new_block_timeout else ""
)
logger.info(
f"Loaded Silverback App:{network_str}"
f'Loaded Silverback App:\n NETWORK="{self.network_choice}"'
f"{signer_str}{start_block_str}{new_block_timeout_str}"
)

self.state = None # Runner manages this
fubuloubu marked this conversation as resolved.
Show resolved Hide resolved

def broker_task_decorator(
self,
task_type: TaskType,
Expand Down Expand Up @@ -120,11 +124,15 @@ def broker_task_decorator(
def add_taskiq_task(handler: Callable) -> AsyncTaskiqDecoratedTask:
labels = {"task_type": str(task_type)}

if container and isinstance(container, ContractEvent):
# NOTE: Do *not* do `if container` because that does a `len(container)` call,
# which for ContractEvent queries *every single log* ever emitted, and really
# we only want to determine if it is not None
if container is not None and isinstance(container, ContractEvent):
# Address is almost a certainty if the container is being used as a filter here.
if contract_address := getattr(container.contract, "address", None):
labels["contract_address"] = contract_address
labels["event_signature"] = container.abi.signature
# NOTE: event signature is a string with spaces/commas, so encapsulate it in quotes
labels["event_signature"] = f'"{container.abi.signature}"'
fubuloubu marked this conversation as resolved.
Show resolved Hide resolved

broker_task = self.broker.register_task(
handler,
Expand Down
22 changes: 17 additions & 5 deletions silverback/exceptions.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
from typing import Any
from typing import Any, Sequence

from ape.exceptions import ApeException
from ape.logging import logger

from .types import TaskType

Expand Down Expand Up @@ -31,14 +30,27 @@ class SilverbackException(ApeException):
"""Base Exception for any Silverback runtime faults."""


# TODO: `ExceptionGroup` added in Python 3.11
class StartupFailure(SilverbackException):
    """Error that aggregates one or more failures into a single startup error message."""

    def __init__(self, *exceptions: Exception):
        """Build the error message from the given exceptions.

        Args:
            *exceptions: The individual errors. Each one is rendered with
                ``str()`` and placed on its own line of the message.
        """
        # NOTE: With `*args`, the annotation describes each positional argument, so it is
        #       `Exception` (one error per argument) rather than `Sequence[Exception]`.
        if error_str := "\n".join(str(e) for e in exceptions):
            super().__init__(f"Startup failure(s):\n{error_str}")
        else:
            # No exceptions given (or all of them stringify to empty): use a generic message
            super().__init__("Startup failure(s) detected. See logs for details.")


class NoTasksAvailableError(SilverbackException):
    """Error carrying the fixed message "No tasks to execute"."""

    def __init__(self) -> None:
        message = "No tasks to execute"
        super().__init__(message)


class Halt(SilverbackException):
    """Error carrying the fixed message "App halted, must restart manually"."""

    def __init__(self) -> None:
        message = "App halted, must restart manually"
        super().__init__(message)


class CircuitBreaker(SilverbackException):
class CircuitBreaker(Halt):
"""Custom exception (created by user) that will trigger an application shutdown."""

def __init__(self, message: str):
logger.error(message)
super().__init__(message)
super(SilverbackException, self).__init__(message)
37 changes: 9 additions & 28 deletions silverback/middlewares.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,7 @@
from eth_utils.conversions import to_hex
from taskiq import TaskiqMessage, TaskiqMiddleware, TaskiqResult

from silverback.recorder import HandlerResult
from silverback.types import SilverbackID, TaskType
from silverback.types import TaskType
from silverback.utils import hexbytes_dict


Expand All @@ -22,11 +21,7 @@ def compute_block_time() -> int:

return int((head.timestamp - genesis.timestamp) / head.number)

settings = kwargs.pop("silverback_settings")

self.block_time = self.chain_manager.provider.network.block_time or compute_block_time()
self.ident = SilverbackID.from_settings(settings)
self.recorder = settings.get_recorder()

def pre_send(self, message: TaskiqMessage) -> TaskiqMessage:
# TODO: Necessary because bytes/HexBytes doesn't encode/decode well for some reason
Expand All @@ -49,20 +44,23 @@ def fix_dict(data: dict, recurse_count: int = 0) -> dict:
return message

def _create_label(self, message: TaskiqMessage) -> str:
if labels_str := ",".join(f"{k}={v}" for k, v in message.labels.items()):
if labels_str := ",".join(
f"{k}={v}" for k, v in message.labels.items() if k != "task_name"
):
return f"{message.task_name}[{labels_str}]"

else:
return message.task_name

def pre_execute(self, message: TaskiqMessage) -> TaskiqMessage:
if "task_type" not in message.labels:
return message # Not a silverback task
# NOTE: Ensure we always have this, no matter what
message.labels["task_name"] = message.task_name
mikeshultz marked this conversation as resolved.
Show resolved Hide resolved

task_type = message.labels.pop("task_type")
if not (task_type_str := message.labels.pop("task_type")):
return message # Not a silverback task
fubuloubu marked this conversation as resolved.
Show resolved Hide resolved

try:
task_type = TaskType(task_type)
task_type = TaskType(task_type_str)
except ValueError:
return message # Not a silverback task

Expand Down Expand Up @@ -97,21 +95,4 @@ def post_execute(self, message: TaskiqMessage, result: TaskiqResult):
f"{self._create_label(message)} " f"- {result.execution_time:.3f}s{percent_display}"
)

async def post_save(self, message: TaskiqMessage, result: TaskiqResult):
if not self.recorder:
return

handler_result = HandlerResult.from_taskiq(
self.ident,
message.task_name,
message.labels.get("block_number"),
message.labels.get("log_index"),
result,
)

try:
await self.recorder.add_result(handler_result)
except Exception as err:
logger.error(f"Error storing result: {err}")

# NOTE: Unless stdout is ignored, error traceback appears in stdout, no need for `on_error`
Loading
Loading