diff --git a/cloudevents-server/configs/example-config.yaml b/cloudevents-server/configs/example-config.yaml index d89ff62..f8a0946 100644 --- a/cloudevents-server/configs/example-config.yaml +++ b/cloudevents-server/configs/example-config.yaml @@ -5,3 +5,6 @@ lark: app_id: cli_12345678 app_secret: s123456789 receiver: abc@test.com +tibuild: + result_sink_url: http://localhost:49244 # url of tibuild events listener. + trigger_sink_url: http://localhost:8080 # url of tekton event listener. diff --git a/cloudevents-server/handlers.go b/cloudevents-server/handlers.go new file mode 100644 index 0000000..19ad3ca --- /dev/null +++ b/cloudevents-server/handlers.go @@ -0,0 +1,68 @@ +package main + +import ( + "net/http" + + cloudevents "github.com/cloudevents/sdk-go/v2" + "github.com/gin-gonic/gin" + "github.com/rs/zerolog/log" + + "github.com/PingCAP-QE/ee-apps/cloudevents-server/pkg/config" + "github.com/PingCAP-QE/ee-apps/cloudevents-server/pkg/events/custom/tekton" + "github.com/PingCAP-QE/ee-apps/cloudevents-server/pkg/events/custom/testcaserun" + "github.com/PingCAP-QE/ee-apps/cloudevents-server/pkg/events/custom/tibuild" + "github.com/PingCAP-QE/ee-apps/cloudevents-server/pkg/events/handler" +) + +func indexHandler(c *gin.Context) { + c.JSON(http.StatusOK, "Welcome to CloudEvents") +} + +func healthzHandler(c *gin.Context) { + c.String(http.StatusOK, "OK") +} + +func newEventsHandlerFunc(cfg *config.Config) gin.HandlerFunc { + p, err := cloudevents.NewHTTP() + if err != nil { + log.Fatal().Err(err).Msg("Failed to create protocol") + } + + handler, err := newCloudEventsHandler(cfg) + if err != nil { + log.Fatal().Err(err).Msg("failed to create cloudevents handler") + } + log.Debug().Any("types", handler.SupportEventTypes()).Msgf("registered event handlers") + + h, err := cloudevents.NewHTTPReceiveHandler(nil, p, handler.Handle) + if err != nil { + log.Fatal().Err(err).Msg("failed to create handler") + } + + return func(c *gin.Context) { + h.ServeHTTP(c.Writer, c.Request) + } +} + +// receiver creates a receiverFn wrapper class that is used by the client to +// validate and invoke the provided function. +func newCloudEventsHandler(cfg *config.Config) (handler.EventHandler, error) { + caseRunHandler, err := testcaserun.NewHandler(cfg.Store) + if err != nil { + return nil, err + } + + tektonHandler, err := tekton.NewHandler(cfg.Lark) + if err != nil { + return nil, err + } + + tibuildHandler, err := tibuild.NewHandler(cfg.TiBuild.ResultSinkURL, cfg.TiBuild.TriggerSinkURL) + if err != nil { + return nil, err + } + + return new(handler.CompositeEventHandler).AddHandlers( + caseRunHandler, tektonHandler, tibuildHandler, + ), nil +} diff --git a/cloudevents-server/main.go b/cloudevents-server/main.go index 217106d..e22bbc8 100644 --- a/cloudevents-server/main.go +++ b/cloudevents-server/main.go @@ -4,13 +4,11 @@ import ( "flag" "net/http" - cloudevents "github.com/cloudevents/sdk-go/v2" "github.com/gin-gonic/gin" "github.com/rs/zerolog" "github.com/rs/zerolog/log" "github.com/PingCAP-QE/ee-apps/cloudevents-server/pkg/config" - "github.com/PingCAP-QE/ee-apps/cloudevents-server/pkg/events" ) func main() { @@ -38,7 +36,7 @@ func main() { case gin.DebugMode: zerolog.SetGlobalLevel(zerolog.DebugLevel) default: - zerolog.SetGlobalLevel(zerolog.DebugLevel) + zerolog.SetGlobalLevel(zerolog.InfoLevel) } gin.SetMode(ginMode) @@ -56,34 +54,5 @@ func main() { func setRouters(r gin.IRoutes, cfg *config.Config) { r.GET("/", indexHandler) r.GET("/healthz", healthzHandler) - r.POST("/events", eventsHandler(cfg)) -} - -func indexHandler(c *gin.Context) { - c.JSON(http.StatusOK, "Welcome to CloudEvents") -} - -func healthzHandler(c *gin.Context) { - c.String(http.StatusOK, "OK") -} - -func eventsHandler(cfg *config.Config) gin.HandlerFunc { - p, err := cloudevents.NewHTTP() - if err != nil { - log.Fatal().Err(err).Msg("Failed to create protocol") - } - - handler, err := events.NewEventsHandler(cfg) - if err != nil { - log.Fatal().Err(err).Msg("failed to create cloudevents handler") - } - - h, err := cloudevents.NewHTTPReceiveHandler(nil, p, handler.Handle) - if err != nil { - log.Fatal().Err(err).Msg("failed to create handler") - } - - return func(c *gin.Context) { - h.ServeHTTP(c.Writer, c.Request) - } + r.POST("/events", newEventsHandlerFunc(cfg)) } diff --git a/cloudevents-server/pkg/config/config.go b/cloudevents-server/pkg/config/config.go index 37a0036..fffeecf 100644 --- a/cloudevents-server/pkg/config/config.go +++ b/cloudevents-server/pkg/config/config.go @@ -22,8 +22,12 @@ type Lark struct { } type Config struct { - Store Store `yaml:"store,omitempty" json:"store,omitempty"` - Lark Lark `yaml:"lark,omitempty" json:"lark,omitempty"` + Store Store `yaml:"store,omitempty" json:"store,omitempty"` + Lark Lark `yaml:"lark,omitempty" json:"lark,omitempty"` + TiBuild struct { + ResultSinkURL string `yaml:"result_sink_url,omitempty" json:"result_sink_url,omitempty"` + TriggerSinkURL string `yaml:"trigger_sink_url,omitempty" json:"trigger_sink_url,omitempty"` + } `yaml:"tibuild,omitempty" json:"tibuild,omitempty"` } func (c *Config) LoadFromFile(file string) error { diff --git a/cloudevents-server/pkg/events/custom/tekton/handler.go b/cloudevents-server/pkg/events/custom/tekton/handler.go index b3009f8..114a2c8 100644 --- a/cloudevents-server/pkg/events/custom/tekton/handler.go +++ b/cloudevents-server/pkg/events/custom/tekton/handler.go @@ -1,85 +1,14 @@ package tekton import ( - "context" - "net/http" - "strings" - "github.com/PingCAP-QE/ee-apps/cloudevents-server/pkg/config" - cloudevents "github.com/cloudevents/sdk-go/v2" - lark "github.com/larksuite/oapi-sdk-go/v3" - "github.com/rs/zerolog/log" - tektoncloudevent "github.com/tektoncd/pipeline/pkg/reconciler/events/cloudevent" + "github.com/PingCAP-QE/ee-apps/cloudevents-server/pkg/events/handler" ) -type Handler struct { - LarkClient *lark.Client - RunDetailBaseURL string - Receiver string -} - -func NewHandler(cfg config.Lark) (*Handler, error) { - return &Handler{ +func NewHandler(cfg config.Lark) (handler.EventHandler, error) { + return &pipelineRunHandler{ LarkClient: newLarkClient(cfg), Receiver: cfg.Receiver, RunDetailBaseURL: "https://do.pingcap.net/tekton", }, nil } - -func (h *Handler) SupportEventTypes() []string { - return []string{ - string(tektoncloudevent.PipelineRunFailedEventV1), - string(tektoncloudevent.PipelineRunRunningEventV1), - string(tektoncloudevent.PipelineRunStartedEventV1), - string(tektoncloudevent.PipelineRunSuccessfulEventV1), - string(tektoncloudevent.PipelineRunUnknownEventV1), - string(tektoncloudevent.RunFailedEventV1), - string(tektoncloudevent.RunRunningEventV1), - string(tektoncloudevent.RunStartedEventV1), - string(tektoncloudevent.RunSuccessfulEventV1), - string(tektoncloudevent.TaskRunFailedEventV1), - string(tektoncloudevent.TaskRunRunningEventV1), - string(tektoncloudevent.TaskRunStartedEventV1), - string(tektoncloudevent.TaskRunSuccessfulEventV1), - string(tektoncloudevent.TaskRunUnknownEventV1), - } -} - -func (h *Handler) Handle(event cloudevents.Event) cloudevents.Result { - data := new(tektoncloudevent.TektonCloudEventData) - if err := event.DataAs(&data); err != nil { - return cloudevents.NewHTTPResult(http.StatusBadRequest, err.Error()) - } - - if strings.HasPrefix(event.Type(), "dev.tekton.event.pipelinerun.") { - return h.notifyRunStatus(event) - } - - log.Debug().Str("ce-type", event.Type()).Msg("skip notifing for the event type.") - return cloudevents.ResultACK -} - -func (h *Handler) notifyRunStatus(event cloudevents.Event) cloudevents.Result { - createMsgReq, err := newLarkMessage(h.Receiver, event, h.RunDetailBaseURL) - if err != nil { - log.Error().Err(err).Msg("compose lark message failed") - return cloudevents.NewHTTPResult(http.StatusInternalServerError, "compose lark message failed: %v", err) - } - - resp, err := h.LarkClient.Im.Message.Create(context.Background(), createMsgReq) - if err != nil { - log.Error().Err(err).Msg("send lark message failed") - return cloudevents.NewHTTPResult(http.StatusInternalServerError, "send lark message failed: %v", err) - } - - if resp.Success() { - log.Info(). - Str("request-id", resp.RequestId()). - Str("message-id", *resp.Data.MessageId). - Msg("send lark message successfully.") - return cloudevents.ResultACK - } - - log.Error().Err(resp).Msg("send lark message failed!") - return cloudevents.NewHTTPResult(http.StatusInternalServerError, "send lark message failed!") -} diff --git a/cloudevents-server/pkg/events/custom/tekton/handler_pipelinerun.go b/cloudevents-server/pkg/events/custom/tekton/handler_pipelinerun.go new file mode 100644 index 0000000..b4cf45a --- /dev/null +++ b/cloudevents-server/pkg/events/custom/tekton/handler_pipelinerun.go @@ -0,0 +1,62 @@ +package tekton + +import ( + "context" + "net/http" + + cloudevents "github.com/cloudevents/sdk-go/v2" + lark "github.com/larksuite/oapi-sdk-go/v3" + "github.com/rs/zerolog/log" + tektoncloudevent "github.com/tektoncd/pipeline/pkg/reconciler/events/cloudevent" +) + +type pipelineRunHandler struct { + LarkClient *lark.Client + RunDetailBaseURL string + Receiver string +} + +func (h *pipelineRunHandler) SupportEventTypes() []string { + return []string{ + string(tektoncloudevent.PipelineRunFailedEventV1), + string(tektoncloudevent.PipelineRunRunningEventV1), + string(tektoncloudevent.PipelineRunStartedEventV1), + string(tektoncloudevent.PipelineRunSuccessfulEventV1), + string(tektoncloudevent.PipelineRunUnknownEventV1), + } +} + +func (h *pipelineRunHandler) Handle(event cloudevents.Event) cloudevents.Result { + data := new(tektoncloudevent.TektonCloudEventData) + if err := event.DataAs(&data); err != nil { + return cloudevents.NewHTTPResult(http.StatusBadRequest, err.Error()) + } + + log.Debug().Str("ce-type", event.Type()).Msg("skip notifing for the event type.") + return cloudevents.ResultACK +} + +func (h *pipelineRunHandler) notifyRunStatus(event cloudevents.Event) cloudevents.Result { + createMsgReq, err := newLarkMessage(h.Receiver, event, h.RunDetailBaseURL) + if err != nil { + log.Error().Err(err).Msg("compose lark message failed") + return cloudevents.NewHTTPResult(http.StatusInternalServerError, "compose lark message failed: %v", err) + } + + resp, err := h.LarkClient.Im.Message.Create(context.Background(), createMsgReq) + if err != nil { + log.Error().Err(err).Msg("send lark message failed") + return cloudevents.NewHTTPResult(http.StatusInternalServerError, "send lark message failed: %v", err) + } + + if resp.Success() { + log.Info(). + Str("request-id", resp.RequestId()). + Str("message-id", *resp.Data.MessageId). + Msg("send lark message successfully.") + return cloudevents.ResultACK + } + + log.Error().Err(resp).Msg("send lark message failed!") + return cloudevents.NewHTTPResult(http.StatusInternalServerError, "send lark message failed!") +} diff --git a/cloudevents-server/pkg/events/custom/tekton/handler_pipelinerun_test.go b/cloudevents-server/pkg/events/custom/tekton/handler_pipelinerun_test.go new file mode 100644 index 0000000..eb4e102 --- /dev/null +++ b/cloudevents-server/pkg/events/custom/tekton/handler_pipelinerun_test.go @@ -0,0 +1,68 @@ +package tekton + +import ( + "encoding/json" + "reflect" + "testing" + + cloudevents "github.com/cloudevents/sdk-go/v2" + lark "github.com/larksuite/oapi-sdk-go/v3" + tektoncloudevent "github.com/tektoncd/pipeline/pkg/reconciler/events/cloudevent" + + _ "embed" +) + +// test events. +var ( + //go:embed testdata/event-pipelinerun.failed.json + pipelineRunFailedEventBytes []byte + //go:embed testdata/event-pipelinerun.running.json + pipelineRunRunningEventBytes []byte + //go:embed testdata/event-pipelinerun.started.json + pipelineRunStartedEventBytes []byte + //go:embed testdata/event-pipelinerun.successful.json + pipelineRunSuccessfulEventBytes []byte + //go:embed testdata/event-pipelinerun.unknown.json + pipelineRunUnknownEventBytes []byte +) + +func Test_pipelineRunHandler_Handle(t *testing.T) { + type fields struct { + LarkClient *lark.Client + } + type args struct { + } + tests := []struct { + name tektoncloudevent.TektonEventType + eventJSON []byte + want cloudevents.Result + }{ + {name: tektoncloudevent.PipelineRunFailedEventV1, eventJSON: pipelineRunFailedEventBytes, want: cloudevents.ResultACK}, + {name: tektoncloudevent.PipelineRunRunningEventV1, eventJSON: pipelineRunRunningEventBytes, want: cloudevents.ResultACK}, + {name: tektoncloudevent.PipelineRunStartedEventV1, eventJSON: pipelineRunStartedEventBytes, want: cloudevents.ResultACK}, + {name: tektoncloudevent.PipelineRunSuccessfulEventV1, eventJSON: pipelineRunSuccessfulEventBytes, want: cloudevents.ResultACK}, + {name: tektoncloudevent.TaskRunFailedEventV1, eventJSON: taskRunFailedEventBytes, want: cloudevents.ResultACK}, + {name: tektoncloudevent.TaskRunRunningEventV1, eventJSON: taskRunRunningEventBytes, want: cloudevents.ResultACK}, + {name: tektoncloudevent.TaskRunStartedEventV1, eventJSON: taskRunStartedEventBytes, want: cloudevents.ResultACK}, + {name: tektoncloudevent.TaskRunSuccessfulEventV1, eventJSON: taskRunSuccessfulEventBytes, want: cloudevents.ResultACK}, + {name: tektoncloudevent.TaskRunUnknownEventV1, eventJSON: taskRunUnknownEventBytes, want: cloudevents.ResultACK}, + } + + h := &pipelineRunHandler{ + LarkClient: lark.NewClient(larkAppID, larkAppSecret, lark.WithLogReqAtDebug(true), lark.WithEnableTokenCache(true)), + RunDetailBaseURL: baseURL, + } + for _, tt := range tests { + t.Run(string(tt.name), func(t *testing.T) { + e := cloudevents.NewEvent() + if err := json.Unmarshal(tt.eventJSON, &e); err != nil { + t.Error(err) + return + } + + if got := h.Handle(e); !reflect.DeepEqual(got, tt.want) { + t.Errorf("pipelineRunHandler.Handle() = %v, want %v", got, tt.want) + } + }) + } +} diff --git a/cloudevents-server/pkg/events/custom/tekton/handler_taskrun.go b/cloudevents-server/pkg/events/custom/tekton/handler_taskrun.go new file mode 100644 index 0000000..186bc64 --- /dev/null +++ b/cloudevents-server/pkg/events/custom/tekton/handler_taskrun.go @@ -0,0 +1,62 @@ +package tekton + +import ( + "context" + "net/http" + + cloudevents "github.com/cloudevents/sdk-go/v2" + lark "github.com/larksuite/oapi-sdk-go/v3" + "github.com/rs/zerolog/log" + tektoncloudevent "github.com/tektoncd/pipeline/pkg/reconciler/events/cloudevent" +) + +type taskRunHandler struct { + LarkClient *lark.Client + RunDetailBaseURL string + Receiver string +} + +func (h *taskRunHandler) SupportEventTypes() []string { + return []string{ + string(tektoncloudevent.TaskRunFailedEventV1), + string(tektoncloudevent.TaskRunRunningEventV1), + string(tektoncloudevent.TaskRunStartedEventV1), + string(tektoncloudevent.TaskRunSuccessfulEventV1), + string(tektoncloudevent.TaskRunUnknownEventV1), + } +} + +func (h *taskRunHandler) Handle(event cloudevents.Event) cloudevents.Result { + data := new(tektoncloudevent.TektonCloudEventData) + if err := event.DataAs(&data); err != nil { + return cloudevents.NewHTTPResult(http.StatusBadRequest, err.Error()) + } + + log.Debug().Str("ce-type", event.Type()).Msg("skip notifing for the event type.") + return cloudevents.ResultACK +} + +func (h *taskRunHandler) notifyRunStatus(event cloudevents.Event) cloudevents.Result { + createMsgReq, err := newLarkMessage(h.Receiver, event, h.RunDetailBaseURL) + if err != nil { + log.Error().Err(err).Msg("compose lark message failed") + return cloudevents.NewHTTPResult(http.StatusInternalServerError, "compose lark message failed: %v", err) + } + + resp, err := h.LarkClient.Im.Message.Create(context.Background(), createMsgReq) + if err != nil { + log.Error().Err(err).Msg("send lark message failed") + return cloudevents.NewHTTPResult(http.StatusInternalServerError, "send lark message failed: %v", err) + } + + if resp.Success() { + log.Info(). + Str("request-id", resp.RequestId()). + Str("message-id", *resp.Data.MessageId). + Msg("send lark message successfully.") + return cloudevents.ResultACK + } + + log.Error().Err(resp).Msg("send lark message failed!") + return cloudevents.NewHTTPResult(http.StatusInternalServerError, "send lark message failed!") +} diff --git a/cloudevents-server/pkg/events/custom/tekton/handler_taskrun_test.go b/cloudevents-server/pkg/events/custom/tekton/handler_taskrun_test.go new file mode 100644 index 0000000..7df04da --- /dev/null +++ b/cloudevents-server/pkg/events/custom/tekton/handler_taskrun_test.go @@ -0,0 +1,64 @@ +package tekton + +import ( + "encoding/json" + "reflect" + "testing" + + cloudevents "github.com/cloudevents/sdk-go/v2" + lark "github.com/larksuite/oapi-sdk-go/v3" + tektoncloudevent "github.com/tektoncd/pipeline/pkg/reconciler/events/cloudevent" + + _ "embed" +) + +// test events. +var ( + //go:embed testdata/event-taskrun.failed.json + taskRunFailedEventBytes []byte + //go:embed testdata/event-taskrun.running.json + taskRunRunningEventBytes []byte + //go:embed testdata/event-taskrun.started.json + taskRunStartedEventBytes []byte + //go:embed testdata/event-taskrun.successful.json + taskRunSuccessfulEventBytes []byte + //go:embed testdata/event-taskrun.unknown.json + taskRunUnknownEventBytes []byte +) + +func Test_taskRunHandler_Handle(t *testing.T) { + type fields struct { + LarkClient *lark.Client + } + type args struct { + } + tests := []struct { + name tektoncloudevent.TektonEventType + eventJSON []byte + want cloudevents.Result + }{ + {name: tektoncloudevent.TaskRunFailedEventV1, eventJSON: taskRunFailedEventBytes, want: cloudevents.ResultACK}, + {name: tektoncloudevent.TaskRunRunningEventV1, eventJSON: taskRunRunningEventBytes, want: cloudevents.ResultACK}, + {name: tektoncloudevent.TaskRunStartedEventV1, eventJSON: taskRunStartedEventBytes, want: cloudevents.ResultACK}, + {name: tektoncloudevent.TaskRunSuccessfulEventV1, eventJSON: taskRunSuccessfulEventBytes, want: cloudevents.ResultACK}, + {name: tektoncloudevent.TaskRunUnknownEventV1, eventJSON: taskRunUnknownEventBytes, want: cloudevents.ResultACK}, + } + + h := &taskRunHandler{ + LarkClient: lark.NewClient(larkAppID, larkAppSecret, lark.WithLogReqAtDebug(true), lark.WithEnableTokenCache(true)), + RunDetailBaseURL: baseURL, + } + for _, tt := range tests { + t.Run(string(tt.name), func(t *testing.T) { + e := cloudevents.NewEvent() + if err := json.Unmarshal(tt.eventJSON, &e); err != nil { + t.Error(err) + return + } + + if got := h.Handle(e); !reflect.DeepEqual(got, tt.want) { + t.Errorf("taskRunHandler.Handle() = %v, want %v", got, tt.want) + } + }) + } +} diff --git a/cloudevents-server/pkg/events/custom/tekton/handler_test.go b/cloudevents-server/pkg/events/custom/tekton/handler_test.go index 9e91bff..11faaf0 100644 --- a/cloudevents-server/pkg/events/custom/tekton/handler_test.go +++ b/cloudevents-server/pkg/events/custom/tekton/handler_test.go @@ -1,86 +1,8 @@ package tekton -import ( - "encoding/json" - "reflect" - "testing" - - cloudevents "github.com/cloudevents/sdk-go/v2" - lark "github.com/larksuite/oapi-sdk-go/v3" - tektoncloudevent "github.com/tektoncd/pipeline/pkg/reconciler/events/cloudevent" - - _ "embed" -) - -// test events. -var ( - //go:embed testdata/event-pipelinerun.failed.json - pipelineRunFailedEventBytes []byte - //go:embed testdata/event-pipelinerun.running.json - pipelineRunRunningEventBytes []byte - //go:embed testdata/event-pipelinerun.started.json - pipelineRunStartedEventBytes []byte - //go:embed testdata/event-pipelinerun.successful.json - pipelineRunSuccessfulEventBytes []byte - //go:embed testdata/event-pipelinerun.unknown.json - pipelineRunUnknownEventBytes []byte - - //go:embed testdata/event-taskrun.failed.json - taskRunFailedEventBytes []byte - //go:embed testdata/event-taskrun.running.json - taskRunRunningEventBytes []byte - //go:embed testdata/event-taskrun.started.json - taskRunStartedEventBytes []byte - //go:embed testdata/event-taskrun.successful.json - taskRunSuccessfulEventBytes []byte - //go:embed testdata/event-taskrun.unknown.json - taskRunUnknownEventBytes []byte -) - const ( larkAppID = "" larkAppSecret = "" receiver = "@xxx.com" baseURL = "https://chagne.me.com/" ) - -func TestHandler_Handle(t *testing.T) { - type fields struct { - LarkClient *lark.Client - } - type args struct { - } - tests := []struct { - name tektoncloudevent.TektonEventType - eventJSON []byte - want cloudevents.Result - }{ - {name: tektoncloudevent.PipelineRunFailedEventV1, eventJSON: pipelineRunFailedEventBytes, want: cloudevents.ResultACK}, - {name: tektoncloudevent.PipelineRunRunningEventV1, eventJSON: pipelineRunRunningEventBytes, want: cloudevents.ResultACK}, - {name: tektoncloudevent.PipelineRunStartedEventV1, eventJSON: pipelineRunStartedEventBytes, want: cloudevents.ResultACK}, - {name: tektoncloudevent.PipelineRunSuccessfulEventV1, eventJSON: pipelineRunSuccessfulEventBytes, want: cloudevents.ResultACK}, - {name: tektoncloudevent.TaskRunFailedEventV1, eventJSON: taskRunFailedEventBytes, want: cloudevents.ResultACK}, - {name: tektoncloudevent.TaskRunRunningEventV1, eventJSON: taskRunRunningEventBytes, want: cloudevents.ResultACK}, - {name: tektoncloudevent.TaskRunStartedEventV1, eventJSON: taskRunStartedEventBytes, want: cloudevents.ResultACK}, - {name: tektoncloudevent.TaskRunSuccessfulEventV1, eventJSON: taskRunSuccessfulEventBytes, want: cloudevents.ResultACK}, - {name: tektoncloudevent.TaskRunUnknownEventV1, eventJSON: taskRunUnknownEventBytes, want: cloudevents.ResultACK}, - } - - h := &Handler{ - LarkClient: lark.NewClient(larkAppID, larkAppSecret, lark.WithLogReqAtDebug(true), lark.WithEnableTokenCache(true)), - RunDetailBaseURL: baseURL, - } - for _, tt := range tests { - t.Run(string(tt.name), func(t *testing.T) { - e := cloudevents.NewEvent() - if err := json.Unmarshal(tt.eventJSON, &e); err != nil { - t.Error(err) - return - } - - if got := h.Handle(e); !reflect.DeepEqual(got, tt.want) { - t.Errorf("Handler.Handle() = %v, want %v", got, tt.want) - } - }) - } -} diff --git a/cloudevents-server/pkg/events/custom/tibuild/handler.go b/cloudevents-server/pkg/events/custom/tibuild/handler.go new file mode 100644 index 0000000..a84e9e6 --- /dev/null +++ b/cloudevents-server/pkg/events/custom/tibuild/handler.go @@ -0,0 +1,74 @@ +package tibuild + +import ( + "context" + "fmt" + + cloudevents "github.com/cloudevents/sdk-go/v2" + "github.com/cloudevents/sdk-go/v2/client" + tektoncloudevent "github.com/tektoncd/pipeline/pkg/reconciler/events/cloudevent" + + "github.com/PingCAP-QE/ee-apps/cloudevents-server/pkg/events/handler" +) + +// NewHandler creates a new SinkHandler with the specified sink URL. +func NewHandler(tibuildSinkURL, tektonSinkURL string) (handler.EventHandler, error) { + routeToTbHandler, err := newRouteHandler(tibuildSinkURL) + if err != nil { + return nil, err + } + routeToTektonHandler, err := newRouteHandler(tektonSinkURL) + if err != nil { + return nil, err + } + + ret := new(handler.CompositeEventHandler).AddHandlers( + &triggerHandler{routeToTektonHandler}, + &resultHandler{routeToTbHandler}, + ) + + return ret, nil +} + +type triggerHandler struct{ *routeHandler } + +// SupportEventTypes returns an empty list, indicating that this handler supports all event types. +func (h *triggerHandler) SupportEventTypes() []string { + return []string{ + EventTypeDevbuildFakeGithubCreate, + EventTypeDevbuildFakeGithubPush, + EventTypeHotfixFakeGithubCreate, + EventTypeHotfixFakeGithubPush, + } +} + +type resultHandler struct{ *routeHandler } + +func (h *resultHandler) SupportEventTypes() []string { + return []string{ + string(tektoncloudevent.PipelineRunFailedEventV1), + string(tektoncloudevent.PipelineRunRunningEventV1), + string(tektoncloudevent.PipelineRunStartedEventV1), + string(tektoncloudevent.PipelineRunSuccessfulEventV1), + } +} + +// routeHandler is an event handler that forward events to target sink URL using the CloudEvents SDK. +type routeHandler struct { + sinkURL string // tibuild's sink URL. + client client.Client +} + +// Handle routes the given event to the specified sink URL using the CloudEvents SDK. +func (h *routeHandler) Handle(event cloudevents.Event) cloudevents.Result { + return h.client.Send(context.Background(), event) +} + +func newRouteHandler(sinkURL string) (*routeHandler, error) { + client, err := cloudevents.NewClientHTTP(cloudevents.WithTarget(sinkURL)) + if err != nil { + return nil, fmt.Errorf("error creating client: %v", err) + } + + return &routeHandler{client: client, sinkURL: sinkURL}, nil +} diff --git a/cloudevents-server/pkg/events/custom/tibuild/types.go b/cloudevents-server/pkg/events/custom/tibuild/types.go new file mode 100644 index 0000000..7fdbc17 --- /dev/null +++ b/cloudevents-server/pkg/events/custom/tibuild/types.go @@ -0,0 +1,8 @@ +package tibuild + +const ( + EventTypeDevbuildFakeGithubPush = "net.pingcap.tibuild.devbuild.push" // Fake GitHub push events in the development build. + EventTypeDevbuildFakeGithubCreate = "net.pingcap.tibuild.devbuild.create" // Fake GitHub create events in the development build. + EventTypeHotfixFakeGithubPush = "net.pingcap.tibuild.hotfix.push" // Fake GitHub push events in the hotfix build. + EventTypeHotfixFakeGithubCreate = "net.pingcap.tibuild.hotfix.create" // Fake GitHub create events in the hotfix build. +) diff --git a/cloudevents-server/pkg/events/handler/handler.go b/cloudevents-server/pkg/events/handler/handler.go new file mode 100644 index 0000000..b98fe37 --- /dev/null +++ b/cloudevents-server/pkg/events/handler/handler.go @@ -0,0 +1,79 @@ +package handler + +import ( + "net/http" + + cloudevents "github.com/cloudevents/sdk-go/v2" + "github.com/pkg/errors" + "github.com/rs/zerolog/log" +) + +// CompositeEventHandler is a public struct that composes multiple event handlers. +type CompositeEventHandler struct { + handleMap map[string][]EventHandler +} + +// SupportEventTypes returns a list of supported event types. +func (h *CompositeEventHandler) SupportEventTypes() []string { + var ret []string + for t := range h.handleMap { + ret = append(ret, t) + } + + return ret +} + +// Handle handles the given event. +func (h *CompositeEventHandler) Handle(event cloudevents.Event) cloudevents.Result { + handlers, ok := h.handleMap[event.Type()] + if !ok { + log.Error().Str("type", event.Type()).Msg("no handlers registered for the event") + return cloudevents.NewHTTPResult(http.StatusNotFound, "no handlers registered for event type: %s, ignoring it", event.Type()) + } + + var results []cloudevents.Result + + // Loop through all registered handlers for the given event type. + for _, eh := range handlers { + if handlerResult := eh.Handle(event); !cloudevents.IsACK(handlerResult) { + // Accumulate errors from each handler. + results = append(results, handlerResult) + } + } + + // Combine and return the aggregated results. + return combineResults(results) +} + +// AddHandlers adds child handlers to the composite event handler. +func (h *CompositeEventHandler) AddHandlers(handlers ...EventHandler) *CompositeEventHandler { + if h.handleMap == nil { + h.handleMap = make(map[string][]EventHandler) + } + + for _, e := range handlers { + for _, eventType := range e.SupportEventTypes() { + h.handleMap[eventType] = append(h.handleMap[eventType], e) + } + } + + return h +} + +// combineResults combines multiple errors into a single error. +func combineResults(errorsList []cloudevents.Result) cloudevents.Result { + // If there are no errors, return nil. + if len(errorsList) == 0 { + return cloudevents.ResultACK + } + + // Combine errors using the first error as the base. + result := errorsList[0] + + // Append additional errors, if any, to the base error. + for i := 1; i < len(errorsList); i++ { + result = errors.Wrap(result, errorsList[i].Error()) + } + + return result +} diff --git a/cloudevents-server/pkg/events/handler/types.go b/cloudevents-server/pkg/events/handler/types.go new file mode 100644 index 0000000..2bcbef2 --- /dev/null +++ b/cloudevents-server/pkg/events/handler/types.go @@ -0,0 +1,24 @@ +package handler + +import cloudevents "github.com/cloudevents/sdk-go/v2" + +// EventHandler is an interface that defines the Handle method. +type EventHandler interface { + // Valid fn signatures are: + // * func() + // * func() protocol.Result + // * func(context.Context) + // * func(context.Context) protocol.Result + // * func(event.Event) + // * func(event.Event) transport.Result + // * func(context.Context, event.Event) + // * func(context.Context, event.Event) protocol.Result + // * func(event.Event) *event.Event + // * func(event.Event) (*event.Event, protocol.Result) + // * func(context.Context, event.Event) *event.Event + // * func(context.Context, event.Event) (*event.Event, protocol.Result) + // Handle handles the given event. + Handle(event cloudevents.Event) cloudevents.Result + // SupportEventTypes returns a list of supported event types. + SupportEventTypes() []string +} diff --git a/cloudevents-server/pkg/events/handlers.go b/cloudevents-server/pkg/events/handlers.go deleted file mode 100644 index 092c1e2..0000000 --- a/cloudevents-server/pkg/events/handlers.go +++ /dev/null @@ -1,86 +0,0 @@ -package events - -import ( - "net/http" - - cloudevents "github.com/cloudevents/sdk-go/v2" - "github.com/rs/zerolog/log" - - "github.com/PingCAP-QE/ee-apps/cloudevents-server/pkg/config" - "github.com/PingCAP-QE/ee-apps/cloudevents-server/pkg/events/custom/tekton" - "github.com/PingCAP-QE/ee-apps/cloudevents-server/pkg/events/custom/testcaserun" -) - -type EventHandler interface { - // Valid fn signatures are: - // * func() - // * func() protocol.Result - // * func(context.Context) - // * func(context.Context) protocol.Result - // * func(event.Event) - // * func(event.Event) transport.Result - // * func(context.Context, event.Event) - // * func(context.Context, event.Event) protocol.Result - // * func(event.Event) *event.Event - // * func(event.Event) (*event.Event, protocol.Result) - // * func(context.Context, event.Event) *event.Event - // * func(context.Context, event.Event) (*event.Event, protocol.Result) - Handle(event cloudevents.Event) cloudevents.Result - SupportEventTypes() []string -} - -type handlerImpl struct { - handleMap map[string]EventHandler -} - -func (h *handlerImpl) SupportEventTypes() []string { - var ret []string - for t, _ := range h.handleMap { - ret = append(ret, t) - } - - return ret -} - -func (h *handlerImpl) Handle(event cloudevents.Event) cloudevents.Result { - eh, ok := h.handleMap[event.Type()] - if ok { - return eh.Handle(event) - } - - log.Error().Str("type", event.Type()).Msg("none handlers registered") - return cloudevents.NewHTTPResult(http.StatusNotFound, "none handlers registered for event type: %s, ignore it.", event.Type()) -} - -func (h *handlerImpl) addChildHandlers(handlers ...EventHandler) *handlerImpl { - if h.handleMap == nil { - h.handleMap = make(map[string]EventHandler) - } - - for _, e := range handlers { - for _, eventType := range e.SupportEventTypes() { - h.handleMap[eventType] = e - } - } - - return h -} - -// receiver creates a receiverFn wrapper class that is used by the client to -// validate and invoke the provided function. -func NewEventsHandler(cfg *config.Config) (EventHandler, error) { - caseRunHandler, err := testcaserun.NewHandler(cfg.Store) - if err != nil { - return nil, err - } - - tektonHandler, err := tekton.NewHandler(cfg.Lark) - if err != nil { - return nil, err - } - - ret := new(handlerImpl) - ret.addChildHandlers(caseRunHandler, tektonHandler) - - return ret, nil -}