Skip to content

Commit

Permalink
Add events websocket endpoint
Browse files Browse the repository at this point in the history
This change adds a new websocket endpoint for database events. The events
endpoint allows clients to stream events as they happen in GARM. Events
are defined as a structure containning the event type (create, update, delete),
the database entity involved (instances, pools, repos, etc) and the payload
consisting of the object involved in the event. The payload translates
to the types normally returned by the API and can be deserialized as one
of the types present in the params package.

The events endpoint is a websocket endpoint and it accepts filters as
a simple json send over the websocket connection. The filters allows the
user to specify which entities are of interest, and which operations should
be returned. For example, you may be interested in changes made to pools
or runners, in which case you could create a filter that only returns
update operations for pools. Or update and delete operations.

The filters can be defined as:

{
  "filters": [
    {
      "entity_type": "instance",
      "operations": ["update", "delete"]
    },
    {
      "entity_type": "pool"
    },
  ],
  "send_everything": false
}

This would return only update and delete events for instances and all events
for pools. Alternatively you can ask GARM to send you everything:

{
  "send_everything": true
}

Signed-off-by: Gabriel Adrian Samfira <gsamfira@cloudbasesolutions.com>
  • Loading branch information
gabriel-samfira committed Jul 4, 2024
1 parent cf35997 commit 62b5e96
Show file tree
Hide file tree
Showing 91 changed files with 9,680 additions and 4,675 deletions.
38 changes: 38 additions & 0 deletions apiserver/controllers/controllers.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (

gErrors "github.com/cloudbase/garm-provider-common/errors"
"github.com/cloudbase/garm-provider-common/util"
"github.com/cloudbase/garm/apiserver/events"
"github.com/cloudbase/garm/apiserver/params"
"github.com/cloudbase/garm/auth"
"github.com/cloudbase/garm/metrics"
Expand Down Expand Up @@ -163,6 +164,43 @@ func (a *APIController) WebhookHandler(w http.ResponseWriter, r *http.Request) {
}
}

func (a *APIController) EventsHandler(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
if !auth.IsAdmin(ctx) {
w.WriteHeader(http.StatusForbidden)
if _, err := w.Write([]byte("events are available to admin users")); err != nil {
slog.With(slog.Any("error", err)).ErrorContext(ctx, "failed to encode response")
}
return
}

conn, err := a.upgrader.Upgrade(w, r, nil)
if err != nil {
slog.With(slog.Any("error", err)).ErrorContext(ctx, "error upgrading to websockets")
return
}
defer conn.Close()

wsClient, err := wsWriter.NewClient(ctx, conn)
if err != nil {
slog.With(slog.Any("error", err)).ErrorContext(ctx, "failed to create new client")
return
}
defer wsClient.Stop()

eventHandler, err := events.NewHandler(ctx, wsClient)
if err != nil {
slog.With(slog.Any("error", err)).ErrorContext(ctx, "failed to create new event handler")
return
}

if err := eventHandler.Start(); err != nil {
slog.With(slog.Any("error", err)).ErrorContext(ctx, "failed to start event handler")
return
}
<-eventHandler.Done()
}

