dev(mocks) --stream option, sig handlers, refactor
realkosty committed Aug 27, 2024
1 parent 2d20859 commit 6b4a014
Showing 1 changed file with 163 additions and 79 deletions.
242 changes: 163 additions & 79 deletions bin/empower-mocks
@@ -1,4 +1,7 @@
#!/usr/bin/env python
import os
import signal

from sentry.runner import configure

configure()
@@ -7,8 +10,10 @@ import hashlib
import re
import subprocess
import sys
import time
import uuid
from argparse import ArgumentParser
from dataclasses import dataclass, field
from datetime import datetime, timezone

import click
@@ -49,20 +54,31 @@ def download_file(url):
raise Exception(f"An error occurred while downloading the file: {str(e)}")


def get_envelopes(project_name):
project_id = EMPOWER_PROJECT_IDS[project_name]
@dataclass
class Envelope:
content: bytes
project_name: str
latest_timestamp: datetime | None = field(default=None)


# we assume that everything in the bucket is a single "batch" - typically meaning a single Replay session
def get_envelopes(projects) -> list[Envelope]:
res = []
i = 1
while True:
envelope = download_file(f"http://{GCP_BUCKET}.storage.googleapis.com/{project_id}/{i}")
if envelope is None:
break
res.append(envelope)
i += 1
for project_name in projects:
project_id = EMPOWER_PROJECT_IDS[project_name]
i = 1
while True:
envelope_content = download_file(
f"http://{GCP_BUCKET}.storage.googleapis.com/{project_id}/{i}"
)
if envelope_content is None:
break
res.append(Envelope(envelope_content, project_name))
i += 1
return res
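
For illustration, the bucket layout this loop assumes (bucket name and project IDs hypothetical) is a flat sequence of numbered objects per project, downloaded until the first missing index:

# Hypothetical layout behind get_envelopes() -- names illustrative only:
#   http://<GCP_BUCKET>.storage.googleapis.com/<project_id>/1
#   http://<GCP_BUCKET>.storage.googleapis.com/<project_id>/2
#   ...download_file() returns None at the first 404, ending the loop
envelopes = get_envelopes(["react", "flask"])
print(f"Downloaded {len(envelopes)} template envelopes")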


def send_envelope_into_ingest(envelope, project_id, public_key):
def send_envelope_into_ingest(envelope, project_id, public_key, quiet=True):

# TODO: these are part of the URL path, so they should be parsed out of the envelope:
# &sentry_version=7&sentry_client=sentry.javascript.react%2F8.20.0
@@ -76,9 +92,12 @@ def send_envelope_into_ingest(envelope, project_id, public_key):
"content-type: text/plain;charset=UTF-8",
"-d",
envelope,
]
],
stdout=subprocess.DEVNULL if quiet else sys.stdout,
stderr=subprocess.DEVNULL if quiet else sys.stderr,
)
click.echo("")
if not quiet:
click.echo("")
# click.echo("Sent envelope: " + envelope.decode("utf-8"))


@@ -106,68 +125,92 @@ def dt2unix(dt):
return f"{dt.timestamp():.7f}"


ISO_8601_RE = rb"(\"(?:start_timestamp|timestamp|started|sent_at)\":\")(\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{3,9}Z)(\")"
UNIX_FRACT_RE = rb"(\"(?:start_timestamp|timestamp|started|sent_at)\":)(\d{10}\.\d{3,9})([^0-9])"
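
For illustration, the keyed JSON fragments these patterns are meant to hit (sample payload made up; run below the definitions above); date strings under other keys are left alone:

import re

sample = b'{"timestamp":"2024-08-27T10:00:00.123Z","start_timestamp":1724752800.123,"note":"2024-01-01T00:00:00.000Z"}'
print(re.findall(ISO_8601_RE, sample))    # matches only the keyed ISO-8601 value; "note" is ignored
print(re.findall(UNIX_FRACT_RE, sample))  # matches only the keyed unix-fraction value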


def compute_latest_timestamps(envelopes):
for envelope in envelopes:
timestamps = [
iso2dt(m[1].decode("utf-8")) for m in re.findall(ISO_8601_RE, envelope.content)
] + [unix2dt(m[1].decode("utf-8")) for m in re.findall(UNIX_FRACT_RE, envelope.content)]
if not timestamps:
click.echo(f"[WARNING] No timestamps found in input string: {envelope}")
else:
envelope.latest_timestamp = max(timestamps)


# Shift all timestamps by an equal amount so that the latest timestamp found becomes `base_time`.
#
# NOTE: this can potentially mess up JSON in breadcrumbs, HTTP response bodies, etc.,
# but we don't care if that happens, as this is demo data and those values can be anything.
# What would be undesirable is an extraneous timestamp happening to be the latest one,
# which is why we match specific keys, to reduce the chance of that.
def shift_all_timestamps(envelope, base_time=None):
base_time = base_time or datetime.now(timezone.utc)
iso_8601_re = rb"(\"(?:start_timestamp|timestamp|started|sent_at)\":\")(\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{3,9}Z)(\")"
unix_fract_re = (
rb"(\"(?:start_timestamp|timestamp|started|sent_at)\":)(\d{10}\.\d{3,9})([^0-9])"
)

datetime_objects = [iso2dt(m[1].decode("utf-8")) for m in re.findall(iso_8601_re, envelope)] + [
unix2dt(m[1].decode("utf-8")) for m in re.findall(unix_fract_re, envelope)
]
if not datetime_objects:
click.echo(f"[WARNING] No timestamps found in input string: {envelope}")
return envelope
latest_time = max(datetime_objects)
def shift_all_timestamps(envelopes: list[Envelope], base_time: datetime) -> list[Envelope]:
latest_time = max(e.latest_timestamp for e in envelopes)
offset = base_time - latest_time

return re.sub(
iso_8601_re,
lambda m: m.group(1) + dt2iso(iso2dt(m.group(2)) + offset).encode("utf-8") + m.group(3),
re.sub(
unix_fract_re,
lambda m: m.group(1)
+ dt2unix(unix2dt(m.group(2)) + offset).encode("utf-8")
+ m.group(3),
envelope,
),
)
new_envelopes = []
for envelope in envelopes:
new_envelopes.append(
Envelope(
re.sub(
ISO_8601_RE,
lambda m: m.group(1)
+ dt2iso(iso2dt(m.group(2)) + offset).encode("utf-8")
+ m.group(3),
re.sub(
UNIX_FRACT_RE,
lambda m: m.group(1)
+ dt2unix(unix2dt(m.group(2)) + offset).encode("utf-8")
+ m.group(3),
envelope.content,
),
),
envelope.project_name,
envelope.latest_timestamp + offset,
)
)
return new_envelopes
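
A minimal sketch of the offset arithmetic (values made up): every timestamp moves by the same base_time - latest_time delta, so the newest event lands exactly at base_time and all gaps within and between envelopes are preserved:

from datetime import datetime, timedelta, timezone

latest = datetime(2024, 8, 1, 12, 0, 5, tzinfo=timezone.utc)  # newest timestamp in the batch
earlier = latest - timedelta(seconds=5)                       # an older event in the batch
base_time = datetime.now(timezone.utc)

offset = base_time - latest
assert latest + offset == base_time                                    # newest lands at base_time
assert (latest + offset) - (earlier + offset) == timedelta(seconds=5)  # spacing preserved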


# Replace all IDs with their salted hashes.
# All envelopes in the same Replay session must be treated as a single unit to preserve
# the relationships between events via replay_id and trace_id.
def make_all_ids_unique(envelopes, batch_salt=""):
def make_all_ids_unique(envelope_content: bytes, batch_salt: bytes = b"") -> bytes:
uuid32_re = (
rb'("(?:id|event_id|trace_id|profile_id|replay_id|replayId|message|sid)":")([0-9a-f]{32})"'
)
id16_re = rb'("(?:parent_span_id|span_id|__span)":")([0-9a-f]{16})"'

new_envelopes = []
for envelope in envelopes:
new_envelopes.append(
re.sub(
uuid32_re,
lambda t: t.group(1)
+ hashlib.sha256(t.group(2) + batch_salt).hexdigest()[:32].encode("utf-8")
+ b'"',
re.sub(
id16_re,
lambda t: t.group(1)
+ hashlib.sha256(t.group(2) + batch_salt).hexdigest()[:16].encode("utf-8")
+ b'"',
envelope,
),
)
)
return new_envelopes
return re.sub(
uuid32_re,
lambda t: t.group(1)
+ hashlib.sha256(t.group(2) + batch_salt).hexdigest()[:32].encode("utf-8")
+ b'"',
re.sub(
id16_re,
lambda t: t.group(1)
+ hashlib.sha256(t.group(2) + batch_salt).hexdigest()[:16].encode("utf-8")
+ b'"',
envelope_content,
),
)
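
A minimal check of the property the batch salt relies on (IDs and salts made up): the same ID under the same salt always rewrites to the same value, so a trace_id shared across envelopes in one batch stays linked, while a new salt yields an entirely fresh set of IDs:

import hashlib

trace_id = b"0123456789abcdef0123456789abcdef"  # made-up 32-hex ID
rewrite = lambda i, salt: hashlib.sha256(i + salt).hexdigest()[:32]

assert rewrite(trace_id, b"batch-a") == rewrite(trace_id, b"batch-a")  # stable within a batch
assert rewrite(trace_id, b"batch-a") != rewrite(trace_id, b"batch-b")  # fresh IDs per batch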


def kill_old_backgrounded_processes():
process_name = os.path.basename(__file__)
try:
# Use the `pgrep` command to find processes by name
pids = subprocess.check_output(["pgrep", "-f", process_name]).splitlines()
for pid in pids:
# Exclude the current process to avoid killing itself
if int(pid) != os.getpid():
os.kill(int(pid), signal.SIGTERM)
click.echo(f"Also terminating old backgrounded process with PID {int(pid)}")
except subprocess.CalledProcessError:
# If `pgrep` doesn't find any processes, it will raise a CalledProcessError
pass


if __name__ == "__main__":
@@ -176,12 +219,25 @@ if __name__ == "__main__":

parser = ArgumentParser(description="Load latest mock data from empower")
parser.add_argument(
"-p",
"--projects",
default=["react", "flask"],
nargs="+",
help="List of empower projects",
)
parser.add_argument(
"--stream",
nargs="?", # Makes the value optional
const="5", # Value assigned if the option is provided without a value
default=None, # Value when the option is not provided at all
help="Send new events continuously at given interval in seconds (default: 5)",
metavar="INTERVAL_SECONDS",
)
parser.add_argument(
"--quiet",
default=False,
action="store_true",
help="Don't print ingest responses",
)
options = parser.parse_args()
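
As a sketch of how nargs="?" plus const plays out for --stream (invocations illustrative):

from argparse import ArgumentParser

p = ArgumentParser()
p.add_argument("--stream", nargs="?", const="5", default=None)

assert p.parse_args([]).stream is None                    # flag absent: no streaming
assert p.parse_args(["--stream"]).stream == "5"           # bare flag: const kicks in
assert p.parse_args(["--stream", "0.5"]).stream == "0.5"  # explicit interval wins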

ingest_running = (
@@ -202,35 +258,63 @@ if __name__ == "__main__":
project_map = generate_projects(
organization, tuple([("Empower Plant", tuple(p for p in options.projects))])
)

# [ {envelope: bytes, dev_project_id: int, dev_project_public_key: str} ]

batch_time = datetime.now(timezone.utc)
batch_salt = uuid.uuid4().hex.encode("utf-8")

for project_name in options.projects:
# TODO: first update platform, get envelopes, then send as separate step/loop
project = project_map[project_name]
for project_name, project in project_map.items():
project.platform = PROJECT_PLATFORM_MAP[project_name]
project.save()
project_public_key = ProjectKey.get_default(project)

envelopes = get_envelopes(project_name)
envelopes = make_all_ids_unique(envelopes, batch_salt)
for envelope in envelopes:
template_envelopes = get_envelopes(options.projects)
compute_latest_timestamps(template_envelopes)
template_envelopes.sort(key=lambda e: e.latest_timestamp) # why not

if options.stream:
click.echo("")
click.echo(f"> Sending new batch of events every {options.stream} seconds")
click.echo("")
click.echo("^C\tExit")
click.echo("^Z\tPause")
click.echo("fg\tResume")
click.echo("")

def sigtstp_handler(signum, frame):
click.echo("")
click.echo("Paused. Use `fg` command to resume")
# sys.stdout.flush() # Ensure the message is printed immediately
# Reset the signal handler to the default to allow the process to stop
signal.signal(signal.SIGTSTP, signal.SIG_DFL)
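# Re-raising SIGTSTP below now reaches the default handler and actually
# suspends the process; the custom handler is not re-installed on resume,
# so a second ^Z suspends without printing the message again.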
os.kill(os.getpid(), signal.SIGTSTP)

signal.signal(signal.SIGTSTP, sigtstp_handler)

while True:
batch_envelopes = shift_all_timestamps(template_envelopes, datetime.now(timezone.utc))
batch_salt = uuid.uuid4().hex.encode("utf-8")

for envelope in batch_envelopes:
project = project_map[envelope.project_name]
send_envelope_into_ingest(
shift_all_timestamps(envelope, base_time=batch_time),
make_all_ids_unique(envelope.content, batch_salt),
project.id,
project_public_key,
ProjectKey.get_default(project),
options.quiet,
)

click.echo(" > Done sending envelopes. Waiting for processing to finish")

if hasattr(buffer, "process_pending"):
click.echo(" > Processing pending buffers")
buffer.process_pending()

click.echo(" > Processing complete")
if not options.stream:
click.echo(" > Done sending initial envelopes. Waiting for processing to finish")
if hasattr(buffer, "process_pending"):
click.echo(" > Processing pending buffers")
buffer.process_pending()
click.echo(" > Processing complete")
break
else:
if not options.quiet:
click.echo("")
time.sleep(float(options.stream))

except KeyboardInterrupt:
click.echo("")
click.echo("Exiting...")
kill_old_backgrounded_processes()
sys.exit(0)

except Exception:
# Avoid reporting any issues recursively back into Sentry
