-
Notifications
You must be signed in to change notification settings - Fork 1
/
template.go
122 lines (102 loc) · 2.85 KB
/
template.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
package main
import (
"bytes"
"strings"
"text/template"
)
// asynqTemplate is the text/template source used by serviceDesc.execute to
// generate asynq task glue code for one service. Executed against a
// *serviceDesc, it emits:
//
//   - a <ServiceType>TaskServer interface with one method per MethodSets entry;
//   - Register<ServiceType>TaskServer, which calls mux.HandleFunc once per
//     Methods entry using the method's Typename as the pattern;
//   - one _<ServiceType>_<Name>_Task_Handler adapter per Methods entry that
//     calls myasynq.Handle_task_before, the server method, and then
//     myasynq.Handle_task_after;
//   - a <ServiceType>TaskClient interface, its <ServiceType>TaskClientImpl
//     implementation and the New<ServiceType>TaskClient constructor; each
//     client method marshals a myasynq.WrapPayload (trace map plus request)
//     to JSON and enqueues it under the method's Typename.
//
// The template emits no package clause or imports, so whatever embeds its
// output presumably supplies them (context, json, asynq, myasynq, oteltrace,
// propagation, rkgrpcctx) — TODO confirm against the generator entry point.
//
// NOTE(review): handler names are derived from Name only, so two Methods
// entries sharing a Name would render two functions with the same name.
// NOTE(review): the generated client method defers span.End() and also
// returns span, so callers receive a span on which End has already been
// called — confirm this is intended.
var asynqTemplate = `
{{$svrType := .ServiceType}}
{{$svrName := .ServiceName}}
type {{.ServiceType}}TaskServer interface {
{{- range .MethodSets}}
{{.Name}}(context.Context, *{{.Request}}) (error)
{{- end}}
}
func Register{{.ServiceType}}TaskServer(mux *asynq.ServeMux, srv {{.ServiceType}}TaskServer) {
{{- range .Methods}}
mux.HandleFunc("{{.Typename}}", _{{$svrType}}_{{.Name}}_Task_Handler(srv))
{{- end}}
}
{{range .Methods}}
func _{{$svrType}}_{{.Name}}_Task_Handler(srv {{$svrType}}TaskServer) func(context.Context, *asynq.Task) error {
return func(ctx context.Context, task *asynq.Task) error {
in := &{{.Request}}{}
ctx, span, err := myasynq.Handle_task_before(ctx, task, in)
if err != nil {
return err
}
err = srv.{{.Name}}(ctx, in)
myasynq.Handle_task_after(span, err)
return err
}
}
{{end}}
type {{.ServiceType}}TaskClient interface {
{{- range .MethodSets}}
{{.Name}}(ctx context.Context, req *{{.Request}}, opts ...asynq.Option) (info *asynq.TaskInfo, span oteltrace.Span, err error)
{{- end}}
}
type {{.ServiceType}}TaskClientImpl struct{
cc *asynq.Client
}
func New{{.ServiceType}}TaskClient (client *asynq.Client) {{.ServiceType}}TaskClient {
return &{{.ServiceType}}TaskClientImpl{client}
}
{{range .MethodSets}}
func (c *{{$svrType}}TaskClientImpl) {{.Name}}(ctx context.Context, in *{{.Request}}, opts ...asynq.Option) (*asynq.TaskInfo, oteltrace.Span, error) {
if rkgrpcctx.GetTracerPropagator(ctx) != nil {
ctx = rkgrpcctx.InjectSpanToNewContext(ctx)
}
spanCtx := oteltrace.SpanContextFromContext(ctx)
ctx, span := myasynq.HolderTracer().Start(oteltrace.ContextWithRemoteSpanContext(ctx, spanCtx), "{{.Name}}Client")
defer span.End()
// get trace metadata
m := make(map[string]string)
myasynq.HolderPropagator().Inject(ctx, propagation.MapCarrier(m))
wrap, err := json.Marshal(myasynq.WrapPayload{
Trace: m,
Payload: in,
})
if err != nil {
return nil, nil, err
}
task := asynq.NewTask("{{.Typename}}", wrap, opts...)
info, err := c.cc.Enqueue(task)
if err != nil {
return nil, nil, err
}
return info, span, nil
}
{{end}}
`
// serviceDesc is the data passed to asynqTemplate when generating the task
// server and client code for a single service.
type serviceDesc struct {
	// ServiceType prefixes the generated type and function names, e.g. Greeter.
	ServiceType string
	// ServiceName is the qualified service name, e.g. helloworld.Greeter.
	ServiceName string
	// Metadata is, by its example value, the source proto path, e.g.
	// api/helloworld/helloworld.proto.
	Metadata string
	// Methods lists the methods for which task handlers are registered and
	// generated.
	Methods []*methodDesc
	// MethodSets indexes Methods by Name. execute rebuilds it on every call,
	// so any value set by the caller is overwritten.
	MethodSets map[string]*methodDesc
}
// methodDesc describes one service method as consumed by asynqTemplate.
type methodDesc struct {
	// method
	Name    string // method name; used in generated method and handler identifiers
	Num     int    // not referenced by asynqTemplate
	Request string // request type name; generated code refers to it as *Request
	Reply   string // not referenced by asynqTemplate
	// asynq rule
	Typename string // task type name passed to mux.HandleFunc and asynq.NewTask
}
// execute renders asynqTemplate for the receiver and returns the generated
// text with leading and trailing CR/LF characters removed.
//
// MethodSets is rebuilt from Methods on every call, keyed by method name, so
// a later entry in Methods replaces an earlier one with the same Name. A
// template parse or execution error causes a panic.
func (s *serviceDesc) execute() string {
	byName := make(map[string]*methodDesc, len(s.Methods))
	s.MethodSets = byName
	for i := range s.Methods {
		md := s.Methods[i]
		byName[md.Name] = md
	}

	// template.Must panics with the parse error, same as an explicit check.
	source := strings.TrimSpace(asynqTemplate)
	tmpl := template.Must(template.New("asynq").Parse(source))

	var out bytes.Buffer
	if err := tmpl.Execute(&out, s); err != nil {
		panic(err)
	}
	return strings.Trim(out.String(), "\r\n")
}