func (a *APIController) WSHandler(writer http.ResponseWriter, req *http.Request) {
ctx := req.Context()
if !auth.IsAdmin(ctx) {
Expand Down
174 changes: 174 additions & 0 deletions apiserver/events/events.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
package events

import (
"context"
"encoding/json"
"fmt"
"log/slog"
"sync"

runnerErrors "github.com/cloudbase/garm-provider-common/errors"
commonUtil "github.com/cloudbase/garm-provider-common/util"
"github.com/cloudbase/garm/auth"
"github.com/cloudbase/garm/database/common"
"github.com/cloudbase/garm/database/watcher"
"github.com/cloudbase/garm/websocket"
)

func NewHandler(ctx context.Context, client *websocket.Client) (*EventHandler, error) {
if client == nil {
return nil, runnerErrors.ErrUnauthorized
}

newID := commonUtil.NewID()
userID := auth.UserID(ctx)
if userID == "" {
return nil, runnerErrors.ErrUnauthorized
}
consumerID := fmt.Sprintf("ws-event-watcher-%s-%s", userID, newID)
consumer, err := watcher.RegisterConsumer(
// Filter everything by default. Users should set up filters
// after registration.
ctx, consumerID, watcher.WithNone())
if err != nil {
return nil, err
}

handler := &EventHandler{
client: client,
ctx: ctx,
consumer: consumer,
done: make(chan struct{}),
}
client.SetMessageHandler(handler.HandleClientMessages)

return handler, nil
}

type EventHandler struct {
client *websocket.Client
consumer common.Consumer

ctx context.Context
done chan struct{}
running bool

mux sync.Mutex
}

func (e *EventHandler) loop() {
defer e.Stop()

for {
select {
case <-e.ctx.Done():
slog.DebugContext(e.ctx, "context done, stopping event handler")
return
case <-e.client.Done():
slog.DebugContext(e.ctx, "client done, stopping event handler")
return
case <-e.Done():
slog.DebugContext(e.ctx, "done channel closed, stopping event handler")
case event, ok := <-e.consumer.Watch():
if !ok {
slog.DebugContext(e.ctx, "watcher closed, stopping event handler")
return
}
asJs, err := json.Marshal(event)
if err != nil {
slog.ErrorContext(e.ctx, "failed to marshal event", "error", err)
continue
}
if _, err := e.client.Write(asJs); err != nil {
slog.ErrorContext(e.ctx, "failed to write event", "error", err)
}
}
}
}

func (e *EventHandler) Start() error {
e.mux.Lock()
defer e.mux.Unlock()

if e.running {
return nil
}

if err := e.client.Start(); err != nil {
return err
}
e.running = true
go e.loop()
return nil
}

func (e *EventHandler) Stop() {
e.mux.Lock()
defer e.mux.Unlock()

if !e.running {
return
}
e.running = false
e.consumer.Close()
e.client.Stop()
close(e.done)
}

func (e *EventHandler) Done() <-chan struct{} {
return e.done
}

// optionsToWatcherFilters converts the Options struct to a PayloadFilterFunc.
// The client will send an array of filters that indicates which entities and which
// operations the client is interested in. The behavior is that of "any" filter.
// Which means that if any of the elements in the array match an event, it will be
// sent to the websocket.
// Alternatively, clients can choose to get everything.
func (e *EventHandler) optionsToWatcherFilters(opt Options) common.PayloadFilterFunc {
if opt.SendEverything {
return watcher.WithEverything()
}

var funcs []common.PayloadFilterFunc
for _, filter := range opt.Filters {
var filterFunc []common.PayloadFilterFunc
if filter.EntityType == "" {
return watcher.WithNone()
}
filterFunc = append(filterFunc, watcher.WithEntityTypeFilter(filter.EntityType))
if len(filter.Operations) > 0 {
var opFunc []common.PayloadFilterFunc
for _, op := range filter.Operations {
opFunc = append(opFunc, watcher.WithOperationTypeFilter(op))
}
filterFunc = append(filterFunc, watcher.WithAny(opFunc...))
}
funcs = append(funcs, watcher.WithAll(filterFunc...))
}
return watcher.WithAny(funcs...)
}

func (e *EventHandler) HandleClientMessages(message []byte) error {
if e.consumer == nil {
return fmt.Errorf("consumer not initialized")
}

var opt Options
if err := json.Unmarshal(message, &opt); err != nil {
slog.ErrorContext(e.ctx, "failed to unmarshal message from client", "error", err, "message", string(message))
// Client is in error. Disconnect.
e.client.Write([]byte("failed to unmarshal filter"))
e.Stop()
return nil
}

if len(opt.Filters) == 0 && !opt.SendEverything {
slog.DebugContext(e.ctx, "no filters provided; ignoring")
return nil
}

watcherFilters := e.optionsToWatcherFilters(opt)
e.consumer.SetFilters(watcherFilters)
return nil
}
24 changes: 24 additions & 0 deletions apiserver/events/params.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package events

import (
"github.com/cloudbase/garm/database/common"
)

type Filter struct {
Operations []common.OperationType `json:"operations"`
EntityType common.DatabaseEntityType `json:"entity_type"`
}

func (f Filter) Validate() error {
switch f.EntityType {
case common.RepositoryEntityType, common.OrganizationEntityType, common.EnterpriseEntityType, common.PoolEntityType, common.UserEntityType, common.InstanceEntityType, common.JobEntityType, common.ControllerEntityType, common.GithubCredentialsEntityType, common.GithubEndpointEntityType:
default:
return nil
}
return nil
}

type Options struct {
SendEverything bool `json:"send_everything"`
Filters []Filter `json:"filters"`
}
1 change: 1 addition & 0 deletions apiserver/routers/routers.go
Original file line number Diff line number Diff line change
Expand Up @@ -413,6 +413,7 @@ func NewAPIRouter(han *controllers.APIController, authMiddleware, initMiddleware

// Websocket log writer
apiRouter.Handle("/{ws:ws\\/?}", http.HandlerFunc(han.WSHandler)).Methods("GET")
apiRouter.Handle("/{events:events\\/?}", http.HandlerFunc(han.EventsHandler)).Methods("GET")

// NotFound handler
apiRouter.PathPrefix("/").HandlerFunc(han.NotFoundHandler).Methods("GET", "POST", "PUT", "DELETE", "OPTIONS")
Expand Down
96 changes: 96 additions & 0 deletions cmd/garm-cli/cmd/events.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
package cmd

import (
"fmt"
"log/slog"
"os"
"os/signal"
"time"

"github.com/gorilla/websocket"
"github.com/spf13/cobra"

"github.com/cloudbase/garm-provider-common/util"
garmWs "github.com/cloudbase/garm/websocket"
)

var eventsCmd = &cobra.Command{
Use: "debug-events",
SilenceUsage: true,
Short: "Stream garm events",
Long: `Stream all garm events to the terminal.`,
RunE: func(_ *cobra.Command, _ []string) error {
interrupt := make(chan os.Signal, 1)
signal.Notify(interrupt, os.Interrupt)

conn, err := getWebsocketConnection("/api/v1/events")
if err != nil {
return err
}
defer conn.Close()

done := make(chan struct{})

go func() {
defer close(done)
conn.SetReadDeadline(time.Now().Add(pongWait))
conn.SetPongHandler(func(string) error { conn.SetReadDeadline(time.Now().Add(pongWait)); return nil })
for {
_, message, err := conn.ReadMessage()
if err != nil {
if garmWs.IsErrorOfInterest(err) {
slog.With(slog.Any("error", err)).Error("reading event message")
}
return
}
fmt.Println(util.SanitizeLogEntry(string(message)))
}
}()

if eventsFilters != "" {
conn.SetWriteDeadline(time.Now().Add(writeWait))
err = conn.WriteMessage(websocket.TextMessage, []byte(eventsFilters))
if err != nil {
return err
}
}

ticker := time.NewTicker(pingPeriod)
defer ticker.Stop()

for {
select {
case <-done:
slog.Info("done")
return nil
case <-ticker.C:
conn.SetWriteDeadline(time.Now().Add(writeWait))
err := conn.WriteMessage(websocket.PingMessage, nil)
if err != nil {
return err
}
case <-interrupt:
// Cleanly close the connection by sending a close message and then
// waiting (with timeout) for the server to close the connection.
conn.SetWriteDeadline(time.Now().Add(writeWait))
err := conn.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""))
if err != nil {
return err
}
slog.Info("waiting for server to close connection")
select {
case <-done:
slog.Info("done")
case <-time.After(time.Second):
slog.Info("timeout")
}
return nil
}
}
},
}

func init() {
eventsCmd.Flags().StringVarP(&eventsFilters, "filters", "m", "", "Json with event filters you want to apply")
rootCmd.AddCommand(eventsCmd)
}
Loading

0 comments on commit 62b5e96

Please sign in to comment.