diff --git a/pkg/output/xatu/config.go b/pkg/output/xatu/config.go index 902d2232..2716c596 100644 --- a/pkg/output/xatu/config.go +++ b/pkg/output/xatu/config.go @@ -6,15 +6,16 @@ import ( ) type Config struct { - Address string `yaml:"address"` - Headers map[string]string `yaml:"headers"` - TLS bool `yaml:"tls" default:"false"` - MaxQueueSize int `yaml:"maxQueueSize" default:"51200"` - BatchTimeout time.Duration `yaml:"batchTimeout" default:"5s"` - ExportTimeout time.Duration `yaml:"exportTimeout" default:"30s"` - MaxExportBatchSize int `yaml:"maxExportBatchSize" default:"512"` - Workers int `yaml:"workers" default:"1"` - Retry RetryConfig `yaml:"retry"` + Address string `yaml:"address"` + Headers map[string]string `yaml:"headers"` + TLS bool `yaml:"tls" default:"false"` + MaxQueueSize int `yaml:"maxQueueSize" default:"51200"` + BatchTimeout time.Duration `yaml:"batchTimeout" default:"5s"` + ExportTimeout time.Duration `yaml:"exportTimeout" default:"30s"` + MaxExportBatchSize int `yaml:"maxExportBatchSize" default:"512"` + Workers int `yaml:"workers" default:"1"` + Retry RetryConfig `yaml:"retry"` + AuthorizationSecret string `yaml:"authorizationSecret"` } func (c *Config) Validate() error { diff --git a/pkg/output/xatu/exporter.go b/pkg/output/xatu/exporter.go index a2229dd9..f48cc64f 100644 --- a/pkg/output/xatu/exporter.go +++ b/pkg/output/xatu/exporter.go @@ -100,6 +100,10 @@ func (e *ItemExporter) sendUpstream(ctx context.Context, items []*pb.DecoratedEv md := metadata.New(e.config.Headers) ctx = metadata.NewOutgoingContext(ctx, md) + if e.config.AuthorizationSecret != "" { + md.Set("authorization", e.config.AuthorizationSecret) + } + var rsp *pb.CreateEventsResponse var err error diff --git a/pkg/server/service/event-ingester/config.go b/pkg/server/service/event-ingester/config.go index 1585973b..8d98ab2b 100644 --- a/pkg/server/service/event-ingester/config.go +++ b/pkg/server/service/event-ingester/config.go @@ -10,6 +10,8 @@ type Config struct { Enabled bool `yaml:"enabled" default:"false"` // Outputs is the list of sinks to use. Outputs []output.Config `yaml:"outputs"` + // AuthorizationSecret is the secret to use for authorization. + AuthorizationSecret string `yaml:"authorizationSecret"` } func (c *Config) Validate() error { diff --git a/pkg/server/service/event-ingester/ingester.go b/pkg/server/service/event-ingester/ingester.go index 24ec5bd9..0467e742 100644 --- a/pkg/server/service/event-ingester/ingester.go +++ b/pkg/server/service/event-ingester/ingester.go @@ -2,6 +2,7 @@ package eventingester import ( "context" + "errors" "time" "github.com/ethpandaops/xatu/pkg/output" @@ -12,6 +13,7 @@ import ( "github.com/sirupsen/logrus" "google.golang.org/grpc" "google.golang.org/grpc/codes" + "google.golang.org/grpc/metadata" "google.golang.org/grpc/status" ) @@ -78,6 +80,20 @@ func (e *Ingester) CreateEvents(ctx context.Context, req *xatu.CreateEventsReque // TODO(sam.calder-mason): Derive client id/name from the request jwt clientID := "unknown" + md, ok := metadata.FromIncomingContext(ctx) + if !ok { + return nil, errors.New("failed to get metadata from context") + } + + if e.config.AuthorizationSecret != "" { + authorization := md.Get("authorization") + if len(authorization) > 0 { + if authorization[0] != e.config.AuthorizationSecret { + return nil, status.Error(codes.Unauthenticated, "invalid authorization secret") + } + } + } + filteredEvents, err := e.handler.Events(ctx, clientID, req.Events) if err != nil { return nil, status.Error(codes.Internal, err.Error())