Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(cloudevents-server): add dead letter topic support #168

Merged
merged 1 commit into from
Sep 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions cloudevents-server/configs/example-config-sqlite3.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -21,3 +21,19 @@ tekton:
- event_type: dev.tekton.event.pipelinerun.failed.v1
event_subject_reg: ^xxx-from-.*
receivers: [failure-receiver]
kafka:
brokers:
- broker1:9092
client_id: cloudevents-server
# authentication:
# mechanism: SCRAM-SHA-256
# user: username
# password: password
producer:
topic_mapping: {}
default_topic: test-topic
consumer:
group_id: consumer-group-1
topic_mapping:
'*': test-topic
dead_letter_topic: test-topic-dead-letter
16 changes: 16 additions & 0 deletions cloudevents-server/configs/example-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -21,3 +21,19 @@ tekton:
- event_type: dev.tekton.event.pipelinerun.failed.v1
event_subject_reg: ^xxx-from-.*
receivers: [failure-receiver]
kafka:
brokers:
- broker1:9092
client_id: cloudevents-server
# authentication:
# mechanism: SCRAM-SHA-256
# user: username
# password: password
producer:
topic_mapping: {}
default_topic: test-topic
consumer:
group_id: consumer-group-1
topic_mapping:
'*': test-topic
dead_letter_topic: test-topic-dead-letter
46 changes: 38 additions & 8 deletions cloudevents-server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@ import (
"context"
"flag"
"net/http"
"os"
"os/signal"
"syscall"
"time"

"github.com/gin-gonic/gin"
"github.com/rs/zerolog"
Expand Down Expand Up @@ -42,10 +46,10 @@ func main() {
}

gin.SetMode(ginMode)
r := gin.Default()
_ = r.SetTrustedProxies(nil)
ginEngine := gin.Default()
_ = ginEngine.SetTrustedProxies(nil)

setRouters(r, cfg)
setRouters(ginEngine, cfg)

hd, err := newCloudEventsHandler(cfg)
if err != nil {
Expand All @@ -57,13 +61,39 @@ func main() {
if err != nil {
log.Fatal().Err(err).Msg("failed to create consumer group")
}
defer cg.Close()
go cg.Start(context.Background())

log.Info().Str("address", serveAddr).Msg("server started.")
if err := http.ListenAndServe(serveAddr, r); err != nil {
log.Fatal().Err(err).Send()
srv := &http.Server{Addr: serveAddr, Handler: ginEngine}
startServices(srv, cg)
}

func startServices(srv *http.Server, cg handler.EventConsumerGroup) {
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)

go func() {
if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed {
log.Fatal().Err(err).Msg("server error")
}
}()
log.Info().Str("address", srv.Addr).Msg("server started.")
cgCtx, cgCancel := context.WithCancel(context.Background())
go cg.Start(cgCtx)

// Wait for interrupt signal to gracefully shutdown
sig := <-sigChan
log.Warn().Str("signal", sig.String()).Msg("signal received")

// shutdown consumer group.
cgCancel()

// shutdown http server.
shutdownSrvCtx, shutdownSrvCancel := context.WithTimeout(context.Background(), 10*time.Second)
defer shutdownSrvCancel()
if err := srv.Shutdown(shutdownSrvCtx); err != nil {
log.Error().Err(err).Msg("server shutdown error")
}

log.Info().Msg("server gracefully stopped")
}

func setRouters(r gin.IRoutes, cfg *config.Config) {
Expand Down
30 changes: 15 additions & 15 deletions cloudevents-server/pkg/events/handler/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,7 @@ package handler

import (
"context"
"os"
"os/signal"
"sync"
"syscall"

cloudevents "github.com/cloudevents/sdk-go/v2"
"github.com/rs/zerolog/log"
Expand All @@ -28,25 +25,32 @@ func NewEventProducer(cfg config.Kafka) (*EventProducer, error) {
}, nil
}

func NewEventConsumer(cfg config.Kafka, topic string, hander EventHandler) (*EventConsumer, error) {
func NewEventConsumer(cfg config.Kafka, topic string, hander EventHandler, faultWriter *kafka.Writer) (*EventConsumer, error) {
reader, err := skakfa.NewReader(cfg.Authentication, cfg.Brokers, topic, cfg.Consumer.GroupID, cfg.ClientID)
if err != nil {
return nil, err
}

return &EventConsumer{
reader: reader,
handler: hander,
reader: reader,
handler: hander,
writer: faultWriter,
faultTopic: cfg.Consumer.DeadLetterTopic,
}, nil
}

func NewEventConsumerGroup(cfg config.Kafka, hander EventHandler) (EventConsumerGroup, error) {
faultWriter, err := skakfa.NewWriter(cfg.Authentication, cfg.Brokers, "", cfg.ClientID)
if err != nil {
return nil, err
}

consumerGroup := make(EventConsumerGroup)
for _, topic := range cfg.Consumer.TopicMapping {
if consumerGroup[topic] != nil {
continue
}
consumer, err := NewEventConsumer(cfg, topic, hander)
consumer, err := NewEventConsumer(cfg, topic, hander, faultWriter)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -106,6 +110,9 @@ func (ecs EventConsumerGroup) Close() {
}
}

// Start runs the EventConsumerGroup in parallel, starting each EventConsumer
// in a separate goroutine. It waits for all EventConsumers to finish before
// returning.
func (ecs EventConsumerGroup) Start(ctx context.Context) {
wg := new(sync.WaitGroup)
for _, ec := range ecs {
Expand All @@ -129,19 +136,12 @@ type EventConsumer struct {

// consumer workers
func (ec *EventConsumer) Start(ctx context.Context) error {
sigterm := make(chan os.Signal, 1)
signal.Notify(sigterm, syscall.SIGINT, syscall.SIGTERM)

defer ec.Close()

for {
select {
case <-ctx.Done():
return ctx.Err()
case <-sigterm:
// When SIGTERM received, try to flush remaining messages
// and exit gracefully
return nil
default:
m, err := ec.reader.ReadMessage(ctx)
if err != nil {
Expand All @@ -157,7 +157,7 @@ func (ec *EventConsumer) Start(ctx context.Context) error {
result := ec.handler.Handle(event)
if !cloudevents.IsACK(result) {
log.Error().Err(err).Msg("error handling event")
// ec.writer.WriteMessages(ctx, kafka.Message{Topic: ec.faultTopic, Key: m.Key, Value: m.Value})
ec.writer.WriteMessages(ctx, kafka.Message{Topic: ec.faultTopic, Key: m.Key, Value: m.Value})
}
}
}
Expand Down
5 changes: 3 additions & 2 deletions cloudevents-server/pkg/kafka/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ type Producer struct {
}

type Consumer struct {
GroupID string `yaml:"group_id,omitempty" json:"group_id,omitempty"`
TopicMapping map[string]string `yaml:"topic_mapping,omitempty" json:"topic_mapping,omitempty"` // event type to topic.
GroupID string `yaml:"group_id,omitempty" json:"group_id,omitempty"`
TopicMapping map[string]string `yaml:"topic_mapping,omitempty" json:"topic_mapping,omitempty"` // event type to topic.
DeadLetterTopic string `yaml:"dead_letter_topic,omitempty" json:"dead_letter_topic,omitempty"`
}