ref(replays): remove chunking logic for large recordings (#2063)
After merging #2032, our metrics confirm that this logic is no longer hit:
payloads over 10 MiB are rejected, while the chunking limit is set to 15 MiB,
so no accepted payload is ever large enough to be chunked.

_#skip-changelog_

Co-authored-by: Tor <tor.saebjoernsen@sentry.io>
JoshFerge and TBS1996 authored Jun 28, 2023
1 parent 102dc21 commit d125d15
Showing 1 changed file with 1 addition and 89 deletions.
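The removal rests on a simple threshold argument: payloads over 10 MiB are rejected before they reach this code, while chunking only triggered above the 15 MiB message limit. A minimal sketch of that gating, assuming illustrative constants and a hypothetical `should_chunk` helper rather than the exact names from the Relay codebase:

```rust
// Illustrative limits from the commit message; the real values live in
// Relay's configuration, not in these hypothetical constants.
const REJECT_ABOVE: usize = 10 * 1024 * 1024; // payloads over 10 MiB are rejected upstream
const CHUNK_ABOVE: usize = 15 * 1024 * 1024; // chunking only applied above 15 MiB

// Hypothetical helper mirroring the decision the removed branch made.
fn should_chunk(payload_len: usize) -> bool {
    payload_len > CHUNK_ABOVE
}

fn main() {
    // Any payload that survives the upstream rejection is at most 10 MiB,
    // which is below the 15 MiB chunking threshold, so the chunked branch
    // can never be reached.
    for len in [512 * 1024, 5 * 1024 * 1024, REJECT_ABOVE] {
        assert!(!should_chunk(len));
    }
    println!("no accepted payload ever needs chunking");
}
```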
relay-server/src/actors/store.rs
```diff
@@ -664,10 +664,6 @@ impl StoreService {
         start_time: Instant,
         retention: u16,
     ) -> Result<(), StoreError> {
-        // Payloads must be chunked if they exceed a certain threshold. We do not chunk every
-        // message because we can achieve better parallelism when dealing with a single
-        // message.
-
         // 2000 bytes are reserved for the message metadata.
         let max_message_metadata_size = 2000;

```
```diff
@@ -697,90 +693,12 @@ impl StoreService {
                 event_type = "replay_recording_not_chunked"
             );
         } else {
-            // Produce chunks to the topic first. Ordering matters.
-            let replay_recording = self.produce_replay_recording_chunks(
-                event_id.ok_or(StoreError::NoEventId)?,
-                scoping.organization_id,
-                scoping.project_id,
-                item,
-            )?;
-
-            let message = KafkaMessage::ReplayRecording(ReplayRecordingKafkaMessage {
-                replay_id: event_id.ok_or(StoreError::NoEventId)?,
-                project_id: scoping.project_id,
-                key_id: scoping.key_id,
-                org_id: scoping.organization_id,
-                received: UnixTimestamp::from_instant(start_time).as_secs(),
-                retention_days: retention,
-                replay_recording,
-            });
-
-            self.produce(
-                KafkaTopic::ReplayRecordings,
-                scoping.organization_id,
-                message,
-            )?;
-
-            metric!(
-                counter(RelayCounters::ProcessingMessageProduced) += 1,
-                event_type = "replay_recording"
-            );
+            relay_log::warn!("replay_recording over maximum size.");
         };

         Ok(())
     }

-    fn produce_replay_recording_chunks(
-        &self,
-        replay_id: EventId,
-        organization_id: u64,
-        project_id: ProjectId,
-        item: &Item,
-    ) -> Result<ReplayRecordingChunkMeta, StoreError> {
-        let id = Uuid::new_v4().to_string();
-
-        let mut chunk_index = 0;
-        let mut offset = 0;
-        let payload = item.payload();
-        let size = item.len();
-
-        // This skips chunks for empty replay recordings. The consumer does not require chunks for
-        // empty replay recordings. `chunks` will be `0` in this case.
-        while offset < size {
-            // XXX: Max message size is 1MB. We reserve 2000 bytes for metadata and the rest is
-            // consumed by the blob.
-            let max_chunk_size = 1000 * 1000 - 2000;
-            let chunk_size = std::cmp::min(max_chunk_size, size - offset);
-
-            let replay_recording_chunk_message =
-                KafkaMessage::ReplayRecordingChunk(ReplayRecordingChunkKafkaMessage {
-                    payload: payload.slice(offset..offset + chunk_size),
-                    replay_id,
-                    project_id,
-                    id: id.clone(),
-                    chunk_index,
-                });
-
-            self.produce(
-                KafkaTopic::ReplayRecordings,
-                organization_id,
-                replay_recording_chunk_message,
-            )?;
-
-            offset += chunk_size;
-            chunk_index += 1;
-        }
-
-        // The chunk_index is incremented after every loop iteration. After we exit the loop, it
-        // is one larger than the last chunk, so it is equal to the number of chunks.
-
-        Ok(ReplayRecordingChunkMeta {
-            id,
-            chunks: chunk_index,
-            size: Some(size),
-        })
-    }
-
     fn produce_check_in(
         &self,
         organization_id: u64,
```
```diff
@@ -1079,8 +997,6 @@ enum KafkaMessage {
     Profile(ProfileKafkaMessage),
     ReplayEvent(ReplayEventKafkaMessage),
     ReplayRecordingNotChunked(ReplayRecordingNotChunkedKafkaMessage),
-    ReplayRecording(ReplayRecordingKafkaMessage),
-    ReplayRecordingChunk(ReplayRecordingChunkKafkaMessage),
     CheckIn(CheckInKafkaMessage),
 }

```
```diff
@@ -1095,8 +1011,6 @@ impl Message for KafkaMessage {
             KafkaMessage::Metric(_) => "metric",
             KafkaMessage::Profile(_) => "profile",
             KafkaMessage::ReplayEvent(_) => "replay_event",
-            KafkaMessage::ReplayRecording(_) => "replay_recording",
-            KafkaMessage::ReplayRecordingChunk(_) => "replay_recording_chunk",
             KafkaMessage::ReplayRecordingNotChunked(_) => "replay_recording_not_chunked",
             KafkaMessage::CheckIn(_) => "check_in",
         }
```
```diff
@@ -1113,8 +1027,6 @@ impl Message for KafkaMessage {
             Self::Metric(_message) => Uuid::nil(), // TODO(ja): Determine a partitioning key
             Self::Profile(_message) => Uuid::nil(),
             Self::ReplayEvent(message) => message.replay_id.0,
-            Self::ReplayRecording(message) => message.replay_id.0,
-            Self::ReplayRecordingChunk(message) => message.replay_id.0,
             Self::ReplayRecordingNotChunked(_message) => Uuid::nil(), // Ensure random partitioning.
             Self::CheckIn(_message) => Uuid::nil(),
         };
```
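For readers tracing the history: the removed `produce_replay_recording_chunks` sliced a recording into chunks of at most 1 MB minus the 2000 bytes reserved for message metadata, produced them in order under a shared id, and keyed each chunk by `replay_id` (last hunk above) so every chunk of a replay landed on the same Kafka partition. A standalone sketch of that slicing loop, with a simplified chunk type standing in for the real Kafka message and producer:

```rust
/// Simplified stand-in for the removed ReplayRecordingChunkKafkaMessage;
/// the real message also carried replay, project, and recording ids.
#[derive(Debug)]
struct Chunk {
    chunk_index: usize,
    payload: Vec<u8>,
}

/// Sketch of the removed slicing logic: at most 1 MB per Kafka message,
/// with 2000 bytes reserved for metadata.
fn chunk_recording(payload: &[u8]) -> Vec<Chunk> {
    const MAX_CHUNK_SIZE: usize = 1000 * 1000 - 2000;
    let mut chunks = Vec::new();
    let mut offset = 0;
    // An empty payload yields zero chunks; the removed code noted that the
    // consumer accepted `chunks == 0` for empty recordings.
    while offset < payload.len() {
        let chunk_size = std::cmp::min(MAX_CHUNK_SIZE, payload.len() - offset);
        chunks.push(Chunk {
            chunk_index: chunks.len(),
            payload: payload[offset..offset + chunk_size].to_vec(),
        });
        offset += chunk_size;
    }
    // The final length equals the last chunk_index + 1, i.e. the chunk count
    // the removed code reported in ReplayRecordingChunkMeta.
    chunks
}

fn main() {
    let recording = vec![0u8; 2_500_000]; // ~2.5 MB sample payload
    let chunks = chunk_recording(&recording);
    assert_eq!(chunks.len(), 3); // 998_000 + 998_000 + 504_000 bytes
    println!("{} chunks", chunks.len());
}
```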