-
Notifications
You must be signed in to change notification settings - Fork 2
/
utils.go
73 lines (64 loc) · 2.32 KB
/
utils.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
package frinesis
import (
"strings"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/endpoints"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/kinesis"
"github.com/gofrs/uuid"
"github.com/pkg/errors"
"github.com/spf13/viper"
)
// generateID generates a unique ID for a Msg
func generateID() string {
id, _ := uuid.NewV4()
return id.String()
}
// ClientFromViper takes a viper config and returns an initialized go-kinesis client.
// Used internally and as a test helper method.
func ClientFromViper(config *viper.Viper) (*kinesis.Kinesis, error) {
if !config.IsSet("aws_region_name") {
return nil, errors.New("following config fields are required for kinesis sink: aws_region_name")
}
var client *kinesis.Kinesis
if !config.IsSet("kinesis_endpoint") {
// If no endpoint set, running in live AWS and should have valid credentials available on the default chain
client = NewClient(config.GetString("aws_region_name"))
} else {
endpoint := config.GetString("kinesis_endpoint")
// accepts endpoint with or without scheme prefix, defaults to http for testing if not provided
if !strings.HasPrefix(endpoint, "http") {
endpoint = strings.Join([]string{"http://", endpoint}, "")
}
// create client with dummy auth since we are using a manual endpoint
client = NewClientWithEndpoint(
config.GetString("aws_region_name"),
endpoint,
)
}
return client, nil
}
// NewClient returns a default AWS SDK kinesis client
func NewClient(region string) *kinesis.Kinesis {
sess := session.Must(session.NewSession(&aws.Config{
Region: aws.String(region),
}))
return kinesis.New(sess)
}
// NewClientWithEndpoint returns an AWS SDK kinesis client pointing to a custom service endpoint
func NewClientWithEndpoint(region, endpoint string) *kinesis.Kinesis {
customResolver := func(service, region string, optFns ...func(*endpoints.Options)) (endpoints.ResolvedEndpoint, error) {
if service == endpoints.KinesisServiceID {
return endpoints.ResolvedEndpoint{
URL: endpoint,
SigningRegion: region,
}, nil
}
return endpoints.DefaultResolver().EndpointFor(service, region, optFns...)
}
sess := session.Must(session.NewSession(&aws.Config{
Region: aws.String(region),
EndpointResolver: endpoints.ResolverFunc(customResolver),
}))
return kinesis.New(sess)
}