diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index 78c1c39cfe509..569c5a2fb1e4e 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -2496,9 +2496,10 @@ remoteAddress, new String(commandGetSchema.getSchemaVersion()), schemaVersion = schemaService.versionFromBytes(commandGetSchema.getSchemaVersion()); } + final String topic = commandGetSchema.getTopic(); String schemaName; try { - schemaName = TopicName.get(commandGetSchema.getTopic()).getSchemaName(); + schemaName = TopicName.get(topic).getSchemaName(); } catch (Throwable t) { commandSender.sendGetSchemaErrorResponse(requestId, ServerError.InvalidTopicName, t.getMessage()); return; @@ -2507,7 +2508,7 @@ remoteAddress, new String(commandGetSchema.getSchemaVersion()), schemaService.getSchema(schemaName, schemaVersion).thenAccept(schemaAndMetadata -> { if (schemaAndMetadata == null) { commandSender.sendGetSchemaErrorResponse(requestId, ServerError.TopicNotFound, - String.format("Topic not found or no-schema %s", commandGetSchema.getTopic())); + String.format("Topic not found or no-schema %s", topic)); } else { commandSender.sendGetSchemaResponse(requestId, SchemaInfoUtil.newSchemaInfo(schemaName, schemaAndMetadata.schema), schemaAndMetadata.version); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java index d4ef041f6dea6..aa47c378fc38c 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java @@ -46,6 +46,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import java.util.regex.Pattern; import java.util.stream.Collectors; import lombok.Cleanup; import lombok.EqualsAndHashCode; @@ -69,6 +70,8 @@ import org.apache.pulsar.client.api.TypedMessageBuilder; import org.apache.pulsar.client.api.schema.GenericRecord; import org.apache.pulsar.client.api.schema.SchemaDefinition; +import org.apache.pulsar.client.impl.ConsumerImpl; +import org.apache.pulsar.client.impl.MultiTopicsConsumerImpl; import org.apache.pulsar.client.impl.schema.KeyValueSchemaImpl; import org.apache.pulsar.client.impl.schema.ProtobufSchema; import org.apache.pulsar.client.impl.schema.SchemaInfoImpl; @@ -98,6 +101,7 @@ public class SchemaTest extends MockedPulsarServiceBaseTest { @BeforeMethod @Override public void setup() throws Exception { + isTcpLookup = true; super.internalSetup(); // Setup namespaces @@ -106,6 +110,7 @@ public void setup() throws Exception { .allowedClusters(Collections.singleton(CLUSTER_NAME)) .build(); admin.tenants().createTenant(PUBLIC_TENANT, tenantInfo); + admin.namespaces().createNamespace(PUBLIC_TENANT + "/my-ns"); } @AfterMethod(alwaysRun = true) @@ -130,6 +135,34 @@ public void testGetSchemaWhenCreateAutoProduceBytesProducer() throws Exception{ pulsarClient.newProducer(org.apache.pulsar.client.api.Schema.AUTO_PRODUCE_BYTES()).topic(topic).create(); } + @Test + public void testGetSchemaWithPatternTopic() throws Exception { + final String topicPrefix = "persistent://public/my-ns/test-getSchema"; + + int topicNums = 10; + for (int i = 0; i < topicNums; i++) { + String topic = topicPrefix + "-" + i; + admin.topics().createNonPartitionedTopic(topic); + } + + Pattern pattern = Pattern.compile(topicPrefix + "-.*"); + @Cleanup + Consumer consumer = pulsarClient.newConsumer(Schema.AUTO_CONSUME()) + .topicsPattern(pattern) + .subscriptionName("sub") + .subscriptionType(SubscriptionType.Shared) + .subscribe(); + + List> consumers = + ((MultiTopicsConsumerImpl) consumer).getConsumers(); + Assert.assertEquals(topicNums, consumers.size()); + + for (int i = 0; i < topicNums; i++) { + String topic = topicPrefix + "-" + i; + admin.topics().delete(topic, true); + } + } + @Test public void testMultiTopicSetSchemaProvider() throws Exception { final String tenant = PUBLIC_TENANT;