Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Migrate moonworm tasks #919

Merged
merged 10 commits into from
Feb 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 22 additions & 24 deletions moonstreamapi/moonstreamapi/actions.py
Original file line number Diff line number Diff line change
Expand Up @@ -470,7 +470,7 @@ def upload_abi_to_s3(


def get_all_entries_from_search(
journal_id: str, search_query: str, limit: int, token: str
journal_id: str, search_query: str, limit: int, token: str, content: bool = False
) -> List[BugoutSearchResult]:
"""
Get all required entries from journal using search interface
Expand All @@ -483,7 +483,7 @@ def get_all_entries_from_search(
token=token,
journal_id=journal_id,
query=search_query,
content=False,
content=content,
timeout=10.0,
limit=limit,
offset=offset,
Expand All @@ -496,7 +496,7 @@ def get_all_entries_from_search(
token=token,
journal_id=journal_id,
query=search_query,
content=False,
content=content,
timeout=10.0,
limit=limit,
offset=offset,
Expand Down Expand Up @@ -526,47 +526,45 @@ def apply_moonworm_tasks(
token=MOONSTREAM_ADMIN_ACCESS_TOKEN,
)

# create historical crawl task in journal

# will use create_entries_pack for creating entries in journal

existing_tags = [entry.tags for entry in entries]

existing_hashes = [
tag.split(":")[-1]
for tag in chain(*existing_tags)
if "abi_method_hash" in tag
existing_selectors = [
tag.split(":")[-1] for tag in chain(*existing_tags) if "abi_selector" in tag
]

abi_hashes_dict = {
hashlib.md5(json.dumps(method).encode("utf-8")).hexdigest(): method
abi_selectors_dict = {
Web3.keccak(
text=method["name"]
+ "("
+ ",".join(map(lambda x: x["type"], method["inputs"]))
+ ")"
)[:4].hex(): method
for method in abi
if (method["type"] in ("event", "function"))
and (method.get("stateMutability", "") != "view")
}

for hash in abi_hashes_dict:
if hash not in existing_hashes:
abi_selector = Web3.keccak(
text=abi_hashes_dict[hash]["name"]
+ "("
+ ",".join(
map(lambda x: x["type"], abi_hashes_dict[hash]["inputs"])
)
+ ")"
)[:4].hex()
for abi_selector in abi_selectors_dict:
if abi_selector not in existing_selectors:
hash = hashlib.md5(
json.dumps(abi_selectors_dict[abi_selector]).encode("utf-8")
).hexdigest()

moonworm_abi_tasks_entries_pack.append(
{
"title": address,
"content": json.dumps(abi_hashes_dict[hash], indent=4),
"content": json.dumps(
abi_selectors_dict[abi_selector], indent=4
),
"tags": [
f"address:{address}",
f"type:{abi_hashes_dict[hash]['type']}",
f"type:{abi_selectors_dict[abi_selector]['type']}",
f"abi_method_hash:{hash}",
f"abi_selector:{abi_selector}",
f"subscription_type:{subscription_type}",
f"abi_name:{abi_hashes_dict[hash]['name']}",
f"abi_name:{abi_selectors_dict[abi_selector]['name']}",
f"status:active",
f"task_type:moonworm",
f"moonworm_task_pickedup:False", # True if task picked up by moonworm-crawler(default each 120 sec)
Expand Down
35 changes: 34 additions & 1 deletion moonstreamapi/moonstreamapi/admin/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,20 @@

from moonstreamdb.db import SessionLocal

from ..settings import BUGOUT_BROOD_URL, BUGOUT_SPIRE_URL, MOONSTREAM_APPLICATION_ID
from ..settings import (
BUGOUT_BROOD_URL,
BUGOUT_SPIRE_URL,
MOONSTREAM_APPLICATION_ID,
MOONSTREAM_MOONWORM_TASKS_JOURNAL,
)
from ..web3_provider import yield_web3_provider

from . import subscription_types, subscriptions, moonworm_tasks, queries
from .migrations import (
checksum_address,
update_dashboard_subscription_key,
generate_entity_subscriptions,
add_selectors,
)


Expand Down Expand Up @@ -87,6 +93,9 @@ def migrations_list(args: argparse.Namespace) -> None:
- id: 20230501
name: fix_duplicates_keys_in_entity_subscription
description: Fix entity duplicates keys for all subscriptions introduced in 20230213
- id: 20230904
name fill_missing_selectors_in_moonworm_tasks
description: Get all moonworm jobs from moonworm journal and add selector tag if it not represent
"""
logger.info(entity_migration_overview)

Expand Down Expand Up @@ -117,6 +126,30 @@ def migrations_run(args: argparse.Namespace) -> None:
web3_session = yield_web3_provider()
db_session = SessionLocal()
try:
if args.id == 20230904:
step_order = [
"fill_missing_selectors_in_moonworm_tasks",
"deduplicate_moonworm_tasks",
]
step_map: Dict[str, Dict[str, Any]] = {
"upgrade": {
"fill_missing_selectors_in_moonworm_tasks": {
"action": add_selectors.fill_missing_selectors_in_moonworm_tasks,
"description": "Get all moonworm jobs from moonworm journal and add selector tag if it not represent",
},
"deduplicate_moonworm_tasks": {
"action": add_selectors.deduplicate_moonworm_task_by_selector,
"description": "Deduplicate moonworm tasks by selector",
},
},
"downgrade": {},
}
if args.command not in ["upgrade", "downgrade"]:
logger.info("Wrong command. Please use upgrade or downgrade")
step = args.step

migration_run(step_map, args.command, step, step_order)

if args.id == 20230501:
# fix entity duplicates keys for all subscriptions introduced in 20230213

Expand Down
187 changes: 187 additions & 0 deletions moonstreamapi/moonstreamapi/admin/migrations/add_selectors.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,187 @@
"""
Add selectors to all moonworm tasks.
"""
import logging
import json


from bugout.exceptions import BugoutResponseException
from web3 import Web3

from ...settings import (
BUGOUT_REQUEST_TIMEOUT_SECONDS,
MOONSTREAM_ADMIN_ACCESS_TOKEN,
MOONSTREAM_MOONWORM_TASKS_JOURNAL,
)
from ...settings import bugout_client as bc
from ...actions import get_all_entries_from_search

logger = logging.getLogger(__name__)


def fill_missing_selectors_in_moonworm_tasks() -> None:
"""
Add selectors to all moonworm tasks.
"""

batch_size = 100

moonworm_tasks = get_all_entries_from_search(
journal_id=MOONSTREAM_MOONWORM_TASKS_JOURNAL,
token=MOONSTREAM_ADMIN_ACCESS_TOKEN,
search_query="#task_type:moonworm !#version:2.0",
limit=batch_size,
content=True,
)

logger.info(f"Found {len(moonworm_tasks)} moonworm tasks versions 1.0")

entries_tags = []

## batch tasks

for task_batch in [
moonworm_tasks[i : i + batch_size]
for i in range(0, len(moonworm_tasks), batch_size)
]:
count = 0
for task in task_batch:
tags = ["version:2.0"]

## get abi
try:
abi = json.loads(task.content)
except Exception as e:
logger.warn(
f"Unable to parse abi from task: {task.entry_url.split()[-1]}: {e}"
)
continue

if "name" not in abi:
logger.warn(
f"Unable to find abi name in task: {task.entry_url.split()[-1]}"
)
continue

if not any([tag.startswith("abi_selector:") for tag in task.tags]):
## generate selector

abi_selector = Web3.keccak(
text=abi["name"]
+ "("
+ ",".join(map(lambda x: x["type"], abi["inputs"]))
+ ")"
)[:4].hex()

tags.append(f"abi_selector:{abi_selector}")

count += 1

entries_tags.append(
{
"entry_id": task.entry_url.split("/")[-1], ## 😭
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why we need entry id in tag?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It request for https://github.com/bugout-dev/spire/blob/171b6eb0f8ce9656308a4112deab76af59f662ea/spire/journal/api.py#L1772 that endpoint which get entries_ids and tags as array.

"tags": tags,
}
)

logger.info(f"Found {count} missing selectors in batch {len(task_batch)} tasks")

## update entries

try:
bc.create_entries_tags(
journal_id=MOONSTREAM_MOONWORM_TASKS_JOURNAL,
token=MOONSTREAM_ADMIN_ACCESS_TOKEN,
entries_tags=entries_tags,
timeout=15,
)
except BugoutResponseException as e:
logger.error(f"Unable to update entries tags: {e}")
continue


def deduplicate_moonworm_task_by_selector():
"""
Find moonworm tasks with same selector and remove old versions
"""

moonworm_tasks = get_all_entries_from_search(
journal_id=MOONSTREAM_MOONWORM_TASKS_JOURNAL,
token=MOONSTREAM_ADMIN_ACCESS_TOKEN,
search_query="#task_type:moonworm #version:2.0",
limit=100,
content=False,
)

logger.info(f"Found {len(moonworm_tasks)} moonworm tasks versions 2.0")

## loop over tasks

selectors = {}

for task in moonworm_tasks:
tags = task.tags

## get selector
selector = [tag for tag in tags if tag.startswith("abi_selector:")]

address = [tag for tag in tags if tag.startswith("address:")]

if len(selector) == 0:
logger.warn(
f"Unable to find selector in task: {task.entry_url.split()[-1]}"
)
continue

selector = selector[0].split(":")[1]

if len(address) == 0:
logger.warn(f"Unable to find address in task: {task.entry_url.split()[-1]}")
continue

address = address[0].split(":")[1]

if address not in selectors:
selectors[address] = {}

if selector not in selectors[address]:
selectors[address][selector] = {"entries": {}}

selectors[address][selector]["entries"][
task.entry_url.split("/")[-1]
] = task.created_at

logger.info(f"Found {len(selectors)} addresses")

for address, selectors_dict in selectors.items():
for selector, tasks_dict in selectors_dict.items():
if len(tasks_dict["entries"]) == 1:
continue

## find earliest task

earliest_task_id = min(
tasks_dict["entries"], key=lambda key: tasks_dict["entries"][key]
)

## remove all tasks except latest

logger.info(
f"Found {len(tasks_dict['entries'])} tasks with selector {selector} erliest task {earliest_task_id} with created_at: {tasks_dict['entries'][earliest_task_id]}"
)

for task_id in tasks_dict["entries"]:
if task_id == earliest_task_id:
continue

try:
bc.delete_entry(
journal_id=MOONSTREAM_MOONWORM_TASKS_JOURNAL,
entry_id=task_id,
token=MOONSTREAM_ADMIN_ACCESS_TOKEN,
)
except BugoutResponseException as e:
logger.error(f"Unable to delete entry with id {task_id} : {e}")
continue

logger.info(f"Deleted entry: {task_id}")