Skip to content

Commit

Permalink
feat: add waitTime config field to azure queue storage event source (#…
Browse files Browse the repository at this point in the history
…2996)

Signed-off-by: eduardodbr <eduardodbr@hotmail.com>
  • Loading branch information
eduardodbr committed Jan 30, 2024
1 parent 6414f3b commit e948d73
Show file tree
Hide file tree
Showing 12 changed files with 1,057 additions and 960 deletions.
13 changes: 13 additions & 0 deletions api/event-source.html

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

13 changes: 13 additions & 0 deletions api/event-source.md

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions api/jsonschema/schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -978,6 +978,11 @@
"storageAccountName": {
"description": "StorageAccountName is the name of the storage account where the queue is. This field is necessary to access via Azure AD (managed identity) and it is ignored if ConnectionString is set.",
"type": "string"
},
"waitTimeInSeconds": {
"description": "WaitTimeInSeconds is the duration (in seconds) for which the event source waits between empty results from the queue. The default value is 3 seconds.",
"format": "int32",
"type": "integer"
}
},
"required": [
Expand Down
5 changes: 5 additions & 0 deletions api/openapi-spec/swagger.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 8 additions & 0 deletions eventsources/sources/azurequeuestorage/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,10 @@ func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byt
}
var numMessages int32 = 10
var visibilityTimeout int32 = 120
var waitTime int32 = 3 // Defaults to 3 seconds
if el.AzureQueueStorageEventSource.WaitTimeInSeconds != nil {
waitTime = *el.AzureQueueStorageEventSource.WaitTimeInSeconds
}
log.Info("listening for messages on the queue...")
for {
select {
Expand All @@ -98,6 +102,7 @@ func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byt
return nil
default:
}
log.Info("dequeing messages....")
messages, err := queueClient.DequeueMessages(ctx, &azqueue.DequeueMessagesOptions{
NumberOfMessages: &numMessages,
VisibilityTimeout: &visibilityTimeout,
Expand All @@ -115,6 +120,9 @@ func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byt
}
}, log)
}
if len(messages.Messages) == 0 {
time.Sleep(time.Second * time.Duration(waitTime))
}
}
}

Expand Down
2 changes: 2 additions & 0 deletions examples/event-sources/azure-queue-storage.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ spec:
jsonBody: true
# DecodeMessage specifies if all the messages from AQS should be base64 decoded
decodeMessage: false
# waitTimeInSeconds defines the wait time between empty reads from the queue
waitTimeInSeconds: 2
# connection string contains information about K8s secret that stores the connection string
connectionString:
# Key within the K8s secret whose corresponding value (must be base64 encoded) is access key
Expand Down
929 changes: 480 additions & 449 deletions pkg/apis/eventsource/v1alpha1/generated.pb.go

Large diffs are not rendered by default.

5 changes: 5 additions & 0 deletions pkg/apis/eventsource/v1alpha1/generated.proto

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 7 additions & 0 deletions pkg/apis/eventsource/v1alpha1/openapi_generated.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions pkg/apis/eventsource/v1alpha1/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -1299,6 +1299,10 @@ type AzureQueueStorageEventSource struct {
// If set to true the decoding is done before the evaluation of JSONBody
// +optional
DecodeMessage bool `json:"decodeMessage,omitempty" protobuf:"bytes,8,opt,name=decodeMessage"`
// WaitTimeInSeconds is the duration (in seconds) for which the event source waits between empty results from the queue.
// The default value is 3 seconds.
// +optional
WaitTimeInSeconds *int32 `json:"waitTimeInSeconds,omitempty" protobuf:"varint,9,opt,name=waitTimeInSeconds"`
}

// StripeEventSource describes the event source for stripe webhook notifications
Expand Down
5 changes: 5 additions & 0 deletions pkg/apis/eventsource/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit e948d73

Please sign in to comment.