-
Notifications
You must be signed in to change notification settings - Fork 3
/
main.go
110 lines (92 loc) · 2.61 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
package main
import (
"context"
"flag"
"strconv"
"time"
ofctx "github.com/OpenFunction/functions-framework-go/context"
"github.com/OpenFunction/functions-framework-go/framework"
diag "github.com/dapr/dapr/pkg/diagnostics"
"github.com/dapr/dapr/pkg/modes"
"github.com/dapr/dapr/pkg/runtime"
"k8s.io/klog/v2"
proxyruntime "github.com/OpenFunction/dapr-proxy/pkg/runtime"
"github.com/OpenFunction/dapr-proxy/pkg/utils"
)
const (
defaultAppProtocol = "grpc"
protocolEnvVar = "APP_PROTOCOL"
debugEnvVar = "DEBUG"
)
var (
FuncRuntime *proxyruntime.Runtime
)
func main() {
debugVal := utils.GetEnvVar(debugEnvVar, "false")
debug, _ := strconv.ParseBool(debugVal)
if debug {
klog.InitFlags(nil)
flag.Set("v", "4")
flag.Parse()
}
ctx := context.Background()
fwk, err := framework.NewFramework()
if err != nil {
klog.Exit(err)
}
funcContext := utils.GetFuncContext(fwk)
host := utils.GetFuncHost(funcContext)
port, _ := strconv.Atoi(funcContext.GetPort())
protocol := utils.GetEnvVar(protocolEnvVar, defaultAppProtocol)
config := &proxyruntime.Config{
Protocol: runtime.Protocol(protocol),
Host: host,
Port: port,
Mode: modes.KubernetesMode,
MaxBufferSize: 1000000,
}
FuncRuntime = proxyruntime.NewFuncRuntime(config, funcContext)
if err := FuncRuntime.CreateFuncChannel(); err != nil {
klog.Exit(err)
}
go FuncRuntime.ProcessEvents()
if err := fwk.Register(ctx, EventHandler); err != nil {
klog.Exit(err)
}
if err := fwk.Start(ctx); err != nil {
klog.Exit(err)
}
}
func EventHandler(ctx ofctx.Context, in []byte) (ofctx.Out, error) {
start := time.Now()
defer func() {
elapsed := diag.ElapsedSince(start)
pendingEventsCount := FuncRuntime.GetPendingEventsCount()
klog.V(4).Infof("Input: %s - Pending Events Count: %v", ctx.GetInputName(), pendingEventsCount)
klog.V(4).Infof("Input: %s - Event Forwarding Elapsed: %vms", ctx.GetInputName(), elapsed)
}()
c := ctx.GetNativeContext()
respCh := make(chan *proxyruntime.EventResponse, 1)
// Handle BindingEvent
bindingEvent := ctx.GetBindingEvent()
if bindingEvent != nil {
event := proxyruntime.NewEvent(&c, bindingEvent, nil, respCh)
FuncRuntime.EnqueueEvent(&event)
}
// Handle TopicEvent
topicEvent := ctx.GetTopicEvent()
if topicEvent != nil {
event := proxyruntime.NewEvent(&c, nil, topicEvent, respCh)
FuncRuntime.EnqueueEvent(&event)
}
resp := <-respCh
if resp.Error != nil {
klog.Error(resp.Error)
return ctx.ReturnOnInternalError(), resp.Error
} else {
out := new(ofctx.FunctionOut)
out.WithData(resp.Data)
out.WithCode(ofctx.Success)
return out, nil
}
}