Skip to content

Commit

Permalink
feat(cloudevents-server): support sink events and produce for tibuild (
Browse files Browse the repository at this point in the history
…#53)

Signed-off-by: wuhuizuo <wuhuizuo@126.com>

Signed-off-by: wuhuizuo <wuhuizuo@126.com>
  • Loading branch information
wuhuizuo authored Dec 13, 2023
1 parent f338670 commit dd77962
Show file tree
Hide file tree
Showing 15 changed files with 523 additions and 273 deletions.
3 changes: 3 additions & 0 deletions cloudevents-server/configs/example-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,6 @@ lark:
app_id: cli_12345678
app_secret: s123456789
receiver: abc@test.com
tibuild:
result_sink_url: http://localhost:49244 # url of tibuild events listener.
trigger_sink_url: http://localhost:8080 # url of tekton event listener.
68 changes: 68 additions & 0 deletions cloudevents-server/handlers.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package main

import (
"net/http"

cloudevents "github.com/cloudevents/sdk-go/v2"
"github.com/gin-gonic/gin"
"github.com/rs/zerolog/log"

"github.com/PingCAP-QE/ee-apps/cloudevents-server/pkg/config"
"github.com/PingCAP-QE/ee-apps/cloudevents-server/pkg/events/custom/tekton"
"github.com/PingCAP-QE/ee-apps/cloudevents-server/pkg/events/custom/testcaserun"
"github.com/PingCAP-QE/ee-apps/cloudevents-server/pkg/events/custom/tibuild"
"github.com/PingCAP-QE/ee-apps/cloudevents-server/pkg/events/handler"
)

func indexHandler(c *gin.Context) {
c.JSON(http.StatusOK, "Welcome to CloudEvents")
}

func healthzHandler(c *gin.Context) {
c.String(http.StatusOK, "OK")
}

func newEventsHandlerFunc(cfg *config.Config) gin.HandlerFunc {
p, err := cloudevents.NewHTTP()
if err != nil {
log.Fatal().Err(err).Msg("Failed to create protocol")
}

handler, err := newCloudEventsHandler(cfg)
if err != nil {
log.Fatal().Err(err).Msg("failed to create cloudevents handler")
}
log.Debug().Any("types", handler.SupportEventTypes()).Msgf("registered event handlers")

h, err := cloudevents.NewHTTPReceiveHandler(nil, p, handler.Handle)
if err != nil {
log.Fatal().Err(err).Msg("failed to create handler")
}

return func(c *gin.Context) {
h.ServeHTTP(c.Writer, c.Request)
}
}

// receiver creates a receiverFn wrapper class that is used by the client to
// validate and invoke the provided function.
func newCloudEventsHandler(cfg *config.Config) (handler.EventHandler, error) {
caseRunHandler, err := testcaserun.NewHandler(cfg.Store)
if err != nil {
return nil, err
}

tektonHandler, err := tekton.NewHandler(cfg.Lark)
if err != nil {
return nil, err
}

tibuildHandler, err := tibuild.NewHandler(cfg.TiBuild.ResultSinkURL, cfg.TiBuild.TriggerSinkURL)
if err != nil {
return nil, err
}

return new(handler.CompositeEventHandler).AddHandlers(
caseRunHandler, tektonHandler, tibuildHandler,
), nil
}
35 changes: 2 additions & 33 deletions cloudevents-server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,11 @@ import (
"flag"
"net/http"

cloudevents "github.com/cloudevents/sdk-go/v2"
"github.com/gin-gonic/gin"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"

"github.com/PingCAP-QE/ee-apps/cloudevents-server/pkg/config"
"github.com/PingCAP-QE/ee-apps/cloudevents-server/pkg/events"
)

func main() {
Expand Down Expand Up @@ -38,7 +36,7 @@ func main() {
case gin.DebugMode:
zerolog.SetGlobalLevel(zerolog.DebugLevel)
default:
zerolog.SetGlobalLevel(zerolog.DebugLevel)
zerolog.SetGlobalLevel(zerolog.InfoLevel)
}

gin.SetMode(ginMode)
Expand All @@ -56,34 +54,5 @@ func main() {
func setRouters(r gin.IRoutes, cfg *config.Config) {
r.GET("/", indexHandler)
r.GET("/healthz", healthzHandler)
r.POST("/events", eventsHandler(cfg))
}

func indexHandler(c *gin.Context) {
c.JSON(http.StatusOK, "Welcome to CloudEvents")
}

func healthzHandler(c *gin.Context) {
c.String(http.StatusOK, "OK")
}

func eventsHandler(cfg *config.Config) gin.HandlerFunc {
p, err := cloudevents.NewHTTP()
if err != nil {
log.Fatal().Err(err).Msg("Failed to create protocol")
}

handler, err := events.NewEventsHandler(cfg)
if err != nil {
log.Fatal().Err(err).Msg("failed to create cloudevents handler")
}

h, err := cloudevents.NewHTTPReceiveHandler(nil, p, handler.Handle)
if err != nil {
log.Fatal().Err(err).Msg("failed to create handler")
}

return func(c *gin.Context) {
h.ServeHTTP(c.Writer, c.Request)
}
r.POST("/events", newEventsHandlerFunc(cfg))
}
8 changes: 6 additions & 2 deletions cloudevents-server/pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,12 @@ type Lark struct {
}

type Config struct {
Store Store `yaml:"store,omitempty" json:"store,omitempty"`
Lark Lark `yaml:"lark,omitempty" json:"lark,omitempty"`
Store Store `yaml:"store,omitempty" json:"store,omitempty"`
Lark Lark `yaml:"lark,omitempty" json:"lark,omitempty"`
TiBuild struct {
ResultSinkURL string `yaml:"result_sink_url,omitempty" json:"result_sink_url,omitempty"`
TriggerSinkURL string `yaml:"trigger_sink_url,omitempty" json:"trigger_sink_url,omitempty"`
} `yaml:"tibuild,omitempty" json:"tibuild,omitempty"`
}

func (c *Config) LoadFromFile(file string) error {
Expand Down
77 changes: 3 additions & 74 deletions cloudevents-server/pkg/events/custom/tekton/handler.go
Original file line number Diff line number Diff line change
@@ -1,85 +1,14 @@
package tekton

import (
"context"
"net/http"
"strings"

"github.com/PingCAP-QE/ee-apps/cloudevents-server/pkg/config"
cloudevents "github.com/cloudevents/sdk-go/v2"
lark "github.com/larksuite/oapi-sdk-go/v3"
"github.com/rs/zerolog/log"
tektoncloudevent "github.com/tektoncd/pipeline/pkg/reconciler/events/cloudevent"
"github.com/PingCAP-QE/ee-apps/cloudevents-server/pkg/events/handler"
)

type Handler struct {
LarkClient *lark.Client
RunDetailBaseURL string
Receiver string
}

func NewHandler(cfg config.Lark) (*Handler, error) {
return &Handler{
func NewHandler(cfg config.Lark) (handler.EventHandler, error) {
return &pipelineRunHandler{
LarkClient: newLarkClient(cfg),
Receiver: cfg.Receiver,
RunDetailBaseURL: "https://do.pingcap.net/tekton",
}, nil
}

func (h *Handler) SupportEventTypes() []string {
return []string{
string(tektoncloudevent.PipelineRunFailedEventV1),
string(tektoncloudevent.PipelineRunRunningEventV1),
string(tektoncloudevent.PipelineRunStartedEventV1),
string(tektoncloudevent.PipelineRunSuccessfulEventV1),
string(tektoncloudevent.PipelineRunUnknownEventV1),
string(tektoncloudevent.RunFailedEventV1),
string(tektoncloudevent.RunRunningEventV1),
string(tektoncloudevent.RunStartedEventV1),
string(tektoncloudevent.RunSuccessfulEventV1),
string(tektoncloudevent.TaskRunFailedEventV1),
string(tektoncloudevent.TaskRunRunningEventV1),
string(tektoncloudevent.TaskRunStartedEventV1),
string(tektoncloudevent.TaskRunSuccessfulEventV1),
string(tektoncloudevent.TaskRunUnknownEventV1),
}
}

func (h *Handler) Handle(event cloudevents.Event) cloudevents.Result {
data := new(tektoncloudevent.TektonCloudEventData)
if err := event.DataAs(&data); err != nil {
return cloudevents.NewHTTPResult(http.StatusBadRequest, err.Error())
}

if strings.HasPrefix(event.Type(), "dev.tekton.event.pipelinerun.") {
return h.notifyRunStatus(event)
}

log.Debug().Str("ce-type", event.Type()).Msg("skip notifing for the event type.")
return cloudevents.ResultACK
}

func (h *Handler) notifyRunStatus(event cloudevents.Event) cloudevents.Result {
createMsgReq, err := newLarkMessage(h.Receiver, event, h.RunDetailBaseURL)
if err != nil {
log.Error().Err(err).Msg("compose lark message failed")
return cloudevents.NewHTTPResult(http.StatusInternalServerError, "compose lark message failed: %v", err)
}

resp, err := h.LarkClient.Im.Message.Create(context.Background(), createMsgReq)
if err != nil {
log.Error().Err(err).Msg("send lark message failed")
return cloudevents.NewHTTPResult(http.StatusInternalServerError, "send lark message failed: %v", err)
}

if resp.Success() {
log.Info().
Str("request-id", resp.RequestId()).
Str("message-id", *resp.Data.MessageId).
Msg("send lark message successfully.")
return cloudevents.ResultACK
}

log.Error().Err(resp).Msg("send lark message failed!")
return cloudevents.NewHTTPResult(http.StatusInternalServerError, "send lark message failed!")
}
62 changes: 62 additions & 0 deletions cloudevents-server/pkg/events/custom/tekton/handler_pipelinerun.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package tekton

import (
"context"
"net/http"

cloudevents "github.com/cloudevents/sdk-go/v2"
lark "github.com/larksuite/oapi-sdk-go/v3"
"github.com/rs/zerolog/log"
tektoncloudevent "github.com/tektoncd/pipeline/pkg/reconciler/events/cloudevent"
)

type pipelineRunHandler struct {
LarkClient *lark.Client
RunDetailBaseURL string
Receiver string
}

func (h *pipelineRunHandler) SupportEventTypes() []string {
return []string{
string(tektoncloudevent.PipelineRunFailedEventV1),
string(tektoncloudevent.PipelineRunRunningEventV1),
string(tektoncloudevent.PipelineRunStartedEventV1),
string(tektoncloudevent.PipelineRunSuccessfulEventV1),
string(tektoncloudevent.PipelineRunUnknownEventV1),
}
}

func (h *pipelineRunHandler) Handle(event cloudevents.Event) cloudevents.Result {
data := new(tektoncloudevent.TektonCloudEventData)
if err := event.DataAs(&data); err != nil {
return cloudevents.NewHTTPResult(http.StatusBadRequest, err.Error())
}

log.Debug().Str("ce-type", event.Type()).Msg("skip notifing for the event type.")
return cloudevents.ResultACK
}

func (h *pipelineRunHandler) notifyRunStatus(event cloudevents.Event) cloudevents.Result {
createMsgReq, err := newLarkMessage(h.Receiver, event, h.RunDetailBaseURL)
if err != nil {
log.Error().Err(err).Msg("compose lark message failed")
return cloudevents.NewHTTPResult(http.StatusInternalServerError, "compose lark message failed: %v", err)
}

resp, err := h.LarkClient.Im.Message.Create(context.Background(), createMsgReq)
if err != nil {
log.Error().Err(err).Msg("send lark message failed")
return cloudevents.NewHTTPResult(http.StatusInternalServerError, "send lark message failed: %v", err)
}

if resp.Success() {
log.Info().
Str("request-id", resp.RequestId()).
Str("message-id", *resp.Data.MessageId).
Msg("send lark message successfully.")
return cloudevents.ResultACK
}

log.Error().Err(resp).Msg("send lark message failed!")
return cloudevents.NewHTTPResult(http.StatusInternalServerError, "send lark message failed!")
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package tekton

import (
"encoding/json"
"reflect"
"testing"

cloudevents "github.com/cloudevents/sdk-go/v2"
lark "github.com/larksuite/oapi-sdk-go/v3"
tektoncloudevent "github.com/tektoncd/pipeline/pkg/reconciler/events/cloudevent"

_ "embed"
)

// test events.
var (
//go:embed testdata/event-pipelinerun.failed.json
pipelineRunFailedEventBytes []byte
//go:embed testdata/event-pipelinerun.running.json
pipelineRunRunningEventBytes []byte
//go:embed testdata/event-pipelinerun.started.json
pipelineRunStartedEventBytes []byte
//go:embed testdata/event-pipelinerun.successful.json
pipelineRunSuccessfulEventBytes []byte
//go:embed testdata/event-pipelinerun.unknown.json
pipelineRunUnknownEventBytes []byte
)

func Test_pipelineRunHandler_Handle(t *testing.T) {
type fields struct {
LarkClient *lark.Client
}
type args struct {
}
tests := []struct {
name tektoncloudevent.TektonEventType
eventJSON []byte
want cloudevents.Result
}{
{name: tektoncloudevent.PipelineRunFailedEventV1, eventJSON: pipelineRunFailedEventBytes, want: cloudevents.ResultACK},
{name: tektoncloudevent.PipelineRunRunningEventV1, eventJSON: pipelineRunRunningEventBytes, want: cloudevents.ResultACK},
{name: tektoncloudevent.PipelineRunStartedEventV1, eventJSON: pipelineRunStartedEventBytes, want: cloudevents.ResultACK},
{name: tektoncloudevent.PipelineRunSuccessfulEventV1, eventJSON: pipelineRunSuccessfulEventBytes, want: cloudevents.ResultACK},
{name: tektoncloudevent.TaskRunFailedEventV1, eventJSON: taskRunFailedEventBytes, want: cloudevents.ResultACK},
{name: tektoncloudevent.TaskRunRunningEventV1, eventJSON: taskRunRunningEventBytes, want: cloudevents.ResultACK},
{name: tektoncloudevent.TaskRunStartedEventV1, eventJSON: taskRunStartedEventBytes, want: cloudevents.ResultACK},
{name: tektoncloudevent.TaskRunSuccessfulEventV1, eventJSON: taskRunSuccessfulEventBytes, want: cloudevents.ResultACK},
{name: tektoncloudevent.TaskRunUnknownEventV1, eventJSON: taskRunUnknownEventBytes, want: cloudevents.ResultACK},
}

h := &pipelineRunHandler{
LarkClient: lark.NewClient(larkAppID, larkAppSecret, lark.WithLogReqAtDebug(true), lark.WithEnableTokenCache(true)),
RunDetailBaseURL: baseURL,
}
for _, tt := range tests {
t.Run(string(tt.name), func(t *testing.T) {
e := cloudevents.NewEvent()
if err := json.Unmarshal(tt.eventJSON, &e); err != nil {
t.Error(err)
return
}

if got := h.Handle(e); !reflect.DeepEqual(got, tt.want) {
t.Errorf("pipelineRunHandler.Handle() = %v, want %v", got, tt.want)
}
})
}
}
Loading

0 comments on commit dd77962

Please sign in to comment.