diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 4108515e6b..b35b9561ed 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -15,7 +15,7 @@ jobs: name: Checkout code - name: Internal github app token id: token - uses: getsentry/action-github-app-token@97c9e23528286821f97fba885c1b1123284b29cc # v2.0.0 + uses: getsentry/action-github-app-token@97c9e23528286821f97fba885c1b1123284b29cc # v2.0.0 continue-on-error: true with: app_id: ${{ vars.SENTRY_INTERNAL_APP_ID }} @@ -161,7 +161,7 @@ jobs: needs: [linting, snuba-image] name: Tests and code coverage runs-on: ubuntu-latest - timeout-minutes: 60 + timeout-minutes: 30 strategy: matrix: snuba_settings: @@ -206,7 +206,6 @@ jobs: SNUBA_IMAGE=snuba-test SNUBA_SETTINGS=test_initialization TEST_LOCATION=test_initialization docker-compose -f docker-compose.gcb.yml run --rm snuba-test if: ${{ matrix.snuba_settings == 'test' }} - - name: Upload to codecov run: | curl -Os https://uploader.codecov.io/latest/linux/codecov && chmod +x codecov && ./codecov -t ${CODECOV_TOKEN} diff --git a/.github/workflows/lint-pipelines.yml b/.github/workflows/lint-pipelines.yml index 32bd0602fa..d3ddf6335f 100644 --- a/.github/workflows/lint-pipelines.yml +++ b/.github/workflows/lint-pipelines.yml @@ -18,3 +18,16 @@ jobs: - uses: getsentry/action-setup-gocd-cli@2f7943ce1a380dea121fd6338a60dc9aabf8e7f1 # v1.0.1 - name: Lint Pipelines with gocd-cli run: ./.github/workflows/lint-pipelines.sh + + render: + name: Render GoCD Pipelines with Jsonnet + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@ac593985615ec2ede58e132d2e21d2b1cbd6127c # v3 + - uses: getsentry/action-gocd-jsonnet@v0 + with: + jb-install: true + check-for-changes: true + convert-to-yaml: true + jsonnet-dir: gocd/templates + generated-dir: gocd/generated-pipelines diff --git a/.gitignore b/.gitignore index c2899494a5..ed5c2a5941 100644 --- a/.gitignore +++ b/.gitignore @@ -13,3 +13,4 @@ node_modules .vscode/*.log snuba/admin/dist/bundle.js* tmp/ +gocd/templates/vendor/ diff --git a/Makefile b/Makefile index a8539582f9..8eb308f995 100644 --- a/Makefile +++ b/Makefile @@ -84,3 +84,13 @@ lint-rust: cd rust_snuba && cargo clippy -- -D warnings .PHONY: lint-rust + +gocd: + rm -rf ./gocd/generated-pipelines + mkdir -p ./gocd/generated-pipelines + cd ./gocd/templates && jb install + find . -type f \( -name '*.libsonnet' -o -name '*.jsonnet' \) -print0 | xargs -n 1 -0 jsonnetfmt -i + find . -type f \( -name '*.libsonnet' -o -name '*.jsonnet' \) -print0 | xargs -n 1 -0 jsonnet-lint -J ./gocd/templates/vendor + cd ./gocd/templates && jsonnet -J vendor -m ../generated-pipelines ./snuba.jsonnet + cd ./gocd/generated-pipelines && find . -type f \( -name '*.yaml' \) -print0 | xargs -n 1 -0 yq -p json -o yaml -i +.PHONY: gocd diff --git a/docs-requirements.txt b/docs-requirements.txt index 071c61f729..3ac18c8f7a 100644 --- a/docs-requirements.txt +++ b/docs-requirements.txt @@ -1,5 +1,5 @@ jsonschema2md==0.4.0 fastjsonschema==2.16.2 -sentry-sdk==1.18.0 +sentry-sdk==1.26.0 myst-parser==0.18.0 sphinx==5.1.1 diff --git a/gocd/README.md b/gocd/README.md new file mode 100644 index 0000000000..f2ec760ce4 --- /dev/null +++ b/gocd/README.md @@ -0,0 +1,59 @@ +# Snuba Pipelines + +Relay is in the process of moving to a set of rendered jsonnet pipelines. + +## Jsonnet + +You can render the jsonnet pipelines by running: + +``` +make gocd +``` + +This will clean, fmt, lint and generate the GoCD pipelines to +`./gocd/generated-pipelines`. 
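+
+Under the hood, `make gocd` wraps roughly the steps below. This is a condensed
+sketch (the real target runs the fmt and lint steps over every `*.jsonnet` and
+`*.libsonnet` file via `find`/`xargs` from the repo root), so treat the
+Makefile target as the source of truth:
+
+```
+rm -rf gocd/generated-pipelines && mkdir -p gocd/generated-pipelines   # clean
+cd gocd/templates
+jb install                               # fetch the gocd-jsonnet dependency into vendor/
+jsonnetfmt -i snuba.jsonnet              # format the jsonnet sources (repeat per file)
+jsonnet-lint -J vendor snuba.jsonnet     # lint against the vendored libraries
+jsonnet -J vendor -m ../generated-pipelines ./snuba.jsonnet   # render one JSON file per pipeline
+cd ../generated-pipelines
+yq -p json -o yaml -i snuba-next.yaml    # convert each rendered file to YAML (repeat per file)
+```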
+ + +The snuba pipelines are using the https://github.com/getsentry/gocd-jsonnet +libraries to generate the pipeline for each region. + +## Files + +Below is a description of the directories in the `gocd/` directory. + +### `gocd/templates/` + +These are a set of jsonnet and libsonnet files which are used +to generate the GoCD pipelines. This avoids duplication across +our pipelines as we deploy to multiple regions. + +The `gocd/templates/snuba.jsonnet` file is the entry point for the +generated pipelines. + +`gocd/templates/pipelines/snuba.libsonnet` define the pipeline behaviors. +This library defines the GoCD pipeline, following the same naming +as the [GoCD yaml pipelines](https://github.com/tomzo/gocd-yaml-config-plugin#readme). + +`gocd/templates/bash/*.sh` are shell scripts that are inlined in the +resulting pipelines. This seperation means syntax highlighting and +extra tooling works for bash scripts. + +`gocd/templates/jsonnetfile.json` and `gocd/templates/jsonnetfile.lock.json` +are used by [jsonnet-bundler](https://github.com/jsonnet-bundler/jsonnet-bundler#readme), a package manager for jsonnet. + +You can update jsonnet dependencies by runnning `jb update`. + +### `gocd/generated-pipelines/` + +The current setup of GoCD at sentry is only able to look for +yaml pipelines in a repo, so the genered pipelines have the be +commited. + +The dev-infra team is working on a GoCD plugin that will accept +the jsonnet directly, removing the need for commiting the +generated-pipelines. + +### `gocd/pipelines/` + +These are the original pipelines and will be used until we move +to the jsonnet pipelines. diff --git a/gocd/generated-pipelines/snuba-next-monitor.yaml b/gocd/generated-pipelines/snuba-next-monitor.yaml new file mode 100644 index 0000000000..2434398287 --- /dev/null +++ b/gocd/generated-pipelines/snuba-next-monitor.yaml @@ -0,0 +1,266 @@ +format_version: 10 +pipelines: + deploy-snuba-next-monitor: + display_order: 2 + environment_variables: + GITHUB_TOKEN: '{{SECRET:[devinfra-github][token]}}' + GOCD_ACCESS_TOKEN: '{{SECRET:[devinfra][gocd_access_token]}}' + SENTRY_REGION: monitor + group: snuba-next + lock_behavior: unlockWhenFinished + materials: + deploy-snuba-next-us-pipeline-complete: + pipeline: deploy-snuba-next-us + stage: pipeline-complete + snuba_repo: + branch: master + destination: snuba + git: git@github.com:getsentry/snuba.git + shallow_clone: false + stages: + - ready: + jobs: + ready: + tasks: + - exec: + command: true + - wait: + approval: + type: manual + jobs: + wait: + tasks: + - exec: + command: true + - checks: + jobs: + checks: + elastic_profile_id: snuba + tasks: + - script: | + ##!/bin/bash + + /devinfra/scripts/checks/githubactions/checkruns.py \ + getsentry/snuba \ + ${GO_REVISION_SNUBA_REPO} \ + "Tests and code coverage (test)" \ + "Tests and code coverage (test_distributed)" \ + "Tests and code coverage (test_distributed_migrations)" \ + "Dataset Config Validation" \ + "sentry (0)" \ + "sentry (1)" \ + "self-hosted-end-to-end" + - script: | + ##!/bin/bash + + /devinfra/scripts/checks/googlecloud/checkcloudbuild.py \ + ${GO_REVISION_SNUBA_REPO} \ + sentryio \ + "us.gcr.io/sentryio/snuba" + - script: | + ##!/bin/bash + + deploy_sha=`snuba/scripts/fetch_service_refs.py --pipeline "deploy-snuba"` + snuba/scripts/check-migrations.py --to $deploy_sha --workdir snuba + timeout: 1800 + - deploy-canary: + fetch_materials: true + jobs: + create-sentry-release: + elastic_profile_id: snuba + environment_variables: + SENTRY_AUTH_TOKEN: 
'{{SECRET:[devinfra-sentryio][token]}}' + SENTRY_ORG: sentry + SENTRY_PROJECT: snuba + tasks: + - script: | + ##!/bin/bash + + sentry-cli releases new "${GO_REVISION_SNUBA_REPO}" + sentry-cli releases set-commits "${GO_REVISION_SNUBA_REPO}" --commit "getsentry/snuba@${GO_REVISION_SNUBA_REPO}" + sentry-cli releases deploys "${GO_REVISION_SNUBA_REPO}" new -e canary + timeout: 300 + deploy-canary: + elastic_profile_id: snuba + environment_variables: + LABEL_SELECTOR: service=snuba,is_canary=true + tasks: + - script: | + ##!/bin/bash + + eval $(/devinfra/scripts/regions/project_env_vars.py --region="${SENTRY_REGION}") + + /devinfra/scripts/k8s/k8stunnel \ + && /devinfra/scripts/k8s/k8s-deploy.py \ + --context="gke_${GCP_PROJECT}_${GKE_REGION}-${GKE_CLUSTER_ZONE}_${GKE_CLUSTER}" \ + --label-selector="${LABEL_SELECTOR}" \ + --image="us.gcr.io/sentryio/snuba:${GO_REVISION_SNUBA_REPO}" \ + --container-name="api" \ + --container-name="consumer" \ + --container-name="errors-consumer" \ + --container-name="errors-replacer" \ + --container-name="events-subscriptions-executor" \ + --container-name="events-subscriptions-scheduler" \ + --container-name="generic-metrics-counters-consumer" \ + --container-name="generic-metrics-counters-subscriptions-executor" \ + --container-name="generic-metrics-counters-subscriptions-scheduler" \ + --container-name="generic-metrics-distributions-consumer" \ + --container-name="generic-metrics-distributions-subscriptions-executor" \ + --container-name="generic-metrics-distributions-subscriptions-scheduler" \ + --container-name="generic-metrics-sets-consumer" \ + --container-name="generic-metrics-sets-subscriptions-executor" \ + --container-name="generic-metrics-sets-subscriptions-scheduler" \ + --container-name="loadbalancer-outcomes-consumer" \ + --container-name="loadtest-errors-consumer" \ + --container-name="loadtest-loadbalancer-outcomes-consumer" \ + --container-name="loadtest-outcomes-consumer" \ + --container-name="loadtest-transactions-consumer" \ + --container-name="metrics-consumer" \ + --container-name="metrics-counters-subscriptions-scheduler" \ + --container-name="metrics-sets-subscriptions-scheduler" \ + --container-name="metrics-subscriptions-executor" \ + --container-name="outcomes-billing-consumer" \ + --container-name="outcomes-consumer" \ + --container-name="profiles-consumer" \ + --container-name="profiling-functions-consumer" \ + --container-name="querylog-consumer" \ + --container-name="replacer" \ + --container-name="replays-consumer" \ + --container-name="search-issues-consumer" \ + --container-name="snuba-admin" \ + --container-name="transactions-consumer-new" \ + --container-name="transactions-subscriptions-executor" \ + --container-name="transactions-subscriptions-scheduler" \ + --container-name="rust-querylog-consumer" \ + --container-name="spans-consumer" \ + --container-name="dlq-consumer" \ + && /devinfra/scripts/k8s/k8s-deploy.py \ + --label-selector="${LABEL_SELECTOR}" \ + --image="us.gcr.io/sentryio/snuba:${GO_REVISION_SNUBA_REPO}" \ + --type="cronjob" \ + --container-name="cleanup" \ + --container-name="optimize" + timeout: 1200 + - deploy-primary: + fetch_materials: true + jobs: + create-sentry-release: + elastic_profile_id: snuba + environment_variables: + SENTRY_AUTH_TOKEN: '{{SECRET:[devinfra-sentryio][token]}}' + SENTRY_ORG: sentry + SENTRY_PROJECT: snuba + tasks: + - script: | + ##!/bin/bash + + sentry-cli releases deploys "${GO_REVISION_SNUBA_REPO}" new -e production + sentry-cli releases finalize "${GO_REVISION_SNUBA_REPO}" + 
timeout: 300 + deploy-canary: + elastic_profile_id: snuba + environment_variables: + LABEL_SELECTOR: service=snuba + tasks: + - script: | + ##!/bin/bash + + eval $(/devinfra/scripts/regions/project_env_vars.py --region="${SENTRY_REGION}") + + /devinfra/scripts/k8s/k8stunnel \ + && /devinfra/scripts/k8s/k8s-deploy.py \ + --context="gke_${GCP_PROJECT}_${GKE_REGION}-${GKE_CLUSTER_ZONE}_${GKE_CLUSTER}" \ + --label-selector="${LABEL_SELECTOR}" \ + --image="us.gcr.io/sentryio/snuba:${GO_REVISION_SNUBA_REPO}" \ + --container-name="api" \ + --container-name="consumer" \ + --container-name="errors-consumer" \ + --container-name="errors-replacer" \ + --container-name="events-subscriptions-executor" \ + --container-name="events-subscriptions-scheduler" \ + --container-name="generic-metrics-counters-consumer" \ + --container-name="generic-metrics-counters-subscriptions-executor" \ + --container-name="generic-metrics-counters-subscriptions-scheduler" \ + --container-name="generic-metrics-distributions-consumer" \ + --container-name="generic-metrics-distributions-subscriptions-executor" \ + --container-name="generic-metrics-distributions-subscriptions-scheduler" \ + --container-name="generic-metrics-sets-consumer" \ + --container-name="generic-metrics-sets-subscriptions-executor" \ + --container-name="generic-metrics-sets-subscriptions-scheduler" \ + --container-name="loadbalancer-outcomes-consumer" \ + --container-name="loadtest-errors-consumer" \ + --container-name="loadtest-loadbalancer-outcomes-consumer" \ + --container-name="loadtest-outcomes-consumer" \ + --container-name="loadtest-transactions-consumer" \ + --container-name="metrics-consumer" \ + --container-name="metrics-counters-subscriptions-scheduler" \ + --container-name="metrics-sets-subscriptions-scheduler" \ + --container-name="metrics-subscriptions-executor" \ + --container-name="outcomes-billing-consumer" \ + --container-name="outcomes-consumer" \ + --container-name="profiles-consumer" \ + --container-name="profiling-functions-consumer" \ + --container-name="querylog-consumer" \ + --container-name="replacer" \ + --container-name="replays-consumer" \ + --container-name="search-issues-consumer" \ + --container-name="snuba-admin" \ + --container-name="transactions-consumer-new" \ + --container-name="transactions-subscriptions-executor" \ + --container-name="transactions-subscriptions-scheduler" \ + --container-name="rust-querylog-consumer" \ + --container-name="spans-consumer" \ + --container-name="dlq-consumer" \ + && /devinfra/scripts/k8s/k8s-deploy.py \ + --label-selector="${LABEL_SELECTOR}" \ + --image="us.gcr.io/sentryio/snuba:${GO_REVISION_SNUBA_REPO}" \ + --type="cronjob" \ + --container-name="cleanup" \ + --container-name="optimize" + timeout: 1200 + - migrate: + fetch_materials: true + jobs: + migrate: + elastic_profile_id: snuba + tasks: + - script: | + ##!/bin/bash + + eval $(/devinfra/scripts/regions/project_env_vars.py --region="${SENTRY_REGION}") + + /devinfra/scripts/k8s/k8stunnel \ + && /devinfra/scripts/k8s/k8s-spawn-job.py \ + --context="gke_${GCP_PROJECT}_${GKE_REGION}-${GKE_CLUSTER_ZONE}_${GKE_CLUSTER}" \ + --label-selector="service=snuba-admin" \ + --container-name="snuba-admin" \ + "snuba-migrate" "us.gcr.io/sentryio/snuba:${GO_REVISION_SNUBA_REPO}" \ + -- snuba migrations migrate -r complete -r partial + - plugin: + configuration: + id: script-executor + version: 1 + options: + script: | + ##!/bin/bash + + eval $(/devinfra/scripts/regions/project_env_vars.py --region="${SENTRY_REGION}") + + 
/devinfra/scripts/k8s/k8stunnel \ + && /devinfra/scripts/k8s/k8s-spawn-job.py \ + --context="gke_${GCP_PROJECT}_${GKE_REGION}-${GKE_CLUSTER_ZONE}_${GKE_CLUSTER}" \ + --label-selector="service=snuba-admin" \ + --container-name="snuba-admin" \ + "snuba-migrate-reverse" "us.gcr.io/sentryio/snuba:${GO_REVISION_SNUBA_REPO}" \ + -- snuba migrations reverse-in-progress + run_if: failed + timeout: 1200 + - pipeline-complete: + approval: + allow_only_on_success: true + type: success + jobs: + pipeline-complete: + tasks: + - exec: + command: true diff --git a/gocd/generated-pipelines/snuba-next-us.yaml b/gocd/generated-pipelines/snuba-next-us.yaml new file mode 100644 index 0000000000..94d5d794b7 --- /dev/null +++ b/gocd/generated-pipelines/snuba-next-us.yaml @@ -0,0 +1,266 @@ +format_version: 10 +pipelines: + deploy-snuba-next-us: + display_order: 1 + environment_variables: + GITHUB_TOKEN: '{{SECRET:[devinfra-github][token]}}' + GOCD_ACCESS_TOKEN: '{{SECRET:[devinfra][gocd_access_token]}}' + SENTRY_REGION: us + group: snuba-next + lock_behavior: unlockWhenFinished + materials: + deploy-snuba-next-pipeline-complete: + pipeline: deploy-snuba-next + stage: pipeline-complete + snuba_repo: + branch: master + destination: snuba + git: git@github.com:getsentry/snuba.git + shallow_clone: false + stages: + - ready: + jobs: + ready: + tasks: + - exec: + command: true + - wait: + approval: + type: manual + jobs: + wait: + tasks: + - exec: + command: true + - checks: + jobs: + checks: + elastic_profile_id: snuba + tasks: + - script: | + ##!/bin/bash + + /devinfra/scripts/checks/githubactions/checkruns.py \ + getsentry/snuba \ + ${GO_REVISION_SNUBA_REPO} \ + "Tests and code coverage (test)" \ + "Tests and code coverage (test_distributed)" \ + "Tests and code coverage (test_distributed_migrations)" \ + "Dataset Config Validation" \ + "sentry (0)" \ + "sentry (1)" \ + "self-hosted-end-to-end" + - script: | + ##!/bin/bash + + /devinfra/scripts/checks/googlecloud/checkcloudbuild.py \ + ${GO_REVISION_SNUBA_REPO} \ + sentryio \ + "us.gcr.io/sentryio/snuba" + - script: | + ##!/bin/bash + + deploy_sha=`snuba/scripts/fetch_service_refs.py --pipeline "deploy-snuba"` + snuba/scripts/check-migrations.py --to $deploy_sha --workdir snuba + timeout: 1800 + - deploy-canary: + fetch_materials: true + jobs: + create-sentry-release: + elastic_profile_id: snuba + environment_variables: + SENTRY_AUTH_TOKEN: '{{SECRET:[devinfra-sentryio][token]}}' + SENTRY_ORG: sentry + SENTRY_PROJECT: snuba + tasks: + - script: | + ##!/bin/bash + + sentry-cli releases new "${GO_REVISION_SNUBA_REPO}" + sentry-cli releases set-commits "${GO_REVISION_SNUBA_REPO}" --commit "getsentry/snuba@${GO_REVISION_SNUBA_REPO}" + sentry-cli releases deploys "${GO_REVISION_SNUBA_REPO}" new -e canary + timeout: 300 + deploy-canary: + elastic_profile_id: snuba + environment_variables: + LABEL_SELECTOR: service=snuba,is_canary=true + tasks: + - script: | + ##!/bin/bash + + eval $(/devinfra/scripts/regions/project_env_vars.py --region="${SENTRY_REGION}") + + /devinfra/scripts/k8s/k8stunnel \ + && /devinfra/scripts/k8s/k8s-deploy.py \ + --context="gke_${GCP_PROJECT}_${GKE_REGION}-${GKE_CLUSTER_ZONE}_${GKE_CLUSTER}" \ + --label-selector="${LABEL_SELECTOR}" \ + --image="us.gcr.io/sentryio/snuba:${GO_REVISION_SNUBA_REPO}" \ + --container-name="api" \ + --container-name="consumer" \ + --container-name="errors-consumer" \ + --container-name="errors-replacer" \ + --container-name="events-subscriptions-executor" \ + --container-name="events-subscriptions-scheduler" \ + 
--container-name="generic-metrics-counters-consumer" \ + --container-name="generic-metrics-counters-subscriptions-executor" \ + --container-name="generic-metrics-counters-subscriptions-scheduler" \ + --container-name="generic-metrics-distributions-consumer" \ + --container-name="generic-metrics-distributions-subscriptions-executor" \ + --container-name="generic-metrics-distributions-subscriptions-scheduler" \ + --container-name="generic-metrics-sets-consumer" \ + --container-name="generic-metrics-sets-subscriptions-executor" \ + --container-name="generic-metrics-sets-subscriptions-scheduler" \ + --container-name="loadbalancer-outcomes-consumer" \ + --container-name="loadtest-errors-consumer" \ + --container-name="loadtest-loadbalancer-outcomes-consumer" \ + --container-name="loadtest-outcomes-consumer" \ + --container-name="loadtest-transactions-consumer" \ + --container-name="metrics-consumer" \ + --container-name="metrics-counters-subscriptions-scheduler" \ + --container-name="metrics-sets-subscriptions-scheduler" \ + --container-name="metrics-subscriptions-executor" \ + --container-name="outcomes-billing-consumer" \ + --container-name="outcomes-consumer" \ + --container-name="profiles-consumer" \ + --container-name="profiling-functions-consumer" \ + --container-name="querylog-consumer" \ + --container-name="replacer" \ + --container-name="replays-consumer" \ + --container-name="search-issues-consumer" \ + --container-name="snuba-admin" \ + --container-name="transactions-consumer-new" \ + --container-name="transactions-subscriptions-executor" \ + --container-name="transactions-subscriptions-scheduler" \ + --container-name="rust-querylog-consumer" \ + --container-name="spans-consumer" \ + --container-name="dlq-consumer" \ + && /devinfra/scripts/k8s/k8s-deploy.py \ + --label-selector="${LABEL_SELECTOR}" \ + --image="us.gcr.io/sentryio/snuba:${GO_REVISION_SNUBA_REPO}" \ + --type="cronjob" \ + --container-name="cleanup" \ + --container-name="optimize" + timeout: 1200 + - deploy-primary: + fetch_materials: true + jobs: + create-sentry-release: + elastic_profile_id: snuba + environment_variables: + SENTRY_AUTH_TOKEN: '{{SECRET:[devinfra-sentryio][token]}}' + SENTRY_ORG: sentry + SENTRY_PROJECT: snuba + tasks: + - script: | + ##!/bin/bash + + sentry-cli releases deploys "${GO_REVISION_SNUBA_REPO}" new -e production + sentry-cli releases finalize "${GO_REVISION_SNUBA_REPO}" + timeout: 300 + deploy-canary: + elastic_profile_id: snuba + environment_variables: + LABEL_SELECTOR: service=snuba + tasks: + - script: | + ##!/bin/bash + + eval $(/devinfra/scripts/regions/project_env_vars.py --region="${SENTRY_REGION}") + + /devinfra/scripts/k8s/k8stunnel \ + && /devinfra/scripts/k8s/k8s-deploy.py \ + --context="gke_${GCP_PROJECT}_${GKE_REGION}-${GKE_CLUSTER_ZONE}_${GKE_CLUSTER}" \ + --label-selector="${LABEL_SELECTOR}" \ + --image="us.gcr.io/sentryio/snuba:${GO_REVISION_SNUBA_REPO}" \ + --container-name="api" \ + --container-name="consumer" \ + --container-name="errors-consumer" \ + --container-name="errors-replacer" \ + --container-name="events-subscriptions-executor" \ + --container-name="events-subscriptions-scheduler" \ + --container-name="generic-metrics-counters-consumer" \ + --container-name="generic-metrics-counters-subscriptions-executor" \ + --container-name="generic-metrics-counters-subscriptions-scheduler" \ + --container-name="generic-metrics-distributions-consumer" \ + --container-name="generic-metrics-distributions-subscriptions-executor" \ + 
--container-name="generic-metrics-distributions-subscriptions-scheduler" \ + --container-name="generic-metrics-sets-consumer" \ + --container-name="generic-metrics-sets-subscriptions-executor" \ + --container-name="generic-metrics-sets-subscriptions-scheduler" \ + --container-name="loadbalancer-outcomes-consumer" \ + --container-name="loadtest-errors-consumer" \ + --container-name="loadtest-loadbalancer-outcomes-consumer" \ + --container-name="loadtest-outcomes-consumer" \ + --container-name="loadtest-transactions-consumer" \ + --container-name="metrics-consumer" \ + --container-name="metrics-counters-subscriptions-scheduler" \ + --container-name="metrics-sets-subscriptions-scheduler" \ + --container-name="metrics-subscriptions-executor" \ + --container-name="outcomes-billing-consumer" \ + --container-name="outcomes-consumer" \ + --container-name="profiles-consumer" \ + --container-name="profiling-functions-consumer" \ + --container-name="querylog-consumer" \ + --container-name="replacer" \ + --container-name="replays-consumer" \ + --container-name="search-issues-consumer" \ + --container-name="snuba-admin" \ + --container-name="transactions-consumer-new" \ + --container-name="transactions-subscriptions-executor" \ + --container-name="transactions-subscriptions-scheduler" \ + --container-name="rust-querylog-consumer" \ + --container-name="spans-consumer" \ + --container-name="dlq-consumer" \ + && /devinfra/scripts/k8s/k8s-deploy.py \ + --label-selector="${LABEL_SELECTOR}" \ + --image="us.gcr.io/sentryio/snuba:${GO_REVISION_SNUBA_REPO}" \ + --type="cronjob" \ + --container-name="cleanup" \ + --container-name="optimize" + timeout: 1200 + - migrate: + fetch_materials: true + jobs: + migrate: + elastic_profile_id: snuba + tasks: + - script: | + ##!/bin/bash + + eval $(/devinfra/scripts/regions/project_env_vars.py --region="${SENTRY_REGION}") + + /devinfra/scripts/k8s/k8stunnel \ + && /devinfra/scripts/k8s/k8s-spawn-job.py \ + --context="gke_${GCP_PROJECT}_${GKE_REGION}-${GKE_CLUSTER_ZONE}_${GKE_CLUSTER}" \ + --label-selector="service=snuba-admin" \ + --container-name="snuba-admin" \ + "snuba-migrate" "us.gcr.io/sentryio/snuba:${GO_REVISION_SNUBA_REPO}" \ + -- snuba migrations migrate -r complete -r partial + - plugin: + configuration: + id: script-executor + version: 1 + options: + script: | + ##!/bin/bash + + eval $(/devinfra/scripts/regions/project_env_vars.py --region="${SENTRY_REGION}") + + /devinfra/scripts/k8s/k8stunnel \ + && /devinfra/scripts/k8s/k8s-spawn-job.py \ + --context="gke_${GCP_PROJECT}_${GKE_REGION}-${GKE_CLUSTER_ZONE}_${GKE_CLUSTER}" \ + --label-selector="service=snuba-admin" \ + --container-name="snuba-admin" \ + "snuba-migrate-reverse" "us.gcr.io/sentryio/snuba:${GO_REVISION_SNUBA_REPO}" \ + -- snuba migrations reverse-in-progress + run_if: failed + timeout: 1200 + - pipeline-complete: + approval: + allow_only_on_success: true + type: success + jobs: + pipeline-complete: + tasks: + - exec: + command: true diff --git a/gocd/generated-pipelines/snuba-next.yaml b/gocd/generated-pipelines/snuba-next.yaml new file mode 100644 index 0000000000..10dc658510 --- /dev/null +++ b/gocd/generated-pipelines/snuba-next.yaml @@ -0,0 +1,21 @@ +format_version: 10 +pipelines: + deploy-snuba-next: + display_order: 0 + group: snuba-next + lock_behavior: unlockWhenFinished + materials: + snuba_repo: + branch: master + destination: snuba + git: git@github.com:getsentry/snuba.git + shallow_clone: true + stages: + - pipeline-complete: + approval: + type: manual + jobs: + pipeline-complete: + 
tasks: + - exec: + command: true diff --git a/gocd/pipelines/snuba.yaml b/gocd/pipelines/snuba.yaml index 569322fab4..a6da867d6c 100644 --- a/gocd/pipelines/snuba.yaml +++ b/gocd/pipelines/snuba.yaml @@ -112,6 +112,7 @@ pipelines: --container-name="transactions-subscriptions-executor" \ --container-name="transactions-subscriptions-scheduler" \ --container-name="spans-consumer" \ + --container-name="dlq-consumer" \ && /devinfra/scripts/k8s/k8s-deploy.py \ --label-selector="service=snuba,is_canary=true" \ --image="us.gcr.io/sentryio/snuba:${GO_REVISION_SNUBA_REPO}" \ @@ -180,6 +181,7 @@ pipelines: --container-name="transactions-subscriptions-scheduler" \ --container-name="rust-querylog-consumer" \ --container-name="spans-consumer" \ + --container-name="dlq-consumer" \ && /devinfra/scripts/k8s/k8s-deploy.py \ --label-selector="service=snuba" \ --image="us.gcr.io/sentryio/snuba:${GO_REVISION_SNUBA_REPO}" \ diff --git a/gocd/templates/bash/check-cloud-build.sh b/gocd/templates/bash/check-cloud-build.sh new file mode 100644 index 0000000000..b8d9ec1486 --- /dev/null +++ b/gocd/templates/bash/check-cloud-build.sh @@ -0,0 +1,6 @@ +#!/bin/bash + +/devinfra/scripts/checks/googlecloud/checkcloudbuild.py \ + ${GO_REVISION_SNUBA_REPO} \ + sentryio \ + "us.gcr.io/sentryio/snuba" diff --git a/gocd/templates/bash/check-github.sh b/gocd/templates/bash/check-github.sh new file mode 100644 index 0000000000..b6d6617be1 --- /dev/null +++ b/gocd/templates/bash/check-github.sh @@ -0,0 +1,12 @@ +#!/bin/bash + +/devinfra/scripts/checks/githubactions/checkruns.py \ + getsentry/snuba \ + ${GO_REVISION_SNUBA_REPO} \ + "Tests and code coverage (test)" \ + "Tests and code coverage (test_distributed)" \ + "Tests and code coverage (test_distributed_migrations)" \ + "Dataset Config Validation" \ + "sentry (0)" \ + "sentry (1)" \ + "self-hosted-end-to-end" diff --git a/gocd/templates/bash/check-migrations.sh b/gocd/templates/bash/check-migrations.sh new file mode 100644 index 0000000000..a6bf1bd155 --- /dev/null +++ b/gocd/templates/bash/check-migrations.sh @@ -0,0 +1,4 @@ +#!/bin/bash + +deploy_sha=`snuba/scripts/fetch_service_refs.py --pipeline "deploy-snuba"` +snuba/scripts/check-migrations.py --to $deploy_sha --workdir snuba diff --git a/gocd/templates/bash/deploy.sh b/gocd/templates/bash/deploy.sh new file mode 100644 index 0000000000..27855fde49 --- /dev/null +++ b/gocd/templates/bash/deploy.sh @@ -0,0 +1,54 @@ +#!/bin/bash + +eval $(/devinfra/scripts/regions/project_env_vars.py --region="${SENTRY_REGION}") + +/devinfra/scripts/k8s/k8stunnel \ +&& /devinfra/scripts/k8s/k8s-deploy.py \ + --context="gke_${GCP_PROJECT}_${GKE_REGION}-${GKE_CLUSTER_ZONE}_${GKE_CLUSTER}" \ + --label-selector="${LABEL_SELECTOR}" \ + --image="us.gcr.io/sentryio/snuba:${GO_REVISION_SNUBA_REPO}" \ + --container-name="api" \ + --container-name="consumer" \ + --container-name="errors-consumer" \ + --container-name="errors-replacer" \ + --container-name="events-subscriptions-executor" \ + --container-name="events-subscriptions-scheduler" \ + --container-name="generic-metrics-counters-consumer" \ + --container-name="generic-metrics-counters-subscriptions-executor" \ + --container-name="generic-metrics-counters-subscriptions-scheduler" \ + --container-name="generic-metrics-distributions-consumer" \ + --container-name="generic-metrics-distributions-subscriptions-executor" \ + --container-name="generic-metrics-distributions-subscriptions-scheduler" \ + --container-name="generic-metrics-sets-consumer" \ + 
--container-name="generic-metrics-sets-subscriptions-executor" \ + --container-name="generic-metrics-sets-subscriptions-scheduler" \ + --container-name="loadbalancer-outcomes-consumer" \ + --container-name="loadtest-errors-consumer" \ + --container-name="loadtest-loadbalancer-outcomes-consumer" \ + --container-name="loadtest-outcomes-consumer" \ + --container-name="loadtest-transactions-consumer" \ + --container-name="metrics-consumer" \ + --container-name="metrics-counters-subscriptions-scheduler" \ + --container-name="metrics-sets-subscriptions-scheduler" \ + --container-name="metrics-subscriptions-executor" \ + --container-name="outcomes-billing-consumer" \ + --container-name="outcomes-consumer" \ + --container-name="profiles-consumer" \ + --container-name="profiling-functions-consumer" \ + --container-name="querylog-consumer" \ + --container-name="replacer" \ + --container-name="replays-consumer" \ + --container-name="search-issues-consumer" \ + --container-name="snuba-admin" \ + --container-name="transactions-consumer-new" \ + --container-name="transactions-subscriptions-executor" \ + --container-name="transactions-subscriptions-scheduler" \ + --container-name="rust-querylog-consumer" \ + --container-name="spans-consumer" \ + --container-name="dlq-consumer" \ +&& /devinfra/scripts/k8s/k8s-deploy.py \ + --label-selector="${LABEL_SELECTOR}" \ + --image="us.gcr.io/sentryio/snuba:${GO_REVISION_SNUBA_REPO}" \ + --type="cronjob" \ + --container-name="cleanup" \ + --container-name="optimize" diff --git a/gocd/templates/bash/migrate-reverse.sh b/gocd/templates/bash/migrate-reverse.sh new file mode 100644 index 0000000000..a3832e1475 --- /dev/null +++ b/gocd/templates/bash/migrate-reverse.sh @@ -0,0 +1,11 @@ +#!/bin/bash + +eval $(/devinfra/scripts/regions/project_env_vars.py --region="${SENTRY_REGION}") + +/devinfra/scripts/k8s/k8stunnel \ +&& /devinfra/scripts/k8s/k8s-spawn-job.py \ + --context="gke_${GCP_PROJECT}_${GKE_REGION}-${GKE_CLUSTER_ZONE}_${GKE_CLUSTER}" \ + --label-selector="service=snuba-admin" \ + --container-name="snuba-admin" \ + "snuba-migrate-reverse" "us.gcr.io/sentryio/snuba:${GO_REVISION_SNUBA_REPO}" \ + -- snuba migrations reverse-in-progress diff --git a/gocd/templates/bash/migrate.sh b/gocd/templates/bash/migrate.sh new file mode 100644 index 0000000000..a9e58b3eca --- /dev/null +++ b/gocd/templates/bash/migrate.sh @@ -0,0 +1,11 @@ +#!/bin/bash + +eval $(/devinfra/scripts/regions/project_env_vars.py --region="${SENTRY_REGION}") + +/devinfra/scripts/k8s/k8stunnel \ +&& /devinfra/scripts/k8s/k8s-spawn-job.py \ + --context="gke_${GCP_PROJECT}_${GKE_REGION}-${GKE_CLUSTER_ZONE}_${GKE_CLUSTER}" \ + --label-selector="service=snuba-admin" \ + --container-name="snuba-admin" \ + "snuba-migrate" "us.gcr.io/sentryio/snuba:${GO_REVISION_SNUBA_REPO}" \ + -- snuba migrations migrate -r complete -r partial diff --git a/gocd/templates/bash/sentry-release-canary.sh b/gocd/templates/bash/sentry-release-canary.sh new file mode 100644 index 0000000000..ed09f95036 --- /dev/null +++ b/gocd/templates/bash/sentry-release-canary.sh @@ -0,0 +1,5 @@ +#!/bin/bash + +sentry-cli releases new "${GO_REVISION_SNUBA_REPO}" +sentry-cli releases set-commits "${GO_REVISION_SNUBA_REPO}" --commit "getsentry/snuba@${GO_REVISION_SNUBA_REPO}" +sentry-cli releases deploys "${GO_REVISION_SNUBA_REPO}" new -e canary diff --git a/gocd/templates/bash/sentry-release-primary.sh b/gocd/templates/bash/sentry-release-primary.sh new file mode 100644 index 0000000000..810d8c2179 --- /dev/null +++ 
b/gocd/templates/bash/sentry-release-primary.sh @@ -0,0 +1,4 @@ +#!/bin/bash + +sentry-cli releases deploys "${GO_REVISION_SNUBA_REPO}" new -e production +sentry-cli releases finalize "${GO_REVISION_SNUBA_REPO}" diff --git a/gocd/templates/jsonnetfile.json b/gocd/templates/jsonnetfile.json new file mode 100644 index 0000000000..c05e780858 --- /dev/null +++ b/gocd/templates/jsonnetfile.json @@ -0,0 +1,15 @@ +{ + "version": 1, + "dependencies": [ + { + "source": { + "git": { + "remote": "https://github.com/getsentry/gocd-jsonnet.git", + "subdir": "v1.0.0" + } + }, + "version": "main" + } + ], + "legacyImports": true +} diff --git a/gocd/templates/jsonnetfile.lock.json b/gocd/templates/jsonnetfile.lock.json new file mode 100644 index 0000000000..b5361cb8d5 --- /dev/null +++ b/gocd/templates/jsonnetfile.lock.json @@ -0,0 +1,16 @@ +{ + "version": 1, + "dependencies": [ + { + "source": { + "git": { + "remote": "https://github.com/getsentry/gocd-jsonnet.git", + "subdir": "v1.0.0" + } + }, + "version": "7636a8579792e6e1e66b0b567583e5e05764c39f", + "sum": "eOpSoGZ9y3+6O3qllOXVBdiHfQ2xwcnAJWqlE4k3Rj8=" + } + ], + "legacyImports": false +} diff --git a/gocd/templates/pipelines/snuba.libsonnet b/gocd/templates/pipelines/snuba.libsonnet new file mode 100644 index 0000000000..8f67bea696 --- /dev/null +++ b/gocd/templates/pipelines/snuba.libsonnet @@ -0,0 +1,124 @@ +local gocdtasks = import 'github.com/getsentry/gocd-jsonnet/v1.0.0/gocd-tasks.libsonnet'; + +// The return value of this function is the body of a GoCD pipeline. +// More information on gocd-flavor YAML this is producing can be found here: +// - https://github.com/tomzo/gocd-yaml-config-plugin#pipeline +// - https://www.notion.so/sentry/GoCD-New-Service-Quickstart-6d8db7a6964049b3b0e78b8a4b52e25d + +function(region) { + environment_variables: { + SENTRY_REGION: region, + // Required for checkruns. 
+ GITHUB_TOKEN: '{{SECRET:[devinfra-github][token]}}', + GOCD_ACCESS_TOKEN: '{{SECRET:[devinfra][gocd_access_token]}}', + }, + group: 'snuba-next', + lock_behavior: 'unlockWhenFinished', + materials: { + snuba_repo: { + git: 'git@github.com:getsentry/snuba.git', + shallow_clone: false, + branch: 'master', + destination: 'snuba', + }, + }, + stages: [ + { + checks: { + jobs: { + checks: { + timeout: 1800, + elastic_profile_id: 'snuba', + tasks: [ + gocdtasks.script(importstr '../bash/check-github.sh'), + gocdtasks.script(importstr '../bash/check-cloud-build.sh'), + gocdtasks.script(importstr '../bash/check-migrations.sh'), + ], + }, + }, + }, + }, + { + 'deploy-canary': { + fetch_materials: true, + jobs: { + 'create-sentry-release': { + environment_variables: { + SENTRY_ORG: 'sentry', + SENTRY_PROJECT: 'snuba', + SENTRY_AUTH_TOKEN: '{{SECRET:[devinfra-sentryio][token]}}', + }, + timeout: 300, + elastic_profile_id: 'snuba', + tasks: [ + gocdtasks.script(importstr '../bash/sentry-release-canary.sh'), + ], + }, + 'deploy-canary': { + timeout: 1200, + elastic_profile_id: 'snuba', + environment_variables: { + LABEL_SELECTOR: 'service=snuba,is_canary=true', + }, + tasks: [ + gocdtasks.script(importstr '../bash/deploy.sh'), + ], + }, + }, + }, + }, + { + 'deploy-primary': { + fetch_materials: true, + jobs: { + 'create-sentry-release': { + environment_variables: { + SENTRY_ORG: 'sentry', + SENTRY_PROJECT: 'snuba', + SENTRY_AUTH_TOKEN: '{{SECRET:[devinfra-sentryio][token]}}', + }, + timeout: 300, + elastic_profile_id: 'snuba', + tasks: [ + gocdtasks.script(importstr '../bash/sentry-release-primary.sh'), + ], + }, + 'deploy-canary': { + timeout: 1200, + elastic_profile_id: 'snuba', + environment_variables: { + LABEL_SELECTOR: 'service=snuba', + }, + tasks: [ + gocdtasks.script(importstr '../bash/deploy.sh'), + ], + }, + }, + }, + }, + { + migrate: { + fetch_materials: true, + jobs: { + migrate: { + timeout: 1200, + elastic_profile_id: 'snuba', + tasks: [ + gocdtasks.script(importstr '../bash/migrate.sh'), + { + plugin: { + options: gocdtasks.script(importstr '../bash/migrate-reverse.sh'), + run_if: 'failed', + configuration: { + id: 'script-executor', + version: 1, + }, + }, + }, + ], + }, + }, + }, + }, + ], +} diff --git a/gocd/templates/snuba.jsonnet b/gocd/templates/snuba.jsonnet new file mode 100644 index 0000000000..ad1e1c2216 --- /dev/null +++ b/gocd/templates/snuba.jsonnet @@ -0,0 +1,21 @@ +local snuba = import './pipelines/snuba.libsonnet'; +local pipedream = import 'github.com/getsentry/gocd-jsonnet/v1.0.0/pipedream.libsonnet'; + +local pipedream_config = { + name: 'snuba-next', + materials: { + snuba_repo: { + git: 'git@github.com:getsentry/snuba.git', + shallow_clone: true, + branch: 'master', + destination: 'snuba', + }, + }, + + // Set to true to auto-deploy changes (defaults to true) + auto_deploy: false, + // Set to true if you want each pipeline to require manual approval + auto_pipeline_progression: false, +}; + +pipedream.render(pipedream_config, snuba) diff --git a/requirements.txt b/requirements.txt index 2de9a9ba9e..eb8b3759f1 100644 --- a/requirements.txt +++ b/requirements.txt @@ -23,11 +23,11 @@ python-dateutil==2.8.2 python-rapidjson==1.8 pytz==2022.2.1 redis==4.3.4 -sentry-arroyo==2.13.0 -sentry-kafka-schemas==0.1.12 +sentry-arroyo==2.14.0 +sentry-kafka-schemas==0.1.16 sentry-redis-tools==0.1.6 -sentry-relay==0.8.21 -sentry-sdk==1.18.0 +sentry-relay==0.8.27 +sentry-sdk==1.26.0 simplejson==3.17.6 structlog==22.3.0 structlog-sentry==2.0.0 diff --git a/rust_snuba/Cargo.lock 
b/rust_snuba/Cargo.lock index b0f58aaeb6..3752097c58 100644 --- a/rust_snuba/Cargo.lock +++ b/rust_snuba/Cargo.lock @@ -978,9 +978,9 @@ checksum = "b7e5500299e16ebb147ae15a00a942af264cf3688f47923b8fc2cd5858f23ad3" [[package]] name = "openssl" -version = "0.10.48" +version = "0.10.55" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "518915b97df115dd36109bfa429a48b8f737bd05508cf9588977b599648926d2" +checksum = "345df152bc43501c5eb9e4654ff05f794effb78d4efe3d53abc158baddc0703d" dependencies = [ "bitflags", "cfg-if 1.0.0", @@ -1010,11 +1010,10 @@ checksum = "ff011a302c396a5197692431fc1948019154afc178baf7d8e37367442a4601cf" [[package]] name = "openssl-sys" -version = "0.9.83" +version = "0.9.90" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "666416d899cf077260dac8698d60a60b435a46d57e82acb1be3d0dad87284e5b" +checksum = "374533b0e45f3a7ced10fcaeccca020e66656bc03dac384f852e4e5a7a8104a6" dependencies = [ - "autocfg", "cc", "libc", "pkg-config", diff --git a/snuba/admin/auth.py b/snuba/admin/auth.py index 92092888c1..5bb5bb3df6 100644 --- a/snuba/admin/auth.py +++ b/snuba/admin/auth.py @@ -3,6 +3,7 @@ import json from typing import Sequence +import rapidjson import structlog from flask import request @@ -11,9 +12,12 @@ from snuba.admin.google import CloudIdentityAPI from snuba.admin.jwt import validate_assertion from snuba.admin.user import AdminUser +from snuba.redis import RedisClientKey, get_redis_client USER_HEADER_KEY = "X-Goog-Authenticated-User-Email" +redis_client = get_redis_client(RedisClientKey.ADMIN_AUTH) + logger = structlog.get_logger().bind(module=__name__) @@ -41,7 +45,7 @@ def _is_member_of_group(user: AdminUser, group: str) -> bool: return google_api.check_group_membership(group_email=group, member=user.email) -def get_iam_roles_from_file(user: AdminUser) -> Sequence[str]: +def get_iam_roles_from_user(user: AdminUser) -> Sequence[str]: iam_roles = [] try: with open(settings.ADMIN_IAM_POLICY_FILE, "r") as policy_file: @@ -65,10 +69,38 @@ def get_iam_roles_from_file(user: AdminUser) -> Sequence[str]: return iam_roles +def get_cached_iam_roles(user: AdminUser) -> Sequence[str]: + iam_roles_str = redis_client.get(f"roles-{user.email}") + if not iam_roles_str: + return [] + + iam_roles = rapidjson.loads(iam_roles_str) + if isinstance(iam_roles, list): + return iam_roles + + return [] + + def _set_roles(user: AdminUser) -> AdminUser: # todo: depending on provider convert user email # to subset of DEFAULT_ROLES based on IAM roles - iam_roles = get_iam_roles_from_file(user) + iam_roles: Sequence[str] = [] + try: + iam_roles = get_cached_iam_roles(user) + except Exception as e: + logger.exception("Failed to load roles from cache", exception=e) + + if not iam_roles: + iam_roles = get_iam_roles_from_user(user) + try: + redis_client.set( + f"roles-{user.email}", + rapidjson.dumps(iam_roles), + ex=settings.ADMIN_ROLES_REDIS_TTL, + ) + except Exception as e: + logger.exception(e) + user.roles = [*[ROLES[role] for role in iam_roles if role in ROLES], *DEFAULT_ROLES] return user diff --git a/snuba/admin/iam_policy/iam_policy.json b/snuba/admin/iam_policy/iam_policy.json index d85af4eb0e..9db99766d0 100644 --- a/snuba/admin/iam_policy/iam_policy.json +++ b/snuba/admin/iam_policy/iam_policy.json @@ -32,7 +32,8 @@ }, { "members": [ - "group:team-starfish@sentry.io" + "group:team-starfish@sentry.io", + "user:iker.barriocanal@sentry.io" ], "role": "roles/CardinalityAnalyzer" } diff --git a/snuba/admin/package.json b/snuba/admin/package.json 
index fa0a533f9e..10838e3540 100644 --- a/snuba/admin/package.json +++ b/snuba/admin/package.json @@ -7,12 +7,12 @@ "test": "jest" }, "dependencies": { - "@sentry/react": "^7.53.1", "@tiptap/extension-code-block-lowlight": "^2.0.3", "@tiptap/extension-placeholder": "^2.0.3", "@tiptap/pm": "^2.0.3", + "@sentry/react": "^7.56.0", "@types/react": "^18.0.20", - "@types/react-dom": "^18.0.6", + "@types/react-dom": "^18.2.6", "jest-dom": "^4.0.0", "lowlight": "^2.9.0", "react": "^18.2.0", @@ -44,7 +44,7 @@ "resize-observer-polyfill": "^1.5.1", "ts-jest": "^29.0.5", "use-resize-observer": "^9.1.0", - "webpack": "^5.74.0", + "webpack": "^5.88.0", "webpack-cli": "^4.10.0" }, "volta": { diff --git a/snuba/admin/static/api_client.tsx b/snuba/admin/static/api_client.tsx index bbf05c24d4..eafe5c226b 100644 --- a/snuba/admin/static/api_client.tsx +++ b/snuba/admin/static/api_client.tsx @@ -25,7 +25,10 @@ import { SnQLRequest, SnQLResult, SnubaDatasetName } from "./snql_to_sql/types"; import { KafkaTopicData } from "./kafka/types"; import { QuerylogRequest, QuerylogResult } from "./querylog/types"; -import { CardinalityQueryRequest, CardinalityQueryResult } from "./cardinality_analyzer/types"; +import { + CardinalityQueryRequest, + CardinalityQueryResult, +} from "./cardinality_analyzer/types"; import { AllocationPolicy } from "./capacity_management/types"; @@ -57,7 +60,9 @@ interface Client { getPredefinedQuerylogOptions: () => Promise<[PredefinedQuery]>; getQuerylogSchema: () => Promise; executeQuerylogQuery: (req: QuerylogRequest) => Promise; - executeCardinalityQuery: (req: CardinalityQueryRequest) => Promise; + executeCardinalityQuery: ( + req: CardinalityQueryRequest + ) => Promise; getAllMigrationGroups: () => Promise; runMigration: (req: RunMigrationRequest) => Promise; getAllowedTools: () => Promise; @@ -183,7 +188,7 @@ function Client() { }, convertSnQLQuery: (query: SnQLRequest) => { - const url = baseUrl + "snql_to_sql"; + const url = baseUrl + "snuba_debug"; return fetch(url, { headers: { "Content-Type": "application/json" }, method: "POST", diff --git a/snuba/admin/static/index.tsx b/snuba/admin/static/index.tsx index 5e1172c5b8..c2580ccea0 100644 --- a/snuba/admin/static/index.tsx +++ b/snuba/admin/static/index.tsx @@ -32,6 +32,7 @@ client.getSettings().then((settings) => { replaysSessionSampleRate: settings.replaysSessionSampleRate, replaysOnErrorSampleRate: settings.replaysOnErrorSampleRate, }); + Sentry.setUser({ email: settings.userEmail }); } }); diff --git a/snuba/admin/static/types.tsx b/snuba/admin/static/types.tsx index e1e2884c22..b90a01891a 100644 --- a/snuba/admin/static/types.tsx +++ b/snuba/admin/static/types.tsx @@ -7,6 +7,7 @@ type Settings = { tracesSampleRate: number; replaysSessionSampleRate: number; replaysOnErrorSampleRate: number; + userEmail: string; }; export { AllowedTools, Settings }; diff --git a/snuba/admin/tool_policies.py b/snuba/admin/tool_policies.py index 319b581dbe..eeda63e2d7 100644 --- a/snuba/admin/tool_policies.py +++ b/snuba/admin/tool_policies.py @@ -28,6 +28,7 @@ class AdminTools(Enum): KAFKA = "kafka" CAPACITY_MANAGEMENT = "capacity-management" CARDINALITY_ANALYZER = "cardinality-analyzer" + SNUBA_EXPLAIN = "snuba_explain" DEVELOPER_TOOLS: set[AdminTools] = {AdminTools.SNQL_TO_SQL, AdminTools.QUERY_TRACING} diff --git a/snuba/admin/views.py b/snuba/admin/views.py index 924a33dd2c..76c32962df 100644 --- a/snuba/admin/views.py +++ b/snuba/admin/views.py @@ -6,6 +6,7 @@ from dataclasses import asdict from typing import Any, List, Mapping, Optional, 
Sequence, Tuple, cast +import sentry_sdk import simplejson as json import structlog from flask import Flask, Response, g, jsonify, make_response, request @@ -64,6 +65,7 @@ from snuba.migrations.groups import MigrationGroup from snuba.migrations.runner import MigrationKey, Runner from snuba.query.exceptions import InvalidQueryException +from snuba.state.explain_meta import explain_cleanup, get_explain_meta from snuba.utils.metrics.timer import Timer from snuba.web.views import dataset_query @@ -96,7 +98,9 @@ def authorize() -> None: if request.endpoint != "health": user = authorize_request() logger.info("authorize.finished", user=user) - g.user = user + with sentry_sdk.push_scope() as scope: + scope.user = {"email": user.email} + g.user = user @application.route("/") @@ -123,6 +127,7 @@ def settings_endpoint() -> Response: "tracesSampleRate": settings.ADMIN_TRACE_SAMPLE_RATE, "replaysSessionSampleRate": settings.ADMIN_REPLAYS_SAMPLE_RATE, "replaysOnErrorSampleRate": settings.ADMIN_REPLAYS_SAMPLE_RATE_ON_ERROR, + "userEmail": g.user.email, } ), 200, @@ -721,15 +726,23 @@ def snuba_datasets() -> Response: ) -@application.route("/snql_to_sql", methods=["POST"]) -@check_tool_perms(tools=[AdminTools.SNQL_TO_SQL]) -def snql_to_sql() -> Response: +@application.route("/snuba_debug", methods=["POST"]) +@check_tool_perms(tools=[AdminTools.SNQL_TO_SQL, AdminTools.SNUBA_EXPLAIN]) +def snuba_debug() -> Response: body = json.loads(request.data) body["debug"] = True body["dry_run"] = True try: dataset = get_dataset(body.pop("dataset")) - return dataset_query(dataset, body, Timer("admin")) + response = dataset_query(dataset, body, Timer("admin")) + data = response.get_json() + assert isinstance(data, dict) + + meta = get_explain_meta() + if meta: + data["explain"] = asdict(meta) + response.data = json.dumps(data) + return response except InvalidQueryException as exception: return Response( json.dumps({"error": {"message": str(exception)}}, indent=4), @@ -742,6 +755,8 @@ def snql_to_sql() -> Response: 400, {"Content-Type": "application/json"}, ) + finally: + explain_cleanup() @application.route("/storages_with_allocation_policies") diff --git a/snuba/admin/yarn.lock b/snuba/admin/yarn.lock index 355b6d57c2..dd97aebb33 100644 --- a/snuba/admin/yarn.lock +++ b/snuba/admin/yarn.lock @@ -963,68 +963,68 @@ uncontrollable "^7.2.1" warning "^4.0.3" -"@sentry-internal/tracing@7.53.1": - version "7.53.1" - resolved "https://registry.npmjs.org/@sentry-internal/tracing/-/tracing-7.53.1.tgz" - integrity sha512-a4H4rvVdz0XDGgNfRqc7zg6rMt2P1P05xBmgfIfztYy94Vciw1QMdboNiT7einr8ra8wogdEaK4Pe2AzYAPBJQ== - dependencies: - "@sentry/core" "7.53.1" - "@sentry/types" "7.53.1" - "@sentry/utils" "7.53.1" +"@sentry-internal/tracing@7.56.0": + version "7.56.0" + resolved "https://registry.yarnpkg.com/@sentry-internal/tracing/-/tracing-7.56.0.tgz#ba709258f2f0f3d8a36f9740403088b39212b843" + integrity sha512-OKI4Pz/O13gng8hT9rNc+gRV3+P7nnk1HnHlV8fgaQydS6DsRxoDL1sHa42tZGbh7K9jqNAP3TC6VjBOsr2tXA== + dependencies: + "@sentry/core" "7.56.0" + "@sentry/types" "7.56.0" + "@sentry/utils" "7.56.0" tslib "^1.9.3" -"@sentry/browser@7.53.1": - version "7.53.1" - resolved "https://registry.npmjs.org/@sentry/browser/-/browser-7.53.1.tgz" - integrity sha512-1zas2R6riJaj0k7FoeieCW0SuC7UyKaBGA6jEG2LsgIqyD7IDOlF3BPZ4Yt08GFav0ImpyhGn5Vbrq5JLbeQdw== +"@sentry/browser@7.56.0": + version "7.56.0" + resolved "https://registry.yarnpkg.com/@sentry/browser/-/browser-7.56.0.tgz#6bf3ff21bc2e9b66a72ea0c7dcc3572fdeb3bd8f" + integrity 
sha512-qpyyw+NM/psbNAYMlTCCdWwxHHcogppEQ+Q40jGE4sKP2VRIjjyteJkUcaEMoGpbJXx9puzTWbpzqlQ8r15Now== dependencies: - "@sentry-internal/tracing" "7.53.1" - "@sentry/core" "7.53.1" - "@sentry/replay" "7.53.1" - "@sentry/types" "7.53.1" - "@sentry/utils" "7.53.1" + "@sentry-internal/tracing" "7.56.0" + "@sentry/core" "7.56.0" + "@sentry/replay" "7.56.0" + "@sentry/types" "7.56.0" + "@sentry/utils" "7.56.0" tslib "^1.9.3" -"@sentry/core@7.53.1": - version "7.53.1" - resolved "https://registry.npmjs.org/@sentry/core/-/core-7.53.1.tgz" - integrity sha512-DAH8IJNORJJ7kQLqsZuhMkN6cwJjXzFuuUoZor7IIDHIHjtl51W+2F3Stg3+I3ZoKDfJfUNKqhipk2WZjG0FBg== +"@sentry/core@7.56.0": + version "7.56.0" + resolved "https://registry.yarnpkg.com/@sentry/core/-/core-7.56.0.tgz#f4253e0d61f55444180a63e5278b62e57303f7cf" + integrity sha512-Nuyyfh09Yz27kPo74fXHlrdmZeK6zrlJVtxQ6LkwuoaTBcNcesNXVaOtr6gjvUGUmsfriVPP3Jero5LXufV7GQ== dependencies: - "@sentry/types" "7.53.1" - "@sentry/utils" "7.53.1" + "@sentry/types" "7.56.0" + "@sentry/utils" "7.56.0" tslib "^1.9.3" -"@sentry/react@^7.53.1": - version "7.53.1" - resolved "https://registry.npmjs.org/@sentry/react/-/react-7.53.1.tgz" - integrity sha512-eEOY/peBepSD/nhPn4SU77aYdjQfAI1svOqpG4sbpjaGZU1P6L7+IIGmip8l2T68oPEeKDaiH9Qy/3uxu55B/Q== +"@sentry/react@^7.56.0": + version "7.56.0" + resolved "https://registry.yarnpkg.com/@sentry/react/-/react-7.56.0.tgz#7e2e9363a76c7d67854bdb179142a2f7f910d476" + integrity sha512-dRnkZwspef5aEHV/eiLS/mhomFCXItylU8s78fzAn5QMTDGHmu5Pnhl5dxh/zbPUcdXqFA6GwJVucV4gzsNEJw== dependencies: - "@sentry/browser" "7.53.1" - "@sentry/types" "7.53.1" - "@sentry/utils" "7.53.1" + "@sentry/browser" "7.56.0" + "@sentry/types" "7.56.0" + "@sentry/utils" "7.56.0" hoist-non-react-statics "^3.3.2" tslib "^1.9.3" -"@sentry/replay@7.53.1": - version "7.53.1" - resolved "https://registry.npmjs.org/@sentry/replay/-/replay-7.53.1.tgz" - integrity sha512-5He5JLJiYLeWtXHC53z2ZzfbgAedafbHNZVS4+MBCOtydCk7cnuyJ0gGV6Rfxej/lZSNXZxOdW7HeMhzBtZCxw== +"@sentry/replay@7.56.0": + version "7.56.0" + resolved "https://registry.yarnpkg.com/@sentry/replay/-/replay-7.56.0.tgz#8a49dcb45e9ea83bf905cec0d9b42fed4b8085bd" + integrity sha512-bvjiJK1+SM/paLapuL+nEJ8CIF1bJqi0nnFyxUIi2L5L6yb2uMwfyT3IQ+kz0cXJsLdb3HN4WMphVjyiU9pFdg== dependencies: - "@sentry/core" "7.53.1" - "@sentry/types" "7.53.1" - "@sentry/utils" "7.53.1" + "@sentry/core" "7.56.0" + "@sentry/types" "7.56.0" + "@sentry/utils" "7.56.0" -"@sentry/types@7.53.1": - version "7.53.1" - resolved "https://registry.npmjs.org/@sentry/types/-/types-7.53.1.tgz" - integrity sha512-/ijchRIu+jz3+j/zY+7KRPfLSCY14fTx5xujjbOdmEKjmIHQmwPBdszcQm40uwofrR8taV4hbt5MFN+WnjCkCw== +"@sentry/types@7.56.0": + version "7.56.0" + resolved "https://registry.yarnpkg.com/@sentry/types/-/types-7.56.0.tgz#9042a099cf9e8816d081886d24b88082a5d9f87a" + integrity sha512-5WjhVOQm75ItOytOx2jTx+5yw8/qJ316+g1Di8dS9+kgIi1zniqdMcX00C2yYe3FMUgFB49PegCUYulm9Evapw== -"@sentry/utils@7.53.1": - version "7.53.1" - resolved "https://registry.npmjs.org/@sentry/utils/-/utils-7.53.1.tgz" - integrity sha512-DKJA1LSUOEv4KOR828MzVuLh+drjeAgzyKgN063OEKmnirgjgRgNNS8wUgwpG0Tn2k6ANZGCwrdfzPeSBxshKg== +"@sentry/utils@7.56.0": + version "7.56.0" + resolved "https://registry.yarnpkg.com/@sentry/utils/-/utils-7.56.0.tgz#e60e4935d17b2197584abf6ce61b522ad055352c" + integrity sha512-wgeX7bufxc//TjjSIE+gCMm8hVId7Jzvc+f441bYrWnNZBuzPIDW2BummCcPrKzSYe5GeYZDTZGV8YZGMLGBjw== dependencies: - "@sentry/types" "7.53.1" + "@sentry/types" "7.56.0" tslib "^1.9.3" "@sinclair/typebox@^0.25.16": @@ -1445,17 
+1445,10 @@ resolved "https://registry.npmjs.org/@types/prop-types/-/prop-types-15.7.4.tgz" integrity sha512-rZ5drC/jWjrArrS8BR6SIr4cWpW09RNTYt9AMZo3Jwwif+iacXAqgVjm0B0Bv/S1jhDXKHqRVNCbACkJ89RAnQ== -"@types/react-dom@^18.0.0": - version "18.0.11" - resolved "https://registry.npmjs.org/@types/react-dom/-/react-dom-18.0.11.tgz" - integrity sha512-O38bPbI2CWtgw/OoQoY+BRelw7uysmXbWvw3nLWO21H1HSh+GOlqPuXshJfjmpNlKiiSDG9cc1JZAaMmVdcTlw== - dependencies: - "@types/react" "*" - -"@types/react-dom@^18.0.6": - version "18.0.6" - resolved "https://registry.npmjs.org/@types/react-dom/-/react-dom-18.0.6.tgz" - integrity sha512-/5OFZgfIPSwy+YuIBP/FgJnQnsxhZhjjrnxudMddeblOouIodEQ75X14Rr4wGSG/bknL+Omy9iWlLo1u/9GzAA== +"@types/react-dom@^18.0.0", "@types/react-dom@^18.2.6": + version "18.2.6" + resolved "https://registry.yarnpkg.com/@types/react-dom/-/react-dom-18.2.6.tgz#ad621fa71a8db29af7c31b41b2ea3d8a6f4144d1" + integrity sha512-2et4PDvg6PVCyS7fuTc4gPoksV58bW0RwSxWKcPRcHZf0PRUGq03TKcD/rUHe3azfV6/5/biUBJw+HhCQjaP0A== dependencies: "@types/react" "*" @@ -1685,26 +1678,21 @@ acorn-globals@^7.0.0: acorn "^8.1.0" acorn-walk "^8.0.2" -acorn-import-assertions@^1.7.6: - version "1.8.0" - resolved "https://registry.npmjs.org/acorn-import-assertions/-/acorn-import-assertions-1.8.0.tgz" - integrity sha512-m7VZ3jwz4eK6A4Vtt8Ew1/mNbP24u0FhdyfA7fSvnJR6LMdfOYnmuIrrJAgrYfYJ10F/otaHTtrtrtmHdMNzEw== +acorn-import-assertions@^1.9.0: + version "1.9.0" + resolved "https://registry.yarnpkg.com/acorn-import-assertions/-/acorn-import-assertions-1.9.0.tgz#507276249d684797c84e0734ef84860334cfb1ac" + integrity sha512-cmMwop9x+8KFhxvKrKfPYmN6/pKTYYHBqLa0DfvVZcKMJWNyWLnaqND7dx/qn66R7ewM1UX5XMaDVP5wlVTaVA== acorn-walk@^8.0.2: version "8.2.0" resolved "https://registry.npmjs.org/acorn-walk/-/acorn-walk-8.2.0.tgz" integrity sha512-k+iyHEuPgSw6SbuDpGQM+06HQUa04DZ3o+F6CSzXMvvI5KMvnaEqXe+YVe555R9nn6GPt404fos4wcgpw12SDA== -acorn@^8.1.0, acorn@^8.8.1: +acorn@^8.1.0, acorn@^8.5.0, acorn@^8.7.1, acorn@^8.8.1: version "8.8.2" resolved "https://registry.npmjs.org/acorn/-/acorn-8.8.2.tgz" integrity sha512-xjIYgE8HBrkpd/sJqOGNspf8uHG+NOHGOw6a/Urj8taM2EXfdNAH2oFcPeIFfsv3+kz/mJrS5VuMqbNLjCa2vw== -acorn@^8.5.0, acorn@^8.7.1: - version "8.8.0" - resolved "https://registry.npmjs.org/acorn/-/acorn-8.8.0.tgz" - integrity sha512-QOxyigPVrpZ2GXT+PFyZTl6TtOFc5egxHIP9IlQ+RbupQuX4RkT/Bee4/kQuC02Xkzg84JcT7oLYtDIQxp+v7w== - agent-base@6: version "6.0.2" resolved "https://registry.npmjs.org/agent-base/-/agent-base-6.0.2.tgz" @@ -1904,18 +1892,7 @@ braces@^3.0.1, braces@^3.0.2: dependencies: fill-range "^7.0.1" -browserslist@^4.14.5: - version "4.17.5" - resolved "https://registry.npmjs.org/browserslist/-/browserslist-4.17.5.tgz" - integrity sha512-I3ekeB92mmpctWBoLXe0d5wPS2cBuRvvW0JyyJHMrk9/HmP2ZjrTboNAZ8iuGqaEIlKguljbQY32OkOJIRrgoA== - dependencies: - caniuse-lite "^1.0.30001271" - electron-to-chromium "^1.3.878" - escalade "^3.1.1" - node-releases "^2.0.1" - picocolors "^1.0.0" - -browserslist@^4.21.3: +browserslist@^4.14.5, browserslist@^4.21.3: version "4.21.5" resolved "https://registry.npmjs.org/browserslist/-/browserslist-4.21.5.tgz" integrity sha512-tUkiguQGW7S3IhB7N+c2MV/HZPSCPAAiYBZXLsBhFB/PCy6ZKKsZrmBayHV9fdGV/ARIfJ14NkxKzRDjvp7L6w== @@ -1967,11 +1944,6 @@ camelcase@^6.2.0: resolved "https://registry.npmjs.org/camelcase/-/camelcase-6.3.0.tgz" integrity sha512-Gmy6FhYlCY7uOElZUSbxo2UCDH8owEk996gkbrpsgGtrJLM3J7jGxl9Ic7Qwwj4ivOE5AWZWRMecDdF7hqGjFA== -caniuse-lite@^1.0.30001271: - version "1.0.30001274" - resolved 
"https://registry.npmjs.org/caniuse-lite/-/caniuse-lite-1.0.30001274.tgz" - integrity sha512-+Nkvv0fHyhISkiMIjnyjmf5YJcQ1IQHZN6U9TLUMroWR38FNwpsC51Gb68yueafX1V6ifOisInSgP9WJFS13ew== - caniuse-lite@^1.0.30001449: version "1.0.30001465" resolved "https://registry.npmjs.org/caniuse-lite/-/caniuse-lite-1.0.30001465.tgz" @@ -2287,11 +2259,6 @@ domexception@^4.0.0: dependencies: webidl-conversions "^7.0.0" -electron-to-chromium@^1.3.878: - version "1.3.886" - resolved "https://registry.npmjs.org/electron-to-chromium/-/electron-to-chromium-1.3.886.tgz" - integrity sha512-+vYdeBosI63VkCtNWnEVFjgNd/IZwvnsWkKyPtWAvrhA+XfByKoBJcbsMgudVU/bUcGAF9Xp3aXn96voWlc3oQ== - electron-to-chromium@^1.4.284: version "1.4.328" resolved "https://registry.npmjs.org/electron-to-chromium/-/electron-to-chromium-1.4.328.tgz" @@ -2307,10 +2274,10 @@ emoji-regex@^8.0.0: resolved "https://registry.npmjs.org/emoji-regex/-/emoji-regex-8.0.0.tgz" integrity sha512-MSjYzcWNOA0ewAHpz0MxpYFvwg6yjy1NG3xteoqz644VCo/RPgnr1/GGt+ic3iJTzQ8Eu3TdM14SawnVUmGE6A== -enhanced-resolve@^5.0.0, enhanced-resolve@^5.13.0: - version "5.13.0" - resolved "https://registry.npmjs.org/enhanced-resolve/-/enhanced-resolve-5.13.0.tgz" - integrity sha512-eyV8f0y1+bzyfh8xAwW/WTSZpLbjhqc4ne9eGSH4Zo2ejdyiNG9pU6mf9DG8a7+Auk6MFTlNOT4Y2y/9k8GKVg== +enhanced-resolve@^5.0.0, enhanced-resolve@^5.15.0: + version "5.15.0" + resolved "https://registry.yarnpkg.com/enhanced-resolve/-/enhanced-resolve-5.15.0.tgz#1af946c7d93603eb88e9896cee4904dc012e9c35" + integrity sha512-LXYT42KJ7lpIKECr2mAXIaMldcNCh/7E0KBKOu4KSfkHmP+mZmSs+8V5gBAqisWBy0OO4W5Oyys0GO1Y8KtdKg== dependencies: graceful-fs "^4.2.4" tapable "^2.2.0" @@ -2618,12 +2585,7 @@ gopd@^1.0.1: dependencies: get-intrinsic "^1.1.3" -graceful-fs@^4.1.2: - version "4.2.8" - resolved "https://registry.npmjs.org/graceful-fs/-/graceful-fs-4.2.8.tgz" - integrity sha512-qkIilPUYcNhJpd33n0GBXTB1MMPp14TxEsEs0pTrsSVucApsYzW5V+Q8Qxhik6KU3evy+qkAAowTByymK0avdg== - -graceful-fs@^4.2.4, graceful-fs@^4.2.9: +graceful-fs@^4.1.2, graceful-fs@^4.2.4, graceful-fs@^4.2.9: version "4.2.10" resolved "https://registry.npmjs.org/graceful-fs/-/graceful-fs-4.2.10.tgz" integrity sha512-9ByhssR2fPVsNZj478qUUbKfmL0+t5BDVyjShtyZZLiK7ZDAArFFfopyOTj0M05wE2tJPisA4iTnnXl2YoPvOA== @@ -3608,30 +3570,18 @@ micromatch@^4.0.4: braces "^3.0.2" picomatch "^2.3.1" -mime-db@1.50.0: - version "1.50.0" - resolved "https://registry.npmjs.org/mime-db/-/mime-db-1.50.0.tgz" - integrity sha512-9tMZCDlYHqeERXEHO9f/hKfNXhre5dK2eE/krIvUjZbS2KPcqGDfNShIWS1uW9XOTKQKqK6qbeOci18rbfW77A== - mime-db@1.52.0: version "1.52.0" resolved "https://registry.npmjs.org/mime-db/-/mime-db-1.52.0.tgz" integrity sha512-sPU4uV7dYlvtWJxwwxHD0PuihVNiE7TyAbQ5SWxDCB9mUYvOgroQOwYQQOKPJ8CIbE+1ETVlOoK1UC2nU3gYvg== -mime-types@^2.1.12: +mime-types@^2.1.12, mime-types@^2.1.27: version "2.1.35" resolved "https://registry.npmjs.org/mime-types/-/mime-types-2.1.35.tgz" integrity sha512-ZDY+bPm5zTTF+YpCrAU9nK0UgICYPT0QtT1NZWFv4s++TNkcgVaT0g6+4R2uI4MjQjzysHB1zxuWL50hzaeXiw== dependencies: mime-db "1.52.0" -mime-types@^2.1.27: - version "2.1.33" - resolved "https://registry.npmjs.org/mime-types/-/mime-types-2.1.33.tgz" - integrity sha512-plLElXp7pRDd0bNZHw+nMd52vRYjLwQjygaNg7ddJ2uJtTlmnTCjWuPKxVu6//AdaRuME84SvLW91sIkBqGT0g== - dependencies: - mime-db "1.50.0" - mimic-fn@^2.1.0: version "2.1.0" resolved "https://registry.npmjs.org/mimic-fn/-/mimic-fn-2.1.0.tgz" @@ -3664,11 +3614,6 @@ node-int64@^0.4.0: resolved "https://registry.npmjs.org/node-int64/-/node-int64-0.4.0.tgz" integrity 
sha512-O5lz91xSOeoXP6DulyHfllpq+Eg00MWitZIbtPfoSEvqIHdl5gfcY6hYzDWnj0qD5tz52PI08u9qUvSVeUBeHw== -node-releases@^2.0.1: - version "2.0.1" - resolved "https://registry.npmjs.org/node-releases/-/node-releases-2.0.1.tgz" - integrity sha512-CqyzN6z7Q6aMeF/ktcMVTzhAHCEpf8SOarwpzpf8pNBY2k5/oM34UHldUwp8VKI7uxct2HxSRdJjBaZeESzcxA== - node-releases@^2.0.8: version "2.0.10" resolved "https://registry.npmjs.org/node-releases/-/node-releases-2.0.10.tgz" @@ -4317,10 +4262,10 @@ scheduler@^0.23.0: dependencies: loose-envify "^1.1.0" -schema-utils@^3.1.1, schema-utils@^3.1.2: - version "3.1.2" - resolved "https://registry.npmjs.org/schema-utils/-/schema-utils-3.1.2.tgz" - integrity sha512-pvjEHOgWc9OWA/f/DE3ohBWTD6EleVLf7iFUkoSwAxttdBhB9QUebQgxER2kWueOvRJXPHNnyrvvh9eZINB8Eg== +schema-utils@^3.1.1, schema-utils@^3.2.0: + version "3.3.0" + resolved "https://registry.yarnpkg.com/schema-utils/-/schema-utils-3.3.0.tgz#f50a88877c3c01652a15b622ae9e9795df7a60fe" + integrity sha512-pN/yOAvcC+5rQ5nERGuwrjLlYvLTbCibnZ1I7B1LaiAz9BRBlE9GMgE/eqV30P7aJQUf7Ddimy/RsbYO/GrVGg== dependencies: "@types/json-schema" "^7.0.8" ajv "^6.12.5" @@ -4828,10 +4773,10 @@ webpack-sources@^3.2.3: resolved "https://registry.npmjs.org/webpack-sources/-/webpack-sources-3.2.3.tgz" integrity sha512-/DyMEOrDgLKKIG0fmvtz+4dUX/3Ghozwgm6iPp8KRhvn+eQf9+Q7GWxVNMk3+uCPWfdXYC4ExGBckIXdFEfH1w== -webpack@^5.74.0: - version "5.81.0" - resolved "https://registry.npmjs.org/webpack/-/webpack-5.81.0.tgz" - integrity sha512-AAjaJ9S4hYCVODKLQTgG5p5e11hiMawBwV2v8MYLE0C/6UAGLuAF4n1qa9GOwdxnicaP+5k6M5HrLmD4+gIB8Q== +webpack@^5.88.0: + version "5.88.0" + resolved "https://registry.yarnpkg.com/webpack/-/webpack-5.88.0.tgz#a07aa2f8e7a64a8f1cec0c6c2e180e3cb34440c8" + integrity sha512-O3jDhG5e44qIBSi/P6KpcCcH7HD+nYIHVBhdWFxcLOcIGN8zGo5nqF3BjyNCxIh4p1vFdNnreZv2h2KkoAw3lw== dependencies: "@types/eslint-scope" "^3.7.3" "@types/estree" "^1.0.0" @@ -4839,10 +4784,10 @@ webpack@^5.74.0: "@webassemblyjs/wasm-edit" "^1.11.5" "@webassemblyjs/wasm-parser" "^1.11.5" acorn "^8.7.1" - acorn-import-assertions "^1.7.6" + acorn-import-assertions "^1.9.0" browserslist "^4.14.5" chrome-trace-event "^1.0.2" - enhanced-resolve "^5.13.0" + enhanced-resolve "^5.15.0" es-module-lexer "^1.2.1" eslint-scope "5.1.1" events "^3.2.0" @@ -4852,7 +4797,7 @@ webpack@^5.74.0: loader-runner "^4.2.0" mime-types "^2.1.27" neo-async "^2.6.2" - schema-utils "^3.1.2" + schema-utils "^3.2.0" tapable "^2.1.1" terser-webpack-plugin "^5.3.7" watchpack "^2.4.0" diff --git a/snuba/cli/consumer.py b/snuba/cli/consumer.py index d77bba7da8..3824c27875 100644 --- a/snuba/cli/consumer.py +++ b/snuba/cli/consumer.py @@ -71,13 +71,31 @@ "--max-batch-size", default=settings.DEFAULT_MAX_BATCH_SIZE, type=int, - help="Max number of messages to batch in memory before writing to Kafka.", + help=( + "Max number of messages to batch in memory.\n\n" + "Batching parameters apply to three steps: Batching of messages for " + "processing them (=transforming them into ClickHouse rows), batching for" + "the INSERT statement, and batching of offset commits.\n\n" + "Commits are additionally debounced to happen at most once per second." + ), ) @click.option( "--max-batch-time-ms", default=settings.DEFAULT_MAX_BATCH_TIME_MS, type=int, - help="Max length of time to buffer messages in memory before writing to Kafka.", + help="Max duration to buffer messages in memory for.", +) +@click.option( + "--max-insert-batch-size", + default=None, + type=int, + help="Max number of messages to batch in memory for inserts into ClickHouse. 
Defaults to --max-batch-size", +) +@click.option( + "--max-insert-batch-time-ms", + default=None, + type=int, + help="Max duration to batch in memory for inserts into ClickHouse. Defaults to --max-batch-time-ms", ) @click.option( "--auto-offset-reset", @@ -116,6 +134,13 @@ type=int, ) @click.option("--join-timeout", type=int, help="Join timeout in seconds.", default=5) +@click.option( + "--enforce-schema", + type=bool, + is_flag=True, + default=False, + help="Enforce schema on the raw events topic.", +) @click.option( "--profile-path", type=click.Path(dir_okay=True, file_okay=False, exists=True) ) @@ -136,6 +161,8 @@ def consumer( slice_id: Optional[int], max_batch_size: int, max_batch_time_ms: int, + max_insert_batch_size: Optional[int], + max_insert_batch_time_ms: Optional[int], auto_offset_reset: str, no_strict_offset_reset: bool, queued_max_messages_kbytes: int, @@ -144,6 +171,7 @@ def consumer( input_block_size: Optional[int], output_block_size: Optional[int], join_timeout: int = 5, + enforce_schema: bool = False, log_level: Optional[str] = None, profile_path: Optional[str] = None, max_poll_interval_ms: Optional[int] = None, @@ -196,11 +224,14 @@ def consumer( ), max_batch_size=max_batch_size, max_batch_time_ms=max_batch_time_ms, + max_insert_batch_size=max_insert_batch_size, + max_insert_batch_time_ms=max_insert_batch_time_ms, metrics=metrics, profile_path=profile_path, slice_id=slice_id, join_timeout=join_timeout, max_poll_interval_ms=max_poll_interval_ms, + enforce_schema=enforce_schema, ) consumer = consumer_builder.build_base_consumer() diff --git a/snuba/cli/devserver.py b/snuba/cli/devserver.py index 15c7280600..b66a824710 100644 --- a/snuba/cli/devserver.py +++ b/snuba/cli/devserver.py @@ -46,6 +46,7 @@ def devserver(*, bootstrap: bool, workers: bool) -> None: "--log-level=debug", "--storage=transactions", "--consumer-group=transactions_group", + "--enforce-schema", ], ), ( @@ -70,6 +71,7 @@ def devserver(*, bootstrap: bool, workers: bool) -> None: "--log-level=debug", "--storage=outcomes_raw", "--consumer-group=outcomes_group", + "--enforce-schema", ], ), ( @@ -81,6 +83,7 @@ def devserver(*, bootstrap: bool, workers: bool) -> None: "--no-strict-offset-reset", "--log-level=debug", "--storage=errors", + "--enforce-schema", ], ), ( @@ -213,6 +216,7 @@ def devserver(*, bootstrap: bool, workers: bool) -> None: "--no-strict-offset-reset", "--log-level=debug", "--consumer-group=snuba-metrics-consumers", + "--enforce-schema", ], ), ( @@ -225,6 +229,7 @@ def devserver(*, bootstrap: bool, workers: bool) -> None: "--no-strict-offset-reset", "--log-level=debug", "--consumer-group=snuba-gen-metrics-distributions-consumers", + "--enforce-schema", ], ), ( @@ -237,6 +242,7 @@ def devserver(*, bootstrap: bool, workers: bool) -> None: "--no-strict-offset-reset", "--log-level=debug", "--consumer-group=snuba-gen-metrics-sets-consumers", + "--enforce-schema", ], ), ( @@ -249,6 +255,7 @@ def devserver(*, bootstrap: bool, workers: bool) -> None: "--no-strict-offset-reset", "--log-level=debug", "--consumer-group=snuba-gen-metrics-counters-consumers", + "--enforce-schema", ], ), ] @@ -428,6 +435,7 @@ def devserver(*, bootstrap: bool, workers: bool) -> None: "--log-level=debug", "--storage=spans", "--consumer-group=spans_group", + "--enforce-schema", ], ), ] diff --git a/snuba/cli/dlq_consumer.py b/snuba/cli/dlq_consumer.py index 10afcd2786..612bfff8fd 100644 --- a/snuba/cli/dlq_consumer.py +++ b/snuba/cli/dlq_consumer.py @@ -46,6 +46,18 @@ type=int, help="Max length of time to buffer messages in 
memory before writing to Kafka.", ) +@click.option( + "--max-insert-batch-size", + default=None, + type=int, + help="Max number of messages to batch in memory for inserts into ClickHouse. Defaults to --max-batch-size", +) +@click.option( + "--max-insert-batch-time-ms", + default=None, + type=int, + help="Max duration to batch in memory for inserts into ClickHouse. Defaults to --max-batch-time-ms", +) @click.option( "--auto-offset-reset", default="error", @@ -87,6 +99,8 @@ def dlq_consumer( consumer_group: str, max_batch_size: int, max_batch_time_ms: int, + max_insert_batch_size: int, + max_insert_batch_time_ms: int, auto_offset_reset: str, no_strict_offset_reset: bool, queued_max_messages_kbytes: int, @@ -166,9 +180,12 @@ def handler(signum: int, frame: Any) -> None: ), max_batch_size=max_batch_size, max_batch_time_ms=max_batch_time_ms, + max_insert_batch_size=max_insert_batch_size, + max_insert_batch_time_ms=max_insert_batch_time_ms, metrics=metrics, slice_id=instruction.slice_id, join_timeout=None, + enforce_schema=False, ) consumer = consumer_builder.build_dlq_consumer(instruction) diff --git a/snuba/consumers/consumer.py b/snuba/consumers/consumer.py index deb75dd3f0..b67828b858 100644 --- a/snuba/consumers/consumer.py +++ b/snuba/consumers/consumer.py @@ -32,12 +32,12 @@ from arroyo.processing.strategies import ( CommitOffsets, FilterStep, - ParallelTransformStep, ProcessingStrategy, ProcessingStrategyFactory, Reduce, + RunTask, RunTaskInThreads, - TransformStep, + RunTaskWithMultiprocessing, ) from arroyo.types import ( BaseValue, @@ -55,7 +55,7 @@ from snuba import environment, state from snuba.clickhouse.http import JSONRow, JSONRowEncoder, ValuesRowEncoder -from snuba.consumers.schemas import get_json_codec +from snuba.consumers.schemas import _NOOP_CODEC, get_json_codec from snuba.consumers.types import KafkaMessageMetadata from snuba.datasets.storage import WritableTableStorage from snuba.datasets.storages.factory import get_writable_storage @@ -523,6 +523,7 @@ def process_message( processor: MessageProcessor, consumer_group: str, snuba_logical_topic: SnubaTopic, + enforce_schema: bool, message: Message[KafkaPayload], ) -> Union[None, BytesInsertBatch, ReplacementBatch]: local_metrics = MetricsWrapper( @@ -553,6 +554,10 @@ def process_message( scope.set_tag("snuba_logical_topic", snuba_logical_topic.name) try: + # Occasionally log errors if no validator is configured + if codec == _NOOP_CODEC: + raise Exception("No validator configured for topic") + codec.validate(decoded) except Exception as err: local_metrics.increment( @@ -569,6 +574,8 @@ def process_message( _LAST_INVALID_MESSAGE[snuba_logical_topic.name] = start sentry_sdk.set_tag("invalid_message_schema", "true") logger.warning(err, exc_info=True) + if enforce_schema: + raise # TODO: this is not the most efficient place to emit a metric, but # as long as should_validate is behind a sample rate it should be @@ -600,8 +607,6 @@ def process_message( value = message.value raise InvalidMessage(value.partition, value.offset) from err - return None - if isinstance(result, InsertBatch): return BytesInsertBatch( [json_row_encoder.encode(row) for row in result.rows], @@ -830,11 +835,11 @@ def flush_batch( ] if self.__processes is None: - inner_strategy = TransformStep(transform_function, collect) + inner_strategy = RunTask(transform_function, collect) else: assert self.__input_block_size is not None assert self.__output_block_size is not None - inner_strategy = ParallelTransformStep( + inner_strategy = RunTaskWithMultiprocessing( 
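+            # NOTE (descriptive, not behavioral): RunTask and RunTaskWithMultiprocessing
+            # are the current arroyo names for the former TransformStep and
+            # ParallelTransformStep; the transform function is still applied to each
+            # message, optionally across a multiprocessing pool.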
transform_function, collect, self.__processes, @@ -845,7 +850,7 @@ def flush_batch( initializer=self.__initialize_parallel_transform, ) - return TransformStep( + return RunTask( partial(find_destination_storages, self.__storages), FilterStep( has_destination_storages, diff --git a/snuba/consumers/consumer_builder.py b/snuba/consumers/consumer_builder.py index be6285de46..23f74cde0b 100644 --- a/snuba/consumers/consumer_builder.py +++ b/snuba/consumers/consumer_builder.py @@ -10,7 +10,7 @@ build_kafka_configuration, build_kafka_consumer_configuration, ) -from arroyo.commit import IMMEDIATE +from arroyo.commit import ONCE_PER_SECOND from arroyo.dlq import DlqLimit, DlqPolicy, KafkaDlqProducer, NoopDlqProducer from arroyo.processing import StreamProcessor from arroyo.processing.strategies import ProcessingStrategyFactory @@ -68,9 +68,12 @@ def __init__( processing_params: ProcessingParameters, max_batch_size: int, max_batch_time_ms: int, + max_insert_batch_size: Optional[int], + max_insert_batch_time_ms: Optional[int], metrics: MetricsBackend, slice_id: Optional[int], - join_timeout: Optional[int], + join_timeout: Optional[float], + enforce_schema: bool, profile_path: Optional[str] = None, max_poll_interval_ms: Optional[int] = None, ) -> None: @@ -83,6 +86,7 @@ def __init__( self.__consumer_config = consumer_config self.__kafka_params = kafka_params self.consumer_group = kafka_params.group_id + self.__enforce_schema = enforce_schema broker_config = build_kafka_consumer_configuration( self.__consumer_config.raw_topic.broker_config, @@ -130,6 +134,8 @@ def __init__( self.metrics = metrics self.max_batch_size = max_batch_size self.max_batch_time_ms = max_batch_time_ms + self.max_insert_batch_size = max_insert_batch_size + self.max_insert_batch_time_ms = max_insert_batch_time_ms self.group_id = kafka_params.group_id self.auto_offset_reset = kafka_params.auto_offset_reset self.strict_offset_reset = kafka_params.strict_offset_reset @@ -146,6 +152,7 @@ def __init__( def __build_consumer( self, strategy_factory: ProcessingStrategyFactory[KafkaPayload], + input_topic: Topic, dlq_policy: Optional[DlqPolicy[KafkaPayload]], ) -> StreamProcessor[KafkaPayload]: @@ -179,9 +186,9 @@ def log_general_error(e: KafkaError) -> None: return StreamProcessor( consumer, - self.raw_topic, + input_topic, strategy_factory, - IMMEDIATE, + ONCE_PER_SECOND, dlq_policy=dlq_policy, join_timeout=self.join_timeout, ) @@ -212,6 +219,7 @@ def build_streaming_strategy_factory( processor, self.consumer_group, logical_topic, + self.__enforce_schema, ), collector=build_batch_writer( table_writer, @@ -223,6 +231,9 @@ def build_streaming_strategy_factory( ), max_batch_size=self.max_batch_size, max_batch_time=self.max_batch_time_ms / 1000.0, + max_insert_batch_size=self.max_insert_batch_size, + max_insert_batch_time=self.max_insert_batch_time_ms + and self.max_insert_batch_time_ms / 1000.0, processes=self.processes, input_block_size=self.input_block_size, output_block_size=self.output_block_size, @@ -264,6 +275,7 @@ def build_dlq_strategy_factory( processor, self.consumer_group, logical_topic, + self.__enforce_schema, ), collector=build_batch_writer( table_writer, @@ -275,6 +287,9 @@ def build_dlq_strategy_factory( ), max_batch_size=self.max_batch_size, max_batch_time=self.max_batch_time_ms / 1000.0, + max_insert_batch_size=self.max_insert_batch_size, + max_insert_batch_time=self.max_insert_batch_time_ms + and self.max_insert_batch_time_ms / 1000.0, processes=self.processes, input_block_size=self.input_block_size, 
output_block_size=self.output_block_size, @@ -299,7 +314,9 @@ def build_base_consumer(self) -> StreamProcessor[KafkaPayload]: Builds the consumer. """ return self.__build_consumer( - self.build_streaming_strategy_factory(), self.__build_default_dlq_policy() + self.build_streaming_strategy_factory(), + self.raw_topic, + self.__build_default_dlq_policy(), ) def build_dlq_consumer( @@ -325,8 +342,13 @@ def build_dlq_consumer( else: raise ValueError("Invalid DLQ policy") + dlq_topic = self.__consumer_config.dlq_topic + assert dlq_topic is not None + return self.__build_consumer( - self.build_dlq_strategy_factory(instruction), dlq_policy + self.build_dlq_strategy_factory(instruction), + Topic(dlq_topic.physical_topic_name), + dlq_policy, ) def __build_default_dlq_policy(self) -> Optional[DlqPolicy[KafkaPayload]]: diff --git a/snuba/consumers/dlq.py b/snuba/consumers/dlq.py index 476130d39e..8b1409050b 100644 --- a/snuba/consumers/dlq.py +++ b/snuba/consumers/dlq.py @@ -8,6 +8,7 @@ from typing import Optional, TypeVar import rapidjson +from arroyo.dlq import InvalidMessage from arroyo.processing.strategies.abstract import ProcessingStrategy from arroyo.types import Message @@ -150,7 +151,12 @@ def poll(self) -> None: def submit(self, message: Message[TPayload]) -> None: if self.__processed_messages < self.__num_messages_to_process: self.__last_message_time = time.time() - self.__next_step.submit(message) + + try: + self.__next_step.submit(message) + except InvalidMessage: + self.__processed_messages += 1 + raise self.__processed_messages += 1 def close(self) -> None: diff --git a/snuba/consumers/strategy_factory.py b/snuba/consumers/strategy_factory.py index 4886080edb..df1f09c2a4 100644 --- a/snuba/consumers/strategy_factory.py +++ b/snuba/consumers/strategy_factory.py @@ -8,10 +8,11 @@ ProcessingStrategy, ProcessingStrategyFactory, Reduce, + RunTask, RunTaskInThreads, + RunTaskWithMultiprocessing, ) from arroyo.processing.strategies.commit import CommitOffsets -from arroyo.processing.strategies.transform import ParallelTransformStep, TransformStep from arroyo.types import BaseValue, Commit, FilteredPayload, Message, Partition from snuba.consumers.consumer import BytesInsertBatch, ProcessedMessageBatchWriter @@ -63,6 +64,8 @@ def __init__( processes: Optional[int], input_block_size: Optional[int], output_block_size: Optional[int], + max_insert_batch_size: Optional[int], + max_insert_batch_time: Optional[float], # Passed in the case of DLQ consumer which exits after a certain number of messages # is processed max_messages_to_process: Optional[int] = None, @@ -75,6 +78,8 @@ def __init__( self.__max_batch_size = max_batch_size self.__max_batch_time = max_batch_time + self.__max_insert_batch_size = max_insert_batch_size or max_batch_size + self.__max_insert_batch_time = max_insert_batch_time or max_batch_time if processes is not None: assert input_block_size is not None, "input block size required" @@ -118,8 +123,8 @@ def flush_batch( commit_strategy = CommitOffsets(commit) collect: Reduce[ProcessedMessage, ProcessedMessageBatchWriter] = Reduce( - self.__max_batch_size, - self.__max_batch_time, + self.__max_insert_batch_size, + self.__max_insert_batch_time, accumulator, self.__collector, RunTaskInThreads( @@ -136,11 +141,11 @@ def flush_batch( strategy: ProcessingStrategy[Union[FilteredPayload, KafkaPayload]] if self.__processes is None: - strategy = TransformStep(transform_function, collect) + strategy = RunTask(transform_function, collect) else: assert self.__input_block_size is not None assert 
self.__output_block_size is not None - strategy = ParallelTransformStep( + strategy = RunTaskWithMultiprocessing( transform_function, collect, self.__processes, diff --git a/snuba/datasets/configuration/issues/entities/search_issues.yaml b/snuba/datasets/configuration/issues/entities/search_issues.yaml index 508030901f..3a81f8cfc6 100644 --- a/snuba/datasets/configuration/issues/entities/search_issues.yaml +++ b/snuba/datasets/configuration/issues/entities/search_issues.yaml @@ -9,6 +9,7 @@ schema: { name: group_id, type: UInt, args: { size: 64 } }, { name: search_title, type: String }, { name: resource_id, type: String }, + { name: message, type: String }, { name: subtitle, type: String }, { name: culprit, type: String }, { name: level, type: String, args: { schema_modifiers: [ low_cardinality ] } }, diff --git a/snuba/datasets/configuration/issues/storages/search_issues.yaml b/snuba/datasets/configuration/issues/storages/search_issues.yaml index 87f79ce9fc..e9e30c71d1 100644 --- a/snuba/datasets/configuration/issues/storages/search_issues.yaml +++ b/snuba/datasets/configuration/issues/storages/search_issues.yaml @@ -23,6 +23,7 @@ schema: { name: resource_id, type: String, args: { schema_modifiers: [ nullable ] } }, + { name: message, type: String }, { name: subtitle, type: String, args: { schema_modifiers: [ nullable ] } }, { name: culprit, type: String, args: { schema_modifiers: [ nullable ] } }, { name: level, type: String, args: { schema_modifiers: [ nullable ] } }, diff --git a/snuba/datasets/processors/errors_processor.py b/snuba/datasets/processors/errors_processor.py index c336908b56..e4137ff6cd 100644 --- a/snuba/datasets/processors/errors_processor.py +++ b/snuba/datasets/processors/errors_processor.py @@ -73,7 +73,7 @@ def process_message( if row is None: # the processor cannot/does not handle this input return None - return InsertBatch([row], None) + return InsertBatch([row], row["received"]) elif type_ in REPLACEMENT_EVENT_TYPES: # pass raw events along to republish return ReplacementBatch(str(event["project_id"]), [message]) diff --git a/snuba/datasets/processors/outcomes_processor.py b/snuba/datasets/processors/outcomes_processor.py index 4fe530b7ea..4485b1d42b 100644 --- a/snuba/datasets/processors/outcomes_processor.py +++ b/snuba/datasets/processors/outcomes_processor.py @@ -33,6 +33,8 @@ "sample_rate", "send_error", "internal_sdk_error", + "insufficient_data", + "backpressure", ] ) diff --git a/snuba/datasets/processors/querylog_processor.py b/snuba/datasets/processors/querylog_processor.py index dacb93ae0c..92160b1153 100644 --- a/snuba/datasets/processors/querylog_processor.py +++ b/snuba/datasets/processors/querylog_processor.py @@ -1,6 +1,7 @@ from __future__ import annotations import logging +import random import uuid from typing import Any, Mapping, MutableMapping, Optional, Sequence, Union @@ -11,7 +12,7 @@ QueryMetadata, ) -from snuba import environment +from snuba import environment, state from snuba.consumers.types import KafkaMessageMetadata from snuba.datasets.processors import DatasetMessageProcessor from snuba.processor import InsertBatch, ProcessedMessage @@ -150,6 +151,12 @@ def _remove_invalid_data(self, processed: dict[str, Any]) -> None: def process_message( self, message: Querylog, metadata: KafkaMessageMetadata ) -> Optional[ProcessedMessage]: + # XXX: Temporary code for the DLQ test. 
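+        # NOTE (illustrative): the rate is read from Snuba runtime config, so setting
+        # e.g. querylog_reject_rate = 0.01 (via state.set_config or the admin runtime
+        # config UI) makes roughly 1% of querylog messages raise and exercise the DLQ path.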
+ reject_rate = state.get_config("querylog_reject_rate", 0.0) + assert isinstance(reject_rate, float) + if random.random() < reject_rate: + raise ValueError("This message is rejected on purpose.") + processed = { "request_id": str(uuid.UUID(message["request"]["id"])), "request_body": self.__to_json_string(message["request"]["body"]), diff --git a/snuba/datasets/processors/search_issues_processor.py b/snuba/datasets/processors/search_issues_processor.py index 1de9291d33..ee5ec5cb64 100644 --- a/snuba/datasets/processors/search_issues_processor.py +++ b/snuba/datasets/processors/search_issues_processor.py @@ -92,6 +92,7 @@ class SearchIssueEvent(TypedDict, total=False): group_id: int platform: str primary_hash: str + message: str datetime: str data: IssueEventData @@ -276,6 +277,7 @@ def process_insert_v1( "receive_timestamp": receive_timestamp, "client_timestamp": client_timestamp, "platform": event["platform"], + "message": _unicodify(event["message"]), } # optional fields @@ -325,7 +327,7 @@ def process_message( processed = self.process_insert_v1(event, metadata) except EventTooOld: metrics.increment("event_too_old") - raise + return None except IndexError: metrics.increment("invalid_message") raise diff --git a/snuba/migrations/group_loader.py b/snuba/migrations/group_loader.py index 63b40cada5..b147e7ca52 100644 --- a/snuba/migrations/group_loader.py +++ b/snuba/migrations/group_loader.py @@ -311,6 +311,7 @@ def get_migrations(self) -> Sequence[str]: "0006_add_subtitle_culprit_level_resource_id", "0007_add_transaction_duration", "0008_add_profile_id_replay_id", + "0009_add_message", ] diff --git a/snuba/migrations/operations.py b/snuba/migrations/operations.py index 9ade7bd977..0e9eff44e6 100644 --- a/snuba/migrations/operations.py +++ b/snuba/migrations/operations.py @@ -3,12 +3,16 @@ from enum import Enum from typing import Any, Callable, Mapping, Optional, Sequence, Tuple +import structlog + from snuba.clickhouse.columns import Column from snuba.clusters.cluster import ClickhouseClientSettings, get_cluster from snuba.clusters.storage_sets import StorageSetKey from snuba.migrations.columns import MigrationModifiers from snuba.migrations.table_engines import TableEngine +logger = structlog.get_logger().bind(module=__name__) + class OperationTarget(Enum): """ @@ -49,12 +53,13 @@ def execute(self) -> None: nodes = dist_nodes else: raise ValueError(f"Target not set for {self}") - + if nodes: + logger.info(f"Executing op: {self.format_sql()[:32]}...") for node in nodes: connection = cluster.get_node_connection( ClickhouseClientSettings.MIGRATE, node ) - + logger.info(f"Executing on {self.target.value} node: {node}") connection.execute(self.format_sql(), settings=self._settings) @abstractmethod diff --git a/snuba/pipeline/processors.py b/snuba/pipeline/processors.py index 581f75e69b..9dbf2a6745 100644 --- a/snuba/pipeline/processors.py +++ b/snuba/pipeline/processors.py @@ -7,6 +7,7 @@ from snuba.query.logical import Query as LogicalQuery from snuba.query.processors.physical import ClickhouseQueryProcessor from snuba.query.query_settings import QuerySettings +from snuba.state import explain_meta def _execute_clickhouse_processors( @@ -74,4 +75,7 @@ def execute_entity_processors(query: LogicalQuery, settings: QuerySettings) -> N with sentry_sdk.start_span( description=type(processor).__name__, op="processor" ): + if settings.get_dry_run(): + explain_meta.add_step("entity_processor", type(processor).__name__) + processor.process_query(query, settings) diff --git 
a/snuba/query/allocation_policies/__init__.py b/snuba/query/allocation_policies/__init__.py index e99ccc6fb5..d01fe16b0b 100644 --- a/snuba/query/allocation_policies/__init__.py +++ b/snuba/query/allocation_policies/__init__.py @@ -395,7 +395,10 @@ def is_active(self) -> bool: @property def is_enforced(self) -> bool: - return bool(self.get_config_value(IS_ENFORCED)) + return ( + bool(self.get_config_value(IS_ENFORCED)) + and settings.ENFORCE_BYTES_SCANNED_WINDOW_POLICY + ) @property def max_threads(self) -> int: diff --git a/snuba/redis.py b/snuba/redis.py index d8efe15043..575948018e 100644 --- a/snuba/redis.py +++ b/snuba/redis.py @@ -113,6 +113,7 @@ class RedisClientKey(Enum): CONFIG = "config" DLQ = "dlq" OPTIMIZE = "optimize" + ADMIN_AUTH = "admin_auth" _redis_clients: Mapping[RedisClientKey, RedisClientType] = { @@ -137,6 +138,9 @@ class RedisClientKey(Enum): RedisClientKey.OPTIMIZE: _initialize_specialized_redis_cluster( settings.REDIS_CLUSTERS["optimize"] ), + RedisClientKey.ADMIN_AUTH: _initialize_specialized_redis_cluster( + settings.REDIS_CLUSTERS["admin_auth"] + ), } diff --git a/snuba/settings/__init__.py b/snuba/settings/__init__.py index 74a02f5f0f..b3524b092e 100644 --- a/snuba/settings/__init__.py +++ b/snuba/settings/__init__.py @@ -55,6 +55,8 @@ os.environ.get("ADMIN_REPLAYS_SAMPLE_RATE_ON_ERROR", 1.0) ) +ADMIN_ROLES_REDIS_TTL = 600 + ###################### # End Admin Settings # ###################### @@ -154,6 +156,7 @@ class RedisClusters(TypedDict): config: RedisClusterConfig | None dlq: RedisClusterConfig | None optimize: RedisClusterConfig | None + admin_auth: RedisClusterConfig | None REDIS_CLUSTERS: RedisClusters = { @@ -164,6 +167,7 @@ class RedisClusters(TypedDict): "config": None, "dlq": None, "optimize": None, + "admin_auth": None, } # Query Recording Options @@ -188,6 +192,7 @@ class RedisClusters(TypedDict): BULK_CLICKHOUSE_BUFFER = 10000 BULK_BINARY_LOAD_CHUNK = 2**22 # 4 MB + # Processor/Writer Options @@ -245,6 +250,12 @@ class RedisClusters(TypedDict): PROJECT_STACKTRACE_BLACKLIST: Set[int] = set() PRETTY_FORMAT_EXPRESSIONS = True +# Capacity Management +# HACK: This is necessary because single tenant does not have snuba-admin deployed / accessible +# so we can't change policy configs ourselves. This should be removed once we have snuba-admin +# available for single tenant since we can enable/disable policies at runtime there. +ENFORCE_BYTES_SCANNED_WINDOW_POLICY = True + # By default, allocation policies won't block requests from going through in a production # environment to not cause incidents unnecessarily. 
If something goes wrong with allocation # policy code, the request will still be able to go through (but it will create a dangerous diff --git a/snuba/settings/settings_test.py b/snuba/settings/settings_test.py index d4f0fd5c36..aa6a3ac03d 100644 --- a/snuba/settings/settings_test.py +++ b/snuba/settings/settings_test.py @@ -26,6 +26,8 @@ # should fail on bad code RAISE_ON_ALLOCATION_POLICY_FAILURES = True +ENFORCE_BYTES_SCANNED_WINDOW_POLICY = True + # override replacer threshold to write to redis every time a replacement message is consumed REPLACER_PROCESSING_TIMEOUT_THRESHOLD = 0 # ms @@ -55,5 +57,6 @@ (6, "config"), (7, "dlq"), (8, "optimize"), + (9, "admin_auth"), ] } diff --git a/snuba/snuba_migrations/search_issues/0009_add_message.py b/snuba/snuba_migrations/search_issues/0009_add_message.py new file mode 100644 index 0000000000..cc0faa018d --- /dev/null +++ b/snuba/snuba_migrations/search_issues/0009_add_message.py @@ -0,0 +1,45 @@ +from typing import Sequence + +from snuba.clickhouse.columns import Column, String +from snuba.clusters.storage_sets import StorageSetKey +from snuba.migrations import migration, operations +from snuba.migrations.operations import OperationTarget + + +class Migration(migration.ClickhouseNodeMigration): + blocking = False + + def forwards_ops(self) -> Sequence[operations.SqlOperation]: + ops = [ + [ + operations.AddColumn( + storage_set=StorageSetKey.SEARCH_ISSUES, + table_name=table_name, + column=Column("message", String()), + after="replay_id", + target=target, + ), + ] + for table_name, target in [ + ("search_issues_local_v2", OperationTarget.LOCAL), + ("search_issues_dist_v2", OperationTarget.DISTRIBUTED), + ] + ] + return ops[0] + ops[1] + + def backwards_ops(self) -> Sequence[operations.SqlOperation]: + ops = [ + [ + operations.DropColumn( + storage_set=StorageSetKey.SEARCH_ISSUES, + table_name=table_name, + column_name="message", + target=target, + ), + ] + for table_name, target in [ + ("search_issues_dist_v2", OperationTarget.DISTRIBUTED), + ("search_issues_local_v2", OperationTarget.LOCAL), + ] + ] + return ops[0] + ops[1] diff --git a/snuba/snuba_migrations/search_issues/__init__.py b/snuba/snuba_migrations/search_issues/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/snuba/state/explain_meta.py b/snuba/state/explain_meta.py new file mode 100644 index 0000000000..b00cf0c123 --- /dev/null +++ b/snuba/state/explain_meta.py @@ -0,0 +1,57 @@ +from __future__ import annotations + +from dataclasses import dataclass, field +from typing import Any, cast + +from flask import g + + +@dataclass +class ExplainStep: + category: str # The type of step e.g. "processor" + name: str # The specific name for the step e.g. 
"TimeSeriesProcessor" + data: dict[str, Any] = field( + default_factory=dict + ) # Any extra information about the step + + +@dataclass +class ExplainMeta: + steps: list[ExplainStep] = field(default_factory=list) + + def add_step(self, step: ExplainStep) -> None: + self.steps.append(step) + + +def add_step(category: str, name: str, data: dict[str, Any] | None = None) -> None: + try: + if data is None: + data = {} + step = ExplainStep(category, name, data) + + if not hasattr(g, "explain_meta"): + g.explain_meta = ExplainMeta() + + g.explain_meta.add_step(step) + except RuntimeError: + # Code is executing outside of a flask context + return + + +def get_explain_meta() -> ExplainMeta | None: + try: + if hasattr(g, "explain_meta"): + return cast(ExplainMeta, g.explain_meta) + return None + except RuntimeError: + # Code is executing outside of a flask context + return None + + +def explain_cleanup() -> None: + try: + if hasattr(g, "explain_meta"): + g.pop("explain_meta") + except RuntimeError: + # Code is executing outside of a flask context + pass diff --git a/snuba/web/views.py b/snuba/web/views.py index aac3a1d23a..33de52f564 100644 --- a/snuba/web/views.py +++ b/snuba/web/views.py @@ -621,6 +621,9 @@ def eventstream(*, entity: Entity) -> RespTuple: storage = entity.get_writable_storage() assert storage is not None + storage_name = storage.get_storage_key().value + # TODO: A few storages are currently excluded from schema validation + should_validate_schema = storage_name not in ["search_issues", "replays"] try: if type_ == "insert": @@ -644,6 +647,7 @@ def commit( stream_loader.get_processor(), "consumer_grouup", stream_loader.get_default_topic_spec().topic, + should_validate_schema, ), build_batch_writer(table_writer, metrics=metrics), max_batch_size=1, @@ -651,6 +655,8 @@ def commit( processes=None, input_block_size=None, output_block_size=None, + max_insert_batch_size=None, + max_insert_batch_time=None, ).create_with_partitions(commit, {}) strategy.submit(message) strategy.close() diff --git a/tests/admin/clickhouse_migrations/test_api.py b/tests/admin/clickhouse_migrations/test_api.py index 9140e8385e..b7d688fd42 100644 --- a/tests/admin/clickhouse_migrations/test_api.py +++ b/tests/admin/clickhouse_migrations/test_api.py @@ -28,6 +28,7 @@ from snuba.migrations.policies import MigrationPolicy from snuba.migrations.runner import MigrationKey, Runner from snuba.migrations.status import Status +from snuba.redis import RedisClientKey, get_redis_client def generate_migration_test_role( @@ -60,6 +61,7 @@ def admin_api() -> FlaskClient: return application.test_client() +@pytest.mark.redis_db @pytest.mark.clickhouse_db def test_migration_groups(admin_api: FlaskClient) -> None: runner = Runner() @@ -105,6 +107,7 @@ def get_migration_ids( ] +@pytest.mark.redis_db @pytest.mark.clickhouse_db def test_list_migration_status(admin_api: FlaskClient) -> None: with patch( @@ -166,6 +169,7 @@ def sort_by_migration_id(migration: Any) -> Any: assert sorted_response == sorted_expected_json +@pytest.mark.redis_db @pytest.mark.clickhouse_db @pytest.mark.parametrize("action", ["run", "reverse"]) def test_run_reverse_migrations(admin_api: FlaskClient, action: str) -> None: @@ -310,6 +314,7 @@ def print_something(*args: Any, **kwargs: Any) -> None: assert mock_run_migration.call_count == 1 +@pytest.mark.redis_db def test_get_iam_roles(caplog: Any) -> None: system_role = generate_migration_test_role("system", "all") tool_role = generate_tool_test_role("snql-to-sql") @@ -388,6 +393,8 @@ def 
test_get_iam_roles(caplog: Any) -> None: tool_role, ] + iam_file.close() + with patch( "snuba.admin.auth.settings.ADMIN_IAM_POLICY_FILE", "file_not_exists.json" ): @@ -398,3 +405,122 @@ def test_get_iam_roles(caplog: Any) -> None: assert "IAM policy file not found file_not_exists.json" in str( log.calls ) + + +@pytest.mark.redis_db +def test_get_iam_roles_cache() -> None: + system_role = generate_migration_test_role("system", "all") + tool_role = generate_tool_test_role("snql-to-sql") + with patch( + "snuba.admin.auth.DEFAULT_ROLES", + [system_role, tool_role], + ): + iam_file = tempfile.NamedTemporaryFile() + iam_file.write( + json.dumps( + { + "bindings": [ + { + "members": [ + "group:team-sns@sentry.io", + "user:test_user1@sentry.io", + ], + "role": "roles/NonBlockingMigrationsExecutor", + }, + { + "members": [ + "group:team-sns@sentry.io", + "user:test_user1@sentry.io", + "user:test_user2@sentry.io", + ], + "role": "roles/TestMigrationsExecutor", + }, + { + "members": [ + "group:team-sns@sentry.io", + "user:test_user1@sentry.io", + "user:test_user2@sentry.io", + ], + "role": "roles/owner", + }, + { + "members": [ + "group:team-sns@sentry.io", + "user:test_user1@sentry.io", + ], + "role": "roles/AllTools", + }, + ] + } + ).encode("utf-8") + ) + + iam_file.flush() + with patch("snuba.admin.auth.settings.ADMIN_IAM_POLICY_FILE", iam_file.name): + + user1 = AdminUser(email="test_user1@sentry.io", id="unknown") + _set_roles(user1) + + assert user1.roles == [ + ROLES["NonBlockingMigrationsExecutor"], + ROLES["TestMigrationsExecutor"], + ROLES["AllTools"], + system_role, + tool_role, + ] + + iam_file = tempfile.NamedTemporaryFile() + iam_file.write(json.dumps({"bindings": []}).encode("utf-8")) + iam_file.flush() + + with patch("snuba.admin.auth.settings.ADMIN_IAM_POLICY_FILE", iam_file.name): + _set_roles(user1) + + assert user1.roles == [ + ROLES["NonBlockingMigrationsExecutor"], + ROLES["TestMigrationsExecutor"], + ROLES["AllTools"], + system_role, + tool_role, + ] + + redis_client = get_redis_client(RedisClientKey.ADMIN_AUTH) + redis_client.delete(f"roles-{user1.email}") + _set_roles(user1) + + assert user1.roles == [ + system_role, + tool_role, + ] + + +@pytest.mark.redis_db +@patch("redis.Redis") +def test_get_iam_roles_cache_fail(mock_redis: Any) -> None: + mock_redis.get.side_effect = Exception("Test exception") + mock_redis.set.side_effect = Exception("Test exception") + system_role = generate_migration_test_role("system", "all") + tool_role = generate_tool_test_role("snql-to-sql") + with patch( + "snuba.admin.auth.DEFAULT_ROLES", + [system_role, tool_role], + ): + iam_file = tempfile.NamedTemporaryFile() + iam_file.write(json.dumps({"bindings": []}).encode("utf-8")) + iam_file.flush() + + with patch("snuba.admin.auth.settings.ADMIN_IAM_POLICY_FILE", iam_file.name): + user1 = AdminUser(email="test_user1@sentry.io", id="unknown") + _set_roles(user1) + + assert user1.roles == [ + system_role, + tool_role, + ] + + _set_roles(user1) + + assert user1.roles == [ + system_role, + tool_role, + ] diff --git a/tests/admin/test_api.py b/tests/admin/test_api.py index 5f1c4d4bb3..fbb0e164c3 100644 --- a/tests/admin/test_api.py +++ b/tests/admin/test_api.py @@ -166,6 +166,7 @@ def test_config_descriptions(admin_api: FlaskClient) -> None: } +@pytest.mark.redis_db def get_node_for_table( admin_api: FlaskClient, storage_name: str ) -> tuple[str, str, int]: @@ -204,6 +205,7 @@ def test_system_query(admin_api: FlaskClient) -> None: assert data["rows"] == [] +@pytest.mark.redis_db def 
test_predefined_system_queries(admin_api: FlaskClient) -> None: response = admin_api.get( "/clickhouse_queries", @@ -249,6 +251,7 @@ def test_query_trace_bad_query(admin_api: FlaskClient) -> None: assert "clickhouse" == data["error"]["type"] +@pytest.mark.redis_db @pytest.mark.clickhouse_db def test_query_trace_invalid_query(admin_api: FlaskClient) -> None: table, _, _ = get_node_for_table(admin_api, "errors_ro") @@ -279,6 +282,7 @@ def test_querylog_query(admin_api: FlaskClient) -> None: assert "column_names" in data and data["column_names"] == ["count()"] +@pytest.mark.redis_db @pytest.mark.clickhouse_db def test_querylog_invalid_query(admin_api: FlaskClient) -> None: table, _, _ = get_node_for_table(admin_api, "errors_ro") @@ -301,6 +305,7 @@ def test_querylog_describe(admin_api: FlaskClient) -> None: assert "column_names" in data and "rows" in data +@pytest.mark.redis_db def test_predefined_querylog_queries(admin_api: FlaskClient) -> None: response = admin_api.get( "/querylog_queries", @@ -313,6 +318,7 @@ def test_predefined_querylog_queries(admin_api: FlaskClient) -> None: assert data[0]["name"] == "QueryByID" +@pytest.mark.redis_db def test_get_snuba_datasets(admin_api: FlaskClient) -> None: response = admin_api.get("/snuba_datasets") assert response.status_code == 200 @@ -320,9 +326,10 @@ def test_get_snuba_datasets(admin_api: FlaskClient) -> None: assert set(data) == set(get_enabled_dataset_names()) -def test_convert_SnQL_to_SQL_invalid_dataset(admin_api: FlaskClient) -> None: +@pytest.mark.redis_db +def test_snuba_debug_invalid_dataset(admin_api: FlaskClient) -> None: response = admin_api.post( - "/snql_to_sql", data=json.dumps({"dataset": "", "query": ""}) + "/snuba_debug", data=json.dumps({"dataset": "", "query": ""}) ) assert response.status_code == 400 data = json.loads(response.data) @@ -330,9 +337,9 @@ def test_convert_SnQL_to_SQL_invalid_dataset(admin_api: FlaskClient) -> None: @pytest.mark.redis_db -def test_convert_SnQL_to_SQL_invalid_query(admin_api: FlaskClient) -> None: +def test_snuba_debug_invalid_query(admin_api: FlaskClient) -> None: response = admin_api.post( - "/snql_to_sql", data=json.dumps({"dataset": "sessions", "query": ""}) + "/snuba_debug", data=json.dumps({"dataset": "sessions", "query": ""}) ) assert response.status_code == 400 data = json.loads(response.data) @@ -344,7 +351,7 @@ def test_convert_SnQL_to_SQL_invalid_query(admin_api: FlaskClient) -> None: @pytest.mark.redis_db @pytest.mark.clickhouse_db -def test_convert_SnQL_to_SQL_valid_query(admin_api: FlaskClient) -> None: +def test_snuba_debug_valid_query(admin_api: FlaskClient) -> None: snql_query = """ MATCH (sessions) SELECT sessions_crashed @@ -354,11 +361,17 @@ def test_convert_SnQL_to_SQL_valid_query(admin_api: FlaskClient) -> None: AND started < toDateTime('2022-02-01 00:00:00') """ response = admin_api.post( - "/snql_to_sql", data=json.dumps({"dataset": "sessions", "query": snql_query}) + "/snuba_debug", data=json.dumps({"dataset": "sessions", "query": snql_query}) ) assert response.status_code == 200 data = json.loads(response.data) assert data["sql"] != "" + assert len(data["explain"]["steps"]) > 0 + assert { + "category": "entity_processor", + "name": "BasicFunctionsProcessor", + "data": {}, + } in data["explain"]["steps"] @pytest.mark.redis_db diff --git a/tests/admin/test_authorization.py b/tests/admin/test_authorization.py index 923902a61e..d07614435f 100644 --- a/tests/admin/test_authorization.py +++ b/tests/admin/test_authorization.py @@ -16,6 +16,7 @@ def admin_api() -> FlaskClient: 
return application.test_client() +@pytest.mark.redis_db def test_tools(admin_api: FlaskClient) -> None: response = admin_api.get("/tools") assert response.status_code == 200 @@ -25,6 +26,7 @@ def test_tools(admin_api: FlaskClient) -> None: assert "all" in data["tools"] +@pytest.mark.redis_db @patch("snuba.admin.auth.DEFAULT_ROLES", [ROLES["ProductTools"]]) def test_product_tools_role( admin_api: FlaskClient, diff --git a/tests/consumers/test_consumer_builder.py b/tests/consumers/test_consumer_builder.py index c76084c006..29e0312336 100644 --- a/tests/consumers/test_consumer_builder.py +++ b/tests/consumers/test_consumer_builder.py @@ -18,7 +18,7 @@ from snuba.datasets.storages.storage_key import StorageKey from snuba.utils.metrics.backends.abstract import MetricsBackend from snuba.utils.metrics.wrapper import MetricsWrapper -from tests.fixtures import get_raw_event +from tests.fixtures import get_raw_error_message from tests.test_consumer import get_row_count test_storage_key = StorageKey("errors") @@ -54,6 +54,8 @@ ), max_batch_size=3, max_batch_time_ms=4, + max_insert_batch_size=None, + max_insert_batch_time_ms=None, metrics=MetricsWrapper( environment.metrics, "test_consumer", @@ -61,6 +63,7 @@ ), slice_id=None, join_timeout=5, + enforce_schema=True, ) optional_consumer_config = resolve_consumer_config( @@ -97,6 +100,8 @@ ), max_batch_size=3, max_batch_time_ms=4, + max_insert_batch_size=None, + max_insert_batch_time_ms=None, metrics=MetricsWrapper( environment.metrics, "test_consumer", @@ -104,6 +109,7 @@ ), slice_id=None, join_timeout=5, + enforce_schema=True, ) @@ -160,8 +166,7 @@ def test_run_processing_strategy() -> None: strategy_factory = consumer_builder.build_streaming_strategy_factory() strategy = strategy_factory.create_with_partitions(commit, partitions) - raw_message = get_raw_event() - json_string = json.dumps([2, "insert", raw_message, []]) + json_string = json.dumps(get_raw_error_message()) message = Message( BrokerValue( diff --git a/tests/datasets/test_errors_processor.py b/tests/datasets/test_errors_processor.py index dec193674c..de87c60e0f 100644 --- a/tests/datasets/test_errors_processor.py +++ b/tests/datasets/test_errors_processor.py @@ -4,6 +4,7 @@ from dataclasses import dataclass from datetime import datetime, timedelta from typing import Any, Mapping, Sequence +from unittest.mock import ANY from uuid import UUID import pytest @@ -460,7 +461,7 @@ def test_errors_basic(self) -> None: } ) assert processor.process_message(payload, meta) == InsertBatch( - [message.build_result(meta)], None + [message.build_result(meta)], ANY ) def test_errors_replayid_context(self) -> None: @@ -498,7 +499,7 @@ def test_errors_replayid_context(self) -> None: meta = KafkaMessageMetadata(offset=2, partition=2, timestamp=timestamp) assert self.processor.process_message(payload, meta) == InsertBatch( - [message.build_result(meta)], None + [message.build_result(meta)], ANY ) def test_errors_replayid_tag(self) -> None: @@ -542,7 +543,7 @@ def test_errors_replayid_tag(self) -> None: result["tags.key"].insert(4, "replayId") result["tags.value"].insert(4, replay_id.hex) assert self.processor.process_message(payload, meta) == InsertBatch( - [result], None + [result], ANY ) def test_errors_replayid_tag_and_context(self) -> None: @@ -585,7 +586,7 @@ def test_errors_replayid_tag_and_context(self) -> None: result = message.build_result(meta) result["replay_id"] = str(replay_id) assert self.processor.process_message(payload, meta) == InsertBatch( - [result], None + [result], ANY ) def 
test_errors_replayid_invalid_tag(self) -> None: @@ -629,7 +630,7 @@ def test_errors_replayid_invalid_tag(self) -> None: result["tags.key"].insert(4, "replayId") result["tags.value"].insert(4, invalid_replay_id) assert self.processor.process_message(payload, meta) == InsertBatch( - [result], None + [result], ANY ) def test_exception_main_thread_true(self) -> None: @@ -683,7 +684,7 @@ def test_exception_main_thread_true(self) -> None: result["exception_main_thread"] = True assert self.processor.process_message(payload, meta) == InsertBatch( - [result], None + [result], ANY ) def test_exception_main_thread_false(self) -> None: @@ -737,7 +738,7 @@ def test_exception_main_thread_false(self) -> None: result["exception_main_thread"] = False assert self.processor.process_message(payload, meta) == InsertBatch( - [result], None + [result], ANY ) def test_trace_sampled(self) -> None: @@ -777,7 +778,7 @@ def test_trace_sampled(self) -> None: result["trace_sampled"] = True assert self.processor.process_message(payload, meta) == InsertBatch( - [result], None + [result], ANY ) # verify processing trace.sampled=None works as it did before @@ -788,7 +789,7 @@ def test_trace_sampled(self) -> None: result2 = message.build_result(meta) assert self.processor.process_message(payload, meta) == InsertBatch( - [result2], None + [result2], ANY ) def test_errors_processed(self) -> None: @@ -828,7 +829,7 @@ def test_errors_processed(self) -> None: result["num_processing_errors"] = 3 assert self.processor.process_message(payload, meta) == InsertBatch( - [result], None + [result], ANY ) # ensure old behavior where data.errors=None won't set 'num_processing_errors' @@ -839,5 +840,5 @@ def test_errors_processed(self) -> None: result = message.build_result(meta) assert self.processor.process_message(payload, meta) == InsertBatch( - [result], None + [result], ANY ) diff --git a/tests/datasets/test_search_issues_processor.py b/tests/datasets/test_search_issues_processor.py index bd9b635b16..e2a36ce4db 100644 --- a/tests/datasets/test_search_issues_processor.py +++ b/tests/datasets/test_search_issues_processor.py @@ -36,6 +36,7 @@ def message_base() -> SearchIssueEvent: "primary_hash": str(uuid.uuid4()), "datetime": datetime.utcnow().isoformat() + "Z", "platform": "other", + "message": "something", "data": { "received": datetime.now().timestamp(), }, @@ -70,6 +71,7 @@ class TestSearchIssuesMessageProcessor: "platform", "tags.key", "tags.value", + "message", } def process_message( @@ -439,6 +441,14 @@ def test_extract_replay_id(self, message_base): with pytest.raises(ValueError): self.process_message(message_base) + def test_extract_message(self, message_base): + message = "a message" + message_base["message"] = message + processed = self.process_message(message_base) + self.assert_required_columns(processed) + insert_row = processed.rows[0] + assert insert_row["message"] == message + def test_ensure_uuid(self): with pytest.raises(ValueError): ensure_uuid("not_a_uuid") diff --git a/tests/test_consumer.py b/tests/test_consumer.py index 582c8eb5fd..4bf2dcd86e 100644 --- a/tests/test_consumer.py +++ b/tests/test_consumer.py @@ -31,13 +31,16 @@ from snuba.utils.streams.topics import Topic as SnubaTopic from tests.assertions import assert_changes from tests.backends.metrics import TestingMetricsBackend, Timing +from tests.fixtures import get_raw_error_message def test_streaming_consumer_strategy() -> None: messages = ( Message( BrokerValue( - KafkaPayload(None, b"{}", []), + KafkaPayload( + None, 
json.dumps(get_raw_error_message()).encode("utf-8"), [] + ), Partition(Topic("events"), 0), i, datetime.now(), @@ -72,11 +75,13 @@ def write_step() -> ProcessedMessageBatchWriter: factory = KafkaConsumerStrategyFactory( None, functools.partial( - process_message, processor, "consumer_group", SnubaTopic.EVENTS + process_message, processor, "consumer_group", SnubaTopic.EVENTS, True ), write_step, max_batch_size=10, max_batch_time=60, + max_insert_batch_size=None, + max_insert_batch_time=None, processes=None, input_block_size=None, output_block_size=None, diff --git a/tests/test_search_issues_api.py b/tests/test_search_issues_api.py index 425e7b2dc0..29704990c9 100644 --- a/tests/test_search_issues_api.py +++ b/tests/test_search_issues_api.py @@ -29,6 +29,7 @@ def base_insert_event( "primary_hash": str(uuid.uuid4()), "datetime": datetime.utcnow().isoformat() + "Z", "platform": "other", + "message": "message", "data": { "received": now.timestamp(), }, @@ -94,6 +95,7 @@ def test_simple_search_query(self) -> None: primary_hash=str(uuid.uuid4().hex), datetime=datetime.utcnow().isoformat() + "Z", platform="other", + message="message", data={"received": now.timestamp()}, occurrence_data=dict( id=str(uuid.uuid4().hex),