diff --git a/pkg/output/kafka/client.go b/pkg/output/kafka/client.go
index 93907e61..76b1e737 100644
--- a/pkg/output/kafka/client.go
+++ b/pkg/output/kafka/client.go
@@ -1,8 +1,9 @@
 package kafka
 
 import (
-	"github.com/IBM/sarama"
 	"strings"
+
+	"github.com/IBM/sarama"
 )
 
 type CompressionStrategy string
@@ -33,6 +34,7 @@ var (
 func NewSyncProducer(config *Config) (sarama.SyncProducer, error) {
 	producerConfig := Init(config)
 	brokersList := strings.Split(config.Brokers, ",")
+
 	return sarama.NewSyncProducer(brokersList, producerConfig)
 }
 func Init(config *Config) *sarama.Config {
@@ -51,6 +53,7 @@ func Init(config *Config) *sarama.Config {
 	default:
 		c.Producer.RequiredAcks = sarama.WaitForLocal
 	}
+
 	switch config.Compression {
 	case CompressionStrategyLZ4:
 		c.Producer.Compression = sarama.CompressionLZ4
@@ -63,11 +66,13 @@ func Init(config *Config) *sarama.Config {
 	default:
 		c.Producer.Compression = sarama.CompressionNone
 	}
+
 	switch config.Partitioning {
 	case PartitionStrategyNone:
 		c.Producer.Partitioner = sarama.NewHashPartitioner
 	default:
 		c.Producer.Partitioner = sarama.NewRandomPartitioner
 	}
+
 	return c
 }
diff --git a/pkg/output/kafka/config.go b/pkg/output/kafka/config.go
index 0b0d7305..b8d14431 100644
--- a/pkg/output/kafka/config.go
+++ b/pkg/output/kafka/config.go
@@ -20,10 +20,10 @@ type Config struct {
 }
 
 func (c *Config) Validate() error {
-
 	if c.Brokers == "" {
 		return errors.New("brokers is required")
 	}
+
 	if c.Topic == "" {
 		return errors.New("topic is required")
 	}
diff --git a/pkg/output/kafka/exporter.go b/pkg/output/kafka/exporter.go
index c35324e1..0ea93d3d 100644
--- a/pkg/output/kafka/exporter.go
+++ b/pkg/output/kafka/exporter.go
@@ -3,6 +3,7 @@ package kafka
 import (
 	"context"
 	"errors"
+
 	"github.com/IBM/sarama"
 	"github.com/ethpandaops/xatu/pkg/observability"
 	"github.com/ethpandaops/xatu/pkg/proto/xatu"
@@ -20,7 +21,6 @@ type ItemExporter struct {
 }
 
 func NewItemExporter(name string, config *Config, log logrus.FieldLogger) (ItemExporter, error) {
-
 	producer, err := NewSyncProducer(config)
 
 	if err != nil {
@@ -29,6 +29,7 @@ func NewItemExporter(name string, config *Config, log logrus.FieldLogger) (ItemE
 			WithField("output_name", name).
 			WithField("output_type", SinkType).
 			Error("Error while creating the Kafka Client")
+
 		return ItemExporter{}, err
 	}
 
@@ -65,41 +66,51 @@ func (e ItemExporter) Shutdown(ctx context.Context) error {
 
 func (e *ItemExporter) sendUpstream(ctx context.Context, items []*xatu.DecoratedEvent) error {
 	msgs := make([]*sarama.ProducerMessage, 0, len(items))
 	msgByteSize := 0
+
 	for _, p := range items {
 		r, err := protojson.Marshal(p)
 		if err != nil {
 			return err
 		}
-		routingKey := sarama.StringEncoder(p.Event.Id)
-		eventPayload := sarama.StringEncoder(r)
+
+		routingKey, eventPayload := sarama.StringEncoder(p.Event.Id), sarama.StringEncoder(r)
 		m := &sarama.ProducerMessage{
 			Topic: e.config.Topic,
 			Key:   routingKey,
 			Value: eventPayload,
 		}
+
 		msgByteSize = m.ByteSize(2)
 		if msgByteSize > e.config.FlushBytes {
 			e.log.WithField("event_id", routingKey).WithField("msg_size", msgByteSize).Debug("Message too large, consider increasing `max_message_bytes`")
+
 			continue
 		}
+
 		msgs = append(msgs, m)
 	}
-	err := e.client.SendMessages(msgs)
+	errorCount := 0
+
+	err := e.client.SendMessages(msgs)
 	if err != nil {
 		var errs sarama.ProducerErrors
 		if errors.As(err, &errs) {
 			errorCount = len(errs)
+
 			for _, producerError := range errs {
 				e.log.
 					WithError(producerError.Err).
-					WithField("num_events", len(errs)).
+					WithField("events", errorCount).
 					Error("Failed to send events to Kafka")
+
 				return producerError
 			}
 		}
+
 		return err
 	}
+
 	e.log.WithField("count", len(msgs)-errorCount).Debug("Items written to Kafka")
 
 	return nil
diff --git a/pkg/output/kafka/kafka.go b/pkg/output/kafka/kafka.go
index 673ab1ee..7db4becb 100644
--- a/pkg/output/kafka/kafka.go
+++ b/pkg/output/kafka/kafka.go
@@ -3,6 +3,7 @@ package kafka
 import (
 	"context"
 	"errors"
+
 	"github.com/ethpandaops/xatu/pkg/processor"
 	"github.com/ethpandaops/xatu/pkg/proto/xatu"
 	"github.com/sirupsen/logrus"