Skip to content

Commit

Permalink
Remove the unwanted dependencies in the pulsar function's instance ja…
Browse files Browse the repository at this point in the history
…r and make SchemaInfo an interface (#10878)

### Motivation

The java-instance.jar generated by the pulsar-functions-runtime-all module should only contain interfaces that Pulsar Function's framework uses to interact with user code.  The module should on have the following dependencies
    1. pulsar-io-core
    2. pulsar-functions-api
    3. pulsar-client-api
    4. slf4j-api
    5. log4j-slf4j-impl
    6. log4j-api
    7. log4j-core

*Explain here the context, and why you're making that change. What is the problem you're trying to solve.*

### Modifications

Change dep pulsar-client-original to pulsar-client-api

Slight changes in the top level pom for what is included in all sub-modules so that additional deps don't land into java-instance.jar

There is also a fix for an issue introduced by #9673. The thread context class loader was set incorrectly in ThreadRuntime.

### Future improvements

1. We should also add a test in the future to make sure external libraries don't get add accidentally this module and java-instance.jar

2. Rename the module "pulsar-functions-runtime-all" to something that describes its function better.  The current name can be confusing


(cherry picked from commit d81b5f8)
  • Loading branch information
jerrypeng authored and codelipenghui committed Jun 12, 2021
1 parent 0929015 commit 89ac98e
Show file tree
Hide file tree
Showing 61 changed files with 488 additions and 254 deletions.
6 changes: 6 additions & 0 deletions distribution/server/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,12 @@
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper-prometheus-metrics</artifactId>
<version>${zookeeper.version}</version>
</dependency>

<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>pulsar-package-bookkeeper-storage</artifactId>
Expand Down
9 changes: 4 additions & 5 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -1199,13 +1199,12 @@ flexible messaging model and an intuitive client API.</description>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>*</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.zookeeper</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper-prometheus-metrics</artifactId>
<version>${zookeeper.version}</version>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@
import org.apache.pulsar.client.impl.BatchMessageIdImpl;
import org.apache.pulsar.client.impl.ClientCnx;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.schema.SchemaInfoUtil;
import org.apache.pulsar.common.api.AuthData;
import org.apache.pulsar.common.api.proto.BaseCommand;
import org.apache.pulsar.common.api.proto.CommandAck;
Expand Down Expand Up @@ -135,7 +136,6 @@
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.protocol.PulsarHandler;
import org.apache.pulsar.common.protocol.schema.SchemaData;
import org.apache.pulsar.common.protocol.schema.SchemaInfoUtil;
import org.apache.pulsar.common.protocol.schema.SchemaVersion;
import org.apache.pulsar.common.schema.SchemaType;
import org.apache.pulsar.common.util.FutureUtil;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.schema.SchemaInfoImpl;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.ClusterDataImpl;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
Expand Down Expand Up @@ -97,11 +98,12 @@ public void testDisableSchemaValidationEnforcedHasSchema() throws Exception {
assertTrue(e.getMessage().contains("HTTP 404 Not Found"));
}
Map<String, String> properties = Maps.newHashMap();
SchemaInfo schemaInfo = new SchemaInfo();
schemaInfo.setType(SchemaType.STRING);
schemaInfo.setProperties(properties);
schemaInfo.setName("test");
schemaInfo.setSchema("".getBytes());
SchemaInfo schemaInfo = SchemaInfoImpl.builder()
.type(SchemaType.STRING)
.properties(properties)
.name("test")
.schema("".getBytes())
.build();
PostSchemaPayload postSchemaPayload = new PostSchemaPayload("STRING", "", properties);
admin.schemas().createSchema(topicName, postSchemaPayload);
try (Producer p = pulsarClient.newProducer().topic(topicName).create()) {
Expand Down Expand Up @@ -145,11 +147,12 @@ public void testEnableSchemaValidationEnforcedHasSchemaMismatch() throws Excepti
}
Map<String, String> properties = Maps.newHashMap();
properties.put("key1", "value1");
SchemaInfo schemaInfo = new SchemaInfo();
schemaInfo.setType(SchemaType.STRING);
schemaInfo.setProperties(properties);
schemaInfo.setName("test");
schemaInfo.setSchema("".getBytes());
SchemaInfo schemaInfo = SchemaInfoImpl.builder()
.type(SchemaType.STRING)
.properties(properties)
.name("test")
.schema("".getBytes())
.build();
PostSchemaPayload postSchemaPayload = new PostSchemaPayload("STRING", "", properties);
admin.schemas().createSchema(topicName, postSchemaPayload);
try (Producer p = pulsarClient.newProducer().topic(topicName).create()) {
Expand All @@ -174,11 +177,12 @@ public void testEnableSchemaValidationEnforcedHasSchemaMatch() throws Exception
}
admin.namespaces().setSchemaValidationEnforced(namespace,true);
Map<String, String> properties = Maps.newHashMap();
SchemaInfo schemaInfo = new SchemaInfo();
schemaInfo.setType(SchemaType.STRING);
schemaInfo.setProperties(properties);
schemaInfo.setName("test");
schemaInfo.setSchema("".getBytes());
SchemaInfo schemaInfo = SchemaInfoImpl.builder()
.type(SchemaType.STRING)
.properties(properties)
.name("test")
.schema("".getBytes())
.build();
PostSchemaPayload postSchemaPayload = new PostSchemaPayload("STRING", "", properties);
admin.schemas().createSchema(topicName, postSchemaPayload);
try (Producer<String> p = pulsarClient.newProducer(Schema.STRING).topic(topicName).create()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.pulsar.client.api.SchemaSerializationException;
import org.apache.pulsar.client.api.schema.SchemaDefinition;
import org.apache.pulsar.client.impl.schema.JSONSchema;
import org.apache.pulsar.client.impl.schema.SchemaInfoImpl;
import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
import org.apache.pulsar.common.protocol.schema.SchemaData;
import org.apache.pulsar.common.schema.SchemaInfo;
Expand Down Expand Up @@ -118,11 +119,12 @@ public static <T> OldJSONSchema<T> of(Class<T> pojo, Map<String, String> propert
JsonSchemaGenerator schemaGen = new JsonSchemaGenerator(mapper);
JsonSchema schema = schemaGen.generateSchema(pojo);

SchemaInfo info = new SchemaInfo();
info.setName("");
info.setProperties(properties);
info.setType(SchemaType.JSON);
info.setSchema(mapper.writeValueAsBytes(schema));
SchemaInfo info = SchemaInfoImpl.builder()
.name("")
.properties(properties)
.type(SchemaType.JSON)
.schema(mapper.writeValueAsBytes(schema))
.build();
return new OldJSONSchema<>(info, pojo, mapper);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.client.api.schema.SchemaDefinition;
import org.apache.pulsar.client.impl.schema.KeyValueSchemaImpl;
import org.apache.pulsar.client.impl.schema.SchemaInfoImpl;
import org.apache.pulsar.client.impl.schema.generic.GenericJsonRecord;
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
Expand Down Expand Up @@ -437,7 +438,7 @@ private void testUseAutoConsumeWithSchemalessTopic(SchemaType schema) throws Exc
admin.topics().createPartitionedTopic(topic, 2);

// set schema
SchemaInfo schemaInfo = SchemaInfo
SchemaInfo schemaInfo = SchemaInfoImpl
.builder()
.schema(new byte[0])
.name("dummySchema")
Expand Down Expand Up @@ -653,7 +654,7 @@ public void testNullKeyValueProperty() throws PulsarAdminException, PulsarClient
final Map<String, String> map = new HashMap<>();
map.put("key", null);
map.put(null, "value"); // null key is not allowed for JSON, it's only for test here
Schema.INT32.getSchemaInfo().setProperties(map);
((SchemaInfoImpl)Schema.INT32.getSchemaInfo()).setProperties(map);

final Consumer<Integer> consumer = pulsarClient.newConsumer(Schema.INT32).topic(topic)
.subscriptionName("sub")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SchemaSerializationException;
import org.apache.pulsar.client.api.schema.SchemaDefinition;
import org.apache.pulsar.client.impl.schema.SchemaInfoImpl;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
Expand Down Expand Up @@ -323,7 +324,7 @@ public void testSchemaComparison() throws Exception {
SchemaCompatibilityStrategy.FULL);
byte[] changeSchemaBytes = (new String(Schema.AVRO(Schemas.PersonOne.class)
.getSchemaInfo().getSchema(), UTF_8) + "/n /n /n").getBytes();
SchemaInfo schemaInfo = SchemaInfo.builder().type(SchemaType.AVRO).schema(changeSchemaBytes).build();
SchemaInfo schemaInfo = SchemaInfoImpl.builder().type(SchemaType.AVRO).schema(changeSchemaBytes).build();
admin.schemas().createSchema(fqtn, schemaInfo);

admin.namespaces().setIsAllowAutoUpdateSchema(namespaceName.toString(), false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.admin.Schemas;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.impl.schema.SchemaInfoImpl;
import org.apache.pulsar.client.internal.DefaultImplementation;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.protocol.schema.DeleteSchemaResponse;
Expand Down Expand Up @@ -434,19 +435,21 @@ private WebTarget topicPath(TopicName topic, String... parts) {
// the util function converts `GetSchemaResponse` to `SchemaInfo`
static SchemaInfo convertGetSchemaResponseToSchemaInfo(TopicName tn,
GetSchemaResponse response) {
SchemaInfo info = new SchemaInfo();

byte[] schema;
if (response.getType() == SchemaType.KEY_VALUE) {
schema = DefaultImplementation.convertKeyValueDataStringToSchemaInfoSchema(
response.getData().getBytes(UTF_8));
} else {
schema = response.getData().getBytes(UTF_8);
}
info.setSchema(schema);
info.setType(response.getType());
info.setProperties(response.getProperties());
info.setName(tn.getLocalName());
return info;

return SchemaInfoImpl.builder()
.schema(schema)
.type(response.getType())
.properties(response.getProperties())
.name(tn.getLocalName())
.build();
}

static SchemaInfoWithVersion convertGetSchemaResponseToSchemaInfoWithVersion(TopicName tn,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,7 @@
*/
package org.apache.pulsar.common.schema;

import static java.nio.charset.StandardCharsets.UTF_8;
import java.util.Base64;
import java.util.Collections;
import java.util.Map;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.NoArgsConstructor;
import lombok.experimental.Accessors;
import org.apache.pulsar.client.internal.DefaultImplementation;
import org.apache.pulsar.common.classification.InterfaceAudience;
import org.apache.pulsar.common.classification.InterfaceStability;
Expand All @@ -37,55 +28,24 @@
*/
@InterfaceAudience.Public
@InterfaceStability.Stable
@Data
@AllArgsConstructor
@NoArgsConstructor
@Accessors(chain = true)
@Builder
public class SchemaInfo {
public interface SchemaInfo {

@EqualsAndHashCode.Exclude
private String name;
String getName();

/**
* The schema data in AVRO JSON format.
*/
private byte[] schema;
byte[] getSchema();

/**
* The type of schema (AVRO, JSON, PROTOBUF, etc..).
*/
private SchemaType type;
SchemaType getType();

/**
* Additional properties of the schema definition (implementation defined).
*/
@Builder.Default
private Map<String, String> properties = Collections.emptyMap();

public String getSchemaDefinition() {
if (null == schema) {
return "";
}

switch (type) {
case AVRO:
case JSON:
case PROTOBUF:
case PROTOBUF_NATIVE:
return new String(schema, UTF_8);
case KEY_VALUE:
KeyValue<SchemaInfo, SchemaInfo> schemaInfoKeyValue =
DefaultImplementation.decodeKeyValueSchemaInfo(this);
return DefaultImplementation.jsonifyKeyValueSchemaInfo(schemaInfoKeyValue);
default:
return Base64.getEncoder().encodeToString(schema);
}
}

@Override
public String toString(){
return DefaultImplementation.jsonifySchemaInfo(this);
}
Map<String, String> getProperties();

String getSchemaDefinition();
}
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@
import org.apache.pulsar.common.api.proto.ServerError;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.protocol.PulsarHandler;
import org.apache.pulsar.common.protocol.schema.SchemaInfoUtil;
import org.apache.pulsar.client.impl.schema.SchemaInfoUtil;
import org.apache.pulsar.common.protocol.schema.SchemaVersion;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.util.FutureUtil;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@
import org.apache.pulsar.common.protocol.schema.GetSchemaResponse;
import org.apache.pulsar.common.protocol.schema.SchemaData;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.protocol.schema.SchemaInfoUtil;
import org.apache.pulsar.client.impl.schema.SchemaInfoUtil;
import org.apache.pulsar.common.schema.SchemaType;
import org.apache.pulsar.common.util.Codec;
import org.apache.pulsar.common.util.FutureUtil;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import static com.google.common.base.Preconditions.checkState;

import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.schema.KeyValueSchema;
import org.apache.pulsar.common.schema.KeyValueEncodingType;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.schema.SchemaType;
Expand Down Expand Up @@ -74,9 +75,9 @@ public byte[] encode(byte[] message) {

if (requireSchemaValidation) {
// verify if the message can be decoded by the underlying schema
if (schema instanceof KeyValueSchemaImpl
&& ((KeyValueSchemaImpl) schema).getKeyValueEncodingType().equals(KeyValueEncodingType.SEPARATED)) {
((KeyValueSchemaImpl) schema).getValueSchema().validate(message);
if (schema instanceof KeyValueSchema
&& ((KeyValueSchema) schema).getKeyValueEncodingType().equals(KeyValueEncodingType.SEPARATED)) {
((KeyValueSchema) schema).getValueSchema().validate(message);
} else {
schema.validate(message);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public class BooleanSchema extends AbstractSchema<Boolean> {
private static final SchemaInfo SCHEMA_INFO;

static {
SCHEMA_INFO = new SchemaInfo()
SCHEMA_INFO = new SchemaInfoImpl()
.setName("Boolean")
.setType(SchemaType.BOOLEAN)
.setSchema(new byte[0]);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public class ByteBufSchema extends AbstractSchema<ByteBuf> {
private static final SchemaInfo SCHEMA_INFO;

static {
SCHEMA_INFO = new SchemaInfo()
SCHEMA_INFO = new SchemaInfoImpl()
.setName("ByteBuf")
.setType(SchemaType.BYTES)
.setSchema(new byte[0]);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public class ByteBufferSchema extends AbstractSchema<ByteBuffer> {
private static final SchemaInfo SCHEMA_INFO;

static {
SCHEMA_INFO = new SchemaInfo()
SCHEMA_INFO = new SchemaInfoImpl()
.setName("ByteBuffer")
.setType(SchemaType.BYTES)
.setSchema(new byte[0]);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public class ByteSchema extends AbstractSchema<Byte> {
private static final SchemaInfo SCHEMA_INFO;

static {
SCHEMA_INFO = new SchemaInfo()
SCHEMA_INFO = new SchemaInfoImpl()
.setName("INT8")
.setType(SchemaType.INT8)
.setSchema(new byte[0]);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ public class BytesSchema extends AbstractSchema<byte[]> {
private static final SchemaInfo SCHEMA_INFO;

static {
SCHEMA_INFO = new SchemaInfo()
SCHEMA_INFO = new SchemaInfoImpl()
.setName("Bytes")
.setType(SchemaType.BYTES)
.setSchema(new byte[0]);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public class DateSchema extends AbstractSchema<Date> {
private static final SchemaInfo SCHEMA_INFO;

static {
SCHEMA_INFO = new SchemaInfo()
SCHEMA_INFO = new SchemaInfoImpl()
.setName("Date")
.setType(SchemaType.DATE)
.setSchema(new byte[0]);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public class DoubleSchema extends AbstractSchema<Double> {
private static final SchemaInfo SCHEMA_INFO;

static {
SCHEMA_INFO = new SchemaInfo()
SCHEMA_INFO = new SchemaInfoImpl()
.setName("Double")
.setType(SchemaType.DOUBLE)
.setSchema(new byte[0]);
Expand Down
Loading

0 comments on commit 89ac98e

Please sign in to comment.