Skip to content

Commit

Permalink
feat: add new get schema id endpoint
Browse files Browse the repository at this point in the history
  • Loading branch information
doxsch committed Feb 8, 2024
1 parent 608b076 commit c732544
Show file tree
Hide file tree
Showing 5 changed files with 258 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import io.confluent.kafka.schemaregistry.avro.AvroSchema;

import io.confluent.kafka.schemaregistry.client.SchemaMetadata;
import io.confluent.kafka.schemaregistry.client.rest.entities.requests.FindSchemaIdRequest;
import io.confluent.kafka.schemaregistry.client.rest.entities.requests.RegisterSchemaRequest;
import io.confluent.kafka.schemaregistry.client.rest.entities.requests.RegisterSchemaResponse;
import java.nio.charset.StandardCharsets;
Expand Down Expand Up @@ -164,6 +165,12 @@ public Schema(String subject, RegisterSchemaRequest request) {
this.schema = request.getSchema();
}

public Schema(FindSchemaIdRequest request) {
this.schemaType = request.getSchemaType() != null
? request.getSchemaType() : AvroSchema.TYPE;
this.schema = request.getSchema();
}

public Schema(String subject, RegisterSchemaResponse response) {
this.subject = subject;
this.version = response.getVersion() != null ? response.getVersion() : 0;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
/*
* Copyright 2018 Confluent Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.confluent.kafka.schemaregistry.client.rest.entities.requests;

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import io.confluent.kafka.schemaregistry.client.rest.entities.Schema;
import io.confluent.kafka.schemaregistry.client.rest.entities.SchemaTypeConverter;
import io.confluent.kafka.schemaregistry.utils.JacksonMapper;
import java.io.IOException;
import java.util.Objects;

@JsonInclude(JsonInclude.Include.NON_EMPTY)
@JsonIgnoreProperties(ignoreUnknown = true)
@io.swagger.v3.oas.annotations.media.Schema(description = "Find schema id request")
public class FindSchemaIdRequest {

private String schemaType;
private String schema;

public FindSchemaIdRequest() {
}

public static FindSchemaIdRequest fromJson(String json) throws IOException {
return JacksonMapper.INSTANCE.readValue(json, FindSchemaIdRequest.class);
}

@io.swagger.v3.oas.annotations.media.Schema(description = Schema.TYPE_DESC)
@JsonProperty("schemaType")
@JsonSerialize(converter = SchemaTypeConverter.class)
public String getSchemaType() {
return this.schemaType;
}

@JsonProperty("schemaType")
public void setSchemaType(String schemaType) {
this.schemaType = schemaType;
}

@io.swagger.v3.oas.annotations.media.Schema(description = Schema.SCHEMA_DESC)
@JsonProperty("schema")
public String getSchema() {
return this.schema;
}

@JsonProperty("schema")
public void setSchema(String schema) {
this.schema = schema;
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
FindSchemaIdRequest that = (FindSchemaIdRequest) o;
return Objects.equals(schemaType, that.schemaType)
&& Objects.equals(schema, that.schema);
}

@Override
public int hashCode() {
return Objects.hash(schemaType, schema);
}

@Override
public String toString() {
StringBuilder buf = new StringBuilder();
buf.append("{");
buf.append("schemaType=").append(this.schemaType).append(", ");
buf.append("schema=").append(schema).append("}");
return buf.toString();
}

public String toJson() throws IOException {
return JacksonMapper.INSTANCE.writeValueAsString(this);
}

}
84 changes: 84 additions & 0 deletions core/generated/swagger-ui/schema-registry-api-spec.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2131,6 +2131,70 @@ paths:
Error code 40403 -- Schema not found
"500":
description: Internal server error
/schemas/ids:
post:
tags:
- Schemas (v1)
summary: Find global schema id
description: Get the schemas id matching the specified schema.
operationId: findSchemaId
requestBody:
description: Schema
content:
application/vnd.schemaregistry.v1+json:
schema:
$ref: '#/components/schemas/FindSchemaIdRequest'
application/vnd.schemaregistry+json:
schema:
$ref: '#/components/schemas/FindSchemaIdRequest'
application/json:
schema:
$ref: '#/components/schemas/FindSchemaIdRequest'
application/octet-stream:
schema:
$ref: '#/components/schemas/FindSchemaIdRequest'
required: true
responses:
"200":
description: Returns the global schema id.
content:
application/vnd.schemaregistry.v1+json:
schema:
type: integer
format: int32
application/vnd.schemaregistry+json; qs=0.9:
schema:
type: integer
format: int32
application/json; qs=0.5:
schema:
type: integer
format: int32
"404":
description: Not Found. Error code 40403 indicates schema not found.
content:
application/vnd.schemaregistry.v1+json:
schema:
$ref: '#/components/schemas/ErrorMessage'
application/vnd.schemaregistry+json; qs=0.9:
schema:
$ref: '#/components/schemas/ErrorMessage'
application/json; qs=0.5:
schema:
$ref: '#/components/schemas/ErrorMessage'
"500":
description: Internal Server Error. Error code 50001 indicates a failure
in the backend data store.
content:
application/vnd.schemaregistry.v1+json:
schema:
$ref: '#/components/schemas/ErrorMessage'
application/vnd.schemaregistry+json; qs=0.9:
schema:
$ref: '#/components/schemas/ErrorMessage'
application/json; qs=0.5:
schema:
$ref: '#/components/schemas/ErrorMessage'
components:
schemas:
CompatibilityCheckResponse:
Expand Down Expand Up @@ -2495,3 +2559,23 @@ components:
items:
$ref: '#/components/schemas/Rule'
description: Schema rule set
FindSchemaIdRequest:
type: object
properties:
schemaType:
type: string
description: Schema type
schema:
type: string
description: Schema definition string
description: Find schema id request
SchemaEntity:
type: object
properties:
entityPath:
type: string
entityType:
type: string
enum:
- sr_record
- sr_field
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import io.confluent.kafka.schemaregistry.client.rest.entities.Schema;
import io.confluent.kafka.schemaregistry.client.rest.entities.SchemaString;
import io.confluent.kafka.schemaregistry.client.rest.entities.SubjectVersion;
import io.confluent.kafka.schemaregistry.client.rest.entities.requests.FindSchemaIdRequest;
import io.confluent.kafka.schemaregistry.exceptions.SchemaRegistryException;
import io.confluent.kafka.schemaregistry.exceptions.SchemaRegistryStoreException;
import io.confluent.kafka.schemaregistry.rest.exceptions.Errors;
Expand All @@ -33,21 +34,23 @@
import io.swagger.v3.oas.annotations.responses.ApiResponse;
import io.swagger.v3.oas.annotations.tags.Tag;
import io.swagger.v3.oas.annotations.tags.Tags;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.function.Predicate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.validation.constraints.NotNull;
import javax.ws.rs.Consumes;
import javax.ws.rs.DefaultValue;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.DefaultValue;
import javax.ws.rs.QueryParam;
import javax.ws.rs.PathParam;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Path("/schemas")
@Produces({Versions.SCHEMA_REGISTRY_V1_JSON_WEIGHTED,
Expand All @@ -66,6 +69,48 @@ public SchemasResource(KafkaSchemaRegistry schemaRegistry) {
this.schemaRegistry = schemaRegistry;
}

@POST
@Path("/ids")
@DocumentedName("findSchemaId")
@Operation(summary = "Find global schema id",
description = "Get the schemas id matching the specified schema.",
responses = {
@ApiResponse(responseCode = "200",
description = "Returns the global schema id.", content = @Content(schema = @io.swagger.v3.oas.annotations.media.Schema(
implementation = Integer.class))),
@ApiResponse(responseCode = "404",
description = "Not Found. Error code 40403 indicates schema not found.",
content = @Content(schema = @io.swagger.v3.oas.annotations.media.Schema(implementation =
ErrorMessage.class))),
@ApiResponse(responseCode = "500",
description = "Internal Server Error. "
+ "Error code 50001 indicates a failure in the backend data store.",
content = @Content(schema = @io.swagger.v3.oas.annotations.media.Schema(implementation =
ErrorMessage.class)))})
@Tags(@Tag(name = apiTag))
@PerformanceMetric("schemas.find-schema-id")
public Integer findSchemaId(
@Parameter(description = "Schema", required = true)
@NotNull FindSchemaIdRequest request
) {
Optional<Integer> id;
String errorMessage = "Error while finding schema";
Schema schema = new Schema(request);

try {
id = schemaRegistry.findSchemaId(schema);
} catch (SchemaRegistryStoreException e) {
log.debug(errorMessage, e);
throw Errors.storeException(errorMessage, e);
}

if (!id.isPresent()) {
throw Errors.schemaNotFoundException();
}

return id.get();
}

@GET
@DocumentedName("getSchemas")
@Operation(summary = "List schemas",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
Expand Down Expand Up @@ -1669,6 +1670,20 @@ public Set<String> listSubjectsWithPrefix(String prefix, LookupFilter filter)
}
}

public Optional<Integer> findSchemaId(Schema schema)
throws SchemaRegistryStoreException {

SchemaIdAndSubjects schemaIdAndSubjects;

try {
schemaIdAndSubjects = this.lookupCache.schemaIdAndSubjects(schema);
} catch (StoreException e) {
throw new SchemaRegistryStoreException("Error while retrieving schema", e);
}

return schemaIdAndSubjects!= null ? Optional.of(schemaIdAndSubjects.getSchemaId()) : Optional.empty();
}

public Set<String> listSubjectsForId(int id, String subject) throws SchemaRegistryException {
return listSubjectsForId(id, subject, false);
}
Expand Down

0 comments on commit c732544

Please sign in to comment.