Skip to content

Commit

Permalink
deploy event-source-adapter as apps/deployment instead of ksvc (kyma-…
Browse files Browse the repository at this point in the history
…project#9186)

* replace ksvc by deployment

* bump image

* update rbac

* dd missing files

* add simple service test

* add more tests

* reorder imports

* add metrics port

* add missing permissions

* fix port name

must be 15chars or shorter

* remove serving from the upgrade test

* bump image

* fix rbac for upgrade test

* add label for podmonitor

* update latency dashboard

* fix delivery dashboard

* fix tracing

* Apply suggestions from code review

Co-authored-by: Nils Schmidt <github@nilstschmidt.de>

* fix comments

* remove tracing env var support

* dep ensure

* fix latency dashboard

* fix resources/knative-eventing/charts/event-mesh-dashboard/templates/event-mesh-delivery.yaml

* RC from @anishj0shi: Remove dead code and unnecessary type check

* Use same cloudevents client in integration test and adapter

* Incorporate review comments from sayanh

* Incorporate review comments round #2 from sayanh

* Remove copy right note in non-autogenerated files

* Remove copy right note in autogenerated files (using codegen)

* Remove copy right note in resources files

* Use Fatal with zap.Error logging

* Fix style

* Add "Connectivity Validator -> HTTP Source" panel

* Incorporate review comments from marcobebway

* Add post-upgrade hook to delete orphaned ksvcs

* Fix dashboard (order and double graphs in validator->http event source)

* Use internal logger

* Introduce app label for http source pod for inclusion in pods dashboard

* Fix style

Co-authored-by: Nils Schmidt <github@nilstschmidt.de>
Co-authored-by: Nils Schmidt <nils.schmidt02@sap.com>
  • Loading branch information
3 people authored Aug 17, 2020
1 parent bde8b99 commit a28b2c7
Show file tree
Hide file tree
Showing 102 changed files with 1,611 additions and 2,177 deletions.
98 changes: 13 additions & 85 deletions components/event-sources/Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 0 additions & 3 deletions components/event-sources/Gopkg.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,6 @@ required = [
revision = "b7391e95e576cacdcdd422573063bc057239113d"

# Direct dependencies
[[constraint]]
name = "knative.dev/serving"
branch = "release-0.12"
[[constraint]]
name = "knative.dev/pkg"
branch = "release-0.12"
Expand Down
14 changes: 0 additions & 14 deletions components/event-sources/Makefile
Original file line number Diff line number Diff line change
@@ -1,17 +1,3 @@
# Copyright 2019 The Kyma Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

APP_NAME = event-sources
APP_PATH = components/event-sources
BUILDPACK = eu.gcr.io/kyma-project/test-infra/buildpack-golang-toolbox:v20200423-1d9d6590
Expand Down
69 changes: 47 additions & 22 deletions components/event-sources/adapter/http/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,29 +3,32 @@ package http
import (
"context"
"fmt"
"log"
"net/http"
"net/url"

cloudevents "github.com/cloudevents/sdk-go"
cloudeventshttp "github.com/cloudevents/sdk-go/pkg/cloudevents/transport/http"
"github.com/pkg/errors"
"go.opencensus.io/trace"
"go.uber.org/zap"
"knative.dev/eventing/pkg/adapter"
"knative.dev/eventing/pkg/kncloudevents"
"knative.dev/eventing/pkg/utils"
"knative.dev/pkg/logging"
"knative.dev/pkg/source"
"knative.dev/pkg/tracing"
pkgtracing "knative.dev/pkg/tracing"

"github.com/kyma-project/kyma/components/event-sources/apis/sources"
)

var _ adapter.EnvConfigAccessor = (*envConfig)(nil)

type envConfig struct {
adapter.EnvConfig
EventSource string `envconfig:"EVENT_SOURCE" required:"true"`

// PORT as required by knative serving runtime contract
Port int `envconfig:"PORT" required:"true" default:"8080"`
TracingEnabled bool `envconfig:"TRACING_ENABLED" default:"true"`
Port int `envconfig:"PORT" required:"true" default:"8080"`
}

func (e *envConfig) GetSource() string {
Expand All @@ -36,10 +39,6 @@ func (e *envConfig) GetPort() int {
return e.Port
}

func (e *envConfig) IsTracingEnabled() bool {
return e.TracingEnabled
}

type httpAdapter struct {
ceClient cloudevents.Client
statsReporter source.StatsReporter
Expand All @@ -52,9 +51,13 @@ type AdapterEnvConfigAccessor interface {
adapter.EnvConfigAccessor
GetSource() string
GetPort() int
IsTracingEnabled() bool
}

const (
defaultMaxIdleConnections = 1000
defaultMaxIdleConnectionsPerHost = 1000
)

const resourceGroup = "http." + sources.GroupName

const (
Expand Down Expand Up @@ -90,32 +93,49 @@ func NewAdapter(ctx context.Context, processed adapter.EnvConfigAccessor, ceClie
}
}

// Start is the entrypoint for the adapter and is called by sharedmain coming from pkg/adapter
func (h *httpAdapter) Start(_ <-chan struct{}) error {

t, err := cloudevents.NewHTTPTransport(
cloudevents.WithPort(h.accessor.GetPort()),
// NewCloudEventsClient creates a new client for receiving and sending cloud events
func NewCloudEventsClient(port int) (cloudevents.Client, error) {
options := []cloudeventshttp.Option{
cloudevents.WithBinaryEncoding(),
cloudevents.WithMiddleware(pkgtracing.HTTPSpanMiddleware),
cloudevents.WithPort(port),
cloudevents.WithPath(endpointCE),
cloudevents.WithMiddleware(WithReadinessMiddleware),
cloudevents.WithMiddleware(tracing.HTTPSpanMiddleware),
}

httpTransport, err := cloudevents.NewHTTPTransport(
options...,
)
if err != nil {
return errors.Wrap(err, "failed to create transport")
return nil, errors.Wrap(err, "failed to create transport")
}

c, err := cloudevents.NewClient(t)
connectionArgs := kncloudevents.ConnectionArgs{
MaxIdleConns: defaultMaxIdleConnections,
MaxIdleConnsPerHost: defaultMaxIdleConnectionsPerHost,
}

ceClient, err := kncloudevents.NewDefaultClientGivenHttpTransport(
httpTransport,
&connectionArgs)

if err != nil {
return errors.Wrap(err, "failed to create client")
return nil, errors.Wrap(err, "failed to create client")
}
return ceClient, nil
}

// Start is the entrypoint for the adapter and is called by sharedmain coming from pkg/adapter
func (h *httpAdapter) Start(_ <-chan struct{}) error {

log.Printf("listening on :%d%s\n", h.accessor.GetPort(), endpointCE)
h.logger.Info("listening on", zap.String("address", fmt.Sprintf("%d%s", h.accessor.GetPort(), endpointCE)))

// note about graceful shutdown:
// TLDR; StartReceiver unblocks as soon as a stop signal is received
// `StartReceiver` waits internally until `ctx.Done()` does not block anymore
// the context `h.adapterContext` returns a channel (when calling `ctx.Done()`)
// which is closed as soon as a stop signal is received, see https://github.com/knative/pkg/blob/master/signals/signal.go#L37
if err := c.StartReceiver(h.adapterContext, h.serveHTTP); err != nil {
if err := h.ceClient.StartReceiver(h.adapterContext, h.serveHTTP); err != nil {
return errors.Wrap(err, "error occurred while serving")
}
h.logger.Info("adapter stopped")
Expand Down Expand Up @@ -176,8 +196,13 @@ func (h *httpAdapter) serveHTTP(ctx context.Context, event cloudevents.Event, re
// Shamelessly copied from https://github.com/knative/eventing/blob/5631d771968bbf00e64988a0e4217c2915ee778e/pkg/broker/ingress/ingress_handler.go#L116
// Due to an issue in utils.ContextFrom, we don't retain the original trace context from ctx, so
// bring it in manually.
sendingCTX := utils.ContextFrom(tctx, nil)
sendingCTX = trace.NewContext(sendingCTX, trace.FromContext(ctx))
uri, err := url.Parse(h.accessor.GetSinkURI())
if err != nil {
return err
}
sendingCTX := utils.ContextFrom(tctx, uri)
trc := trace.FromContext(ctx)
sendingCTX = trace.NewContext(sendingCTX, trc)
rctx, revt, err := h.ceClient.Send(sendingCTX, event)
if err != nil {
h.logger.Error("failed to send cloudevent to sink", zap.Error(err), zap.Any("sink", h.accessor.GetSinkURI()))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -355,7 +355,11 @@ func TestAdapterShutdown(t *testing.T) {
// used to simulate sending a stop signal
ctx, cancelFunc := context.WithCancel(ctx)

httpAdapter := NewAdapter(ctx, c, nil, nil)
sinkClient, err := kncloudevents.NewDefaultClient(c.GetSinkURI())
if err != nil {
t.Fatal("error building cloud event client", zap.Error(err))
}
httpAdapter := NewAdapter(ctx, c, sinkClient, nil)
stopChannel := make(chan error)

// start adapter
Expand Down Expand Up @@ -432,11 +436,12 @@ func waitAdapterReady(t *testing.T, adapterURI string) {
}

// startHttpAdapter starts the adapter with a cloudevents client configured with the test sink as target
func startHttpAdapter(t *testing.T, c adapter.EnvConfigAccessor, ctx context.Context) *adapter.Adapter {
sinkClient, err := kncloudevents.NewDefaultClient(c.GetSinkURI())
func startHttpAdapter(t *testing.T, c *envConfig, ctx context.Context) *adapter.Adapter {
sinkClient, err := NewCloudEventsClient(c.GetPort())
if err != nil {
t.Fatal("error building cloud event client", zap.Error(err))
t.Fatalf("error while creating sinkclient: %+v", err)
}

statsReporter, err := source.NewStatsReporter()
if err != nil {
t.Errorf("error building statsreporter: %v", err)
Expand Down
16 changes: 0 additions & 16 deletions components/event-sources/apis/sources/register.go
Original file line number Diff line number Diff line change
@@ -1,19 +1,3 @@
/*
Copyright 2019 The Kyma Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package sources

const (
Expand Down
Loading

0 comments on commit a28b2c7

Please sign in to comment.