-
Notifications
You must be signed in to change notification settings - Fork 1
/
main.go
105 lines (89 loc) · 2.78 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
package main
import (
"context"
"log"
"time"
"github.com/aws/aws-lambda-go/events"
"github.com/aws/aws-lambda-go/lambda"
"github.com/edgedelta/edgedelta-forwarder/cfg"
"github.com/edgedelta/edgedelta-forwarder/chunker"
"github.com/edgedelta/edgedelta-forwarder/core"
"github.com/edgedelta/edgedelta-forwarder/enrich"
"github.com/edgedelta/edgedelta-forwarder/push"
"github.com/edgedelta/edgedelta-forwarder/resource"
lambdaCl "github.com/edgedelta/edgedelta-forwarder/lambda"
)
var (
config *cfg.Config
pusher *push.Pusher
enricher *enrich.Enricher
logChunker *chunker.Chunker
)
type HandlerFn func(context.Context, events.CloudwatchLogsEvent) error
func withGracefulShutdown(handler HandlerFn, gracePeriod time.Duration) HandlerFn {
return func(ctx context.Context, logsEvent events.CloudwatchLogsEvent) error {
deadline, ok := ctx.Deadline()
if !ok {
return handler(ctx, logsEvent)
}
shorterDeadline := deadline.Add(-gracePeriod)
graceCtx, cancel := context.WithDeadline(ctx, shorterDeadline)
defer cancel()
return handler(graceCtx, logsEvent)
}
}
func main() {
lambda.Start(withGracefulShutdown(handleRequest, time.Second*5))
}
func init() {
c, err := cfg.GetConfig()
if err != nil {
log.Fatalf("Failed to get config from environment variables, err: %v", err)
}
config = c
resCl, err := resource.NewAWSClient()
if err != nil {
log.Fatalf("Failed to create AWS resourcegroupstaggingapi client, err: %v", err)
}
lambdaClient, err := lambdaCl.NewClient(config.Region)
if err != nil {
log.Fatalf("Failed to create AWS lambda client, err: %v", err)
}
enricher = enrich.NewEnricher(config, resCl, lambdaClient)
pusher = push.NewPusher(config)
}
func handleRequest(ctx context.Context, logsEvent events.CloudwatchLogsEvent) error {
defer func() {
if r := recover(); r != nil {
log.Printf("Recovering from panic in handleRequest, err: %v", r)
}
}()
data, err := logsEvent.AWSLogs.Parse()
if err != nil {
log.Printf("Failed to parse logs event, err: %v", err)
return err
}
common := enricher.GetEDCommon(ctx, data.SubscriptionFilters, data.MessageType, data.LogGroup, data.LogStream, data.Owner)
edLog := &core.Log{
Common: core.Common(*common),
Data: core.Data{
LogEvents: data.LogEvents,
},
}
logChunker, err := chunker.NewChunker(config.BatchSize, edLog)
chunks, err := logChunker.ChunkLogs()
if err != nil {
log.Printf("Failed to chunk logs, err: %v", err)
return err
}
for i, chunk := range chunks {
log.Printf("Sending chunk %d of %d, size: %d bytes", i+1, len(chunks), len(chunk))
// blocks until context deadline
if err := pusher.Push(ctx, chunk); err != nil {
log.Printf("Failed to push chunk %d, err: %v", i+1, err)
return err
}
}
log.Printf("Successfully pushed %d log chunks", len(chunks))
return nil
}