From b10bb38d4408a250ce240895c3266fb9a2a5442b Mon Sep 17 00:00:00 2001 From: wuhuizuo Date: Tue, 22 Oct 2024 12:36:34 +0000 Subject: [PATCH] fix(publisher): fix worker initialization Signed-off-by: wuhuizuo --- publisher/cmd/worker/main.go | 11 ++++++++++- publisher/pkg/config/config.go | 19 +++++++++++-------- publisher/pkg/impl/tiup/publisher.go | 4 ++-- 3 files changed, 23 insertions(+), 11 deletions(-) diff --git a/publisher/cmd/worker/main.go b/publisher/cmd/worker/main.go index 966cc47..7a897a1 100644 --- a/publisher/cmd/worker/main.go +++ b/publisher/cmd/worker/main.go @@ -11,6 +11,7 @@ import ( "syscall" "github.com/cloudevents/sdk-go/v2/event" + "github.com/go-redis/redis/v8" "github.com/rs/zerolog" "github.com/rs/zerolog/log" "github.com/segmentio/kafka-go" @@ -48,12 +49,20 @@ func main() { } } + // Configure Redis client + redisClient := redis.NewClient(&redis.Options{ + Addr: config.Redis.Addr, + Password: config.Redis.Password, + Username: config.Redis.Username, + DB: config.Redis.DB, + }) + ctx := log.Logger.WithContext(context.Background()) // Create TiUP publisher var handler *tiup.Publisher { var err error - handler, err = tiup.NewPublisher(config.MirrorUrl, config.LarkWebhookURL, &log.Logger) + handler, err = tiup.NewPublisher(config.MirrorUrl, config.LarkWebhookURL, &log.Logger, redisClient) if err != nil { log.Fatal().Err(err).Msg("Error creating handler") } diff --git a/publisher/pkg/config/config.go b/publisher/pkg/config/config.go index 5dc964d..b9c5b65 100644 --- a/publisher/pkg/config/config.go +++ b/publisher/pkg/config/config.go @@ -5,19 +5,22 @@ type Worker struct { KafkaBasic `yaml:",inline" json:",inline"` ConsumerGroup string `yaml:"consumer_group" json:"consumer_group,omitempty"` } `yaml:"kafka" json:"kafka,omitempty"` + Redis Redis `yaml:"redis" json:"redis,omitempty"` MirrorUrl string `yaml:"mirror_url" json:"mirror_url,omitempty"` LarkWebhookURL string `yaml:"lark_webhook_url" json:"lark_webhook_url,omitempty"` } type Service struct { - Kafka KafkaBasic `yaml:"kafka" json:"kafka,omitempty"` - Redis struct { - Addr string `yaml:"addr" json:"addr,omitempty"` - DB int `yaml:"db" json:"db,omitempty"` - Username string `yaml:"username" json:"username,omitempty"` - Password string `yaml:"password" json:"password,omitempty"` - } `yaml:"redis" json:"redis,omitempty"` - EventSource string `yaml:"event_source" json:"event_source,omitempty"` + Kafka KafkaBasic `yaml:"kafka" json:"kafka,omitempty"` + Redis Redis `yaml:"redis" json:"redis,omitempty"` + EventSource string `yaml:"event_source" json:"event_source,omitempty"` +} + +type Redis struct { + Addr string `yaml:"addr" json:"addr,omitempty"` + DB int `yaml:"db" json:"db,omitempty"` + Username string `yaml:"username" json:"username,omitempty"` + Password string `yaml:"password" json:"password,omitempty"` } type KafkaBasic struct { diff --git a/publisher/pkg/impl/tiup/publisher.go b/publisher/pkg/impl/tiup/publisher.go index 9dad150..b0ff51b 100644 --- a/publisher/pkg/impl/tiup/publisher.go +++ b/publisher/pkg/impl/tiup/publisher.go @@ -22,8 +22,8 @@ type Publisher struct { redisClient redis.Cmdable } -func NewPublisher(mirrorURL, larkWebhookURL string, logger *zerolog.Logger) (*Publisher, error) { - handler := Publisher{mirrorURL: mirrorURL, larkWebhookURL: larkWebhookURL} +func NewPublisher(mirrorURL, larkWebhookURL string, logger *zerolog.Logger, redisClient redis.Cmdable) (*Publisher, error) { + handler := Publisher{mirrorURL: mirrorURL, larkWebhookURL: larkWebhookURL, redisClient: redisClient} if logger == nil { handler.logger = zerolog.New(os.Stderr).With().Timestamp().Logger() } else {