Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

dev(mocks): empower-mocks v0 WIP #76520

Draft
wants to merge 4 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
325 changes: 325 additions & 0 deletions bin/empower-mocks
Original file line number Diff line number Diff line change
@@ -0,0 +1,325 @@
#!/usr/bin/env python
import os
import signal

from sentry.runner import configure

configure()

import hashlib
import re
import subprocess
import sys
import time
import uuid
from argparse import ArgumentParser
from dataclasses import dataclass, field
from datetime import datetime, timezone

import click
import requests
from django.conf import settings

from sentry import buffer
from sentry.models.projectkey import ProjectKey
from sentry.utils.mockdata import get_organization
from sentry.utils.mockdata.core import generate_projects

GCP_BUCKET = "empower-mocks-production"

# TODO: do mapping on empower/mini-relay side as it has control over project ids
EMPOWER_PROJECT_IDS = {
"react": 2,
"flask": 3,
"ruby": 4,
}

PROJECT_PLATFORM_MAP = {
"react": "javascript-react",
"flask": "python-flask",
"ruby": "ruby",
}


def download_file(url):
try:
response = requests.get(url)
if response.status_code == 200:
return response.content
elif response.status_code == 404:
return None
else:
response.raise_for_status() # This will raise an HTTPError
except requests.exceptions.RequestException as e:
raise Exception(f"An error occurred while downloading the file: {str(e)}")


@dataclass
class Envelope:
content: bytes
project_name: str
latest_timestamp: datetime | None = field(default=None)


# we assume that everything in the bucket is a single "batch" - typically meaning a single Replay session
def get_envelopes(projects) -> list[Envelope]:
res = []
for project_name in projects:
project_id = EMPOWER_PROJECT_IDS[project_name]
i = 1
while True:
envelope_content = download_file(
f"http://{GCP_BUCKET}.storage.googleapis.com/{project_id}/{i}"
)
if envelope_content is None:
break
res.append(Envelope(envelope_content, project_name))
i += 1
return res


def send_envelope_into_ingest(envelope, project_id, public_key, quiet=True):

# TODO these are part of URL path so should be parsed out of the envelope:
# &sentry_version=7&sentry_client=sentry.javascript.react%2F8.20.0
subprocess.run(
[
"curl",
"-X",
"POST",
f"http://dev.getsentry.net:8000/api/{project_id}/envelope/?sentry_key={public_key}",
"-H",
"content-type: text/plain;charset=UTF-8",
"-d",
envelope,
],
stdout=subprocess.DEVNULL if quiet else sys.stdout,
stderr=subprocess.DEVNULL if quiet else sys.stderr,
)
if not quiet:
click.echo("")
# click.echo("Sent envelope: " + envelope.decode("utf-8"))


def decode_if_needed(str_or_bytes):
if type(str_or_bytes) is not str:
return str_or_bytes.decode("utf-8")
return str_or_bytes


def iso2dt(iso_str):
return datetime.strptime(decode_if_needed(iso_str), "%Y-%m-%dT%H:%M:%S.%fZ").replace(
tzinfo=timezone.utc
)


def dt2iso(dt):
return dt.strftime("%Y-%m-%dT%H:%M:%S.%f") + "Z"


def unix2dt(unix_str):
return datetime.fromtimestamp(float(decode_if_needed(unix_str))).replace(tzinfo=timezone.utc)


def dt2unix(dt):
return f"{dt.timestamp():.7f}"


ISO_8601_RE = rb"(\"(?:start_timestamp|timestamp|started|sent_at)\":\")(\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{3,9}Z)(\")"
UNIX_FRACT_RE = rb"(\"(?:start_timestamp|timestamp|started|sent_at)\":)(\d{10}\.\d{3,9})([^0-9])"


def compute_latest_timestamps(envelopes):
for envelope in envelopes:
timestamps = [
iso2dt(m[1].decode("utf-8")) for m in re.findall(ISO_8601_RE, envelope.content)
] + [unix2dt(m[1].decode("utf-8")) for m in re.findall(UNIX_FRACT_RE, envelope.content)]
if not timestamps:
click.echo(f"[WARNING] No timestamps found in input string: {envelope}")
else:
envelope.latest_timestamp = max(timestamps)


# Shift all timestamps by equal amount so that the latest timestamp found becomes `base_time`
#
# NOTE: this can potentially mess up JSON in breadcrumbs, HTTP response bodies, etc.
# but we don't care if that happens as this is demo data and those values can be anything.
# What's potentially undesirable is if extraneous timestamps happens to the latest timestamp
# that's why we try to match keys to reduce the chance of that happening.
def shift_all_timestamps(envelopes: list[Envelope], base_time: datetime) -> list[Envelope]:
latest_time = max(e.latest_timestamp for e in envelopes)
offset = base_time - latest_time

new_envelopes = []
for envelope in envelopes:
new_envelopes.append(
Envelope(
re.sub(
ISO_8601_RE,
lambda m: m.group(1)
+ dt2iso(iso2dt(m.group(2)) + offset).encode("utf-8")
+ m.group(3),
re.sub(
UNIX_FRACT_RE,
lambda m: m.group(1)
+ dt2unix(unix2dt(m.group(2)) + offset).encode("utf-8")
+ m.group(3),
envelope.content,
),
),
envelope.project_name,
envelope.latest_timestamp + offset,
)
)
return new_envelopes


