-
Notifications
You must be signed in to change notification settings - Fork 24
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Recap can now store schemas in a registry similar to [Confluent's schema registry](https://docs.confluent.io/platform/current/schema-registry/index.html) and [Buf's schema registry](https://buf.build/product/bsr). The registry is meant to be used as a source of truth for Recap schema definitions in an organization. It can also be used as a cache for schemas that have been read from elsewhere, though it is not meant to be a general purpose data catalog (i.e. data discoverability is not the goal). There are two components to the registry: - Storage - HTTP/JSON API The storage layer stores schemas on a filesystem like S3, GCS, or the local filesystem using [fsspec](https://filesystem-spec.readthedocs.io/en/latest/). The HTTP/JSON layer is a Flask app that exposes a REST API for interacting with the registry. The API largely mirrors Confluent's API, but with slightly different paths. It also doesn't have a delete endpoint.
- Loading branch information
1 parent
af99557
commit 53bc3f7
Showing
15 changed files
with
511 additions
and
38 deletions.
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Empty file.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,7 @@ | ||
from fastapi import FastAPI | ||
|
||
from recap.server import gateway, registry | ||
|
||
app = FastAPI() | ||
app.include_router(gateway.router) | ||
app.include_router(registry.router) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,115 @@ | ||
import json | ||
|
||
from fastapi import APIRouter, Depends, HTTPException, Request | ||
|
||
from recap.settings import RecapSettings | ||
from recap.storage.registry import RegistryStorage | ||
from recap.types import RecapType, from_dict, to_dict | ||
|
||
router = APIRouter(prefix="/registry") | ||
settings = RecapSettings() | ||
|
||
|
||
def get_storage() -> RegistryStorage: | ||
return RegistryStorage( | ||
settings.registry_storage_url.unicode_string(), | ||
**settings.registry_storage_url_args, | ||
) | ||
|
||
|
||
@router.get("/") | ||
async def ls(storage: RegistryStorage = Depends(get_storage)) -> list[str]: | ||
return storage.ls() | ||
|
||
|
||
@router.get("/{name:str}") | ||
async def latest( | ||
name: str, | ||
storage: RegistryStorage = Depends(get_storage), | ||
) -> tuple[dict | list | str, int]: | ||
if type_and_version := storage.get(name): | ||
type_, version = type_and_version | ||
return to_dict(type_), version | ||
else: | ||
raise HTTPException(status_code=404, detail="Not found") | ||
|
||
|
||
@router.get("/{name:str}/versions") | ||
async def versions( | ||
name: str, | ||
storage: RegistryStorage = Depends(get_storage), | ||
) -> list[int]: | ||
if versions := storage.versions(name): | ||
return versions | ||
else: | ||
raise HTTPException(status_code=404, detail="Not found") | ||
|
||
|
||
@router.get("/{name:str}/versions/{version:int}") | ||
async def version( | ||
name: str, | ||
version: int, | ||
storage: RegistryStorage = Depends(get_storage), | ||
) -> tuple[dict | list | str, int]: | ||
if type_and_version := storage.get(name, version): | ||
type_, version = type_and_version | ||
return to_dict(type_), version | ||
else: | ||
raise HTTPException(status_code=404, detail="Not found") | ||
|
||
|
||
@router.post("/{name:str}") | ||
async def post( | ||
name: str, | ||
request: Request, | ||
storage: RegistryStorage = Depends(get_storage), | ||
) -> int: | ||
type_ = await _request_to_type(request) | ||
|
||
return storage.put(name, type_) | ||
|
||
|
||
@router.put("/{name:str}/versions/{version:int}") | ||
async def put( | ||
name: str, | ||
version: int, | ||
request: Request, | ||
storage: RegistryStorage = Depends(get_storage), | ||
): | ||
if storage.get(name, version): | ||
raise HTTPException( | ||
status_code=409, | ||
detail=f"Type {name} version {version} already exists", | ||
) | ||
|
||
type_ = await _request_to_type(request) | ||
|
||
return storage.put(name, type_, version) | ||
|
||
|
||
async def _request_to_type(request: Request) -> RecapType: | ||
content_type = request.headers.get("content-type") or "application/x-recap+json" | ||
|
||
if content_type != "application/x-recap+json": | ||
raise HTTPException( | ||
status_code=415, | ||
detail=f"Unsupported content type: {content_type}", | ||
) | ||
|
||
type_bytes = await request.body() | ||
type_str = type_bytes.decode("utf-8") | ||
|
||
try: | ||
type_json = json.loads(type_str) | ||
except json.JSONDecodeError: | ||
# Assume payload is string alias if we can't decode as JSON. | ||
# If it's not, from_dict will fail below. | ||
type_json = type_str | ||
|
||
try: | ||
return from_dict(type_json) | ||
except Exception as e: | ||
raise HTTPException( | ||
status_code=400, | ||
detail=f"Failed to parse type: {e}", | ||
) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Empty file.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,83 @@ | ||
import json | ||
from pathlib import Path | ||
from urllib.parse import quote_plus, unquote_plus | ||
|
||
import fsspec | ||
|
||
from recap.types import RecapType, from_dict, to_dict | ||
|
||
|
||
class RegistryStorage: | ||
def __init__(self, storage_url: str, **storage_url_args): | ||
self.fs, self.root_path = fsspec.core.url_to_fs(storage_url, **storage_url_args) | ||
self.fs.mkdirs(self.root_path, exist_ok=True) | ||
|
||
def ls(self) -> list[str]: | ||
return sorted( | ||
[ | ||
unquote_plus(file_path[len(self.root_path) + 1 :]) | ||
for file_path in self.fs.ls(self.root_path) | ||
] | ||
) | ||
|
||
def get( | ||
self, | ||
name: str, | ||
version: int | None = None, | ||
) -> tuple[RecapType, int] | None: | ||
quoted_name = quote_plus(name) | ||
|
||
if version is None: | ||
versions = self.versions(name) | ||
if not versions: | ||
return None | ||
version = max(versions) | ||
|
||
try: | ||
with self.fs.open(f"{self.root_path}/{quoted_name}/{version}.json") as f: | ||
type_json = json.load(f) | ||
type_ = from_dict(type_json) | ||
return (type_, version) | ||
except FileNotFoundError: | ||
return None | ||
|
||
def put( | ||
self, | ||
name: str, | ||
type_: RecapType, | ||
version: int | None = None, | ||
) -> int: | ||
quoted_name = quote_plus(name) | ||
|
||
if version is None: | ||
version = (self.latest(name) or 0) + 1 | ||
|
||
path_without_version = f"{self.root_path}/{quoted_name}" | ||
type_dict = to_dict(type_) | ||
|
||
self.fs.mkdirs(path_without_version, exist_ok=True) | ||
|
||
with self.fs.open(f"{path_without_version}/{version}.json", "w") as f: | ||
json.dump(type_dict, f) | ||
|
||
return version | ||
|
||
def versions(self, name: str) -> list[int] | None: | ||
quoted_name = quote_plus(name) | ||
path_without_version = f"{self.root_path}/{quoted_name}" | ||
|
||
try: | ||
return sorted( | ||
[ | ||
int(Path(file_path).stem) | ||
for file_path in self.fs.ls(path_without_version) | ||
] | ||
) | ||
except FileNotFoundError: | ||
return None | ||
|
||
def latest(self, name: str) -> int | None: | ||
versions = self.versions(name) | ||
if not versions: | ||
return None | ||
return max(versions) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.