From 6b4a014e3a93f24e04ac7c9baa41bf8a1e03e1f8 Mon Sep 17 00:00:00 2001 From: Kosty Maleyev Date: Mon, 26 Aug 2024 19:53:39 -0700 Subject: [PATCH] dev(mocks) --stream option, sig handlers, refactor --- bin/empower-mocks | 242 +++++++++++++++++++++++++++++++--------------- 1 file changed, 163 insertions(+), 79 deletions(-) diff --git a/bin/empower-mocks b/bin/empower-mocks index 9d6244e3aac2d1..9aeef385900b42 100755 --- a/bin/empower-mocks +++ b/bin/empower-mocks @@ -1,4 +1,7 @@ #!/usr/bin/env python +import os +import signal + from sentry.runner import configure configure() @@ -7,8 +10,10 @@ import hashlib import re import subprocess import sys +import time import uuid from argparse import ArgumentParser +from dataclasses import dataclass, field from datetime import datetime, timezone import click @@ -49,20 +54,31 @@ def download_file(url): raise Exception(f"An error occurred while downloading the file: {str(e)}") -def get_envelopes(project_name): - project_id = EMPOWER_PROJECT_IDS[project_name] +@dataclass +class Envelope: + content: bytes + project_name: str + latest_timestamp: datetime | None = field(default=None) + + +# we assume that everything in the bucket is a single "batch" - typically meaning a single Replay session +def get_envelopes(projects) -> list[Envelope]: res = [] - i = 1 - while True: - envelope = download_file(f"http://{GCP_BUCKET}.storage.googleapis.com/{project_id}/{i}") - if envelope is None: - break - res.append(envelope) - i += 1 + for project_name in projects: + project_id = EMPOWER_PROJECT_IDS[project_name] + i = 1 + while True: + envelope_content = download_file( + f"http://{GCP_BUCKET}.storage.googleapis.com/{project_id}/{i}" + ) + if envelope_content is None: + break + res.append(Envelope(envelope_content, project_name)) + i += 1 return res -def send_envelope_into_ingest(envelope, project_id, public_key): +def send_envelope_into_ingest(envelope, project_id, public_key, quiet=True): # TODO these are part of URL path so should be parsed out of the envelope: # &sentry_version=7&sentry_client=sentry.javascript.react%2F8.20.0 @@ -76,9 +92,12 @@ def send_envelope_into_ingest(envelope, project_id, public_key): "content-type: text/plain;charset=UTF-8", "-d", envelope, - ] + ], + stdout=subprocess.DEVNULL if quiet else sys.stdout, + stderr=subprocess.DEVNULL if quiet else sys.stderr, ) - click.echo("") + if not quiet: + click.echo("") # click.echo("Sent envelope: " + envelope.decode("utf-8")) @@ -106,68 +125,92 @@ def dt2unix(dt): return f"{dt.timestamp():.7f}" +ISO_8601_RE = rb"(\"(?:start_timestamp|timestamp|started|sent_at)\":\")(\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{3,9}Z)(\")" +UNIX_FRACT_RE = rb"(\"(?:start_timestamp|timestamp|started|sent_at)\":)(\d{10}\.\d{3,9})([^0-9])" + + +def compute_latest_timestamps(envelopes): + for envelope in envelopes: + timestamps = [ + iso2dt(m[1].decode("utf-8")) for m in re.findall(ISO_8601_RE, envelope.content) + ] + [unix2dt(m[1].decode("utf-8")) for m in re.findall(UNIX_FRACT_RE, envelope.content)] + if not timestamps: + click.echo(f"[WARNING] No timestamps found in input string: {envelope}") + else: + envelope.latest_timestamp = max(timestamps) + + # Shift all timestamps by equal amount so that the latest timestamp found becomes `base_time` # # NOTE: this can potentially mess up JSON in breadcrumbs, HTTP response bodies, etc. # but we don't care if that happens as this is demo data and those values can be anything. # What's potentially undesirable is if extraneous timestamps happens to the latest timestamp # that's why we try to match keys to reduce the chance of that happening. -def shift_all_timestamps(envelope, base_time=None): - base_time = base_time or datetime.now(timezone.utc) - iso_8601_re = rb"(\"(?:start_timestamp|timestamp|started|sent_at)\":\")(\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{3,9}Z)(\")" - unix_fract_re = ( - rb"(\"(?:start_timestamp|timestamp|started|sent_at)\":)(\d{10}\.\d{3,9})([^0-9])" - ) - - datetime_objects = [iso2dt(m[1].decode("utf-8")) for m in re.findall(iso_8601_re, envelope)] + [ - unix2dt(m[1].decode("utf-8")) for m in re.findall(unix_fract_re, envelope) - ] - if not datetime_objects: - click.echo(f"[WARNING] No timestamps found in input string: {envelope}") - return envelope - latest_time = max(datetime_objects) +def shift_all_timestamps(envelopes: list[Envelope], base_time: datetime) -> list[Envelope]: + latest_time = max(e.latest_timestamp for e in envelopes) offset = base_time - latest_time - return re.sub( - iso_8601_re, - lambda m: m.group(1) + dt2iso(iso2dt(m.group(2)) + offset).encode("utf-8") + m.group(3), - re.sub( - unix_fract_re, - lambda m: m.group(1) - + dt2unix(unix2dt(m.group(2)) + offset).encode("utf-8") - + m.group(3), - envelope, - ), - ) + new_envelopes = [] + for envelope in envelopes: + new_envelopes.append( + Envelope( + re.sub( + ISO_8601_RE, + lambda m: m.group(1) + + dt2iso(iso2dt(m.group(2)) + offset).encode("utf-8") + + m.group(3), + re.sub( + UNIX_FRACT_RE, + lambda m: m.group(1) + + dt2unix(unix2dt(m.group(2)) + offset).encode("utf-8") + + m.group(3), + envelope.content, + ), + ), + envelope.project_name, + envelope.latest_timestamp + offset, + ) + ) + return new_envelopes # Replace all ID with their salted hashes # All envelopes in the same Replay session must be treated as a single unit to preserve the relationships # between events via replay_id and trace_id -def make_all_ids_unique(envelopes, batch_salt=""): +def make_all_ids_unique(envelope_content: bytes, batch_salt: bytes = "") -> bytes: uuid32_re = ( rb'("(?:id|event_id|trace_id|profile_id|replay_id|replayId|message|sid)":")([0-9a-f]{32})"' ) id16_re = rb'("(?:parent_span_id|span_id|__span)":")([0-9a-f]{16})"' - new_envelopes = [] - for envelope in envelopes: - new_envelopes.append( - re.sub( - uuid32_re, - lambda t: t.group(1) - + hashlib.sha256(t.group(2) + batch_salt).hexdigest()[:32].encode("utf-8") - + b'"', - re.sub( - id16_re, - lambda t: t.group(1) - + hashlib.sha256(t.group(2) + batch_salt).hexdigest()[:16].encode("utf-8") - + b'"', - envelope, - ), - ) - ) - return new_envelopes + return re.sub( + uuid32_re, + lambda t: t.group(1) + + hashlib.sha256(t.group(2) + batch_salt).hexdigest()[:32].encode("utf-8") + + b'"', + re.sub( + id16_re, + lambda t: t.group(1) + + hashlib.sha256(t.group(2) + batch_salt).hexdigest()[:16].encode("utf-8") + + b'"', + envelope_content, + ), + ) + + +def kill_old_backgrounded_processes(): + process_name = os.path.basename(__file__) + try: + # Use the `pgrep` command to find processes by name + pids = subprocess.check_output(["pgrep", "-f", process_name]).splitlines() + for pid in pids: + # Exclude the current process to avoid killing itself + if int(pid) != os.getpid(): + os.kill(int(pid), signal.SIGTERM) + click.echo(f"Also terminating old backgrounded process with PID {int(pid)}") + except subprocess.CalledProcessError: + # If `pgrep` doesn't find any processes, it will raise a CalledProcessError + pass if __name__ == "__main__": @@ -176,12 +219,25 @@ if __name__ == "__main__": parser = ArgumentParser(description="Load latest mock data from empower") parser.add_argument( - "-p", "--projects", default=["react", "flask"], nargs="+", help="List of empower projects", ) + parser.add_argument( + "--stream", + nargs="?", # Makes the value optional + const="5", # Value assigned if the option is provided without a value + default=None, # Value when the option is not provided at all + help="Send new events continuously at given interval in seconds (default: 5)", + metavar="INTERVAL_SECONDS", + ) + parser.add_argument( + "--quiet", + default=False, + action="store_true", + help="Don't print ingest responses", + ) options = parser.parse_args() ingest_running = ( @@ -202,35 +258,63 @@ if __name__ == "__main__": project_map = generate_projects( organization, tuple([("Empower Plant", tuple(p for p in options.projects))]) ) - - # [ {envelope: bytes, dev_project_id: int, dev_project_public_key: str} ] - - batch_time = datetime.now(timezone.utc) - batch_salt = uuid.uuid4().hex.encode("utf-8") - - for project_name in options.projects: - # TODO: first update platform, get envelopes, then send as separate step/loop - project = project_map[project_name] + for project_name, project in project_map.items(): project.platform = PROJECT_PLATFORM_MAP[project_name] project.save() - project_public_key = ProjectKey.get_default(project) - envelopes = get_envelopes(project_name) - envelopes = make_all_ids_unique(envelopes, batch_salt) - for envelope in envelopes: + template_envelopes = get_envelopes(options.projects) + compute_latest_timestamps(template_envelopes) + template_envelopes.sort(key=lambda e: e.latest_timestamp) # why not + + if options.stream: + click.echo("") + click.echo(f"> Sending new batch of events every {options.stream} seconds") + click.echo("") + click.echo("^C\tExit") + click.echo("^Z\tPause") + click.echo("fg\tResume") + click.echo("") + + def sigtstp_handler(signum, frame): + click.echo("") + click.echo("Paused. Use `fg` command to resume") + # sys.stdout.flush() # Ensure the message is printed immediately + # Reset the signal handler to the default to allow the process to stop + signal.signal(signal.SIGTSTP, signal.SIG_DFL) + os.kill(os.getpid(), signal.SIGTSTP) + + signal.signal(signal.SIGTSTP, sigtstp_handler) + + while True: + batch_envelopes = shift_all_timestamps(template_envelopes, datetime.now(timezone.utc)) + batch_salt = uuid.uuid4().hex.encode("utf-8") + + for envelope in batch_envelopes: + project = project_map[envelope.project_name] send_envelope_into_ingest( - shift_all_timestamps(envelope, base_time=batch_time), + make_all_ids_unique(envelope.content, batch_salt), project.id, - project_public_key, + ProjectKey.get_default(project), + options.quiet, ) - click.echo(" > Done sending envelopes. Waiting for processing to finish") - - if hasattr(buffer, "process_pending"): - click.echo(" > Processing pending buffers") - buffer.process_pending() - - click.echo(" > Processing complete") + if not options.stream: + click.echo(" > Done sending initial envelopes. Waiting for processing to finish") + if hasattr(buffer, "process_pending"): + click.echo(" > Processing pending buffers") + buffer.process_pending() + click.echo(" > Processing complete") + break + else: + if not options.quiet: + click.echo("") + time.sleep(float(options.stream)) + + except KeyboardInterrupt: + click.echo("") + click.echo("Exiting...") + kill_old_backgrounded_processes() + sys.exit(0) except Exception: # Avoid reporting any issues recursively back into Sentry