Skip to content

Commit

Permalink
Add Recap schema registry
Browse files Browse the repository at this point in the history
Recap can now store schemas in a registry similar to
[Confluent's schema registry](https://docs.confluent.io/platform/current/schema-registry/index.html)
and [Buf's schema registry](https://buf.build/product/bsr).

The registry is meant to be used as a source of truth for Recap schema
definitions in an organization. It can also be used as a cache for schemas that
have been read from elsewhere, though it is not meant to be a general purpose
data catalog (i.e. data discoverability is not the goal).

There are two components to the registry:

- Storage
- HTTP/JSON API

The storage layer stores schemas on a filesystem like S3, GCS, or the local
filesystem using [fsspec](https://filesystem-spec.readthedocs.io/en/latest/).
The HTTP/JSON layer is a Flask app that exposes a REST API for interacting with
the registry.

The API largely mirrors Confluent's API, but with slightly different paths. It
also doesn't have a delete endpoint.
  • Loading branch information
criccomini committed Oct 5, 2023
1 parent af99557 commit 4a582d9
Show file tree
Hide file tree
Showing 15 changed files with 511 additions and 38 deletions.
12 changes: 11 additions & 1 deletion pdm.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ app = [
"uvicorn>=0.23.2",
"rich>=13.5.2",
"python-dotenv>=1.0.0",
"fsspec>=2023.9.2",
]
all = [
"recap-core[app,bigquery,hive,json,kafka,proto]",
Expand Down Expand Up @@ -125,7 +126,7 @@ unit = "pytest tests/unit -vv"
spec = "pytest tests/spec -vv"
integration = "pytest tests/integration -vv"
test = {composite = ["unit", "spec"]}
serve ="uvicorn recap.gateway:app --reload"
serve ="uvicorn recap.server.app:app --reload"

[tool.pytest.ini_options]
addopts = [
Expand Down
2 changes: 1 addition & 1 deletion recap/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,4 +52,4 @@ def serve(

import uvicorn

uvicorn.run("recap.gateway:app", host=host, port=port, log_level=log_level)
uvicorn.run("recap.server.app:app", host=host, port=port, log_level=log_level)
8 changes: 8 additions & 0 deletions recap/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,14 @@ class SchemaFormat(str, Enum):
recap = "recap"


FORMAT_MAP = {
"application/schema+json": SchemaFormat.json,
"application/avro+json": SchemaFormat.avro,
"application/x-protobuf": SchemaFormat.protobuf,
"application/x-recap+json": SchemaFormat.recap,
}


def ls(url: str | None = None) -> list[str] | None:
"""
List a URL's children.
Expand Down
Empty file added recap/server/__init__.py
Empty file.
7 changes: 7 additions & 0 deletions recap/server/app.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
from fastapi import FastAPI

from recap.server import gateway, registry

app = FastAPI()
app.include_router(gateway.router)
app.include_router(registry.router)
19 changes: 6 additions & 13 deletions recap/gateway.py → recap/server/gateway.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,11 @@
from fastapi import FastAPI, HTTPException, Request
from fastapi import APIRouter, HTTPException, Request

from recap import commands

app = FastAPI()
router = APIRouter(prefix="/gateway")

FORMAT_MAP = {
"application/schema+json": commands.SchemaFormat.json,
"application/avro+json": commands.SchemaFormat.avro,
"application/x-protobuf": commands.SchemaFormat.protobuf,
"application/x-recap": commands.SchemaFormat.recap,
}


@app.get("/ls/{url:path}")
@router.get("/ls/{url:path}")
async def ls(url: str | None = None) -> list[str]:
"""
List the children of a URL.
Expand All @@ -24,14 +17,14 @@ async def ls(url: str | None = None) -> list[str]:
raise HTTPException(status_code=404, detail="URL not found")


@app.get("/schema/{url:path}")
@router.get("/schema/{url:path}")
async def schema(url: str, request: Request):
"""
Get the schema of a URL.
"""

content_type = request.headers.get("content-type") or "application/x-recap"
if format := FORMAT_MAP.get(content_type):
content_type = request.headers.get("content-type") or "application/x-recap+json"
if format := commands.FORMAT_MAP.get(content_type):
return commands.schema(url, format)
else:
raise HTTPException(
Expand Down
115 changes: 115 additions & 0 deletions recap/server/registry.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
import json

from fastapi import APIRouter, Depends, HTTPException, Request

from recap.settings import RecapSettings
from recap.storage.registry import RegistryStorage
from recap.types import RecapType, from_dict, to_dict

router = APIRouter(prefix="/registry")
settings = RecapSettings()


def get_storage() -> RegistryStorage:
return RegistryStorage(
settings.registry_storage_url.unicode_string(),
**settings.registry_storage_url_args,
)


@router.get("/")
async def ls(storage: RegistryStorage = Depends(get_storage)) -> list[str]:
return storage.ls()


@router.get("/{name:str}")
async def latest(
name: str,
storage: RegistryStorage = Depends(get_storage),
) -> tuple[dict | list | str, int]:
if type_and_version := storage.get(name):
type_, version = type_and_version
return to_dict(type_), version
else:
raise HTTPException(status_code=404, detail="Not found")


@router.get("/{name:str}/versions")
async def versions(
name: str,
storage: RegistryStorage = Depends(get_storage),
) -> list[int]:
if versions := storage.versions(name):
return versions
else:
raise HTTPException(status_code=404, detail="Not found")


@router.get("/{name:str}/versions/{version:int}")
async def version(
name: str,
version: int,
storage: RegistryStorage = Depends(get_storage),
) -> tuple[dict | list | str, int]:
if type_and_version := storage.get(name, version):
type_, version = type_and_version
return to_dict(type_), version
else:
raise HTTPException(status_code=404, detail="Not found")


@router.post("/{name:str}")
async def post(
name: str,
request: Request,
storage: RegistryStorage = Depends(get_storage),
) -> int:
type_ = await _request_to_type(request)

return storage.put(name, type_)


@router.put("/{name:str}/versions/{version:int}")
async def put(
name: str,
version: int,
request: Request,
storage: RegistryStorage = Depends(get_storage),
):
if storage.get(name, version):
raise HTTPException(
status_code=409,
detail=f"Type {name} version {version} already exists",
)

type_ = await _request_to_type(request)

return storage.put(name, type_, version)


async def _request_to_type(request: Request) -> RecapType:
content_type = request.headers.get("content-type") or "application/x-recap+json"

if content_type != "application/x-recap+json":
raise HTTPException(
status_code=415,
detail=f"Unsupported content type: {content_type}",
)

type_bytes = await request.body()
type_str = type_bytes.decode("utf-8")

try:
type_json = json.loads(type_str)
except json.JSONDecodeError:
# Assume payload is string alias if we can't decode as JSON.
# If it's not, from_dict will fail below.
type_json = type_str

try:
return from_dict(type_json)
except Exception as e:
raise HTTPException(
status_code=400,
detail=f"Failed to parse type: {e}",
)
26 changes: 13 additions & 13 deletions recap/settings.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import os
from pathlib import Path
from typing import Any
from urllib.parse import parse_qs, urlencode, urlsplit, urlunsplit

from dotenv import load_dotenv
Expand All @@ -8,31 +9,30 @@

load_dotenv()

CONFIG_FILE = os.environ.get("RECAP_CONFIG") or os.path.expanduser("~/.recap/config")
SECRETS_DIR = os.environ.get("RECAP_SECRETS")
HOME_PATH = Path(os.environ.get("RECAP_HOME") or os.path.expanduser("~/.recap"))
DEFAULT_REGISTRY_STORAGE_PATH = Path(HOME_PATH, "schemas")
SECRETS_PATH = Path(dir) if (dir := os.environ.get("RECAP_SECRETS")) else None


def touch_config():
config_path = Path(CONFIG_FILE)
config_path.parent.mkdir(parents=True, exist_ok=True)
config_path.touch(mode=0o600, exist_ok=True)
if SECRETS_DIR:
secrets_path = Path(SECRETS_DIR)
secrets_path.mkdir(mode=0o700, parents=True, exist_ok=True)
def mkdirs():
HOME_PATH.mkdir(exist_ok=True)

if SECRETS_PATH:
SECRETS_PATH.mkdir(exist_ok=True)

touch_config()

mkdirs()


class RecapSettings(BaseSettings):
urls: list[AnyUrl] = Field(default_factory=list)
registry_storage_url: AnyUrl = Field(default=DEFAULT_REGISTRY_STORAGE_PATH.as_uri())
registry_storage_url_args: dict[str, Any] = Field(default_factory=dict)
model_config = SettingsConfigDict(
# .env takes priority over CONFIG_FILE
env_file=[CONFIG_FILE, ".env"],
env_file_encoding="utf-8",
env_prefix="recap_",
env_nested_delimiter="__",
secrets_dir=SECRETS_DIR,
secrets_dir=str(SECRETS_PATH) if SECRETS_PATH else None,
)

@property
Expand Down
Empty file added recap/storage/__init__.py
Empty file.
83 changes: 83 additions & 0 deletions recap/storage/registry.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
import json
from pathlib import Path
from urllib.parse import quote_plus, unquote_plus

import fsspec

from recap.types import RecapType, from_dict, to_dict


class RegistryStorage:
def __init__(self, storage_url: str, **storage_url_args):
self.fs, self.root_path = fsspec.core.url_to_fs(storage_url, **storage_url_args)
self.fs.mkdirs(self.root_path, exist_ok=True)

def ls(self) -> list[str]:
return sorted(
[
unquote_plus(file_path[len(self.root_path) + 1 :])
for file_path in self.fs.ls(self.root_path)
]
)

def get(
self,
name: str,
version: int | None = None,
) -> tuple[RecapType, int] | None:
quoted_name = quote_plus(name)

if version is None:
versions = self.versions(name)
if not versions:
return None
version = max(versions)

try:
with self.fs.open(f"{self.root_path}/{quoted_name}/{version}.json") as f:
type_json = json.load(f)
type_ = from_dict(type_json)
return (type_, version)
except FileNotFoundError:
return None

def put(
self,
name: str,
type_: RecapType,
version: int | None = None,
) -> int:
quoted_name = quote_plus(name)

if version is None:
version = (self.latest(name) or 0) + 1

path_without_version = f"{self.root_path}/{quoted_name}"
type_dict = to_dict(type_)

self.fs.mkdirs(path_without_version, exist_ok=True)

with self.fs.open(f"{path_without_version}/{version}.json", "w") as f:
json.dump(type_dict, f)

return version

def versions(self, name: str) -> list[int] | None:
quoted_name = quote_plus(name)
path_without_version = f"{self.root_path}/{quoted_name}"

try:
return sorted(
[
int(Path(file_path).stem)
for file_path in self.fs.ls(path_without_version)
]
)
except FileNotFoundError:
return None

def latest(self, name: str) -> int | None:
versions = self.versions(name)
if not versions:
return None
return max(versions)
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@
from uvicorn import Server
from uvicorn.config import Config

from recap.gateway import app
from recap.server.app import app

client = httpx.Client(follow_redirects=True, base_url="http://localhost:8000")
client = httpx.Client(follow_redirects=True, base_url="http://localhost:8000/gateway")


class TestGateway:
Expand Down Expand Up @@ -114,7 +114,7 @@ def test_schema_json(self):
"properties": {"test_integer": {"default": None, "type": "integer"}},
}

@pytest.mark.xfail(reason="Enable when #397 is fixed")
@pytest.mark.skip(reason="Enable when #397 is fixed")
def test_schema_protobuf(self):
response = client.get(
"/schema/postgresql://localhost:5432/testdb/public/test_types",
Expand All @@ -131,7 +131,7 @@ def test_schema_protobuf(self):
def test_schema_recap(self):
response = client.get(
"/schema/postgresql://localhost:5432/testdb/public/test_types",
headers={"Content-Type": "application/x-recap"},
headers={"Content-Type": "application/x-recap+json"},
)
assert response.status_code == 200
assert response.json() == {
Expand Down
Loading

0 comments on commit 4a582d9

Please sign in to comment.