Skip to content

Commit

Permalink
feat: Jestream configurable DiscardNew policies (#3151)
Browse files Browse the repository at this point in the history
Signed-off-by: Taleb Zeghmi <talebz@zillowgroup.com>
  • Loading branch information
talebzeghmi committed Jul 12, 2024
1 parent 0e8ef63 commit a6b45f1
Show file tree
Hide file tree
Showing 11 changed files with 55 additions and 9 deletions.
3 changes: 2 additions & 1 deletion api/event-bus.html

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion api/event-bus.md

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion api/jsonschema/schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -497,7 +497,7 @@
"type": "array"
},
"streamConfig": {
"description": "Optional configuration for the streams to be created in this JetStream service, if specified, it will be merged with the default configuration in controller-config. It accepts a YAML format configuration, available fields include, \"maxBytes\", \"maxMsgs\", \"maxAge\" (e.g. 72h), \"replicas\" (1, 3, 5), \"duplicates\" (e.g. 5m).",
"description": "Optional configuration for the streams to be created in this JetStream service, if specified, it will be merged with the default configuration in controller-config. It accepts a YAML format configuration, available fields include, \"maxBytes\", \"maxMsgs\", \"maxAge\" (e.g. 72h), \"replicas\" (1, 3, 5), \"duplicates\" (e.g. 5m), \"retention\" (e.g. 0: Limits (default), 1: Interest, 2: WorkQueue), \"Discard\" (e.g. 0: DiscardOld (default), 1: DiscardNew).",
"type": "string"
},
"tolerations": {
Expand Down
2 changes: 1 addition & 1 deletion api/openapi-spec/swagger.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

33 changes: 31 additions & 2 deletions eventbus/jetstream/base/jetstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,11 +127,25 @@ func (stream *Jetstream) CreateStream(conn *JetstreamConnection) error {
return err
}

v.SetDefault("retention", 0) // Limits
v.SetDefault("discard", 0) // DiscardOld

retentionPolicy, err := intToRetentionPolicy(v.GetInt("retention"))
if err != nil {
stream.Logger.Errorf("invalid retention policy: %s, error: %v", retentionPolicy, err)
return err
}

discardPolicy, err := intToDiscardPolicy(v.GetInt("discard"))
if err != nil {
stream.Logger.Errorf("invalid discard policy: %s, error: %v", discardPolicy, err)
return err
}
streamConfig := nats.StreamConfig{
Name: common.JetStreamStreamName,
Subjects: []string{common.JetStreamStreamName + ".*.*"},
Retention: nats.LimitsPolicy,
Discard: nats.DiscardOld,
Retention: retentionPolicy,
Discard: discardPolicy,
MaxMsgs: v.GetInt64("maxMsgs"),
MaxAge: v.GetDuration("maxAge"),
MaxBytes: v.GetInt64("maxBytes"),
Expand All @@ -158,3 +172,18 @@ func (stream *Jetstream) CreateStream(conn *JetstreamConnection) error {
stream.Logger.Infof("Created Jetstream stream '%s' for connection %+v", common.JetStreamStreamName, conn)
return nil
}

func intToRetentionPolicy(i int) (nats.RetentionPolicy, error) {
if i < 0 || i > int(nats.WorkQueuePolicy) {
// Handle invalid value, return a default value or panic
return -1, fmt.Errorf("invalid int for RetentionPolicy: %d", i)
}
return nats.RetentionPolicy(i), nil
}

func intToDiscardPolicy(i int) (nats.DiscardPolicy, error) {
if i < 0 || i > int(nats.DiscardNew) {
return -1, fmt.Errorf("invalid int for DiscardPolicy: %d", i)
}
return nats.DiscardPolicy(i), nil
}
4 changes: 4 additions & 0 deletions manifests/base/controller-manager/controller-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@ data:
maxBytes: -1
replicas: 3
duplicates: 300s
# 0: Limits, 1: Interest, 2: WorkQueue
retention: 0
# 0: DiscardOld, 1: DiscardNew
discard: 0
versions:
- version: latest
natsImage: nats:2.10.10
Expand Down
4 changes: 4 additions & 0 deletions manifests/install.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -330,6 +330,10 @@ data:
maxBytes: -1
replicas: 3
duplicates: 300s
# 0: Limits, 1: Interest, 2: WorkQueue
retention: 0
# 0: DiscardOld, 1: DiscardNew
discard: 0
versions:
- version: latest
natsImage: nats:2.10.10
Expand Down
4 changes: 4 additions & 0 deletions manifests/namespace-install.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,10 @@ data:
maxBytes: -1
replicas: 3
duplicates: 300s
# 0: Limits, 1: Interest, 2: WorkQueue
retention: 0
# 0: DiscardOld, 1: DiscardNew
discard: 0
versions:
- version: latest
natsImage: nats:2.10.10
Expand Down
3 changes: 2 additions & 1 deletion pkg/apis/eventbus/v1alpha1/generated.proto

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion pkg/apis/eventbus/v1alpha1/jetstream_eventbus.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,8 @@ type JetStreamBus struct {
// +optional
StartArgs []string `json:"startArgs,omitempty" protobuf:"bytes,17,rep,name=startArgs"`
// Optional configuration for the streams to be created in this JetStream service, if specified, it will be merged with the default configuration in controller-config.
// It accepts a YAML format configuration, available fields include, "maxBytes", "maxMsgs", "maxAge" (e.g. 72h), "replicas" (1, 3, 5), "duplicates" (e.g. 5m).
// It accepts a YAML format configuration, available fields include, "maxBytes", "maxMsgs", "maxAge" (e.g. 72h), "replicas" (1, 3, 5), "duplicates" (e.g. 5m),
// "retention" (e.g. 0: Limits (default), 1: Interest, 2: WorkQueue), "Discard" (e.g. 0: DiscardOld (default), 1: DiscardNew).
// +optional
StreamConfig *string `json:"streamConfig,omitempty" protobuf:"bytes,18,opt,name=streamConfig"`
// Maximum number of bytes in a message payload, 0 means unlimited. Defaults to 1MB
Expand Down
2 changes: 1 addition & 1 deletion pkg/apis/eventbus/v1alpha1/openapi_generated.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit a6b45f1

Please sign in to comment.