diff --git a/bin/empower-mocks b/bin/empower-mocks
new file mode 100755
index 00000000000000..9aeef385900b42
--- /dev/null
+++ b/bin/empower-mocks
@@ -0,0 +1,325 @@
+#!/usr/bin/env python
+import os
+import signal
+
+from sentry.runner import configure
+
+configure()
+
+import hashlib
+import re
+import subprocess
+import sys
+import time
+import uuid
+from argparse import ArgumentParser
+from dataclasses import dataclass, field
+from datetime import datetime, timezone
+
+import click
+import requests
+from django.conf import settings
+
+from sentry import buffer
+from sentry.models.projectkey import ProjectKey
+from sentry.utils.mockdata import get_organization
+from sentry.utils.mockdata.core import generate_projects
+
+GCP_BUCKET = "empower-mocks-production"
+
+# TODO: do mapping on empower/mini-relay side as it has control over project ids
+EMPOWER_PROJECT_IDS = {
+    "react": 2,
+    "flask": 3,
+    "ruby": 4,
+}
+
+PROJECT_PLATFORM_MAP = {
+    "react": "javascript-react",
+    "flask": "python-flask",
+    "ruby": "ruby",
+}
+
+
+def download_file(url):
+    try:
+        response = requests.get(url)
+        if response.status_code == 200:
+            return response.content
+        elif response.status_code == 404:
+            return None
+        else:
+            response.raise_for_status()  # This will raise an HTTPError
+    except requests.exceptions.RequestException as e:
+        raise Exception(f"An error occurred while downloading the file: {str(e)}")
+
+
+@dataclass
+class Envelope:
+    content: bytes
+    project_name: str
+    latest_timestamp: datetime | None = field(default=None)
+
+
+# we assume that everything in the bucket is a single "batch" - typically meaning a single Replay session
+def get_envelopes(projects) -> list[Envelope]:
+    res = []
+    for project_name in projects:
+        project_id = EMPOWER_PROJECT_IDS[project_name]
+        i = 1
+        while True:
+            envelope_content = download_file(
+                f"http://{GCP_BUCKET}.storage.googleapis.com/{project_id}/{i}"
+            )
+            if envelope_content is None:
+                break
+            res.append(Envelope(envelope_content, project_name))
+            i += 1
+    return res
+
+
+def send_envelope_into_ingest(envelope, project_id, public_key, quiet=True):
+
+    # TODO these are part of URL path so should be parsed out of the envelope:
+    # &sentry_version=7&sentry_client=sentry.javascript.react%2F8.20.0
+    subprocess.run(
+        [
+            "curl",
+            "-X",
+            "POST",
+            f"http://dev.getsentry.net:8000/api/{project_id}/envelope/?sentry_key={public_key}",
+            "-H",
+            "content-type: text/plain;charset=UTF-8",
+            "-d",
+            envelope,
+        ],
+        stdout=subprocess.DEVNULL if quiet else sys.stdout,
+        stderr=subprocess.DEVNULL if quiet else sys.stderr,
+    )
+    if not quiet:
+        click.echo("")
+        # click.echo("Sent envelope: " + envelope.decode("utf-8"))
+
+
+def decode_if_needed(str_or_bytes):
+    if type(str_or_bytes) is not str:
+        return str_or_bytes.decode("utf-8")
+    return str_or_bytes
+
+
+def iso2dt(iso_str):
+    return datetime.strptime(decode_if_needed(iso_str), "%Y-%m-%dT%H:%M:%S.%fZ").replace(
+        tzinfo=timezone.utc
+    )
+
+
+def dt2iso(dt):
+    return dt.strftime("%Y-%m-%dT%H:%M:%S.%f") + "Z"
+
+
+def unix2dt(unix_str):
+    return datetime.fromtimestamp(float(decode_if_needed(unix_str))).replace(tzinfo=timezone.utc)
+
+
+def dt2unix(dt):
+    return f"{dt.timestamp():.7f}"
+
+
+ISO_8601_RE = rb"(\"(?:start_timestamp|timestamp|started|sent_at)\":\")(\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{3,9}Z)(\")"
+UNIX_FRACT_RE = rb"(\"(?:start_timestamp|timestamp|started|sent_at)\":)(\d{10}\.\d{3,9})([^0-9])"
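+# For illustration, the kind of values these patterns are meant to match (examples made up):
+#   ISO_8601_RE:    "timestamp":"2024-07-30T12:00:00.000Z"
+#   UNIX_FRACT_RE:  "start_timestamp":1722340800.000000,
+# Each pattern captures the key prefix, the timestamp itself, and the trailing character,
+# so the surrounding JSON can be stitched back together after the timestamp is rewritten.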
+
+
+def compute_latest_timestamps(envelopes):
+    for envelope in envelopes:
+        timestamps = [
+            iso2dt(m[1].decode("utf-8")) for m in re.findall(ISO_8601_RE, envelope.content)
+        ] + [unix2dt(m[1].decode("utf-8")) for m in re.findall(UNIX_FRACT_RE, envelope.content)]
+        if not timestamps:
+            click.echo(f"[WARNING] No timestamps found in input string: {envelope}")
+        else:
+            envelope.latest_timestamp = max(timestamps)
+
+
+# Shift all timestamps by an equal amount so that the latest timestamp found becomes `base_time`.
+#
+# NOTE: this can potentially mess up JSON in breadcrumbs, HTTP response bodies, etc.,
+# but we don't care if that happens as this is demo data and those values can be anything.
+# What would be undesirable is an extraneous match becoming the latest timestamp,
+# which is why we match on specific keys to reduce the chance of that happening.
+def shift_all_timestamps(envelopes: list[Envelope], base_time: datetime) -> list[Envelope]:
+    latest_time = max(e.latest_timestamp for e in envelopes)
+    offset = base_time - latest_time
+
+    new_envelopes = []
+    for envelope in envelopes:
+        new_envelopes.append(
+            Envelope(
+                re.sub(
+                    ISO_8601_RE,
+                    lambda m: m.group(1)
+                    + dt2iso(iso2dt(m.group(2)) + offset).encode("utf-8")
+                    + m.group(3),
+                    re.sub(
+                        UNIX_FRACT_RE,
+                        lambda m: m.group(1)
+                        + dt2unix(unix2dt(m.group(2)) + offset).encode("utf-8")
+                        + m.group(3),
+                        envelope.content,
+                    ),
+                ),
+                envelope.project_name,
+                envelope.latest_timestamp + offset,
+            )
+        )
+    return new_envelopes
+
+
+# Replace all IDs with their salted hashes.
+# All envelopes in the same Replay session must be treated as a single unit to preserve the relationships
+# between events via replay_id and trace_id.
+def make_all_ids_unique(envelope_content: bytes, batch_salt: bytes = b"") -> bytes:
+    uuid32_re = (
+        rb'("(?:id|event_id|trace_id|profile_id|replay_id|replayId|message|sid)":")([0-9a-f]{32})"'
+    )
+    id16_re = rb'("(?:parent_span_id|span_id|__span)":")([0-9a-f]{16})"'
+
+    return re.sub(
+        uuid32_re,
+        lambda t: t.group(1)
+        + hashlib.sha256(t.group(2) + batch_salt).hexdigest()[:32].encode("utf-8")
+        + b'"',
+        re.sub(
+            id16_re,
+            lambda t: t.group(1)
+            + hashlib.sha256(t.group(2) + batch_salt).hexdigest()[:16].encode("utf-8")
+            + b'"',
+            envelope_content,
+        ),
+    )
+
+
+def kill_old_backgrounded_processes():
+    process_name = os.path.basename(__file__)
+    try:
+        # Use the `pgrep` command to find processes by name
+        pids = subprocess.check_output(["pgrep", "-f", process_name]).splitlines()
+        for pid in pids:
+            # Exclude the current process to avoid killing itself
+            if int(pid) != os.getpid():
+                os.kill(int(pid), signal.SIGTERM)
+                click.echo(f"Also terminating old backgrounded process with PID {int(pid)}")
+    except subprocess.CalledProcessError:
+        # If `pgrep` doesn't find any processes, it will raise a CalledProcessError
+        pass
+
+
+if __name__ == "__main__":
+    try:
+        settings.CELERY_ALWAYS_EAGER = True
+
+        parser = ArgumentParser(description="Load latest mock data from empower")
+        parser.add_argument(
+            "--projects",
+            default=["react", "flask"],
+            nargs="+",
+            help="List of empower projects to load (react, flask, ruby)",
+        )
+        parser.add_argument(
+            "--stream",
+            nargs="?",  # Makes the value optional
+            const="5",  # Value assigned if the option is provided without a value
+            default=None,  # Value when the option is not provided at all
+            help="Send new events continuously at given interval in seconds (default: 5)",
+            metavar="INTERVAL_SECONDS",
+        )
+        parser.add_argument(
+            "--quiet",
+            default=False,
+            action="store_true",
+            help="Don't print ingest responses",
+        )
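+        # Example invocations (flags as defined above):
+        #   bin/empower-mocks                        # one batch for the default react + flask projects
+        #   bin/empower-mocks --projects react ruby  # pick specific empower projects
+        #   bin/empower-mocks --stream 10 --quiet    # keep sending a fresh batch every 10 seconds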
+        options = parser.parse_args()
+
+        ingest_running = (
+            subprocess.run(
+                ["pgrep", "-fl", "sentry run consumer ingest-events"],
+                stdout=subprocess.DEVNULL,
+                stderr=subprocess.DEVNULL,
+            ).returncode
+            == 0
+        )
+        if not ingest_running:
+            click.echo(
+                "Ingest is required for empower-mocks but is not running. Please follow https://develop.sentry.dev/development/environment/#ingestion-pipeline-relay-aka-sending-events-to-your-dev-environment"
+            )
+            sys.exit(1)
+
+        organization = get_organization()
+        project_map = generate_projects(
+            organization, tuple([("Empower Plant", tuple(p for p in options.projects))])
+        )
+        for project_name, project in project_map.items():
+            project.platform = PROJECT_PLATFORM_MAP[project_name]
+            project.save()
+
+        template_envelopes = get_envelopes(options.projects)
+        compute_latest_timestamps(template_envelopes)
+        template_envelopes.sort(key=lambda e: e.latest_timestamp)  # send envelopes in chronological order
+
+        if options.stream:
+            click.echo("")
+            click.echo(f"> Sending new batch of events every {options.stream} seconds")
+            click.echo("")
+            click.echo("^C\tExit")
+            click.echo("^Z\tPause")
+            click.echo("fg\tResume")
+            click.echo("")
+
+            def sigtstp_handler(signum, frame):
+                click.echo("")
+                click.echo("Paused. Use the `fg` command to resume")
+                # sys.stdout.flush()  # Ensure the message is printed immediately
+                # Reset the signal handler to the default to allow the process to stop
+                signal.signal(signal.SIGTSTP, signal.SIG_DFL)
+                os.kill(os.getpid(), signal.SIGTSTP)
+
+            signal.signal(signal.SIGTSTP, sigtstp_handler)
+
+        while True:
+            batch_envelopes = shift_all_timestamps(template_envelopes, datetime.now(timezone.utc))
+            batch_salt = uuid.uuid4().hex.encode("utf-8")
+
+            for envelope in batch_envelopes:
+                project = project_map[envelope.project_name]
+                send_envelope_into_ingest(
+                    make_all_ids_unique(envelope.content, batch_salt),
+                    project.id,
+                    ProjectKey.get_default(project),
+                    options.quiet,
+                )
+
+            if not options.stream:
+                click.echo(" > Done sending initial envelopes. Waiting for processing to finish")
+                if hasattr(buffer, "process_pending"):
+                    click.echo(" > Processing pending buffers")
+                    buffer.process_pending()
+                    click.echo(" > Processing complete")
+                break
+            else:
+                if not options.quiet:
+                    click.echo("")
+                time.sleep(float(options.stream))
+
+    except KeyboardInterrupt:
+        click.echo("")
+        click.echo("Exiting...")
+        kill_old_backgrounded_processes()
+        sys.exit(0)
+
+    except Exception:
+        # Avoid reporting any issues recursively back into Sentry
+        import sys
+        import traceback
+
+        traceback.print_exc()
+        sys.exit(1)
diff --git a/src/sentry/utils/mockdata/core.py b/src/sentry/utils/mockdata/core.py
index 8c2a182972fa28..5afc1902d7d58c 100644
--- a/src/sentry/utils/mockdata/core.py
+++ b/src/sentry/utils/mockdata/core.py
@@ -393,8 +393,8 @@ def create_access_request(member: OrganizationMember, team: Team) -> None:
     OrganizationAccessRequest.objects.create_or_update(member=member, team=team)


-def generate_projects(organization: Organization) -> Mapping[str, Any]:
-    mocks = (
+def generate_projects(organization: Organization, mocks=None) -> Mapping[str, Any]:
+    mocks = mocks or (
         ("Massive Dynamic", ("Ludic Science",)),
         ("Captain Planet", ("Earth", "Fire", "Wind", "Water", "Heart")),
     )
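For reference, a minimal sketch of how the new `mocks` parameter of generate_projects is exercised by
bin/empower-mocks above (the team/project names are just the ones the script happens to use; any
("Team Name", ("project-a", "project-b")) pairs would work, and omitting `mocks` keeps the original
hard-coded defaults):

    from sentry.runner import configure
    configure()

    from sentry.utils.mockdata import get_organization
    from sentry.utils.mockdata.core import generate_projects

    organization = get_organization()
    # One "Empower Plant" team containing one project per selected empower app;
    # the return value maps project name -> project, and the script then sets each project's platform.
    project_map = generate_projects(organization, (("Empower Plant", ("react", "flask")),))

Since `mocks` defaults to None and falls back to the original tuple, existing callers of
generate_projects keep their current behaviour.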