From a44a5d33e851558201257b2a562c4ce29ca67595 Mon Sep 17 00:00:00 2001 From: Anthony Mahanna Date: Tue, 17 Oct 2023 11:39:54 -0400 Subject: [PATCH] checkpoint --- adbcug_adapter/abc.py | 34 +- adbcug_adapter/adapter.py | 691 +++++++++++++----- adbcug_adapter/controller.py | 113 +-- adbcug_adapter/utils.py | 42 +- examples/ArangoDB_cuGraph_Adapter.ipynb | 2 +- .../ArangoDB_cuGraph_Adapter_output.ipynb | 432 +++++------ tests/test_adapter.py | 7 +- 7 files changed, 849 insertions(+), 472 deletions(-) diff --git a/adbcug_adapter/abc.py b/adbcug_adapter/abc.py index 8ebb853..7a38589 100644 --- a/adbcug_adapter/abc.py +++ b/adbcug_adapter/abc.py @@ -2,7 +2,7 @@ # -*- coding: utf-8 -*- from abc import ABC -from typing import Any, List, Optional, Set +from typing import Any, Dict, List, Optional, Set from arango.graph import Graph as ADBGraph from cugraph import Graph as CUGGraph @@ -19,7 +19,7 @@ def arangodb_to_cugraph( self, name: str, metagraph: ADBMetagraph, - **query_options: Any, + **adb_export_kwargs: Any, ) -> CUGMultiGraph: raise NotImplementedError # pragma: no cover @@ -28,12 +28,12 @@ def arangodb_collections_to_cugraph( name: str, v_cols: Set[str], e_cols: Set[str], - **query_options: Any, + **adb_export_kwargs: Any, ) -> CUGMultiGraph: raise NotImplementedError # pragma: no cover def arangodb_graph_to_cugraph( - self, name: str, **query_options: Any + self, name: str, **adb_export_kwargs: Any ) -> CUGMultiGraph: raise NotImplementedError # pragma: no cover @@ -43,19 +43,13 @@ def cugraph_to_arangodb( cug_graph: CUGGraph, edge_definitions: Optional[List[Json]] = None, orphan_collections: Optional[List[str]] = None, - keyify_nodes: bool = False, - keyify_edges: bool = False, overwrite_graph: bool = False, - **import_options: Any, + batch_size: Optional[int] = None, + use_async: bool = False, + **adb_import_kwargs: Any, ) -> ADBGraph: raise NotImplementedError # pragma: no cover - def __fetch_adb_docs(self) -> None: - raise NotImplementedError # pragma: no cover - - def __insert_adb_docs(self) -> None: - raise NotImplementedError # pragma: no cover - class Abstract_ADBCUG_Controller(ABC): def _prepare_arangodb_vertex(self, adb_vertex: Json, col: str) -> None: @@ -65,17 +59,23 @@ def _identify_cugraph_node(self, cug_node_id: CUGId, adb_v_cols: List[str]) -> s raise NotImplementedError # pragma: no cover def _identify_cugraph_edge( - self, from_cug_node: Json, to_cug_node: Json, adb_e_cols: List[str] + self, + from_cug_id: CUGId, + to_cug_id: CUGId, + cug_map: Dict[CUGId, str], + adb_e_cols: List[str], ) -> str: raise NotImplementedError # pragma: no cover - def _keyify_cugraph_node(self, cug_node_id: CUGId, col: str) -> str: + def _keyify_cugraph_node(self, i: int, cug_node_id: CUGId, col: str) -> str: raise NotImplementedError # pragma: no cover def _keyify_cugraph_edge( self, - from_cug_node: Json, - to_cug_node: Json, + i: int, + from_cug_id: CUGId, + to_cug_id: CUGId, + cug_map: Dict[CUGId, str], col: str, ) -> str: raise NotImplementedError # pragma: no cover diff --git a/adbcug_adapter/adapter.py b/adbcug_adapter/adapter.py index e2c12e7..52918b8 100644 --- a/adbcug_adapter/adapter.py +++ b/adbcug_adapter/adapter.py @@ -2,20 +2,27 @@ # -*- coding: utf-8 -*- import logging from collections import defaultdict -from typing import Any, DefaultDict, Dict, List, Optional, Set, Tuple, Union +from typing import Any, Callable, DefaultDict, Dict, List, Optional, Set, Tuple, Union from arango.cursor import Cursor -from arango.database import Database +from arango.database import StandardDatabase from arango.graph import Graph as ADBGraph -from arango.result import Result -from cudf import DataFrame +from cudf import DataFrame, Series from cugraph import Graph as CUGGraph from cugraph import MultiGraph as CUGMultiGraph +from rich.console import Group +from rich.live import Live +from rich.progress import Progress from .abc import Abstract_ADBCUG_Adapter from .controller import ADBCUG_Controller from .typings import ADBMetagraph, CUGId, Json -from .utils import logger, progress, track +from .utils import ( + get_bar_progress, + get_export_spinner_progress, + get_import_spinner_progress, + logger, +) class ADBCUG_Adapter(Abstract_ADBCUG_Adapter): @@ -35,14 +42,14 @@ class ADBCUG_Adapter(Abstract_ADBCUG_Adapter): def __init__( self, - db: Database, + db: StandardDatabase, controller: ADBCUG_Controller = ADBCUG_Controller(), logging_lvl: Union[str, int] = logging.INFO, ): self.set_logging(logging_lvl) - if issubclass(type(db), Database) is False: - msg = "**db** parameter must inherit from arango.database.Database" + if issubclass(type(db), StandardDatabase) is False: + msg = "**db** parameter must inherit from arango.database.StandardDatabase" raise TypeError(msg) if issubclass(type(controller), ADBCUG_Controller) is False: @@ -50,12 +57,13 @@ def __init__( raise TypeError(msg) self.__db = db + self.__async_db = db.begin_async_execution(return_result=False) self.__cntrl = controller logger.info(f"Instantiated ADBCUG_Adapter with database '{db.name}'") @property - def db(self) -> Database: + def db(self) -> StandardDatabase: return self.__db # pragma: no cover @property @@ -65,13 +73,18 @@ def cntrl(self) -> ADBCUG_Controller: def set_logging(self, level: Union[int, str]) -> None: logger.setLevel(level) + ############################### + # Public: ArangoDB -> cuGraph # + ############################### + def arangodb_to_cugraph( self, name: str, metagraph: ADBMetagraph, edge_attr: str = "weights", default_edge_attr_value: int = 0, - **query_options: Any, + cug_graph: Optional[CUGMultiGraph] = None, + **adb_export_kwargs: Any, ) -> CUGMultiGraph: """Create a cuGraph graph from an ArangoDB metagraph. @@ -87,10 +100,10 @@ def arangodb_to_cugraph( :param default_edge_attr_value: The default value set to the edge attribute if **edge_attr** is not present in the ArangoDB edge. Defaults to 0. :type default_edge_attr_value: int - :param query_options: Keyword arguments to specify AQL query options when + :param adb_export_kwargs: Keyword arguments to specify AQL query options when fetching documents from the ArangoDB instance. Full parameter list: https://docs.python-arango.com/en/main/specs.html#arango.aql.AQL.execute - :type query_options: Any + :type adb_export_kwargs: Any :return: A Multi-Directed cuGraph Graph. :rtype: cugraph.structure.graph_classes.MultiDiGraph :raise ValueError: If missing required keys in metagraph @@ -109,50 +122,56 @@ def arangodb_to_cugraph( """ logger.debug(f"--arangodb_to_cugraph('{name}')--") - # Maps ArangoDB vertex IDs to cuGraph node IDs + # This maps the ArangoDB vertex IDs to cuGraph node IDs adb_map: Dict[str, CUGId] = dict() + # This stores the to-be-inserted cuGraph edges (coo format) cug_edges: List[Tuple[CUGId, CUGId, Any]] = [] - adb_v: Json + ###################### + # Vertex Collections # + ###################### + for v_col, _ in metagraph["vertexCollections"].items(): logger.debug(f"Preparing '{v_col}' vertices") - cursor = self.__fetch_adb_docs(v_col, query_options) - for adb_v in track(cursor, cursor.count(), v_col, "#8000FF"): - adb_id: str = adb_v["_id"] - self.__cntrl._prepare_arangodb_vertex(adb_v, v_col) - cug_id: str = adb_v["_id"] + # 1. Fetch ArangoDB vertices + v_col_cursor, v_col_size = self.__fetch_adb_docs(v_col, **adb_export_kwargs) + + # 2. Process ArangoDB vertices + self.__process_adb_cursor( + "#8000FF", + v_col_cursor, + v_col_size, + self.__process_adb_vertex, + v_col, + adb_map, + ) - adb_map[adb_id] = cug_id + #################### + # Edge Collections # + #################### - adb_e: Json for e_col, _ in metagraph["edgeCollections"].items(): logger.debug(f"Preparing '{e_col}' edges") - cursor = self.__fetch_adb_docs(e_col, query_options) - for adb_e in track(cursor, cursor.count(), e_col, "#9C46FF"): - from_node_id: CUGId = adb_map[adb_e["_from"]] - to_node_id: CUGId = adb_map[adb_e["_to"]] - - cug_edges.append( - ( - from_node_id, - to_node_id, - adb_e.get(edge_attr, default_edge_attr_value), - ) - ) + # 1. Fetch ArangoDB edges + e_col_cursor, e_col_size = self.__fetch_adb_docs(e_col, **adb_export_kwargs) + + # 2. Process ArangoDB edges + self.__process_adb_cursor( + "#9C46FF", + e_col_cursor, + e_col_size, + self.__process_adb_edge, + e_col, + adb_map, + cug_edges, + edge_attr, + default_edge_attr_value, + ) - logger.debug(f"Inserting {len(cug_edges)} edges") - - cug_graph = CUGMultiGraph(directed=True) - df = DataFrame(cug_edges, columns=["src", "dst", edge_attr]) - cug_graph.from_cudf_edgelist( - df, - source="src", - destination="dst", - edge_attr=edge_attr, - ) + cug_graph = self.__create_cug_graph(cug_graph, cug_edges, edge_attr) logger.info(f"Created cuGraph '{name}' Graph") return cug_graph @@ -163,7 +182,8 @@ def arangodb_collections_to_cugraph( v_cols: Set[str], e_cols: Set[str], edge_attr: str = "weights", - **query_options: Any, + default_edge_attr_value: int = 0, + **adb_export_kwargs: Any, ) -> CUGMultiGraph: """Create a cuGraph graph from ArangoDB collections. :param name: The cuGraph graph name. @@ -176,10 +196,13 @@ def arangodb_collections_to_cugraph( Defaults to 'weight'. If no weight attribute is present, will set the edge weight value to 0. :type edge_attr: str - :param query_options: Keyword arguments to specify AQL query options when + :param default_edge_attr_value: The default value set to the edge attribute + if **edge_attr** is not present in the ArangoDB edge. Defaults to 0. + :type default_edge_attr_value: int + :param adb_export_kwargs: Keyword arguments to specify AQL query options when fetching documents from the ArangoDB instance. Full parameter list: https://docs.python-arango.com/en/main/specs.html#arango.aql.AQL.execute - :type query_options: Any + :type adb_export_kwargs: Any :return: A Multi-Directed cuGraph Graph. :rtype: cugraph.structure.graph_classes.MultiDiGraph """ @@ -188,10 +211,14 @@ def arangodb_collections_to_cugraph( "edgeCollections": {col: set() for col in e_cols}, } - return self.arangodb_to_cugraph(name, metagraph, edge_attr, **query_options) + return self.arangodb_to_cugraph(name, metagraph, edge_attr, **adb_export_kwargs) def arangodb_graph_to_cugraph( - self, name: str, edge_attr: str = "weights", **query_options: Any + self, + name: str, + edge_attr: str = "weights", + default_edge_attr_value: int = 0, + **adb_export_kwargs: Any, ) -> CUGMultiGraph: """Create a cuGraph graph from an ArangoDB graph. :param name: The ArangoDB graph name. @@ -200,10 +227,13 @@ def arangodb_graph_to_cugraph( Defaults to 'weight'. If no weight attribute is present, will set the edge weight value to 0. :type edge_attr: str - :param query_options: Keyword arguments to specify AQL query options when + :param default_edge_attr_value: The default value set to the edge attribute + if **edge_attr** is not present in the ArangoDB edge. Defaults to 0. + :type default_edge_attr_value: int + :param adb_export_kwargs: Keyword arguments to specify AQL query options when fetching documents from the ArangoDB instance. Full parameter list: https://docs.python-arango.com/en/main/specs.html#arango.aql.AQL.execute - :type query_options: Any + :type adb_export_kwargs: Any :return: A Multi-Directed cuGraph Graph. :rtype: cugraph.structure.graph_classes.MultiDiGraph """ @@ -212,22 +242,26 @@ def arangodb_graph_to_cugraph( e_cols = {e_d["edge_collection"] for e_d in graph.edge_definitions()} return self.arangodb_collections_to_cugraph( - name, v_cols, e_cols, edge_attr, **query_options + name, v_cols, e_cols, edge_attr, **adb_export_kwargs ) + ############################### + # Public: cuGraph -> ArangoDB # + ############################### + def cugraph_to_arangodb( self, name: str, cug_graph: CUGGraph, edge_definitions: Optional[List[Json]] = None, orphan_collections: Optional[List[str]] = None, - keyify_nodes: bool = False, - keyify_edges: bool = False, overwrite_graph: bool = False, + batch_size: Optional[int] = None, + use_async: bool = False, src_series_key: str = "src", dst_series_key: str = "dst", edge_attr: str = "weights", - **import_options: Any, + **adb_import_kwargs: Any, ) -> ADBGraph: """Create an ArangoDB graph from a cuGraph graph, and a set of edge definitions. @@ -240,31 +274,34 @@ def cugraph_to_arangodb( definition entry is a dictionary with fields "edge_collection", "from_vertex_collections" and "to_vertex_collections" (see below for example). Can be omitted if the graph already exists. - :type edge_definitions: List[adbnx_adapter.typings.Json] + :type edge_definitions: List[Dict[str, Any]] :param orphan_collections: A list of vertex collections that will be stored as orphans in the ArangoDB graph. Can be omitted if the graph already exists. :type orphan_collections: List[str] - :param keyify_nodes: If set to True, will create custom node keys based on the - behavior of ADBCUG_Controller._keyify_cugraph_node(). - Otherwise, ArangoDB _key values for vertices will range from 1 to N, - where N is the number of cugraph nodes. - :type keyify_nodes: bool - :param keyify_edges: If set to True, will create custom edge keys based on - the behavior of the ADBNX_Controller._keyify_cugraph_edge(). - Otherwise, ArangoDB _key values for edges will range from 1 to E, - where E is the number of cugraph edges. - :type keyify_edges: bool + :param overwrite_graph: Overwrites the graph if it already exists. + Does not drop associated collections. + :type overwrite_graph: bool + :param batch_size: If specified, runs the ArangoDB Data Ingestion + process for every **batch_size** cuGraph nodes/edges within **cug_graph**. + Defaults to `len(cug_nodes)` & `len(cug_edges)`. + :type batch_size: int | None + :param use_async: Performs asynchronous ArangoDB ingestion if enabled. + Defaults to False. + :type use_async: bool + :param src_series_key: The cuGraph edge list source series key. + Defaults to 'src'. + :type src_series_key: str + :param dst_series_key: The cuGraph edge list destination series key. + Defaults to 'dst'. + :type dst_series_key: str :param edge_attr: If your cuGraph graph is weighted, you can specify the edge attribute name used to represent your cuGraph edge weight values once transferred into ArangoDB. Defaults to 'weight'. :type edge_attr: str - :param overwrite_graph: Overwrites the graph if it already exists. - Does not drop associated collections. - :type overwrite_graph: bool - :param import_options: Keyword arguments to specify additional + :param adb_import_kwargs: Keyword arguments to specify additional parameters for ArangoDB document insertion. Full parameter list: https://docs.python-arango.com/en/main/specs.html#arango.collection.Collection.import_bulk - :type import_options: Any + :type adb_import_kwargs: Any :return: The ArangoDB Graph API wrapper. :rtype: arango.graph.Graph @@ -281,153 +318,459 @@ def cugraph_to_arangodb( """ logger.debug(f"--cugraph_to_arangodb('{name}')--") - if overwrite_graph: - logger.debug("Overwrite graph flag is True. Deleting old graph.") - self.__db.delete_graph(name, ignore_missing=True) - - if self.__db.has_graph(name): - logger.debug(f"Graph {name} already exists") - adb_graph = self.__db.graph(name) - else: - logger.debug(f"Creating graph {name}") - adb_graph = self.__db.create_graph( - name, edge_definitions, orphan_collections - ) + adb_graph = self.__create_adb_graph( + name, overwrite_graph, edge_definitions, orphan_collections + ) - adb_v_cols: List[str] = adb_graph.vertex_collections() + adb_v_cols: List[str] = adb_graph.vertex_collections() # type: ignore adb_e_cols: List[str] = [ - e_d["edge_collection"] for e_d in adb_graph.edge_definitions() + c["edge_collection"] for c in adb_graph.edge_definitions() # type: ignore ] - has_one_vcol = len(adb_v_cols) == 1 - has_one_ecol = len(adb_e_cols) == 1 - logger.debug(f"Is graph '{name}' homogeneous? {has_one_vcol and has_one_ecol}") + has_one_v_col = len(adb_v_cols) == 1 + has_one_e_col = len(adb_e_cols) == 1 + logger.debug(f"Is '{name}' homogeneous? {has_one_v_col and has_one_e_col}") - # Maps cuGraph node IDs to ArangoDB vertex IDs + # This maps cuGraph node IDs to ArangoDB vertex IDs cug_map: Dict[CUGId, str] = dict() # Stores to-be-inserted ArangoDB documents by collection name - adb_documents: DefaultDict[str, List[Json]] = defaultdict(list) - - cug_id: CUGId - cug_nodes = cug_graph.nodes().values_host + adb_docs: DefaultDict[str, List[Json]] = defaultdict(list) - logger.debug("Preparing cuGraph nodes") - for i, cug_id in enumerate( - track(cug_nodes, len(cug_nodes), "Nodes", "#97C423"), - 1, - ): - logger.debug(f"N{i}: {cug_id}") - - col = ( - adb_v_cols[0] - if has_one_vcol - else self.__cntrl._identify_cugraph_node(cug_id, adb_v_cols) - ) + spinner_progress = get_import_spinner_progress(" ") - if not has_one_vcol and col not in adb_v_cols: - msg = f"'{cug_id}' identified as '{col}', which is not in {adb_v_cols}" - raise ValueError(msg) + ################# + # cuGraph Nodes # + ################# - key = ( - self.__cntrl._keyify_cugraph_node(cug_id, col) - if keyify_nodes - else str(i) - ) + cug_id: CUGId - cug_node = {"_key": key} - self.__cntrl._prepare_cugraph_node(cug_node, col) - adb_documents[col].append(cug_node) + cug_nodes = cug_graph.nodes().values_host + node_batch_size = batch_size or len(cug_nodes) + + bar_progress = get_bar_progress("(CUG → ADB): Nodes", "#97C423") + bar_progress_task = bar_progress.add_task("Nodes", total=len(cug_nodes)) + + with Live(Group(bar_progress, spinner_progress)): + for i, cug_id in enumerate(cug_nodes, 1): + bar_progress.advance(bar_progress_task) + + # 1. Process cuGraph node + self.__process_cug_node( + i, + cug_id, + cug_map, + adb_docs, + adb_v_cols, + has_one_v_col, + ) - cug_map[cug_id] = f"{col}/{key}" + # 2. Insert batch of nodes + if i % node_batch_size == 0: + self.__insert_adb_docs( + spinner_progress, adb_docs, use_async, **adb_import_kwargs + ) + # Insert remaining nodes + self.__insert_adb_docs( + spinner_progress, adb_docs, use_async, **adb_import_kwargs + ) - self.__insert_adb_docs(adb_documents, import_options) - adb_documents.clear() # for memory purposes + ################# + # cuGraph Edges # + ################# from_node_id: CUGId to_node_id: CUGId - edge_list: DataFrame = cug_graph.view_edge_list() - logger.debug("Preparing cuGraph edges") - for i in track(range(len(edge_list)), len(edge_list), "Edges", "#5E3108"): - from_node_id = edge_list[src_series_key][i] - to_node_id = edge_list[dst_series_key][i] + cug_edges: DataFrame = cug_graph.view_edge_list() + edge_batch_size = batch_size or len(cug_edges) - edge_str = f"({from_node_id}, {to_node_id})" - logger.debug(f"E{i}: {edge_str}") + bar_progress = get_bar_progress("(CUG → ADB): Edges", "#5E3108") + bar_progress_task = bar_progress.add_task("Edges", total=len(cug_edges)) - col = ( - adb_e_cols[0] - if has_one_ecol - else self.__cntrl._identify_cugraph_edge(from_node_id, to_node_id, adb_e_cols, cug_map) - ) + cug_weights = cug_edges[edge_attr] if cug_graph.is_weighted() else None - if not has_one_ecol and col not in adb_e_cols: - msg = f"{edge_str} identified as '{col}', which is not in {adb_e_cols}" - raise ValueError(msg) + with Live(Group(bar_progress, spinner_progress)): + for i in range(len(cug_edges)): + bar_progress.advance(bar_progress_task) - key = ( - self.__cntrl._keyify_cugraph_edge(from_node_id, to_node_id, col, cug_map) - if keyify_edges - else str(i) - ) + from_node_id = cug_edges[src_series_key][i] + to_node_id = cug_edges[dst_series_key][i] - cug_edge = { - "_key": key, - "_from": cug_map[from_node_id], - "_to": cug_map[to_node_id], - } - - if cug_graph.is_weighted(): - cug_edge[edge_attr] = edge_list[edge_attr][i] + # 1. Process cuGraph edge + self.__process_cug_edge( + i, + from_node_id, + to_node_id, + cug_map, + adb_docs, + adb_e_cols, + has_one_e_col, + edge_attr, + cug_weights, + ) - self.__cntrl._prepare_cugraph_edge(cug_edge, col) - adb_documents[col].append(cug_edge) + # 2. Insert batch of edges + if i % edge_batch_size == 0: + self.__insert_adb_docs( + spinner_progress, adb_docs, use_async, **adb_import_kwargs + ) - self.__insert_adb_docs(adb_documents, import_options) + # Insert remaining edges + self.__insert_adb_docs( + spinner_progress, adb_docs, use_async, **adb_import_kwargs + ) logger.info(f"Created ArangoDB '{name}' Graph") return adb_graph - def __fetch_adb_docs(self, col: str, query_options: Any) -> Result[Cursor]: - """Fetches ArangoDB documents within a collection. + ################################ + # Private: ArangoDB -> cuGraph # + ################################ + + def __fetch_adb_docs( + self, + col: str, + **adb_export_kwargs: Any, + ) -> Tuple[Cursor, int]: + """ArangoDB -> cuGraph: Fetches ArangoDB documents within a collection. :param col: The ArangoDB collection. :type col: str - :param query_options: Keyword arguments to specify AQL query options when + :param adb_export_kwargs: Keyword arguments to specify AQL query options when fetching documents from the ArangoDB instance. - :type query_options: Any - :return: Result cursor. - :rtype: arango.cursor.Cursor + :type adb_export_kwargs: Any + :return: The document cursor along with the total collection size. + :rtype: Tuple[arango.cursor.Cursor, int] + """ + col_size: int = self.__db.collection(col).count() # type: ignore + + with get_export_spinner_progress(f"ADB Export: '{col}' ({col_size})") as p: + p.add_task(col) + + cursor: Cursor = self.__db.aql.execute( # type: ignore + "FOR doc IN @@col RETURN doc", + bind_vars={"@col": col}, + **{**adb_export_kwargs, **{"stream": True}}, + ) + + return cursor, col_size + + def __process_adb_cursor( + self, + progress_color: str, + cursor: Cursor, + col_size: int, + process_adb_doc: Callable[..., None], + col: str, + adb_map: Dict[str, CUGId], + *args: Any, + ) -> None: + """ArangoDB -> cuGraph: Processes the ArangoDB Cursors for vertices and edges. + + :param progress_color: The progress bar color. + :type progress_color: str + :param cursor: The ArangoDB cursor for the current **col**. + :type cursor: arango.cursor.Cursor + :param process_adb_doc: The function to process the cursor data. + :type process_adb_doc: Callable + :param col: The ArangoDB collection for the current **cursor**. + :type col: str + :param col_size: The size of **col**. + :type col_size: int + :param adb_map: Maps ArangoDB vertex IDs to cuGraph node IDs. + :type adb_map: Dict[str, adbcug_adapter.typings.CUGId] + """ + + progress = get_bar_progress(f"(ADB → CUG): '{col}'", progress_color) + progress_task_id = progress.add_task(col, total=col_size) + + with Live(Group(progress)): + while not cursor.empty(): + for doc in cursor.batch(): # type: ignore # false positive + progress.advance(progress_task_id) + + process_adb_doc(doc, col, adb_map, *args) + + cursor.batch().clear() # type: ignore # false positive + if cursor.has_more(): + cursor.fetch() + + def __process_adb_vertex( + self, + adb_v: Json, + v_col: str, + adb_map: Dict[str, CUGId], + ) -> None: + """ArangoDB -> cuGraph: Processes an ArangoDB vertex. + + :param adb_v: The ArangoDB vertex. + :type adb_v: Dict[str, Any] + :param v_col: The ArangoDB vertex collection. + :type v_col: str + :param adb_map: Maps ArangoDB vertex IDs to cuGraph node IDs. + :type adb_map: Dict[str, adbcug_adapter.typings.CUGId] + """ + adb_id: str = adb_v["_id"] + self.__cntrl._prepare_arangodb_vertex(adb_v, v_col) + cug_id: str = adb_v["_id"] + + adb_map[adb_id] = cug_id + + def __process_adb_edge( + self, + adb_e: Json, + e_col: str, + adb_map: Dict[str, CUGId], + cug_edges: List[Tuple[CUGId, CUGId, Any]], + edge_attr: str, + default_edge_attr_value: int, + ) -> None: + """ArangoDB -> cuGraph: Processes an ArangoDB edge. + + :param adb_e: The ArangoDB edge. + :type adb_e: Dict[str, Any] + :param e_col: The ArangoDB edge collection. + :type e_col: str + :param adb_map: Maps ArangoDB vertex IDs to cuGraph node IDs. + :type adb_map: Dict[str, adbcug_adapter.typings.CUGId] + :param cug_edges: To-be-inserted cuGraph edges. + :type cug_edges: List[Tuple[CUGId, CUGId, Any]] + :param edge_attr: The weight attribute name of your ArangoDB edges. + :type edge_attr: str + :param default_edge_attr_value: The default value set to the edge attribute + if **edge_attr** is not present in the ArangoDB edge. Defaults to 0. + :type default_edge_attr_value: int """ - aql = """ - FOR doc IN @@col - RETURN doc + from_node_id: CUGId = adb_map[adb_e["_from"]] + to_node_id: CUGId = adb_map[adb_e["_to"]] + + cug_edges.append( + ( + from_node_id, + to_node_id, + adb_e.get(edge_attr, default_edge_attr_value), + ) + ) + + def __create_cug_graph( + self, + cug_graph: Optional[CUGMultiGraph], + cug_edges: List[Tuple[CUGId, CUGId, Any]], + edge_attr: str, + ) -> CUGMultiGraph: + """AragoDB -> cuGraph: Creates the cuGraph graph. + + :param cug_graph: An existing cuGraph graph. + :type cug_graph: cugraph.classes.graph.Graph | None + :param cug_edges: To-be-inserted cuGraph edges. + :type cug_edges: List[Tuple[CUGId, CUGId, Any]] + :param edge_attr: The weight attribute name of your ArangoDB edges. + :type edge_attr: str + :return: A Multi-Directed cuGraph Graph. + :rtype: cugraph.structure.graph_classes.MultiDiGraph """ + df = DataFrame(cug_edges, columns=["src", "dst", edge_attr]) + + cug_graph = cug_graph or CUGMultiGraph(directed=True) + cug_graph.from_cudf_edgelist( + df, + source="src", + destination="dst", + edge_attr=edge_attr, + ) + + return cug_graph + + ################################# + # Private: cuGraph -> ArangoDB # + ################################# - with progress(f"Export: {col}") as p: - p.add_task("__fetch_adb_docs") + def __create_adb_graph( + self, + name: str, + overwrite_graph: bool, + edge_definitions: Optional[List[Json]] = None, + orphan_collections: Optional[List[str]] = None, + ) -> ADBGraph: + """cuGraph -> ArangoDB: Creates the ArangoDB graph. - return self.__db.aql.execute( - aql, count=True, bind_vars={"@col": col}, **query_options + :param name: The ArangoDB graph name. + :type name: str + :param overwrite_graph: Overwrites the graph if it already exists. + :type overwrite_graph: bool + :param edge_definitions: ArangoDB edge definitions. + :type edge_definitions: List[Dict[str, Any]] + :param orphan_collections: ArangoDB orphan collections. + :type orphan_collections: List[str] + :return: The ArangoDB Graph API wrapper. + :rtype: arango.graph.Graph + """ + if overwrite_graph: + logger.debug("Overwrite graph flag is True. Deleting old graph.") + self.__db.delete_graph(name, ignore_missing=True) + + if self.__db.has_graph(name): + logger.debug(f"Graph {name} already exists") + return self.__db.graph(name) + + else: + logger.debug(f"Creating graph {name}") + return self.__db.create_graph( # type: ignore + name, + edge_definitions, + orphan_collections, ) - def __insert_adb_docs( - self, adb_documents: DefaultDict[str, List[Json]], kwargs: Any + def __process_cug_node( + self, + i: int, + cug_id: CUGId, + cug_map: Dict[CUGId, str], + adb_docs: DefaultDict[str, List[Json]], + adb_v_cols: List[str], + has_one_v_col: bool, ) -> None: - """Insert ArangoDB documents into their ArangoDB collection. + """cuGraph -> ArangoDB: Processes a cuGraph node. + + :param i: The node index. + :type i: int + :param cug_id: The cuGraph node ID. + :type cug_id: adbcug_adapter.typings.CUGId + :param cug_map: Maps cuGraph node IDs to ArangoDB vertex IDs. + :type cug_map: Dict[adbcug_adapter.typings.CUGId, str] + :param adb_docs: To-be-inserted ArangoDB documents. + :type adb_docs: DefaultDict[str, List[Dict[str, Any]]] + :param adb_v_cols: The ArangoDB vertex collections. + :type adb_v_cols: List[str] + :param has_one_v_col: True if the Graph has one Vertex collection. + :type has_one_v_col: bool + """ + logger.debug(f"N{i}: {cug_id}") - :param adb_documents: To-be-inserted ArangoDB documents - :type adb_documents: DefaultDict[str, List[Json]] - :param kwargs: Keyword arguments to specify additional + col = ( + adb_v_cols[0] + if has_one_v_col + else self.__cntrl._identify_cugraph_node(cug_id, adb_v_cols) + ) + + if not has_one_v_col and col not in adb_v_cols: + msg = f"'{cug_id}' identified as '{col}', which is not in {adb_v_cols}" + raise ValueError(msg) + + key = self.__cntrl._keyify_cugraph_node(i, cug_id, col) + + adb_id = f"{col}/{key}" + cug_node = {"_id": adb_id, "_key": key} + + cug_map[cug_id] = adb_id + + self.__cntrl._prepare_cugraph_node(cug_node, col) + adb_docs[col].append(cug_node) + + def __process_cug_edge( + self, + i: int, + from_node_id: CUGId, + to_node_id: CUGId, + cug_map: Dict[CUGId, str], + adb_docs: DefaultDict[str, List[Json]], + adb_e_cols: List[str], + has_one_e_col: bool, + edge_attr: str, + cug_weights: Optional[Series] = None, + ) -> None: + """cuGraph -> ArangoDB: Processes a cuGraph edge. + + :param i: The edge index. + :type i: int + :param from_node_id: The cuGraph ID of the source node. + :type from_node_id: adbcug_adapter.typings.CUGId + :param to_node_id: The cuGraph ID of the target node. + :type to_node_id: adbcug_adapter.typings.CUGId + :param cug_map: Maps cuGraph node IDs to ArangoDB vertex IDs. + :type cug_map: Dict[adbcug_adapter.typings.CUGId, str] + :param adb_docs: To-be-inserted ArangoDB documents. + :type adb_docs: DefaultDict[str, List[Dict[str, Any]]] + :param adb_e_cols: The ArangoDB edge collections. + :type adb_e_cols: List[str] + :param has_one_e_col: True if the Graph has one Edge collection. + :type has_one_e_col: bool + :param edge_attr: The weight attribute name of your ArangoDB edges. + :type edge_attr: str + :param cug_weights: The cuGraph edge weights (if graph is weighted). + :type cug_weights: Optional[cudf.Series] + """ + edge_str = f"({from_node_id}, {to_node_id})" + logger.debug(f"E{i}: {edge_str}") + + col = ( + adb_e_cols[0] + if has_one_e_col + else self.__cntrl._identify_cugraph_edge( + from_node_id, to_node_id, cug_map, adb_e_cols + ) + ) + + if not has_one_e_col and col not in adb_e_cols: + msg = f"{edge_str} identified as '{col}', which is not in {adb_e_cols}" + raise ValueError(msg) + + key = self.__cntrl._keyify_cugraph_edge( + i, from_node_id, to_node_id, cug_map, col + ) + + cug_edge = { + "_key": key, + "_from": cug_map[from_node_id], + "_to": cug_map[to_node_id], + } + + if cug_weights is not None: + cug_edge[edge_attr] = cug_weights[i] + + self.__cntrl._prepare_cugraph_edge(cug_edge, col) + adb_docs[col].append(cug_edge) + + def __insert_adb_docs( + self, + spinner_progress: Progress, + adb_docs: DefaultDict[str, List[Json]], + use_async: bool, + **adb_import_kwargs: Any, + ) -> None: + """cuGraph -> ArangoDB: Insert the ArangoDB documents. + + :param spinner_progress: The spinner progress bar. + :type spinner_progress: rich.progress.Progress + :param adb_docs: To-be-inserted ArangoDB documents + :type adb_docs: DefaultDict[str, List[Json]] + :param use_async: Performs asynchronous ArangoDB ingestion if enabled. + :type use_async: bool + :param adb_import_kwargs: Keyword arguments to specify additional parameters for ArangoDB document insertion. Full parameter list: https://docs.python-arango.com/en/main/specs.html#arango.collection.Collection.import_bulk - :type kwargs: Any + :param adb_import_kwargs: Any """ - for col, doc_list in adb_documents.items(): - with progress(f"Import: {col} ({len(doc_list)})") as p: - p.add_task("__insert_adb_docs") + if len(adb_docs) == 0: + return + + db = self.__async_db if use_async else self.__db + + # Avoiding "RuntimeError: dictionary changed size during iteration" + adb_cols = list(adb_docs.keys()) + + for col in adb_cols: + doc_list = adb_docs[col] + + action = f"ADB Import: '{col}' ({len(doc_list)})" + spinner_progress_task = spinner_progress.add_task("", action=action) + + result = db.collection(col).import_bulk(doc_list, **adb_import_kwargs) + logger.debug(result) + + del adb_docs[col] - result = self.__db.collection(col).import_bulk(doc_list, **kwargs) - logger.debug(result) + spinner_progress.stop_task(spinner_progress_task) + spinner_progress.update(spinner_progress_task, visible=False) diff --git a/adbcug_adapter/controller.py b/adbcug_adapter/controller.py index 2cda9bb..a392749 100644 --- a/adbcug_adapter/controller.py +++ b/adbcug_adapter/controller.py @@ -1,7 +1,7 @@ #!/usr/bin/env python3 # -*- coding: utf-8 -*- -from typing import List, Dict +from typing import Dict, List from .abc import Abstract_ADBCUG_Controller from .typings import CUGId, Json @@ -33,14 +33,12 @@ def _prepare_arangodb_vertex(self, adb_vertex: Json, col: str) -> None: pass def _identify_cugraph_node(self, cug_node_id: CUGId, adb_v_cols: List[str]) -> str: - """Given a cuGraph node, and a list of ArangoDB vertex collections defined, - identify which ArangoDB vertex collection it should belong to. + """Given a CuGraph node, and a list of ArangoDB vertex collections defined, + identify which ArangoDB vertex collection **cug_node_id** should belong to. - NOTE: You must override this function if len(**adb_v_cols**) > 1 - AND **cug_node_id* does NOT comply to ArangoDB standards - (i.e "{collection}/{key}"). + NOTE: You must override this function if len(**adb_v_cols**) > 1. - :param cug_node_id: The cuGraph ID of the vertex. + :param cug_node_id: The CuGraph ID of the node. :type cug_node_id: adbcug_adapter.typings.CUGId :param adb_v_cols: All ArangoDB vertex collections specified by the **edge_definitions** parameter of cugraph_to_arangodb() @@ -48,23 +46,38 @@ def _identify_cugraph_node(self, cug_node_id: CUGId, adb_v_cols: List[str]) -> s :return: The ArangoDB collection name :rtype: str """ - # In this case, we assume that **cug_node_id** is already a valid ArangoDB _id - adb_vertex_id: str = str(cug_node_id) - return adb_vertex_id.split("/")[0] + m = f"""User must override this function, + since there are {len(adb_v_cols)} vertex collections + to choose from + """ + raise NotImplementedError(m) def _identify_cugraph_edge( - self, from_node_id: str, to_node_id: str, adb_e_cols: List[str], cug_map: Dict[CUGId, str] + self, + from_node_id: CUGId, + to_node_id: CUGId, + cug_map: Dict[CUGId, str], + adb_e_cols: List[str], ) -> str: """Given a pair of connected cuGraph nodes, and a list of ArangoDB edge collections defined, identify which ArangoDB edge collection it should belong to. - NOTE: You must override this function if len(**adb_e_cols**) > 1. + NOTE #1: You must override this function if len(**adb_e_cols**) > 1. + + NOTE #2: You can use **cug_map** to derive the ArangoDB _from and _to values + of the edge. i.e, `cug_map[from_node_id]` will give you the ArangoDB _from + value, and `cug_map[to_node_id]` will give you the ArangoDB _to value. :param from_node_id: The ID of the cuGraph node representing the edge source. - :type from_node_id: str + :type from_node_id: adbcug_adapter.typings.CUGId :param to_node_id: The ID of the cuGraph node representing the edge destination. - :type to_node_id: str + :type to_node_id: adbcug_adapter.typings.CUGId + :param cug_map: A mapping of CuGraph node ids to ArangoDB vertex ids. You + can use this to derive the ArangoDB _from and _to values of the edge. + i.e, `cug_map[from_node_id]` will give you the ArangoDB _from value, + and `cug_map[to_node_id]` will give you the ArangoDB _to value. + :type cug_map: Dict[CUGId, str] :param adb_e_cols: All ArangoDB edge collections specified by the **edge_definitions** parameter of ADBCUG_Adapter.cugraph_to_arangodb() @@ -74,57 +87,67 @@ def _identify_cugraph_edge( :return: The ArangoDB collection name :rtype: str """ - # User must override this function if len(adb_e_cols) > 1 - raise NotImplementedError # pragma: no cover + m = f"""User must override this function, + since there are {len(adb_e_cols)} edge collections + to choose from. + """ + raise NotImplementedError(m) - def _keyify_cugraph_node(self, cug_node_id: CUGId, col: str) -> str: + def _keyify_cugraph_node(self, i: int, cug_node_id: CUGId, col: str) -> str: """Given a cuGraph node, derive its valid ArangoDB key. - NOTE: You can override this function if you want to create custom ArangoDB _key - values from your cuGraph nodes. To enable the use of this method, enable the - **keyify_nodes** parameter in ADBCUG_Adapter.cugraph_to_arangodb(). + NOTE: You must override this function if you want to create custom ArangoDB + _key values for your NetworkX nodes. + :param i: The index of the NetworkX node in the list of nodes. + :type i: int :param cug_node_id: The cuGraph node id. :type cug_node_id: adbcug_adapter.typings.CUGId - :param col: The ArangoDB collection the **cug_node_id** belongs to. + :param col: The ArangoDB collection that **cug_node_id** belongs to. :type col: str :return: A valid ArangoDB _key value. :rtype: str """ - # In this case, we assume that **cug_node_id** is already a valid ArangoDB _id - # Otherwise, user must override this function if custom ArangoDB _key - # values are required for nodes - adb_vertex_id: str = str(cug_node_id) - return self._string_to_arangodb_key_helper(adb_vertex_id.split("/")[1]) + return str(i) def _keyify_cugraph_edge( self, - from_node_id: str, - to_node_id: str, - col: str, + i: int, + from_node_id: CUGId, + to_node_id: CUGId, cug_map: Dict[CUGId, str], + col: str, ) -> str: - """Given a pair of connected cuGraph nodes, and the collection - this edge belongs to, derive the edge's valid ArangoDB key. - - NOTE #1: You can override this function if you want to create custom ArangoDB - _key values from your cuGraph edges. To enable the use of this method, enable - the **keyify_edges** parameter in ADBCUG_Adapter.cugraph_to_arangodb(). - - :param from_node_id: The ID of the cuGraph node representing the edge source. - :type from_node_id: str - :param to_node_id: The ID of the cuGraph node representing the edge destination. - :type to_node_id: str - :param col: The ArangoDB collection the edge belongs to. + """Given a CuGraph edge, its collection, and its pair of nodes, derive + its ArangoDB key. + + NOTE #1: You must override this function if you want to create custom ArangoDB + _key values for your CuGraph edges. + + NOTE #2: You can use **cug_map** to derive the ArangoDB _from and _to values + of the edge. i.e, `cug_map[from_node_id]` will give you the ArangoDB _from + value, and `cug_map[to_node_id]` will give you the ArangoDB _to value. + + NOTE #3: You are free to use `_string_to_arangodb_key_helper()` to derive a + valid ArangoDB _key value. + + :param i: The index of the NetworkX edge in the list of edges. + :type i: int + :param from_node_id: The CuGraph ID of the node representing the edge source. + :type from_node_id: adbcug_adapter.typings.CUGId + :param to_node_id: The CuGraph ID of the node representing the edge destination. + :type to_node_id: adbcug_adapter.typings.CUGId + :param col: The ArangoDB collection that the cuGraph edge belongs to. :type col: str - :param cug_map: The mapping of cuGraph node IDs to ArangoDB vertex IDs. + :param cug_map: A mapping of CuGraph node ids to ArangoDB vertex ids. You + can use this to derive the ArangoDB _from and _to values of the edge. + i.e, cug_map[from_node_id] will give you the ArangoDB _from value, + and cug_map[to_node_id] will give you the ArangoDB _to value. :type cug_map: Dict[CUGId, str] :return: A valid ArangoDB _key value. :rtype: str """ - # User must override this function if custom ArangoDB _key values are - # required for edges - raise NotImplementedError # pragma: no cover + return str(i) def _prepare_cugraph_node(self, cug_node: Json, col: str) -> None: """Prepare a cuGraph node before it gets inserted into the ArangoDB diff --git a/adbcug_adapter/utils.py b/adbcug_adapter/utils.py index a047dd3..7db528c 100644 --- a/adbcug_adapter/utils.py +++ b/adbcug_adapter/utils.py @@ -1,9 +1,14 @@ import logging import os -from typing import Any -from rich.progress import Progress, SpinnerColumn, TextColumn, TimeElapsedColumn -from rich.progress import track as progress_track +from rich.progress import ( + BarColumn, + Progress, + SpinnerColumn, + TaskProgressColumn, + TextColumn, + TimeElapsedColumn, +) logger = logging.getLogger(__package__) handler = logging.StreamHandler() @@ -15,25 +20,32 @@ logger.addHandler(handler) -def progress( +def get_export_spinner_progress( text: str, - spinner_name: str = "aesthetic", - spinner_style: str = "#5BC0DE", ) -> Progress: return Progress( TextColumn(text), - SpinnerColumn(spinner_name, spinner_style), + SpinnerColumn("aesthetic", "#5BC0DE"), TimeElapsedColumn(), transient=True, ) -def track(sequence: Any, total: int, text: str, colour: str) -> Any: - return progress_track( - sequence, - total=total, - description=text, - complete_style=colour, - finished_style=colour, - disable=logger.level != logging.INFO, +def get_import_spinner_progress(text: str) -> Progress: + return Progress( + TextColumn(text), + TextColumn("{task.fields[action]}"), + SpinnerColumn("aesthetic", "#5BC0DE"), + TimeElapsedColumn(), + transient=True, + ) + + +def get_bar_progress(text: str, color: str) -> Progress: + return Progress( + TextColumn(text), + BarColumn(complete_style=color, finished_style=color), + TaskProgressColumn(), + TextColumn("({task.completed}/{task.total})"), + TimeElapsedColumn(), ) diff --git a/examples/ArangoDB_cuGraph_Adapter.ipynb b/examples/ArangoDB_cuGraph_Adapter.ipynb index 4dcab25..b958e7d 100644 --- a/examples/ArangoDB_cuGraph_Adapter.ipynb +++ b/examples/ArangoDB_cuGraph_Adapter.ipynb @@ -554,7 +554,7 @@ "vertex_collections = {\"account\", \"bank\", \"branch\", \"Class\", \"customer\"}\n", "edge_collections = {\"accountHolder\", \"Relationship\", \"transaction\"}\n", "\n", - "# Create NetworkX graph from ArangoDB collections\n", + "# Create cuGraph graph from ArangoDB collections\n", "cug_graph = adbcug.arangodb_collections_to_cugraph(\"fraud-detection\", vertex_collections, edge_collections)\n", "\n", "# You can also provide valid Python-Arango AQL query options to the command above, like such:\n", diff --git a/examples/outputs/ArangoDB_cuGraph_Adapter_output.ipynb b/examples/outputs/ArangoDB_cuGraph_Adapter_output.ipynb index 4ec0559..8cb6dd2 100644 --- a/examples/outputs/ArangoDB_cuGraph_Adapter_output.ipynb +++ b/examples/outputs/ArangoDB_cuGraph_Adapter_output.ipynb @@ -69,16 +69,16 @@ "cell_type": "code", "execution_count": 1, "metadata": { - "id": "z22Ut7DU2BH2", "colab": { "base_uri": "https://localhost:8080/" }, + "id": "z22Ut7DU2BH2", "outputId": "2ff94498-4028-4b62-fa10-47a7549ca900" }, "outputs": [ { - "output_type": "stream", "name": "stdout", + "output_type": "stream", "text": [ "GPU 0: Tesla T4 (UUID: GPU-d29fdd1e-0900-c42d-55b4-6b695acb0a1f)\n" ] @@ -115,16 +115,16 @@ "cell_type": "code", "execution_count": 2, "metadata": { - "id": "fUnFAFAheG89", "colab": { "base_uri": "https://localhost:8080/" }, + "id": "fUnFAFAheG89", "outputId": "6bc59c65-0613-48f0-9d0b-bf52e86f2a35" }, "outputs": [ { - "output_type": "stream", "name": "stdout", + "output_type": "stream", "text": [ "Cloning into 'rapidsai-csp-utils'...\n", "remote: Enumerating objects: 300, done.\u001b[K\n", @@ -151,16 +151,16 @@ "cell_type": "code", "execution_count": null, "metadata": { - "id": "RpqvL4COeG8-", "colab": { "base_uri": "https://localhost:8080/" }, + "id": "RpqvL4COeG8-", "outputId": "b8cbb7c6-7bef-4aab-d1e9-1bab6f1b6762" }, "outputs": [ { - "output_type": "stream", "name": "stdout", + "output_type": "stream", "text": [ "Updating your Colab environment. This will restart your kernel. Don't Panic!\n", "Get:1 http://security.ubuntu.com/ubuntu bionic-security InRelease [88.7 kB]\n", @@ -268,16 +268,16 @@ "cell_type": "code", "execution_count": 1, "metadata": { - "id": "LHhOv_nH2lOL", "colab": { "base_uri": "https://localhost:8080/" }, + "id": "LHhOv_nH2lOL", "outputId": "84870bc3-00df-435a-984b-7321550cf651" }, "outputs": [ { - "output_type": "stream", "name": "stdout", + "output_type": "stream", "text": [ "⏬ Downloading https://github.com/jaimergp/miniforge/releases/latest/download/Mambaforge-colab-Linux-x86_64.sh...\n", "📦 Installing...\n", @@ -302,16 +302,16 @@ "cell_type": "code", "execution_count": 1, "metadata": { - "id": "34Fyyc-42mMm", "colab": { "base_uri": "https://localhost:8080/" }, + "id": "34Fyyc-42mMm", "outputId": "0733ca7d-f9e9-4937-8562-cf9887f076d8" }, "outputs": [ { - "output_type": "stream", "name": "stdout", + "output_type": "stream", "text": [ "✨🍰✨ Everything looks OK!\n" ] @@ -327,17 +327,17 @@ "cell_type": "code", "execution_count": 2, "metadata": { - "id": "U3yZUqK02nTR", "colab": { "base_uri": "https://localhost:8080/", "height": 330 }, + "id": "U3yZUqK02nTR", "outputId": "e3f5bac5-5f26-40cc-be8a-30101433d4be" }, "outputs": [ { - "output_type": "stream", "name": "stdout", + "output_type": "stream", "text": [ "Found existing installation: cffi 1.14.5\n", "Uninstalling cffi-1.14.5:\n", @@ -355,7 +355,6 @@ ] }, { - "output_type": "display_data", "data": { "application/vnd.colab-display-data+json": { "pip_warning": { @@ -365,7 +364,8 @@ } } }, - "metadata": {} + "metadata": {}, + "output_type": "display_data" } ], "source": [ @@ -379,16 +379,16 @@ "cell_type": "code", "execution_count": 3, "metadata": { - "id": "Pi1_CjzE2o7i", "colab": { "base_uri": "https://localhost:8080/" }, + "id": "Pi1_CjzE2o7i", "outputId": "df585d10-cbff-47cb-9e37-21ff658bf722" }, "outputs": [ { - "output_type": "stream", "name": "stdout", + "output_type": "stream", "text": [ "Collecting package metadata (current_repodata.json): - \b\b\\ \b\b| \b\b/ \b\b- \b\b\\ \b\b| \b\b/ \b\b- \b\b\\ \b\b| \b\b/ \b\b- \b\b\\ \b\b| \b\b/ \b\b- \b\b\\ \b\b| \b\b/ \b\b- \b\b\\ \b\b| \b\b/ \b\b- \b\b\\ \b\b| \b\b/ \b\b- \b\b\\ \b\b| \b\b/ \b\b- \b\b\\ \b\b| \b\b/ \b\b- \b\b\\ \b\b| \b\b/ \b\b- \b\b\\ \b\b| \b\b/ \b\b- \b\b\\ \b\b| \b\b/ \b\b- \b\b\\ \b\b| \b\b/ \b\b- \b\b\\ \b\b| \b\b/ \b\b- \b\b\\ \b\b| \b\b/ \b\b- \b\b\\ \b\b| \b\b/ \b\b- \b\b\\ \b\b| \b\b/ \b\b- \b\b\\ \b\b| \b\b/ \b\b- \b\b\\ \b\b| \b\b/ \b\b- \b\b\\ \b\b| \b\b/ \b\b- \b\b\\ \b\b| \b\b/ \b\b- \b\b\\ \b\b| \b\b/ \b\b- \b\b\\ \b\b| \b\b/ \b\b- \b\b\\ \b\b| \b\b/ \b\b- \b\b\\ \b\b| \b\b/ \b\b- \b\b\\ \b\b| \b\b/ \b\b- \b\b\\ \b\b| \b\b/ \b\b- \b\b\\ \b\b| \b\b/ \b\b- \b\b\\ \b\b| \b\b/ \b\b- \b\b\\ \b\bdone\n", "Solving environment: / \b\b- \b\b\\ \b\b| \b\b/ \b\b- \b\b\\ \b\b| \b\b/ \b\b- \b\b\\ \b\b| \b\b/ \b\b- \b\b\\ \b\b| \b\b/ \b\b- \b\b\\ \b\b| \b\b/ \b\b- \b\b\\ \b\b| \b\b/ \b\b- \b\b\\ \b\b| \b\b/ \b\b- \b\b\\ \b\b| \b\b/ \b\b- \b\b\\ \b\b| \b\b/ \b\b- \b\b\\ \b\b| \b\b/ \b\b- \b\b\\ \b\b| \b\b/ \b\b- \b\b\\ \b\b| \b\b/ \b\b- \b\b\\ \b\b| \b\b/ \b\b- \b\b\\ \b\b| \b\b/ \b\b- \b\b\\ \b\b| \b\b/ \b\b- \b\b\\ \b\b| \b\b/ \b\b- \b\b\\ \b\b| \b\b/ \b\b- \b\b\\ \b\b| \b\b/ \b\b- \b\b\\ \b\b| \b\b/ \b\b- \b\b\\ \b\b| \b\b/ \b\b- \b\b\\ \b\b| \b\b/ \b\b- \b\b\\ \b\b| \b\b/ \b\b- \b\b\\ \b\b| \b\b/ \b\b- \b\b\\ \b\b| \b\b/ \b\b- \b\b\\ \b\b| \b\b/ \b\b- \b\b\\ \b\b| \b\b/ \b\b- \b\b\\ \b\b| \b\b/ \b\b- \b\b\\ \b\b| \b\b/ \b\b- \b\b\\ \b\b| \b\b/ \b\b- \b\b\\ \b\b| \b\b/ \b\b- \b\b\\ \b\b| \b\b/ \b\b- \b\b\\ \b\b| \b\b/ \b\b- \b\b\\ \b\b| \b\b/ \b\b- \b\b\\ \b\b| \b\b/ \b\b- \b\b\\ \b\b| \b\b/ \b\b- \b\b\\ \b\b| \b\b/ \b\b- \b\b\\ \b\b| \b\b/ \b\b- \b\b\\ \b\b| \b\b/ \b\b- \b\b\\ \b\b| \b\b/ \b\b- \b\b\\ \b\b| \b\b/ \b\b- \b\b\\ \b\b| \b\b/ \b\b- \b\b\\ \b\b| \b\b/ \b\b- \b\b\\ \b\b| \b\b/ \b\b- \b\b\\ \b\b| \b\b/ \b\b- \b\b\\ \b\b| \b\b/ \b\b- \b\b\\ \b\b| \b\b/ \b\b- \b\b\\ \b\b| \b\b/ \b\b- \b\b\\ \b\b| \b\b/ \b\b- \b\b\\ \b\b| \b\b/ \b\b- \b\b\\ \b\b| \b\b/ \b\b- \b\b\\ \b\b| \b\b/ \b\b- \b\b\\ \b\b| \b\b/ \b\b- \b\b\\ \b\b| \b\b/ \b\b- \b\b\\ \b\b| \b\b/ \b\b- \b\b\\ \b\b| \b\b/ WARNING conda.core.solve:_add_specs(611): pinned spec cudatoolkit=11.1 conflicts with explicit specs. Overriding pinned spec.\n", @@ -775,16 +775,16 @@ "cell_type": "code", "execution_count": 4, "metadata": { - "id": "8rAMRUSf23Sj", "colab": { "base_uri": "https://localhost:8080/" }, + "id": "8rAMRUSf23Sj", "outputId": "a09849b9-e33c-4c07-eeda-c436fb016356" }, "outputs": [ { - "output_type": "stream", "name": "stdout", + "output_type": "stream", "text": [ "Copying /usr/local/lib/libcudf.so to /usr/lib/libcudf.so\n", "Copying /usr/local/lib/libnccl.so to /usr/lib/libnccl.so\n", @@ -812,16 +812,16 @@ "cell_type": "code", "execution_count": 5, "metadata": { - "id": "32t5ZOx-27bD", "colab": { "base_uri": "https://localhost:8080/" }, + "id": "32t5ZOx-27bD", "outputId": "6ff8eb1f-e1a0-4ec8-a370-49b674e35318" }, "outputs": [ { - "output_type": "stream", "name": "stdout", + "output_type": "stream", "text": [ "Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/\n", "Collecting git+https://github.com/arangoml/cugraph-adapter.git\n", @@ -890,6 +890,11 @@ }, { "cell_type": "code", + "execution_count": 42, + "metadata": { + "id": "J3eGVQLRhulA" + }, + "outputs": [], "source": [ "# All imports\n", "\n", @@ -906,12 +911,7 @@ "import logging\n", "import io, requests\n", "from typing import List" - ], - "metadata": { - "id": "J3eGVQLRhulA" - }, - "execution_count": 42, - "outputs": [] + ] }, { "cell_type": "markdown", @@ -926,50 +926,30 @@ }, { "cell_type": "markdown", + "metadata": { + "id": "42wigq2hYu1w" + }, "source": [ "RAPIDS cuGraph is a library of graph algorithms that seamlessly integrates into the RAPIDS data science ecosystem and allows the data scientist to easily call graph algorithms using data stored in GPU DataFrames, NetworkX Graphs, or even CuPy or SciPy sparse Matrices.\n", "\n", "\n", "Here is an example of creating a simple weighted graph:" - ], - "metadata": { - "id": "42wigq2hYu1w" - } + ] }, { "cell_type": "code", - "source": [ - "cug_graph = cugraph.Graph()\n", - "\n", - "df = cudf.DataFrame(\n", - " [('a', 'b', 5), ('a', 'c', 1), ('a', 'd', 4), ('b', 'c', 3), ('c', 'd', 2)],\n", - " columns=['src', 'dst', 'weight']\n", - ")\n", - "\n", - "cug_graph.from_cudf_edgelist(\n", - " df,\n", - " source='src',\n", - " destination='dst',\n", - " edge_attr='weight'\n", - ")\n", - "\n", - "print('\\n--------------------')\n", - "print(cug_graph.nodes())\n", - "print('\\n--------------------')\n", - "print(cug_graph.edges())" - ], + "execution_count": 8, "metadata": { - "id": "6TlQ9oerV_kj", "colab": { "base_uri": "https://localhost:8080/" }, + "id": "6TlQ9oerV_kj", "outputId": "25b1636a-a175-4a84-ed55-2f04745953f4" }, - "execution_count": 8, "outputs": [ { - "output_type": "stream", "name": "stdout", + "output_type": "stream", "text": [ "\n", "--------------------\n", @@ -988,6 +968,26 @@ "4 b c\n" ] } + ], + "source": [ + "cug_graph = cugraph.Graph()\n", + "\n", + "df = cudf.DataFrame(\n", + " [('a', 'b', 5), ('a', 'c', 1), ('a', 'd', 4), ('b', 'c', 3), ('c', 'd', 2)],\n", + " columns=['src', 'dst', 'weight']\n", + ")\n", + "\n", + "cug_graph.from_cudf_edgelist(\n", + " df,\n", + " source='src',\n", + " destination='dst',\n", + " edge_attr='weight'\n", + ")\n", + "\n", + "print('\\n--------------------')\n", + "print(cug_graph.nodes())\n", + "print('\\n--------------------')\n", + "print(cug_graph.edges())" ] }, { @@ -1005,16 +1005,16 @@ "cell_type": "code", "execution_count": 36, "metadata": { - "id": "zMpjZ7vDTATD", "colab": { "base_uri": "https://localhost:8080/" }, + "id": "zMpjZ7vDTATD", "outputId": "041a3c13-6bed-409c-dc47-40e35d8bbaa8" }, "outputs": [ { - "output_type": "stream", "name": "stdout", + "output_type": "stream", "text": [ "size\n", "6 15.622920\n", @@ -1056,16 +1056,16 @@ "cell_type": "code", "execution_count": 38, "metadata": { - "id": "MQ3TfRSg4m_H", "colab": { "base_uri": "https://localhost:8080/" }, + "id": "MQ3TfRSg4m_H", "outputId": "f5d25c17-0fd6-49b9-fded-ba6baf154664" }, "outputs": [ { - "output_type": "stream", "name": "stdout", + "output_type": "stream", "text": [ " src dst\n", "0 0 0\n", @@ -1159,16 +1159,16 @@ "cell_type": "code", "execution_count": 12, "metadata": { - "id": "2ekGwnJDeG8-", "colab": { "base_uri": "https://localhost:8080/" }, + "id": "2ekGwnJDeG8-", "outputId": "258d7bb4-8391-4c29-827e-f0b680e3a557" }, "outputs": [ { - "output_type": "stream", "name": "stdout", + "output_type": "stream", "text": [ "Log: requesting new credentials...\n", "Succcess: new credentials acquired\n", @@ -1223,16 +1223,16 @@ "cell_type": "code", "execution_count": 13, "metadata": { - "id": "7bgGJ3QkeG8_", "colab": { "base_uri": "https://localhost:8080/" }, + "id": "7bgGJ3QkeG8_", "outputId": "3c85a1c4-5103-47d4-edbd-e9a99a06fecc" }, "outputs": [ { - "output_type": "stream", "name": "stdout", + "output_type": "stream", "text": [ "\u001b[0m2022-05-25T16:40:19Z [12055] INFO [05c30] {restore} Connected to ArangoDB 'http+ssl://tutorials.arangodb.cloud:8529'\n", "\u001b[0m\u001b[0m2022-05-25T16:40:19Z [12055] INFO [abeb4] {restore} Database name in source dump is 'fraud-detection'\n", @@ -1331,16 +1331,16 @@ "cell_type": "code", "execution_count": 14, "metadata": { - "id": "oG496kBeeG9A", "colab": { "base_uri": "https://localhost:8080/" }, + "id": "oG496kBeeG9A", "outputId": "c747878b-f121-405a-ede4-57227c233de2" }, "outputs": [ { - "output_type": "stream", "name": "stderr", + "output_type": "stream", "text": [ "[2022/05/25 16:40:48 +0000] [2488] [INFO] - adbcug_adapter: Instantiated ADBCUG_Adapter with database 'TUT4mnzcrc61phw4we9wpp7tg'\n" ] @@ -1391,16 +1391,16 @@ "cell_type": "code", "execution_count": 63, "metadata": { - "id": "zZ-Hu3lLVHgd", "colab": { "base_uri": "https://localhost:8080/" }, + "id": "zZ-Hu3lLVHgd", "outputId": "e7e72df0-8928-4be5-d002-ed41d5fe168b" }, "outputs": [ { - "output_type": "stream", "name": "stderr", + "output_type": "stream", "text": [ "[2022/05/25 17:05:57 +0000] [2488] [DEBUG] - adbcug_adapter: Starting arangodb_to_cugraph(fraud-detection, ...):\n", "[2022/05/25 17:05:57 +0000] [2488] [DEBUG] - adbcug_adapter: Preparing 'account' vertices\n", @@ -1412,8 +1412,8 @@ ] }, { - "output_type": "stream", "name": "stdout", + "output_type": "stream", "text": [ "\n", "--------------------\n", @@ -1497,23 +1497,23 @@ "cell_type": "code", "execution_count": 17, "metadata": { - "id": "i4XOpdRLUNlJ", "colab": { "base_uri": "https://localhost:8080/" }, + "id": "i4XOpdRLUNlJ", "outputId": "e6a3c4ea-664e-4d1a-dacd-c8d45aa120fd" }, "outputs": [ { - "output_type": "stream", "name": "stderr", + "output_type": "stream", "text": [ "[2022/05/25 16:43:45 +0000] [2488] [INFO] - adbcug_adapter: Created cuGraph 'fraud-detection' Graph\n" ] }, { - "output_type": "stream", "name": "stdout", + "output_type": "stream", "text": [ "\n", "--------------------\n", @@ -1553,7 +1553,7 @@ "vertex_collections = {\"account\", \"bank\", \"branch\", \"Class\", \"customer\"}\n", "edge_collections = {\"accountHolder\", \"Relationship\", \"transaction\"}\n", "\n", - "# Create NetworkX graph from ArangoDB collections\n", + "# Create cuGraph graph from ArangoDB collections\n", "cug_graph = adbcug_adapter.arangodb_collections_to_cugraph(\"fraud-detection\", vertex_collections, edge_collections)\n", "\n", "# You can also provide valid Python-Arango AQL query options to the command above, like such:\n", @@ -1598,16 +1598,16 @@ "cell_type": "code", "execution_count": 19, "metadata": { - "id": "QqGgOe51Vr85", "colab": { "base_uri": "https://localhost:8080/" }, + "id": "QqGgOe51Vr85", "outputId": "d245f424-8608-4197-908a-57728d4e77f0" }, "outputs": [ { - "output_type": "stream", "name": "stderr", + "output_type": "stream", "text": [ "[2022/05/25 16:44:10 +0000] [2488] [INFO] - adbcug_adapter: Instantiated ADBCUG_Adapter with database 'TUT4mnzcrc61phw4we9wpp7tg'\n", "[2022/05/25 16:44:10 +0000] [2488] [DEBUG] - adbcug_adapter: Starting arangodb_to_cugraph(fraud-detection, ...):\n", @@ -1620,8 +1620,8 @@ ] }, { - "output_type": "stream", "name": "stdout", + "output_type": "stream", "text": [ "\n", "--------------------\n", @@ -1705,24 +1705,27 @@ }, { "cell_type": "markdown", - "source": [ - "# cuGraph to ArangoDB" - ], "metadata": { "id": "XnUEQgPU3_eQ" - } + }, + "source": [ + "# cuGraph to ArangoDB" + ] }, { "cell_type": "markdown", - "source": [ - "#### Karate Graph" - ], "metadata": { "id": "OX6ksyYJwFiG" - } + }, + "source": [ + "#### Karate Graph" + ] }, { "cell_type": "markdown", + "metadata": { + "id": "ONgfFnXhwFiO" + }, "source": [ "Data source\n", "* [cuGraph 22.06 Datasets](https://github.com/rapidsai/cugraph/blob/branch-22.06/datasets/karate.csv)\n", @@ -1732,44 +1735,11 @@ "\n", "Important notes\n", "* A custom `ADBCUG Controller` is **not** required here. This is because the karate graph only has 1 vertex collection (`karateka`), and 1 edge collection (`knows`). See the edge definitions below " - ], - "metadata": { - "id": "ONgfFnXhwFiO" - } + ] }, { "cell_type": "code", - "source": [ - "# Fetch Karate Club data\n", - "!wget https://raw.githubusercontent.com/rapidsai/cugraph/branch-22.06/datasets/karate.csv\n", - "dataframe = cudf.read_csv(\"karate-data.csv\", delimiter=' ', names=['src', 'dst'], dtype=['int32', 'int32'] )\n", - "\n", - "# Create the cuGraph graph\n", - "cug_graph = cugraph.Graph()\n", - "cug_graph.from_cudf_edgelist(dataframe, source='src', destination='dst')\n", - "\n", - "# Specify ArangoDB edge definitions\n", - "edge_definitions = [\n", - " {\n", - " \"edge_collection\": \"knows\",\n", - " \"from_vertex_collections\": [\"karateka\"],\n", - " \"to_vertex_collections\": [\"karateka\"],\n", - " }\n", - "]\n", - "\n", - "# Create ArangoDB graph from cuGraph\n", - "name = \"KarateClubGraph\"\n", - "db.delete_graph(name, drop_collections=True, ignore_missing=True)\n", - "adb_graph = adbcug_adapter.cugraph_to_arangodb(name, cug_graph, edge_definitions)\n", - "\n", - "print('\\n--------------------')\n", - "print(\"URL: \" + con[\"url\"])\n", - "print(\"Username: \" + con[\"username\"])\n", - "print(\"Password: \" + con[\"password\"])\n", - "print(\"Database: \" + con[\"dbName\"])\n", - "print('--------------------\\n')\n", - "print(f\"View the created graph here: {con['url']}/_db/{con['dbName']}/_admin/aardvark/index.html#graph/{name}\")" - ], + "execution_count": 60, "metadata": { "colab": { "base_uri": "https://localhost:8080/" @@ -1777,11 +1747,10 @@ "id": "Jn9vgpeFwFiO", "outputId": "3b19b7a4-7822-4f53-9028-c82e32b457bf" }, - "execution_count": 60, "outputs": [ { - "output_type": "stream", "name": "stdout", + "output_type": "stream", "text": [ "--2022-05-25 17:05:34-- https://raw.githubusercontent.com/rapidsai/cugraph/branch-22.06/datasets/karate.csv\n", "Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.108.133, 185.199.109.133, 185.199.110.133, ...\n", @@ -1797,8 +1766,8 @@ ] }, { - "output_type": "stream", "name": "stderr", + "output_type": "stream", "text": [ "[2022/05/25 17:05:34 +0000] [2488] [DEBUG] - adbcug_adapter: Starting cugraph_to_arangodb('KarateClubGraph', ...):\n", "[2022/05/25 17:05:34 +0000] [2488] [DEBUG] - adbcug_adapter: Is graph 'KarateClubGraph' homogeneous? True\n", @@ -1810,8 +1779,8 @@ ] }, { - "output_type": "stream", "name": "stdout", + "output_type": "stream", "text": [ "\n", "--------------------\n", @@ -1824,19 +1793,53 @@ "View the created graph here: https://tutorials.arangodb.cloud:8529/_db/TUT4mnzcrc61phw4we9wpp7tg/_admin/aardvark/index.html#graph/KarateClubGraph\n" ] } + ], + "source": [ + "# Fetch Karate Club data\n", + "!wget https://raw.githubusercontent.com/rapidsai/cugraph/branch-22.06/datasets/karate.csv\n", + "dataframe = cudf.read_csv(\"karate-data.csv\", delimiter=' ', names=['src', 'dst'], dtype=['int32', 'int32'] )\n", + "\n", + "# Create the cuGraph graph\n", + "cug_graph = cugraph.Graph()\n", + "cug_graph.from_cudf_edgelist(dataframe, source='src', destination='dst')\n", + "\n", + "# Specify ArangoDB edge definitions\n", + "edge_definitions = [\n", + " {\n", + " \"edge_collection\": \"knows\",\n", + " \"from_vertex_collections\": [\"karateka\"],\n", + " \"to_vertex_collections\": [\"karateka\"],\n", + " }\n", + "]\n", + "\n", + "# Create ArangoDB graph from cuGraph\n", + "name = \"KarateClubGraph\"\n", + "db.delete_graph(name, drop_collections=True, ignore_missing=True)\n", + "adb_graph = adbcug_adapter.cugraph_to_arangodb(name, cug_graph, edge_definitions)\n", + "\n", + "print('\\n--------------------')\n", + "print(\"URL: \" + con[\"url\"])\n", + "print(\"Username: \" + con[\"username\"])\n", + "print(\"Password: \" + con[\"password\"])\n", + "print(\"Database: \" + con[\"dbName\"])\n", + "print('--------------------\\n')\n", + "print(f\"View the created graph here: {con['url']}/_db/{con['dbName']}/_admin/aardvark/index.html#graph/{name}\")" ] }, { "cell_type": "markdown", - "source": [ - "#### Divisibility Graph" - ], "metadata": { "id": "KLUw5iarn7eh" - } + }, + "source": [ + "#### Divisibility Graph" + ] }, { "cell_type": "markdown", + "metadata": { + "id": "b3I3uHQtoFbt" + }, "source": [ "Data source\n", "* No source\n", @@ -1846,13 +1849,49 @@ "\n", "Important notes\n", "* Even if this graph has more than 1 vertex collection, a custom `ADBCUG Controller` is still **not** required here. This is because the cuGraph Node IDs are already formatted to ArangoDB standard, so the default ADBCUG Controller will take care of node identification (see [`_identify_cugraph_node()`](https://github.com/arangoml/cugraph-adapter/blob/master/adbcug_adapter/controller.py))" - ], - "metadata": { - "id": "b3I3uHQtoFbt" - } + ] }, { "cell_type": "code", + "execution_count": 61, + "metadata": { + "colab": { + "base_uri": "https://localhost:8080/" + }, + "id": "Pdx2r_AtoSWF", + "outputId": "ee0c48ed-b507-48d0-b2cc-d0b0ca2d293c" + }, + "outputs": [ + { + "name": "stderr", + "output_type": "stream", + "text": [ + "[2022/05/25 17:05:44 +0000] [2488] [DEBUG] - adbcug_adapter: Starting cugraph_to_arangodb('DivisibilityGraph', ...):\n", + "[2022/05/25 17:05:45 +0000] [2488] [DEBUG] - adbcug_adapter: Is graph 'DivisibilityGraph' homogeneous? False\n", + "[2022/05/25 17:05:45 +0000] [2488] [DEBUG] - adbcug_adapter: Preparing 200 cugraph nodes\n", + "[2022/05/25 17:05:45 +0000] [2488] [DEBUG] - adbcug_adapter: Preparing 482 cugraph edges\n", + "[2022/05/25 17:05:45 +0000] [2488] [DEBUG] - adbcug_adapter: Inserting last 100 documents into 'numbers_i'\n", + "[2022/05/25 17:05:45 +0000] [2488] [DEBUG] - adbcug_adapter: Inserting last 100 documents into 'numbers_j'\n", + "[2022/05/25 17:05:45 +0000] [2488] [DEBUG] - adbcug_adapter: Inserting last 482 documents into 'is_divisible_by'\n", + "[2022/05/25 17:05:45 +0000] [2488] [INFO] - adbcug_adapter: Created ArangoDB 'DivisibilityGraph' Graph\n" + ] + }, + { + "name": "stdout", + "output_type": "stream", + "text": [ + "\n", + "--------------------\n", + "URL: https://tutorials.arangodb.cloud:8529\n", + "Username: TUTka66dmg6q2eb3vgyb1ywo\n", + "Password: TUTxoj1trkqv5pzhfwjn8h2mh\n", + "Database: TUT4mnzcrc61phw4we9wpp7tg\n", + "--------------------\n", + "\n", + "View the created graph here: https://tutorials.arangodb.cloud:8529/_db/TUT4mnzcrc61phw4we9wpp7tg/_admin/aardvark/index.html#graph/DivisibilityGraph\n" + ] + } + ], "source": [ "# Create the cuGraph graph\n", "cug_graph = cugraph.MultiGraph(directed=True)\n", @@ -1894,33 +1933,67 @@ "print(\"Database: \" + con[\"dbName\"])\n", "print('--------------------\\n')\n", "print(f\"View the created graph here: {con['url']}/_db/{con['dbName']}/_admin/aardvark/index.html#graph/{name}\")" - ], + ] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "nuVoCZQv6oyi" + }, + "source": [ + "#### School Graph with a custom ADBCUG_Controller" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "zPv3BgWt6wEd" + }, + "source": [ + "Data source\n", + "* No source, the graph data is arbitrary\n", + "\n", + "Package methods used\n", + "* [`adbcug_adapter.adapter.cugraph_to_arangodb()`](https://github.com/arangoml/cugraph-adapter/blob/master/adbcug_adapter/adapter.py)\n", + "\n", + "Important notes\n", + "* Here we demonstrate the functionality of having a custom `ADBCUG_Controller`, that overrides the [default ADBCUG_Controller](https://github.com/arangoml/cugraph-adapter/blob/master/adbcug_adapter/controller.py).\n", + "* Recall that a custom ADBCUG Controller for `cuGraph --> ArangoDB` functionality is almost always needed, at the exception of Homogeneous graphs, and graphs where the node IDs are already formatted to the ArangoDB vertex ID standard (i.e `collection/_key`)" + ] + }, + { + "cell_type": "code", + "execution_count": 62, "metadata": { "colab": { "base_uri": "https://localhost:8080/" }, - "id": "Pdx2r_AtoSWF", - "outputId": "ee0c48ed-b507-48d0-b2cc-d0b0ca2d293c" + "id": "oiToiEJ19ZVx", + "outputId": "229916ac-b888-4d41-ab47-399b9565a68c" }, - "execution_count": 61, "outputs": [ { - "output_type": "stream", "name": "stderr", + "output_type": "stream", "text": [ - "[2022/05/25 17:05:44 +0000] [2488] [DEBUG] - adbcug_adapter: Starting cugraph_to_arangodb('DivisibilityGraph', ...):\n", - "[2022/05/25 17:05:45 +0000] [2488] [DEBUG] - adbcug_adapter: Is graph 'DivisibilityGraph' homogeneous? False\n", - "[2022/05/25 17:05:45 +0000] [2488] [DEBUG] - adbcug_adapter: Preparing 200 cugraph nodes\n", - "[2022/05/25 17:05:45 +0000] [2488] [DEBUG] - adbcug_adapter: Preparing 482 cugraph edges\n", - "[2022/05/25 17:05:45 +0000] [2488] [DEBUG] - adbcug_adapter: Inserting last 100 documents into 'numbers_i'\n", - "[2022/05/25 17:05:45 +0000] [2488] [DEBUG] - adbcug_adapter: Inserting last 100 documents into 'numbers_j'\n", - "[2022/05/25 17:05:45 +0000] [2488] [DEBUG] - adbcug_adapter: Inserting last 482 documents into 'is_divisible_by'\n", - "[2022/05/25 17:05:45 +0000] [2488] [INFO] - adbcug_adapter: Created ArangoDB 'DivisibilityGraph' Graph\n" + "[2022/05/25 17:05:50 +0000] [2488] [INFO] - adbcug_adapter: Instantiated ADBCUG_Adapter with database 'TUT4mnzcrc61phw4we9wpp7tg'\n", + "[2022/05/25 17:05:51 +0000] [2488] [DEBUG] - adbcug_adapter: Starting cugraph_to_arangodb('SchoolGraph', ...):\n", + "[2022/05/25 17:05:51 +0000] [2488] [DEBUG] - adbcug_adapter: Is graph 'SchoolGraph' homogeneous? False\n", + "[2022/05/25 17:05:51 +0000] [2488] [DEBUG] - adbcug_adapter: Preparing 9 cugraph nodes\n", + "[2022/05/25 17:05:51 +0000] [2488] [DEBUG] - adbcug_adapter: Preparing 10 cugraph edges\n", + "[2022/05/25 17:05:51 +0000] [2488] [DEBUG] - adbcug_adapter: Inserting last 3 documents into 'student'\n", + "[2022/05/25 17:05:51 +0000] [2488] [DEBUG] - adbcug_adapter: Inserting last 3 documents into 'teacher'\n", + "[2022/05/25 17:05:51 +0000] [2488] [DEBUG] - adbcug_adapter: Inserting last 3 documents into 'lecture'\n", + "[2022/05/25 17:05:51 +0000] [2488] [DEBUG] - adbcug_adapter: Inserting last 3 documents into 'attends'\n", + "[2022/05/25 17:05:51 +0000] [2488] [DEBUG] - adbcug_adapter: Inserting last 2 documents into 'classmate'\n", + "[2022/05/25 17:05:51 +0000] [2488] [DEBUG] - adbcug_adapter: Inserting last 3 documents into 'teaches'\n", + "[2022/05/25 17:05:51 +0000] [2488] [DEBUG] - adbcug_adapter: Inserting last 2 documents into 'colleague'\n", + "[2022/05/25 17:05:51 +0000] [2488] [INFO] - adbcug_adapter: Created ArangoDB 'SchoolGraph' Graph\n" ] }, { - "output_type": "stream", "name": "stdout", + "output_type": "stream", "text": [ "\n", "--------------------\n", @@ -1930,39 +2003,10 @@ "Database: TUT4mnzcrc61phw4we9wpp7tg\n", "--------------------\n", "\n", - "View the created graph here: https://tutorials.arangodb.cloud:8529/_db/TUT4mnzcrc61phw4we9wpp7tg/_admin/aardvark/index.html#graph/DivisibilityGraph\n" + "View the created graph here: https://tutorials.arangodb.cloud:8529/_db/TUT4mnzcrc61phw4we9wpp7tg/_admin/aardvark/index.html#graph/SchoolGraph\n" ] } - ] - }, - { - "cell_type": "markdown", - "source": [ - "#### School Graph with a custom ADBCUG_Controller" - ], - "metadata": { - "id": "nuVoCZQv6oyi" - } - }, - { - "cell_type": "markdown", - "source": [ - "Data source\n", - "* No source, the graph data is arbitrary\n", - "\n", - "Package methods used\n", - "* [`adbcug_adapter.adapter.cugraph_to_arangodb()`](https://github.com/arangoml/cugraph-adapter/blob/master/adbcug_adapter/adapter.py)\n", - "\n", - "Important notes\n", - "* Here we demonstrate the functionality of having a custom `ADBCUG_Controller`, that overrides the [default ADBCUG_Controller](https://github.com/arangoml/cugraph-adapter/blob/master/adbcug_adapter/controller.py).\n", - "* Recall that a custom ADBCUG Controller for `cuGraph --> ArangoDB` functionality is almost always needed, at the exception of Homogeneous graphs, and graphs where the node IDs are already formatted to the ArangoDB vertex ID standard (i.e `collection/_key`)" ], - "metadata": { - "id": "zPv3BgWt6wEd" - } - }, - { - "cell_type": "code", "source": [ "# Load some arbitary data\n", "df = cudf.DataFrame(\n", @@ -2114,50 +2158,6 @@ "print(\"Database: \" + con[\"dbName\"])\n", "print('--------------------\\n')\n", "print(f\"View the created graph here: {con['url']}/_db/{con['dbName']}/_admin/aardvark/index.html#graph/{name}\")\n" - ], - "metadata": { - "colab": { - "base_uri": "https://localhost:8080/" - }, - "id": "oiToiEJ19ZVx", - "outputId": "229916ac-b888-4d41-ab47-399b9565a68c" - }, - "execution_count": 62, - "outputs": [ - { - "output_type": "stream", - "name": "stderr", - "text": [ - "[2022/05/25 17:05:50 +0000] [2488] [INFO] - adbcug_adapter: Instantiated ADBCUG_Adapter with database 'TUT4mnzcrc61phw4we9wpp7tg'\n", - "[2022/05/25 17:05:51 +0000] [2488] [DEBUG] - adbcug_adapter: Starting cugraph_to_arangodb('SchoolGraph', ...):\n", - "[2022/05/25 17:05:51 +0000] [2488] [DEBUG] - adbcug_adapter: Is graph 'SchoolGraph' homogeneous? False\n", - "[2022/05/25 17:05:51 +0000] [2488] [DEBUG] - adbcug_adapter: Preparing 9 cugraph nodes\n", - "[2022/05/25 17:05:51 +0000] [2488] [DEBUG] - adbcug_adapter: Preparing 10 cugraph edges\n", - "[2022/05/25 17:05:51 +0000] [2488] [DEBUG] - adbcug_adapter: Inserting last 3 documents into 'student'\n", - "[2022/05/25 17:05:51 +0000] [2488] [DEBUG] - adbcug_adapter: Inserting last 3 documents into 'teacher'\n", - "[2022/05/25 17:05:51 +0000] [2488] [DEBUG] - adbcug_adapter: Inserting last 3 documents into 'lecture'\n", - "[2022/05/25 17:05:51 +0000] [2488] [DEBUG] - adbcug_adapter: Inserting last 3 documents into 'attends'\n", - "[2022/05/25 17:05:51 +0000] [2488] [DEBUG] - adbcug_adapter: Inserting last 2 documents into 'classmate'\n", - "[2022/05/25 17:05:51 +0000] [2488] [DEBUG] - adbcug_adapter: Inserting last 3 documents into 'teaches'\n", - "[2022/05/25 17:05:51 +0000] [2488] [DEBUG] - adbcug_adapter: Inserting last 2 documents into 'colleague'\n", - "[2022/05/25 17:05:51 +0000] [2488] [INFO] - adbcug_adapter: Created ArangoDB 'SchoolGraph' Graph\n" - ] - }, - { - "output_type": "stream", - "name": "stdout", - "text": [ - "\n", - "--------------------\n", - "URL: https://tutorials.arangodb.cloud:8529\n", - "Username: TUTka66dmg6q2eb3vgyb1ywo\n", - "Password: TUTxoj1trkqv5pzhfwjn8h2mh\n", - "Database: TUT4mnzcrc61phw4we9wpp7tg\n", - "--------------------\n", - "\n", - "View the created graph here: https://tutorials.arangodb.cloud:8529/_db/TUT4mnzcrc61phw4we9wpp7tg/_admin/aardvark/index.html#graph/SchoolGraph\n" - ] - } ] } ], diff --git a/tests/test_adapter.py b/tests/test_adapter.py index 5fbb336..3ed600c 100644 --- a/tests/test_adapter.py +++ b/tests/test_adapter.py @@ -145,7 +145,7 @@ def test_adb_graph_to_cug( @pytest.mark.parametrize( "adapter, name, cug_g, edge_definitions, orphan_collections, \ - keyify_nodes, keyify_edges, overwrite_graph, edge_attr, import_options", + keyify_nodes, keyify_edges, overwrite_graph, edge_attr, adb_import_kwargs", [ ( adbcug_adapter, @@ -207,7 +207,7 @@ def test_cug_to_adb( keyify_edges: bool, overwrite_graph: bool, edge_attr: str, - import_options: Dict[str, Any], + adb_import_kwargs: Dict[str, Any], ) -> None: adb_g = adapter.cugraph_to_arangodb( name, @@ -218,7 +218,7 @@ def test_cug_to_adb( keyify_edges, overwrite_graph, edge_attr, - **import_options, + **adb_import_kwargs, ) assert_arangodb_data( adapter, @@ -321,7 +321,6 @@ def assert_arangodb_data( def assert_cugraph_data(cug_g: CUGMultiGraph, metagraph: ADBMetagraph) -> None: - adb_edge: Json df = cug_g.to_pandas_edgelist() for col, atribs in metagraph["edgeCollections"].items():