-
Notifications
You must be signed in to change notification settings - Fork 28
/
kafka_connector.go
169 lines (143 loc) · 5.07 KB
/
kafka_connector.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
package aiven
import (
"context"
"fmt"
"net/http"
)
// KafkaConnectorsHandler is the Aiven go-client handler for Kafka Connectors.
// It holds the client through which connector requests are sent.
type KafkaConnectorsHandler struct {
	client *Client
}

// KafkaConnectorConfig represents a configuration for Kafka Connectors as a
// set of string keys mapped to string values.
type KafkaConnectorConfig map[string]string

// KafkaConnector represents a Kafka Connector: its name, its configuration,
// the plugin it is based on and its tasks.
type KafkaConnector struct {
	Name   string `json:"name"`
	Config KafkaConnectorConfig
	Plugin KafkaConnectorPlugin
	Tasks  []KafkaConnectorTask
}

// KafkaConnectorTask represents a Kafka Connector Task.
type KafkaConnectorTask struct {
	Connector string // presumably the name of the owning connector; confirm against the API
	Task      int    // presumably the task number within that connector; confirm against the API
}

// KafkaConnectorPlugin represents a Kafka Connector Plugin.
type KafkaConnectorPlugin struct {
	Author           string `json:"author"`
	Class            string `json:"class"`
	DocumentationURL string `json:"docURL"`
	Title            string `json:"title"`
	Type             string `json:"type"`
	Version          string `json:"version"`
}

// KafkaConnectorsResponse represents the Kafka Connectors API response that
// carries a list of connectors.
type KafkaConnectorsResponse struct {
	APIResponse
	Connectors []KafkaConnector
}

// KafkaConnectorResponse represents the Kafka Connector API response that
// carries a single connector.
type KafkaConnectorResponse struct {
	APIResponse
	Connector KafkaConnector
}

// KafkaConnectorStatusResponse represents the Kafka Connector API response
// that carries the status of a single connector.
type KafkaConnectorStatusResponse struct {
	APIResponse
	Status KafkaConnectorStatus `json:"status"`
}

// KafkaConnectorStatus represents the status of a Kafka connector: its own
// state plus the status of each of its tasks.
type KafkaConnectorStatus struct {
	State string                     `json:"state"`
	Tasks []KafkaConnectorTaskStatus `json:"tasks"`
}

// KafkaConnectorTaskStatus represents the status of a single Kafka connector
// task.
type KafkaConnectorTaskStatus struct {
	Id    int    `json:"id"`
	State string `json:"state"`
	Trace string `json:"trace"`
}
// Create creates a Kafka Connector from the configuration c on the given
// service of the given project. The service may be a Kafka or a Kafka
// Connector service.
//
// It returns the request error if the call itself fails, otherwise whatever
// checkAPIResponse reports for the response body.
func (h *KafkaConnectorsHandler) Create(ctx context.Context, project, service string, c KafkaConnectorConfig) error {
	body, err := h.client.doPostRequest(
		ctx,
		buildPath("project", project, "service", service, "connectors"),
		c,
	)
	if err != nil {
		return err
	}

	// No payload is decoded here; only the response is checked.
	return checkAPIResponse(body, nil)
}
// Delete deletes the Kafka Connector called name from the given service of
// the given project.
//
// It returns the request error if the call itself fails, otherwise whatever
// checkAPIResponse reports for the response body.
func (h *KafkaConnectorsHandler) Delete(ctx context.Context, project, service, name string) error {
	body, err := h.client.doDeleteRequest(
		ctx,
		buildPath("project", project, "service", service, "connectors", name),
		nil,
	)
	if err != nil {
		return err
	}

	// No payload is decoded here; only the response is checked.
	return checkAPIResponse(body, nil)
}
// List lists all available Kafka Connectors for a service.
//
// On success the decoded response is returned with a nil error; on any
// failure (request or response check) the result is nil.
func (h *KafkaConnectorsHandler) List(ctx context.Context, project, service string) (*KafkaConnectorsResponse, error) {
	endpoint := buildPath("project", project, "service", service, "connectors")

	body, err := h.client.doGetRequest(ctx, endpoint, nil)
	if err != nil {
		return nil, err
	}

	result := new(KafkaConnectorsResponse)
	err = checkAPIResponse(body, result)
	if err != nil {
		return nil, err
	}

	return result, nil
}
// GetByName gets a KafkaConnector by name.
//
// It fetches the full connector list of the service and returns a copy of
// the first entry whose Name equals name. When no entry matches, it returns
// nil together with an Error carrying http.StatusNotFound.
func (h *KafkaConnectorsHandler) GetByName(ctx context.Context, project, service, name string) (*KafkaConnector, error) {
	endpoint := buildPath("project", project, "service", service, "connectors")

	body, err := h.client.doGetRequest(ctx, endpoint, nil)
	if err != nil {
		return nil, err
	}

	var list KafkaConnectorsResponse
	err = checkAPIResponse(body, &list)
	if err != nil {
		return nil, err
	}

	for idx := range list.Connectors {
		if list.Connectors[idx].Name != name {
			continue
		}
		// Hand out a copy so the caller does not alias the response slice.
		match := list.Connectors[idx]
		return &match, nil
	}

	// TODO: This is a hack. The lookup is a client-side convenience, yet the
	// miss is reported as if the remote API had answered with "not found".
	// That keeps the contract shared by the other functions here: a nil
	// error always comes with a non-nil result.
	notFound := Error{
		Message: fmt.Sprintf("no kafka connector with name '%s' found in project '%s' for service '%s'", name, project, service),
		Status:  http.StatusNotFound,
	}
	return nil, notFound
}
// Status gets the status of a single Kafka Connector by name.
//
// On success the decoded status response is returned with a nil error; on
// any failure (request or response check) the result is nil.
func (h *KafkaConnectorsHandler) Status(ctx context.Context, project, service, name string) (*KafkaConnectorStatusResponse, error) {
	endpoint := buildPath("project", project, "service", service, "connectors", name, "status")

	body, err := h.client.doGetRequest(ctx, endpoint, nil)
	if err != nil {
		return nil, err
	}

	result := new(KafkaConnectorStatusResponse)
	err = checkAPIResponse(body, result)
	if err != nil {
		return nil, err
	}

	return result, nil
}
// Update updates the configuration of the Kafka Connector called name,
// sending c as the new configuration.
//
// On success the decoded connector response is returned with a nil error; on
// any failure (request or response check) the result is nil.
func (h *KafkaConnectorsHandler) Update(ctx context.Context, project, service, name string, c KafkaConnectorConfig) (*KafkaConnectorResponse, error) {
	endpoint := buildPath("project", project, "service", service, "connectors", name)

	body, err := h.client.doPutRequest(ctx, endpoint, c)
	if err != nil {
		return nil, err
	}

	result := new(KafkaConnectorResponse)
	err = checkAPIResponse(body, result)
	if err != nil {
		return nil, err
	}

	return result, nil
}