From 0ac0a755fc742e991cf48f08b8bfd667c65b77e9 Mon Sep 17 00:00:00 2001 From: Lyn Nagara Date: Thu, 5 Oct 2023 15:58:26 -0700 Subject: [PATCH] ref: Remove duplicate file (#4822) Inadvertent copy of https://github.com/getsentry/snuba/blob/76e1c664bf8b84f6f7a3f361a413333417113b74/rust_snuba/src/strategies/commit_log.rs in the wrong place --- .../src/processing/strategies/commit_log.rs | 104 ------------------ 1 file changed, 104 deletions(-) delete mode 100644 rust_snuba/rust_arroyo/src/processing/strategies/commit_log.rs diff --git a/rust_snuba/rust_arroyo/src/processing/strategies/commit_log.rs b/rust_snuba/rust_arroyo/src/processing/strategies/commit_log.rs deleted file mode 100644 index ba71aad72a..0000000000 --- a/rust_snuba/rust_arroyo/src/processing/strategies/commit_log.rs +++ /dev/null @@ -1,104 +0,0 @@ -use crate::backends::kafka::types::KafkaPayload; -use serde::{Deserialize, Serialize}; -use std::str; -use thiserror::Error; - -#[derive(Debug)] -struct Commit { - topic: String, - partition: u16, - consumer_group: String, - orig_message_ts: f64, - offset: u64, -} - -#[derive(Debug, Deserialize, Serialize)] -struct Payload { - offset: u64, - orig_message_ts: f64, -} - -#[derive(Error, Debug)] -enum CommitLogError { - #[error("json error")] - JsonError(#[from] serde_json::Error), - #[error("invalid message key")] - InvalidKey, - #[error("invalid message payload")] - InvalidPayload, -} - -impl TryFrom for Commit { - type Error = CommitLogError; - - fn try_from(payload: KafkaPayload) -> Result { - let key = payload.key.unwrap(); - - let data: Vec<&str> = str::from_utf8(&key).unwrap().split(':').collect(); - if data.len() != 3 { - return Err(CommitLogError::InvalidKey); - } - - let topic = data[0].to_string(); - let partition = data[1].parse::().unwrap(); - let consumer_group = data[2].to_string(); - - let d: Payload = - serde_json::from_slice(&payload.payload.ok_or(CommitLogError::InvalidPayload)?)?; - - Ok(Commit { - topic, - partition, - consumer_group, - orig_message_ts: d.orig_message_ts, - offset: d.offset, - }) - } -} - -impl TryFrom for KafkaPayload { - type Error = CommitLogError; - - fn try_from(commit: Commit) -> Result { - let key = Some( - format!( - "{}:{}:{}", - commit.topic, commit.partition, commit.consumer_group - ) - .into_bytes(), - ); - - let payload = Some(serde_json::to_vec(&Payload { - offset: commit.offset, - orig_message_ts: commit.orig_message_ts, - })?); - - Ok(KafkaPayload { - key, - headers: None, - payload, - }) - } -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn commit() { - let payload = KafkaPayload { - key: Some(b"topic:0:group1".to_vec()), - headers: None, - payload: Some(b"{\"offset\":5,\"orig_message_ts\":1696381946.0}".to_vec()), - }; - - let payload_clone = payload.clone(); - - let commit: Commit = payload.try_into().unwrap(); - assert_eq!(commit.partition, 0); - let transformed: KafkaPayload = commit.try_into().unwrap(); - assert_eq!(transformed.key, payload_clone.key); - assert_eq!(transformed.payload, payload_clone.payload); - } -}