# Replace all ID with their salted hashes
# All envelopes in the same Replay session must be treated as a single unit to preserve the relationships
# between events via replay_id and trace_id
def make_all_ids_unique(envelope_content: bytes, batch_salt: bytes = "") -> bytes:
uuid32_re = (
rb'("(?:id|event_id|trace_id|profile_id|replay_id|replayId|message|sid)":")([0-9a-f]{32})"'
)
id16_re = rb'("(?:parent_span_id|span_id|__span)":")([0-9a-f]{16})"'

return re.sub(
uuid32_re,
lambda t: t.group(1)
+ hashlib.sha256(t.group(2) + batch_salt).hexdigest()[:32].encode("utf-8")
+ b'"',
re.sub(
id16_re,
lambda t: t.group(1)
+ hashlib.sha256(t.group(2) + batch_salt).hexdigest()[:16].encode("utf-8")
+ b'"',
envelope_content,
),
)


def kill_old_backgrounded_processes():
process_name = os.path.basename(__file__)
try:
# Use the `pgrep` command to find processes by name
pids = subprocess.check_output(["pgrep", "-f", process_name]).splitlines()
for pid in pids:
# Exclude the current process to avoid killing itself
if int(pid) != os.getpid():
os.kill(int(pid), signal.SIGTERM)
click.echo(f"Also terminating old backgrounded process with PID {int(pid)}")
except subprocess.CalledProcessError:
# If `pgrep` doesn't find any processes, it will raise a CalledProcessError
pass


if __name__ == "__main__":
try:
settings.CELERY_ALWAYS_EAGER = True

parser = ArgumentParser(description="Load latest mock data from empower")
parser.add_argument(
"--projects",
default=["react", "flask"],
nargs="+",
help="List of empower projects",
)
parser.add_argument(
"--stream",
nargs="?", # Makes the value optional
const="5", # Value assigned if the option is provided without a value
default=None, # Value when the option is not provided at all
help="Send new events continuously at given interval in seconds (default: 5)",
metavar="INTERVAL_SECONDS",
)
parser.add_argument(
"--quiet",
default=False,
action="store_true",
help="Don't print ingest responses",
)
options = parser.parse_args()

ingest_running = (
subprocess.run(
["pgrep", "-fl", "sentry run consumer ingest-events"],
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
).returncode
== 0
)
if not ingest_running:
click.echo(
"Ingest is required for empower-mocks but is not running. Please follow https://develop.sentry.dev/development/environment/#ingestion-pipeline-relay-aka-sending-events-to-your-dev-environment"
)
sys.exit(1)

organization = get_organization()
project_map = generate_projects(
organization, tuple([("Empower Plant", tuple(p for p in options.projects))])
)
for project_name, project in project_map.items():
project.platform = PROJECT_PLATFORM_MAP[project_name]
project.save()

template_envelopes = get_envelopes(options.projects)
compute_latest_timestamps(template_envelopes)
template_envelopes.sort(key=lambda e: e.latest_timestamp) # why not

if options.stream:
click.echo("")
click.echo(f"> Sending new batch of events every {options.stream} seconds")
click.echo("")
click.echo("^C\tExit")
click.echo("^Z\tPause")
click.echo("fg\tResume")
click.echo("")

def sigtstp_handler(signum, frame):
click.echo("")
click.echo("Paused. Use `fg` command to resume")
# sys.stdout.flush() # Ensure the message is printed immediately
# Reset the signal handler to the default to allow the process to stop
signal.signal(signal.SIGTSTP, signal.SIG_DFL)
os.kill(os.getpid(), signal.SIGTSTP)

signal.signal(signal.SIGTSTP, sigtstp_handler)

while True:
batch_envelopes = shift_all_timestamps(template_envelopes, datetime.now(timezone.utc))
batch_salt = uuid.uuid4().hex.encode("utf-8")

for envelope in batch_envelopes:
project = project_map[envelope.project_name]
send_envelope_into_ingest(
make_all_ids_unique(envelope.content, batch_salt),
project.id,
ProjectKey.get_default(project),
options.quiet,
)

if not options.stream:
click.echo(" > Done sending initial envelopes. Waiting for processing to finish")
if hasattr(buffer, "process_pending"):
click.echo(" > Processing pending buffers")
buffer.process_pending()
click.echo(" > Processing complete")
break
else:
if not options.quiet:
click.echo("")
time.sleep(float(options.stream))

except KeyboardInterrupt:
click.echo("")
click.echo("Exiting...")
kill_old_backgrounded_processes()
sys.exit(0)

except Exception:
# Avoid reporting any issues recursively back into Sentry
import sys
import traceback

traceback.print_exc()
sys.exit(1)
4 changes: 2 additions & 2 deletions src/sentry/utils/mockdata/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -393,8 +393,8 @@ def create_access_request(member: OrganizationMember, team: Team) -> None:
OrganizationAccessRequest.objects.create_or_update(member=member, team=team)


def generate_projects(organization: Organization) -> Mapping[str, Any]:
mocks = (
def generate_projects(organization: Organization, mocks=None) -> Mapping[str, Any]:
mocks = mocks or (
("Massive Dynamic", ("Ludic Science",)),
("Captain Planet", ("Earth", "Fire", "Wind", "Water", "Heart")),
)
Expand Down
Loading