hackweek(kafkatasks): add kafkatasks postgres model #76394

Closed · wants to merge 117 commits

Changes from all commits (117 commits)
5b4fed7
Empty files!
markstory Aug 19, 2024
9bc027a
We'll need migrations probably
markstory Aug 19, 2024
06ef2f8
Start sketching registry/namespace and decorator
markstory Aug 19, 2024
f828edb
Get task decorator working
markstory Aug 19, 2024
c5e8ee7
Janky message production
markstory Aug 19, 2024
6d492df
add kafkatasks model
victoria-yining-huang Aug 19, 2024
8b4eb85
:hammer_and_wrench: apply pre-commit fixes
getsantry[bot] Aug 19, 2024
1049e7f
restore server config
victoria-yining-huang Aug 19, 2024
454b2ec
remove src/sentry/models/kafkatasks.py
victoria-yining-huang Aug 19, 2024
40263ae
add src/sentry/taskworker/models.py
victoria-yining-huang Aug 19, 2024
fb9e7ae
Add module to INSTALLED_APPS
markstory Aug 19, 2024
4880129
Adds base service skeleton, sample impl for get and set tasks
GabeVillalobos Aug 19, 2024
1c0a8e5
Start sketching out retry and retry state
markstory Aug 19, 2024
8f3761d
Fix mistakes
markstory Aug 19, 2024
a586417
Adds processing state to mark tasks as inaccessible to other workers,…
GabeVillalobos Aug 19, 2024
511630a
add migration
victoria-yining-huang Aug 19, 2024
b02c45b
Fixes for new schema changes
GabeVillalobos Aug 19, 2024
15ab2b0
Fix typos
markstory Aug 19, 2024
eb2a78e
Sketch in deadline and idempotent task attributes
markstory Aug 19, 2024
4934bba
Sketch out worker command line interface
markstory Aug 19, 2024
7ff2496
Rework initial PendingTask migration for primary key
GabeVillalobos Aug 19, 2024
7caf551
Adds namespace column to PendingTasks
GabeVillalobos Aug 19, 2024
d871941
start sketching in the worker
markstory Aug 19, 2024
c5db13b
:hammer_and_wrench: apply pre-commit fixes
getsantry[bot] Aug 19, 2024
909985e
Adds stub command for task processing
GabeVillalobos Aug 19, 2024
35d053e
Adds test CLI command to run task worker
GabeVillalobos Aug 20, 2024
4de2859
Adds actual connected namespace example
GabeVillalobos Aug 20, 2024
84c21ca
add consumer code
victoria-yining-huang Aug 20, 2024
4a38086
Sketch in retry flow a bit more
markstory Aug 20, 2024
8c09371
Add some logging
markstory Aug 20, 2024
8958166
Fix logging
markstory Aug 20, 2024
177cedc
update some PendingTasks fields based on the example message, and add…
victoria-yining-huang Aug 20, 2024
b7118df
Allow unsafe migration - hackweek!
markstory Aug 20, 2024
1d2de31
remove cli from consumer, and remove old migrations
victoria-yining-huang Aug 20, 2024
6e82cad
add latest migration
victoria-yining-huang Aug 20, 2024
c19ebf8
Adds retry logic and updating for tasks
GabeVillalobos Aug 20, 2024
a217c7b
Adds sample task and namespace
GabeVillalobos Aug 20, 2024
f07f8a2
refactor
victoria-yining-huang Aug 20, 2024
6d89742
Flattens retry fields on model
GabeVillalobos Aug 20, 2024
9c3fd22
Pushing work for live E2E test, with retry support
GabeVillalobos Aug 20, 2024
41f15fc
Make `sentry run consumer taskworker --consumer-group hackweek` work
markstory Aug 21, 2024
72e7ab6
add offset but idk if it's right
victoria-yining-huang Aug 21, 2024
1d90a9a
Sketch in processing timeout, retries and deadlines
markstory Aug 21, 2024
18fded4
Improve logging
markstory Aug 21, 2024
4805c19
Fix mistakes
markstory Aug 21, 2024
43bab4d
Get retries working
markstory Aug 21, 2024
4108408
Use config tooling better
markstory Aug 22, 2024
b7c762e
Cleanup
markstory Aug 22, 2024
52c3691
Rename cli var
GabeVillalobos Aug 21, 2024
e499899
baby refactors for retry state conversion
GabeVillalobos Aug 21, 2024
a2b449d
Updates strategy, config to use protobufs with kafka tasks
GabeVillalobos Aug 21, 2024
3064fb9
Fixes some warnings around datetime setting
GabeVillalobos Aug 22, 2024
8273cd0
Merge branch 'master' into hackweek-kafkatasks
markstory Sep 16, 2024
8d78757
create pending task store
enochtangg Sep 17, 2024
c4ab4ea
fanout cpu intensive work in RunTaskWithMultiprocessing
john-z-yang Sep 17, 2024
51caaea
fix shutdown and reduce inital value
john-z-yang Sep 18, 2024
fb8cbf9
wip stub grpc server
john-z-yang Sep 19, 2024
99c5879
make worker and service use pending task store
enochtangg Sep 19, 2024
b1d92ca
wip refactor to use protobuf as first class data structure
john-z-yang Sep 20, 2024
eca1b54
use grpc
enochtangg Sep 20, 2024
bf0f2fe
Add logging for latency
markstory Sep 20, 2024
7185964
feat: Add script for running taskworker consumer/worker
markstory Sep 20, 2024
a664b50
Dial down logging levels for arroyo
markstory Sep 20, 2024
546630d
Reduce logging noise more
markstory Sep 20, 2024
a169649
Merge remote-tracking branch 'origin/grpc-server' into hackweek-kafka…
markstory Sep 23, 2024
76c9bde
Add grpc to test script
markstory Sep 23, 2024
cacfd87
Merge branch 'master' into hackweek-kafkatasks
markstory Sep 24, 2024
67abb4d
Rough update to latest protobufs
markstory Sep 24, 2024
55935c4
Fixes and migrations
markstory Sep 24, 2024
42f8004
Fix mistakes and add better logging
markstory Sep 25, 2024
9ed981d
Don't need imports in consumer
markstory Sep 25, 2024
8aeda50
feat(taskworker) Improve prototype measurement harness for taskworkers
markstory Sep 25, 2024
0eee765
implement basic consumer push model (#78098)
john-z-yang Oct 1, 2024
0390010
Merge branch 'master' into hackweek-kafkatasks
markstory Oct 1, 2024
652e5eb
Merge branch 'feat-taskworker-harness' into hackweek-kafkatasks
markstory Oct 2, 2024
6de579a
Fix overflow in output
markstory Oct 2, 2024
3aaa7ab
Add push mode
markstory Oct 2, 2024
248de27
Merge branch 'master' into hackweek-kafkatasks
markstory Oct 2, 2024
d6295b3
Merge branch 'feat-taskworker-harness-push' into hackweek-kafkatasks
markstory Oct 3, 2024
8e1a2ef
Merge branch 'master' into hackweek-kafkatasks
markstory Oct 3, 2024
eaca553
Implement work pull processing deadline (#78487)
enochtangg Oct 3, 2024
2959f20
Fix processing_deadline being in the past by default
markstory Oct 3, 2024
78b2ab9
Improve output and make reports easier to re-run
markstory Oct 3, 2024
0fb6181
feat(taskworker) Use threadpools instead of multi-processing
markstory Oct 4, 2024
b0d8365
Improve output of testing harness
markstory Oct 4, 2024
76ad166
Improve pull model to return 'next' task in response.
markstory Oct 4, 2024
5f06422
improve test harness output
markstory Oct 4, 2024
d493f96
Use ProcessPoolExecutor instead.
markstory Oct 4, 2024
4982495
Use ProcessPool so we can cancel futures
markstory Oct 4, 2024
2a7b71e
adds a task that runs forever, but with timeout of 8 seconds
john-z-yang Oct 4, 2024
72ef65c
Add task id tracking to results output
markstory Oct 4, 2024
38bd9c9
Merge branch 'master' into hackweek-kafkatasks
markstory Oct 7, 2024
d82b39f
Merge branch 'taskworker-improve-throughput' into hackweek-kafkatasks
markstory Oct 7, 2024
022748f
Use pool.terminate() when tasks timeout
markstory Oct 7, 2024
65095cc
feat(taskworker) Remove completed tasks from the pending store
markstory Oct 7, 2024
8b78adf
Fix duplicate task execution by making updates atomic.
markstory Oct 7, 2024
b7fbf06
Merge branch 'master' into hackweek-kafkatasks
markstory Oct 8, 2024
7d4ea99
Throttle inflight activations when exceeding max count (#78684)
enochtangg Oct 8, 2024
8cf706c
Start janky prototype of SQLite storage for inflight activations
markstory Oct 8, 2024
25afb8f
Improve sqlite queries so they run
markstory Oct 8, 2024
b2d9b0e
Fix fat finger typos
markstory Oct 9, 2024
0559437
Add storage option to consumer/grpc processes
markstory Oct 9, 2024
61a9316
Connect storage option with test harness
markstory Oct 9, 2024
e4d3ab5
Fix data race when fetching tasks.
markstory Oct 10, 2024
5442aaf
Merge branch 'master' into hackweek-kafkatasks
markstory Oct 11, 2024
9f905d2
Merge branch 'master' into hackweek-kafkatasks
markstory Oct 21, 2024
bbea2ea
allow multiple consumers (#79218)
enochtangg Oct 21, 2024
ba535f9
rfc(taskworker): First pass at storing inflight tasks in redis (#79045)
enochtangg Oct 21, 2024
ccbefa8
Merge branch 'master' into hackweek-kafkatasks
markstory Oct 22, 2024
bda7cdf
Merge branch 'master' into hackweek-kafkatasks
markstory Oct 23, 2024
41621ef
feat: Add reply consumer to taskworker prototypes
markstory Oct 21, 2024
1d1e999
Remove redundant comment
markstory Oct 22, 2024
3ebaefd
Found a different, slightly less jank solution
markstory Oct 22, 2024
059bee2
Make logging locations more consistent
markstory Oct 22, 2024
e2b4d10
normalize execution time to be after the task is complete.
markstory Oct 22, 2024
d0d0755
Cleanup
markstory Oct 22, 2024
b688138
Add ability to run task eagerly during testing (#79686)
john-z-yang Oct 24, 2024
296 changes: 296 additions & 0 deletions bin/taskworker-test
@@ -0,0 +1,296 @@
#!/usr/bin/env python
import csv
import os
import random
import sys
import time
from collections import defaultdict
from subprocess import list2cmdline

import click
from honcho.manager import Manager

from sentry.runner import configure
from sentry.runner.formatting import get_honcho_printer

configure()


@click.command("taskworker-test")
@click.option("--number", "-n", help="Number of messages to append", default=10_000)
@click.option("--verbose", "-v", help="Enable verbose output", default=False, is_flag=True)
@click.option("--seed", "-s", help="pseudo random number generator seed", default=None)
@click.option("--consumers", help="Number of consumer processes", default=1)
@click.option("--workers", help="Number of worker processes", default=1)
@click.option("--mode", help="The worker/consumer mode to use", default="pull")
@click.option("--storage", help="The storage mode to use", default="postgres")
@click.option("--report-only", help="Run report output only", is_flag=True, default=False)
def main(
    number: int,
    verbose: bool,
    consumers: int,
    workers: int,
    seed: float | None,
    mode: str,
    report_only: bool,
):
    from sentry.taskdemo import variable_time

    if report_only:
        return print_results("./taskworker.log", workers)

    if verbose:
        click.echo(f"Adding {number} task messages")
    if not seed:
        seed = random.random()
    # Seed the PRNG so a run can be reproduced with --seed.
    random.seed(seed)

    # Fill the topic up with a number of tasks
    start = time.monotonic()
    for i in range(number):
        variable_time.delay(wait=random.random(), taskno=i)

    end = time.monotonic()
    click.echo(f"Appending {number} tasks took {end - start}s")

    cwd = os.getcwd()

    # Remove the previous run's log, if any, so results aren't mixed together.
    if os.path.exists("./taskworker.log"):
        os.unlink("./taskworker.log")

    # Use honcho to control the worker and consumer processes.
    honcho_printer = get_honcho_printer(prefix=True, pretty=False)
    manager = Manager(honcho_printer)

    processes = []
    if mode == "pull":
        processes.append(
            {
                "name": "grpc",
                "cmd": ["sentry", "run", "kafka-task-grpc-pull", "--storage", storage],
            },
        )
        for i in range(consumers):
            processes.append(
                {
                    "name": f"consumer-{i}",
                    "cmd": [
                        "sentry",
                        "run",
                        "consumer",
                        "taskworker",
                        "--consumer-group",
                        "taskworker-pull",
                        "--log-level",
                        "warning",
                        "--",
                        "--storage",
                        storage,
                    ],
                }
            )
        for i in range(workers):
            processes.append(
                {
                    "name": f"worker-{i}",
                    "cmd": [
                        "sentry",
                        "run",
                        "taskworker-pull",
                        "--namespace",
                        "demos",
                    ],
                },
            )
elif mode == "push":
worker_ports = [50051 + i for i in range(workers)]
worker_addrs = ",".join([f"127.0.0.1:{port}" for port in worker_ports])

processes.append(
{
"name": "grpc",
"cmd": [
"sentry",
"run",
"kafka-task-grpc-push",
"--worker-addrs",
worker_addrs,
"--storage",
storage,
],
},
)
processes.append(
{
"name": "consumer",
"cmd": [
"sentry",
"run",
"consumer",
"taskworker",
"--consumer-group",
"taskworker-pull",
"--log-level",
"warning",
"--",
"--storage",
storage,
],
}
)
for port in worker_ports:
processes.append(
{
"name": f"worker-{port}",
"cmd": [
"sentry",
"run",
"taskworker-push",
"--namespace",
"demos",
"--port",
str(port),
],
},
)
elif mode == "reply":
worker_ports = [50051 + i for i in range(workers)]
worker_addrs = ",".join([f"127.0.0.1:{port}" for port in worker_ports])

processes.append(
{
"name": "consumer",
"cmd": [
"sentry",
"run",
"consumer",
"taskworker-reply",
"--consumer-group",
"taskworker-pull",
"--log-level",
"warning",
"--",
"--storage",
storage,
"--worker-addrs",
worker_addrs,
],
}
)
for port in worker_ports:
processes.append(
{
"name": f"worker-{port}",
"cmd": [
"sentry",
"run",
"taskworker-reply",
"--namespace",
"demos",
"--port",
str(port),
],
},
)
    else:
        raise RuntimeError(f"Unexpected mode of {mode}. Use `push`, `pull`, or `reply` instead.")

    for process in processes:
        manager.add_process(process["name"], list2cmdline(process["cmd"]), cwd=cwd)

    # Let's go!
    manager.loop()

    print_results("./taskworker.log", workers)

    sys.exit(manager.returncode)


def print_results(log_file: str, worker_count: int) -> None:
    click.echo("")
    click.echo("== Test run complete ==")
    click.echo("")

    fieldnames = ["event", "worker_id", "task_add_time", "execution_time", "latency", "task_id"]
    latency_times = []
    execution_times = []
    task_ids = set()
    duplicates = []
    with open(log_file) as logs:
        results = csv.DictReader(logs, fieldnames=fieldnames)
        for row in results:
            latency_times.append(float(row["latency"].strip()))
            execution_times.append(float(row["execution_time"].strip()))
            row_id = row["task_id"].strip()
            if row_id in task_ids:
                duplicates.append(row_id)
            task_ids.add(row_id)

    # We append tasks and then start applications. The first
    # message always has long latency as application startup
    # and kafka take some time to get going.
    first_latency = latency_times[0]

    min_latency = min(latency_times)
    max_latency = max(latency_times)
    avg_latency = sum(latency_times) / len(latency_times)

    processing_time = execution_times[-1] - execution_times[0]
    task_throughput = len(latency_times) / processing_time

    # Remove the startup overhead to get relative latency.
    adj_min_latency = min_latency - first_latency
    adj_max_latency = max_latency - first_latency
    adj_avg_latency = avg_latency - first_latency

    # Bucket latency and count totals in each bucket
    latency_spread = adj_max_latency - adj_min_latency
    bucket_count = 20
    bucket_width = latency_spread / bucket_count
    buckets = defaultdict(int)
    for value in latency_times:
        adjusted = max(value - first_latency, 0)
        bucket = int(adjusted / bucket_width)
        buckets[bucket] += 1

click.echo("")
click.echo("## Run summary")
click.echo("")
click.echo(f"Task count: {len(latency_times)}")
click.echo(f"Processing time: {processing_time:.4f}")
click.echo(f"Throughput: {task_throughput:.4f}")

if duplicates:
click.echo("")
click.echo("Duplicate executions found:")
for dupe in duplicates:
click.echo(f"- {dupe}")
click.echo("")

click.echo("")
click.echo("## Task latency")
click.echo("")
click.echo(f"First task Latency: {first_latency:.5f}")
click.echo(
f"Raw Min / Max / Avg latency: {min_latency:.5f} / {max_latency:.5f} / {avg_latency:.5f}"
)
click.echo(
f"Adjusted Min / Max / Avg latency: {adj_min_latency:.5f} / {adj_max_latency:.5f} / {adj_avg_latency:.5f}"
)

click.echo("")
click.echo("## Latency histogram")
click.echo("")
bars = []
for key, count in buckets.items():
bucket_upper = key * bucket_width
# Limit to 50 to prevent wrapping in output
bar = "█" * min(count, 50)
bar += f" {count} "
bars.append((bucket_upper, bar))

for bucket_upper, bar in sorted(bars, key=lambda x: x[0]):
click.echo(f"{bucket_upper:.5f} {bar}")


if __name__ == "__main__":
main()
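The harness assumes a `variable_time` demo task importable from `sentry.taskdemo` (wired up via `TASKWORKER_IMPORTS` in the server.py changes below) and a CSV-style log matching the fieldnames parsed by `print_results()`. A typical run might look like `bin/taskworker-test --mode push --consumers 2 --workers 4 -n 5000`. The task itself is not part of this diff; a minimal sketch of what it might look like — the body and log format here are assumptions, not the PR's actual implementation — is:

import logging
import os
import time

results_logger = logging.getLogger("taskworker.results")


def variable_time(wait: float, taskno: int, added_at: float | None = None) -> None:
    """Sleep for `wait` seconds, then log a row print_results() can parse."""
    start = added_at if added_at is not None else time.time()
    time.sleep(wait)
    done = time.time()
    # Matches fieldnames: event, worker_id, task_add_time, execution_time, latency, task_id
    results_logger.info(
        "complete,%s,%.6f,%.6f,%.6f,task-%d",
        os.getpid(), start, done, done - start, taskno,
    )

In the PR this function would additionally be wrapped by the task decorator this branch adds, so that `variable_time.delay(...)` produces a Kafka message instead of running inline.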
2 changes: 1 addition & 1 deletion requirements-base.txt
@@ -97,7 +97,7 @@ cryptography>=38.0.3
# Note, grpcio>1.30.0 requires setting GRPC_POLL_STRATEGY=epoll1
# See https://github.com/grpc/grpc/issues/23796 and
# https://github.com/grpc/grpc/blob/v1.35.x/doc/core/grpc-polling-engines.md#polling-engine-implementations-in-grpc
grpcio>=1.59.0
grpcio==1.66.1

# not directly used, but provides a speedup for redis
hiredis>=2.3.2
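As the comment above notes, grpcio>1.30.0 requires `GRPC_POLL_STRATEGY=epoll1`. For illustration (this snippet is not part of the PR), the variable has to be in the environment before `grpc` is first imported:

import os

# Must be set before `import grpc`; see grpc/grpc#23796.
os.environ.setdefault("GRPC_POLL_STRATEGY", "epoll1")

import grpc  # noqa: E402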
2 changes: 1 addition & 1 deletion requirements-dev-frozen.txt
@@ -69,7 +69,7 @@ google-resumable-media==2.7.0
googleapis-common-protos==1.63.2
grpc-google-iam-v1==0.13.1
grpc-stubs==1.53.0.5
grpcio==1.60.1
grpcio==1.66.1
grpcio-status==1.60.1
h11==0.13.0
hiredis==2.3.2
2 changes: 1 addition & 1 deletion requirements-frozen.txt
@@ -57,7 +57,7 @@ google-resumable-media==2.7.0
googleapis-common-protos==1.63.2
grpc-google-iam-v1==0.13.1
grpc-stubs==1.53.0.5
grpcio==1.60.1
grpcio==1.66.1
grpcio-status==1.60.1
h11==0.14.0
hiredis==2.3.2
19 changes: 19 additions & 0 deletions src/sentry/conf/server.py
@@ -427,6 +427,7 @@ def env(
"sentry.hybridcloud",
"sentry.remote_subscriptions.apps.Config",
"sentry.data_secrecy",
"sentry.taskworker",
"sentry.workflow_engine",
)

Expand Down Expand Up @@ -711,6 +712,8 @@ def SOCIAL_AUTH_DEFAULT_USERNAME() -> str:
BROKER_URL = "redis://127.0.0.1:6379"
BROKER_TRANSPORT_OPTIONS: dict[str, int] = {}

TASK_WORKER_ALWAYS_EAGER = False

# Ensure workers run async by default
# in Development you might want them to run in-process
# though it would cause timeouts/recursions in some cases
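TASK_WORKER_ALWAYS_EAGER above mirrors Celery's eager mode: the final commit in this PR (#79686) adds the ability to run tasks inline during testing. A hypothetical sketch of how a task wrapper might honor the setting — the class and method names here are assumptions, not this PR's API:

from django.conf import settings


class Task:
    """Hypothetical wrapper a task decorator might return."""

    def __init__(self, func, namespace):
        self.func = func
        self.namespace = namespace

    def delay(self, *args, **kwargs):
        # In eager mode, run the task inline rather than producing
        # an activation message to Kafka.
        if settings.TASK_WORKER_ALWAYS_EAGER:
            return self.func(*args, **kwargs)
        self.namespace.send_task(self.func, args, kwargs)  # assumed producer API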
@@ -1292,6 +1295,8 @@ def SOCIAL_AUTH_DEFAULT_USERNAME() -> str:
CELERY_TASK_SOFT_TIME_LIMIT = int(timedelta(hours=3).total_seconds())
CELERY_TASK_TIME_LIMIT = int(timedelta(hours=3, seconds=15).total_seconds())

TASKWORKER_IMPORTS = ("sentry.taskdemo",)

# Queues that belong to the processing pipeline and need to be monitored
# for backpressure management
PROCESSING_QUEUES = [
@@ -1363,6 +1368,11 @@ def SOCIAL_AUTH_DEFAULT_USERNAME() -> str:
"filters": ["important_django_request"],
"class": "sentry_sdk.integrations.logging.EventHandler",
},
"taskworkerlog": {
"level": "INFO",
"class": "logging.FileHandler",
"filename": "./taskworker.log",
},
},
"filters": {
"important_django_request": {
@@ -1408,6 +1418,11 @@ def SOCIAL_AUTH_DEFAULT_USERNAME() -> str:
"urllib3.connectionpool": {"level": "ERROR", "handlers": ["console"], "propagate": False},
"boto3": {"level": "WARNING", "handlers": ["console"], "propagate": False},
"botocore": {"level": "WARNING", "handlers": ["console"], "propagate": False},
"taskworker.results": {
"level": "INFO",
"handlers": ["console", "taskworkerlog"],
"propagate": False,
},
},
}

@@ -2917,6 +2932,10 @@ def custom_parameter_sort(parameter: dict) -> tuple[str, int]:
"shared-resources-usage": "default",
"buffered-segments": "default",
"buffered-segments-dlq": "default",
"hackweek": "default",
"hackweek-reply": "default",
"hackweek-dlq": "default",
"^(hackweek-reply|hackweek)$": "default",
}


4 changes: 4 additions & 0 deletions src/sentry/conf/types/kafka_definition.py
@@ -57,6 +57,10 @@ class Topic(Enum):
SNUBA_SPANS = "snuba-spans"
BUFFERED_SEGMENTS = "buffered-segments"
BUFFERED_SEGMENTS_DLQ = "buffered-segments-dlq"
HACKWEEK = "hackweek"
HACKWEEK_REPLY = "hackweek-reply"
HACKWEEK_DLQ = "hackweek-dlq"
HACKWEEK_GLOB = "^(hackweek-reply|hackweek)$"


class ConsumerDefinition(TypedDict, total=False):
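With the new Topic members, producing onto the hackweek topic from a local setup could look like this sketch (broker address and payload are placeholders, not values from the PR):

from confluent_kafka import Producer

from sentry.conf.types.kafka_definition import Topic

producer = Producer({"bootstrap.servers": "127.0.0.1:9092"})
producer.produce(Topic.HACKWEEK.value, value=b"serialized task activation")
producer.flush()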