Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ref(server): Use multer directly #3978

Merged
merged 7 commits into from
Sep 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 1 addition & 6 deletions relay-server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,7 @@ workspace = true
[dependencies]
anyhow = { workspace = true }
serde_path_to_error = { workspace = true }
axum = { workspace = true, features = [
"macros",
"matched-path",
"multipart",
"tracing",
] }
axum = { workspace = true, features = ["macros", "matched-path", "tracing"] }
axum-extra = { workspace = true }
axum-server = { workspace = true }
arc-swap = { workspace = true }
Expand Down
26 changes: 10 additions & 16 deletions relay-server/src/endpoints/attachments.rs
Original file line number Diff line number Diff line change
@@ -1,49 +1,43 @@
use axum::extract::{DefaultBodyLimit, Multipart, Path};
use axum::extract::Path;
use axum::http::StatusCode;
use axum::response::IntoResponse;
use axum::routing::{post, MethodRouter};
use relay_config::Config;
use multer::Multipart;
use relay_event_schema::protocol::EventId;
use serde::Deserialize;

use crate::endpoints::common::{self, BadStoreRequest};
use crate::envelope::{AttachmentType, Envelope};
use crate::extractors::RequestMeta;
use crate::extractors::{Remote, RequestMeta};
use crate::service::ServiceState;
use crate::utils;

