Skip to content

Commit

Permalink
Update address field logic.
Browse files Browse the repository at this point in the history
Add env MOONSTREAM_DB_V3_CONTROLLER_API.
  • Loading branch information
Andrey committed Dec 17, 2024
1 parent a230798 commit 28e6163
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 10 deletions.
28 changes: 20 additions & 8 deletions crawlers/mooncrawl/mooncrawl/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,14 @@
from bugout.app import Bugout
from moonstreamtypes.blockchain import AvailableBlockchainType

# Bugout

# APIs
## Bugout
BUGOUT_BROOD_URL = os.environ.get("BUGOUT_BROOD_URL", "https://auth.bugout.dev")
BUGOUT_SPIRE_URL = os.environ.get("BUGOUT_SPIRE_URL", "https://spire.bugout.dev")

bugout_client = Bugout(brood_api_url=BUGOUT_BROOD_URL, spire_api_url=BUGOUT_SPIRE_URL)


MOONSTREAM_API_URL = os.environ.get("MOONSTREAM_API_URL", "https://api.moonstream.to")
MOONSTREAM_ENGINE_URL = os.environ.get(
"MOONSTREAM_ENGINE_URL", "https://engineapi.moonstream.to"
)


BUGOUT_REQUEST_TIMEOUT_SECONDS_RAW = os.environ.get(
"MOONSTREAM_BUGOUT_TIMEOUT_SECONDS", 30
)
Expand All @@ -31,6 +26,22 @@

HUMBUG_REPORTER_CRAWLERS_TOKEN = os.environ.get("HUMBUG_REPORTER_CRAWLERS_TOKEN")


## Moonstream
MOONSTREAM_API_URL = os.environ.get("MOONSTREAM_API_URL", "https://api.moonstream.to")

## Moonstream Engine
MOONSTREAM_ENGINE_URL = os.environ.get(
"MOONSTREAM_ENGINE_URL", "https://engineapi.moonstream.to"
)

## Moonstream DB
MOONSTREAM_DB_V3_CONTROLLER_API = os.environ.get(
"MOONSTREAM_DB_V3_CONTROLLER_API", "https://mdb-v3-api.moonstream.to"
)



# Origin
RAW_ORIGINS = os.environ.get("MOONSTREAM_CORS_ALLOWED_ORIGINS")
if RAW_ORIGINS is None:
Expand Down Expand Up @@ -490,3 +501,4 @@
MOONSTREAM_DB_V3_SCHEMA_NAME = os.environ.get(
"MOONSTREAM_DB_V3_SCHEMA_NAME", "blockchain"
)

45 changes: 43 additions & 2 deletions crawlers/mooncrawl/mooncrawl/state_crawler/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
multicall_contracts,
MOONSTREAM_ADMIN_ACCESS_TOKEN,
MOONSTREAM_STATE_CRAWLER_JOURNAL_ID,
MOONSTREAM_DB_V3_CONTROLLER_API
)
from .db import clean_labels, commit_session, view_call_to_label
from .Multicall2_interface import Contract as Multicall2
Expand All @@ -44,7 +45,7 @@ def request_connection_string(customer_id: str, instance_id: int, token: str) ->
Request connection string from the Moonstream API.
"""
response = requests.get(
f"https://mdb-v3-api.moonstream.to/customers/{customer_id}/instances/{instance_id}/creds/seer/url",
f"{MOONSTREAM_DB_V3_CONTROLLER_API}/customers/{customer_id}/instances/{instance_id}/creds/seer/url",
headers={"Authorization": f"Bearer {token}"},
)

Expand Down Expand Up @@ -510,6 +511,29 @@ def build_interfaces(
return interfaces


def process_address_field(job: Dict[str, Any], moonstream_token: str) -> List[str]:
"""Processes the address field of a job and returns a list of addresses."""
if isinstance(job["address"], str):
return [Web3.toChecksumAddress(job["address"])]
elif isinstance(job["address"], list):
return [Web3.toChecksumAddress(address) for address in job["address"]] # manual job multiplication
elif isinstance(job["address"], dict):
if job["address"].get("type") == "queryAPI":
# QueryAPI job multiplication
addresses = execute_query(job["address"], token=moonstream_token)
for address in addresses:
try:
Web3.toChecksumAddress(address)
except Exception as e:
logger.error(f"Invalid address: {address}")
continue
return addresses
else:
raise ValueError(f"Invalid address type: {type(job['address'])}")
else:
raise ValueError(f"Invalid address type: {type(job['address'])}")


def parse_jobs(
jobs: List[Any],
blockchain_type: Any,
Expand Down Expand Up @@ -542,11 +566,27 @@ def parse_jobs(
# All sessions are stored in the dictionary db_sessions
# Under one try block
try:
# Process jobs and create sessions
# Process jobs and create session

for job in jobs:

### process address field
### Handle case when 1 job represents multiple contracts
addresses = process_address_field(job, moonstream_token)

for address in addresses[1:]:
new_job = job.copy()
new_job["address"] = address
jobs.append(new_job)

job["address"] = addresses[0]


v3 = job.get("v3", False)
customer_id = job.get("customer_id")
instance_id = job.get("instance_id")

### DB sessions
if customer_db_uri is not None:
if v3 and (customer_id, instance_id) not in db_sessions:
# Create session
Expand Down Expand Up @@ -581,6 +621,7 @@ def parse_jobs(
if "v2" not in db_sessions:
db_sessions["v2"] = PrePing_SessionLocal()


if job["address"] not in contracts_ABIs:
contracts_ABIs[job["address"]] = {}

Expand Down

0 comments on commit 28e6163

Please sign in to comment.