Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(publisher): fix redis command usage #184

Merged
merged 2 commits into from
Oct 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion publisher/cmd/worker/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"syscall"

"github.com/cloudevents/sdk-go/v2/event"
"github.com/go-redis/redis/v8"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
"github.com/segmentio/kafka-go"
Expand Down Expand Up @@ -48,12 +49,20 @@ func main() {
}
}

// Configure Redis client
redisClient := redis.NewClient(&redis.Options{
Addr: config.Redis.Addr,
Password: config.Redis.Password,
Username: config.Redis.Username,
DB: config.Redis.DB,
})

ctx := log.Logger.WithContext(context.Background())
// Create TiUP publisher
var handler *tiup.Publisher
{
var err error
handler, err = tiup.NewPublisher(config.MirrorUrl, config.LarkWebhookURL, &log.Logger)
handler, err = tiup.NewPublisher(config.MirrorUrl, config.LarkWebhookURL, &log.Logger, redisClient)
if err != nil {
log.Fatal().Err(err).Msg("Error creating handler")
}
Expand Down
19 changes: 11 additions & 8 deletions publisher/pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,22 @@ type Worker struct {
KafkaBasic `yaml:",inline" json:",inline"`
ConsumerGroup string `yaml:"consumer_group" json:"consumer_group,omitempty"`
} `yaml:"kafka" json:"kafka,omitempty"`
Redis Redis `yaml:"redis" json:"redis,omitempty"`
MirrorUrl string `yaml:"mirror_url" json:"mirror_url,omitempty"`
LarkWebhookURL string `yaml:"lark_webhook_url" json:"lark_webhook_url,omitempty"`
}

type Service struct {
Kafka KafkaBasic `yaml:"kafka" json:"kafka,omitempty"`
Redis struct {
Addr string `yaml:"addr" json:"addr,omitempty"`
DB int `yaml:"db" json:"db,omitempty"`
Username string `yaml:"username" json:"username,omitempty"`
Password string `yaml:"password" json:"password,omitempty"`
} `yaml:"redis" json:"redis,omitempty"`
EventSource string `yaml:"event_source" json:"event_source,omitempty"`
Kafka KafkaBasic `yaml:"kafka" json:"kafka,omitempty"`
Redis Redis `yaml:"redis" json:"redis,omitempty"`
EventSource string `yaml:"event_source" json:"event_source,omitempty"`
}

type Redis struct {
Addr string `yaml:"addr" json:"addr,omitempty"`
DB int `yaml:"db" json:"db,omitempty"`
Username string `yaml:"username" json:"username,omitempty"`
Password string `yaml:"password" json:"password,omitempty"`
}

type KafkaBasic struct {
Expand Down
12 changes: 6 additions & 6 deletions publisher/pkg/impl/tiup/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,11 @@ type Publisher struct {
mirrorURL string
larkWebhookURL string
logger zerolog.Logger
redisClient *redis.Client
redisClient redis.Cmdable
}

func NewPublisher(mirrorURL, larkWebhookURL string, logger *zerolog.Logger) (*Publisher, error) {
handler := Publisher{mirrorURL: mirrorURL, larkWebhookURL: larkWebhookURL}
func NewPublisher(mirrorURL, larkWebhookURL string, logger *zerolog.Logger, redisClient redis.Cmdable) (*Publisher, error) {
handler := Publisher{mirrorURL: mirrorURL, larkWebhookURL: larkWebhookURL, redisClient: redisClient}
if logger == nil {
handler.logger = zerolog.New(os.Stderr).With().Timestamp().Logger()
} else {
Expand All @@ -42,7 +42,7 @@ func (p *Publisher) Handle(event cloudevents.Event) cloudevents.Result {
if !slices.Contains(p.SupportEventTypes(), event.Type()) {
return cloudevents.ResultNACK
}
p.redisClient.Set(context.Background(), event.ID(), PublishStateProcessing, redis.KeepTTL)
p.redisClient.SetXX(context.Background(), event.ID(), PublishStateProcessing, redis.KeepTTL)

data := new(PublishRequest)
if err := event.DataAs(&data); err != nil {
Expand All @@ -52,9 +52,9 @@ func (p *Publisher) Handle(event cloudevents.Event) cloudevents.Result {
result := p.handleImpl(data)
switch {
case cloudevents.IsACK(result):
p.redisClient.Set(context.Background(), event.ID(), PublishStateSuccess, redis.KeepTTL)
p.redisClient.SetXX(context.Background(), event.ID(), PublishStateSuccess, redis.KeepTTL)
default:
p.redisClient.Set(context.Background(), event.ID(), PublishStateFailed, redis.KeepTTL)
p.redisClient.SetXX(context.Background(), event.ID(), PublishStateFailed, redis.KeepTTL)
p.notifyLark(&data.Publish, result)
}

Expand Down
6 changes: 3 additions & 3 deletions publisher/pkg/impl/tiup/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,13 @@ import (
type tiupsrvc struct {
logger *zerolog.Logger
kafkaWriter *kafka.Writer
redisClient *redis.Client
redisClient redis.Cmdable
eventSource string
stateTTL time.Duration
}

// NewTiup returns the tiup service implementation.
func NewTiup(logger *zerolog.Logger, kafkaWriter *kafka.Writer, redisClient *redis.Client, eventSrc string) gentiup.Service {
func NewTiup(logger *zerolog.Logger, kafkaWriter *kafka.Writer, redisClient redis.Cmdable, eventSrc string) gentiup.Service {
return &tiupsrvc{
logger: logger,
kafkaWriter: kafkaWriter,
Expand Down Expand Up @@ -74,7 +74,7 @@ func (s *tiupsrvc) RequestToPublish(ctx context.Context, p *gentiup.RequestToPub

// 5. Init the request dealing status in redis with the request id.
for _, requestID := range requestIDs {
if err := s.redisClient.SetXX(ctx, requestID, PublishStateQueued, s.stateTTL).Err(); err != nil {
if err := s.redisClient.SetNX(ctx, requestID, PublishStateQueued, s.stateTTL).Err(); err != nil {
return nil, fmt.Errorf("failed to set initial status in Redis: %v", err)
}
}
Expand Down