Skip to content

Commit

Permalink
Merge pull request #5 from wrongerror/main
Browse files Browse the repository at this point in the history
adjust buffer/retry mechanism
  • Loading branch information
benjaminhuo committed Oct 28, 2022
2 parents 0328157 + 4fec201 commit 6640547
Show file tree
Hide file tree
Showing 6 changed files with 94 additions and 104 deletions.
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
latest
latest
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ go 1.18

require (
github.com/OpenFunction/functions-framework-go v0.5.0
github.com/cenkalti/backoff/v4 v4.1.2
github.com/dapr/components-contrib v1.8.1-rc.1
github.com/dapr/dapr v1.8.3
github.com/dapr/go-sdk v1.5.0
Expand All @@ -25,7 +26,6 @@ require (
github.com/andybalholm/brotli v1.0.4 // indirect
github.com/antlr/antlr4/runtime/Go/antlr v0.0.0-20210826220005-b48c857c3a0e // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/cenkalti/backoff/v4 v4.1.2 // indirect
github.com/cespare/xxhash/v2 v2.1.2 // indirect
github.com/cloudevents/sdk-go/v2 v2.4.1 // indirect
github.com/dapr/kit v0.0.2-0.20210614175626-b9074b64d233 // indirect
Expand Down
28 changes: 11 additions & 17 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,9 @@ import (

ofctx "github.com/OpenFunction/functions-framework-go/context"
"github.com/OpenFunction/functions-framework-go/framework"
"github.com/cenkalti/backoff/v4"
diag "github.com/dapr/dapr/pkg/diagnostics"
"github.com/dapr/dapr/pkg/modes"
"github.com/dapr/dapr/pkg/runtime"
"github.com/pkg/errors"
"k8s.io/klog/v2"

proxyruntime "github.com/OpenFunction/dapr-proxy/pkg/runtime"
Expand Down Expand Up @@ -77,36 +75,32 @@ func EventHandler(ctx ofctx.Context, in []byte) (ofctx.Out, error) {
start := time.Now()
defer func() {
elapsed := diag.ElapsedSince(start)
pendingEventsCount := FuncRuntime.GetPendingEventsCount()
klog.V(4).Infof("Input: %s - Pending Events Count: %v", ctx.GetInputName(), pendingEventsCount)
klog.V(4).Infof("Input: %s - Event Forwarding Elapsed: %vms", ctx.GetInputName(), elapsed)
}()

c := ctx.GetNativeContext()
respCh := make(chan *proxyruntime.EventResponse, 1)

// Handle BindingEvent
bindingEvent := ctx.GetBindingEvent()
if bindingEvent != nil {
FuncRuntime.EnqueueBindingEvent(&c, bindingEvent)
event := proxyruntime.NewEvent(&c, bindingEvent, nil, respCh)
FuncRuntime.EnqueueEvent(&event)
}

// Handle TopicEvent
topicEvent := ctx.GetTopicEvent()
if topicEvent != nil {
FuncRuntime.EnqueueTopicEvent(&c, topicEvent)
event := proxyruntime.NewEvent(&c, nil, topicEvent, respCh)
FuncRuntime.EnqueueEvent(&event)
}

var resp *proxyruntime.EventResponse
err := backoff.Retry(func() error {
resp = FuncRuntime.GetEventResponse(&c)
if resp == nil {
return errors.New("Failed to get event response")
}
return nil
}, utils.NewExponentialBackOff())

if err != nil {
e := errors.New("Processing event timeout")
klog.Error(e)
return ctx.ReturnOnInternalError(), e
resp := <-respCh
if resp.Error != nil {
klog.Error(resp.Error)
return ctx.ReturnOnInternalError(), resp.Error
} else {
out := new(ofctx.FunctionOut)
out.WithData(resp.Data)
Expand Down
15 changes: 13 additions & 2 deletions pkg/grpc/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,14 +100,16 @@ func (g *Manager) StartEndpointsDetection() {
}
}

for ep, _ := range endpoints {
for ep := range endpoints {
conn, teardown, err := g.getGRPCConnection(context.TODO(), ep.String(), "", "", true, false, g.sslEnabled)
teardown()
state := conn.GetState()
if err != nil {
klog.Error(err)
} else if state == connectivity.Ready || state == connectivity.Idle {
g.balancer.Add(ep)
} else {
g.balancer.Remove(ep)
}
}
time.Sleep(200 * time.Millisecond)
Expand All @@ -131,7 +133,9 @@ func (g *Manager) GetGRPCConnection() (*grpc.ClientConn, func(), error) {
state := conn.GetState()
if state != connectivity.Ready && state != connectivity.Idle {
g.balancer.Remove(address)
delete(g.connectionPool.pool, address.String())
g.lock.Lock()
defer g.lock.Unlock()
g.connectionPool.Remove(address.String())
teardown()
}
}, nil
Expand Down Expand Up @@ -257,6 +261,13 @@ func (p *connectionPool) Register(address string, conn *grpc.ClientConn) {
p.referenceCount[conn] = 2
}

func (p *connectionPool) Remove(address string) {
if conn, ok := p.pool[address]; ok {
delete(p.pool, address)
conn.Close()
}
}

func (p *connectionPool) Share(address string) (*grpc.ClientConn, bool) {
conn, ok := p.pool[address]
if !ok {
Expand Down
145 changes: 65 additions & 80 deletions pkg/runtime/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"context"
"encoding/json"
nethttp "net/http"
"sync"

ofctx "github.com/OpenFunction/functions-framework-go/context"
"github.com/cenkalti/backoff/v4"
Expand Down Expand Up @@ -33,40 +32,43 @@ type Config struct {
MaxBufferSize int
}

type EventRequest struct {
ctx *context.Context
ofctx.EventRequest
type Event struct {
ctx *context.Context
bindingEvent *common.BindingEvent
topicEvent *common.TopicEvent
respCh chan *EventResponse
}

func NewEvent(ctx *context.Context,
bindingEvent *common.BindingEvent,
topicEvent *common.TopicEvent,
respCh chan *EventResponse) Event {
return Event{
ctx: ctx,
bindingEvent: bindingEvent,
topicEvent: topicEvent,
respCh: respCh,
}
}

type EventResponse struct {
Data []byte
Error error
}

type ResponseMap struct {
l *sync.RWMutex
m map[*context.Context]*EventResponse
}

type Runtime struct {
config *Config
ctx *ofctx.FunctionContext
grpc *grpc.Manager
funcChannel channel.AppChannel
reqChan chan *EventRequest
respMap *ResponseMap
events chan *Event
}

func NewFuncRuntime(config *Config, ctx *ofctx.FunctionContext) *Runtime {
lock := new(sync.RWMutex)
return &Runtime{
config: config,
ctx: ctx,
reqChan: make(chan *EventRequest, config.MaxBufferSize),
respMap: &ResponseMap{
l: lock,
m: make(map[*context.Context]*EventResponse),
},
config: config,
ctx: ctx,
events: make(chan *Event, config.MaxBufferSize),
}
}

Expand All @@ -89,76 +91,59 @@ func (r *Runtime) CreateFuncChannel() error {
return nil
}

func (r *Runtime) ProcessEvents() {
for e := range r.reqChan {
if e.BindingEvent != nil {
var data []byte
// Retry on connection error.
err := backoff.Retry(func() error {
var err error
data, err = r.OnBindingEvent(e.ctx, e.BindingEvent)
if err != nil {
klog.V(4).Info(err)
return err
}
return nil
}, utils.NewExponentialBackOff())
func (r *Runtime) GetPendingEventsCount() int {
return len(r.events)
}

resp := EventResponse{
Data: data,
Error: err,
}
r.respMap.l.Lock()
r.respMap.m[e.ctx] = &resp
r.respMap.l.Unlock()
}
func (r *Runtime) ProcessEvents() {
for e := range r.events {
if e.bindingEvent != nil {
go func() {
var data []byte
// Retry on connection error.
err := backoff.Retry(func() error {
var err error
data, err = r.OnBindingEvent(e.ctx, e.bindingEvent)
if err != nil {
klog.V(4).Info(err)
return err
}
return nil
}, utils.NewExponentialBackOff())

if e.TopicEvent != nil {
// Retry on connection error.
err := backoff.Retry(func() error {
var err error
err = r.OnTopicEvent(e.ctx, e.TopicEvent)
if err != nil {
return err
resp := EventResponse{
Data: data,
Error: err,
}
return nil
}, utils.NewExponentialBackOff())

resp := EventResponse{
Data: nil,
Error: err,
}
r.respMap.l.Lock()
r.respMap.m[e.ctx] = &resp
r.respMap.l.Unlock()
e.respCh <- &resp
}()
}
}
}

func (r *Runtime) EnqueueBindingEvent(ctx *context.Context, event *common.BindingEvent) {
req := EventRequest{
ctx: ctx,
EventRequest: ofctx.EventRequest{BindingEvent: event},
}
r.reqChan <- &req
}
if e.topicEvent != nil {
go func() {
// Retry on connection error.
err := backoff.Retry(func() error {
var err error
err = r.OnTopicEvent(e.ctx, e.topicEvent)
if err != nil {
klog.V(4).Info(err)
return err
}
return nil
}, utils.NewExponentialBackOff())

func (r *Runtime) EnqueueTopicEvent(ctx *context.Context, event *common.TopicEvent) {
req := EventRequest{
ctx: ctx,
EventRequest: ofctx.EventRequest{TopicEvent: event},
resp := EventResponse{
Data: nil,
Error: err,
}
e.respCh <- &resp
}()
}
}
r.reqChan <- &req
}

func (r *Runtime) GetEventResponse(ctx *context.Context) *EventResponse {
defer r.respMap.l.Unlock()
r.respMap.l.Lock()
if resp, ok := r.respMap.m[ctx]; ok {
delete(r.respMap.m, ctx)
return resp
}
return nil
func (r *Runtime) EnqueueEvent(event *Event) {
r.events <- event
}

func (r *Runtime) OnBindingEvent(ctx *context.Context, event *common.BindingEvent) ([]byte, error) {
Expand Down
6 changes: 3 additions & 3 deletions pkg/utils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,9 @@ func GetEnvVar(key, fallbackValue string) string {
func NewExponentialBackOff() *backoff.ExponentialBackOff {
b := &backoff.ExponentialBackOff{
InitialInterval: 5 * time.Millisecond,
RandomizationFactor: 0.5,
Multiplier: 1.5,
MaxInterval: 100 * time.Millisecond,
RandomizationFactor: 0.2,
Multiplier: 1,
MaxInterval: 5 * time.Millisecond,
MaxElapsedTime: 60 * time.Second,
Stop: backoff.Stop,
Clock: backoff.SystemClock,
Expand Down

0 comments on commit 6640547

Please sign in to comment.