Skip to content

Commit

Permalink
Multi kafka support (#726)
Browse files Browse the repository at this point in the history
* backend: create interface for console service

* backend: move kafka service into the console pkg

* backend: allow creation of multiple kafka services

* backend: make eager kafka connectivity check configurable

* backend: make console service creation configurable
  • Loading branch information
weeco authored Jun 9, 2023
1 parent 0ba44b2 commit baf3b2f
Show file tree
Hide file tree
Showing 15 changed files with 204 additions and 133 deletions.
26 changes: 8 additions & 18 deletions backend/pkg/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (
"github.com/redpanda-data/console/backend/pkg/console"
"github.com/redpanda-data/console/backend/pkg/embed"
"github.com/redpanda-data/console/backend/pkg/git"
"github.com/redpanda-data/console/backend/pkg/kafka"
"github.com/redpanda-data/console/backend/pkg/redpanda"
"github.com/redpanda-data/console/backend/pkg/version"
)
Expand All @@ -36,8 +35,7 @@ type API struct {
Cfg *config.Config

Logger *zap.Logger
KafkaSvc *kafka.Service
ConsoleSvc *console.Service
ConsoleSvc console.Servicer
ConnectSvc *connect.Service
GitSvc *git.Service
RedpandaSvc *redpanda.Service
Expand Down Expand Up @@ -67,11 +65,6 @@ func New(cfg *config.Config, opts ...Option) *API {
zap.String("version", version.Version),
zap.String("built_at", version.BuiltAt))

kafkaSvc, err := kafka.NewService(cfg, logger, cfg.MetricsNamespace)
if err != nil {
logger.Fatal("failed to create kafka service", zap.Error(err))
}

redpandaSvc, err := redpanda.NewService(cfg.Redpanda, logger)
if err != nil {
logger.Fatal("failed to create Redpanda service", zap.Error(err))
Expand All @@ -82,9 +75,12 @@ func New(cfg *config.Config, opts ...Option) *API {
logger.Fatal("failed to create Kafka connect service", zap.Error(err))
}

consoleSvc, err := console.NewService(cfg.Console, logger, kafkaSvc, redpandaSvc, connectSvc)
if err != nil {
logger.Fatal("failed to create owl service", zap.Error(err))
var consoleSvc console.Servicer
if cfg.Console.Enabled {
consoleSvc, err = console.NewService(cfg, logger, redpandaSvc, connectSvc)
if err != nil {
logger.Fatal("failed to create console service", zap.Error(err))
}
}

// Use default frontend resources from embeds. They may be overridden via functional options.
Expand All @@ -97,7 +93,6 @@ func New(cfg *config.Config, opts ...Option) *API {
a := &API{
Cfg: cfg,
Logger: logger,
KafkaSvc: kafkaSvc,
ConsoleSvc: consoleSvc,
ConnectSvc: connectSvc,
RedpandaSvc: redpandaSvc,
Expand All @@ -118,12 +113,7 @@ func New(cfg *config.Config, opts ...Option) *API {

// Start the API server and block
func (api *API) Start() {
err := api.KafkaSvc.Start()
if err != nil {
api.Logger.Fatal("failed to start kafka service", zap.Error(err))
}

err = api.ConsoleSvc.Start()
err := api.ConsoleSvc.Start()
if err != nil {
api.Logger.Fatal("failed to start console service", zap.Error(err))
}
Expand Down
41 changes: 19 additions & 22 deletions backend/pkg/api/api_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,14 @@ import (
"io"
"math"
"math/rand"
"net"
"net/http"
"runtime"
"strconv"
"strings"
"testing"
"time"

"github.com/cloudhut/common/logging"
"github.com/cloudhut/common/rest"
"github.com/go-chi/chi/v5"
"github.com/stretchr/testify/assert"
Expand All @@ -34,7 +34,6 @@ import (
"github.com/testcontainers/testcontainers-go/modules/redpanda"
"github.com/twmb/franz-go/pkg/kadm"
"github.com/twmb/franz-go/pkg/kgo"
"go.uber.org/zap"

"github.com/redpanda-data/console/backend/pkg/config"
"github.com/redpanda-data/console/backend/pkg/connect"
Expand Down Expand Up @@ -77,32 +76,30 @@ func (s *APIIntegrationTestSuite) SetupSuite() {

s.kafkaClient, s.kafkaAdminClient = testutil.CreateClients(t, []string{seedBroker})

s.cfg = &config.Config{
REST: config.Server{
Config: rest.Config{
HTTPListenAddress: "0.0.0.0",
HTTPListenPort: rand.Intn(50000) + 10000,
},
},
Kafka: config.Kafka{
Brokers: []string{s.testSeedBroker},
},
Connect: config.Connect{
Enabled: false,
},
Logger: logging.Config{
LogLevelInput: "info",
LogLevel: zap.NewAtomicLevel(),
httpListenPort := rand.Intn(50000) + 10000
s.cfg = &config.Config{}
s.cfg.SetDefaults()
s.cfg.REST = config.Server{
Config: rest.Config{
HTTPListenAddress: "0.0.0.0",
HTTPListenPort: httpListenPort,
},
}

s.cfg.Kafka.Brokers = []string{s.testSeedBroker}
s.api = New(s.cfg)

go s.api.Start()

// allow for server to start
timer1 := time.NewTimer(10 * time.Millisecond)
<-timer1.C
httpServerAddress := net.JoinHostPort("localhost", strconv.Itoa(httpListenPort))
retries := 60
for retries > 0 {
if _, err := net.DialTimeout("tcp", httpServerAddress, 100*time.Millisecond); err == nil {
break
}
time.Sleep(100 * time.Millisecond)
retries--
}
}

func (s *APIIntegrationTestSuite) TearDownSuite() {
Expand Down Expand Up @@ -153,7 +150,7 @@ func (s *APIIntegrationTestSuite) apiRequest(ctx context.Context,
req.Header.Set("Content-Type", "application/json")

res, err := http.DefaultClient.Do(req)
assert.NoError(t, err)
require.NoError(t, err)

body, err := io.ReadAll(res.Body)
res.Body.Close()
Expand Down
2 changes: 1 addition & 1 deletion backend/pkg/api/handle_probes.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func (api *API) handleStartupProbe() http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
// Check Kafka connectivity
isKafkaOK := false
err := api.KafkaSvc.IsHealthy(r.Context())
err := api.ConsoleSvc.IsHealthy(r.Context())
if err == nil {
isKafkaOK = true
}
Expand Down
14 changes: 2 additions & 12 deletions backend/pkg/api/handle_topic_create_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import (
"encoding/json"
"fmt"
"net/http"
"strings"
"testing"
"time"

Expand All @@ -28,7 +27,6 @@ import (
"go.uber.org/zap"

"github.com/redpanda-data/console/backend/pkg/console"
"github.com/redpanda-data/console/backend/pkg/kafka"
"github.com/redpanda-data/console/backend/pkg/testutil"
)

Expand Down Expand Up @@ -59,7 +57,7 @@ func (s *APIIntegrationTestSuite) TestHandleCreateTopic() {

res, body := s.apiRequest(ctx, http.MethodPost, "/api/topics", input)

assert.Equal(200, res.StatusCode)
require.Equal(200, res.StatusCode)

topicName := testutil.TopicNameForTest("create_topic")

Expand Down Expand Up @@ -279,20 +277,15 @@ func (s *APIIntegrationTestSuite) TestHandleCreateTopic() {

// new kafka service
newConfig.Kafka.Brokers = fakeCluster.ListenAddrs()
newKafkaSvc, err := kafka.NewService(newConfig, log,
testutil.MetricNameForTest(strings.ReplaceAll(t.Name(), " ", "")))
assert.NoError(err)

// new console service
newConsoleSvc, err := console.NewService(newConfig.Console, log, newKafkaSvc, s.api.RedpandaSvc, s.api.ConnectSvc)
newConsoleSvc, err := console.NewService(newConfig, log, s.api.RedpandaSvc, s.api.ConnectSvc)
assert.NoError(err)

// save old
oldConsoleSvc := s.api.ConsoleSvc
oldKafkaSvc := s.api.KafkaSvc

// switch
s.api.KafkaSvc = newKafkaSvc
s.api.ConsoleSvc = newConsoleSvc

// call the fake control and expect function
Expand Down Expand Up @@ -328,9 +321,6 @@ func (s *APIIntegrationTestSuite) TestHandleCreateTopic() {

// undo switch
defer func() {
if oldKafkaSvc != nil {
s.api.KafkaSvc = oldKafkaSvc
}
if oldConsoleSvc != nil {
s.api.ConsoleSvc = oldConsoleSvc
}
Expand Down
22 changes: 2 additions & 20 deletions backend/pkg/api/handle_topics_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import (
"encoding/json"
"fmt"
"net/http"
"strings"
"testing"
"time"

Expand All @@ -30,7 +29,6 @@ import (
"go.uber.org/zap/zaptest/observer"

"github.com/redpanda-data/console/backend/pkg/console"
"github.com/redpanda-data/console/backend/pkg/kafka"
"github.com/redpanda-data/console/backend/pkg/testutil"
)

Expand Down Expand Up @@ -201,20 +199,15 @@ func (s *APIIntegrationTestSuite) TestHandleGetTopics() {

// new kafka service
newConfig.Kafka.Brokers = fakeCluster.ListenAddrs()
newKafkaSvc, err := kafka.NewService(newConfig, log,
testutil.MetricNameForTest(strings.ReplaceAll(t.Name(), " ", "")))
assert.NoError(err)

// new console service
newConsoleSvc, err := console.NewService(newConfig.Console, log, newKafkaSvc, s.api.RedpandaSvc, s.api.ConnectSvc)
newConsoleSvc, err := console.NewService(newConfig, log, s.api.RedpandaSvc, s.api.ConnectSvc)
assert.NoError(err)

// save old
oldConsoleSvc := s.api.ConsoleSvc
oldKafkaSvc := s.api.KafkaSvc

// switch
s.api.KafkaSvc = newKafkaSvc
s.api.ConsoleSvc = newConsoleSvc

// call the fake control and expect function
Expand Down Expand Up @@ -250,9 +243,6 @@ func (s *APIIntegrationTestSuite) TestHandleGetTopics() {

// undo switch
defer func() {
if oldKafkaSvc != nil {
s.api.KafkaSvc = oldKafkaSvc
}
if oldConsoleSvc != nil {
s.api.ConsoleSvc = oldConsoleSvc
}
Expand Down Expand Up @@ -304,20 +294,15 @@ func (s *APIIntegrationTestSuite) TestHandleGetTopics() {

// new kafka service
newConfig.Kafka.Brokers = fakeCluster.ListenAddrs()
newKafkaSvc, err := kafka.NewService(newConfig, log,
testutil.MetricNameForTest(strings.ReplaceAll(t.Name(), " ", "")))
assert.NoError(err)

// new console service
newConsoleSvc, err := console.NewService(newConfig.Console, log, newKafkaSvc, s.api.RedpandaSvc, s.api.ConnectSvc)
newConsoleSvc, err := console.NewService(newConfig, log, s.api.RedpandaSvc, s.api.ConnectSvc)
assert.NoError(err)

// save old
oldConsoleSvc := s.api.ConsoleSvc
oldKafkaSvc := s.api.KafkaSvc

// switch
s.api.KafkaSvc = newKafkaSvc
s.api.ConsoleSvc = newConsoleSvc

// call the fake control and expect function
Expand Down Expand Up @@ -365,9 +350,6 @@ func (s *APIIntegrationTestSuite) TestHandleGetTopics() {

// undo switch
defer func() {
if oldKafkaSvc != nil {
s.api.KafkaSvc = oldKafkaSvc
}
if oldConsoleSvc != nil {
s.api.ConsoleSvc = oldConsoleSvc
}
Expand Down
4 changes: 4 additions & 0 deletions backend/pkg/config/console.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,15 @@ import (
// Console contains all configuration options for features that are generic,
// such as documentation plumbing.
type Console struct {
// Enabled should always be true unless you use your own
// implementation that satisfies the Console interface.
Enabled bool `yaml:"enabled"`
TopicDocumentation ConsoleTopicDocumentation `yaml:"topicDocumentation"`
}

// SetDefaults for Console configs.
func (c *Console) SetDefaults() {
c.Enabled = true
c.TopicDocumentation.SetDefaults()
}

Expand Down
14 changes: 10 additions & 4 deletions backend/pkg/config/kafka_startup.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,20 @@ import (
// we should try to connect to the Kafka service. If all attempts have failed the
// application will exit with code 1.
type KafkaStartup struct {
MaxRetries int `yaml:"maxRetries"`
RetryInterval time.Duration `yaml:"retryInterval"`
MaxRetryInterval time.Duration `yaml:"maxRetryInterval"`
BackoffMultiplier float64 `yaml:"backoffMultiplier"`
// EstablishConnectionEagerly determines whether the Kafka connection should
// be tested when it is created. This is handy to ensure the Kafka connection
// is working before issuing any further requests, but it requires some extra
// latency as requests are sent and awaited.
EstablishConnectionEagerly bool `yaml:"establishConnectionEagerly"`
MaxRetries int `yaml:"maxRetries"`
RetryInterval time.Duration `yaml:"retryInterval"`
MaxRetryInterval time.Duration `yaml:"maxRetryInterval"`
BackoffMultiplier float64 `yaml:"backoffMultiplier"`
}

// SetDefaults for Kafka startup configuration.
func (k *KafkaStartup) SetDefaults() {
k.EstablishConnectionEagerly = true
k.MaxRetries = 5
k.RetryInterval = time.Second
k.MaxRetryInterval = 60 * time.Second
Expand Down
9 changes: 2 additions & 7 deletions backend/pkg/console/cluster_info_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
"go.uber.org/zap"

"github.com/redpanda-data/console/backend/pkg/config"
"github.com/redpanda-data/console/backend/pkg/kafka"
"github.com/redpanda-data/console/backend/pkg/testutil"
)

Expand All @@ -40,13 +39,9 @@ func (s *ConsoleIntegrationTestSuite) TestGetClusterInfo() {
cfg.MetricsNamespace = testutil.MetricNameForTest("get_cluster_info")
cfg.Kafka.Brokers = []string{testSeedBroker}

kafkaSvc, err := kafka.NewService(&cfg, log, cfg.MetricsNamespace)
svc, err := NewService(&cfg, log, nil, nil)
assert.NoError(err)

svc, err := NewService(cfg.Console, log, kafkaSvc, nil, nil)
assert.NoError(err)

defer svc.kafkaSvc.KafkaClient.Close()
defer svc.Stop()

info, err := svc.GetClusterInfo(ctx)
assert.NoError(err)
Expand Down
21 changes: 21 additions & 0 deletions backend/pkg/console/create_kafka_client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
// Copyright 2023 Redpanda Data, Inc.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.md
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0

package console

import (
"context"

"github.com/twmb/franz-go/pkg/kgo"
)

// CreateKafkaClient returns a new Kafka client based on the existing Kafka configuration.
func (s *Service) CreateKafkaClient(_ context.Context, additionalOpts ...kgo.Opt) (*kgo.Client, error) {
return s.kafkaSvc.NewKgoClient(additionalOpts...)
}
Loading

0 comments on commit baf3b2f

Please sign in to comment.