diff --git a/distribution/server/pom.xml b/distribution/server/pom.xml index 57ae87178a3dc..27d12c4ff6baf 100644 --- a/distribution/server/pom.xml +++ b/distribution/server/pom.xml @@ -64,6 +64,12 @@ ${project.version} + + org.apache.zookeeper + zookeeper-prometheus-metrics + ${zookeeper.version} + + ${project.groupId} pulsar-package-bookkeeper-storage diff --git a/pom.xml b/pom.xml index f4ce36cfaf05a..c98a55a582d9b 100644 --- a/pom.xml +++ b/pom.xml @@ -1199,13 +1199,12 @@ flexible messaging model and an intuitive client API. com.fasterxml.jackson.core * + + org.apache.zookeeper + * + - - org.apache.zookeeper - zookeeper-prometheus-metrics - ${zookeeper.version} - diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index 2f3d32d1f9c19..4445f64a715f9 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -83,6 +83,7 @@ import org.apache.pulsar.client.impl.BatchMessageIdImpl; import org.apache.pulsar.client.impl.ClientCnx; import org.apache.pulsar.client.impl.MessageIdImpl; +import org.apache.pulsar.client.impl.schema.SchemaInfoUtil; import org.apache.pulsar.common.api.AuthData; import org.apache.pulsar.common.api.proto.BaseCommand; import org.apache.pulsar.common.api.proto.CommandAck; @@ -135,7 +136,6 @@ import org.apache.pulsar.common.protocol.Commands; import org.apache.pulsar.common.protocol.PulsarHandler; import org.apache.pulsar.common.protocol.schema.SchemaData; -import org.apache.pulsar.common.protocol.schema.SchemaInfoUtil; import org.apache.pulsar.common.protocol.schema.SchemaVersion; import org.apache.pulsar.common.schema.SchemaType; import org.apache.pulsar.common.util.FutureUtil; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaValidationEnforced.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaValidationEnforced.java index 1ebf7f1a66c10..3daf920c975bc 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaValidationEnforced.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaValidationEnforced.java @@ -30,6 +30,7 @@ import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.impl.schema.SchemaInfoImpl; import org.apache.pulsar.common.policies.data.ClusterData; import org.apache.pulsar.common.policies.data.ClusterDataImpl; import org.apache.pulsar.common.policies.data.TenantInfoImpl; @@ -97,11 +98,12 @@ public void testDisableSchemaValidationEnforcedHasSchema() throws Exception { assertTrue(e.getMessage().contains("HTTP 404 Not Found")); } Map properties = Maps.newHashMap(); - SchemaInfo schemaInfo = new SchemaInfo(); - schemaInfo.setType(SchemaType.STRING); - schemaInfo.setProperties(properties); - schemaInfo.setName("test"); - schemaInfo.setSchema("".getBytes()); + SchemaInfo schemaInfo = SchemaInfoImpl.builder() + .type(SchemaType.STRING) + .properties(properties) + .name("test") + .schema("".getBytes()) + .build(); PostSchemaPayload postSchemaPayload = new PostSchemaPayload("STRING", "", properties); admin.schemas().createSchema(topicName, postSchemaPayload); try (Producer p = pulsarClient.newProducer().topic(topicName).create()) { @@ -145,11 +147,12 @@ public void testEnableSchemaValidationEnforcedHasSchemaMismatch() throws Excepti } Map properties = Maps.newHashMap(); properties.put("key1", "value1"); - SchemaInfo schemaInfo = new SchemaInfo(); - schemaInfo.setType(SchemaType.STRING); - schemaInfo.setProperties(properties); - schemaInfo.setName("test"); - schemaInfo.setSchema("".getBytes()); + SchemaInfo schemaInfo = SchemaInfoImpl.builder() + .type(SchemaType.STRING) + .properties(properties) + .name("test") + .schema("".getBytes()) + .build(); PostSchemaPayload postSchemaPayload = new PostSchemaPayload("STRING", "", properties); admin.schemas().createSchema(topicName, postSchemaPayload); try (Producer p = pulsarClient.newProducer().topic(topicName).create()) { @@ -174,11 +177,12 @@ public void testEnableSchemaValidationEnforcedHasSchemaMatch() throws Exception } admin.namespaces().setSchemaValidationEnforced(namespace,true); Map properties = Maps.newHashMap(); - SchemaInfo schemaInfo = new SchemaInfo(); - schemaInfo.setType(SchemaType.STRING); - schemaInfo.setProperties(properties); - schemaInfo.setName("test"); - schemaInfo.setSchema("".getBytes()); + SchemaInfo schemaInfo = SchemaInfoImpl.builder() + .type(SchemaType.STRING) + .properties(properties) + .name("test") + .schema("".getBytes()) + .build(); PostSchemaPayload postSchemaPayload = new PostSchemaPayload("STRING", "", properties); admin.schemas().createSchema(topicName, postSchemaPayload); try (Producer p = pulsarClient.newProducer(Schema.STRING).topic(topicName).create()) { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/JsonSchemaCompatibilityCheckTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/JsonSchemaCompatibilityCheckTest.java index 04914feca4853..32a9f9e78a874 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/JsonSchemaCompatibilityCheckTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/JsonSchemaCompatibilityCheckTest.java @@ -33,6 +33,7 @@ import org.apache.pulsar.client.api.SchemaSerializationException; import org.apache.pulsar.client.api.schema.SchemaDefinition; import org.apache.pulsar.client.impl.schema.JSONSchema; +import org.apache.pulsar.client.impl.schema.SchemaInfoImpl; import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy; import org.apache.pulsar.common.protocol.schema.SchemaData; import org.apache.pulsar.common.schema.SchemaInfo; @@ -118,11 +119,12 @@ public static OldJSONSchema of(Class pojo, Map propert JsonSchemaGenerator schemaGen = new JsonSchemaGenerator(mapper); JsonSchema schema = schemaGen.generateSchema(pojo); - SchemaInfo info = new SchemaInfo(); - info.setName(""); - info.setProperties(properties); - info.setType(SchemaType.JSON); - info.setSchema(mapper.writeValueAsBytes(schema)); + SchemaInfo info = SchemaInfoImpl.builder() + .name("") + .properties(properties) + .type(SchemaType.JSON) + .schema(mapper.writeValueAsBytes(schema)) + .build(); return new OldJSONSchema<>(info, pojo, mapper); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java index d97b989efc7a2..6e860adbc3f12 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java @@ -61,6 +61,7 @@ import org.apache.pulsar.client.api.schema.GenericRecord; import org.apache.pulsar.client.api.schema.SchemaDefinition; import org.apache.pulsar.client.impl.schema.KeyValueSchemaImpl; +import org.apache.pulsar.client.impl.schema.SchemaInfoImpl; import org.apache.pulsar.client.impl.schema.generic.GenericJsonRecord; import org.apache.pulsar.common.naming.TopicDomain; import org.apache.pulsar.common.naming.TopicName; @@ -437,7 +438,7 @@ private void testUseAutoConsumeWithSchemalessTopic(SchemaType schema) throws Exc admin.topics().createPartitionedTopic(topic, 2); // set schema - SchemaInfo schemaInfo = SchemaInfo + SchemaInfo schemaInfo = SchemaInfoImpl .builder() .schema(new byte[0]) .name("dummySchema") @@ -653,7 +654,7 @@ public void testNullKeyValueProperty() throws PulsarAdminException, PulsarClient final Map map = new HashMap<>(); map.put("key", null); map.put(null, "value"); // null key is not allowed for JSON, it's only for test here - Schema.INT32.getSchemaInfo().setProperties(map); + ((SchemaInfoImpl)Schema.INT32.getSchemaInfo()).setProperties(map); final Consumer consumer = pulsarClient.newConsumer(Schema.INT32).topic(topic) .subscriptionName("sub") diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaCompatibilityCheckTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaCompatibilityCheckTest.java index d6d96f79549de..61d83322bab57 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaCompatibilityCheckTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaCompatibilityCheckTest.java @@ -35,6 +35,7 @@ import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.SchemaSerializationException; import org.apache.pulsar.client.api.schema.SchemaDefinition; +import org.apache.pulsar.client.impl.schema.SchemaInfoImpl; import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.TopicDomain; import org.apache.pulsar.common.naming.TopicName; @@ -323,7 +324,7 @@ public void testSchemaComparison() throws Exception { SchemaCompatibilityStrategy.FULL); byte[] changeSchemaBytes = (new String(Schema.AVRO(Schemas.PersonOne.class) .getSchemaInfo().getSchema(), UTF_8) + "/n /n /n").getBytes(); - SchemaInfo schemaInfo = SchemaInfo.builder().type(SchemaType.AVRO).schema(changeSchemaBytes).build(); + SchemaInfo schemaInfo = SchemaInfoImpl.builder().type(SchemaType.AVRO).schema(changeSchemaBytes).build(); admin.schemas().createSchema(fqtn, schemaInfo); admin.namespaces().setIsAllowAutoUpdateSchema(namespaceName.toString(), false); diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SchemasImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SchemasImpl.java index 9a9a4ed0bc69a..4408ae21b50f0 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SchemasImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SchemasImpl.java @@ -31,6 +31,7 @@ import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.admin.Schemas; import org.apache.pulsar.client.api.Authentication; +import org.apache.pulsar.client.impl.schema.SchemaInfoImpl; import org.apache.pulsar.client.internal.DefaultImplementation; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.protocol.schema.DeleteSchemaResponse; @@ -434,7 +435,7 @@ private WebTarget topicPath(TopicName topic, String... parts) { // the util function converts `GetSchemaResponse` to `SchemaInfo` static SchemaInfo convertGetSchemaResponseToSchemaInfo(TopicName tn, GetSchemaResponse response) { - SchemaInfo info = new SchemaInfo(); + byte[] schema; if (response.getType() == SchemaType.KEY_VALUE) { schema = DefaultImplementation.convertKeyValueDataStringToSchemaInfoSchema( @@ -442,11 +443,13 @@ static SchemaInfo convertGetSchemaResponseToSchemaInfo(TopicName tn, } else { schema = response.getData().getBytes(UTF_8); } - info.setSchema(schema); - info.setType(response.getType()); - info.setProperties(response.getProperties()); - info.setName(tn.getLocalName()); - return info; + + return SchemaInfoImpl.builder() + .schema(schema) + .type(response.getType()) + .properties(response.getProperties()) + .name(tn.getLocalName()) + .build(); } static SchemaInfoWithVersion convertGetSchemaResponseToSchemaInfoWithVersion(TopicName tn, diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/common/schema/SchemaInfo.java b/pulsar-client-api/src/main/java/org/apache/pulsar/common/schema/SchemaInfo.java index f2c5860ee0208..0070c4cbaa4bf 100644 --- a/pulsar-client-api/src/main/java/org/apache/pulsar/common/schema/SchemaInfo.java +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/common/schema/SchemaInfo.java @@ -18,16 +18,7 @@ */ package org.apache.pulsar.common.schema; -import static java.nio.charset.StandardCharsets.UTF_8; -import java.util.Base64; -import java.util.Collections; import java.util.Map; -import lombok.AllArgsConstructor; -import lombok.Builder; -import lombok.Data; -import lombok.EqualsAndHashCode; -import lombok.NoArgsConstructor; -import lombok.experimental.Accessors; import org.apache.pulsar.client.internal.DefaultImplementation; import org.apache.pulsar.common.classification.InterfaceAudience; import org.apache.pulsar.common.classification.InterfaceStability; @@ -37,55 +28,24 @@ */ @InterfaceAudience.Public @InterfaceStability.Stable -@Data -@AllArgsConstructor -@NoArgsConstructor -@Accessors(chain = true) -@Builder -public class SchemaInfo { +public interface SchemaInfo { - @EqualsAndHashCode.Exclude - private String name; + String getName(); /** * The schema data in AVRO JSON format. */ - private byte[] schema; + byte[] getSchema(); /** * The type of schema (AVRO, JSON, PROTOBUF, etc..). */ - private SchemaType type; + SchemaType getType(); /** * Additional properties of the schema definition (implementation defined). */ - @Builder.Default - private Map properties = Collections.emptyMap(); - - public String getSchemaDefinition() { - if (null == schema) { - return ""; - } - - switch (type) { - case AVRO: - case JSON: - case PROTOBUF: - case PROTOBUF_NATIVE: - return new String(schema, UTF_8); - case KEY_VALUE: - KeyValue schemaInfoKeyValue = - DefaultImplementation.decodeKeyValueSchemaInfo(this); - return DefaultImplementation.jsonifyKeyValueSchemaInfo(schemaInfoKeyValue); - default: - return Base64.getEncoder().encodeToString(schema); - } - } - - @Override - public String toString(){ - return DefaultImplementation.jsonifySchemaInfo(this); - } + Map getProperties(); + String getSchemaDefinition(); } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java index a7cc68e975270..bed2b9cd49619 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java @@ -90,7 +90,7 @@ import org.apache.pulsar.common.api.proto.ServerError; import org.apache.pulsar.common.protocol.Commands; import org.apache.pulsar.common.protocol.PulsarHandler; -import org.apache.pulsar.common.protocol.schema.SchemaInfoUtil; +import org.apache.pulsar.client.impl.schema.SchemaInfoUtil; import org.apache.pulsar.common.protocol.schema.SchemaVersion; import org.apache.pulsar.common.schema.SchemaInfo; import org.apache.pulsar.common.util.FutureUtil; diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java index 9108a6ee22d01..f2cc1692eebe6 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java @@ -47,7 +47,7 @@ import org.apache.pulsar.common.protocol.schema.GetSchemaResponse; import org.apache.pulsar.common.protocol.schema.SchemaData; import org.apache.pulsar.common.schema.SchemaInfo; -import org.apache.pulsar.common.protocol.schema.SchemaInfoUtil; +import org.apache.pulsar.client.impl.schema.SchemaInfoUtil; import org.apache.pulsar.common.schema.SchemaType; import org.apache.pulsar.common.util.Codec; import org.apache.pulsar.common.util.FutureUtil; diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoProduceBytesSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoProduceBytesSchema.java index 3db955484abdd..8971aab421d9e 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoProduceBytesSchema.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoProduceBytesSchema.java @@ -21,6 +21,7 @@ import static com.google.common.base.Preconditions.checkState; import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.api.schema.KeyValueSchema; import org.apache.pulsar.common.schema.KeyValueEncodingType; import org.apache.pulsar.common.schema.SchemaInfo; import org.apache.pulsar.common.schema.SchemaType; @@ -74,9 +75,9 @@ public byte[] encode(byte[] message) { if (requireSchemaValidation) { // verify if the message can be decoded by the underlying schema - if (schema instanceof KeyValueSchemaImpl - && ((KeyValueSchemaImpl) schema).getKeyValueEncodingType().equals(KeyValueEncodingType.SEPARATED)) { - ((KeyValueSchemaImpl) schema).getValueSchema().validate(message); + if (schema instanceof KeyValueSchema + && ((KeyValueSchema) schema).getKeyValueEncodingType().equals(KeyValueEncodingType.SEPARATED)) { + ((KeyValueSchema) schema).getValueSchema().validate(message); } else { schema.validate(message); } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/BooleanSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/BooleanSchema.java index c66ff4332d16a..3b5296ec00301 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/BooleanSchema.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/BooleanSchema.java @@ -32,7 +32,7 @@ public class BooleanSchema extends AbstractSchema { private static final SchemaInfo SCHEMA_INFO; static { - SCHEMA_INFO = new SchemaInfo() + SCHEMA_INFO = new SchemaInfoImpl() .setName("Boolean") .setType(SchemaType.BOOLEAN) .setSchema(new byte[0]); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ByteBufSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ByteBufSchema.java index 658e3984f9e59..ce68298be2b49 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ByteBufSchema.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ByteBufSchema.java @@ -33,7 +33,7 @@ public class ByteBufSchema extends AbstractSchema { private static final SchemaInfo SCHEMA_INFO; static { - SCHEMA_INFO = new SchemaInfo() + SCHEMA_INFO = new SchemaInfoImpl() .setName("ByteBuf") .setType(SchemaType.BYTES) .setSchema(new byte[0]); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ByteBufferSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ByteBufferSchema.java index c560f0e76aa0b..0ff308fe017c0 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ByteBufferSchema.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ByteBufferSchema.java @@ -34,7 +34,7 @@ public class ByteBufferSchema extends AbstractSchema { private static final SchemaInfo SCHEMA_INFO; static { - SCHEMA_INFO = new SchemaInfo() + SCHEMA_INFO = new SchemaInfoImpl() .setName("ByteBuffer") .setType(SchemaType.BYTES) .setSchema(new byte[0]); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ByteSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ByteSchema.java index 4e4c27e07619c..6d516879bd510 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ByteSchema.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ByteSchema.java @@ -32,7 +32,7 @@ public class ByteSchema extends AbstractSchema { private static final SchemaInfo SCHEMA_INFO; static { - SCHEMA_INFO = new SchemaInfo() + SCHEMA_INFO = new SchemaInfoImpl() .setName("INT8") .setType(SchemaType.INT8) .setSchema(new byte[0]); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/BytesSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/BytesSchema.java index 9c7ec373a2820..98a0e66439d3f 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/BytesSchema.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/BytesSchema.java @@ -31,7 +31,7 @@ public class BytesSchema extends AbstractSchema { private static final SchemaInfo SCHEMA_INFO; static { - SCHEMA_INFO = new SchemaInfo() + SCHEMA_INFO = new SchemaInfoImpl() .setName("Bytes") .setType(SchemaType.BYTES) .setSchema(new byte[0]); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/DateSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/DateSchema.java index 295dae6808117..cbdb91202179e 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/DateSchema.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/DateSchema.java @@ -33,7 +33,7 @@ public class DateSchema extends AbstractSchema { private static final SchemaInfo SCHEMA_INFO; static { - SCHEMA_INFO = new SchemaInfo() + SCHEMA_INFO = new SchemaInfoImpl() .setName("Date") .setType(SchemaType.DATE) .setSchema(new byte[0]); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/DoubleSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/DoubleSchema.java index baa1aacf17d7f..4b269a60e2b4a 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/DoubleSchema.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/DoubleSchema.java @@ -32,7 +32,7 @@ public class DoubleSchema extends AbstractSchema { private static final SchemaInfo SCHEMA_INFO; static { - SCHEMA_INFO = new SchemaInfo() + SCHEMA_INFO = new SchemaInfoImpl() .setName("Double") .setType(SchemaType.DOUBLE) .setSchema(new byte[0]); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/FloatSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/FloatSchema.java index aed905b7123b8..84d40735bc186 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/FloatSchema.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/FloatSchema.java @@ -32,7 +32,7 @@ public class FloatSchema extends AbstractSchema { private static final SchemaInfo SCHEMA_INFO; static { - SCHEMA_INFO = new SchemaInfo() + SCHEMA_INFO = new SchemaInfoImpl() .setName("Float") .setType(SchemaType.FLOAT) .setSchema(new byte[0]); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/InstantSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/InstantSchema.java index 5830ceaf571d4..db33de7de4d9b 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/InstantSchema.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/InstantSchema.java @@ -33,7 +33,7 @@ public class InstantSchema extends AbstractSchema { private static final SchemaInfo SCHEMA_INFO; static { - SCHEMA_INFO = new SchemaInfo() + SCHEMA_INFO = new SchemaInfoImpl() .setName("Instant") .setType(SchemaType.INSTANT) .setSchema(new byte[0]); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/IntSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/IntSchema.java index fc8338e45b84b..dfad28082186a 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/IntSchema.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/IntSchema.java @@ -32,7 +32,7 @@ public class IntSchema extends AbstractSchema { private static final SchemaInfo SCHEMA_INFO; static { - SCHEMA_INFO = new SchemaInfo() + SCHEMA_INFO = new SchemaInfoImpl() .setName("INT32") .setType(SchemaType.INT32) .setSchema(new byte[0]); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/JSONSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/JSONSchema.java index 4e3b87441b731..9fe6aed99c782 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/JSONSchema.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/JSONSchema.java @@ -74,11 +74,11 @@ public SchemaInfo getBackwardsCompatibleJsonSchemaInfo() { ObjectMapper objectMapper = new ObjectMapper(); JsonSchemaGenerator schemaGen = new JsonSchemaGenerator(objectMapper); JsonSchema jsonBackwardsCompatibleSchema = schemaGen.generateSchema(pojo); - backwardsCompatibleSchemaInfo = new SchemaInfo(); - backwardsCompatibleSchemaInfo.setName(""); - backwardsCompatibleSchemaInfo.setProperties(schemaInfo.getProperties()); - backwardsCompatibleSchemaInfo.setType(SchemaType.JSON); - backwardsCompatibleSchemaInfo.setSchema(objectMapper.writeValueAsBytes(jsonBackwardsCompatibleSchema)); + backwardsCompatibleSchemaInfo = new SchemaInfoImpl() + .setName("") + .setProperties(schemaInfo.getProperties()) + .setType(SchemaType.JSON) + .setSchema(objectMapper.writeValueAsBytes(jsonBackwardsCompatibleSchema)); } catch (JsonProcessingException ex) { throw new RuntimeException(ex); } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/LocalDateSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/LocalDateSchema.java index add6fd28b5b12..18ef3af2d1d6a 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/LocalDateSchema.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/LocalDateSchema.java @@ -32,7 +32,7 @@ public class LocalDateSchema extends AbstractSchema { private static final SchemaInfo SCHEMA_INFO; static { - SCHEMA_INFO = new SchemaInfo() + SCHEMA_INFO = new SchemaInfoImpl() .setName("LocalDate") .setType(SchemaType.LOCAL_DATE) .setSchema(new byte[0]); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/LocalDateTimeSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/LocalDateTimeSchema.java index aa86a1920c538..05b2787fdd607 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/LocalDateTimeSchema.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/LocalDateTimeSchema.java @@ -37,7 +37,7 @@ public class LocalDateTimeSchema extends AbstractSchema { public static final String DELIMITER = ":"; static { - SCHEMA_INFO = new SchemaInfo() + SCHEMA_INFO = new SchemaInfoImpl() .setName("LocalDateTime") .setType(SchemaType.LOCAL_DATE_TIME) .setSchema(new byte[0]); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/LocalTimeSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/LocalTimeSchema.java index 6e2bf627006e7..e53c620f8e0a2 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/LocalTimeSchema.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/LocalTimeSchema.java @@ -32,7 +32,7 @@ public class LocalTimeSchema extends AbstractSchema { private static final SchemaInfo SCHEMA_INFO; static { - SCHEMA_INFO = new SchemaInfo() + SCHEMA_INFO = new SchemaInfoImpl() .setName("LocalTime") .setType(SchemaType.LOCAL_TIME) .setSchema(new byte[0]); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/LongSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/LongSchema.java index f1491f48069ca..deccaf4ded80e 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/LongSchema.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/LongSchema.java @@ -32,7 +32,7 @@ public class LongSchema extends AbstractSchema { private static final SchemaInfo SCHEMA_INFO; static { - SCHEMA_INFO = new SchemaInfo() + SCHEMA_INFO = new SchemaInfoImpl() .setName("INT64") .setType(SchemaType.INT64) .setSchema(new byte[0]); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ProtobufNativeSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ProtobufNativeSchema.java index 385fc41191bc3..9cf753c8d49e3 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ProtobufNativeSchema.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ProtobufNativeSchema.java @@ -72,11 +72,10 @@ private ProtobufNativeSchema(SchemaInfo schemaInfo, T protoMessageInstance) { setReader(new ProtobufNativeReader<>(protoMessageInstance)); setWriter(new ProtobufNativeWriter<>()); // update properties with protobuf related properties - Map allProperties = new HashMap<>(); - allProperties.putAll(schemaInfo.getProperties()); // set protobuf parsing info + Map allProperties = new HashMap<>(schemaInfo.getProperties()); allProperties.put(PARSING_INFO_PROPERTY, getParsingInfo(protoMessageInstance)); - schemaInfo.setProperties(allProperties); + ((SchemaInfoImpl)schemaInfo).setProperties(allProperties); } private String getParsingInfo(T protoMessageInstance) { @@ -124,7 +123,7 @@ public static ProtobufNativeSchema of(SchemaDefinition schemaDefinition) } Descriptors.Descriptor descriptor = createProtobufNativeSchema(schemaDefinition.getPojo()); - SchemaInfo schemaInfo = SchemaInfo.builder() + SchemaInfo schemaInfo = SchemaInfoImpl.builder() .schema(ProtobufNativeSchemaUtils.serialize(descriptor)) .type(SchemaType.PROTOBUF_NATIVE) .name("") diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ProtobufSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ProtobufSchema.java index f7971eb4f17fc..275cacd64951b 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ProtobufSchema.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ProtobufSchema.java @@ -69,11 +69,10 @@ private ProtobufSchema(SchemaInfo schemaInfo, T protoMessageInstance) { setReader(new ProtobufReader<>(protoMessageInstance)); setWriter(new ProtobufWriter<>()); // update properties with protobuf related properties - Map allProperties = new HashMap<>(); - allProperties.putAll(schemaInfo.getProperties()); // set protobuf parsing info + Map allProperties = new HashMap<>(schemaInfo.getProperties()); allProperties.put(PARSING_INFO_PROPERTY, getParsingInfo(protoMessageInstance)); - schemaInfo.setProperties(allProperties); + ((SchemaInfoImpl)schemaInfo).setProperties(allProperties); } private String getParsingInfo(T protoMessageInstance) { @@ -111,7 +110,7 @@ public static ProtobufSchema of(SchemaDefinition schemaDefinition) { + " is not assignable from " + pojo.getName()); } - SchemaInfo schemaInfo = SchemaInfo.builder() + SchemaInfo schemaInfo = SchemaInfoImpl.builder() .schema(createProtobufAvroSchema(schemaDefinition.getPojo()).toString().getBytes(UTF_8)) .type(SchemaType.PROTOBUF) .name("") diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/RecordSchemaBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/RecordSchemaBuilderImpl.java index ee9f0cb91f9a8..0fda7d52b0dbe 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/RecordSchemaBuilderImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/RecordSchemaBuilderImpl.java @@ -105,7 +105,7 @@ public SchemaInfo build(SchemaType schemaType) { } baseSchema.setFields(avroFields); - return new SchemaInfo( + return new SchemaInfoImpl( name, baseSchema.toString().getBytes(UTF_8), schemaType, diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/schema/SchemaInfoUtil.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/SchemaInfoUtil.java similarity index 58% rename from pulsar-common/src/main/java/org/apache/pulsar/common/protocol/schema/SchemaInfoUtil.java rename to pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/SchemaInfoUtil.java index ac5997d6c1831..fb5263ee20d26 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/schema/SchemaInfoUtil.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/SchemaInfoUtil.java @@ -16,16 +16,20 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pulsar.common.protocol.schema; +package org.apache.pulsar.client.impl.schema; import java.nio.charset.StandardCharsets; import java.util.Collections; +import java.util.Map; import java.util.TreeMap; import lombok.experimental.UtilityClass; +import org.apache.pulsar.client.impl.schema.SchemaInfoImpl; import org.apache.pulsar.common.api.proto.KeyValue; import org.apache.pulsar.common.api.proto.Schema; import org.apache.pulsar.common.protocol.Commands; +import org.apache.pulsar.common.protocol.schema.GetSchemaResponse; +import org.apache.pulsar.common.protocol.schema.SchemaData; import org.apache.pulsar.common.schema.SchemaInfo; /** @@ -35,37 +39,39 @@ public class SchemaInfoUtil { public static SchemaInfo newSchemaInfo(String name, SchemaData data) { - SchemaInfo si = new SchemaInfo(); - si.setName(name); - si.setSchema(data.getData()); - si.setType(data.getType()); - si.setProperties(data.getProps()); - return si; + return SchemaInfoImpl.builder() + .name(name) + .schema(data.getData()) + .type(data.getType()) + .properties(data.getProps()) + .build(); } public static SchemaInfo newSchemaInfo(Schema schema) { - SchemaInfo si = new SchemaInfo(); - si.setName(schema.getName()); - si.setSchema(schema.getSchemaData()); - si.setType(Commands.getSchemaType(schema.getType())); + SchemaInfoImpl.SchemaInfoImplBuilder si = SchemaInfoImpl.builder() + .name(schema.getName()) + .schema(schema.getSchemaData()) + .type(Commands.getSchemaType(schema.getType())); if (schema.getPropertiesCount() == 0) { - si.setProperties(Collections.emptyMap()); + si.properties(Collections.emptyMap()); } else { - si.setProperties(new TreeMap<>()); + Map properties = new TreeMap<>(); for (int i = 0; i < schema.getPropertiesCount(); i++) { KeyValue kv = schema.getPropertyAt(i); - si.getProperties().put(kv.getKey(), kv.getValue()); + properties.put(kv.getKey(), kv.getValue()); } + + si.properties(properties); } - return si; + return si.build(); } public static SchemaInfo newSchemaInfo(String name, GetSchemaResponse schema) { - SchemaInfo si = new SchemaInfo(); - si.setName(name); - si.setSchema(schema.getData().getBytes(StandardCharsets.UTF_8)); - si.setType(schema.getType()); - si.setProperties(schema.getProperties()); - return si; + return SchemaInfoImpl.builder() + .name(name) + .schema(schema.getData().getBytes(StandardCharsets.UTF_8)) + .type(schema.getType()) + .properties(schema.getProperties()) + .build(); } } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ShortSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ShortSchema.java index 4014405760848..bbb5ad6752938 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ShortSchema.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ShortSchema.java @@ -32,7 +32,7 @@ public class ShortSchema extends AbstractSchema { private static final SchemaInfo SCHEMA_INFO; static { - SCHEMA_INFO = new SchemaInfo() + SCHEMA_INFO = new SchemaInfoImpl() .setName("INT16") .setType(SchemaType.INT16) .setSchema(new byte[0]); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/StringSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/StringSchema.java index 7e57f6ca6ed86..462fa60d89249 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/StringSchema.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/StringSchema.java @@ -46,7 +46,7 @@ public class StringSchema extends AbstractSchema { // Ensure the ordering of the static initialization CHARSET_KEY = "__charset"; DEFAULT_CHARSET = StandardCharsets.UTF_8; - DEFAULT_SCHEMA_INFO = new SchemaInfo() + DEFAULT_SCHEMA_INFO = new SchemaInfoImpl() .setName("String") .setType(SchemaType.STRING) .setSchema(new byte[0]); @@ -87,7 +87,7 @@ public StringSchema(Charset charset) { this.charset = charset; Map properties = new HashMap<>(); properties.put(CHARSET_KEY, charset.name()); - this.schemaInfo = new SchemaInfo() + this.schemaInfo = new SchemaInfoImpl() .setName(DEFAULT_SCHEMA_INFO.getName()) .setType(SchemaType.STRING) .setSchema(DEFAULT_SCHEMA_INFO.getSchema()) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/StructSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/StructSchema.java index 8cc18682469b0..7ba116f9adca9 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/StructSchema.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/StructSchema.java @@ -105,7 +105,7 @@ protected static Schema parseAvroSchema(String schemaJson) { } public static SchemaInfo parseSchemaInfo(SchemaDefinition schemaDefinition, SchemaType schemaType) { - return SchemaInfo.builder() + return SchemaInfoImpl.builder() .schema(createAvroSchema(schemaDefinition).toString().getBytes(UTF_8)) .properties(schemaDefinition.getProperties()) .name("") diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/TimeSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/TimeSchema.java index d56e4da79873f..ab6e1ad438761 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/TimeSchema.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/TimeSchema.java @@ -33,7 +33,7 @@ public class TimeSchema extends AbstractSchema