diff --git a/docs/api/index.rst b/docs/api/index.rst index 8b9120d6..29b1c725 100644 --- a/docs/api/index.rst +++ b/docs/api/index.rst @@ -22,3 +22,4 @@ The API reference gives an overview of `TopoNetX`, which consists of several mod algorithms generators transform + readwrite diff --git a/docs/api/readwrite.rst b/docs/api/readwrite.rst new file mode 100644 index 00000000..1d40db93 --- /dev/null +++ b/docs/api/readwrite.rst @@ -0,0 +1,9 @@ +********** +Read-Write +********** + +.. automodule:: toponetx.readwrite.atomlist + :members: + +.. automodule:: toponetx.readwrite.serialization + :members: diff --git a/test/readwrite/test_atomlist.py b/test/readwrite/test_atomlist.py new file mode 100644 index 00000000..c8940df0 --- /dev/null +++ b/test/readwrite/test_atomlist.py @@ -0,0 +1,140 @@ +"""Tests for the atomlist read/write functions.""" +import pytest + +from toponetx.classes import CellComplex, PathComplex, SimplicialComplex +from toponetx.readwrite.atomlist import ( + generate_atomlist, + load_from_atomlist, + parse_atomlist, + write_atomlist, +) + + +class TestGenerateAtomList: + """Test the `generate_atomlist` function.""" + + def test_generate_atomlist_simplicial(self): + """Test generate_atomlist for simplicial complexes.""" + domain = SimplicialComplex() + domain.add_simplex((1,), weight=1.0) + domain.add_simplex((1, 2, 3), weight=4.0) + + atomlist = set(generate_atomlist(domain)) + assert atomlist == { + "1 {'weight': 1.0}", + "1 2 3 {'weight': 4.0}", + } + + def test_generate_atomlist_cell(self): + """Test generate_atomlist for cell complexes.""" + domain = CellComplex() + domain.add_node(1, weight=1.0) + domain.add_node(4) + domain.add_edge(2, 3, weight=2.0) + domain.add_edge(2, 5) + domain.add_cell((1, 2, 3), rank=2, weight=4.0) + domain.add_cell((6, 7), rank=2) + + atomlist = set(generate_atomlist(domain)) + assert atomlist == { + "1 {'weight': 1.0}", + "4", + "2 3 {'weight': 2.0}", + "2 5", + "6 7 {'rank': 2}", + "1 2 3 {'weight': 4.0}", + } + + def 
test_generate_atomlist_error(self): + """Test generate_atomlist for erroneous inputs.""" + with pytest.raises(TypeError): + list(generate_atomlist(PathComplex())) + + +class TestAtomListFileManagment: + """Test the `write_atomlist` and `load_from_atomlist` functions.""" + + def test_atomlist_simplicial(self): + """Test that a simplicial complex can be written to and read from the filesystem as an atomlist.""" + SC = SimplicialComplex([(1, 2, 3), (2, 3, 4)]) + + write_atomlist(SC, "test.atomlist") + SC_loaded = load_from_atomlist("test.atomlist", "simplicial") + assert isinstance(SC_loaded, SimplicialComplex) + assert SC_loaded.shape == (4, 5, 2) + + def test_atomlist_cell(self): + """Test that a cell complex can be written to and read from the filesystem as an atomlist.""" + CC = CellComplex() + CC.add_cell((1, 2, 3), rank=2) + CC.add_cell((2, 3, 4), rank=2) + + write_atomlist(CC, "test.atomlist") + CC_loaded = load_from_atomlist("test.atomlist", "cell") + assert isinstance(CC_loaded, CellComplex) + assert CC_loaded.shape == (4, 5, 2) + + def test_load_from_atomlist_error(self): + """Test that an error is raised when trying to read an atomlist with a wrong complex type.""" + SC = SimplicialComplex([(1, 2, 3), (2, 3, 4)]) + write_atomlist(SC, "test.atomlist") + with pytest.raises(ValueError): + load_from_atomlist("test.atomlist", "path") + + def test_write_atomlist_error(self): + """Test that an error is raised when trying to write an atomlist with an unsupported complex type.""" + PC = PathComplex([(1, 2)]) + with pytest.raises(TypeError): + write_atomlist(PC, "test.atomlist") + + +class TestParseAtomList: + """Test the `parse_atomlist` function.""" + + def test_parse_atomlist_simplicial(self): + """Test parse_atomlist for simplicial complexes.""" + # empty atomlist + SC = parse_atomlist([], "simplicial") + assert isinstance(SC, SimplicialComplex) + assert SC.shape == () + + # atomlist with one simplex + SC = parse_atomlist(["1 2 3 {'weight': 4.0}"], "simplicial") 
+ assert isinstance(SC, SimplicialComplex) + assert SC.shape == (3, 3, 1) + assert SC[("1", "2", "3")]["weight"] == 4.0 + + # nodetype + SC = parse_atomlist(["1 2 3 {'weight': 4.0}"], "simplicial", nodetype=int) + assert SC[(1, 2, 3)]["weight"] == 4.0 + + def test_parse_atomlist_cell(self): + """Test parse_atomlist for cell complexes.""" + # empty atomlist + CC = parse_atomlist([], "cell") + assert isinstance(CC, CellComplex) + assert CC.shape == (0, 0, 0) + + # atomlist with one cell + CC = parse_atomlist(["1 2 3 {'weight': 4.0}"], "cell") + assert isinstance(CC, CellComplex) + assert CC.shape == (3, 3, 1) + assert CC.cells[("1", "2", "3")]["weight"] == 4.0 + + # one node + CC = parse_atomlist(["1 {'weight': 4.0}"], "cell") + assert CC.shape == (1, 0, 0) + assert CC.nodes["1"]["weight"] == 4.0 + + # 2-element cell with rank 2 + CC = parse_atomlist(["1 2 {'rank': 2}"], "cell") + assert CC.shape == (2, 1, 1) + + # nodetype + CC = parse_atomlist(["1 2 3 {'weight': 4.0}"], "cell", nodetype=int) + assert CC.cells[(1, 2, 3)]["weight"] == 4.0 + + def test_parse_atomlist_error(self): + """Test parse_atomlist for erroneous inputs.""" + with pytest.raises(ValueError): + parse_atomlist([], "path") diff --git a/toponetx/readwrite/__init__.py b/toponetx/readwrite/__init__.py new file mode 100644 index 00000000..c3b1e3d1 --- /dev/null +++ b/toponetx/readwrite/__init__.py @@ -0,0 +1,3 @@ +"""Module for reading and writing complexes from and to files.""" +from .atomlist import * +from .serialization import * diff --git a/toponetx/readwrite/atomlist.py b/toponetx/readwrite/atomlist.py new file mode 100644 index 00000000..3243ce9e --- /dev/null +++ b/toponetx/readwrite/atomlist.py @@ -0,0 +1,306 @@ +"""Read and write complexes as a list of their atoms.""" +from collections.abc import Hashable, Iterable +from itertools import combinations +from typing import Generator, Literal, overload + +import networkx as nx + +from toponetx.classes import CellComplex, SimplicialComplex + 
+__all__ = [ + "generate_atomlist", + "write_atomlist", + "load_from_atomlist", + "parse_atomlist", +] + + +def _atomlist_line(atom: Iterable[Hashable] | Hashable, attributes: dict) -> str: + """Construct a single line of an atom list. + + Parameters + ---------- + atom : iterable of hashable or hashable + The atom to write. A string is written as one atom, not split into its characters. + attributes : dict + Attributes associated with the atom. + + Returns + ------- + str + The line of the atom list that represents the given atom. + """ + if isinstance(atom, Iterable) and not isinstance(atom, str): + line = " ".join(map(str, atom)) + else: + line = str(atom) + + if len(attributes) > 0: + line += " " + str(attributes) + + return line + + +def _generate_atomlist_simplicial( + domain: SimplicialComplex, +) -> Generator[str, None, None]: + """Generate an atom list from a simplicial complex. + + The list of atoms is truncated to only contain maximal simplices and simplices with user-defined attributes. All + other simplices are implicitly contained by the simplex property already. + + Parameters + ---------- + domain : SimplicialComplex + The simplicial complex to be converted to an atom list. + + Yields + ------ + str + One line of the atom list, which corresponds to one atom of the complex together with its attributes. + """ + for atom in domain.simplices: + data = domain[atom].copy() + data.pop("is_maximal", None) + data.pop("membership", None) + + if len(data) == 0 and not domain.is_maximal(atom): + continue + + yield _atomlist_line(atom, data) + + +def _generate_atomlist_cell(domain: CellComplex) -> Generator[str, None, None]: + """Generate an atom list from a cell complex. + + The list of atoms is truncated to only contain maximal cells and cells with user-defined attributes. All + other cells are implicitly contained already. + We add a special `rank` attribute to cells of cardinality 2 that have rank 2 to differentiate them from edges. + + Parameters + ---------- + domain : CellComplex + The cell complex to be converted to an atom list. 
+ + Yields + ------ + str + One line of the atom list, which corresponds to one atom of the complex together with its attributes. + """ + for atom in domain.nodes: + if len(domain.neighbors(atom)) == 0 or len(domain._G.nodes[atom]) > 0: + yield _atomlist_line(atom, domain._G.nodes[atom]) + + covered_edges = set() + for cell in domain.cells: + for edge in combinations(cell, 2): + covered_edges.add(tuple(sorted(edge))) + for atom in domain.edges: + if len(domain._G.edges[atom]) > 0 or tuple(sorted(atom)) not in covered_edges: + yield _atomlist_line(atom, domain._G.edges[atom]) + + for atom in domain.cells: + attributes = atom._attributes.copy() + if len(atom) == 2: + attributes["rank"] = 2 + yield _atomlist_line(atom, attributes) + + +def generate_atomlist( + domain: CellComplex | SimplicialComplex, +) -> Generator[str, None, None]: + """Generate an atom list from a complex. + + The list of atoms is truncated to only contain maximal atoms and atoms with user-defined attributes. All + other atoms are implicitly contained already. + For cell complexes, we add a special `rank` attribute to cells of cardinality 2 that have rank 2 to differentiate + them from edges. + + Parameters + ---------- + domain : CellComplex or SimplicialComplex + The complex to be converted to an atom list. + + Yields + ------ + str + One line of the atom list, which corresponds to one atom of the complex together with its attributes. 
+ + Examples + -------- + Generate a list of atoms from a simplicial complex: + + >>> from toponetx.classes import SimplicialComplex + >>> SC = SimplicialComplex() + >>> SC.add_simplex((1,), weight=1.0) + >>> SC.add_simplex((1, 2, 3), weight=4.0) + >>> list(generate_atomlist(SC)) + ["1 {'weight': 1.0}", "1 2 3 {'weight': 4.0}"] + + Generate a list of atoms from a cell complex: + + >>> from toponetx.classes import CellComplex + >>> CC = CellComplex() + >>> CC.add_cell((1, 2, 3), rank=2, weight=4.0) + >>> list(generate_atomlist(CC)) + ["1 2 3 {'weight': 4.0}"] + """ + if isinstance(domain, SimplicialComplex): + yield from _generate_atomlist_simplicial(domain) + elif isinstance(domain, CellComplex): + yield from _generate_atomlist_cell(domain) + else: + raise TypeError(f"Expected a cell or simplicial complex, got {type(domain)}.") + + +@nx.utils.open_file(1, "wb") +def write_atomlist( + domain: CellComplex | SimplicialComplex, path, encoding="utf-8" +) -> None: + """Write an atom list to a file. + + Parameters + ---------- + domain : CellComplex or SimplicialComplex + The complex to be converted to an atom list. + path : file or str + File or filename to write. If a file is provided, it must be opened in ‘wb’ + mode. Filenames ending in .gz or .bz2 will be compressed. + encoding : str, default="utf-8" + Specify which encoding to use when writing file. + + Raises + ------ + TypeError + If the domain is not a cell or simplicial complex. 
+ """ + if not isinstance(domain, (CellComplex, SimplicialComplex)): + raise TypeError(f"Expected a cell or simplicial complex, got {type(domain)}.") + + for line in generate_atomlist(domain): + line += "\n" + path.write(line.encode(encoding)) + + +@overload +def load_from_atomlist( + filepath: str, complex_type: Literal["cell"], nodetype=None, encoding="utf-8" +) -> CellComplex: # numpydoc ignore=GL08 + pass + + +@overload +def load_from_atomlist( + filepath: str, complex_type: Literal["simplicial"], nodetype=None, encoding="utf-8" +) -> SimplicialComplex: # numpydoc ignore=GL08 + pass + + +@nx.utils.open_file(0, "rb") +def load_from_atomlist( + path, complex_type: Literal["cell", "simplicial"], nodetype=None, encoding="utf-8" +) -> CellComplex | SimplicialComplex: + """Load a complex from an atom list. + + Parameters + ---------- + path : file or str + File or filename to read. If a file is provided, it must be opened in ‘rb’ + mode. Filenames ending in .gz or .bz2 will be uncompressed. + complex_type : {"cell", "simplicial"} + The type of complex that should be constructed based on the atom list. + nodetype : callable, optional + Convert node data from strings to the specified type. + encoding : str, default="utf-8" + Specify which encoding to use when reading file. + + Returns + ------- + CellComplex or SimplicialComplex + The complex that was loaded from the atom list. + + Raises + ------ + ValueError + If the complex type is unknown. 
+ """ + return parse_atomlist( + (line.decode(encoding) for line in path), complex_type, nodetype + ) + + +@overload +def parse_atomlist( + lines: Iterable[str], complex_type: Literal["cell"], nodetype=None +) -> CellComplex: # numpydoc ignore=GL08 + pass + + +@overload +def parse_atomlist( + lines: Iterable[str], complex_type: Literal["simplicial"], nodetype=None +) -> SimplicialComplex: # numpydoc ignore=GL08 + pass + + +def parse_atomlist( + lines: Iterable[str], complex_type: Literal["cell", "simplicial"], nodetype=None +) -> CellComplex | SimplicialComplex: + """Parse an atom list. + + Parameters + ---------- + lines : iterable of str + List of lines. + complex_type : {"cell", "simplicial"} + Complex type. + nodetype : callable, optional + Node type. + + Returns + ------- + CellComplex or SimplicialComplex + The complex that was parsed from the atom list. + + Raises + ------ + ValueError + If the complex type is unknown. + """ + from ast import literal_eval + + if complex_type == "cell": + domain = CellComplex() + elif complex_type == "simplicial": + domain = SimplicialComplex() + else: + raise ValueError(f"Unknown complex type {complex_type}.") + + for line in lines: + attributes_pos = line.find("{") + if attributes_pos == -1: + elements_str = line + attributes = {} + else: + elements_str = line[:attributes_pos].strip() + attributes = literal_eval(line[attributes_pos:]) + + elements = elements_str.split(" ") + elements = [e.strip() for e in elements] + if nodetype is not None: + elements = [nodetype(e) for e in elements] + + if complex_type == "cell": + if "rank" in attributes: + rank = attributes.pop("rank") + else: + rank = min(len(elements) - 1, 2) + + if rank == 0: + domain.add_node(elements[0], **attributes) + else: + domain.add_cell(elements, rank=rank, **attributes) + elif complex_type == "simplicial": + domain.add_simplex(elements, **attributes) + + return domain diff --git a/toponetx/read_write.py b/toponetx/readwrite/serialization.py similarity 
index 85% rename from toponetx/read_write.py rename to toponetx/readwrite/serialization.py index a3b9df56..8f85d2cb 100644 --- a/toponetx/read_write.py +++ b/toponetx/readwrite/serialization.py @@ -1,7 +1,8 @@ -"""Read/write utilities.""" - +"""Read and write complexes as pickled objects.""" import pickle +__all__ = ["to_pickle", "load_from_pickle"] + def to_pickle(obj, filename: str) -> None: """Write object to a pickle file.