diff --git a/VERSION b/VERSION index b9bc2fd..a0f9a4b 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -latest \ No newline at end of file +latest diff --git a/go.mod b/go.mod index b49d3f4..9dca2b3 100644 --- a/go.mod +++ b/go.mod @@ -4,6 +4,7 @@ go 1.18 require ( github.com/OpenFunction/functions-framework-go v0.5.0 + github.com/cenkalti/backoff/v4 v4.1.2 github.com/dapr/components-contrib v1.8.1-rc.1 github.com/dapr/dapr v1.8.3 github.com/dapr/go-sdk v1.5.0 @@ -25,7 +26,6 @@ require ( github.com/andybalholm/brotli v1.0.4 // indirect github.com/antlr/antlr4/runtime/Go/antlr v0.0.0-20210826220005-b48c857c3a0e // indirect github.com/beorn7/perks v1.0.1 // indirect - github.com/cenkalti/backoff/v4 v4.1.2 // indirect github.com/cespare/xxhash/v2 v2.1.2 // indirect github.com/cloudevents/sdk-go/v2 v2.4.1 // indirect github.com/dapr/kit v0.0.2-0.20210614175626-b9074b64d233 // indirect diff --git a/main.go b/main.go index a1a98f7..8671e1f 100644 --- a/main.go +++ b/main.go @@ -8,11 +8,9 @@ import ( ofctx "github.com/OpenFunction/functions-framework-go/context" "github.com/OpenFunction/functions-framework-go/framework" - "github.com/cenkalti/backoff/v4" diag "github.com/dapr/dapr/pkg/diagnostics" "github.com/dapr/dapr/pkg/modes" "github.com/dapr/dapr/pkg/runtime" - "github.com/pkg/errors" "k8s.io/klog/v2" proxyruntime "github.com/OpenFunction/dapr-proxy/pkg/runtime" @@ -77,36 +75,32 @@ func EventHandler(ctx ofctx.Context, in []byte) (ofctx.Out, error) { start := time.Now() defer func() { elapsed := diag.ElapsedSince(start) + pendingEventsCount := FuncRuntime.GetPendingEventsCount() + klog.V(4).Infof("Input: %s - Pending Events Count: %v", ctx.GetInputName(), pendingEventsCount) klog.V(4).Infof("Input: %s - Event Forwarding Elapsed: %vms", ctx.GetInputName(), elapsed) }() c := ctx.GetNativeContext() + respCh := make(chan *proxyruntime.EventResponse, 1) // Handle BindingEvent bindingEvent := ctx.GetBindingEvent() if bindingEvent != nil { - FuncRuntime.EnqueueBindingEvent(&c, bindingEvent) + event := proxyruntime.NewEvent(&c, bindingEvent, nil, respCh) + FuncRuntime.EnqueueEvent(&event) } // Handle TopicEvent topicEvent := ctx.GetTopicEvent() if topicEvent != nil { - FuncRuntime.EnqueueTopicEvent(&c, topicEvent) + event := proxyruntime.NewEvent(&c, nil, topicEvent, respCh) + FuncRuntime.EnqueueEvent(&event) } - var resp *proxyruntime.EventResponse - err := backoff.Retry(func() error { - resp = FuncRuntime.GetEventResponse(&c) - if resp == nil { - return errors.New("Failed to get event response") - } - return nil - }, utils.NewExponentialBackOff()) - - if err != nil { - e := errors.New("Processing event timeout") - klog.Error(e) - return ctx.ReturnOnInternalError(), e + resp := <-respCh + if resp.Error != nil { + klog.Error(resp.Error) + return ctx.ReturnOnInternalError(), resp.Error } else { out := new(ofctx.FunctionOut) out.WithData(resp.Data) diff --git a/pkg/grpc/grpc.go b/pkg/grpc/grpc.go index 99f524a..2eba79c 100644 --- a/pkg/grpc/grpc.go +++ b/pkg/grpc/grpc.go @@ -100,7 +100,7 @@ func (g *Manager) StartEndpointsDetection() { } } - for ep, _ := range endpoints { + for ep := range endpoints { conn, teardown, err := g.getGRPCConnection(context.TODO(), ep.String(), "", "", true, false, g.sslEnabled) teardown() state := conn.GetState() @@ -108,6 +108,8 @@ func (g *Manager) StartEndpointsDetection() { klog.Error(err) } else if state == connectivity.Ready || state == connectivity.Idle { g.balancer.Add(ep) + } else { + g.balancer.Remove(ep) } } time.Sleep(200 * time.Millisecond) @@ -131,7 +133,9 @@ func (g *Manager) GetGRPCConnection() (*grpc.ClientConn, func(), error) { state := conn.GetState() if state != connectivity.Ready && state != connectivity.Idle { g.balancer.Remove(address) - delete(g.connectionPool.pool, address.String()) + g.lock.Lock() + defer g.lock.Unlock() + g.connectionPool.Remove(address.String()) teardown() } }, nil @@ -257,6 +261,13 @@ func (p *connectionPool) Register(address string, conn *grpc.ClientConn) { p.referenceCount[conn] = 2 } +func (p *connectionPool) Remove(address string) { + if conn, ok := p.pool[address]; ok { + delete(p.pool, address) + conn.Close() + } +} + func (p *connectionPool) Share(address string) (*grpc.ClientConn, bool) { conn, ok := p.pool[address] if !ok { diff --git a/pkg/runtime/runtime.go b/pkg/runtime/runtime.go index 279ee79..f929671 100644 --- a/pkg/runtime/runtime.go +++ b/pkg/runtime/runtime.go @@ -5,7 +5,6 @@ import ( "context" "encoding/json" nethttp "net/http" - "sync" ofctx "github.com/OpenFunction/functions-framework-go/context" "github.com/cenkalti/backoff/v4" @@ -33,9 +32,23 @@ type Config struct { MaxBufferSize int } -type EventRequest struct { - ctx *context.Context - ofctx.EventRequest +type Event struct { + ctx *context.Context + bindingEvent *common.BindingEvent + topicEvent *common.TopicEvent + respCh chan *EventResponse +} + +func NewEvent(ctx *context.Context, + bindingEvent *common.BindingEvent, + topicEvent *common.TopicEvent, + respCh chan *EventResponse) Event { + return Event{ + ctx: ctx, + bindingEvent: bindingEvent, + topicEvent: topicEvent, + respCh: respCh, + } } type EventResponse struct { @@ -43,30 +56,19 @@ type EventResponse struct { Error error } -type ResponseMap struct { - l *sync.RWMutex - m map[*context.Context]*EventResponse -} - type Runtime struct { config *Config ctx *ofctx.FunctionContext grpc *grpc.Manager funcChannel channel.AppChannel - reqChan chan *EventRequest - respMap *ResponseMap + events chan *Event } func NewFuncRuntime(config *Config, ctx *ofctx.FunctionContext) *Runtime { - lock := new(sync.RWMutex) return &Runtime{ - config: config, - ctx: ctx, - reqChan: make(chan *EventRequest, config.MaxBufferSize), - respMap: &ResponseMap{ - l: lock, - m: make(map[*context.Context]*EventResponse), - }, + config: config, + ctx: ctx, + events: make(chan *Event, config.MaxBufferSize), } } @@ -89,76 +91,59 @@ func (r *Runtime) CreateFuncChannel() error { return nil } -func (r *Runtime) ProcessEvents() { - for e := range r.reqChan { - if e.BindingEvent != nil { - var data []byte - // Retry on connection error. - err := backoff.Retry(func() error { - var err error - data, err = r.OnBindingEvent(e.ctx, e.BindingEvent) - if err != nil { - klog.V(4).Info(err) - return err - } - return nil - }, utils.NewExponentialBackOff()) +func (r *Runtime) GetPendingEventsCount() int { + return len(r.events) +} - resp := EventResponse{ - Data: data, - Error: err, - } - r.respMap.l.Lock() - r.respMap.m[e.ctx] = &resp - r.respMap.l.Unlock() - } +func (r *Runtime) ProcessEvents() { + for e := range r.events { + if e.bindingEvent != nil { + go func() { + var data []byte + // Retry on connection error. + err := backoff.Retry(func() error { + var err error + data, err = r.OnBindingEvent(e.ctx, e.bindingEvent) + if err != nil { + klog.V(4).Info(err) + return err + } + return nil + }, utils.NewExponentialBackOff()) - if e.TopicEvent != nil { - // Retry on connection error. - err := backoff.Retry(func() error { - var err error - err = r.OnTopicEvent(e.ctx, e.TopicEvent) - if err != nil { - return err + resp := EventResponse{ + Data: data, + Error: err, } - return nil - }, utils.NewExponentialBackOff()) - - resp := EventResponse{ - Data: nil, - Error: err, - } - r.respMap.l.Lock() - r.respMap.m[e.ctx] = &resp - r.respMap.l.Unlock() + e.respCh <- &resp + }() } - } -} -func (r *Runtime) EnqueueBindingEvent(ctx *context.Context, event *common.BindingEvent) { - req := EventRequest{ - ctx: ctx, - EventRequest: ofctx.EventRequest{BindingEvent: event}, - } - r.reqChan <- &req -} + if e.topicEvent != nil { + go func() { + // Retry on connection error. + err := backoff.Retry(func() error { + var err error + err = r.OnTopicEvent(e.ctx, e.topicEvent) + if err != nil { + klog.V(4).Info(err) + return err + } + return nil + }, utils.NewExponentialBackOff()) -func (r *Runtime) EnqueueTopicEvent(ctx *context.Context, event *common.TopicEvent) { - req := EventRequest{ - ctx: ctx, - EventRequest: ofctx.EventRequest{TopicEvent: event}, + resp := EventResponse{ + Data: nil, + Error: err, + } + e.respCh <- &resp + }() + } } - r.reqChan <- &req } -func (r *Runtime) GetEventResponse(ctx *context.Context) *EventResponse { - defer r.respMap.l.Unlock() - r.respMap.l.Lock() - if resp, ok := r.respMap.m[ctx]; ok { - delete(r.respMap.m, ctx) - return resp - } - return nil +func (r *Runtime) EnqueueEvent(event *Event) { + r.events <- event } func (r *Runtime) OnBindingEvent(ctx *context.Context, event *common.BindingEvent) ([]byte, error) { diff --git a/pkg/utils/utils.go b/pkg/utils/utils.go index b4c7f4f..19e95fe 100644 --- a/pkg/utils/utils.go +++ b/pkg/utils/utils.go @@ -60,9 +60,9 @@ func GetEnvVar(key, fallbackValue string) string { func NewExponentialBackOff() *backoff.ExponentialBackOff { b := &backoff.ExponentialBackOff{ InitialInterval: 5 * time.Millisecond, - RandomizationFactor: 0.5, - Multiplier: 1.5, - MaxInterval: 100 * time.Millisecond, + RandomizationFactor: 0.2, + Multiplier: 1, + MaxInterval: 5 * time.Millisecond, MaxElapsedTime: 60 * time.Second, Stop: backoff.Stop, Clock: backoff.SystemClock,