diff --git a/publisher/cmd/worker/main.go b/publisher/cmd/worker/main.go index 966cc47..7a897a1 100644 --- a/publisher/cmd/worker/main.go +++ b/publisher/cmd/worker/main.go @@ -11,6 +11,7 @@ import ( "syscall" "github.com/cloudevents/sdk-go/v2/event" + "github.com/go-redis/redis/v8" "github.com/rs/zerolog" "github.com/rs/zerolog/log" "github.com/segmentio/kafka-go" @@ -48,12 +49,20 @@ func main() { } } + // Configure Redis client + redisClient := redis.NewClient(&redis.Options{ + Addr: config.Redis.Addr, + Password: config.Redis.Password, + Username: config.Redis.Username, + DB: config.Redis.DB, + }) + ctx := log.Logger.WithContext(context.Background()) // Create TiUP publisher var handler *tiup.Publisher { var err error - handler, err = tiup.NewPublisher(config.MirrorUrl, config.LarkWebhookURL, &log.Logger) + handler, err = tiup.NewPublisher(config.MirrorUrl, config.LarkWebhookURL, &log.Logger, redisClient) if err != nil { log.Fatal().Err(err).Msg("Error creating handler") } diff --git a/publisher/pkg/config/config.go b/publisher/pkg/config/config.go index 5dc964d..b9c5b65 100644 --- a/publisher/pkg/config/config.go +++ b/publisher/pkg/config/config.go @@ -5,19 +5,22 @@ type Worker struct { KafkaBasic `yaml:",inline" json:",inline"` ConsumerGroup string `yaml:"consumer_group" json:"consumer_group,omitempty"` } `yaml:"kafka" json:"kafka,omitempty"` + Redis Redis `yaml:"redis" json:"redis,omitempty"` MirrorUrl string `yaml:"mirror_url" json:"mirror_url,omitempty"` LarkWebhookURL string `yaml:"lark_webhook_url" json:"lark_webhook_url,omitempty"` } type Service struct { - Kafka KafkaBasic `yaml:"kafka" json:"kafka,omitempty"` - Redis struct { - Addr string `yaml:"addr" json:"addr,omitempty"` - DB int `yaml:"db" json:"db,omitempty"` - Username string `yaml:"username" json:"username,omitempty"` - Password string `yaml:"password" json:"password,omitempty"` - } `yaml:"redis" json:"redis,omitempty"` - EventSource string `yaml:"event_source" json:"event_source,omitempty"` + Kafka KafkaBasic `yaml:"kafka" json:"kafka,omitempty"` + Redis Redis `yaml:"redis" json:"redis,omitempty"` + EventSource string `yaml:"event_source" json:"event_source,omitempty"` +} + +type Redis struct { + Addr string `yaml:"addr" json:"addr,omitempty"` + DB int `yaml:"db" json:"db,omitempty"` + Username string `yaml:"username" json:"username,omitempty"` + Password string `yaml:"password" json:"password,omitempty"` } type KafkaBasic struct { diff --git a/publisher/pkg/impl/tiup/publisher.go b/publisher/pkg/impl/tiup/publisher.go index 4f247b7..b0ff51b 100644 --- a/publisher/pkg/impl/tiup/publisher.go +++ b/publisher/pkg/impl/tiup/publisher.go @@ -19,11 +19,11 @@ type Publisher struct { mirrorURL string larkWebhookURL string logger zerolog.Logger - redisClient *redis.Client + redisClient redis.Cmdable } -func NewPublisher(mirrorURL, larkWebhookURL string, logger *zerolog.Logger) (*Publisher, error) { - handler := Publisher{mirrorURL: mirrorURL, larkWebhookURL: larkWebhookURL} +func NewPublisher(mirrorURL, larkWebhookURL string, logger *zerolog.Logger, redisClient redis.Cmdable) (*Publisher, error) { + handler := Publisher{mirrorURL: mirrorURL, larkWebhookURL: larkWebhookURL, redisClient: redisClient} if logger == nil { handler.logger = zerolog.New(os.Stderr).With().Timestamp().Logger() } else { @@ -42,7 +42,7 @@ func (p *Publisher) Handle(event cloudevents.Event) cloudevents.Result { if !slices.Contains(p.SupportEventTypes(), event.Type()) { return cloudevents.ResultNACK } - p.redisClient.Set(context.Background(), event.ID(), PublishStateProcessing, redis.KeepTTL) + p.redisClient.SetXX(context.Background(), event.ID(), PublishStateProcessing, redis.KeepTTL) data := new(PublishRequest) if err := event.DataAs(&data); err != nil { @@ -52,9 +52,9 @@ func (p *Publisher) Handle(event cloudevents.Event) cloudevents.Result { result := p.handleImpl(data) switch { case cloudevents.IsACK(result): - p.redisClient.Set(context.Background(), event.ID(), PublishStateSuccess, redis.KeepTTL) + p.redisClient.SetXX(context.Background(), event.ID(), PublishStateSuccess, redis.KeepTTL) default: - p.redisClient.Set(context.Background(), event.ID(), PublishStateFailed, redis.KeepTTL) + p.redisClient.SetXX(context.Background(), event.ID(), PublishStateFailed, redis.KeepTTL) p.notifyLark(&data.Publish, result) } diff --git a/publisher/pkg/impl/tiup/service.go b/publisher/pkg/impl/tiup/service.go index 3852385..02a1f09 100644 --- a/publisher/pkg/impl/tiup/service.go +++ b/publisher/pkg/impl/tiup/service.go @@ -19,13 +19,13 @@ import ( type tiupsrvc struct { logger *zerolog.Logger kafkaWriter *kafka.Writer - redisClient *redis.Client + redisClient redis.Cmdable eventSource string stateTTL time.Duration } // NewTiup returns the tiup service implementation. -func NewTiup(logger *zerolog.Logger, kafkaWriter *kafka.Writer, redisClient *redis.Client, eventSrc string) gentiup.Service { +func NewTiup(logger *zerolog.Logger, kafkaWriter *kafka.Writer, redisClient redis.Cmdable, eventSrc string) gentiup.Service { return &tiupsrvc{ logger: logger, kafkaWriter: kafkaWriter, @@ -74,7 +74,7 @@ func (s *tiupsrvc) RequestToPublish(ctx context.Context, p *gentiup.RequestToPub // 5. Init the request dealing status in redis with the request id. for _, requestID := range requestIDs { - if err := s.redisClient.SetXX(ctx, requestID, PublishStateQueued, s.stateTTL).Err(); err != nil { + if err := s.redisClient.SetNX(ctx, requestID, PublishStateQueued, s.stateTTL).Err(); err != nil { return nil, fmt.Errorf("failed to set initial status in Redis: %v", err) } }