diff --git a/moonstreamapi/moonstreamapi/actions.py b/moonstreamapi/moonstreamapi/actions.py
index 841af7b5f..bd832c51c 100644
--- a/moonstreamapi/moonstreamapi/actions.py
+++ b/moonstreamapi/moonstreamapi/actions.py
@@ -470,7 +470,7 @@ def upload_abi_to_s3(
 
 
 def get_all_entries_from_search(
-    journal_id: str, search_query: str, limit: int, token: str
+    journal_id: str, search_query: str, limit: int, token: str, content: bool = False
 ) -> List[BugoutSearchResult]:
     """
     Get all required entries from journal using search interface
@@ -483,7 +483,7 @@ def get_all_entries_from_search(
         token=token,
         journal_id=journal_id,
         query=search_query,
-        content=False,
+        content=content,
         timeout=10.0,
         limit=limit,
         offset=offset,
@@ -496,7 +496,7 @@ def get_all_entries_from_search(
         token=token,
         journal_id=journal_id,
         query=search_query,
-        content=False,
+        content=content,
         timeout=10.0,
         limit=limit,
         offset=offset,
@@ -526,47 +526,45 @@ def apply_moonworm_tasks(
         token=MOONSTREAM_ADMIN_ACCESS_TOKEN,
     )
 
-    # create historical crawl task in journal
-    # will use create_entries_pack for creating entries in journal
     existing_tags = [entry.tags for entry in entries]
 
-    existing_hashes = [
-        tag.split(":")[-1]
-        for tag in chain(*existing_tags)
-        if "abi_method_hash" in tag
+    existing_selectors = [
+        tag.split(":")[-1] for tag in chain(*existing_tags) if "abi_selector" in tag
     ]
 
-    abi_hashes_dict = {
-        hashlib.md5(json.dumps(method).encode("utf-8")).hexdigest(): method
+    abi_selectors_dict = {
+        Web3.keccak(
+            text=method["name"]
+            + "("
+            + ",".join(map(lambda x: x["type"], method["inputs"]))
+            + ")"
+        )[:4].hex(): method
         for method in abi
         if (method["type"] in ("event", "function"))
         and (method.get("stateMutability", "") != "view")
     }
 
-    for hash in abi_hashes_dict:
-        if hash not in existing_hashes:
-            abi_selector = Web3.keccak(
-                text=abi_hashes_dict[hash]["name"]
-                + "("
-                + ",".join(
-                    map(lambda x: x["type"], abi_hashes_dict[hash]["inputs"])
-                )
-                + ")"
-            )[:4].hex()
+    for abi_selector in abi_selectors_dict:
+        if abi_selector not in existing_selectors:
+            hash = hashlib.md5(
+                json.dumps(abi_selectors_dict[abi_selector]).encode("utf-8")
+            ).hexdigest()
 
             moonworm_abi_tasks_entries_pack.append(
                 {
                     "title": address,
-                    "content": json.dumps(abi_hashes_dict[hash], indent=4),
+                    "content": json.dumps(
+                        abi_selectors_dict[abi_selector], indent=4
+                    ),
                     "tags": [
                         f"address:{address}",
-                        f"type:{abi_hashes_dict[hash]['type']}",
+                        f"type:{abi_selectors_dict[abi_selector]['type']}",
                         f"abi_method_hash:{hash}",
                         f"abi_selector:{abi_selector}",
                         f"subscription_type:{subscription_type}",
-                        f"abi_name:{abi_hashes_dict[hash]['name']}",
+                        f"abi_name:{abi_selectors_dict[abi_selector]['name']}",
                         f"status:active",
                         f"task_type:moonworm",
                         f"moonworm_task_pickedup:False",  # True if task picked up by moonworm-crawler(default each 120 sec)
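Note on the actions.py change above: tasks are now keyed by the standard 4-byte EVM selector (keccak-256 of the canonical signature) instead of an md5 of the ABI entry, with the md5 hash kept only as the secondary abi_method_hash tag. A minimal sketch of the derivation, using the same Web3.keccak call as the patch; the ABI entry below is a made-up ERC-20 example, not taken from the patch:

    from web3 import Web3

    # Hypothetical ABI entry (illustration only)
    method = {
        "type": "function",
        "name": "transfer",
        "inputs": [{"type": "address"}, {"type": "uint256"}],
    }

    # Canonical signature: "transfer(address,uint256)"
    signature = method["name"] + "(" + ",".join(x["type"] for x in method["inputs"]) + ")"

    # First 4 bytes of the keccak-256 hash, here the familiar ERC-20 selector a9059cbb
    # (the hex string may or may not carry a "0x" prefix depending on the hexbytes version)
    selector = Web3.keccak(text=signature)[:4].hex()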
diff --git a/moonstreamapi/moonstreamapi/admin/cli.py b/moonstreamapi/moonstreamapi/admin/cli.py
index b05a595ad..12a96d4d9 100644
--- a/moonstreamapi/moonstreamapi/admin/cli.py
+++ b/moonstreamapi/moonstreamapi/admin/cli.py
@@ -12,7 +12,12 @@
 
 from moonstreamdb.db import SessionLocal
 
-from ..settings import BUGOUT_BROOD_URL, BUGOUT_SPIRE_URL, MOONSTREAM_APPLICATION_ID
+from ..settings import (
+    BUGOUT_BROOD_URL,
+    BUGOUT_SPIRE_URL,
+    MOONSTREAM_APPLICATION_ID,
+    MOONSTREAM_MOONWORM_TASKS_JOURNAL,
+)
 from ..web3_provider import yield_web3_provider
 
 from . import subscription_types, subscriptions, moonworm_tasks, queries
@@ -20,6 +25,7 @@
     checksum_address,
     update_dashboard_subscription_key,
     generate_entity_subscriptions,
+    add_selectors,
 )
 
@@ -87,6 +93,9 @@ def migrations_list(args: argparse.Namespace) -> None:
 - id: 20230501
   name: fix_duplicates_keys_in_entity_subscription
   description: Fix entity duplicates keys for all subscriptions introduced in 20230213
+- id: 20230904
+  name: fill_missing_selectors_in_moonworm_tasks
+  description: Get all moonworm jobs from the moonworm journal and add a selector tag if it is not present
 """
 
     logger.info(entity_migration_overview)
@@ -117,6 +126,30 @@ def migrations_run(args: argparse.Namespace) -> None:
     web3_session = yield_web3_provider()
     db_session = SessionLocal()
     try:
+        if args.id == 20230904:
+            step_order = [
+                "fill_missing_selectors_in_moonworm_tasks",
+                "deduplicate_moonworm_tasks",
+            ]
+            step_map: Dict[str, Dict[str, Any]] = {
+                "upgrade": {
+                    "fill_missing_selectors_in_moonworm_tasks": {
+                        "action": add_selectors.fill_missing_selectors_in_moonworm_tasks,
+                        "description": "Get all moonworm jobs from the moonworm journal and add a selector tag if it is not present",
+                    },
+                    "deduplicate_moonworm_tasks": {
+                        "action": add_selectors.deduplicate_moonworm_task_by_selector,
+                        "description": "Deduplicate moonworm tasks by selector",
+                    },
+                },
+                "downgrade": {},
+            }
+            if args.command not in ["upgrade", "downgrade"]:
+                logger.info("Wrong command. Please use upgrade or downgrade")
+            step = args.step
+
+            migration_run(step_map, args.command, step, step_order)
+
         if args.id == 20230501:
             # fix entity duplicates keys for all subscriptions introduced in 20230213
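For context, migration_run itself is not part of this diff; the cli.py change only registers the two new steps. A rough sketch of how a step_map/step_order pair like the one above is typically consumed (this is an assumption about migration_run's behaviour, not its actual implementation, and run_steps_sketch is a hypothetical name):

    from typing import Any, Dict, List, Optional

    def run_steps_sketch(
        step_map: Dict[str, Dict[str, Any]],
        command: str,
        step: Optional[str],
        step_order: List[str],
    ) -> None:
        # Run a single named step, or every registered step for the command in order.
        names = step_order if step is None else [step]
        for name in names:
            entry = step_map.get(command, {}).get(name)
            if entry is None:
                continue
            print(f"{name}: {entry['description']}")
            entry["action"]()

With the 20230904 map above, an upgrade would call add_selectors.fill_missing_selectors_in_moonworm_tasks followed by add_selectors.deduplicate_moonworm_task_by_selector; the downgrade map is registered empty, so it is a no-op.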
diff --git a/moonstreamapi/moonstreamapi/admin/migrations/add_selectors.py b/moonstreamapi/moonstreamapi/admin/migrations/add_selectors.py
new file mode 100644
index 000000000..7da444af5
--- /dev/null
+++ b/moonstreamapi/moonstreamapi/admin/migrations/add_selectors.py
@@ -0,0 +1,187 @@
+"""
+Add selectors to all moonworm tasks.
+"""
+import logging
+import json
+
+
+from bugout.exceptions import BugoutResponseException
+from web3 import Web3
+
+from ...settings import (
+    BUGOUT_REQUEST_TIMEOUT_SECONDS,
+    MOONSTREAM_ADMIN_ACCESS_TOKEN,
+    MOONSTREAM_MOONWORM_TASKS_JOURNAL,
+)
+from ...settings import bugout_client as bc
+from ...actions import get_all_entries_from_search
+
+logger = logging.getLogger(__name__)
+
+
+def fill_missing_selectors_in_moonworm_tasks() -> None:
+    """
+    Add selectors to all moonworm tasks.
+    """
+
+    batch_size = 100
+
+    moonworm_tasks = get_all_entries_from_search(
+        journal_id=MOONSTREAM_MOONWORM_TASKS_JOURNAL,
+        token=MOONSTREAM_ADMIN_ACCESS_TOKEN,
+        search_query="#task_type:moonworm !#version:2.0",
+        limit=batch_size,
+        content=True,
+    )
+
+    logger.info(f"Found {len(moonworm_tasks)} version 1.0 moonworm tasks")
+
+    entries_tags = []
+
+    ## batch tasks
+
+    for task_batch in [
+        moonworm_tasks[i : i + batch_size]
+        for i in range(0, len(moonworm_tasks), batch_size)
+    ]:
+        count = 0
+        for task in task_batch:
+            tags = ["version:2.0"]
+
+            ## get abi
+            try:
+                abi = json.loads(task.content)
+            except Exception as e:
+                logger.warn(
+                    f"Unable to parse abi from task: {task.entry_url.split('/')[-1]}: {e}"
+                )
+                continue
+
+            if "name" not in abi:
+                logger.warn(
+                    f"Unable to find abi name in task: {task.entry_url.split('/')[-1]}"
+                )
+                continue
+
+            if not any([tag.startswith("abi_selector:") for tag in task.tags]):
+                ## generate selector
+
+                abi_selector = Web3.keccak(
+                    text=abi["name"]
+                    + "("
+                    + ",".join(map(lambda x: x["type"], abi["inputs"]))
+                    + ")"
+                )[:4].hex()
+
+                tags.append(f"abi_selector:{abi_selector}")
+
+                count += 1
+
+            entries_tags.append(
+                {
+                    "entry_id": task.entry_url.split("/")[-1],  ## 😭
+                    "tags": tags,
+                }
+            )
+
+        logger.info(f"Found {count} missing selectors in a batch of {len(task_batch)} tasks")
+
+        ## update entries
+
+        try:
+            bc.create_entries_tags(
+                journal_id=MOONSTREAM_MOONWORM_TASKS_JOURNAL,
+                token=MOONSTREAM_ADMIN_ACCESS_TOKEN,
+                entries_tags=entries_tags,
+                timeout=15,
+            )
+        except BugoutResponseException as e:
+            logger.error(f"Unable to update entries tags: {e}")
+            continue
+
+
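The tagging pass above slices the search results into groups of batch_size and appends one {"entry_id", "tags"} dict per task to entries_tags before each bc.create_entries_tags call. A standalone illustration of the slicing idiom with made-up data:

    batch_size = 100
    moonworm_tasks = list(range(250))  # stand-in for the search results

    batches = [
        moonworm_tasks[i : i + batch_size]
        for i in range(0, len(moonworm_tasks), batch_size)
    ]
    # -> three batches of 100, 100 and 50 items

Note that entries_tags is initialised once before the batch loop and never cleared, so each bc.create_entries_tags call also re-sends the tag updates accumulated from earlier batches.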
+def deduplicate_moonworm_task_by_selector() -> None:
+    """
+    Find moonworm tasks that share a selector, keep the earliest entry and delete the rest.
+    """
+
+    moonworm_tasks = get_all_entries_from_search(
+        journal_id=MOONSTREAM_MOONWORM_TASKS_JOURNAL,
+        token=MOONSTREAM_ADMIN_ACCESS_TOKEN,
+        search_query="#task_type:moonworm #version:2.0",
+        limit=100,
+        content=False,
+    )
+
+    logger.info(f"Found {len(moonworm_tasks)} version 2.0 moonworm tasks")
+
+    ## loop over tasks
+
+    selectors = {}
+
+    for task in moonworm_tasks:
+        tags = task.tags
+
+        ## get selector
+        selector = [tag for tag in tags if tag.startswith("abi_selector:")]
+
+        address = [tag for tag in tags if tag.startswith("address:")]
+
+        if len(selector) == 0:
+            logger.warn(
+                f"Unable to find selector in task: {task.entry_url.split('/')[-1]}"
+            )
+            continue
+
+        selector = selector[0].split(":")[1]
+
+        if len(address) == 0:
+            logger.warn(f"Unable to find address in task: {task.entry_url.split('/')[-1]}")
+            continue
+
+        address = address[0].split(":")[1]
+
+        if address not in selectors:
+            selectors[address] = {}
+
+        if selector not in selectors[address]:
+            selectors[address][selector] = {"entries": {}}
+
+        selectors[address][selector]["entries"][
+            task.entry_url.split("/")[-1]
+        ] = task.created_at
+
+    logger.info(f"Found {len(selectors)} addresses")
+
+    for address, selectors_dict in selectors.items():
+        for selector, tasks_dict in selectors_dict.items():
+            if len(tasks_dict["entries"]) == 1:
+                continue
+
+            ## find the earliest task
+
+            earliest_task_id = min(
+                tasks_dict["entries"], key=lambda key: tasks_dict["entries"][key]
+            )
+
+            ## remove all tasks except the earliest
+
+            logger.info(
+                f"Found {len(tasks_dict['entries'])} tasks with selector {selector}, earliest task {earliest_task_id} with created_at: {tasks_dict['entries'][earliest_task_id]}"
+            )
+
+            for task_id in tasks_dict["entries"]:
+                if task_id == earliest_task_id:
+                    continue
+
+                try:
+                    bc.delete_entry(
+                        journal_id=MOONSTREAM_MOONWORM_TASKS_JOURNAL,
+                        entry_id=task_id,
+                        token=MOONSTREAM_ADMIN_ACCESS_TOKEN,
+                    )
+                except BugoutResponseException as e:
+                    logger.error(f"Unable to delete entry with id {task_id}: {e}")
+                    continue
+
+                logger.info(f"Deleted entry: {task_id}")
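The deduplication pass keeps the earliest task per (address, selector) pair: min() over the entries mapping compares created_at values, and every other entry id is deleted via bc.delete_entry. A tiny worked example with made-up entry ids and timestamps:

    entries = {
        "entry-a": "2023-01-05T00:00:00",  # earliest
        "entry-b": "2023-03-01T00:00:00",
        "entry-c": "2023-02-10T00:00:00",
    }

    earliest_task_id = min(entries, key=lambda key: entries[key])
    to_delete = [task_id for task_id in entries if task_id != earliest_task_id]
    # earliest_task_id == "entry-a"; "entry-b" and "entry-c" would be deleted

(In the migration, created_at comes from the Bugout entry, so the comparison is on real timestamps rather than the ISO strings used here.)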