Skip to content

Commit

Permalink
fix(publisher): fix worker initialization
Browse files Browse the repository at this point in the history
Signed-off-by: wuhuizuo <wuhuizuo@126.com>
  • Loading branch information
wuhuizuo committed Oct 22, 2024
1 parent 1e5cf25 commit b10bb38
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 11 deletions.
11 changes: 10 additions & 1 deletion publisher/cmd/worker/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"syscall"

"github.com/cloudevents/sdk-go/v2/event"
"github.com/go-redis/redis/v8"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
"github.com/segmentio/kafka-go"
Expand Down Expand Up @@ -48,12 +49,20 @@ func main() {
}
}

// Configure Redis client
redisClient := redis.NewClient(&redis.Options{
Addr: config.Redis.Addr,
Password: config.Redis.Password,
Username: config.Redis.Username,
DB: config.Redis.DB,
})

ctx := log.Logger.WithContext(context.Background())
// Create TiUP publisher
var handler *tiup.Publisher
{
var err error
handler, err = tiup.NewPublisher(config.MirrorUrl, config.LarkWebhookURL, &log.Logger)
handler, err = tiup.NewPublisher(config.MirrorUrl, config.LarkWebhookURL, &log.Logger, redisClient)
if err != nil {
log.Fatal().Err(err).Msg("Error creating handler")
}
Expand Down
19 changes: 11 additions & 8 deletions publisher/pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,22 @@ type Worker struct {
KafkaBasic `yaml:",inline" json:",inline"`
ConsumerGroup string `yaml:"consumer_group" json:"consumer_group,omitempty"`
} `yaml:"kafka" json:"kafka,omitempty"`
Redis Redis `yaml:"redis" json:"redis,omitempty"`
MirrorUrl string `yaml:"mirror_url" json:"mirror_url,omitempty"`
LarkWebhookURL string `yaml:"lark_webhook_url" json:"lark_webhook_url,omitempty"`
}

type Service struct {
Kafka KafkaBasic `yaml:"kafka" json:"kafka,omitempty"`
Redis struct {
Addr string `yaml:"addr" json:"addr,omitempty"`
DB int `yaml:"db" json:"db,omitempty"`
Username string `yaml:"username" json:"username,omitempty"`
Password string `yaml:"password" json:"password,omitempty"`
} `yaml:"redis" json:"redis,omitempty"`
EventSource string `yaml:"event_source" json:"event_source,omitempty"`
Kafka KafkaBasic `yaml:"kafka" json:"kafka,omitempty"`
Redis Redis `yaml:"redis" json:"redis,omitempty"`
EventSource string `yaml:"event_source" json:"event_source,omitempty"`
}

type Redis struct {
Addr string `yaml:"addr" json:"addr,omitempty"`
DB int `yaml:"db" json:"db,omitempty"`
Username string `yaml:"username" json:"username,omitempty"`
Password string `yaml:"password" json:"password,omitempty"`
}

type KafkaBasic struct {
Expand Down
4 changes: 2 additions & 2 deletions publisher/pkg/impl/tiup/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ type Publisher struct {
redisClient redis.Cmdable
}

func NewPublisher(mirrorURL, larkWebhookURL string, logger *zerolog.Logger) (*Publisher, error) {
handler := Publisher{mirrorURL: mirrorURL, larkWebhookURL: larkWebhookURL}
func NewPublisher(mirrorURL, larkWebhookURL string, logger *zerolog.Logger, redisClient redis.Cmdable) (*Publisher, error) {
handler := Publisher{mirrorURL: mirrorURL, larkWebhookURL: larkWebhookURL, redisClient: redisClient}
if logger == nil {
handler.logger = zerolog.New(os.Stderr).With().Timestamp().Logger()
} else {
Expand Down

0 comments on commit b10bb38

Please sign in to comment.