From 6b8b7c1ac7fa3bd9026e3d25d35717a318c7f227 Mon Sep 17 00:00:00 2001 From: Micah Wylde Date: Mon, 10 Jun 2024 17:06:07 -0700 Subject: [PATCH] Add table for kafka exactly-once state --- crates/arroyo-connectors/src/kafka/sink/mod.rs | 16 ++++++++++++++-- 1 file changed, 14 insertions(+), 2 deletions(-) diff --git a/crates/arroyo-connectors/src/kafka/sink/mod.rs b/crates/arroyo-connectors/src/kafka/sink/mod.rs index cfe95d9a7..20d3c568b 100644 --- a/crates/arroyo-connectors/src/kafka/sink/mod.rs +++ b/crates/arroyo-connectors/src/kafka/sink/mod.rs @@ -1,6 +1,6 @@ use anyhow::Result; -use arroyo_rpc::grpc::TableConfig; +use arroyo_rpc::grpc::{GlobalKeyedTableConfig, TableConfig, TableEnum}; use arroyo_rpc::{CheckpointEvent, ControlMessage, ControlResp}; use arroyo_types::*; use std::collections::HashMap; @@ -18,6 +18,7 @@ use arroyo_operator::context::ArrowContext; use arroyo_operator::operator::ArrowOperator; use arroyo_types::CheckpointBarrier; use async_trait::async_trait; +use prost::Message; use rdkafka::error::{KafkaError, RDKafkaErrorCode}; use std::time::{Duration, SystemTime}; @@ -157,7 +158,18 @@ impl ArrowOperator for KafkaSinkFunc { fn tables(&self) -> HashMap { if self.is_committing() { - todo!("implement committing state") + single_item_hash_map( + "i".to_string(), + TableConfig { + table_type: TableEnum::GlobalKeyValue.into(), + config: GlobalKeyedTableConfig { + table_name: "i".to_string(), + description: "index for transactional ids".to_string(), + uses_two_phase_commit: true, + } + .encode_to_vec(), + }, + ) } else { HashMap::new() }