#[derive(Debug, Deserialize)]
struct AttachmentPath {
pub struct AttachmentPath {
event_id: EventId,
}

async fn extract_envelope(
config: &Config,
meta: RequestMeta,
path: AttachmentPath,
multipart: Multipart,
multipart: Multipart<'static>,
) -> Result<Box<Envelope>, BadStoreRequest> {
let max_size = config.max_attachment_size();
let items = utils::multipart_items(multipart, max_size, |_| AttachmentType::default()).await?;
let items = utils::multipart_items(multipart, |_| AttachmentType::default()).await?;

let mut envelope = Envelope::from_request(Some(path.event_id), meta);
for item in items {
envelope.add_item(item);
}

Ok(envelope)
}

async fn handle(
pub async fn handle(
state: ServiceState,
meta: RequestMeta,
Path(path): Path<AttachmentPath>,
multipart: Multipart,
Remote(multipart): Remote<Multipart<'static>>,
) -> Result<impl IntoResponse, BadStoreRequest> {
let envelope = extract_envelope(state.config(), meta, path, multipart).await?;
let envelope = extract_envelope(meta, path, multipart).await?;
common::handle_envelope(&state, envelope).await?;
Ok(StatusCode::CREATED)
}

pub fn route(config: &Config) -> MethodRouter<ServiceState> {
post(handle).route_layer(DefaultBodyLimit::max(config.max_attachments_size()))
}
5 changes: 1 addition & 4 deletions relay-server/src/endpoints/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use crate::services::outcome::{DiscardReason, Outcome};
use crate::services::processor::{MetricData, ProcessMetricMeta, ProcessingGroup};
use crate::services::project_cache::{CheckEnvelope, ProcessMetrics, ValidateEnvelope};
use crate::statsd::{RelayCounters, RelayHistograms};
use crate::utils::{self, ApiErrorResponse, FormDataIter, ManagedEnvelope, MultipartError};
use crate::utils::{self, ApiErrorResponse, FormDataIter, ManagedEnvelope};

#[derive(Clone, Copy, Debug, thiserror::Error)]
#[error("the service is overloaded")]
Expand Down Expand Up @@ -61,9 +61,6 @@ pub enum BadStoreRequest {
#[error("invalid multipart data")]
InvalidMultipart(#[from] multer::Error),

#[error("invalid multipart data")]
InvalidMultipartAxum(#[from] MultipartError),

#[error("invalid minidump")]
InvalidMinidump,

Expand Down
37 changes: 16 additions & 21 deletions relay-server/src/endpoints/minidump.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,18 @@
use std::convert::Infallible;

use axum::extract::{DefaultBodyLimit, Multipart, Request};
use axum::extract::{DefaultBodyLimit, Request};
use axum::response::IntoResponse;
use axum::routing::{post, MethodRouter};
use axum::RequestExt;
use bytes::Bytes;
use futures::{future, FutureExt};
use multer::Multipart;
use relay_config::Config;
use relay_event_schema::protocol::EventId;

use crate::constants::{ITEM_NAME_BREADCRUMBS1, ITEM_NAME_BREADCRUMBS2, ITEM_NAME_EVENT};
use crate::endpoints::common::{self, BadStoreRequest, TextResponse};
use crate::envelope::{AttachmentType, ContentType, Envelope, Item, ItemType};
use crate::extractors::{RawContentType, RequestMeta};
use crate::extractors::{RawContentType, Remote, RequestMeta};
use crate::service::ServiceState;
use crate::utils;

Expand Down Expand Up @@ -69,8 +69,8 @@ async fn extract_embedded_minidump(payload: Bytes) -> Result<Option<Bytes>, BadS
None => return Ok(None),
};

let stream = future::ok::<_, Infallible>(payload.clone()).into_stream();
let mut multipart = multer::Multipart::new(stream, boundary);
let stream = futures::stream::once(async { Ok::<_, Infallible>(payload.clone()) });
let mut multipart = Multipart::new(stream, boundary);

while let Some(field) = multipart.next_field().await? {
if field.name() == Some(MINIDUMP_FIELD_NAME) {
Expand All @@ -82,12 +82,10 @@ async fn extract_embedded_minidump(payload: Bytes) -> Result<Option<Bytes>, BadS
}

async fn extract_multipart(
config: &Config,
multipart: Multipart,
multipart: Multipart<'static>,
meta: RequestMeta,
) -> Result<Box<Envelope>, BadStoreRequest> {
let max_size = config.max_attachment_size();
let mut items = utils::multipart_items(multipart, max_size, infer_attachment_type).await?;
let mut items = utils::multipart_items(multipart, infer_attachment_type).await?;

let minidump_item = items
.iter_mut()
Expand Down Expand Up @@ -139,7 +137,8 @@ async fn handle(
let envelope = if MINIDUMP_RAW_CONTENT_TYPES.contains(&content_type.as_ref()) {
extract_raw_minidump(request.extract().await?, meta)?
} else {
extract_multipart(state.config(), request.extract().await?, meta).await?
let Remote(multipart) = request.extract_with_state(&state).await?;
extract_multipart(multipart, meta).await?
};

let id = envelope.event_id();
Expand All @@ -156,15 +155,15 @@ async fn handle(
}

pub fn route(config: &Config) -> MethodRouter<ServiceState> {
post(handle).route_layer(DefaultBodyLimit::max(config.max_attachments_size()))
// Set the single-attachment limit that applies only for raw minidumps. Multipart bypasses the
// limited body and applies its own limits.
post(handle).route_layer(DefaultBodyLimit::max(config.max_attachment_size()))
Comment on lines +158 to +160
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This might be surprising behavior. DefaultBodyLimit applies by default for all extractors that axum defines, since they use with_limited_body() internally. Among others, this applies to the Bytes extractor, which is used in one of the branches of handle.

In our case, we want two different limits:

  1. For raw minidumps, use the single attachment limit
  2. For multipart, use the multiple attachments limit.

Our multipart extractor does that automatically, as it has access to our config, and that also bypasses with_limited_body. Hence, the limit here applies to just one of the two branches.

I've been looking into ways to layer the limit directly onto the Bytes extractor, but haven't found a good way to do so.

}

#[cfg(test)]
mod tests {
use axum::body::Body;
use axum::extract::FromRequest;

use relay_config::ByteSize;
use relay_config::Config;

use crate::utils::{multipart_items, FormDataIter};

Expand Down Expand Up @@ -214,14 +213,10 @@ mod tests {
.body(Body::from(multipart_body))
.unwrap();

let multipart = Multipart::from_request(request, &()).await?;
let config = Config::default();

let items = multipart_items(
multipart,
ByteSize::mebibytes(100).as_bytes(),
infer_attachment_type,
)
.await?;
let multipart = utils::multipart_from_request(request, &config)?;
let items = multipart_items(multipart, infer_attachment_type).await?;

// we expect the multipart body to contain
// * one arbitrary attachment from the user (a `config.json`)
Expand Down
2 changes: 1 addition & 1 deletion relay-server/src/endpoints/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ pub fn routes(config: &Config) -> Router<ServiceState>{
// No mandatory trailing slash here because people already use it like this.
.route("/api/:project_id/minidump", minidump::route(config))
.route("/api/:project_id/minidump/", minidump::route(config))
.route("/api/:project_id/events/:event_id/attachments/", attachments::route(config))
.route("/api/:project_id/events/:event_id/attachments/", post(attachments::handle))
.route("/api/:project_id/unreal/:sentry_key/", unreal::route(config))
// NOTE: If you add a new (non-experimental) route here, please also list it in
// https://github.com/getsentry/sentry-docs/blob/master/docs/product/relay/operating-guidelines.mdx
Expand Down
2 changes: 2 additions & 0 deletions relay-server/src/extractors/mod.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
mod content_type;
mod forwarded_for;
mod mime;
mod remote;
mod request_meta;
mod signed_json;
mod start_time;

pub use self::content_type::*;
pub use self::forwarded_for::*;
pub use self::mime::*;
pub use self::remote::*;
pub use self::request_meta::*;
pub use self::signed_json::*;
pub use self::start_time::*;
71 changes: 71 additions & 0 deletions relay-server/src/extractors/remote.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
//! Extractors for types from other crates via [`Remote`].

use axum::extract::{FromRequest, Request};
use axum::http::StatusCode;
use axum::response::{IntoResponse, Response};
use multer::Multipart;

use crate::service::ServiceState;
use crate::utils::{self, ApiErrorResponse};

/// A transparent wrapper around a remote type that implements [`FromRequest`] or [`IntoResponse`].
///
/// # Example
///
/// ```ignore
/// use std::convert::Infallible;
///
/// use axum::extract::{FromRequest, Request};
/// use axum::response::IntoResponse;
///
/// use crate::extractors::Remote;
///
/// // Derive `FromRequest` for `bool` for illustration purposes:
/// #[axum::async_trait]
/// impl<S> axum::extract::FromRequest<S> for Remote<bool> {
/// type Rejection = Remote<Infallible>;
///
/// async fn from_request(request: Request) -> Result<Self, Self::Rejection> {
/// Ok(Remote(true))
/// }
/// }
///
/// impl IntoResponse for Remote<Infallible> {
/// fn into_response(self) -> axum::response::Response {
/// match self.0 {}
/// }
/// }
/// ```
#[derive(Debug)]
pub struct Remote<T>(pub T);

impl<T> From<T> for Remote<T> {
fn from(inner: T) -> Self {
Self(inner)
}
}

#[axum::async_trait]
impl FromRequest<ServiceState> for Remote<Multipart<'static>> {
type Rejection = Remote<multer::Error>;

async fn from_request(request: Request, state: &ServiceState) -> Result<Self, Self::Rejection> {
utils::multipart_from_request(request, state.config())
.map(Remote)
.map_err(Remote)
}
}

impl IntoResponse for Remote<multer::Error> {
fn into_response(self) -> Response {
let Self(ref error) = self;

let status_code = match error {
multer::Error::FieldSizeExceeded { .. } => StatusCode::PAYLOAD_TOO_LARGE,
multer::Error::StreamSizeExceeded { .. } => StatusCode::PAYLOAD_TOO_LARGE,
_ => StatusCode::BAD_REQUEST,
};

(status_code, ApiErrorResponse::from_error(error)).into_response()
}
}
Loading
Loading