Skip to content

Commit

Permalink
feat(rust-consumer): Write to ClickHouse (#4490)
Browse files Browse the repository at this point in the history
feat(rust-consumer): Add basic clickhouse strategy

Same idea as getsentry/snuba#4437 with a few differences:
- Does not switch the arroyo interface to async
- Does not write to ClickHouse by default (only if --no-skip-write is passed)
- Batches inserted rows into fewer writes with multiple rows
  • Loading branch information
lynnagara authored Jul 12, 2023
1 parent 3e3a325 commit 7a7c349
Show file tree
Hide file tree
Showing 8 changed files with 57 additions and 28 deletions.
7 changes: 2 additions & 5 deletions examples/transform_and_produce.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ impl ProcessingStrategy<KafkaPayload> for Noop {
fn poll(&mut self) -> Option<CommitRequest> {
None
}
fn submit(&mut self, _message: Message<KafkaPayload>) -> Result<(), MessageRejected> {
fn submit(&mut self, _message: Message<KafkaPayload>) -> Result<(), MessageRejected<KafkaPayload>> {
Ok(())
}
fn close(&mut self) {}
Expand All @@ -56,10 +56,7 @@ async fn main() {
fn create(&self) -> Box<dyn ProcessingStrategy<KafkaPayload>> {
let producer = KafkaProducer::new(self.config.clone());
let topic = TopicOrPartition::Topic(self.topic.clone());
let reverse_string_and_produce_strategy = Transform {
function: reverse_string,
next_step: Box::new(Produce::new(producer, Box::new(Noop {}), topic)),
};
let reverse_string_and_produce_strategy = Transform::new(reverse_string, Produce::new(producer, Box::new(Noop {}), topic));
Box::new(reverse_string_and_produce_strategy)
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/processing/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,7 @@ mod tests {
}
}

fn submit(&mut self, message: Message<String>) -> Result<(), MessageRejected> {
fn submit(&mut self, message: Message<String>) -> Result<(), MessageRejected<String>> {
self.message = Some(message);
Ok(())
}
Expand Down
2 changes: 1 addition & 1 deletion src/processing/strategies/commit_offsets.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ impl<T: Clone> ProcessingStrategy<T> for CommitOffsets {
self.commit(false)
}

fn submit(&mut self, message: Message<T>) -> Result<(), MessageRejected> {
fn submit(&mut self, message: Message<T>) -> Result<(), MessageRejected<T>> {
for (partition, offset) in message.committable() {
self.partitions.insert(partition, offset);
}
Expand Down
6 changes: 4 additions & 2 deletions src/processing/strategies/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@ pub mod reduce;
pub mod transform;

#[derive(Debug, Clone)]
pub struct MessageRejected;
pub struct MessageRejected<T: Clone> {
pub message: Message<T>,
}

#[derive(Debug, Clone)]
pub struct InvalidMessage;
Expand Down Expand Up @@ -49,7 +51,7 @@ pub trait ProcessingStrategy<TPayload: Clone>: Send + Sync {
/// If the processing strategy is unable to accept a message (due to it
/// being at or over capacity, for example), this method will raise a
/// ``MessageRejected`` exception.
fn submit(&mut self, message: Message<TPayload>) -> Result<(), MessageRejected>;
fn submit(&mut self, message: Message<TPayload>) -> Result<(), MessageRejected<TPayload>>;

/// Close this instance. No more messages should be accepted by the
/// instance after this method has been called.
Expand Down
6 changes: 3 additions & 3 deletions src/processing/strategies/produce.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,12 +72,12 @@ impl ProcessingStrategy<KafkaPayload> for Produce<KafkaPayload> {
None
}

fn submit(&mut self, message: Message<KafkaPayload>) -> Result<(), MessageRejected> {
fn submit(&mut self, message: Message<KafkaPayload>) -> Result<(), MessageRejected<KafkaPayload>> {
if self.closed {
panic!("Attempted to submit a message to a closed Produce strategy")
}
if self.queue.len() >= self.max_queue_size {
return Err(MessageRejected);
return Err(MessageRejected {message});
}

let produce_fut = ProduceFuture {
Expand Down Expand Up @@ -168,7 +168,7 @@ mod tests {
fn poll(&mut self) -> Option<CommitRequest> {
None
}
fn submit(&mut self, _message: Message<KafkaPayload>) -> Result<(), MessageRejected> {
fn submit(&mut self, _message: Message<KafkaPayload>) -> Result<(), MessageRejected<KafkaPayload>> {
Ok(())
}
fn close(&mut self) {}
Expand Down
8 changes: 4 additions & 4 deletions src/processing/strategies/reduce.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,9 @@ impl<T: Clone + Send + Sync, TResult: Clone + Send + Sync> ProcessingStrategy<T>
self.next_step.poll()
}

fn submit(&mut self, message: Message<T>) -> Result<(), MessageRejected> {
fn submit(&mut self, message: Message<T>) -> Result<(), MessageRejected<T>> {
if self.batch_state.is_complete {
return Err(MessageRejected);
return Err(MessageRejected{ message });
}
self.batch_state.add(message);

Expand Down Expand Up @@ -113,7 +113,7 @@ impl <T: Clone + Send + Sync, TResult: Clone + Send + Sync>Reduce<T, TResult> {
Ok(_) => {
self.batch_state = BatchState::new(self.initial_value.clone(), self.accumulator.clone());
}
Err(MessageRejected) => {
Err(MessageRejected{..}) => {
// The batch is marked is_complete, and we stop accepting
// messages until the batch can be sucessfully submitted to the next step.
self.batch_state.is_complete = true;
Expand Down Expand Up @@ -146,7 +146,7 @@ mod tests {
None
}

fn submit(&mut self, message: Message<T>) -> Result<(), MessageRejected> {
fn submit(&mut self, message: Message<T>) -> Result<(), MessageRejected<T>> {
self.submitted.lock().unwrap().push(message.payload());
Ok(())
}
Expand Down
43 changes: 36 additions & 7 deletions src/processing/strategies/transform.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,20 +7,52 @@ use std::time::Duration;
pub struct Transform<TPayload: Clone + Send + Sync, TTransformed: Clone + Send + Sync> {
pub function: fn(TPayload) -> Result<TTransformed, InvalidMessage>,
pub next_step: Box<dyn ProcessingStrategy<TTransformed>>,
pub message_carried_over: Option<Message<TTransformed>>,
}

impl<TPayload: Clone + Send + Sync, TTransformed: Clone + Send + Sync> Transform<TPayload, TTransformed> {
pub fn new<N>(
function: fn(TPayload) -> Result<TTransformed, InvalidMessage>,
next_step: N,
) -> Self
where
N: ProcessingStrategy<TTransformed> + 'static,
{
Self {
function,
next_step: Box::new(next_step),
message_carried_over: None,
}
}
}

impl<TPayload: Clone + Send + Sync, TTransformed: Clone + Send + Sync> ProcessingStrategy<TPayload>
for Transform<TPayload, TTransformed>
{
fn poll(&mut self) -> Option<CommitRequest> {
if let Some(message) = self.message_carried_over.take() {
if let Err(MessageRejected{message: transformed_message}) = self.next_step.submit(message) {
self.message_carried_over = Some(transformed_message);
}
}

self.next_step.poll()
}

fn submit(&mut self, message: Message<TPayload>) -> Result<(), MessageRejected> {
fn submit(&mut self, message: Message<TPayload>) -> Result<(), MessageRejected<TPayload>> {
if self.message_carried_over.is_some() {
return Err(MessageRejected {
message,
});
}

// TODO: Handle InvalidMessage
let transformed = (self.function)(message.payload()).unwrap();

self.next_step.submit(message.replace(transformed))
if let Err(MessageRejected{message: transformed_message}) = self.next_step.submit(message.replace(transformed)) {
self.message_carried_over = Some(transformed_message);
}
Ok(())
}

fn close(&mut self) {
Expand Down Expand Up @@ -57,7 +89,7 @@ mod tests {
fn poll(&mut self) -> Option<CommitRequest> {
None
}
fn submit(&mut self, _message: Message<String>) -> Result<(), MessageRejected> {
fn submit(&mut self, _message: Message<String>) -> Result<(), MessageRejected<String>> {
Ok(())
}
fn close(&mut self) {}
Expand All @@ -67,10 +99,7 @@ mod tests {
}
}

let mut strategy = Transform {
function: identity,
next_step: Box::new(Noop {}),
};
let mut strategy = Transform::new(identity, Noop {});

let partition = Partition {
topic: Topic {
Expand Down
11 changes: 6 additions & 5 deletions src/utils/clickhouse_client.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
use reqwest::header::{HeaderMap, HeaderValue, ACCEPT_ENCODING, CONNECTION};
use reqwest::{Client, Error, Response};

#[derive(Clone)]
pub struct ClickhouseClient {
client: Client,
url: String,
headers: HeaderMap<HeaderValue>,
table: String,
}
impl ClickhouseClient {
pub fn new(hostname: &str, http_port: u16, table: &str) -> ClickhouseClient {
pub fn new(hostname: &str, http_port: u16, table: &str, database: &str) -> ClickhouseClient {
let mut client = ClickhouseClient {
client: Client::new(),
url: format!("http://{}:{}", hostname, http_port),
Expand All @@ -24,11 +25,11 @@ impl ClickhouseClient {
.insert(ACCEPT_ENCODING, HeaderValue::from_static("gzip,deflate"));
client
.headers
.insert("X-ClickHouse-Database", HeaderValue::from_static("default"));
.insert("X-ClickHouse-Database", HeaderValue::from_str(database).unwrap());
client
}

pub async fn send(&self, body: String) -> Result<Response, Error> {
pub async fn send(&self, body: Vec<u8>) -> Result<Response, Error> {
self.client
.post(self.url.clone())
.headers(self.headers.clone())
Expand All @@ -47,10 +48,10 @@ mod tests {
use super::*;
#[tokio::test]
async fn it_works() -> Result<(), reqwest::Error> {
let client: ClickhouseClient = ClickhouseClient::new("localhost", 8123, "querylog_local");
let client: ClickhouseClient = ClickhouseClient::new("localhost", 8123, "querylog_local", "default");

println!("{}", "running test");
let res = client.send("[]".to_string()).await;
let res = client.send(b"[]".to_vec()).await;
println!("Response status {}", res.unwrap().status());
Ok(())
}
Expand Down

0 comments on commit 7a7c349

Please sign in to comment.