diff --git a/go.mod b/go.mod index 70fa6b3..b49d3f4 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,7 @@ module github.com/OpenFunction/dapr-proxy go 1.18 require ( - github.com/OpenFunction/functions-framework-go v0.0.0-20220925145105-86f7bcc9cf8c + github.com/OpenFunction/functions-framework-go v0.5.0 github.com/dapr/components-contrib v1.8.1-rc.1 github.com/dapr/dapr v1.8.3 github.com/dapr/go-sdk v1.5.0 diff --git a/go.sum b/go.sum index 8c6c4f1..227db7e 100644 --- a/go.sum +++ b/go.sum @@ -57,8 +57,8 @@ github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym github.com/NYTimes/gziphandler v0.0.0-20170623195520-56545f4a5d46/go.mod h1:3wb06e3pkSAbeQ52E9H9iFoQsEEwGN64994WTCIhntQ= github.com/NYTimes/gziphandler v1.1.1/go.mod h1:n/CVRwUEOgIxrgPvAQhUUr9oeUtvrhMomdKFjzJNB0c= github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU= -github.com/OpenFunction/functions-framework-go v0.0.0-20220925145105-86f7bcc9cf8c h1:TFstl4SFFVZpd2+2yhVGjW15em/lV9NJ7NGoM6kaUHQ= -github.com/OpenFunction/functions-framework-go v0.0.0-20220925145105-86f7bcc9cf8c/go.mod h1:ussM725MZuGmAH4PPmwGdHoDxIT4oSx7VK5Wiibg/No= +github.com/OpenFunction/functions-framework-go v0.5.0 h1:s2L4PyazI8EoPzrfW0Es1esCSZS2SFXD6KNXyDKaJKc= +github.com/OpenFunction/functions-framework-go v0.5.0/go.mod h1:+uYjTEYmn2uqIyViZtg9OF+bUNdjbkWNd7jrQWc7iEc= github.com/PuerkitoBio/purell v1.1.1 h1:WEQqlqaGbrPkxLJWfBwQmfEAE1Z7ONdDLqrN38tNFfI= github.com/PuerkitoBio/purell v1.1.1/go.mod h1:c11w/QuzBsJSee3cPx9rAFu61PvFxuPbtSwDGJws/X0= github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578 h1:d+Bc7a5rLufV/sSk/8dngufqelfh6jnri85riMAaF/M= diff --git a/main.go b/main.go index b9bf4a2..a1a98f7 100644 --- a/main.go +++ b/main.go @@ -6,15 +6,17 @@ import ( "strconv" "time" - proxyruntime "github.com/OpenFunction/dapr-proxy/pkg/runtime" - "github.com/OpenFunction/dapr-proxy/pkg/utils" ofctx "github.com/OpenFunction/functions-framework-go/context" "github.com/OpenFunction/functions-framework-go/framework" + "github.com/cenkalti/backoff/v4" diag "github.com/dapr/dapr/pkg/diagnostics" "github.com/dapr/dapr/pkg/modes" "github.com/dapr/dapr/pkg/runtime" "github.com/pkg/errors" "k8s.io/klog/v2" + + proxyruntime "github.com/OpenFunction/dapr-proxy/pkg/runtime" + "github.com/OpenFunction/dapr-proxy/pkg/utils" ) const ( @@ -48,10 +50,11 @@ func main() { port, _ := strconv.Atoi(funcContext.GetPort()) protocol := utils.GetEnvVar(protocolEnvVar, defaultAppProtocol) config := &proxyruntime.Config{ - Protocol: runtime.Protocol(protocol), - Host: host, - Port: port, - Mode: modes.KubernetesMode, + Protocol: runtime.Protocol(protocol), + Host: host, + Port: port, + Mode: modes.KubernetesMode, + MaxBufferSize: 1000000, } FuncRuntime = proxyruntime.NewFuncRuntime(config, funcContext) @@ -59,6 +62,8 @@ func main() { klog.Exit(err) } + go FuncRuntime.ProcessEvents() + if err := fwk.Register(ctx, EventHandler); err != nil { klog.Exit(err) } @@ -75,35 +80,37 @@ func EventHandler(ctx ofctx.Context, in []byte) (ofctx.Out, error) { klog.V(4).Infof("Input: %s - Event Forwarding Elapsed: %vms", ctx.GetInputName(), elapsed) }() - // Forwarding BindingEvent + c := ctx.GetNativeContext() + + // Handle BindingEvent bindingEvent := ctx.GetBindingEvent() if bindingEvent != nil { - data, err := FuncRuntime.OnBindingEvent(ctx, bindingEvent) - if err != nil { - klog.Error(err) - return ctx.ReturnOnInternalError(), err - } else { - out := new(ofctx.FunctionOut) - out.WithData(data) - out.WithCode(ofctx.Success) - return out, nil - } + FuncRuntime.EnqueueBindingEvent(&c, bindingEvent) } - // Forwarding TopicEvent + // Handle TopicEvent topicEvent := ctx.GetTopicEvent() if topicEvent != nil { - err := FuncRuntime.OnTopicEvent(ctx, topicEvent) - if err != nil { - klog.Error(err) - return ctx.ReturnOnInternalError(), err - } else { - out := new(ofctx.FunctionOut) - out.WithCode(ofctx.Success) - return out, nil - } + FuncRuntime.EnqueueTopicEvent(&c, topicEvent) } - err := errors.New("Only Binding and Pubsub events are supported") - return ctx.ReturnOnInternalError(), err + var resp *proxyruntime.EventResponse + err := backoff.Retry(func() error { + resp = FuncRuntime.GetEventResponse(&c) + if resp == nil { + return errors.New("Failed to get event response") + } + return nil + }, utils.NewExponentialBackOff()) + + if err != nil { + e := errors.New("Processing event timeout") + klog.Error(e) + return ctx.ReturnOnInternalError(), e + } else { + out := new(ofctx.FunctionOut) + out.WithData(resp.Data) + out.WithCode(ofctx.Success) + return out, nil + } } diff --git a/pkg/grpc/grpc.go b/pkg/grpc/grpc.go index 4f0d111..99f524a 100644 --- a/pkg/grpc/grpc.go +++ b/pkg/grpc/grpc.go @@ -10,7 +10,6 @@ import ( "sync" "time" - "github.com/OpenFunction/dapr-proxy/pkg/lb" "github.com/dapr/dapr/pkg/channel" diag "github.com/dapr/dapr/pkg/diagnostics" "github.com/dapr/dapr/pkg/modes" @@ -20,6 +19,8 @@ import ( "google.golang.org/grpc/connectivity" "google.golang.org/grpc/credentials" "k8s.io/klog/v2" + + "github.com/OpenFunction/dapr-proxy/pkg/lb" ) type endpoint string @@ -89,7 +90,7 @@ func (g *Manager) StartEndpointsDetection() { endpoints[ep] = true } } else { - klog.Error(err) + klog.V(4).Info(err) } oldEndpoints := g.balancer.All() @@ -107,11 +108,9 @@ func (g *Manager) StartEndpointsDetection() { klog.Error(err) } else if state == connectivity.Ready || state == connectivity.Idle { g.balancer.Add(ep) - } else { - g.balancer.Remove(ep) } } - time.Sleep(time.Second) + time.Sleep(200 * time.Millisecond) } }() } @@ -119,12 +118,12 @@ func (g *Manager) StartEndpointsDetection() { func (g *Manager) GetGRPCConnection() (*grpc.ClientConn, func(), error) { address, _ := g.balancer.Next(lb.DummyFactor) if address == nil { - return nil, nil, errors.Errorf("No available endpoints") + return nil, func() {}, errors.Errorf("No available endpoints") } conn, teardown, err := g.getGRPCConnection(context.TODO(), address.String(), "", "", true, false, false) if err != nil { - return nil, nil, errors.Errorf("error establishing connection to app grpc on address %s: %s", address, err) + return nil, teardown, errors.Errorf("error establishing connection to app grpc on address %s: %s", address, err) } return conn, func() { diff --git a/pkg/runtime/runtime.go b/pkg/runtime/runtime.go index 44b6efc..279ee79 100644 --- a/pkg/runtime/runtime.go +++ b/pkg/runtime/runtime.go @@ -2,13 +2,13 @@ package runtime import ( "bytes" + "context" "encoding/json" nethttp "net/http" + "sync" - "github.com/OpenFunction/dapr-proxy/pkg/grpc" - "github.com/OpenFunction/dapr-proxy/pkg/http" - "github.com/OpenFunction/dapr-proxy/pkg/utils" ofctx "github.com/OpenFunction/functions-framework-go/context" + "github.com/cenkalti/backoff/v4" "github.com/dapr/components-contrib/contenttype" "github.com/dapr/dapr/pkg/channel" invoke "github.com/dapr/dapr/pkg/messaging/v1" @@ -17,14 +17,35 @@ import ( "github.com/dapr/dapr/pkg/runtime" "github.com/dapr/go-sdk/service/common" "github.com/pkg/errors" + "k8s.io/klog/v2" + + "github.com/OpenFunction/dapr-proxy/pkg/grpc" + "github.com/OpenFunction/dapr-proxy/pkg/http" + "github.com/OpenFunction/dapr-proxy/pkg/utils" ) type Config struct { - Protocol runtime.Protocol - Host string - Port int - Mode modes.DaprMode - sslEnabled bool + Protocol runtime.Protocol + Host string + Port int + Mode modes.DaprMode + sslEnabled bool + MaxBufferSize int +} + +type EventRequest struct { + ctx *context.Context + ofctx.EventRequest +} + +type EventResponse struct { + Data []byte + Error error +} + +type ResponseMap struct { + l *sync.RWMutex + m map[*context.Context]*EventResponse } type Runtime struct { @@ -32,12 +53,20 @@ type Runtime struct { ctx *ofctx.FunctionContext grpc *grpc.Manager funcChannel channel.AppChannel + reqChan chan *EventRequest + respMap *ResponseMap } func NewFuncRuntime(config *Config, ctx *ofctx.FunctionContext) *Runtime { + lock := new(sync.RWMutex) return &Runtime{ - config: config, - ctx: ctx, + config: config, + ctx: ctx, + reqChan: make(chan *EventRequest, config.MaxBufferSize), + respMap: &ResponseMap{ + l: lock, + m: make(map[*context.Context]*EventResponse), + }, } } @@ -51,15 +80,89 @@ func (r *Runtime) CreateFuncChannel() error { r.funcChannel = funcChannel case runtime.GRPCProtocol: r.grpc = grpc.NewGRPCManager(r.config.Host, r.config.Port, r.config.sslEnabled) - r.grpc.StartEndpointsDetection() + if r.ctx.Runtime == ofctx.Async { + r.grpc.StartEndpointsDetection() + } default: return errors.Errorf("cannot create app channel for protocol %s", string(r.config.Protocol)) } return nil } -func (r *Runtime) OnBindingEvent(ctx ofctx.Context, event *common.BindingEvent) ([]byte, error) { - var function func(ctx ofctx.Context, event *common.BindingEvent) ([]byte, error) +func (r *Runtime) ProcessEvents() { + for e := range r.reqChan { + if e.BindingEvent != nil { + var data []byte + // Retry on connection error. + err := backoff.Retry(func() error { + var err error + data, err = r.OnBindingEvent(e.ctx, e.BindingEvent) + if err != nil { + klog.V(4).Info(err) + return err + } + return nil + }, utils.NewExponentialBackOff()) + + resp := EventResponse{ + Data: data, + Error: err, + } + r.respMap.l.Lock() + r.respMap.m[e.ctx] = &resp + r.respMap.l.Unlock() + } + + if e.TopicEvent != nil { + // Retry on connection error. + err := backoff.Retry(func() error { + var err error + err = r.OnTopicEvent(e.ctx, e.TopicEvent) + if err != nil { + return err + } + return nil + }, utils.NewExponentialBackOff()) + + resp := EventResponse{ + Data: nil, + Error: err, + } + r.respMap.l.Lock() + r.respMap.m[e.ctx] = &resp + r.respMap.l.Unlock() + } + } +} + +func (r *Runtime) EnqueueBindingEvent(ctx *context.Context, event *common.BindingEvent) { + req := EventRequest{ + ctx: ctx, + EventRequest: ofctx.EventRequest{BindingEvent: event}, + } + r.reqChan <- &req +} + +func (r *Runtime) EnqueueTopicEvent(ctx *context.Context, event *common.TopicEvent) { + req := EventRequest{ + ctx: ctx, + EventRequest: ofctx.EventRequest{TopicEvent: event}, + } + r.reqChan <- &req +} + +func (r *Runtime) GetEventResponse(ctx *context.Context) *EventResponse { + defer r.respMap.l.Unlock() + r.respMap.l.Lock() + if resp, ok := r.respMap.m[ctx]; ok { + delete(r.respMap.m, ctx) + return resp + } + return nil +} + +func (r *Runtime) OnBindingEvent(ctx *context.Context, event *common.BindingEvent) ([]byte, error) { + var function func(ctx *context.Context, event *common.BindingEvent) ([]byte, error) switch r.config.Protocol { case runtime.HTTPProtocol: function = r.onBindingEventHTTP @@ -69,8 +172,8 @@ func (r *Runtime) OnBindingEvent(ctx ofctx.Context, event *common.BindingEvent) return function(ctx, event) } -func (r *Runtime) OnTopicEvent(ctx ofctx.Context, event *common.TopicEvent) error { - var function func(ctx ofctx.Context, event *common.TopicEvent) error +func (r *Runtime) OnTopicEvent(ctx *context.Context, event *common.TopicEvent) error { + var function func(ctx *context.Context, event *common.TopicEvent) error switch r.config.Protocol { case runtime.HTTPProtocol: function = r.onTopicEventHTTP @@ -80,7 +183,7 @@ func (r *Runtime) OnTopicEvent(ctx ofctx.Context, event *common.TopicEvent) erro return function(ctx, event) } -func (r *Runtime) onBindingEventHTTP(ctx ofctx.Context, event *common.BindingEvent) ([]byte, error) { +func (r *Runtime) onBindingEventHTTP(ctx *context.Context, event *common.BindingEvent) ([]byte, error) { path, _ := utils.GetComponentName(r.ctx) req := invoke.NewInvokeMethodRequest(path) req.WithHTTPExtension(nethttp.MethodPost, "") @@ -92,7 +195,7 @@ func (r *Runtime) onBindingEventHTTP(ctx ofctx.Context, event *common.BindingEve } req.WithMetadata(reqMetadata) - resp, err := r.funcChannel.InvokeMethod(ctx.GetNativeContext(), req) + resp, err := r.funcChannel.InvokeMethod(*ctx, req) if err != nil { return nil, errors.Errorf("Error sending topic event to function: %s", err) } @@ -104,7 +207,7 @@ func (r *Runtime) onBindingEventHTTP(ctx ofctx.Context, event *common.BindingEve return data, nil } -func (r *Runtime) onBindingEventGRPC(ctx ofctx.Context, bindingEvent *common.BindingEvent) ([]byte, error) { +func (r *Runtime) onBindingEventGRPC(ctx *context.Context, bindingEvent *common.BindingEvent) ([]byte, error) { conn, release, err := r.grpc.GetGRPCConnection() defer release() if err != nil { @@ -117,14 +220,14 @@ func (r *Runtime) onBindingEventGRPC(ctx ofctx.Context, bindingEvent *common.Bin Data: bindingEvent.Data, Metadata: bindingEvent.Metadata, } - if resp, err := client.OnBindingEvent(ctx.GetNativeContext(), req); err != nil { + if resp, err := client.OnBindingEvent(*ctx, req); err != nil { return nil, errors.Errorf("Error sending binding event to function: %s", err) } else { return resp.Data, nil } } -func (r *Runtime) onTopicEventHTTP(ctx ofctx.Context, event *common.TopicEvent) error { +func (r *Runtime) onTopicEventHTTP(ctx *context.Context, event *common.TopicEvent) error { pubsubName, _ := utils.GetComponentName(r.ctx) path, _ := utils.GetTopicEventPath(r.ctx) req := invoke.NewInvokeMethodRequest(path) @@ -140,7 +243,7 @@ func (r *Runtime) onTopicEventHTTP(ctx ofctx.Context, event *common.TopicEvent) metadata["pubsubName"] = pubsubName req.WithCustomHTTPMetadata(metadata) - resp, err := r.funcChannel.InvokeMethod(ctx.GetNativeContext(), req) + resp, err := r.funcChannel.InvokeMethod(*ctx, req) if err != nil { return errors.Errorf("Error sending topic event to function: %s", err) } @@ -151,7 +254,7 @@ func (r *Runtime) onTopicEventHTTP(ctx ofctx.Context, event *common.TopicEvent) return nil } -func (r *Runtime) onTopicEventGRPC(ctx ofctx.Context, event *common.TopicEvent) error { +func (r *Runtime) onTopicEventGRPC(ctx *context.Context, event *common.TopicEvent) error { conn, release, err := r.grpc.GetGRPCConnection() defer release() if err != nil { @@ -171,7 +274,7 @@ func (r *Runtime) onTopicEventGRPC(ctx ofctx.Context, event *common.TopicEvent) Path: path, } - if _, err := client.OnTopicEvent(ctx.GetNativeContext(), req); err != nil { + if _, err := client.OnTopicEvent(*ctx, req); err != nil { return errors.Errorf("Error sending topic event to function: %s", err) } return nil diff --git a/pkg/utils/utils.go b/pkg/utils/utils.go index 52cd5f5..b4c7f4f 100644 --- a/pkg/utils/utils.go +++ b/pkg/utils/utils.go @@ -5,9 +5,11 @@ import ( "os" "reflect" "strings" + "time" "unsafe" ofctx "github.com/OpenFunction/functions-framework-go/context" + "github.com/cenkalti/backoff/v4" "github.com/pkg/errors" ) @@ -54,3 +56,17 @@ func GetEnvVar(key, fallbackValue string) string { } return fallbackValue } + +func NewExponentialBackOff() *backoff.ExponentialBackOff { + b := &backoff.ExponentialBackOff{ + InitialInterval: 5 * time.Millisecond, + RandomizationFactor: 0.5, + Multiplier: 1.5, + MaxInterval: 100 * time.Millisecond, + MaxElapsedTime: 60 * time.Second, + Stop: backoff.Stop, + Clock: backoff.SystemClock, + } + b.Reset() + return b +}