From a7470b636c2eed39f03783dff02bf54a65271423 Mon Sep 17 00:00:00 2001 From: wuhuizuo Date: Tue, 22 Oct 2024 21:12:26 +0800 Subject: [PATCH] fix(publisher): fix redis command usage (#184) This pull request introduces Redis integration into the `publisher` service, refactoring the code to accommodate the new Redis client and updating the handling of Redis commands. The most important changes include adding the Redis client configuration, modifying the `Publisher` and `Service` structs, and updating Redis command methods. ### Redis Integration: * [`publisher/cmd/worker/main.go`](diffhunk://#diff-0c11d49a84dbe2803f3a531ba845598d4d2c9f4be0b64f1f2ff347d8e9afcb45R14): Added Redis client configuration and passed the Redis client to the `tiup.NewPublisher` function. [[1]](diffhunk://#diff-0c11d49a84dbe2803f3a531ba845598d4d2c9f4be0b64f1f2ff347d8e9afcb45R14) [[2]](diffhunk://#diff-0c11d49a84dbe2803f3a531ba845598d4d2c9f4be0b64f1f2ff347d8e9afcb45R52-R65) ### Struct Modifications: * [`publisher/pkg/config/config.go`](diffhunk://#diff-41ecac7d460be2b3759c4d88b60e7177542db7888ba0c8254f8eab123a17b64bR8-L20): Added a `Redis` struct to hold Redis configuration and updated the `Worker` and `Service` structs to include this new `Redis` field. ### Redis Command Updates: * [`publisher/pkg/impl/tiup/publisher.go`](diffhunk://#diff-620e3293cf87ba54e2ed9269601e334343caa82c36ecd1371d17fa71ce8393c5L22-R26): Changed the `redisClient` type from `*redis.Client` to `redis.Cmdable` and updated Redis command methods to use `SetXX` instead of `Set`. [[1]](diffhunk://#diff-620e3293cf87ba54e2ed9269601e334343caa82c36ecd1371d17fa71ce8393c5L22-R26) [[2]](diffhunk://#diff-620e3293cf87ba54e2ed9269601e334343caa82c36ecd1371d17fa71ce8393c5L45-R45) [[3]](diffhunk://#diff-620e3293cf87ba54e2ed9269601e334343caa82c36ecd1371d17fa71ce8393c5L55-R57) * [`publisher/pkg/impl/tiup/service.go`](diffhunk://#diff-df9108e5a06158c0822dfd658fdfcbf5fa1ca7bcde2884ecaa32a0d79bf515faL22-R28): Updated the `redisClient` type and changed the Redis command method from `SetXX` to `SetNX` for initial status setting. [[1]](diffhunk://#diff-df9108e5a06158c0822dfd658fdfcbf5fa1ca7bcde2884ecaa32a0d79bf515faL22-R28) [[2]](diffhunk://#diff-df9108e5a06158c0822dfd658fdfcbf5fa1ca7bcde2884ecaa32a0d79bf515faL77-R77) Signed-off-by: wuhuizuo --------- Signed-off-by: wuhuizuo --- publisher/cmd/worker/main.go | 11 ++++++++++- publisher/pkg/config/config.go | 19 +++++++++++-------- publisher/pkg/impl/tiup/publisher.go | 12 ++++++------ publisher/pkg/impl/tiup/service.go | 6 +++--- 4 files changed, 30 insertions(+), 18 deletions(-) diff --git a/publisher/cmd/worker/main.go b/publisher/cmd/worker/main.go index 966cc47..7a897a1 100644 --- a/publisher/cmd/worker/main.go +++ b/publisher/cmd/worker/main.go @@ -11,6 +11,7 @@ import ( "syscall" "github.com/cloudevents/sdk-go/v2/event" + "github.com/go-redis/redis/v8" "github.com/rs/zerolog" "github.com/rs/zerolog/log" "github.com/segmentio/kafka-go" @@ -48,12 +49,20 @@ func main() { } } + // Configure Redis client + redisClient := redis.NewClient(&redis.Options{ + Addr: config.Redis.Addr, + Password: config.Redis.Password, + Username: config.Redis.Username, + DB: config.Redis.DB, + }) + ctx := log.Logger.WithContext(context.Background()) // Create TiUP publisher var handler *tiup.Publisher { var err error - handler, err = tiup.NewPublisher(config.MirrorUrl, config.LarkWebhookURL, &log.Logger) + handler, err = tiup.NewPublisher(config.MirrorUrl, config.LarkWebhookURL, &log.Logger, redisClient) if err != nil { log.Fatal().Err(err).Msg("Error creating handler") } diff --git a/publisher/pkg/config/config.go b/publisher/pkg/config/config.go index 5dc964d..b9c5b65 100644 --- a/publisher/pkg/config/config.go +++ b/publisher/pkg/config/config.go @@ -5,19 +5,22 @@ type Worker struct { KafkaBasic `yaml:",inline" json:",inline"` ConsumerGroup string `yaml:"consumer_group" json:"consumer_group,omitempty"` } `yaml:"kafka" json:"kafka,omitempty"` + Redis Redis `yaml:"redis" json:"redis,omitempty"` MirrorUrl string `yaml:"mirror_url" json:"mirror_url,omitempty"` LarkWebhookURL string `yaml:"lark_webhook_url" json:"lark_webhook_url,omitempty"` } type Service struct { - Kafka KafkaBasic `yaml:"kafka" json:"kafka,omitempty"` - Redis struct { - Addr string `yaml:"addr" json:"addr,omitempty"` - DB int `yaml:"db" json:"db,omitempty"` - Username string `yaml:"username" json:"username,omitempty"` - Password string `yaml:"password" json:"password,omitempty"` - } `yaml:"redis" json:"redis,omitempty"` - EventSource string `yaml:"event_source" json:"event_source,omitempty"` + Kafka KafkaBasic `yaml:"kafka" json:"kafka,omitempty"` + Redis Redis `yaml:"redis" json:"redis,omitempty"` + EventSource string `yaml:"event_source" json:"event_source,omitempty"` +} + +type Redis struct { + Addr string `yaml:"addr" json:"addr,omitempty"` + DB int `yaml:"db" json:"db,omitempty"` + Username string `yaml:"username" json:"username,omitempty"` + Password string `yaml:"password" json:"password,omitempty"` } type KafkaBasic struct { diff --git a/publisher/pkg/impl/tiup/publisher.go b/publisher/pkg/impl/tiup/publisher.go index 4f247b7..b0ff51b 100644 --- a/publisher/pkg/impl/tiup/publisher.go +++ b/publisher/pkg/impl/tiup/publisher.go @@ -19,11 +19,11 @@ type Publisher struct { mirrorURL string larkWebhookURL string logger zerolog.Logger - redisClient *redis.Client + redisClient redis.Cmdable } -func NewPublisher(mirrorURL, larkWebhookURL string, logger *zerolog.Logger) (*Publisher, error) { - handler := Publisher{mirrorURL: mirrorURL, larkWebhookURL: larkWebhookURL} +func NewPublisher(mirrorURL, larkWebhookURL string, logger *zerolog.Logger, redisClient redis.Cmdable) (*Publisher, error) { + handler := Publisher{mirrorURL: mirrorURL, larkWebhookURL: larkWebhookURL, redisClient: redisClient} if logger == nil { handler.logger = zerolog.New(os.Stderr).With().Timestamp().Logger() } else { @@ -42,7 +42,7 @@ func (p *Publisher) Handle(event cloudevents.Event) cloudevents.Result { if !slices.Contains(p.SupportEventTypes(), event.Type()) { return cloudevents.ResultNACK } - p.redisClient.Set(context.Background(), event.ID(), PublishStateProcessing, redis.KeepTTL) + p.redisClient.SetXX(context.Background(), event.ID(), PublishStateProcessing, redis.KeepTTL) data := new(PublishRequest) if err := event.DataAs(&data); err != nil { @@ -52,9 +52,9 @@ func (p *Publisher) Handle(event cloudevents.Event) cloudevents.Result { result := p.handleImpl(data) switch { case cloudevents.IsACK(result): - p.redisClient.Set(context.Background(), event.ID(), PublishStateSuccess, redis.KeepTTL) + p.redisClient.SetXX(context.Background(), event.ID(), PublishStateSuccess, redis.KeepTTL) default: - p.redisClient.Set(context.Background(), event.ID(), PublishStateFailed, redis.KeepTTL) + p.redisClient.SetXX(context.Background(), event.ID(), PublishStateFailed, redis.KeepTTL) p.notifyLark(&data.Publish, result) } diff --git a/publisher/pkg/impl/tiup/service.go b/publisher/pkg/impl/tiup/service.go index 3852385..02a1f09 100644 --- a/publisher/pkg/impl/tiup/service.go +++ b/publisher/pkg/impl/tiup/service.go @@ -19,13 +19,13 @@ import ( type tiupsrvc struct { logger *zerolog.Logger kafkaWriter *kafka.Writer - redisClient *redis.Client + redisClient redis.Cmdable eventSource string stateTTL time.Duration } // NewTiup returns the tiup service implementation. -func NewTiup(logger *zerolog.Logger, kafkaWriter *kafka.Writer, redisClient *redis.Client, eventSrc string) gentiup.Service { +func NewTiup(logger *zerolog.Logger, kafkaWriter *kafka.Writer, redisClient redis.Cmdable, eventSrc string) gentiup.Service { return &tiupsrvc{ logger: logger, kafkaWriter: kafkaWriter, @@ -74,7 +74,7 @@ func (s *tiupsrvc) RequestToPublish(ctx context.Context, p *gentiup.RequestToPub // 5. Init the request dealing status in redis with the request id. for _, requestID := range requestIDs { - if err := s.redisClient.SetXX(ctx, requestID, PublishStateQueued, s.stateTTL).Err(); err != nil { + if err := s.redisClient.SetNX(ctx, requestID, PublishStateQueued, s.stateTTL).Err(); err != nil { return nil, fmt.Errorf("failed to set initial status in Redis: %v", err) } }