Skip to content

Commit

Permalink
Feature - Add support for RabbitMQ source (#1911)
Browse files Browse the repository at this point in the history
* feat(amqp): adding support to amqp

* feat(amqp): adding support to amqp

* feat(amqp): adding support to amqp
Cleared unrelevant files

* feat(amqp): adding support to amqp
- Fixed failing tests

* feat(amqp): adding support to amqp
- Fixed failing tests

* feat(amqp): adding support to amqp
- Fixed lint
- Added go_test_stub.txt

* feat(amqp): adding support to amqp
- Reverted docs.go

* task(webhooks): add webhooks support
- Update required field on host and queue

* task(webhooks): add webhooks support
- Fixed PR comments
- Added DLQ in case of failure
- Ack / Nack messages
- Updated UI

* feat(amqp): adding support to amqp
- Reverted docs.go

* feat(amqp): adding support to amqp
- Reverted api/ui/build/go_test_stub.txt

* feat(amqp): adding support to amqp
- Cleaned files

* feat(amqp): adding support to amqp
- Using consume with ctx
- Checking k.ctx is error and exiting function to release connections

* feat(amqp): adding support to amqp
- Removed check for ctx in msgs loop

* task(webhooks): add webhooks support
- Fixed linter

---------

Co-authored-by: Nitzan Goldfeder <n88holy@gmail.com>
  • Loading branch information
nitzangoldfeder and HolyNitzan authored Feb 8, 2024
1 parent b909653 commit aebfe9c
Show file tree
Hide file tree
Showing 20 changed files with 6,602 additions and 2,670 deletions.
48 changes: 48 additions & 0 deletions api/models/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,7 @@ type PubSubConfig struct {
Sqs *SQSPubSubConfig `json:"sqs"`
Google *GooglePubSubConfig `json:"google"`
Kafka *KafkaPubSubConfig `json:"kafka"`
Amqp *AmqpPubSubconfig `json:"amqp"`
}

func (pc *PubSubConfig) Transform() *datastore.PubSubConfig {
Expand All @@ -286,6 +287,7 @@ func (pc *PubSubConfig) Transform() *datastore.PubSubConfig {
Sqs: pc.Sqs.transform(),
Google: pc.Google.transform(),
Kafka: pc.Kafka.transform(),
Amqp: pc.Amqp.transform(),
}
}

Expand Down Expand Up @@ -334,6 +336,52 @@ type KafkaPubSubConfig struct {
Auth *KafkaAuth `json:"auth"`
}

type AmqpPubSubconfig struct {
Schema string `json:"schema"`
Host string `json:"host"`
Port string `json:"port"`
Auth *AmqpAuth `json:"auth"`
Queue string `json:"queue"`
BindedExchange *AmqpExchange `json:"bindExchange"`
DeadLetterExchange *string `json:"deadLetterExchange"`
}

type AmqpAuth struct {
User string `json:"user"`
Password string `json:"password"`
}

type AmqpExchange struct {
Exchange *string `json:"exchange"`
RoutingKey *string `json:"routingKey"`
}

func (ac *AmqpPubSubconfig) transform() *datastore.AmqpPubSubConfig {
if ac == nil {
return nil
}

bind := AmqpExchange{
Exchange: nil,
RoutingKey: nil,
}

if ac.BindedExchange != nil {
bind = *ac.BindedExchange
}

return &datastore.AmqpPubSubConfig{
Schema: ac.Schema,
Host: ac.Host,
Port: ac.Port,
Queue: ac.Queue,
BindedExchange: bind.Exchange,
RoutingKey: *bind.RoutingKey,
Auth: (*datastore.AmqpCredentials)(ac.Auth),
DeadLetterExchange: ac.DeadLetterExchange,
}
}

func (kc *KafkaPubSubConfig) transform() *datastore.KafkaPubSubConfig {
if kc == nil {
return nil
Expand Down
18 changes: 18 additions & 0 deletions datastore/models.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,7 @@ const (
SqsPubSub PubSubType = "sqs"
GooglePubSub PubSubType = "google"
KafkaPubSub PubSubType = "kafka"
AmqpPubSub PubSubType = "amqp"
)

func (s SourceProvider) IsValid() bool {
Expand Down Expand Up @@ -1068,6 +1069,7 @@ type PubSubConfig struct {
Sqs *SQSPubSubConfig `json:"sqs" db:"sqs"`
Google *GooglePubSubConfig `json:"google" db:"google"`
Kafka *KafkaPubSubConfig `json:"kafka" db:"kafka"`
Amqp *AmqpPubSubConfig `json:"amqp" db:"amqp"`
}

func (p *PubSubConfig) Scan(value interface{}) error {
Expand Down Expand Up @@ -1115,6 +1117,22 @@ type KafkaPubSubConfig struct {
Auth *KafkaAuth `json:"auth" db:"auth"`
}

type AmqpPubSubConfig struct {
Schema string `json:"schema" db:"schema"`
Host string `json:"host" db:"host"`
Port string `json:"port" db:"port"`
Queue string `json:"queue" db:"queue"`
Auth *AmqpCredentials `json:"auth" db:"auth"`
BindedExchange *string `json:"bindedExchange" db:"binded_exchange"`
RoutingKey string `json:"routingKey" db:"routing_key"`
DeadLetterExchange *string `json:"deadLetterExchange" db:"dead_letter_exchange"`
}

type AmqpCredentials struct {
User string `json:"user" db:"user"`
Password string `json:"password" db:"password"`
}

type KafkaAuth struct {
Type string `json:"type" db:"type"`
Hash string `json:"hash" db:"hash"`
Expand Down
100 changes: 98 additions & 2 deletions docs/swagger.json
Original file line number Diff line number Diff line change
Expand Up @@ -5274,6 +5274,46 @@
}
}
},
"datastore.AmqpCredentials": {
"type": "object",
"properties": {
"password": {
"type": "string"
},
"user": {
"type": "string"
}
}
},
"datastore.AmqpPubSubConfig": {
"type": "object",
"properties": {
"auth": {
"$ref": "#/definitions/datastore.AmqpCredentials"
},
"bindedExchange": {
"type": "string"
},
"deadLetterExchange": {
"type": "string"
},
"host": {
"type": "string"
},
"port": {
"type": "string"
},
"queue": {
"type": "string"
},
"routingKey": {
"type": "string"
},
"schema": {
"type": "string"
}
}
},
"datastore.ApiKey": {
"type": "object",
"properties": {
Expand Down Expand Up @@ -5809,6 +5849,9 @@
"datastore.PubSubConfig": {
"type": "object",
"properties": {
"amqp": {
"$ref": "#/definitions/datastore.AmqpPubSubConfig"
},
"google": {
"$ref": "#/definitions/datastore.GooglePubSubConfig"
},
Expand All @@ -5831,12 +5874,14 @@
"enum": [
"sqs",
"google",
"kafka"
"kafka",
"amqp"
],
"x-enum-varnames": [
"SqsPubSub",
"GooglePubSub",
"KafkaPubSub"
"KafkaPubSub",
"AmqpPubSub"
]
},
"datastore.RateLimitConfiguration": {
Expand Down Expand Up @@ -6081,6 +6126,54 @@
}
}
},
"models.AmqpAuth": {
"type": "object",
"properties": {
"password": {
"type": "string"
},
"user": {
"type": "string"
}
}
},
"models.AmqpExchange": {
"type": "object",
"properties": {
"exchange": {
"type": "string"
},
"routingKey": {
"type": "string"
}
}
},
"models.AmqpPubSubconfig": {
"type": "object",
"properties": {
"auth": {
"$ref": "#/definitions/models.AmqpAuth"
},
"bindExchange": {
"$ref": "#/definitions/models.AmqpExchange"
},
"deadLetterExchange": {
"type": "string"
},
"host": {
"type": "string"
},
"port": {
"type": "string"
},
"queue": {
"type": "string"
},
"schema": {
"type": "string"
}
}
},
"models.ApiKey": {
"type": "object",
"properties": {
Expand Down Expand Up @@ -6819,6 +6912,9 @@
"models.PubSubConfig": {
"type": "object",
"properties": {
"amqp": {
"$ref": "#/definitions/models.AmqpPubSubconfig"
},
"google": {
"$ref": "#/definitions/models.GooglePubSubConfig"
},
Expand Down
63 changes: 63 additions & 0 deletions docs/swagger.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,32 @@ definitions:
threshold:
type: string
type: object
datastore.AmqpCredentials:
properties:
password:
type: string
user:
type: string
type: object
datastore.AmqpPubSubConfig:
properties:
auth:
$ref: '#/definitions/datastore.AmqpCredentials'
bindedExchange:
type: string
deadLetterExchange:
type: string
host:
type: string
port:
type: string
queue:
type: string
routingKey:
type: string
schema:
type: string
type: object
datastore.ApiKey:
properties:
header_name:
Expand Down Expand Up @@ -371,6 +397,8 @@ definitions:
type: object
datastore.PubSubConfig:
properties:
amqp:
$ref: '#/definitions/datastore.AmqpPubSubConfig'
google:
$ref: '#/definitions/datastore.GooglePubSubConfig'
kafka:
Expand All @@ -387,11 +415,13 @@ definitions:
- sqs
- google
- kafka
- amqp
type: string
x-enum-varnames:
- SqsPubSub
- GooglePubSub
- KafkaPubSub
- AmqpPubSub
datastore.RateLimitConfiguration:
properties:
count:
Expand Down Expand Up @@ -558,6 +588,37 @@ definitions:
threshold:
type: string
type: object
models.AmqpAuth:
properties:
password:
type: string
user:
type: string
type: object
models.AmqpExchange:
properties:
exchange:
type: string
routingKey:
type: string
type: object
models.AmqpPubSubconfig:
properties:
auth:
$ref: '#/definitions/models.AmqpAuth'
bindExchange:
$ref: '#/definitions/models.AmqpExchange'
deadLetterExchange:
type: string
host:
type: string
port:
type: string
queue:
type: string
schema:
type: string
type: object
models.ApiKey:
properties:
header_name:
Expand Down Expand Up @@ -1073,6 +1134,8 @@ definitions:
type: object
models.PubSubConfig:
properties:
amqp:
$ref: '#/definitions/models.AmqpPubSubconfig'
google:
$ref: '#/definitions/models.GooglePubSubConfig'
kafka:
Expand Down
Loading

0 comments on commit aebfe9c

Please sign in to comment.