From 4f01b34791aaeb6b30931ad672c8989447d2fe98 Mon Sep 17 00:00:00 2001 From: Joel Millage Date: Wed, 4 Oct 2023 23:47:25 -0400 Subject: [PATCH] Feat/allow replica of 1 for Jetstream (#2822) Signed-off-by: jmillage Co-authored-by: jmillage --- .../assets/jetstream/nats-cluster.conf | 47 +++++++++++++++++++ .../installer/assets/jetstream/nats.conf | 19 -------- controllers/eventbus/installer/jetstream.go | 11 +++-- .../eventbus/installer/jetstream_test.go | 3 -- controllers/eventbus/validate.go | 4 +- controllers/eventbus/validate_test.go | 6 +-- .../eventbus/v1alpha1/jetstream_eventbus.go | 3 -- 7 files changed, 56 insertions(+), 37 deletions(-) create mode 100644 controllers/eventbus/installer/assets/jetstream/nats-cluster.conf diff --git a/controllers/eventbus/installer/assets/jetstream/nats-cluster.conf b/controllers/eventbus/installer/assets/jetstream/nats-cluster.conf new file mode 100644 index 0000000000..21ac516c45 --- /dev/null +++ b/controllers/eventbus/installer/assets/jetstream/nats-cluster.conf @@ -0,0 +1,47 @@ +max_payload: {{.MaxPayloadSize}} +port: {{.ClientPort}} +pid_file: "/var/run/nats/nats.pid" +############### +# # +# Monitoring # +# # +############### +http: {{.MonitorPort}} +server_name: $POD_NAME +################################### +# # +# NATS JetStream # +# # +################################### +jetstream { + key: $JS_KEY + store_dir: "/data/jetstream/store" + {{.Settings}} +} + +################################### +# # +# NATS Cluster # +# # +################################### +cluster { + port: {{.ClusterPort}} + name: {{.ClusterName}} + routes: [{{.Routes}}] + cluster_advertise: $CLUSTER_ADVERTISE + connect_retries: 120 + + tls { + cert_file: "/etc/nats-config/cluster-server-cert.pem" + key_file: "/etc/nats-config/cluster-server-key.pem" + ca_file: "/etc/nats-config/cluster-ca-cert.pem" + } +} + +lame_duck_duration: 120s +################## +# # +# Authorization # +# # +################## +include ./auth.conf \ No newline at end of file diff --git a/controllers/eventbus/installer/assets/jetstream/nats.conf b/controllers/eventbus/installer/assets/jetstream/nats.conf index 0f8cde9b62..2b980d21c8 100644 --- a/controllers/eventbus/installer/assets/jetstream/nats.conf +++ b/controllers/eventbus/installer/assets/jetstream/nats.conf @@ -18,25 +18,6 @@ jetstream { store_dir: "/data/jetstream/store" {{.Settings}} } - -################################### -# # -# NATS Cluster # -# # -################################### -cluster { - port: {{.ClusterPort}} - name: {{.ClusterName}} - routes: [{{.Routes}}] - cluster_advertise: $CLUSTER_ADVERTISE - connect_retries: 120 - - tls { - cert_file: "/etc/nats-config/cluster-server-cert.pem" - key_file: "/etc/nats-config/cluster-server-key.pem" - ca_file: "/etc/nats-config/cluster-ca-cert.pem" - } -} lame_duck_duration: 120s ################## # # diff --git a/controllers/eventbus/installer/jetstream.go b/controllers/eventbus/installer/jetstream.go index 17141df5ea..0fc808984a 100644 --- a/controllers/eventbus/installer/jetstream.go +++ b/controllers/eventbus/installer/jetstream.go @@ -634,9 +634,6 @@ func (r *jetStreamInstaller) createConfigMap(ctx context.Context) error { svcName := generateJetStreamServiceName(r.eventBus) ssName := generateJetStreamStatefulSetName(r.eventBus) replicas := r.eventBus.Spec.JetStream.GetReplicas() - if replicas < 3 { - replicas = 3 - } routes := []string{} for j := 0; j < replicas; j++ { routes = append(routes, fmt.Sprintf("nats://%s-%s.%s.%s.svc:%s", ssName, strconv.Itoa(j), svcName, r.eventBus.Namespace, strconv.Itoa(int(jsClusterPort)))) @@ -649,8 +646,12 @@ func (r *jetStreamInstaller) createConfigMap(ctx context.Context) error { if r.eventBus.Spec.JetStream.MaxPayload != nil { maxPayload = *r.eventBus.Spec.JetStream.MaxPayload } - - confTpl := template.Must(template.ParseFS(jetStremAssets, "assets/jetstream/nats.conf")) + var confTpl *template.Template + if replicas > 2 { + confTpl = template.Must(template.ParseFS(jetStremAssets, "assets/jetstream/nats-cluster.conf")) + } else { + confTpl = template.Must(template.ParseFS(jetStremAssets, "assets/jetstream/nats.conf")) + } var confTplOutput bytes.Buffer if err := confTpl.Execute(&confTplOutput, struct { MaxPayloadSize string diff --git a/controllers/eventbus/installer/jetstream_test.go b/controllers/eventbus/installer/jetstream_test.go index 400e9abba5..b2371f1eca 100644 --- a/controllers/eventbus/installer/jetstream_test.go +++ b/controllers/eventbus/installer/jetstream_test.go @@ -266,7 +266,4 @@ func Test_JSBufferGetReplicas(t *testing.T) { five := int32(5) s.Replicas = &five assert.Equal(t, 5, s.GetReplicas()) - two := int32(2) - s.Replicas = &two - assert.Equal(t, 3, s.GetReplicas()) } diff --git a/controllers/eventbus/validate.go b/controllers/eventbus/validate.go index 889b41b4a5..aa56b4fda8 100644 --- a/controllers/eventbus/validate.go +++ b/controllers/eventbus/validate.go @@ -32,8 +32,8 @@ func ValidateEventBus(eb *v1alpha1.EventBus) error { if x.Version == "" { return fmt.Errorf("invalid spec: a version for jetstream needs to be specified") } - if x.Replicas != nil && *x.Replicas < 3 { - return fmt.Errorf("invalid spec: a jetstream eventbus requires at least 3 replicas") + if x.Replicas != nil && (*x.Replicas == 2 || *x.Replicas <= 0) { + return fmt.Errorf("invalid spec: a jetstream eventbus requires 1 replica or >= 3 replicas") } } if x := eb.Spec.Kafka; x != nil { diff --git a/controllers/eventbus/validate_test.go b/controllers/eventbus/validate_test.go index 8c4ce69667..6041e64d54 100644 --- a/controllers/eventbus/validate_test.go +++ b/controllers/eventbus/validate_test.go @@ -115,12 +115,8 @@ func TestValidate(t *testing.T) { t.Run("test js eventbus replica", func(t *testing.T) { eb := testJetStreamEventBus.DeepCopy() - eb.Spec.JetStream.Replicas = pointer.Int32(2) - err := ValidateEventBus(eb) - assert.Error(t, err) - assert.Contains(t, err.Error(), "invalid spec: a jetstream eventbus requires at least 3 replicas") eb.Spec.JetStream.Replicas = pointer.Int32(3) - err = ValidateEventBus(eb) + err := ValidateEventBus(eb) assert.NoError(t, err) eb.Spec.JetStream.Replicas = nil err = ValidateEventBus(eb) diff --git a/pkg/apis/eventbus/v1alpha1/jetstream_eventbus.go b/pkg/apis/eventbus/v1alpha1/jetstream_eventbus.go index c1e4652709..33e6d91fb4 100644 --- a/pkg/apis/eventbus/v1alpha1/jetstream_eventbus.go +++ b/pkg/apis/eventbus/v1alpha1/jetstream_eventbus.go @@ -91,9 +91,6 @@ func (j JetStreamBus) GetReplicas() int { if j.Replicas == nil { return 3 } - if *j.Replicas < 3 { - return 3 - } return int(*j.Replicas) }