diff --git a/src/fontra/__main__.py b/src/fontra/__main__.py index ae03aac5d..b7cbb5d39 100644 --- a/src/fontra/__main__.py +++ b/src/fontra/__main__.py @@ -4,12 +4,13 @@ from importlib.metadata import entry_points from . import __version__ as fontraVersion +from .core.protocols import ProjectManager, ProjectManagerFactory from .core.server import FontraServer, findFreeTCPPort DEFAULT_PORT = 8000 -def main(): +def main() -> None: logging.basicConfig( format="%(asctime)s %(name)-17s %(levelname)-8s %(message)s", level=logging.INFO, @@ -41,7 +42,7 @@ def main(): # See https://github.com/googlefonts/fontra/issues/141 continue subParser = subParsers.add_parser(entryPoint.name) - pmFactory = entryPoint.load() + pmFactory: ProjectManagerFactory = entryPoint.load() pmFactory.addArguments(subParser) subParser.set_defaults(getProjectManager=pmFactory.getProjectManager) @@ -49,7 +50,7 @@ def main(): host = args.host httpPort = args.http_port - manager = args.getProjectManager(args) + manager: ProjectManager = args.getProjectManager(args) server = FontraServer( host=host, httpPort=httpPort if httpPort is not None else findFreeTCPPort(DEFAULT_PORT), diff --git a/src/fontra/backends/__init__.py b/src/fontra/backends/__init__.py index c323c4744..010fafeba 100644 --- a/src/fontra/backends/__init__.py +++ b/src/fontra/backends/__init__.py @@ -1,19 +1,22 @@ import logging import pathlib from importlib.metadata import entry_points +from os import PathLike + +from ..core.protocols import ReadableFontBackend, WritableFontBackend logger = logging.getLogger(__name__) -def getFileSystemBackend(path): +def getFileSystemBackend(path: PathLike) -> ReadableFontBackend: return _getFileSystemBackend(path, False) -def newFileSystemBackend(path): +def newFileSystemBackend(path: PathLike) -> WritableFontBackend: return _getFileSystemBackend(path, True) -def _getFileSystemBackend(path, create): +def _getFileSystemBackend(path: PathLike, create: bool) -> WritableFontBackend: logVerb = "creating" if create else "loading" path = pathlib.Path(path) @@ -34,5 +37,10 @@ def _getFileSystemBackend(path, create): else: backend = backendClass.fromPath(path) + if create: + assert isinstance(backend, WritableFontBackend) + else: + assert isinstance(backend, ReadableFontBackend) + logger.info(f"done {logVerb} {path.name}") return backend diff --git a/src/fontra/backends/copy.py b/src/fontra/backends/copy.py index 92d39f74b..b0055fb4a 100644 --- a/src/fontra/backends/copy.py +++ b/src/fontra/backends/copy.py @@ -5,14 +5,20 @@ import shutil from contextlib import closing +from ..core.protocols import ReadableFontBackend, WritableFontBackend from . import getFileSystemBackend, newFileSystemBackend logger = logging.getLogger(__name__) async def copyFont( - sourceBackend, destBackend, *, glyphNames=None, numTasks=1, progressInterval=0 -): + sourceBackend: ReadableFontBackend, + destBackend: WritableFontBackend, + *, + glyphNames=None, + numTasks=1, + progressInterval=0, +) -> None: await destBackend.putGlobalAxes(await sourceBackend.getGlobalAxes()) await destBackend.putCustomData(await sourceBackend.getCustomData()) glyphMap = await sourceBackend.getGlyphMap() @@ -20,7 +26,7 @@ async def copyFont( glyphNamesToCopy = sorted( glyphNamesInFont if not glyphNames else set(glyphNames) & set(glyphMap) ) - glyphNamesCopied = set() + glyphNamesCopied: set[str] = set() tasks = [ asyncio.create_task( @@ -38,21 +44,25 @@ async def copyFont( done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_EXCEPTION) for task in pending: task.cancel() - exceptions = [task.exception() for task in done if task.exception()] + exceptions: list[BaseException | None] = [ + task.exception() for task in done if task.exception() + ] if exceptions: if len(exceptions) > 1: logger.error(f"Multiple exceptions were raised: {exceptions}") - raise exceptions[0] + e = exceptions[0] + assert e is not None + raise e async def copyGlyphs( - sourceBackend, - destBackend, - glyphMap, - glyphNamesToCopy, - glyphNamesCopied, - progressInterval, -): + sourceBackend: ReadableFontBackend, + destBackend: WritableFontBackend, + glyphMap: dict[str, list[int]], + glyphNamesToCopy: list[str], + glyphNamesCopied: set[str], + progressInterval: int, +) -> None: while glyphNamesToCopy: if progressInterval and not (len(glyphNamesToCopy) % progressInterval): logger.info(f"{len(glyphNamesToCopy)} glyphs left to copy") @@ -73,14 +83,10 @@ async def copyGlyphs( } glyphNamesToCopy.extend(sorted(componentNames - glyphNamesCopied)) - error = await destBackend.putGlyph(glyphName, glyph, glyphMap[glyphName]) - if error: - # FIXME: putGlyph should always raise, and not return some error string - # This may be unique to the rcjk backend, though. - raise ValueError(error) + await destBackend.putGlyph(glyphName, glyph, glyphMap[glyphName]) -async def mainAsync(): +async def mainAsync() -> None: logging.basicConfig( format="%(asctime)s %(name)-17s %(levelname)-8s %(message)s", level=logging.INFO, @@ -118,7 +124,7 @@ async def mainAsync(): elif destPath.exists(): destPath.unlink() elif destPath.exists(): - raise argparse.ArgumentError("the destination file already exists") + raise argparse.ArgumentError(None, "the destination file already exists") sourceBackend = getFileSystemBackend(sourcePath) destBackend = newFileSystemBackend(destPath) diff --git a/src/fontra/backends/designspace.py b/src/fontra/backends/designspace.py index ad5c56741..cc44fa935 100644 --- a/src/fontra/backends/designspace.py +++ b/src/fontra/backends/designspace.py @@ -10,7 +10,9 @@ from dataclasses import asdict, dataclass from datetime import datetime from functools import cache, cached_property, singledispatch +from os import PathLike from types import SimpleNamespace +from typing import Any, AsyncGenerator, Callable import watchfiles from fontTools.designspaceLib import ( @@ -19,6 +21,7 @@ DiscreteAxisDescriptor, ) from fontTools.misc.transform import DecomposedTransform +from fontTools.pens.pointPen import AbstractPointPen from fontTools.pens.recordingPen import RecordingPointPen from fontTools.ufoLib import UFOReaderWriter from fontTools.ufoLib.glifLib import GlyphSet @@ -35,6 +38,7 @@ VariableGlyph, ) from ..core.path import PackedPathPointPen +from ..core.protocols import WritableFontBackend from .ufo_utils import extractGlyphNameAndUnicodes logger = logging.getLogger(__name__) @@ -60,11 +64,11 @@ class DesignspaceBackend: @classmethod - def fromPath(cls, path): + def fromPath(cls, path: PathLike) -> WritableFontBackend: return cls(DesignSpaceDocument.fromfile(path)) @classmethod - def createFromPath(cls, path): + def createFromPath(cls, path: PathLike) -> WritableFontBackend: path = pathlib.Path(path) ufoDir = path.parent @@ -174,10 +178,13 @@ def updateGlyphSetContents(self, glyphSet): for glyphName, fileName in glyphSet.contents.items(): glifFileNames[fileName] = glyphName - async def getGlyphMap(self): + async def getGlyphMap(self) -> dict[str, list[int]]: return dict(self.glyphMap) - async def getGlyph(self, glyphName): + async def putGlyphMap(self, value: dict[str, list[int]]) -> None: + pass + + async def getGlyph(self, glyphName: str) -> VariableGlyph | None: if glyphName not in self.glyphMap: return None @@ -278,7 +285,9 @@ def _unpackLocalDesignSpace(self, dsDict, defaultLayerName): ) return axes, sources - async def putGlyph(self, glyphName, glyph, unicodes): + async def putGlyph( + self, glyphName: str, glyph: VariableGlyph, unicodes: list[int] + ) -> None: assert isinstance(unicodes, list) assert all(isinstance(cp, int) for cp in unicodes) self.glyphMap[glyphName] = unicodes @@ -534,7 +543,7 @@ async def deleteGlyph(self, glyphName): del self.glyphMap[glyphName] self.savedGlyphModificationTimes[glyphName] = None - async def getGlobalAxes(self): + async def getGlobalAxes(self) -> list[GlobalAxis | GlobalDiscreteAxis]: return self.axes async def putGlobalAxes(self, axes): @@ -557,17 +566,27 @@ async def putGlobalAxes(self, axes): self.updateAxisInfo() self.loadUFOLayers() - async def getUnitsPerEm(self): + async def getUnitsPerEm(self) -> int: return self.defaultFontInfo.unitsPerEm - async def getCustomData(self): + async def putUnitsPerEm(self, value: int) -> None: + del self.defaultFontInfo + ufoPaths = sorted(set(self.ufoLayers.iterAttrs("path"))) + for ufoPath in ufoPaths: + reader = self.ufoManager.getReader(ufoPath) + info = UFOFontInfo() + reader.readInfo(info) + info.unitsPerEm = value + reader.writeInfo(info) + + async def getCustomData(self) -> dict[str, Any]: return deepcopy(self.dsDoc.lib) async def putCustomData(self, lib): self.dsDoc.lib = deepcopy(lib) self.dsDoc.write(self.dsDoc.path) - async def watchExternalChanges(self): + async def watchExternalChanges(self) -> AsyncGenerator[tuple[Any, Any], None]: ufoPaths = sorted(set(self.ufoLayers.iterAttrs("path"))) async for changes in watchfiles.awatch(*ufoPaths): changes = cleanupWatchFilesChanges(changes) @@ -910,7 +929,7 @@ def readGlyphOrCreate( glyphSet: GlyphSet, glyphName: str, unicodes: list[int], -): +) -> UFOGlyph: layerGlyph = UFOGlyph() layerGlyph.lib = {} if glyphName in glyphSet: @@ -925,7 +944,7 @@ def populateUFOLayerGlyph( layerGlyph: UFOGlyph, staticGlyph: StaticGlyph, forceVariableComponents: bool = False, -) -> None: +) -> Callable[[AbstractPointPen], None]: pen = RecordingPointPen() layerGlyph.width = staticGlyph.xAdvance layerGlyph.height = staticGlyph.yAdvance diff --git a/src/fontra/backends/fontra.py b/src/fontra/backends/fontra.py index 9fcff8ca5..1bb647b1f 100644 --- a/src/fontra/backends/fontra.py +++ b/src/fontra/backends/fontra.py @@ -6,9 +6,18 @@ import shutil from copy import deepcopy from dataclasses import dataclass, field -from typing import Callable - -from fontra.core.classes import Font, VariableGlyph, structure, unstructure +from os import PathLike +from typing import Any, Callable + +from fontra.core.classes import ( + Font, + GlobalAxis, + GlobalDiscreteAxis, + VariableGlyph, + structure, + unstructure, +) +from fontra.core.protocols import WritableFontBackend from .filenames import stringToFileName @@ -21,15 +30,15 @@ class FontraBackend: glyphsDirName = "glyphs" @classmethod - def fromPath(cls, path): + def fromPath(cls, path) -> WritableFontBackend: return cls(path=path) @classmethod - def createFromPath(cls, path): + def createFromPath(cls, path) -> WritableFontBackend: return cls(path=path, create=True) - def __init__(self, *, path=None, create=False): - self.path = pathlib.Path(path).resolve() if path is not None else None + def __init__(self, *, path: PathLike, create: bool = False): + self.path = pathlib.Path(path).resolve() if create: if self.path.is_dir(): shutil.rmtree(self.path) @@ -37,7 +46,7 @@ def __init__(self, *, path=None, create=False): self.path.unlink() self.path.mkdir() self.glyphsDir.mkdir(exist_ok=True) - self.glyphMap = {} + self.glyphMap: dict[str, list[int]] = {} if not create: self._readGlyphInfo() self._readFontData() @@ -64,21 +73,29 @@ def close(self): def flush(self): self._scheduler.flush() - async def getUnitsPerEm(self): + async def getUnitsPerEm(self) -> int: return self.fontData.unitsPerEm async def putUnitsPerEm(self, unitsPerEm): self.fontData.unitsPerEm = unitsPerEm self._scheduler.schedule(self._writeFontData) - async def getGlyphMap(self): + async def getGlyphMap(self) -> dict[str, list[int]]: return dict(self.glyphMap) - async def getGlyph(self, glyphName): - jsonSource = self.getGlyphData(glyphName) + async def putGlyphMap(self, value: dict[str, list[int]]) -> None: + pass + + async def getGlyph(self, glyphName: str) -> VariableGlyph | None: + try: + jsonSource = self.getGlyphData(glyphName) + except KeyError: + return None return deserializeGlyph(jsonSource, glyphName) - async def putGlyph(self, glyphName, glyph, codePoints): + async def putGlyph( + self, glyphName: str, glyph: VariableGlyph, codePoints: list[int] + ) -> None: jsonSource = serializeGlyph(glyph, glyphName) filePath = self.getGlyphFilePath(glyphName) filePath.write_text(jsonSource, encoding="utf=8") @@ -88,14 +105,14 @@ async def putGlyph(self, glyphName, glyph, codePoints): async def deleteGlyph(self, glyphName): self.glyphMap.pop(glyphName, None) - async def getGlobalAxes(self): + async def getGlobalAxes(self) -> list[GlobalAxis | GlobalDiscreteAxis]: return deepcopy(self.fontData.axes) async def putGlobalAxes(self, axes): self.fontData.axes = deepcopy(axes) self._scheduler.schedule(self._writeFontData) - async def getCustomData(self): + async def getCustomData(self) -> dict[str, Any]: return deepcopy(self.fontData.customData) async def putCustomData(self, customData): diff --git a/src/fontra/backends/opentype.py b/src/fontra/backends/opentype.py index 127a4a434..ea02b2b24 100644 --- a/src/fontra/backends/opentype.py +++ b/src/fontra/backends/opentype.py @@ -1,15 +1,29 @@ +from os import PathLike +from typing import Any + from fontTools.misc.psCharStrings import SimpleT2Decompiler from fontTools.pens.pointPen import GuessSmoothPointPen from fontTools.ttLib import TTFont -from ..core.classes import GlobalAxis, Layer, Source, StaticGlyph, VariableGlyph +from fontra.core.protocols import ReadableFontBackend + +from ..core.classes import ( + GlobalAxis, + GlobalDiscreteAxis, + Layer, + Source, + StaticGlyph, + VariableGlyph, +) from ..core.path import PackedPath, PackedPathPointPen class OTFBackend: @classmethod - def fromPath(cls, path): - self = cls() + def fromPath(cls, path: PathLike) -> ReadableFontBackend: + return cls(path=path) + + def __init__(self, *, path): self.path = path self.font = TTFont(path, lazy=True) self.globalAxes = unpackAxes(self.font) @@ -29,22 +43,21 @@ def fromPath(cls, path): self.glyphMap = glyphMap self.glyphSet = self.font.getGlyphSet() self.variationGlyphSets = {} - return self def close(self): self.font.close() - async def getGlyphMap(self): + async def getGlyphMap(self) -> dict[str, list[int]]: return self.glyphMap - async def getGlyph(self, glyphName): + async def getGlyph(self, glyphName: str) -> VariableGlyph | None: if glyphName not in self.glyphSet: return None defaultLayerName = "" glyph = VariableGlyph(name=glyphName) staticGlyph = buildStaticGlyph(self.glyphSet, glyphName) layers = {defaultLayerName: Layer(glyph=staticGlyph)} - defaultLocation = {axis.name: 0 for axis in self.globalAxes} + defaultLocation = {axis.name: 0.0 for axis in self.globalAxes} sources = [ Source( location=defaultLocation, @@ -95,13 +108,13 @@ def _getGlyphVariationLocations(self, glyphName): } return [dict(loc) for loc in sorted(locations)] - async def getGlobalAxes(self): + async def getGlobalAxes(self) -> list[GlobalAxis | GlobalDiscreteAxis]: return self.globalAxes - async def getUnitsPerEm(self): + async def getUnitsPerEm(self) -> int: return self.font["head"].unitsPerEm - async def getCustomData(self): + async def getCustomData(self) -> dict[str, Any]: return {} diff --git a/src/fontra/core/fonthandler.py b/src/fontra/core/fonthandler.py index 00b2d7ab3..87cc08043 100644 --- a/src/fontra/core/fonthandler.py +++ b/src/fontra/core/fonthandler.py @@ -6,7 +6,8 @@ from contextlib import asynccontextmanager from copy import deepcopy from dataclasses import dataclass -from typing import Any, Callable, Optional +from functools import cached_property +from typing import Any, AsyncGenerator, Awaitable, Callable, Optional from .changes import ( applyChange, @@ -18,8 +19,9 @@ patternIntersect, patternUnion, ) -from .classes import Font +from .classes import Font, VariableGlyph from .lrucache import LRUCache +from .protocols import ReadableFontBackend, WatchableFontBackend, WritableFontBackend logger = logging.getLogger(__name__) @@ -33,28 +35,14 @@ def remoteMethod(method): return method -backendAttrMapping = [ - ("axes", "GlobalAxes"), - ("glyphMap", "GlyphMap"), - ("customData", "CustomData"), - ("unitsPerEm", "UnitsPerEm"), -] - -backendGetterNames = {attr: "get" + baseName for attr, baseName in backendAttrMapping} -backendSetterNames = {attr: "set" + baseName for attr, baseName in backendAttrMapping} -backendDeleterNames = { - attr: "delete" + baseName for attr, baseName in backendAttrMapping -} - - @dataclass class FontHandler: - backend: Any # TODO: need Backend protocol + backend: ReadableFontBackend readOnly: bool = False - allConnectionsClosedCallback: Optional[Callable[[], None]] = None + allConnectionsClosedCallback: Optional[Callable[[], Awaitable[Any]]] = None def __post_init__(self): - if not hasattr(self.backend, "putGlyph"): + if self.writableBackend is None: self.readOnly = True self.connections = set() self.glyphUsedBy = {} @@ -63,19 +51,23 @@ def __post_init__(self): self.localData = LRUCache() self._dataScheduledForWriting = {} - async def startTasks(self): - if hasattr(self.backend, "watchExternalChanges"): + @cached_property + def writableBackend(self) -> WritableFontBackend | None: + return self.backend if isinstance(self.backend, WritableFontBackend) else None + + async def startTasks(self) -> None: + if isinstance(self.backend, WatchableFontBackend): self._watcherTask = scheduleTaskAndLogException( self.processExternalChanges() ) - self._processWritesError = None + self._processWritesError: Exception | None = None self._processWritesEvent = asyncio.Event() self._processWritesTask = scheduleTaskAndLogException(self.processWrites()) self._processWritesTask.add_done_callback(self._processWritesTaskDone) self._writingInProgressEvent = asyncio.Event() self._writingInProgressEvent.set() - async def close(self): + async def close(self) -> None: self.backend.close() if hasattr(self, "_watcherTask"): self._watcherTask.cancel() @@ -83,7 +75,8 @@ async def close(self): await self.finishWriting() # shield for cancel? self._processWritesTask.cancel() - async def processExternalChanges(self): + async def processExternalChanges(self) -> None: + assert isinstance(self.backend, WatchableFontBackend) async for change, reloadPattern in self.backend.watchExternalChanges(): try: if change is not None: @@ -95,16 +88,18 @@ async def processExternalChanges(self): logger.error("exception in external changes watcher: %r", e) traceback.print_exc() - def _processWritesTaskDone(self, task): + def _processWritesTaskDone(self, task) -> None: # Signal that the write-"thread" is no longer running self._dataScheduledForWriting = None - async def finishWriting(self): + async def finishWriting(self) -> None: if self._processWritesError is not None: raise self._processWritesError await self._writingInProgressEvent.wait() + if self._processWritesError is not None: + raise self._processWritesError - async def processWrites(self): + async def processWrites(self) -> None: while True: await self._processWritesEvent.wait() try: @@ -116,7 +111,7 @@ async def processWrites(self): self._processWritesEvent.clear() self._writingInProgressEvent.set() - async def _processWritesOneCycle(self): + async def _processWritesOneCycle(self) -> None: while self._dataScheduledForWriting: writeKey, (writeFunc, connection) = popFirstItem( self._dataScheduledForWriting @@ -159,7 +154,7 @@ async def _processWritesOneCycle(self): await asyncio.sleep(0) @asynccontextmanager - async def useConnection(self, connection): + async def useConnection(self, connection) -> AsyncGenerator[None, None]: self.connections.add(connection) try: yield @@ -169,32 +164,61 @@ async def useConnection(self, connection): await self.allConnectionsClosedCallback() @remoteMethod - async def getGlyph(self, glyphName, *, connection=None): + async def getGlyph( + self, glyphName: str, *, connection=None + ) -> VariableGlyph | None: glyph = self.localData.get(("glyphs", glyphName)) if glyph is None: glyph = await self._getGlyph(glyphName) self.localData[("glyphs", glyphName)] = glyph return glyph - def _getGlyph(self, glyphName): + def _getGlyph(self, glyphName) -> Awaitable[VariableGlyph | None]: return asyncio.create_task(self._getGlyphFromBackend(glyphName)) - async def _getGlyphFromBackend(self, glyphName): + async def _getGlyphFromBackend(self, glyphName) -> VariableGlyph | None: glyph = await self.backend.getGlyph(glyphName) if glyph is not None: self.updateGlyphDependencies(glyphName, glyph) return glyph - async def getData(self, key): + async def getData(self, key: str) -> Any: data = self.localData.get(key) if data is None: data = await self._getData(key) self.localData[key] = data return data - async def _getData(self, key): - getterName = backendGetterNames[key] - return await getattr(self.backend, getterName)() + async def _getData(self, key: str) -> Any: + value: Any + + match key: + case "axes": + value = await self.backend.getGlobalAxes() + case "glyphMap": + value = await self.backend.getGlyphMap() + case "customData": + value = await self.backend.getCustomData() + case "unitsPerEm": + value = await self.backend.getUnitsPerEm() + case _: + raise KeyError(key) + + return value + + async def _putData(self, key: str, value: Any) -> None: + assert self.writableBackend is not None + match key: + case "axes": + await self.writableBackend.putGlobalAxes(value) + case "glyphMap": + await self.writableBackend.putGlyphMap(value) + case "customData": + await self.writableBackend.putCustomData(value) + case "unitsPerEm": + await self.writableBackend.putUnitsPerEm(value) + case _: + raise KeyError(key) @remoteMethod async def getGlyphMap(self, *, connection): @@ -335,7 +359,8 @@ async def _prepareRootObject(self, change): async def _updateLocalData( self, rootKeys, rootObject, sourceConnection, writeToBackEnd - ): + ) -> None: + writeFunc: Callable[[], Awaitable] # inferencing with partial() goes wrong for rootKey in rootKeys + sorted(rootObject._assignedAttributeNames): if rootKey == "glyphs": glyphSet = rootObject.glyphs @@ -346,8 +371,9 @@ async def _updateLocalData( self.localData[writeKey] = glyphSet[glyphName] if not writeToBackEnd: continue + assert self.writableBackend is not None writeFunc = functools.partial( - self.backend.putGlyph, + self.writableBackend.putGlyph, glyphName, deepcopy(glyphSet[glyphName]), glyphMap.get(glyphName, []), @@ -358,18 +384,20 @@ async def _updateLocalData( _ = self.localData.pop(writeKey, None) if not writeToBackEnd: continue - writeFunc = functools.partial(self.backend.deleteGlyph, glyphName) + assert self.writableBackend is not None + writeFunc = functools.partial( + self.writableBackend.deleteGlyph, glyphName + ) await self.scheduleDataWrite(writeKey, writeFunc, sourceConnection) else: if rootKey in rootObject._assignedAttributeNames: self.localData[rootKey] = getattr(rootObject, rootKey) if not writeToBackEnd: continue - method = getattr(self.backend, backendSetterNames[rootKey], None) - if method is None: - logger.info(f"No backend write method found for {rootKey}") - continue - writeFunc = functools.partial(method, deepcopy(rootObject[rootKey])) + assert self.writableBackend is not None + writeFunc = functools.partial( + self._putData, rootKey, deepcopy(self.localData[rootKey]) + ) await self.scheduleDataWrite(rootKey, writeFunc, sourceConnection) async def scheduleDataWrite(self, writeKey, writeFunc, connection): diff --git a/src/fontra/core/protocols.py b/src/fontra/core/protocols.py new file mode 100644 index 000000000..6af196c0a --- /dev/null +++ b/src/fontra/core/protocols.py @@ -0,0 +1,97 @@ +from __future__ import annotations + +import argparse +from types import SimpleNamespace +from typing import Any, AsyncGenerator, Callable, Protocol, runtime_checkable + +from aiohttp import web + +from .classes import GlobalAxis, GlobalDiscreteAxis, VariableGlyph + + +@runtime_checkable +class ReadableFontBackend(Protocol): + def close(self) -> None: + ... + + async def getGlyph(self, glyphName: str) -> VariableGlyph | None: + ... + + async def getGlobalAxes(self) -> list[GlobalAxis | GlobalDiscreteAxis]: + ... + + async def getGlyphMap(self) -> dict[str, list[int]]: + ... + + async def getCustomData(self) -> dict[str, Any]: + ... + + async def getUnitsPerEm(self) -> int: + ... + + +@runtime_checkable +class WritableFontBackend(ReadableFontBackend, Protocol): + async def putGlyph( + self, glyphName: str, glyph: VariableGlyph, codePoints: list[int] + ) -> None: + ... + + async def deleteGlyph(self, glyphName: str) -> None: + ... + + async def putGlobalAxes(self, value: list[GlobalAxis | GlobalDiscreteAxis]) -> None: + ... + + async def putGlyphMap(self, value: dict[str, list[int]]) -> None: + ... + + async def putCustomData(self, value: dict[str, Any]) -> None: + ... + + async def putUnitsPerEm(self, value: int) -> None: + ... + + +@runtime_checkable +class WatchableFontBackend(Protocol): + async def watchExternalChanges(self) -> AsyncGenerator[tuple[Any, Any], None]: + if False: + yield None, None + + +@runtime_checkable +class ProjectManagerFactory(Protocol): + @staticmethod + def addArguments(parser: argparse.ArgumentParser) -> None: + ... + + @staticmethod + def getProjectManager(arguments: SimpleNamespace) -> ProjectManager: + ... + + +@runtime_checkable +class ProjectManager(Protocol): + async def close(self) -> None: + ... + + async def authorize(self, request: web.Request) -> str | None: + ... + + async def projectAvailable(self, path: str, token: str) -> bool: + ... + + async def getRemoteSubject(self, path: str, token: str) -> Any: + ... + + async def getProjectList(self, token: str) -> list[str]: + ... + + async def projectPageHandler( + self, request: web.Request, filterContent: Callable | None = None + ) -> web.Response: + ... + + def setupWebRoutes(self, server) -> None: + ... diff --git a/src/fontra/core/remote.py b/src/fontra/core/remote.py index 65982eb7e..9754ec480 100644 --- a/src/fontra/core/remote.py +++ b/src/fontra/core/remote.py @@ -1,8 +1,11 @@ +from __future__ import annotations + import asyncio import logging import traceback +from typing import Any, AsyncGenerator -from aiohttp import WSMsgType +from aiohttp import WSMsgType, web from .classes import unstructure @@ -14,23 +17,29 @@ class RemoteObjectConnectionException(Exception): class RemoteObjectConnection: - def __init__(self, websocket, path, subject, verboseErrors): + def __init__( + self, + websocket: web.WebSocketResponse, + path: str, + subject: Any, + verboseErrors: bool, + ): self.websocket = websocket self.path = path self.subject = subject self.verboseErrors = verboseErrors self.clientUUID = None - self.callReturnFutures = {} + self.callReturnFutures: dict[str, asyncio.Future] = {} self.getNextServerCallID = _genNextServerCallID() @property - def proxy(self): + def proxy(self) -> RemoteClientProxy: return RemoteClientProxy(self) - async def handleConnection(self): + async def handleConnection(self) -> None: message = await anext(aiter(self.websocket)) - message = message.json() - self.clientUUID = message.get("client-uuid") + messageObj = message.json() + self.clientUUID = messageObj.get("client-uuid") if self.clientUUID is None: raise RemoteObjectConnectionException("unrecognized message") try: @@ -41,8 +50,8 @@ async def handleConnection(self): traceback.print_exc() await self.websocket.close() - async def _handleConnection(self): - tasks = [] + async def _handleConnection(self) -> None: + tasks: list[asyncio.Task] = [] try: async for task in self._iterCallTasks(): tasks = [task for task in tasks if not task.done()] @@ -55,26 +64,26 @@ async def _handleConnection(self): if not task.done(): task.cancel() - async def _iterCallTasks(self): + async def _iterCallTasks(self) -> AsyncGenerator[asyncio.Task, None]: async for message in self.websocket: if message.type == WSMsgType.ERROR: # We need to explicitly check for an error, or else # message.json() will fail with a TypeError. # https://github.com/aio-libs/aiohttp/issues/7313#issuecomment-1586150267 raise message.data - message = message.json() + messageObj = message.json() - if message.get("connection") == "close": + if messageObj.get("connection") == "close": logger.info("client requested connection close") break - if "client-call-id" in message: + if "client-call-id" in messageObj: # this is an incoming client -> server call - yield asyncio.create_task(self._performCall(message, self.subject)) - elif "server-call-id" in message: + yield asyncio.create_task(self._performCall(messageObj, self.subject)) + elif "server-call-id" in messageObj: # this is a response to a server -> client call - fut = self.callReturnFutures[message["server-call-id"]] - returnValue = message.get("return-value") - error = message.get("error") + fut = self.callReturnFutures[messageObj["server-call-id"]] + returnValue = messageObj.get("return-value") + error = messageObj.get("error") if error is None: fut.set_result(returnValue) else: @@ -83,14 +92,16 @@ async def _iterCallTasks(self): logger.info("invalid message, closing connection") break - async def _performCall(self, message, subject): + async def _performCall(self, message: dict, subject: Any) -> None: clientCallID = "unknown-client-call-id" try: clientCallID = message["client-call-id"] methodName = message["method-name"] arguments = message.get("arguments", []) methodHandler = getattr(subject, methodName, None) - if getattr(methodHandler, "fontraRemoteMethod", False): + if methodHandler is not None and getattr( + methodHandler, "fontraRemoteMethod", False + ): returnValue = await methodHandler(*arguments, connection=self) returnValue = unstructure(returnValue) response = {"client-call-id": clientCallID, "return-value": returnValue} diff --git a/src/fontra/core/server.py b/src/fontra/core/server.py index fcf15fcde..011670712 100644 --- a/src/fontra/core/server.py +++ b/src/fontra/core/server.py @@ -12,12 +12,14 @@ from functools import partial from http.cookies import SimpleCookie from importlib import resources +from importlib.abc import Traversable from importlib.metadata import entry_points from typing import Any, Optional from urllib.parse import quote from aiohttp import WSCloseCode, web +from .protocols import ProjectManager from .remote import RemoteObjectConnection, RemoteObjectConnectionException from .serverutils import apiFunctions @@ -41,7 +43,7 @@ class FontraServer: host: str httpPort: int - projectManager: Any + projectManager: ProjectManager launchWebBrowser: bool = False versionToken: Optional[str] = None cookieMaxAge: int = 7 * 24 * 60 * 60 @@ -49,14 +51,13 @@ class FontraServer: ["css", "html", "ico", "js", "json", "svg", "woff2"] ) - def setup(self): + def setup(self) -> None: self.startupTime = datetime.now(timezone.utc).replace(microsecond=0) self.httpApp = web.Application() self.viewEntryPoints = { ep.name: ep.value for ep in entry_points(group="fontra.views") } - if hasattr(self.projectManager, "setupWebRoutes"): - self.projectManager.setupWebRoutes(self) + self.projectManager.setupWebRoutes(self) routes = [] routes.append(web.get("/", self.rootDocumentHandler)) routes.append(web.get("/websocket/{path:.*}", self.websocketHandler)) @@ -91,9 +92,9 @@ def setup(self): self.httpApp.on_startup.append(self.launchWebBrowserCallback) self.httpApp.on_shutdown.append(self.closeActiveWebsockets) self.httpApp.on_shutdown.append(self.closeProjectManager) - self._activeWebsockets = set() + self._activeWebsockets: set = set() - def run(self, showLaunchBanner=True): + def run(self, showLaunchBanner: bool = True) -> None: host = self.host httpPort = self.httpPort if showLaunchBanner: @@ -109,7 +110,7 @@ def run(self, showLaunchBanner=True): print("+---------------------------------------------------+") web.run_app(self.httpApp, host=host, port=httpPort) - async def launchWebBrowserCallback(self, httpApp): + async def launchWebBrowserCallback(self, httpApp: web.Application) -> None: import asyncio import webbrowser @@ -123,24 +124,24 @@ async def _launcher(): asyncio.create_task(_launcher()) - async def closeActiveWebsockets(self, httpApp): + async def closeActiveWebsockets(self, httpApp: web.Application) -> None: for websocket in list(self._activeWebsockets): await websocket.close( code=WSCloseCode.GOING_AWAY, message="Server shutdown" ) - async def closeProjectManager(self, httpApp): + async def closeProjectManager(self, httpApp: web.Application) -> None: await self.projectManager.close() - async def websocketHandler(self, request): + async def websocketHandler(self, request: web.Request) -> web.WebSocketResponse: path = "/" + request.match_info["path"] remote = request.headers.get("X-FORWARDED-FOR", request.remote) logger.info(f"incoming connection from {remote} for {path!r}") cookies = SimpleCookie() cookies.load(request.headers.get("Cookie", "")) - cookies = {k: v.value for k, v in cookies.items()} - token = cookies.get("fontra-authorization-token") + cookieValues = {k: v.value for k, v in cookies.items()} + token = cookieValues.get("fontra-authorization-token") websocket = web.WebSocketResponse(heartbeat=55, max_msg_size=0x2000000) await websocket.prepare(request) @@ -163,13 +164,13 @@ async def websocketHandler(self, request): return websocket - async def getSubject(self, websocket, path, token): + async def getSubject(self, websocket, path, token) -> Any: subject = await self.projectManager.getRemoteSubject(path, token) if subject is None: raise RemoteObjectConnectionException("unauthorized") return subject - async def projectListHandler(self, request): + async def projectListHandler(self, request) -> web.Response: authToken = await self.projectManager.authorize(request) if not authToken: raise web.HTTPUnauthorized() @@ -178,7 +179,7 @@ async def projectListHandler(self, request): text=json.dumps(projectList), content_type="application/json" ) - async def serverInfoHandler(self, request): + async def serverInfoHandler(self, request) -> web.Response: from .. import __version__ as fontraVersion authToken = await self.projectManager.authorize(request) @@ -204,7 +205,7 @@ async def serverInfoHandler(self, request): text=json.dumps(serverInfo), content_type="application/json" ) - async def webAPIHandler(self, request): + async def webAPIHandler(self, request) -> web.Response: functionName = request.match_info["function"] function = apiFunctions.get(functionName) if function is None: @@ -219,7 +220,7 @@ async def webAPIHandler(self, request): result = {"returnValue": returnValue} return web.Response(text=json.dumps(result), content_type="application/json") - async def staticContentHandler(self, packageName, request): + async def staticContentHandler(self, packageName, request) -> web.Response: ifModSince = request.if_modified_since if ifModSince is not None and ifModSince >= self.startupTime: raise web.HTTPNotModified() @@ -239,22 +240,24 @@ async def staticContentHandler(self, packageName, request): ext = resourceName.rsplit(".", 1)[-1].lower() if ext not in self.allowedFileExtensions: raise web.HTTPNotFound() - contentType = mimeTypes.get(resourceName.rsplit(".")[-1]) + contentType = mimeTypes.get(resourceName.rsplit(".")[-1], "") data = self._addVersionTokenToReferences(data, contentType) response = web.Response(body=data, content_type=contentType) response.last_modified = self.startupTime return response - async def notFoundHandler(self, request): + async def notFoundHandler(self, request) -> web.Response: raise web.HTTPNotFound() - async def rootDocumentHandler(self, request): + async def rootDocumentHandler(self, request) -> web.Response: response = await self.projectManager.projectPageHandler( request, self._addVersionTokenToReferences ) return response - async def viewPathHandler(self, viewName, request): + async def viewPathHandler( + self, viewName: str, request: web.Request + ) -> web.Response: authToken = await self.projectManager.authorize(request) if not authToken: qs = quote(request.path_qs, safe="") @@ -267,15 +270,15 @@ async def viewPathHandler(self, viewName, request): try: html = getResourcePath( self.viewEntryPoints[viewName], f"{viewName}.html" - ).read_text() + ).read_bytes() except (FileNotFoundError, ModuleNotFoundError): raise web.HTTPNotFound() html = self._addVersionTokenToReferences(html, "text/html") - return web.Response(text=html, content_type="text/html") + return web.Response(body=html, content_type="text/html") - def _addVersionTokenToReferences(self, data, contentType): + def _addVersionTokenToReferences(self, data: bytes, contentType: str) -> bytes: if self.versionToken is None: return data jsAllowedFileExtensions = ["css", "js", "svg"] @@ -292,23 +295,25 @@ def _addVersionTokenToReferences(self, data, contentType): return data -def addVersionTokenToReferences(data, versionToken, extensions): - pattern = rf"""((['"])[./][./A-Za-z-]+)(\.({"|".join(extensions)})\2)""" - repl = rf"\1.{versionToken}\3" - if isinstance(data, bytes): - data = re.sub(pattern, repl, data.decode("utf-8")).encode("utf-8") - else: - data = re.sub(pattern, repl, data) - return data +def addVersionTokenToReferences(data: bytes, versionToken, extensions) -> bytes: + assert isinstance(data, bytes) + pattern = rf"""((['"])[./][./A-Za-z-]+)(\.({"|".join(extensions)})\2)""".encode( + "utf-8" + ) + repl = rf"\1.{versionToken}\3".encode("utf-8") + return re.sub(pattern, repl, data) -def getResourcePath(modulePath, resourceName): +def getResourcePath(modulePath: str, resourceName: str) -> Traversable: moduleParts = modulePath.split(".") moduleRoot = resources.files(moduleParts[0]) - return moduleRoot.joinpath(*moduleParts[1:], resourceName) + resourcePath = moduleRoot + for pathItem in [*moduleParts[1:], resourceName]: + resourcePath = resourcePath.joinpath(pathItem) + return resourcePath -def splitVersionToken(fileName): +def splitVersionToken(fileName: str) -> tuple[str, str | None]: parts = fileName.rsplit(".", 2) if len(parts) == 3: fileName, versionToken, ext = parts @@ -316,7 +321,7 @@ def splitVersionToken(fileName): return fileName, None -def findFreeTCPPort(startPort=8000): +def findFreeTCPPort(startPort: int = 8000) -> int: port = startPort while True: tcp = socket.socket(socket.AF_INET, socket.SOCK_STREAM) diff --git a/src/fontra/filesystem/projectmanager.py b/src/fontra/filesystem/projectmanager.py index ee8d28827..b8d6412ad 100644 --- a/src/fontra/filesystem/projectmanager.py +++ b/src/fontra/filesystem/projectmanager.py @@ -3,11 +3,15 @@ import pathlib from importlib import resources from importlib.metadata import entry_points +from os import PathLike +from types import SimpleNamespace +from typing import Callable from aiohttp import web from ..backends import getFileSystemBackend from ..core.fonthandler import FontHandler +from ..core.protocols import ProjectManager logger = logging.getLogger(__name__) @@ -19,7 +23,7 @@ class FileSystemProjectManagerFactory: @staticmethod - def addArguments(parser): + def addArguments(parser: argparse.ArgumentParser) -> None: parser.add_argument( "path", type=existingFolderOrFontFile, @@ -32,7 +36,7 @@ def addArguments(parser): parser.add_argument("--read-only", action="store_true") @staticmethod - def getProjectManager(arguments): + def getProjectManager(arguments: SimpleNamespace) -> ProjectManager: return FileSystemProjectManager( rootPath=arguments.path, maxFolderDepth=arguments.max_folder_depth, @@ -51,7 +55,12 @@ def existingFolderOrFontFile(path): class FileSystemProjectManager: - def __init__(self, rootPath, maxFolderDepth=3, readOnly=False): + def __init__( + self, + rootPath: pathlib.Path | None, + maxFolderDepth: int = 3, + readOnly: bool = False, + ): self.rootPath = rootPath self.singleFilePath = None self.maxFolderDepth = maxFolderDepth @@ -59,26 +68,30 @@ def __init__(self, rootPath, maxFolderDepth=3, readOnly=False): if self.rootPath is not None and self.rootPath.suffix.lower() in fileExtensions: self.singleFilePath = self.rootPath self.rootPath = self.rootPath.parent - self.fontHandlers = {} + self.fontHandlers: dict[str, FontHandler] = {} - async def close(self): + async def close(self) -> None: for fontHandler in self.fontHandlers.values(): await fontHandler.close() - async def authorize(self, request): + async def authorize(self, request: web.Request) -> str: return "yes" # arbitrary non-false string token - async def projectPageHandler(self, request, filterContent=None): + async def projectPageHandler( + self, + request: web.Request, + filterContent: Callable[[bytes, str], bytes] | None = None, + ) -> web.Response: htmlPath = resources.files("fontra") / "filesystem" / "landing.html" - html = htmlPath.read_text() + html = htmlPath.read_bytes() if filterContent is not None: html = filterContent(html, "text/html") - return web.Response(text=html, content_type="text/html") + return web.Response(body=html, content_type="text/html") - async def projectAvailable(self, path, token): + async def projectAvailable(self, path: str, token: str) -> bool: return bool(self._getProjectPath(path)) - async def getRemoteSubject(self, path, token): + async def getRemoteSubject(self, path: str, token: str) -> FontHandler: assert path[0] == "/" path = path[1:] fontHandler = self.fontHandlers.get(path) @@ -103,7 +116,7 @@ async def closeFontHandler(): self.fontHandlers[path] = fontHandler return fontHandler - def _getProjectPath(self, path): + def _getProjectPath(self, path: str) -> PathLike | None: if self.rootPath is None: projectPath = pathlib.Path(path) if not projectPath.is_absolute(): @@ -115,7 +128,7 @@ def _getProjectPath(self, path): return projectPath return None - async def getProjectList(self, token): + async def getProjectList(self, token: str) -> list[str]: if self.rootPath is None: return [] projectPaths = [] @@ -132,6 +145,9 @@ async def getProjectList(self, token): projectPaths.append("/".join(projectItems[len(rootItems) :])) return projectPaths + def setupWebRoutes(self, server): + pass + def _iterFolder(folderPath, extensions, maxDepth=3): if maxDepth is not None and maxDepth <= 0: diff --git a/test-py/test_add_version_token.py b/test-py/test_add_version_token.py index 6a5534cec..45db40637 100644 --- a/test-py/test_add_version_token.py +++ b/test-py/test_add_version_token.py @@ -52,5 +52,7 @@ ], ) def test_addVersionTokenToReferences(inputData, expectedData): - data = addVersionTokenToReferences(inputData, VERSION_TOKEN, EXTENSIONS) - assert expectedData == data + data = addVersionTokenToReferences( + inputData.encode("utf-8"), VERSION_TOKEN, EXTENSIONS + ) + assert expectedData == data.decode("utf-8") diff --git a/test-py/test_fonthandler.py b/test-py/test_fonthandler.py index cfc17eb22..c69a496e3 100644 --- a/test-py/test_fonthandler.py +++ b/test-py/test_fonthandler.py @@ -182,6 +182,7 @@ async def test_fontHandler_getData(testFontHandler): async def test_fontHandler_setData(testFontHandler, caplog): caplog.set_level(logging.INFO) async with asyncClosing(testFontHandler): + await testFontHandler.startTasks() glyphMap = await testFontHandler.getData("glyphMap") assert [65, 97] == glyphMap["A"] change = { @@ -200,13 +201,14 @@ async def test_fontHandler_setData(testFontHandler, caplog): glyphMap = await testFontHandler.getData("glyphMap") assert [97] == glyphMap["A"] - assert "No backend write method found for glyphMap" == caplog.records[0].message + assert "write glyphMap to backend" == caplog.records[0].message @pytest.mark.asyncio async def test_fontHandler_setData_unitsPerEm(testFontHandler, caplog): caplog.set_level(logging.INFO) async with asyncClosing(testFontHandler): + await testFontHandler.startTasks() unitsPerEm = await testFontHandler.getData("unitsPerEm") assert 1000 == unitsPerEm change = { @@ -224,7 +226,8 @@ async def test_fontHandler_setData_unitsPerEm(testFontHandler, caplog): unitsPerEm = await testFontHandler.getData("unitsPerEm") assert 2000 == unitsPerEm - assert "No backend write method found for unitsPerEm" == caplog.records[0].message + assert 2000 == await testFontHandler.backend.getUnitsPerEm() + assert "write unitsPerEm to backend" == caplog.records[0].message @pytest.mark.asyncio diff --git a/test-py/test_protocols.py b/test-py/test_protocols.py new file mode 100644 index 000000000..657d2270d --- /dev/null +++ b/test-py/test_protocols.py @@ -0,0 +1,67 @@ +import pathlib +from os import PathLike + +from fontra.backends.designspace import DesignspaceBackend, UFOBackend +from fontra.backends.fontra import FontraBackend +from fontra.backends.opentype import OTFBackend +from fontra.core.protocols import ( + ProjectManagerFactory, + ReadableFontBackend, + WritableFontBackend, +) +from fontra.filesystem.projectmanager import FileSystemProjectManagerFactory + +repoRoot = pathlib.Path(__file__).resolve().parent.parent + + +def test_designspace_read() -> None: + backend: ReadableFontBackend = DesignspaceBackend.fromPath( + repoRoot / "test-py" / "data" / "mutatorsans" / "MutatorSans.designspace" + ) + assert isinstance(backend, ReadableFontBackend) + + +def test_designspace_write(tmpdir: PathLike) -> None: + tmpdir = pathlib.Path(tmpdir) + backend: WritableFontBackend = DesignspaceBackend.createFromPath( + tmpdir / "Test.designspace" + ) + assert isinstance(backend, WritableFontBackend) + + +def test_ufo_read() -> None: + backend: ReadableFontBackend = UFOBackend.fromPath( + repoRoot / "test-py" / "data" / "mutatorsans" / "MutatorSansLightCondensed.ufo" + ) + assert isinstance(backend, ReadableFontBackend) + + +def test_ufo_write(tmpdir: PathLike) -> None: + tmpdir = pathlib.Path(tmpdir) + backend: WritableFontBackend = UFOBackend.createFromPath(tmpdir / "Test.ufo") + assert isinstance(backend, WritableFontBackend) + + +def test_fontra_read() -> None: + backend: ReadableFontBackend = FontraBackend.fromPath( + repoRoot / "test-common" / "fonts" / "MutatorSans.fontra" + ) + assert isinstance(backend, ReadableFontBackend) + + +def test_fontra_write(tmpdir: PathLike) -> None: + tmpdir = pathlib.Path(tmpdir) + backend: WritableFontBackend = FontraBackend.createFromPath(tmpdir / "Test.fontra") + assert isinstance(backend, WritableFontBackend) + + +def test_opentype_read() -> None: + backend: ReadableFontBackend = OTFBackend.fromPath( + repoRoot / "test-py" / "data" / "mutatorsans" / "MutatorSans.ttf" + ) + assert isinstance(backend, ReadableFontBackend) + + +def test_FileSystemProjectManagerFactory() -> None: + factory: ProjectManagerFactory = FileSystemProjectManagerFactory() + assert isinstance(factory, ProjectManagerFactory)