Skip to content

Commit

Permalink
feat(output/kafka-sink):
Browse files Browse the repository at this point in the history
* fix lint
  • Loading branch information
00x-dx committed Oct 18, 2023
1 parent c8adee5 commit 6ffa2c9
Show file tree
Hide file tree
Showing 4 changed files with 24 additions and 7 deletions.
7 changes: 6 additions & 1 deletion pkg/output/kafka/client.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
package kafka

import (
"github.com/IBM/sarama"
"strings"

"github.com/IBM/sarama"
)

type CompressionStrategy string
Expand Down Expand Up @@ -33,6 +34,7 @@ var (
func NewSyncProducer(config *Config) (sarama.SyncProducer, error) {
producerConfig := Init(config)
brokersList := strings.Split(config.Brokers, ",")

return sarama.NewSyncProducer(brokersList, producerConfig)
}
func Init(config *Config) *sarama.Config {
Expand All @@ -51,6 +53,7 @@ func Init(config *Config) *sarama.Config {
default:
c.Producer.RequiredAcks = sarama.WaitForLocal
}

switch config.Compression {
case CompressionStrategyLZ4:
c.Producer.Compression = sarama.CompressionLZ4
Expand All @@ -63,11 +66,13 @@ func Init(config *Config) *sarama.Config {
default:
c.Producer.Compression = sarama.CompressionNone
}

switch config.Partitioning {
case PartitionStrategyNone:
c.Producer.Partitioner = sarama.NewHashPartitioner
default:
c.Producer.Partitioner = sarama.NewRandomPartitioner
}

return c
}
2 changes: 1 addition & 1 deletion pkg/output/kafka/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,10 @@ type Config struct {
}

func (c *Config) Validate() error {

if c.Brokers == "" {
return errors.New("brokers is required")
}

if c.Topic == "" {
return errors.New("topic is required")
}
Expand Down
21 changes: 16 additions & 5 deletions pkg/output/kafka/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package kafka
import (
"context"
"errors"

"github.com/IBM/sarama"
"github.com/ethpandaops/xatu/pkg/observability"
"github.com/ethpandaops/xatu/pkg/proto/xatu"
Expand All @@ -20,7 +21,6 @@ type ItemExporter struct {
}

func NewItemExporter(name string, config *Config, log logrus.FieldLogger) (ItemExporter, error) {

producer, err := NewSyncProducer(config)

if err != nil {
Expand All @@ -29,6 +29,7 @@ func NewItemExporter(name string, config *Config, log logrus.FieldLogger) (ItemE
WithField("output_name", name).
WithField("output_type", SinkType).
Error("Error while creating the Kafka Client")

return ItemExporter{}, err
}

Expand Down Expand Up @@ -65,41 +66,51 @@ func (e ItemExporter) Shutdown(ctx context.Context) error {
func (e *ItemExporter) sendUpstream(ctx context.Context, items []*xatu.DecoratedEvent) error {
msgs := make([]*sarama.ProducerMessage, 0, len(items))
msgByteSize := 0

for _, p := range items {
r, err := protojson.Marshal(p)
if err != nil {
return err
}
routingKey := sarama.StringEncoder(p.Event.Id)
eventPayload := sarama.StringEncoder(r)

routingKey, eventPayload := sarama.StringEncoder(p.Event.Id), sarama.StringEncoder(r)
m := &sarama.ProducerMessage{
Topic: e.config.Topic,
Key: routingKey,
Value: eventPayload,
}

msgByteSize = m.ByteSize(2)
if msgByteSize > e.config.FlushBytes {
e.log.WithField("event_id", routingKey).WithField("msg_size", msgByteSize).Debug("Message too large, consider increasing `max_message_bytes`")

continue
}

msgs = append(msgs, m)
}
err := e.client.SendMessages(msgs)

errorCount := 0

err := e.client.SendMessages(msgs)
if err != nil {
var errs sarama.ProducerErrors
if errors.As(err, &errs) {
errorCount = len(errs)

for _, producerError := range errs {
e.log.
WithError(producerError.Err).
WithField("num_events", len(errs)).
WithField("events", errorCount).
Error("Failed to send events to Kafka")

return producerError
}
}

return err
}

e.log.WithField("count", len(msgs)-errorCount).Debug("Items written to Kafka")

return nil
Expand Down
1 change: 1 addition & 0 deletions pkg/output/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package kafka
import (
"context"
"errors"

"github.com/ethpandaops/xatu/pkg/processor"
"github.com/ethpandaops/xatu/pkg/proto/xatu"
"github.com/sirupsen/logrus"
Expand Down

0 comments on commit 6ffa2c9

Please sign in to comment.