Skip to content

Commit

Permalink
ref(server): Improve request body and signature handling (#2885)
Browse files Browse the repository at this point in the history
Changes the signature of `UpstreamRequest` so that any request can be
configured to be signed now. Before, this was possible only for query
requests. The `UpstreamRequest` trait now has a `sign() -> bool` method
that returns whether or not a signature should be applied.

Along with this, there are changes to the request builder to make it
more ergonomic for the common use case:

- The builder uses interior mutability instead of moving now.
- `RequestBuilder` now takes `Into<Bytes>` as body, which allows for
  more efficient retries. However, it is still the responsibility of the
  request to ensure bytes are cloned. This is because usually we want
  expensive serialization operations outside of the Upstream's runtime.
- The `build()` function no longer receives the config, as this was
  needed just for internal query requests.
- The `build()` function does not need to finish the request, this is
  done by the upstream now.
- `HttpError::NoCredentials` has been removed. This was a hack to allow
  emitting this error from the build callback, but this is no longer
  required.
  • Loading branch information
jan-auer authored Dec 21, 2023
1 parent ad621dc commit bd75478
Show file tree
Hide file tree
Showing 5 changed files with 120 additions and 73 deletions.
17 changes: 9 additions & 8 deletions relay-server/src/actors/envelopes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;

use bytes::Bytes;
use chrono::Utc;
use relay_base_schema::project::ProjectKey;
use relay_config::{Config, HttpEncoding};
Expand All @@ -24,7 +25,7 @@ use crate::actors::upstream::{
};
use crate::envelope::{self, ContentType, Envelope, EnvelopeError, Item, ItemType};
use crate::extractors::{PartialDsn, RequestMeta};
use crate::http::{HttpError, Request, RequestBuilder, Response};
use crate::http::{HttpError, RequestBuilder, Response};
use crate::statsd::RelayHistograms;
use crate::utils::ManagedEnvelope;

Expand Down Expand Up @@ -56,7 +57,7 @@ impl From<relay_system::SendError> for SendEnvelopeError {
/// An upstream request that submits an envelope via HTTP.
#[derive(Debug)]
pub struct SendEnvelope {
pub envelope_body: Vec<u8>,
pub envelope_body: Bytes,
pub envelope_meta: RequestMeta,
pub project_cache: Addr<ProjectCache>,
pub scoping: Scoping,
Expand All @@ -79,8 +80,8 @@ impl UpstreamRequest for SendEnvelope {
"envelope"
}

fn build(&mut self, _: &Config, builder: RequestBuilder) -> Result<Request, HttpError> {
let envelope_body = &self.envelope_body;
fn build(&mut self, builder: &mut RequestBuilder) -> Result<(), HttpError> {
let envelope_body = self.envelope_body.clone();
metric!(histogram(RelayHistograms::UpstreamEnvelopeBodySize) = envelope_body.len() as u64);

let meta = &self.envelope_meta;
Expand All @@ -92,7 +93,9 @@ impl UpstreamRequest for SendEnvelope {
.header("X-Forwarded-For", meta.forwarded_for())
.header("Content-Type", envelope::CONTENT_TYPE)
.header_opt("X-Sentry-Relay-Shard", self.partition_key.as_ref())
.body(envelope_body)
.body(envelope_body);

Ok(())
}

fn respond(
Expand Down Expand Up @@ -242,11 +245,9 @@ impl EnvelopeManagerService {
// possible so that we avoid internal delays.
envelope.set_sent_at(Utc::now());

let envelope_body = envelope.to_vec()?;

let (tx, rx) = oneshot::channel();
let request = SendEnvelope {
envelope_body,
envelope_body: envelope.to_vec()?.into(),
envelope_meta: envelope.meta().clone(),
project_cache: self.project_cache.clone(),
scoping,
Expand Down
11 changes: 6 additions & 5 deletions relay-server/src/actors/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1313,11 +1313,11 @@ impl EnvelopeProcessorService {
}

fn encode_envelope_body(
body: Vec<u8>,
body: Bytes,
http_encoding: HttpEncoding,
) -> Result<Vec<u8>, std::io::Error> {
let envelope_body = match http_encoding {
HttpEncoding::Identity => body,
) -> Result<Bytes, std::io::Error> {
let envelope_body: Vec<u8> = match http_encoding {
HttpEncoding::Identity => return Ok(body),
HttpEncoding::Deflate => {
let mut encoder = ZlibEncoder::new(Vec::new(), Compression::default());
encoder.write_all(body.as_ref())?;
Expand All @@ -1335,7 +1335,8 @@ impl EnvelopeProcessorService {
encoder.into_inner()
}
};
Ok(envelope_body)

Ok(envelope_body.into())
}

fn handle_encode_envelope(&self, message: EncodeEnvelope) {
Expand Down
96 changes: 62 additions & 34 deletions relay-server/src/actors/upstream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use std::pin::Pin;
use std::sync::Arc;
use std::time::Duration;

use bytes::Bytes;
use itertools::Itertools;
use relay_auth::{RegisterChallenge, RegisterRequest, RegisterResponse, Registration};
use relay_config::{Config, Credentials, RelayMode};
Expand Down Expand Up @@ -107,7 +108,7 @@ pub enum UpstreamRequestError {

/// Likely a bad HTTP status code or unparseable response.
#[error("could not send request")]
Http(#[source] HttpError),
Http(#[from] HttpError),

#[error("upstream requests rate limited")]
RateLimited(UpstreamRateLimits),
Expand Down Expand Up @@ -188,7 +189,6 @@ impl UpstreamRequestError {
UpstreamRequestError::Http(HttpError::Json(_)) => "invalid_json",
UpstreamRequestError::Http(HttpError::Reqwest(_)) => "reqwest_error",
UpstreamRequestError::Http(HttpError::Overflow) => "overflow",
UpstreamRequestError::Http(HttpError::NoCredentials) => "no_credentials",
UpstreamRequestError::RateLimited(_) => "rate_limited",
UpstreamRequestError::ResponseError(_, _) => "response_error",
UpstreamRequestError::ChannelClosed => "channel_closed",
Expand All @@ -197,15 +197,6 @@ impl UpstreamRequestError {
}
}

impl From<HttpError> for UpstreamRequestError {
fn from(error: HttpError) -> Self {
match error {
HttpError::NoCredentials => Self::NoCredentials,
other => Self::Http(other),
}
}
}

/// Checks the authentication state with the upstream.
///
/// In static and proxy mode, Relay does not require authentication and `IsAuthenticated` always
Expand Down Expand Up @@ -306,21 +297,41 @@ pub trait UpstreamRequest: Send + Sync + fmt::Debug {
true
}

/// Add the `X-Sentry-Relay-Signature` header to the outgoing request.
///
/// This requires configuration of the Relay's credentials. If the credentials are not
/// configured, the request will fail with [`UpstreamRequestError::NoCredentials`].
///
/// Defaults to `false`.
fn sign(&self) -> bool {
false
}

/// Returns the name of the logical route.
///
/// This is used for internal metrics and logging. Other than the path, this cannot contain
/// dynamic elements and should be globally unique.
fn route(&self) -> &'static str;

/// Callback to apply configuration to the request.
///
/// This hook is called at least once before `build`. It can be used to include additional
/// properties from Relay's config in the Request before it is sent or handled if during request
/// creation time the configuration is not available.
///
/// This method is optional and defaults to a no-op.
fn configure(&mut self, _config: &Config) {}

/// Callback to build the outgoing web request.
///
/// This callback populates the initialized request with headers and a request body. To send an
/// empty request without additional headers, call [`RequestBuilder::finish`] directly.
/// This callback populates the initialized request with headers and a request body.
///
/// Note that this function can be called repeatedly if [`retry`](UpstreamRequest::retry)
/// returns `true`. This function should therefore not move out of the request struct, but can
/// use it to memoize heavy computation.
fn build(&mut self, config: &Config, builder: RequestBuilder) -> Result<Request, HttpError>;
fn build(&mut self, _builder: &mut RequestBuilder) -> Result<(), HttpError> {
Ok(())
}

/// Callback to complete an HTTP request.
///
Expand Down Expand Up @@ -391,7 +402,7 @@ type QuerySender<T> = Sender<Result<<T as UpstreamQuery>::Response, UpstreamRequ
#[derive(Debug)]
struct UpstreamQueryRequest<T: UpstreamQuery> {
query: T,
compiled: Option<(Vec<u8>, String)>,
body: Option<Bytes>,
max_response_size: usize,
sender: QuerySender<T>,
}
Expand All @@ -404,7 +415,7 @@ where
pub fn new(query: T, sender: QuerySender<T>) -> Self {
Self {
query,
compiled: None,
body: None,
max_response_size: 0,
sender,
}
Expand All @@ -431,6 +442,10 @@ where
true
}

fn sign(&self) -> bool {
true
}

fn method(&self) -> Method {
self.query.method()
}
Expand All @@ -443,25 +458,28 @@ where
self.query.route()
}

fn build(&mut self, config: &Config, builder: RequestBuilder) -> Result<Request, HttpError> {
// Memoize the serialized body and signature for retries.
let credentials = config.credentials().ok_or(HttpError::NoCredentials)?;
let (body, signature) = self
.compiled
.get_or_insert_with(|| credentials.secret_key.pack(&self.query));

fn configure(&mut self, config: &Config) {
// This config attribute is needed during `respond`, which does not have access to the
// config. For this reason, we need to store it on the request struct.
self.max_response_size = config.max_api_payload_size();
}

fn build(&mut self, builder: &mut RequestBuilder) -> Result<(), HttpError> {
// Memoize the serialized body for retries.
let body = match self.body {
Some(ref body) => body,
None => self.body.insert(serde_json::to_vec(&self.query)?.into()),
};

relay_statsd::metric!(
histogram(RelayHistograms::UpstreamQueryBodySize) = body.len() as u64
);

builder
.header("X-Sentry-Relay-Signature", signature.as_bytes())
.header(header::CONTENT_TYPE, b"application/json")
.body(&body)
.body(body.clone());

Ok(())
}

fn respond(
Expand Down Expand Up @@ -629,10 +647,6 @@ impl UpstreamRequest for GetHealthCheck {
"check_live"
}

fn build(&mut self, _: &Config, builder: RequestBuilder) -> Result<Request, HttpError> {
builder.finish()
}

fn respond(
self: Box<Self>,
result: Result<Response, UpstreamRequestError>,
Expand Down Expand Up @@ -744,16 +758,29 @@ impl SharedClient {
.http_host_header()
.unwrap_or_else(|| self.config.upstream_descriptor().host());

let mut builder = RequestBuilder::reqwest(self.reqwest.request(request.method(), url))
.header("Host", host_header.as_bytes());
let mut builder = RequestBuilder::reqwest(self.reqwest.request(request.method(), url));
builder.header("Host", host_header.as_bytes());

if request.set_relay_id() {
if request.set_relay_id() || request.sign() {
if let Some(credentials) = self.config.credentials() {
builder = builder.header("X-Sentry-Relay-Id", credentials.id.to_string());
builder.header("X-Sentry-Relay-Id", credentials.id.to_string());
}
}

match request.build(&self.config, builder) {
request.build(&mut builder)?;

if request.sign() {
let credentials = self
.config
.credentials()
.ok_or(UpstreamRequestError::NoCredentials)?;

let body = builder.get_body().unwrap_or_default();
let signature = credentials.secret_key.sign(body);
builder.header("X-Sentry-Relay-Signature", signature.as_bytes());
}

match builder.finish() {
Ok(Request(client_request)) => Ok(client_request),
Err(e) => Err(e.into()),
}
Expand Down Expand Up @@ -820,6 +847,7 @@ impl SharedClient {
&self,
request: &mut dyn UpstreamRequest,
) -> Result<Response, UpstreamRequestError> {
request.configure(&self.config);
let client_request = self.build_request(request)?;
let response = self.reqwest.execute(client_request).await?;
self.transform_response(request, Response(response)).await
Expand Down
13 changes: 5 additions & 8 deletions relay-server/src/endpoints/forward.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,6 @@ impl IntoResponse for ForwardError {
}
HttpError::Io(_) => StatusCode::BAD_GATEWAY.into_response(),
HttpError::Json(_) => StatusCode::BAD_REQUEST.into_response(),
HttpError::NoCredentials => StatusCode::INTERNAL_SERVER_ERROR.into_response(),
},
UpstreamRequestError::SendFailed(e) => {
if e.is_timeout() {
Expand Down Expand Up @@ -136,23 +135,21 @@ impl UpstreamRequest for ForwardRequest {
"forward"
}

fn build(
&mut self,
_: &Config,
mut builder: RequestBuilder,
) -> Result<crate::http::Request, HttpError> {
fn build(&mut self, builder: &mut RequestBuilder) -> Result<(), HttpError> {
for (key, value) in &self.headers {
// Since the body is always decompressed by the server, we must not forward the
// content-encoding header, as the upstream client will do its own content encoding.
// Also, remove content-length because it's likely wrong.
if !HOP_BY_HOP_HEADERS.contains(key) && !IGNORED_REQUEST_HEADERS.contains(key) {
builder = builder.header(key, value);
builder.header(key, value);
}
}

builder
.header("X-Forwarded-For", self.forwarded_for.as_ref())
.body(&self.data)
.body(self.data.clone());

Ok(())
}

fn respond(
Expand Down
Loading

0 comments on commit bd75478

Please sign in to comment.