PowerLoom/pooler

Pooler is the first adapter that paves the way for future curation on PowerLoom. It snapshots data from Uniswap-like AMMs on EVM chains.

Overview

Snapshotter workflow

A snapshotter peer, as part of the Powerloom Protocol, does exactly what the name suggests: it synchronizes with other snapshotter peers over a smart contract running on the Powerloom Prost chain. It follows an architecture driven by state transitions, which makes it easy to understand and modify.

Because of its decentralized nature, the snapshotter specification and its implementations share some powerful features that can adapt to your specific information requirements for blockchain applications:

  • Each data point is calculated, updated, and synchronized with other snapshotter peers participating in the network
  • synchronization of data points is defined as a function of an epoch ID (identifier), where an epoch refers to an equally spaced collection of blocks on the data source blockchain (e.g. Ethereum Mainnet/Polygon Mainnet/Polygon Testnet -- Mumbai). This simplifies the building of use cases that are stateful (i.e. can be accessed according to their state at a given height of the data source chain), synchronized, and depend on reliable data. For example,
    • dashboards by offering higher-order aggregate datapoints
    • trading strategies and bots
  • a snapshotter peer can load past epochs, indexes, and aggregates from a decentralized state and have access to a rich history of data
    • all the datasets are decentralized on IPFS/Filecoin
    • the power of these decentralized storage networks can be leveraged fully by applying the principle of composability

Architecture

The Snapshotter Peer is thoughtfully designed with a modular and highly configurable architecture, allowing for easy customization and seamless integration. It consists of three core components:

  1. Main Snapshotter Codebase:

    • This foundational component defines all the essential interfaces and handles a wide range of tasks, from listening to epoch release events to distributing tasks and managing snapshot submissions.
  2. Configuration Files:

    • Configuration files, located in the /config directory and linked to the snapshotter-configs repo, play a pivotal role in defining project types, specifying paths for individual compute modules, and managing various project-related settings.
  3. Compute Modules:

    • The heart of the system resides in the snapshotter/modules directory, linked to the snapshotter-computes repo, where the actual computation logic for each project type is defined. These modules drive the snapshot generation process for specific project types.

Snapshotter Architecture

The architecture has been designed to facilitate the seamless interchange of configuration and modules. To achieve this, we maintain these components in separate Git repositories, which are then integrated into the Snapshotter Peer using Git Submodules. As a result, adapting the system to different use cases is as straightforward as changing a Git branch, offering unparalleled flexibility and versatility.

For more information on using Git Submodules, please refer to the Git Submodules Documentation.

Setup

The snapshotter is a distributed system with multiple moving parts. The easiest way to get started is by using the Docker-based setup according to the instructions in the section: Development setup and instructions.

If you're planning to participate as a snapshotter, refer to these instructions to start snapshotting.

If you're a developer, you can follow the manual configuration steps for pooler in this document, followed by the instructions in the deploy repo for code contributors, for a more hands-on approach.

Note - RPC usage is highly use-case specific. If your use case is complicated and needs to make a lot of RPC calls, it is recommended to run your own RPC node instead of using third-party RPC services, as they can be expensive.

State transitions and data composition

Data composition

Epoch Generation

An epoch denotes a range of block heights on the EVM-compatible data source blockchain, e.g. Ethereum mainnet, Polygon PoS mainnet/testnet. This makes it easier to collect state transitions and snapshots of data on equally spaced block height intervals, as well as to support future work on other lightweight anchor proof mechanisms like Merkle proofs, succinct proofs, etc.

The size of an epoch is configurable. Let that be referred to as size(E).

  • A trusted service keeps track of the head of the chain as it moves ahead, and a marker h₀ against the max block height from the last released epoch. This makes the beginning of the next epoch, h₁ = h₀ + 1

  • Once the head of the chain has moved sufficiently ahead so that an epoch can be published, an epoch finalization service takes into account the following factors

    • chain reorganization reports where the reorganized limits are a subset of the epoch qualified to be published
    • a configurable ‘offset’ from the bleeding edge of the chain

and then publishes an epoch (h₁, h₂) by sending a transaction to the protocol state smart contract deployed on the Prost Chain (anchor chain) so that h₂ - h₁ + 1 == size(E). The next epoch, therefore, is tracked from h₂ + 1.

Each such transaction emits an EpochReleased event

event EpochReleased(uint256 indexed epochId, uint256 begin, uint256 end, uint256 timestamp);

The epochId here is incremented by 1 with every successive epoch release.
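
To make the arithmetic above concrete, the short sketch below (illustrative only, not code from this repository) shows how successive epoch boundaries line up for a configured size(E).

# Illustrative sketch of the epoch arithmetic described above.
# h0 is the max block height of the last released epoch; size_e is size(E).
def next_epoch(h0: int, size_e: int) -> tuple[int, int]:
    """Return (h1, h2) such that h1 = h0 + 1 and h2 - h1 + 1 == size_e."""
    h1 = h0 + 1
    h2 = h1 + size_e - 1
    return h1, h2


h1, h2 = next_epoch(h0=17_000_000, size_e=10)  # -> (17000001, 17000010)
h3, h4 = next_epoch(h0=h2, size_e=10)          # the next epoch is tracked from h2 + 1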

Preloading

Preloaders perform the important function of fetching low-level data, e.g. block details and transaction receipts, so that subsequent base snapshot building can proceed without unnecessary redundant calls. This ultimately saves on RPC access costs and other queries against the underlying node infrastructure for the source data blockchain.

Each project type within the project configuration as found in config/projects.json can specify the preloaders that their base snapshot builds depend on. Once the dependent preloaders have completed their fetches, the Processor Distributor subsequently triggers the base snapshot builders for each project type.

"config": [{
    "project_type": "pairContract_pair_total_reserves",
    "preload_tasks":[
        "eth_price",
        "block_details"
    ],
    "projects":[

The preloaders implement one of the following two generic interfaces

  • GenericPreloader

class GenericPreloader(ABC):
    __metaclass__ = ABCMeta

    def __init__(self):
        pass

    @abstractmethod
    async def compute(
        self,
        epoch: EpochBase,
        redis_conn: aioredis.Redis,
        rpc_helper: RpcHelper,
    ):
        pass

    @abstractmethod
    async def cleanup(self):
        pass

  • GenericDelegatorPreloader. Such preloaders are tasked with fetching large volumes of data and utilize delegated workers to which they submit large workloads over a request queue and wait for the results to be returned over a response queue.

class GenericDelegatorPreloader(GenericPreloader):
    _epoch: EpochBase
    _channel: aio_pika.abc.AbstractChannel
    _exchange: aio_pika.abc.AbstractExchange
    _q_obj: aio_pika.abc.AbstractQueue
    _consumer_tag: str
    _redis_conn: aioredis.Redis
    _task_type: str
    _epoch_id: int
    _preload_successful_event: asyncio.Event
    _awaited_delegated_response_ids: set
    _collected_response_objects: Dict[int, Dict[str, Dict[Any, Any]]]
    _logger: loguru._logger.Logger
    _request_id_query_obj_map: Dict[int, Any]

    @abstractmethod
    async def _on_delegated_responses_complete(self):
        pass

    @abstractmethod
    async def _on_filter_worker_response_message(
        self,
        message: aio_pika.abc.AbstractIncomingMessage,
    ):
        pass

    @abstractmethod
    async def _handle_filter_worker_response_message(self, message_body: bytes):
        pass

    @abstractmethod
    async def _periodic_awaited_responses_checker(self):
        pass

The preloaders can be found in the snapshotter/utils/preloaders directory. The preloaders that are available to project configuration entries are exposed through the config/preloader.json configuration.

{
    "preloaders": [
        {
            "task_type": "block_transactions",
            "module": "snapshotter.utils.preloaders.tx_receipts.preloader",
            "class_name": "TxPreloadWorker"
        },
        {
            "task_type": "block_details",
            "module": "snapshotter.utils.preloaders.block_details.preloader",
            "class_name": "BlockDetailsPreloader"
        },
        {
            "task_type": "eth_price",
            "module": "snapshotter.utils.preloaders.eth_price.preloader",
            "class_name": "EthPricePreloader"
        }
    ],
    "delegate_tasks": [
        {
            "task_type": "txreceipt",
            "module": "snapshotter.utils.preloaders.tx_receipts.delegated_worker.tx_receipts",
            "class_name": "TxReceiptProcessor"
        }
    ],
    "timeout": 60
}

At the moment, we have 3 generic preloaders built into the snapshotter template.

  • Block Details - It prefetches block details for the blocks in each epoch and stores them in Redis
  • Eth Price - It prefetches the ETH price for the blocks in each epoch and stores it in Redis
  • Tx Receipts - It prefetches all transaction details present in each epoch and stores the data in Redis. Since fetching all block transactions is a lot of work, it utilizes the delegated workers architecture to parallelize and fetch data in a fast and reliable way

More preloaders can easily be added depending on the use case the user is snapshotting for. It is as simple as writing the logic in preloader.py, adding the preloader config to config/preloader.json, and adding the preloader dependency in config/projects.json.
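
As a rough, hypothetical sketch of such an addition (the class name, the data it caches, and the exact import paths below are assumptions for illustration, not part of the shipped template), a custom preloader only needs to implement the GenericPreloader interface shown above and be registered in the two config files:

# Hypothetical custom preloader sketch; import paths are assumptions based on
# where the bundled preloaders and helpers appear to live in this repo.
from redis import asyncio as aioredis

from snapshotter.utils.callback_helpers import GenericPreloader  # assumed location
from snapshotter.utils.models.message_models import EpochBase     # assumed location
from snapshotter.utils.rpc import RpcHelper


class ExampleBlockGasPreloader(GenericPreloader):
    async def compute(
        self,
        epoch: EpochBase,
        redis_conn: aioredis.Redis,
        rpc_helper: RpcHelper,
    ):
        # Fetch whatever low-level detail the snapshot builders need for every
        # block in [epoch.begin, epoch.end] and cache it in Redis so that the
        # builders do not repeat those RPC calls.
        for block_num in range(epoch.begin, epoch.end + 1):
            ...  # fetch via rpc_helper and write to redis_conn

    async def cleanup(self):
        pass

Such a preloader would then be listed under preloaders in config/preloader.json with its task_type, module, and class_name, and referenced from the preload_tasks of any project type in config/projects.json that depends on it.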

Base Snapshot Generation

Workers, as mentioned in the configuration section for config/projects.json, calculate base snapshots against this epochId, which corresponds to collections of state observations and event logs between the blocks at heights in the range [begin, end].

The data sources are determined according to the following specification for the projects key:

  • an empty array against the projects indicates no specific data source is defined
  • an array of EVM-compatible wallet address strings can also be listed
  • an array of "<addr1>_<addr2>" strings that denote the relationship between two EVM addresses (for eg, the ERC20 balance of addr2 against a token contract addr1)
  • data sources can be dynamically added on the protocol state contract which the processor distributor syncs with:

The project ID is ultimately generated in the following manner:

def _gen_project_id(self, task_type: str, data_source: Optional[str] = None, primary_data_source: Optional[str] = None):
    """
    Generates a project ID based on the given task type, data source, and primary data source.
    Args:
        task_type (str): The type of task.
        data_source (Optional[str], optional): The data source. Defaults to None.
        primary_data_source (Optional[str], optional): The primary data source. Defaults to None.
    Returns:
        str: The generated project ID.
    """
    if not data_source:
        # For generic use cases that don't have a data source like block details
        project_id = f'{task_type}:{settings.namespace}'
    else:
        if primary_data_source:
            project_id = f'{task_type}:{primary_data_source.lower()}_{data_source.lower()}:{settings.namespace}'
        else:
            project_id = f'{task_type}:{data_source.lower()}:{settings.namespace}'
    return project_id

The snapshots generated by workers defined in this config are the fundamental data models on which higher-order aggregates and richer data points are built. The SnapshotSubmitted event generated on such base snapshots further triggers the building of sophisticated aggregates, super-aggregates, filters, and other data composites on top of them.

Bulk Mode

For situations where data sources are constantly changing or numerous, making it impractical to maintain an extensive list of them, the Snapshotter Peer offers a Bulk Mode. This feature is particularly useful in scenarios where specific data sources need not be defined explicitly.

In Bulk Mode, the system monitors all transactions and blocks without the need for predefined data sources. The Processor Distributor generates a SnapshotProcessMessage with bulk mode enabled for each project type. When snapshot workers receive this message, they leverage preloaded transaction receipts for entire blocks, filtering out the relevant transactions to generate snapshots for all data sources that interacted with the blockchain during that epoch. The snapshot workers then generate the relevant project IDs for these snapshots and submit them for further processing.

Bulk Mode is highly effective in situations where the project list is continually expanding or where snapshots don't need to be submitted in every epoch, perhaps because the data hasn't changed significantly. Example use cases include monitoring on-chain activities and tracking task or quest completion statuses on the blockchain.

An important advantage of Bulk Mode is that, since all transaction receipts are preloaded, this approach can efficiently scale to accommodate a large number of project types with little to no increase in RPC (Remote Procedure Call) calls.

for project_data_source, snapshot in snapshots:
    data_sources = project_data_source.split('_')
    if len(data_sources) == 1:
        data_source = data_sources[0]
        primary_data_source = None
    else:
        primary_data_source, data_source = data_sources
    project_id = self._gen_project_id(
        task_type=task_type, data_source=data_source, primary_data_source=primary_data_source,
    )
    await self._redis_conn.set(
        name=submitted_base_snapshots_key(
            epoch_id=msg_obj.epochId, project_id=project_id,
        ),
        value=snapshot.json(),
        # block time is about 2 seconds on anchor chain, keeping it around ten times the submission window
        ex=self._submission_window * 10 * 2,
    )
    p = self._redis_conn.pipeline()
    p.hset(
        name=epoch_id_project_to_state_mapping(
            epoch_id=msg_obj.epochId, state_id=SnapshotterStates.SNAPSHOT_BUILD.value,
        ),
        mapping={
            project_id: SnapshotterStateUpdate(
                status='success', timestamp=int(time.time()),
            ).json(),
        },
    )
    await p.execute()
    await self._commit_payload(
        task_type=task_type,
        _ipfs_writer_client=self._ipfs_writer_client,
        project_id=project_id,
        epoch=msg_obj,
        snapshot=snapshot,
        storage_flag=settings.web3storage.upload_snapshots,
    )

Data source signaling

As seen above in the section on base snapshot generation, data sources can be dynamically added to the contract according to the role of certain peers in the ecosystem known as 'signallers'. This is the most significant aspect of the Powerloom Protocol ecosystem apart from snapshotting, and it will soon be decentralized to factor in on-chain activity and market forces, and to accommodate a demand-driven, dynamic data ecosystem.

In the existing setup, when the projects key is set to an empty array ([]) and bulk mode is not activated, the snapshotter node attempts to retrieve data sources corresponding to the projects key from the protocol state contract.

Whenever a data source is added or removed by a combination of the data source-detector and signaller, the protocol state smart contract emits a ProjectUpdated event, adhering to the defined data model.

class ProjectsUpdatedEvent(EventBase):
    projectId: str
    allowed: bool
    enableEpochId: int

The snapshotting for every such dynamically added project is initiated only when the epochId, corresponding to the field enableEpochId contained within the ProjectUpdated event, is released. The processor distributor correctly triggers the snapshotting workflow for such dynamically added data sources in the following segment:

for project in project_config.projects:
    project_id = f'{project_type}:{project}:{settings.namespace}'
    if project_id.lower() in self._newly_added_projects:
        genesis = True
        self._newly_added_projects.remove(project_id.lower())
    else:
        genesis = False
    data_sources = project.split('_')
    if len(data_sources) == 1:
        data_source = data_sources[0]
        primary_data_source = None
    else:
        primary_data_source, data_source = data_sources
    process_unit = PowerloomSnapshotProcessMessage(
        begin=epoch.begin,
        end=epoch.end,
        epochId=epoch.epochId,
        data_source=data_source,
        primary_data_source=primary_data_source,
        genesis=genesis,
    )
    msg_body = Message(process_unit.json().encode('utf-8'))
    queuing_tasks.append(
        exchange.publish(
            routing_key=f'powerloom-backend-callback:{settings.namespace}'
            f':{settings.instance_id}:EpochReleased.{project_type}',
            message=msg_body,
        ),
    )

Snapshot Finalization

All snapshots per project reach consensus on the protocol state contract, which results in a SnapshotFinalized event being triggered.

event SnapshotFinalized(uint256 indexed epochId, uint256 epochEnd, string projectId, string snapshotCid, uint256 timestamp);

Epoch processing state transitions

The following is the sequence of states that an epoch goes through, from the point the epoch is released until the SnapshotFinalized event is received by the processor distributor for that epoch. These state transitions can be inspected in detail as noted in the section on internal snapshotter APIs.
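
For orientation, the transition names that appear below (and as SnapshotterStates members in the code excerpts) can be summarized as a Python enum. This is an illustrative summary of the names described in this section rather than the exact definition from the codebase.

from enum import Enum


class SnapshotterStates(Enum):
    # Illustrative summary; the canonical enum lives in the snapshotter codebase.
    EPOCH_RELEASED = 'EPOCH_RELEASED'
    PRELOAD = 'PRELOAD'
    SNAPSHOT_BUILD = 'SNAPSHOT_BUILD'
    SNAPSHOT_SUBMIT_PAYLOAD_COMMIT = 'SNAPSHOT_SUBMIT_PAYLOAD_COMMIT'
    RELAYER_SEND = 'RELAYER_SEND'
    SNAPSHOT_FINALIZE = 'SNAPSHOT_FINALIZE'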


EPOCH_RELEASED

The state name is self-explanatory.

PRELOAD

For every project type's preloader specifications, the status of all the preloading dependencies being satisfied is captured here:

for project_type in self._project_type_config_mapping:
    project_config = self._project_type_config_mapping[project_type]
    if not project_config.preload_tasks:
        continue
    self._logger.debug(
        'Expected list of successful preloading for project type {}: {}',
        project_type,
        project_config.preload_tasks,
    )
    if all([t in succesful_preloads for t in project_config.preload_tasks]):
        self._logger.info(
            'Preloading dependency satisfied for project type {} epoch {}. Distributing snapshot build tasks...',
            project_type, epoch.epochId,
        )
        asyncio.ensure_future(
            self._redis_conn.hset(
                name=epoch_id_project_to_state_mapping(epoch.epochId, SnapshotterStates.PRELOAD.value),
                mapping={
                    project_type: SnapshotterStateUpdate(
                        status='success', timestamp=int(time.time())
                    ).json()
                }
            )
        )
        await self._distribute_callbacks_snapshotting(project_type, epoch)

SNAPSHOT_BUILD

The snapshot builders as configured in projects.json are executed. Also refer to the case study of the current implementation of Pooler for a detailed look at snapshot building for base as well as aggregates.

    # failure path: mark SNAPSHOT_BUILD as failed for this project (inside the except handler)
    await self._redis_conn.hset(
        name=epoch_id_project_to_state_mapping(
            epoch_id=msg_obj.epochId, state_id=SnapshotterStates.SNAPSHOT_BUILD.value,
        ),
        mapping={
            project_id: SnapshotterStateUpdate(
                status='failed', error=str(e), timestamp=int(time.time()),
            ).json(),
        },
    )
else:
    # success path: mark SNAPSHOT_BUILD as successful
    await self._redis_conn.hset(
        name=epoch_id_project_to_state_mapping(
            epoch_id=msg_obj.epochId, state_id=SnapshotterStates.SNAPSHOT_BUILD.value,
        ),
        mapping={
            project_id: SnapshotterStateUpdate(
                status='success', timestamp=int(time.time()),
            ).json(),
        },
    )

SNAPSHOT_SUBMIT_PAYLOAD_COMMIT

Captures the status of propagation of the built snapshot to the payload commit service in Audit Protocol for further submission to the protocol state contract.

except Exception as e:
    self._logger.opt(exception=True).error(
        (
            'Exception committing snapshot to commit payload queue:'
            ' {} | dump: {}'
        ),
        snapshot,
        e,
    )
    await self._redis_conn.hset(
        name=epoch_id_project_to_state_mapping(
            epoch.epochId, SnapshotterStates.SNAPSHOT_SUBMIT_PAYLOAD_COMMIT.value,
        ),
        mapping={
            project_id: SnapshotterStateUpdate(
                status='failed', error=str(e), timestamp=int(time.time()),
            ).json(),
        },
    )
else:
    await self._redis_conn.hset(
        name=epoch_id_project_to_state_mapping(
            epoch.epochId, SnapshotterStates.SNAPSHOT_SUBMIT_PAYLOAD_COMMIT.value,
        ),
        mapping={
            project_id: SnapshotterStateUpdate(
                status='success', timestamp=int(time.time()),
            ).json(),
        },
    )

RELAYER_SEND

Payload commit service has sent the snapshot to a transaction relayer to submit to the protocol state contract.

SNAPSHOT_FINALIZE

Finalized snapshot accepted against an epoch via a SnapshotFinalized event.

await self._redis_conn.hset(
    name=epoch_id_project_to_state_mapping(msg_obj.epochId, SnapshotterStates.SNAPSHOT_FINALIZE.value),
    mapping={
        msg_obj.projectId: SnapshotterStateUpdate(
            status='success', timestamp=int(time.time()), extra={'snapshot_cid': msg_obj.snapshotCid}
        ).json()
    }
)

Aggregation and data composition - snapshot generation of higher-order data points on base snapshots

Workers, as defined in config/aggregator.json, are triggered by the appropriate signals forwarded to the Processor Distributor corresponding to the project ID filters, as explained in the Configuration section. This is best seen in action in Pooler, the snapshotter implementation that serves multiple aggregated data points for Uniswap v2 trade information.

In the case of aggregation over multiple projects, their project IDs are generated from a combination of the hash of the dependee project IDs and the namespace:

def _gen_single_type_project_id(self, task_type, epoch):
    """
    Generates a project ID for a single task type and epoch.
    Args:
        task_type (str): The task type.
        epoch (Epoch): The epoch object.
    Returns:
        str: The generated project ID.
    """
    data_source = epoch.projectId.split(':')[-2]
    project_id = f'{task_type}:{data_source}:{settings.namespace}'
    return project_id


def _gen_multiple_type_project_id(self, task_type, epoch):
    """
    Generates a unique project ID based on the task type and epoch messages.
    Args:
        task_type (str): The type of task.
        epoch (Epoch): The epoch object containing messages.
    Returns:
        str: The generated project ID.
    """
    underlying_project_ids = [project.projectId for project in epoch.messages]
    unique_project_id = ''.join(sorted(underlying_project_ids))
    project_hash = hashlib.sha3_256(unique_project_id.encode()).hexdigest()
    project_id = f'{task_type}:{project_hash}:{settings.namespace}'
    return project_id


def _gen_project_id(self, task_type, epoch):
    """
    Generates a project ID based on the given task type and epoch.
    Args:
        task_type (str): The type of task.
        epoch (int): The epoch number.
    Returns:
        str: The generated project ID.
    Raises:
        ValueError: If the task type is unknown.
    """
    if task_type in self._single_project_types:
        return self._gen_single_type_project_id(task_type, epoch)
    elif task_type in self._multi_project_types:
        return self._gen_multiple_type_project_id(task_type, epoch)
    else:
        raise ValueError(f'Unknown project type {task_type}')

Major Components

Snapshotter Components

System Event Detector

The system event detector tracks events being triggered on the protocol state contract running on the anchor chain and forwards them to a callback queue with the appropriate routing key, depending on the event signature and type among other information.

Related information and other services depending on these can be found in previous sections: State Transitions, Configuration.

Process Hub Core

The Process Hub Core, defined in process_hub_core.py, serves as the primary process manager in the snapshotter.

  • Operated by the CLI tool processhub_cmd.py, it is responsible for starting and managing the SystemEventDetector and ProcessorDistributor processes.
  • Additionally, it spawns the base snapshot and aggregator workers required for processing tasks from the powerloom-backend-callback queue. The number of workers and their configuration path can be adjusted in config/settings.json.

Processor Distributor

The Processor Distributor, defined in processor_distributor.py, is initiated using the processhub_cmd.py CLI.

  • It loads the preloader, base snapshotting, and aggregator config information from the settings file
  • It reads the events forwarded by the event detector to the f'powerloom-event-detector:{settings.namespace}:{settings.instance_id}' RabbitMQ queue bound to a topic exchange as configured in settings.rabbitmq.setup.event_detector.exchange (code-ref: RabbitMQ exchanges and queue setup in pooler)
  • It creates and distributes processing messages based on the preloader configuration present in config/preloader.json, the project configuration present in config/projects.json and config/aggregator.json, and the topic pattern used in the routing key received from the topic exchange
    • For EpochReleased events, it forwards such messages to base snapshot builders for data source contracts as configured in config/projects.json for the current epoch information contained in the event.
      async def _distribute_callbacks_snapshotting(self, project_type: str, epoch: EpochBase):
          """
          Distributes callbacks for snapshotting to the appropriate snapshotters based on the project type and epoch.
          Args:
              project_type (str): The type of project.
              epoch (EpochBase): The epoch to snapshot.
          Returns:
              None
          """
          # send to snapshotters to get the balances of the addresses
          queuing_tasks = []
          async with self._rmq_channel_pool.acquire() as ch:
              # Prepare a message to send
              exchange = await ch.get_exchange(
                  name=self._callback_exchange_name,
              )
              project_config = self._project_type_config_mapping[project_type]
              # handling bulk mode projects
              if project_config.bulk_mode:
                  process_unit = PowerloomSnapshotProcessMessage(
                      begin=epoch.begin,
                      end=epoch.end,
                      epochId=epoch.epochId,
                      bulk_mode=True,
                  )
                  msg_body = Message(process_unit.json().encode('utf-8'))
                  await exchange.publish(
                      routing_key=f'powerloom-backend-callback:{settings.namespace}'
                      f':{settings.instance_id}:EpochReleased.{project_type}',
                      message=msg_body,
                  )
                  self._logger.debug(
                      'Sent out message to be processed by worker'
                      f' {project_type} : {process_unit}',
                  )
                  return
              # handling projects with no data sources
              if project_config.projects is None:
                  project_id = f'{project_type}:{settings.namespace}'
                  if project_id.lower() in self._newly_added_projects:
                      genesis = True
                      self._newly_added_projects.remove(project_id.lower())
                  else:
                      genesis = False
                  process_unit = PowerloomSnapshotProcessMessage(
                      begin=epoch.begin,
                      end=epoch.end,
                      epochId=epoch.epochId,
                      genesis=genesis,
                  )
                  msg_body = Message(process_unit.json().encode('utf-8'))
                  await exchange.publish(
                      routing_key=f'powerloom-backend-callback:{settings.namespace}'
                      f':{settings.instance_id}:EpochReleased.{project_type}',
                      message=msg_body,
                  )
                  self._logger.debug(
                      'Sent out message to be processed by worker'
                      f' {project_type} : {process_unit}',
                  )
                  return
              # handling projects with data sources
              for project in project_config.projects:
                  project_id = f'{project_type}:{project}:{settings.namespace}'
                  if project_id.lower() in self._newly_added_projects:
                      genesis = True
                      self._newly_added_projects.remove(project_id.lower())
                  else:
                      genesis = False
                  data_sources = project.split('_')
                  if len(data_sources) == 1:
                      data_source = data_sources[0]
                      primary_data_source = None
                  else:
                      primary_data_source, data_source = data_sources
                  process_unit = PowerloomSnapshotProcessMessage(
                      begin=epoch.begin,
                      end=epoch.end,
                      epochId=epoch.epochId,
                      data_source=data_source,
                      primary_data_source=primary_data_source,
                      genesis=genesis,
                  )
                  msg_body = Message(process_unit.json().encode('utf-8'))
                  queuing_tasks.append(
                      exchange.publish(
                          routing_key=f'powerloom-backend-callback:{settings.namespace}'
                          f':{settings.instance_id}:EpochReleased.{project_type}',
                          message=msg_body,
                      ),
                  )
                  self._logger.debug(
                      'Sent out message to be processed by worker'
                      f' {project_type} : {process_unit}',
                  )
              results = await asyncio.gather(*queuing_tasks, return_exceptions=True)
              for result in results:
                  if isinstance(result, Exception):
                      self._logger.error(
                          'Error while sending message to queue. Error - {}',
                          result,
                      )
    • For SnapshotSubmitted events, it forwards such messages to single and multi-project aggregate topic routing keys.
      async def _distribute_callbacks_aggregate(self, message: IncomingMessage):
          """
          Distributes the callbacks for aggregation.
          :param message: IncomingMessage object containing the message to be processed.
          """
          event_type = message.routing_key.split('.')[-1]
          try:
              if event_type != 'SnapshotSubmitted':
                  self._logger.error(f'Unknown event type {event_type}')
                  return
              process_unit: PowerloomSnapshotSubmittedMessage = (
                  PowerloomSnapshotSubmittedMessage.parse_raw(message.body)
              )
          except ValidationError:
              self._logger.opt(exception=True).error(
                  'Bad message structure of event callback',
              )
              return
          except Exception:
              self._logger.opt(exception=True).error(
                  'Unexpected message format of event callback',
              )
              return
          self._logger.trace(f'Aggregation Task Distribution time - {int(time.time())}')
          # go through aggregator config, if it matches then send appropriate message
          rabbitmq_publish_tasks = list()
          async with self._rmq_channel_pool.acquire() as channel:
              exchange = await channel.get_exchange(
                  name=self._callback_exchange_name,
              )
              for config in aggregator_config:
                  task_type = config.project_type
                  if config.aggregate_on == AggregateOn.single_project:
                      if config.filters.projectId not in process_unit.projectId:
                          self._logger.trace(f'projectId mismatch {process_unit.projectId} {config.filters.projectId}')
                          continue
                      rabbitmq_publish_tasks.append(
                          exchange.publish(
                              routing_key=f'powerloom-backend-callback:{settings.namespace}:'
                              f'{settings.instance_id}:CalculateAggregate.{task_type}',
                              message=Message(process_unit.json().encode('utf-8')),
                          ),
                      )
                  elif config.aggregate_on == AggregateOn.multi_project:
                      if process_unit.projectId not in config.projects_to_wait_for:
                          self._logger.trace(
                              f'projectId not required for {config.project_type}: {process_unit.projectId}',
                          )
                          continue
                      # cleanup redis for all previous epochs (5 buffer)
                      await self._redis_conn.zremrangebyscore(
                          f'powerloom:aggregator:{config.project_type}:events',
                          0,
                          process_unit.epochId - 5,
                      )
                      await self._redis_conn.zadd(
                          f'powerloom:aggregator:{config.project_type}:events',
                          {process_unit.json(): process_unit.epochId},
                      )
                      events = await self._redis_conn.zrangebyscore(
                          f'powerloom:aggregator:{config.project_type}:events',
                          process_unit.epochId,
                          process_unit.epochId,
                      )
                      if not events:
                          self._logger.info(f'No events found for {process_unit.epochId}')
                          continue
                      event_project_ids = set()
                      finalized_messages = list()
                      for event in events:
                          event = PowerloomSnapshotSubmittedMessage.parse_raw(event)
                          event_project_ids.add(event.projectId)
                          finalized_messages.append(event)
                      if event_project_ids == set(config.projects_to_wait_for):
                          self._logger.info(f'All projects present for {process_unit.epochId}, aggregating')
                          final_msg = PowerloomCalculateAggregateMessage(
                              messages=finalized_messages,
                              epochId=process_unit.epochId,
                              timestamp=int(time.time()),
                          )
                          rabbitmq_publish_tasks.append(
                              exchange.publish(
                                  routing_key=f'powerloom-backend-callback:{settings.namespace}'
                                  f':{settings.instance_id}:CalculateAggregate.{task_type}',
                                  message=Message(final_msg.json().encode('utf-8')),
                              ),
                          )
                          # Cleanup redis for current epoch
                          await self._redis_conn.zremrangebyscore(
                              f'powerloom:aggregator:{config.project_type}:events',
                              process_unit.epochId,
                              process_unit.epochId,
                          )
                      else:
                          self._logger.trace(
                              f'Not all projects present for {process_unit.epochId},'
                              f' {len(set(config.projects_to_wait_for)) - len(event_project_ids)} missing',
                          )
              await asyncio.gather(*rabbitmq_publish_tasks, return_exceptions=True)

Delegation Workers for preloaders

The preloaders often fetch and cache large volumes of data, for example all the transaction receipts for a block on the data source blockchain. In such a case, a single worker will never be enough to feasibly fetch the data in time for base snapshot generation and the subsequent aggregate snapshot generations to finally reach consensus.

Hence such workers are defined as delegate_tasks in config/preloader.json and the process hub core launches a certain number of workers as defined in the primary settings file, config/settings.json under the key callback_worker_config.num_delegate_workers.

"delegate_tasks": [
    {
        "task_type": "txreceipt",
        "module": "snapshotter.utils.preloaders.tx_receipts.delegated_worker.tx_receipts",
        "class_name": "TxReceiptProcessor"
    }
],

"callback_worker_config": {
    "num_delegate_workers": 8,
    "num_snapshot_workers": 2,
    "num_aggregation_workers": 4
},

Delegation workers operate over a simple request-response queue architecture over RabbitMQ.

def get_delegate_worker_request_queue_routing_key() -> tuple[str, str]:
    """
    Returns the name and routing key for the request queue used by the delegated worker.
    Returns:
        A tuple containing the request queue name and routing key.
    """
    request_queue_routing_key = f'powerloom-delegated-worker:{settings.namespace}:{settings.instance_id}:Request'
    request_queue_name = f'powerloom-delegated-worker-request:{settings.namespace}:{settings.instance_id}'
    return request_queue_name, request_queue_routing_key


def get_delegate_worker_response_queue_routing_key_pattern() -> tuple[str, str]:
    """
    Returns a tuple containing the response queue name and routing key pattern for a delegated worker.
    Returns:
        tuple[str, str]: A tuple containing the response queue name and routing key pattern.
    """
    response_queue_routing_key = f'powerloom-delegated-worker:{settings.namespace}:{settings.instance_id}:Response.*'
    response_queue_name = f'powerloom-delegated-worker-response:{settings.namespace}:{settings.instance_id}'
    return response_queue_name, response_queue_routing_key

One of the preloaders bundled with this snapshotter peer is tasked with fetching all the transaction receipts within a given epoch's block range, and because of the volume of data to be fetched it delegates this work to a set of delegation workers.

As common functionality shared by all preloaders that utilize delegate workers, this logic is present in the generic class DelegatorPreloaderAsyncWorker that all such preloaders inherit from. Here you can observe how the workload is sent to the delegation workers:

self._exchange = await self._channel.get_exchange(
    name=self._filter_worker_request_exchange_name,
)
query_tasks = list()
for query_obj in self._request_id_query_obj_map.values():
    query_obj.extra['unique_id'] = self._unique_id
    message_data = query_obj.json().encode('utf-8')
    # Prepare a message to send
    queue_message = Message(message_data)
    t = self._exchange.publish(
        message=queue_message,
        routing_key=self._filter_worker_request_routing_key,
    )
    query_tasks.append(t)
self._logger.trace(
    'Queued {} requests by preloader delegator {} for publish to delegated worker over RabbitMQ',
    self._task_type, len(query_tasks),
)
asyncio.ensure_future(asyncio.gather(*query_tasks, return_exceptions=True))
try:
    await asyncio.wait_for(self._preload_finished_event.wait(), timeout=preloader_config.timeout)
    await self.cleanup()
except asyncio.TimeoutError:
    self._logger.warning(
        'Preloading task {} for epoch {} timed out after {} seconds',
        self._task_type, epoch.epochId, preloader_config.timeout,
    )
    await self.cleanup()
    raise Exception(
        f'Preloading task {self._task_type} for epoch {epoch.epochId} timed out after {preloader_config.timeout} seconds',
    )
except Exception as e:
    self._logger.warning('Exception while waiting for preloading to complete: {}', e)
    await self.cleanup()
    raise e

Upon sending out the workloads tagged by unique request IDs, the delegator sets up a temporary exclusive queue to which only the delegation workers meant for the task type push their responses.

self._unique_id = str(uuid.uuid4())
async with await get_rabbitmq_basic_connection_async() as rmq_conn:
    self._channel = await rmq_conn.channel()
    await self._channel.set_qos(self._qos)
    self._q_obj = await self._channel.declare_queue(exclusive=True)
    self._filter_worker_response_queue, \
        _filter_worker_response_routing_key_pattern = get_delegate_worker_response_queue_routing_key_pattern()
    self._filter_worker_response_routing_key = _filter_worker_response_routing_key_pattern.replace(
        '*', self._unique_id,
    )
    self._logger.debug(
        'Consuming {} fetch response queue {} '
        'in preloader, with routing key {}',
        self._task_type,
        self._filter_worker_response_queue,
        self._filter_worker_response_routing_key,
    )
    self._response_exchange = await self._channel.get_exchange(
        name=self._filter_worker_response_exchange_name,
    )
    await self._q_obj.bind(self._response_exchange, routing_key=self._filter_worker_response_routing_key)
    self._consumer_tag = await self._q_obj.consume(
        callback=self._on_filter_worker_response_message,
    )
    asyncio.ensure_future(self._periodic_awaited_responses_checker())

The corresponding response being pushed by the delegation workers can be found here in the generic class DelegateAsyncWorker that all such workers should inherit from:

result = await task_processor.compute(
    msg_obj=msg_obj,
    redis_conn=self._redis_conn,
    rpc_helper=self._rpc_helper,
)
self._logger.trace('got result from delegate worker compute {}', result)
await self._send_delegate_worker_response_queue(
    request_msg=msg_obj,
    response_msg=result,
)

Delegation worker dependent preloading architecture

Callback Workers

The callback workers are the ones that build the base snapshots and aggregation snapshots and, as explained above, are launched by the process hub core according to the configurations in config/projects.json and config/aggregator.json.

They listen to new messages on the RabbitMQ topic exchange as described in the following configuration, and the topic queue's initialization is as follows.

"callbacks": {
    "exchange": "powerloom-backend-callbacks"
},

def init_callback_queue(
    ch: pika.adapters.blocking_connection.BlockingChannel,
) -> None:
    """
    Initializes the callback queue for snapshot and aggregate.
    Args:
        ch (pika.adapters.blocking_connection.BlockingChannel): The blocking channel object.
    Returns:
        None
    """
    callback_exchange_name = (
        f'{settings.rabbitmq.setup.callbacks.exchange}:{settings.namespace}'
    )
    # Snapshot queue
    queue_name, routing_key_pattern = get_snapshot_queue_routing_key_pattern()
    init_topic_exchange_and_queue(
        ch,
        exchange_name=callback_exchange_name,
        queue_name=queue_name,
        routing_key_pattern=routing_key_pattern,
    )
    # Aggregate queue
    queue_name, routing_key_pattern = get_aggregate_queue_routing_key_pattern()
    init_queue(
        ch,
        exchange_name=callback_exchange_name,
        queue_name=queue_name,
        routing_key=routing_key_pattern,
    )

Upon receiving a message from the processor distributor after preloading is complete, the workers do most of the heavy lifting along with some sanity checks, and then call the compute() callback function on the project's configured snapshot worker class to transform the data points cached by the preloaders into the final base snapshots.
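
As a hedged sketch of what such a snapshot worker class can look like (the import paths and the exact compute() signature below are assumptions; the actual processor classes implement interfaces defined in the main snapshotter codebase, and the bundled uniswapv2 modules are the authoritative reference), a processor roughly takes the epoch message plus the Redis and RPC helpers and returns a snapshot model:

# Hypothetical snapshot processor sketch; names, imports and signature are assumptions.
from redis import asyncio as aioredis

from snapshotter.utils.models.message_models import PowerloomSnapshotProcessMessage  # assumed location
from snapshotter.utils.rpc import RpcHelper


class ExamplePairReservesProcessor:
    async def compute(
        self,
        epoch: PowerloomSnapshotProcessMessage,
        redis_conn: aioredis.Redis,
        rpc_helper: RpcHelper,
    ):
        # Read the data cached by the preloaders (block details, eth price, receipts)
        # for blocks in [epoch.begin, epoch.end] and shape it into the snapshot
        # model that this project type submits.
        snapshot = {}  # build the snapshot payload here
        return snapshot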

RPC Helper

Extracting data from the blockchain state and generating the snapshot can be a complex task. The RpcHelper, defined in utils/rpc.py, has a bunch of helper functions to make this process easier. It handles all the retry and caching logic so that developers can focus on efficiently building their use cases.
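
The authoritative surface is in utils/rpc.py; as a hedged illustration, the pattern below mirrors the web3_call usage visible in the Core API excerpts further down, where a batch of contract function calls is passed in along with a Redis connection.

# Sketch modelled on the web3_call invocations shown later in the Core API section;
# treat it as an illustration of the calling pattern rather than the full RpcHelper API.
async def read_current_epoch(anchor_rpc_helper, protocol_state_contract, redis_conn):
    # web3_call accepts a list of prepared contract function calls and returns
    # their results; the Redis connection is used for caching.
    [current_epoch] = await anchor_rpc_helper.web3_call(
        [protocol_state_contract.functions.currentEpoch()],
        redis_conn=redis_conn,
    )
    return current_epoch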

Core API

This component is one of the most important and allows you to access the finalized protocol state on the smart contract running on the anchor chain. Find it in core_api.py.

The pooler-frontend that serves the Uniswap v2 dashboards hosted by the PowerLoom foundation on locations like https://uniswapv2.powerloom.io/ is a great example of a frontend-specific web application that makes use of this API service.

Among many things, the core API allows you to access the finalized CID as well as its contents at a given epoch ID for a project.

The main endpoint implementations can be found as follows:

async def get_project_last_finalized_epoch_info(
    request: Request,
    response: Response,
    project_id: str,
    rate_limit_auth_dep: RateLimitAuthCheck = Depends(
        rate_limit_auth_check,
    ),
):
    """
    Get the last finalized epoch information for a given project.
    Args:
        request (Request): The incoming request object.
        response (Response): The outgoing response object.
        project_id (str): The ID of the project to get the last finalized epoch information for.
        rate_limit_auth_dep (RateLimitAuthCheck, optional): The rate limit authentication dependency. Defaults to rate_limit_auth_check.
    Returns:
        dict: A dictionary containing the last finalized epoch information for the given project.
    """
    if not (
        rate_limit_auth_dep.rate_limit_passed and
        rate_limit_auth_dep.authorized and
        rate_limit_auth_dep.owner.active == UserStatusEnum.active
    ):
        return inject_rate_limit_fail_response(rate_limit_auth_dep)
    try:
        # get project last finalized epoch from redis
        project_last_finalized_epoch = await request.app.state.redis_pool.get(
            project_last_finalized_epoch_key(project_id),
        )
        if project_last_finalized_epoch is None:
            # find from contract
            epoch_finalized = False
            [cur_epoch] = await request.app.state.anchor_rpc_helper.web3_call(
                [request.app.state.protocol_state_contract.functions.currentEpoch()],
                redis_conn=request.app.state.redis_pool,
            )
            epoch_id = int(cur_epoch[2])
            while not epoch_finalized and epoch_id >= 0:
                # get finalization status
                [epoch_finalized_contract] = await request.app.state.anchor_rpc_helper.web3_call(
                    [request.app.state.protocol_state_contract.functions.snapshotStatus(project_id, epoch_id)],
                    redis_conn=request.app.state.redis_pool,
                )
                if epoch_finalized_contract[0]:
                    epoch_finalized = True
                    project_last_finalized_epoch = epoch_id
                    await request.app.state.redis_pool.set(
                        project_last_finalized_epoch_key(project_id),
                        project_last_finalized_epoch,
                    )
                else:
                    epoch_id -= 1
                    if epoch_id < 0:
                        response.status_code = 404
                        return {
                            'status': 'error',
                            'message': f'Unable to find last finalized epoch for project {project_id}',
                        }
        else:
            project_last_finalized_epoch = int(project_last_finalized_epoch.decode('utf-8'))
        [epoch_info_data] = await request.app.state.anchor_rpc_helper.web3_call(
            [request.app.state.protocol_state_contract.functions.epochInfo(project_last_finalized_epoch)],
            redis_conn=request.app.state.redis_pool,
        )
        epoch_info = {
            'epochId': project_last_finalized_epoch,
            'timestamp': epoch_info_data[0],
            'blocknumber': epoch_info_data[1],
            'epochEnd': epoch_info_data[2],
        }
    except Exception as e:
        rest_logger.exception(
            'Exception in get_project_last_finalized_epoch_info',
            e=e,
        )
        response.status_code = 500
        return {
            'status': 'error',
            'message': f'Unable to get last finalized epoch for project {project_id}, error: {e}',
        }
    auth_redis_conn: aioredis.Redis = request.app.state.auth_aioredis_pool
    await incr_success_calls_count(auth_redis_conn, rate_limit_auth_dep)
    return epoch_info

@app.get('/data/{epoch_id}/{project_id}/')
async def get_data_for_project_id_epoch_id(
    request: Request,
    response: Response,
    project_id: str,
    epoch_id: int,
    rate_limit_auth_dep: RateLimitAuthCheck = Depends(
        rate_limit_auth_check,
    ),
):
    """
    Get data for a given project and epoch ID.
    Args:
        request (Request): The incoming request.
        response (Response): The outgoing response.
        project_id (str): The ID of the project.
        epoch_id (int): The ID of the epoch.
        rate_limit_auth_dep (RateLimitAuthCheck, optional): The rate limit authentication check. Defaults to Depends(rate_limit_auth_check).
    Returns:
        dict: The data for the given project and epoch ID.
    """
    if not (
        rate_limit_auth_dep.rate_limit_passed and
        rate_limit_auth_dep.authorized and
        rate_limit_auth_dep.owner.active == UserStatusEnum.active
    ):
        return inject_rate_limit_fail_response(rate_limit_auth_dep)
    try:
        data = await get_project_epoch_snapshot(
            request.app.state.redis_pool,
            request.app.state.protocol_state_contract,
            request.app.state.anchor_rpc_helper,
            request.app.state.ipfs_reader_client,
            epoch_id,
            project_id,
        )
    except Exception as e:
        rest_logger.exception(
            'Exception in get_data_for_project_id_epoch_id',
            e=e,
        )
        response.status_code = 500
        return {
            'status': 'error',
            'message': f'Unable to get data for project_id: {project_id},'
            f' epoch_id: {epoch_id}, error: {e}',
        }
    if not data:
        response.status_code = 404
        return {
            'status': 'error',
            'message': f'No data found for project_id: {project_id},'
            f' epoch_id: {epoch_id}',
        }
    auth_redis_conn: aioredis.Redis = request.app.state.auth_aioredis_pool
    await incr_success_calls_count(auth_redis_conn, rate_limit_auth_dep)
    return data

The first endpoint, GET /last_finalized_epoch/{project_id}, returns the last finalized EpochId for a given project ID, and the second, GET /data/{epoch_id}/{project_id}/, returns the actual snapshot data for a given EpochId and ProjectId.

These endpoints, along with a number of other helper endpoints present in the Core API, can be used to build powerful Dapps and dashboards.
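
For a quick programmatic check against these endpoints, a minimal client along the following lines works; the base URL here is an assumption and should point at wherever your core API service is exposed.

# Minimal illustrative client for the two Core API endpoints described above.
import requests

CORE_API_BASE = 'http://localhost:8000'  # assumption: replace with your core API host/port


def last_finalized_epoch(project_id: str) -> dict:
    resp = requests.get(f'{CORE_API_BASE}/last_finalized_epoch/{project_id}')
    resp.raise_for_status()
    return resp.json()


def fetch_snapshot(project_id: str, epoch_id: int) -> dict:
    resp = requests.get(f'{CORE_API_BASE}/data/{epoch_id}/{project_id}/')
    resp.raise_for_status()
    return resp.json()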

You can observe the way it is used in pooler-frontend repo to fetch the dataset for the aggregate projects of top pairs trade volume and token reserves summary:

try {
    response = await axios.get(API_PREFIX + `/data/${epochInfo.epochId}/${top_pairs_7d_project_id}/`);
    console.log('got 7d top pairs', response.data);
    if (response.data) {
        for (let pair of response.data.pairs) {
            pairsData7d[pair.name] = pair;
        }
    } else {
        throw new Error(JSON.stringify(response.data));
    }
} catch (e) {
    console.error('7d top pairs', e);
}

Development setup and instructions

These instructions are needed to run the system using build-docker.sh.

Configuration

Pooler needs the following config files to be present

  • settings.json in pooler/auth/settings: Changes are trivial. Copy config/auth_settings.example.json to config/auth_settings.json. This enables an authentication layer over the core API exposed by the pooler snapshotter.
  • settings files in config/
    • config/projects.json: Each entry in this configuration file defines the most fundamental unit of data representation in Powerloom Protocol, that is, a project. It is of the following schema

      {
          "project_type": "snapshot_project_name_prefix_",
          "projects": ["array of smart contract addresses"], // Uniswap v2 pair contract addresses in this implementation
          "preload_tasks":[
            "eth_price",
            "block_details"
          ],
          "processor":{
              "module": "snapshotter.modules.uniswapv2.pair_total_reserves",
              "class_name": "PairTotalReservesProcessor" // class to be found in module snapshotter/modules/pooler/uniswapv2/pair_total_reserves.py
          }
      }

      Copy over config/projects.example.json to config/projects.json. For more details, read on in the use case study for this current implementation.

    • config/aggregator.json: This lists out the different types of aggregation work to be performed over a span of snapshots. Copy over config/aggregator.example.json to config/aggregator.json. The span is usually calculated as a function of the epoch size and average block time on the data source network. For example:
      • The following configuration calculates a snapshot of total trade volume over a 24-hour time period, based on the snapshot finalization of a project ID corresponding to a pair contract. This can be seen by the aggregate_on key being set to SingleProject.
      • This is specified by the filters key below: the aggregation is triggered whenever a snapshot build is achieved for an epoch over a matching project ID (ref: generation of project ID for snapshot building workers). For example, a snapshot build on pairContract_trade_volume:0xb4e16d0168e52d35cacd2c6185b44281ec28c9dc:UNISWAPV2 triggers the worker AggregateTradeVolumeProcessor, as defined in the processor section of the config, against the pair contract 0xb4e16d0168e52d35cacd2c6185b44281ec28c9dc.

          {
              "config": [
                  {
                      "project_type": "aggregate_pairContract_24h_trade_volume",
                      "aggregate_on": "SingleProject",
                      "filters": {
                          // this triggers the compute() contained in the processor class at the module location
                          // every time a `SnapshotFinalized` event is received for project IDs containing the prefix `pairContract_trade_volume`
                          // at each epoch ID
                          "projectId": "pairContract_trade_volume"
                      },
                      "processor": {
                          "module": "snapshotter.modules.uniswapv2.aggregate.single_uniswap_trade_volume_24h",
                          "class_name": "AggregateTradeVolumeProcessor"
                      }
                  }
              ]
          }
      • The following configuration generates a collection of datasets of 24-hour trade volume, as calculated by the worker above, across multiple pair contracts. This can be seen by the aggregate_on key being set to MultiProject.
        • projects_to_wait_for specifies the exact project IDs on which this collection will be generated once a snapshot build has been achieved for an epochId.

        {
            "config": [
                {
                    "project_type": "aggregate_24h_top_pairs_lite",
                    "aggregate_on": "MultiProject",
                    "projects_to_wait_for": [
                        "aggregate_pairContract_24h_trade_volume:0xb4e16d0168e52d35cacd2c6185b44281ec28c9dc:UNISWAPV2",
                        "pairContract_pair_total_reserves:0xb4e16d0168e52d35cacd2c6185b44281ec28c9dc:UNISWAPV2",
                        "aggregate_pairContract_24h_trade_volume:0xae461ca67b15dc8dc81ce7615e0320da1a9ab8d5:UNISWAPV2",
                        "pairContract_pair_total_reserves:0xae461ca67b15dc8dc81ce7615e0320da1a9ab8d5:UNISWAPV2",
                        "aggregate_pairContract_24h_trade_volume:0x0d4a11d5eeaac28ec3f61d100daf4d40471f1852:UNISWAPV2",
                        "pairContract_pair_total_reserves:0x0d4a11d5eeaac28ec3f61d100daf4d40471f1852:UNISWAPV2",
                        "aggregate_pairContract_24h_trade_volume:0x3041cbd36888becc7bbcbc0045e3b1f144466f5f:UNISWAPV2",
                        "pairContract_pair_total_reserves:0x3041cbd36888becc7bbcbc0045e3b1f144466f5f:UNISWAPV2",
                        "aggregate_pairContract_24h_trade_volume:0xd3d2e2692501a5c9ca623199d38826e513033a17:UNISWAPV2",
                        "pairContract_pair_total_reserves:0xd3d2e2692501a5c9ca623199d38826e513033a17:UNISWAPV2",
                        "aggregate_pairContract_24h_trade_volume:0xbb2b8038a1640196fbe3e38816f3e67cba72d940:UNISWAPV2",
                        "pairContract_pair_total_reserves:0xbb2b8038a1640196fbe3e38816f3e67cba72d940:UNISWAPV2",
                        "aggregate_pairContract_24h_trade_volume:0xa478c2975ab1ea89e8196811f51a7b7ade33eb11:UNISWAPV2",
                        "pairContract_pair_total_reserves:0xa478c2975ab1ea89e8196811f51a7b7ade33eb11:UNISWAPV2"
                    ],
                    "processor": {
                        "module": "snapshotter.modules.uniswapv2.aggregate.multi_uniswap_top_pairs_24h",
                        "class_name": "AggreagateTopPairsProcessor"
                    }
                }
            ]
        }
      • To begin with, you can keep the workers and contracts as specified in the example files.

      • config/settings.json: This is the primary configuration. We've provided a settings template in config/settings.example.json to help you get started. Copy over config/settings.example.json to config/settings.json. There can be a lot to fine tune but the following are essential.

        • instance_id: This is the unique public key for your node to participate in consensus. It is currently registered on approval of an application (refer deploy repo for more details on applying).
        • namespace: the unique key used to identify your project namespace, around which all consensus activity takes place.
        • RPC service URL(s) and rate limit configurations. Rate limits are service provider specific, different RPC providers have different rate limits. Example rate limit config for a node looks something like this "100000000/day;20000/minute;2500/second"
          • rpc.full_nodes: This will correspond to RPC nodes for the chain on which the data source smart contracts live (for eg. Ethereum Mainnet, Polygon Mainnet, etc).
          • anchor_chain_rpc.full_nodes: This will correspond to RPC nodes for the anchor chain on which the protocol state smart contract lives (Prost Chain).
          • protocol_state.address: This will correspond to the address at which the protocol state smart contract is deployed on the anchor chain. protocol_state.abi is already filled in the example and is available at the static path specified, pooler/static/abis/ProtocolContract.json.

Monitoring and Debugging

Log in to the pooler docker container using docker exec -it deploy-boost-1 bash (use docker ps to verify its presence in the list of running containers) and use the following commands for monitoring and debugging:

  • To monitor the status of running processes, you simply need to run pm2 status.
  • To see all logs you can run pm2 logs
  • To see logs for a specific process you can run pm2 logs <Process Identifier>
  • To see only error logs you can run pm2 logs --err

Internal Snapshotter APIs

All implementations of a snapshotter come equipped with a barebones API service that returns detailed insights into its state. You can tunnel into port 8002 of an instance running the snapshotter and right away try out the internal APIs, among others, by visiting the FastAPI-generated SwaggerUI.

http://localhost:8002/docs

Snapshotter API SwaggerUI

GET /internal/snapshotter/epochProcessingStatus

As detailed in the section on epoch processing state transitions, this internal API endpoint offers the most detailed insight into each epoch's processing status as it passes through the snapshot builders and is sent out for consensus.

NOTE: The endpoint, though paginated and cached, serves a raw dump of insights into an epoch's state transitions, and the payloads are large enough for requests to time out or to clog the internal API's limited resources. Hence it is advisable to query between 1 and 5 epochs at a time; the count can be specified via the size query parameter.

Sample Request:

curl -X 'GET' \
  'http://localhost:8002/internal/snapshotter/epochProcessingStatus?page=1&size=3' \
  -H 'accept: application/json'

Sample Response:

{
    "items": [
        {
            "epochId": 43523,
            "transitionStatus": {
                "EPOCH_RELEASED": {
                    "status": "success",
                    "error": null,
                    "extra": null,
                    "timestamp": 1692530595
                },
                "PRELOAD": {
                    "pairContract_pair_total_reserves": {
                        "status": "success",
                        "error": null,
                        "extra": null,
                        "timestamp": 1692530595
                    }
                },
                "SNAPSHOT_BUILD": {
                    "aggregate_24h_stats_lite:35ee1886fa4665255a0d0486c6079c4719c82f0f62ef9e96a98f26fde2e8a106:UNISWAPV2": {
                        "status": "success",
                        "error": null,
                        "extra": null,
                        "timestamp": 1692530596
                    }
                },
                "SNAPSHOT_SUBMIT_PAYLOAD_COMMIT": {},
                "RELAYER_SEND": {},
                "SNAPSHOT_FINALIZE": {}
            }
        }
    ],
    "total": 3,
    "page": 1,
    "size": 3,
    "pages": 1
}

GET /internal/snapshotter/status

Returns the overall status of all the projects

Response

{
  "totalSuccessfulSubmissions": 10,
  "totalMissedSubmissions": 5,
  "totalIncorrectSubmissions": 1,
  "projects": [
    {
      "projectId": "projectid",
      "successfulSubmissions": 3,
      "missedSubmissions": 2,
      "incorrectSubmissions": 1
    }
  ]
}

GET /internal/snapshotter/status/{project_id}

Returns project specific detailed status report

Response

{
  "missedSubmissions": [
    {
      "epochId": 10,
      "finalizedSnapshotCid": "cid",
      "reason": "error/exception/trace"
    }
  ],
  "incorrectSubmissions": [
    {
      "epochId": 12,
      "submittedSnapshotCid": "snapshotcid",
      "finalizedSnapshotCid": "finalizedsnapshotcid",
      "reason": "reason for incorrect submission"
    }
  ]
}

GET /internal/snapshotter/status/{project_id}?data=true

Returns project specific detailed status report with snapshot data

Response

{
  "missedSubmissions": [
    {
      "epochId": 10,
      "finalizedSnapshotCid": "cid",
      "reason": "error/exception/trace"
    }
  ],
  "incorrectSubmissions": [
    {
      "epochId": 12,
      "submittedSnapshotCid": "snapshotcid",
      "submittedSnapshot": {}
      "finalizedSnapshotCid": "finalizedsnapshotcid",
      "finalizedSnapshot": {},
      "reason": "reason for incorrect submission"
    }
  ]
}

For Contributors

We use pre-commit hooks to ensure our code quality is maintained over time. For this, contributors need to do a one-time setup by running the following commands.

  • Install the required dependencies using pip install -r dev-requirements.txt; this will set up everything needed for pre-commit checks.
  • Run pre-commit install

Now, whenever you commit anything, it'll automatically check the files you've changed/edited for code quality issues and suggest improvements.

Case Studies

1. Pooler: Case study and extending this implementation

Pooler is a Uniswap specific implementation of what is known as a 'snapshotter' in the PowerLoom Protocol ecosystem. It synchronizes with other snapshotter peers over a smart contract running on the present version of the PowerLoom Protocol testnet. It follows an architecture that is driven by state transitions, which makes it easy to understand and modify. This release ultimately provides access to rich aggregates that can power a Uniswap v2 dashboard with the following data points:

  • Total Value Locked (TVL)
  • Trade Volume, Liquidity reserves, Fees earned
    • grouped by
      • Pair contracts
      • Individual tokens participating in pair contract
    • aggregated over time periods
      • 24 hours
      • 7 days
  • Transactions containing Swap, Mint, and Burn events

Extending pooler with a Uniswap v2 data point

In this section, let us take a look at the data composition abilities of Pooler by building on the base snapshot that captures information on Uniswap trades.

Step 1. Review: Base snapshot extraction logic for trade information

Required reading:

As you can see in config/projects.example.json, each project config needs to have the following components:

  • project_type (unique identifier prefix for the usecase, used to generate project ID)
  • projects (smart contracts to extract data from; pooler can generate different snapshots from multiple sources as long as the contract ABI is the same)
  • processor (the actual computation logic reference; while you can write the logic anywhere, it is recommended to write your implementation in the pooler/modules folder)

There's currently no limitation on the number or type of use cases you can build using snapshotter. Just write the Processor class and the pooler libraries will take care of the rest.

{
  "config": [
    {
      "project_type": "uniswap_pairContract_pair_total_reserves",
      "projects": [
        "0xb4e16d0168e52d35cacd2c6185b44281ec28c9dc",
        "0xae461ca67b15dc8dc81ce7615e0320da1a9ab8d5",
        "0x0d4a11d5eeaac28ec3f61d100daf4d40471f1852",
        "0x3041cbd36888becc7bbcbc0045e3b1f144466f5f",
        "0xd3d2e2692501a5c9ca623199d38826e513033a17",
        "0xbb2b8038a1640196fbe3e38816f3e67cba72d940",
        "0xa478c2975ab1ea89e8196811f51a7b7ade33eb11"
      ],
      "processor": {
        "module": "pooler.modules.uniswapv2.pair_total_reserves",
        "class_name": "PairTotalReservesProcessor"
      }
    },
    {
      "project_type": "uniswap_pairContract_trade_volume",
      "projects": [
        "0xb4e16d0168e52d35cacd2c6185b44281ec28c9dc",
        "0xae461ca67b15dc8dc81ce7615e0320da1a9ab8d5",
        "0x0d4a11d5eeaac28ec3f61d100daf4d40471f1852",
        "0x3041cbd36888becc7bbcbc0045e3b1f144466f5f",
        "0xd3d2e2692501a5c9ca623199d38826e513033a17",
        "0xbb2b8038a1640196fbe3e38816f3e67cba72d940",
        "0xa478c2975ab1ea89e8196811f51a7b7ade33eb11"
      ],
      "processor": {
        "module": "pooler.modules.uniswapv2.trade_volume",
        "class_name": "TradeVolumeProcessor"
      }
    }
  ]
}

If we take a look at the TradeVolumeProcessor class present at snapshotter/modules/computes/trade_volume.py, we can see that it implements the interface of GenericProcessorSnapshot defined in pooler/utils/callback_helpers.py.

There are a couple of important concepts here necessary to write your extraction logic:

  • compute is the main function where most of the snapshot extraction and generation logic needs to be written. It receives the following inputs:

    • epoch (current epoch details)
    • redis (async redis connection)
    • rpc_helper (RpcHelper instance to help with any calls to the data source contract's chain)
  • transformation_lambdas provide an additional layer for computation on top of the generated snapshot (if needed). If the compute function handles everything, you can set transformation_lambdas to []; otherwise, pass the list of transformation function sequences. Each function referenced in transformation_lambdas must have the same input interface. It should receive the following inputs:

    • snapshot (the generated snapshot to apply the transformation on)
    • address (contract address to extract data from)
    • epoch_begin (epoch begin block)
    • epoch_end (epoch end block)

The output format can be anything depending on the use case requirements, although it is recommended to use proper Pydantic models to define the snapshot interface.

The resultant output model in this specific example is UniswapTradesSnapshot as defined in the Uniswap v2 specific modules directory: utils/models/message_models.py. This encapsulates state information captured by TradeVolumeProcessor between the block heights of the epoch: min_chain_height and max_chain_height.
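
For illustration, here is a minimal sketch of what such a Pydantic snapshot model could look like. The class and field names below are hypothetical placeholders and are not taken from the actual UniswapTradesSnapshot definition.

from pydantic import BaseModel


class ExampleTradesSnapshot(BaseModel):
    """Hypothetical snapshot model; class and field names are illustrative only."""

    contract: str                # pair contract address the snapshot was built for
    chainHeightRange_begin: int  # min_chain_height of the epoch
    chainHeightRange_end: int    # max_chain_height of the epoch
    totalTradeVolumeUSD: float   # trade volume captured between those heights
    timestamp: int               # time at which the snapshot was generated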

Step 2. Review: 24-hour aggregate of trade volume snapshots over a single pair contract

  • As demonstrated in the previous section, the TradeVolumeProcessor logic takes care of capturing a snapshot of information regarding Uniswap v2 trades between the block heights of min_chain_height and max_chain_height.

  • The epoch size as described in the prior section on epoch generation can be considered to be constant for this specific implementation of the Uniswap v2 use case on PowerLoom Protocol, and by extension, the time duration captured within the epoch.

  • As shown in the section on the dependency graph of data composition, every aggregate is calculated relative to the epochId at which the dependee SnapshotFinalized event is received.

  • The finalized state and data CID corresponding to each epoch can be accessed on the smart contract on the anchor chain that holds the protocol state. The corresponding helpers for that can be found in get_project_epoch_snapshot() in pooler/utils/data_utils

async def get_project_epoch_snapshot(
    redis_conn: aioredis.Redis, state_contract_obj, rpc_helper, ipfs_reader, epoch_id, project_id,
) -> dict:
    """
    Retrieves the epoch snapshot for a given project.

    Args:
        redis_conn (aioredis.Redis): Redis connection object.
        state_contract_obj: State contract object.
        rpc_helper: RPC helper object.
        ipfs_reader: IPFS reader object.
        epoch_id (int): Epoch ID.
        project_id (str): Project ID.

    Returns:
        dict: The epoch snapshot data.
    """
    cid = await get_project_finalized_cid(redis_conn, state_contract_obj, rpc_helper, epoch_id, project_id)
    if cid:
        data = await get_submission_data(redis_conn, cid, ipfs_reader, project_id)
        return data
    else:
        return dict()
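
As a hypothetical usage sketch (assuming an initialized Redis connection, protocol state contract object, RPC helper, and IPFS reader are already at hand; the project ID shown is only a placeholder), fetching the finalized snapshot of a project for a given epoch could look like this:

# Hypothetical usage; the surrounding objects are assumed to be initialized elsewhere
# and the project ID below is only an illustrative placeholder.
snapshot = await get_project_epoch_snapshot(
    redis_conn,          # aioredis.Redis connection
    state_contract_obj,  # protocol state contract object on the anchor chain
    rpc_helper,          # RpcHelper instance for the anchor chain
    ipfs_reader,         # async IPFS client used for reads
    epoch_id=43523,
    project_id='<example project ID>',
)
if not snapshot:
    # an empty dict means no finalized CID was found for this epoch and project
    pass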

  • Considering the incoming epochId to be the head of the span, the quickest formula to arrive at the tail of the span of 24 hours' worth of snapshots and trade information becomes:
time_in_seconds = 86400
tail_epoch_id = current_epoch_id - int(time_in_seconds / (source_chain_epoch_size * source_chain_block_time))

async def get_tail_epoch_id(
        redis_conn: aioredis.Redis,
        state_contract_obj,
        rpc_helper,
        current_epoch_id,
        time_in_seconds,
        project_id,
):
    """
    Returns the tail epoch_id and a boolean indicating if tail contains the full time window.

    Args:
        redis_conn (aioredis.Redis): Redis connection object.
        state_contract_obj: State contract object.
        rpc_helper: RPC helper object.
        current_epoch_id (int): Current epoch ID.
        time_in_seconds (int): Time in seconds.
        project_id (str): Project ID.

    Returns:
        Tuple[int, bool]: Tail epoch ID and a boolean indicating if tail contains the full time window.
    """
    source_chain_epoch_size = await get_source_chain_epoch_size(redis_conn, state_contract_obj, rpc_helper)
    source_chain_block_time = await get_source_chain_block_time(redis_conn, state_contract_obj, rpc_helper)
    # calculate tail epoch_id
    tail_epoch_id = current_epoch_id - int(time_in_seconds / (source_chain_epoch_size * source_chain_block_time))
    project_first_epoch = await(
        get_project_first_epoch(redis_conn, state_contract_obj, rpc_helper, project_id)
    )
    if tail_epoch_id < project_first_epoch:
        tail_epoch_id = project_first_epoch
        return tail_epoch_id, True

    logger.trace(
        'project ID {} tail epoch_id: {} against head epoch ID {} ',
        project_id, tail_epoch_id, current_epoch_id,
    )
    return tail_epoch_id, False
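
As a quick worked example (assuming, purely for illustration, a source chain epoch size of 10 blocks and a block time of 12 seconds), the 24-hour window spans 86400 / (10 * 12) = 720 epochs, so the tail sits 720 epoch IDs behind the head:

# Illustrative values only; the actual epoch size and block time are read from the protocol state.
source_chain_epoch_size = 10   # blocks per epoch (assumed)
source_chain_block_time = 12   # seconds per block (assumed)
time_in_seconds = 86400        # 24 hours

current_epoch_id = 43523
tail_epoch_id = current_epoch_id - int(time_in_seconds / (source_chain_epoch_size * source_chain_block_time))
print(tail_epoch_id)  # 43523 - 720 = 42803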

  • The worker class for such aggregation is defined in config/aggregator.json in the following manner:
    {
      "project_type": "aggregate_pairContract_24h_trade_volume",
      "aggregate_on": "SingleProject",
      "filters": {
        "projectId": "pairContract_trade_volume"
      },
      "processor": {
        "module": "snapshotter.modules.computes.aggregate.single_uniswap_trade_volume_24h",
        "class_name": "AggregateTradeVolumeProcessor"
      }
    }
  • Each finalized epochId is registered with a snapshot commit against the aggregated data set generated by running summations on trade volumes across all the base snapshots contained within the span calculated above.

Step 3. New Datapoint: 2-hour aggregate of only Swap events

Using the information provided above, it is left as an exercise for the reader to generate aggregate datasets at every epochId finalization for a pair contract, spanning 2 hours' worth of snapshots and containing only Swap event logs along with the trade volume generated from them.

Feel free to fork this repo and commit your changes on your implementation branch. By following the setup steps recommended for developers, you can then begin capturing aggregates for this datapoint.

  • Add a new configuration entry in config/aggregator.json for this new aggregation worker class

  • Define a new data model in utils/message_models.py referring to

    • UniswapTradesAggregateSnapshot as used in above example
    • UniswapTradesSnapshot used to capture each epoch's trade snapshots which includes the raw event logs as well
  • Follow the example of the aggregator worker as implemented for the 24-hour aggregation calculation, and work on calculating an epochId span of 2 hours while filtering out only the Swap events and the trade volume contained within (a minimal sketch follows this list).
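
As a starting point, here is a minimal sketch of what such an aggregate model and the windowing constant could look like; the class name, fields, and comments below are assumptions for illustration, not the actual snapshotter-computes implementation.

from pydantic import BaseModel


class ExampleSwapTrades2hAggregateSnapshot(BaseModel):
    """Hypothetical 2-hour, Swap-only aggregate model; all fields are illustrative."""

    contract: str                       # pair contract the aggregate is computed for
    totalSwapTradeVolumeUSD: float = 0  # summed trade volume from Swap events only
    epochIdHead: int = 0                # epoch at which the aggregate is anchored
    epochIdTail: int = 0                # tail epoch covering roughly 2 hours
    complete: bool = True               # whether the full 2-hour window was available


# Inside a hypothetical aggregation processor's compute(), the tail of the 2-hour span
# can be derived exactly as in the 24-hour example, just with a smaller window:
TWO_HOURS_IN_SECONDS = 2 * 60 * 60  # passed as time_in_seconds to get_tail_epoch_id()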

2. Zkevm Quests: A Case Study of Implementation

Phase 2 quests form a crucial part of the Powerloom testnet program, where we leverage Snapshotter Peers to monitor on-chain activities of testnet participants across various chains and protocols. These quests predominantly operate in Bulk Mode due to their one-time nature and the highly dynamic set of participants involved.

In this particular implementation of the peer, known as 'Snapshotter' in the Powerloom Protocol, we have successfully harnessed its capabilities to provide accurate metrics, verified through consensus, pertaining to fundamental data points. These metrics allow us to determine if and when a quest is completed by a testnet participant.

This case study serves as a testament to the effectiveness and versatility of the Snapshotter Peer in real-world scenarios, highlighting its ability to support complex use cases with precision and reliability.

Review: Base snapshots

The snapshot builders can be found under the snapshotter-specific implementation directory: snapshotter/modules/computes. Every snapshot builder must implement the interface of GenericProcessorSnapshot.

class GenericProcessorSnapshot(ABC):
    __metaclass__ = ABCMeta

    def __init__(self):
        pass

    @abstractproperty
    def transformation_lambdas(self):
        pass

    @abstractmethod
    async def compute(
        self,
        epoch: PowerloomSnapshotProcessMessage,
        redis: aioredis.Redis,
        rpc_helper: RpcHelper,
    ):
        pass

  • compute() is the callback where the snapshot extraction and generation logic needs to be written. It receives the following inputs:

    • epoch (current epoch details)
    • redis (async redis connection)
    • rpc_helper (RpcHelper instance to help with any calls to the data source contract's chain)
  • transformation_lambdas provide an additional layer for computation on top of the generated snapshot (if needed). If the compute() callback handles everything you can just set transformation_lambdas to [] otherwise pass the list of transformation function sequences. Each function referenced in transformation_lambdas must have the same input interface. It should receive the following inputs -

    • snapshot (the generated snapshot to apply the transformation on)
    • address (contract address to extract data from)
    • epoch_begin (epoch begin block)
    • epoch_end (epoch end block)

compute() should return an instance of a Pydantic model which is in turn uploaded to IPFS by the payload commit service helper method.

async def _upload_to_ipfs(self, snapshot: bytes, _ipfs_writer_client: AsyncIPFSClient):
    """
    Uploads a snapshot to IPFS using the provided AsyncIPFSClient.

    Args:
        snapshot (bytes): The snapshot to upload.
        _ipfs_writer_client (AsyncIPFSClient): The IPFS client to use for uploading.

    Returns:
        str: The CID of the uploaded snapshot.
    """
    snapshot_cid = await _ipfs_writer_client.add_bytes(snapshot)
    return snapshot_cid
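
To make the interface concrete, here is a minimal, hypothetical implementation sketch of GenericProcessorSnapshot. The snapshot model, the attribute names assumed on the epoch message, and the logic inside compute() are placeholders for illustration, not one of the shipped compute modules.

from pydantic import BaseModel

# import path derived from pooler/utils/callback_helpers.py as referenced earlier in this document
from pooler.utils.callback_helpers import GenericProcessorSnapshot


class ExampleCountSnapshot(BaseModel):
    """Hypothetical snapshot payload: simply counts the blocks in the epoch."""
    epochId: int
    blockCount: int


class ExampleCountProcessor(GenericProcessorSnapshot):
    transformation_lambdas = []  # no post-processing needed for this sketch

    async def compute(self, epoch, redis, rpc_helper):
        # epoch.begin and epoch.end are assumed to carry the epoch's block range
        return ExampleCountSnapshot(
            epochId=epoch.epochId,
            blockCount=epoch.end - epoch.begin + 1,
        )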

Looking at the pre-supplied example configuration in config/projects.json, we can find the following snapshots being generated:

zkevm:bungee_bridge

Snapshot builder: snapshotter/modules/computes/bungee_bridge.py

    {
      "project_type": "zkevm:bungee_bridge",
      "projects":[
        ],
      "preload_tasks":[
        "block_transactions"
      ],
      "processor":{
        "module": "snapshotter.modules.boost.bungee_bridge",
        "class_name": "BungeeBridgeProcessor"
      }
    },

Its preloader dependency is block_transactions as seen in the preloader configuration.

The snapshot builder then goes through all preloaded block transactions, filters them, and generates relevant snapshots for wallet addresses that received funds from the Bungee Bridge refuel contract during that epoch.

https://github.com/PowerLoom/snapshotter-computes/blob/29199feab449ad0361b5867efcaae9854992966f/bungee_bridge.py#L40-L92
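
Conceptually (and this is a simplified sketch, not the linked implementation), the filtering step boils down to scanning the preloaded transactions for transfers whose sender is the refuel contract and collecting the recipients. The transaction field names and the contract address below are assumptions:

# Simplified sketch only; the field names and the refuel contract address are placeholders,
# not taken from the linked bungee_bridge.py implementation.
REFUEL_CONTRACT_ADDRESS = '0x0000000000000000000000000000000000000000'  # placeholder


def filter_refuel_recipients(block_transactions: list) -> set:
    """Collect wallet addresses that received funds from the refuel contract."""
    recipients = set()
    for tx in block_transactions:
        # assumed shape: each preloaded tx dict carries 'from', 'to' and 'value' keys
        if tx.get('from', '').lower() == REFUEL_CONTRACT_ADDRESS and int(tx.get('value', 0)) > 0:
            recipients.add(tx['to'].lower())
    return recipients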

Find us
