diff --git a/topic_manager.go b/topic_manager.go index 353664f9..fe373d70 100644 --- a/topic_manager.go +++ b/topic_manager.go @@ -179,6 +179,12 @@ func (m *topicManager) ensureExists(topic string, npar, rfactor int, config map[ } // no topic yet, let's create it if len(partitions) == 0 { + + // (or not) + if m.topicManagerConfig.NoCreate { + return fmt.Errorf("topic does not exist but the manager is configured with NoCreate, so it will not attempt to create it") + } + return m.createTopic(topic, npar, rfactor, @@ -361,6 +367,10 @@ type TopicManagerConfig struct { // TMConfigMismatchBehavior configures how configuration mismatches of a topic (replication, num partitions, compaction) should be // treated MismatchBehavior TMConfigMismatchBehavior + + // If set to true, the topic manager will not attempt to create the topic. + // This can be used if topic creation should be done externally. + NoCreate bool } func (tmc *TopicManagerConfig) streamCleanupPolicy() string { diff --git a/topic_manager_test.go b/topic_manager_test.go index e6c3541d..78c2ad60 100644 --- a/topic_manager_test.go +++ b/topic_manager_test.go @@ -249,6 +249,28 @@ func TestTM_EnsureStreamExists(t *testing.T) { err := tm.EnsureStreamExists(topic, npar) require.NoError(t, err) }) + t.Run("no-create", func(t *testing.T) { + tm, bm, ctrl := createTopicManager(t) + defer ctrl.Finish() + var ( + topic = "some-topic" + npar = 1 + rfactor = 1 + ) + + tm.topicManagerConfig.Stream.Replication = rfactor + tm.topicManagerConfig.Stream.Retention = time.Second + tm.topicManagerConfig.NoCreate = true + + bm.client.EXPECT().RefreshMetadata().Return(nil).AnyTimes() + + gomock.InOrder( + bm.client.EXPECT().Topics().Return(nil, nil), + ) + + err := tm.EnsureStreamExists(topic, npar) + require.ErrorContains(t, err, "will not attempt to create it") + }) t.Run("fail", func(t *testing.T) { tm, bm, ctrl := createTopicManager(t) defer ctrl.Finish()