Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for parallel runtime #842

Merged
merged 4 commits into from
Nov 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 21 additions & 3 deletions doc/source/transactions.rst
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ Explicit Transactions
Neomodel also supports `explicit transactions <https://neo4j.com/docs/
api/python-driver/current/transactions.html>`_ that are pre-designated as either *read* or *write*.

This is vital when using neomodel over a `Neo4J causal cluster <https://neo4j.com/docs/
This is vital when using neomodel over a `Neo4j causal cluster <https://neo4j.com/docs/
operations-manual/current/clustering/>`_ because internally, queries will be rerouted to different
servers depending on their designation.

Expand Down Expand Up @@ -168,7 +168,7 @@ Impersonation

*Neo4j Enterprise feature*

Impersonation (`see Neo4j driver documentation <https://neo4j.com/docs/api/python-driver/current/api.html#impersonated-user-ref>``)
Impersonation (`see Neo4j driver documentation <https://neo4j.com/docs/api/python-driver/current/api.html#impersonated-user-ref>`_)
can be enabled via a context manager::

from neomodel import db
Expand Down Expand Up @@ -197,4 +197,22 @@ This can be mixed with other context manager like transactions::

@db.transaction()
def func2():
...
...


Parallel runtime
----------------

As of version 5.13, Neo4j *Enterprise Edition* supports parallel runtime for read transactions.

To use it, wrap your queries in the ``parallel_read_transaction`` context manager::

from neomodel import db

with db.parallel_read_transaction:
# It works for both neomodel-generated and custom Cypher queries
parallel_count_1 = len(Coffee.nodes)
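# cypher_query returns a (results, meta) tuple; the count is the first cell of the first row
results, _ = db.cypher_query("MATCH (n:Coffee) RETURN count(n)")
parallel_count_2 = results[0][0]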

Note that the parallel runtime is only available for read transactions and is not enabled by default, because it is not always the fastest option. Test it against your specific use case to confirm that it improves performance, and read the general considerations in the `Neo4j official documentation <https://neo4j.com/docs/cypher-manual/current/planning-and-tuning/runtimes/concepts/#runtimes-parallel-runtime-considerations>`_.

34 changes: 32 additions & 2 deletions neomodel/async_/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ def __init__(self):
self._database_version = None
self._database_edition = None
self.impersonated_user = None
self._parallel_runtime = False

async def set_connection(self, url: str = None, driver: AsyncDriver = None):
"""
Expand Down Expand Up @@ -239,6 +240,10 @@ def write_transaction(self):
def read_transaction(self):
return AsyncTransactionProxy(self, access_mode="READ")

@property
def parallel_read_transaction(self):
    """Build a READ transaction proxy that requests the parallel runtime.

    Every access creates a new ``AsyncTransactionProxy`` bound to this
    database object, with ``access_mode="READ"`` and
    ``parallel_runtime=True``.
    """
    proxy_options = {"access_mode": "READ", "parallel_runtime": True}
    return AsyncTransactionProxy(self, **proxy_options)

async def impersonate(self, user: str) -> "ImpersonationHandler":
"""All queries executed within this context manager will be executed as impersonated user

Expand Down Expand Up @@ -454,7 +459,6 @@ async def cypher_query(

:return: A tuple containing a list of results and a tuple of headers.
"""

if self._active_transaction:
# Use the current session if a transaction is currently active
results, meta = await self._run_cypher_query(
Expand Down Expand Up @@ -493,6 +497,8 @@ async def _run_cypher_query(
try:
# Retrieve the data
start = time.time()
if self._parallel_runtime:
query = "CYPHER runtime=parallel " + query
response: AsyncResult = await session.run(query, params)
results, meta = [list(r.values()) async for r in response], response.keys()
end = time.time()
Expand Down Expand Up @@ -598,6 +604,18 @@ async def edition_is_enterprise(self) -> bool:
edition = await self.database_edition
return edition == "enterprise"

@ensure_connection
async def parallel_runtime_available(self) -> bool:
    """Check whether the connected database can use the parallel runtime.

    The answer combines ``version_is_higher_than("5.13")`` with
    ``edition_is_enterprise()``. The edition is only looked up once the
    version check has passed.

    Returns:
        bool: True if the database supports parallel runtime
    """
    version_ok = await self.version_is_higher_than("5.13")
    if not version_ok:
        return version_ok
    return await self.edition_is_enterprise()

async def change_neo4j_password(self, user, new_password):
    """Change the password of a Neo4j user.

    The new password is sent as a query parameter instead of being spliced
    into the Cypher text, so quotes or backslashes in the password can
    neither break nor alter the statement.

    :param user: Name of the user whose password is changed.
    :param new_password: The new password, in plain text.
    """
    # NOTE(review): ``user`` is still interpolated verbatim as an identifier;
    # callers must only pass trusted user names.
    await self.cypher_query(
        f"ALTER USER {user} SET PASSWORD $new_password",
        {"new_password": new_password},
    )

Expand Down Expand Up @@ -1168,17 +1186,29 @@ async def install_all_labels(stdout=None):
class AsyncTransactionProxy:
bookmarks: Optional[Bookmarks] = None

def __init__(self, db: AsyncDatabase, access_mode=None):
def __init__(
    self,
    db: AsyncDatabase,
    access_mode: str = None,
    parallel_runtime: bool = False,
):
    """Store the database and the settings applied when the context is entered.

    :param db: Database object the transaction is opened on.
    :param access_mode: Access mode forwarded to ``db.begin`` on entry.
    :param parallel_runtime: Whether entering the context should switch
        the database to the parallel runtime.
    """
    self.parallel_runtime = parallel_runtime
    self.access_mode = access_mode
    self.db = db

@ensure_connection
async def __aenter__(self):
    """Open the transaction, enabling the parallel runtime when requested.

    If the parallel runtime was requested but the database does not
    support it, a ``UserWarning`` is emitted and the default runtime is
    used instead.

    :return: This proxy, once the transaction has begun.
    """
    if self.parallel_runtime and not await self.db.parallel_runtime_available():
        warnings.warn(
            "Parallel runtime is only available in Neo4j Enterprise Edition 5.13 and above. "
            "Reverting to default runtime.",
            UserWarning,
        )
        self.parallel_runtime = False
    self.db._parallel_runtime = self.parallel_runtime
    try:
        await self.db.begin(access_mode=self.access_mode, bookmarks=self.bookmarks)
    except BaseException:
        # __aexit__ is not called when __aenter__ raises, so the flag must be
        # cleared here; otherwise later queries would keep the parallel prefix.
        self.db._parallel_runtime = False
        raise
    self.bookmarks = None
    return self

async def __aexit__(self, exc_type, exc_value, traceback):
self.db._parallel_runtime = False
if exc_value:
await self.db.rollback()

Expand Down
13 changes: 9 additions & 4 deletions neomodel/async_/match.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import inspect
import re
import string
import warnings
from dataclasses import dataclass
from typing import Any, Dict, List
from typing import Optional as TOptional
Expand Down Expand Up @@ -589,9 +590,11 @@ def build_traversal_from_path(
}
else:
existing_rhs_name = subgraph[part][
"rel_variable_name"
if relation.get("relation_filtering")
else "variable_name"
(
"rel_variable_name"
if relation.get("relation_filtering")
else "variable_name"
)
]
if relation["include_in_return"] and not already_present:
self._additional_return(rel_ident)
Expand Down Expand Up @@ -973,7 +976,9 @@ async def _execute(self, lazy: bool = False, dict_output: bool = False):
]
query = self.build_query()
results, prop_names = await adb.cypher_query(
query, self._query_params, resolve_objects=True
query,
self._query_params,
resolve_objects=True,
)
if dict_output:
for item in results:
Expand Down
31 changes: 29 additions & 2 deletions neomodel/sync_/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ def __init__(self):
self._database_version = None
self._database_edition = None
self.impersonated_user = None
self._parallel_runtime = False

def set_connection(self, url: str = None, driver: Driver = None):
"""
Expand Down Expand Up @@ -239,6 +240,10 @@ def write_transaction(self):
def read_transaction(self):
return TransactionProxy(self, access_mode="READ")

@property
def parallel_read_transaction(self):
    """Build a READ transaction proxy that requests the parallel runtime.

    Every access creates a new ``TransactionProxy`` bound to this database
    object, with ``access_mode="READ"`` and ``parallel_runtime=True``.
    """
    proxy_options = {"access_mode": "READ", "parallel_runtime": True}
    return TransactionProxy(self, **proxy_options)

def impersonate(self, user: str) -> "ImpersonationHandler":
"""All queries executed within this context manager will be executed as impersonated user

Expand Down Expand Up @@ -452,7 +457,6 @@ def cypher_query(

:return: A tuple containing a list of results and a tuple of headers.
"""

if self._active_transaction:
# Use the current session if a transaction is currently active
results, meta = self._run_cypher_query(
Expand Down Expand Up @@ -491,6 +495,8 @@ def _run_cypher_query(
try:
# Retrieve the data
start = time.time()
if self._parallel_runtime:
query = "CYPHER runtime=parallel " + query
response: Result = session.run(query, params)
results, meta = [list(r.values()) for r in response], response.keys()
end = time.time()
Expand Down Expand Up @@ -596,6 +602,15 @@ def edition_is_enterprise(self) -> bool:
edition = self.database_edition
return edition == "enterprise"

@ensure_connection
def parallel_runtime_available(self) -> bool:
    """Check whether the connected database can use the parallel runtime.

    The answer combines ``version_is_higher_than("5.13")`` with
    ``edition_is_enterprise()``. The edition is only looked up once the
    version check has passed.

    Returns:
        bool: True if the database supports parallel runtime
    """
    version_ok = self.version_is_higher_than("5.13")
    if not version_ok:
        return version_ok
    return self.edition_is_enterprise()

def change_neo4j_password(self, user, new_password):
    """Change the password of a Neo4j user.

    The new password is sent as a query parameter instead of being spliced
    into the Cypher text, so quotes or backslashes in the password can
    neither break nor alter the statement.

    :param user: Name of the user whose password is changed.
    :param new_password: The new password, in plain text.
    """
    # NOTE(review): ``user`` is still interpolated verbatim as an identifier;
    # callers must only pass trusted user names.
    self.cypher_query(
        f"ALTER USER {user} SET PASSWORD $new_password",
        {"new_password": new_password},
    )

Expand Down Expand Up @@ -1162,17 +1177,29 @@ def install_all_labels(stdout=None):
class TransactionProxy:
bookmarks: Optional[Bookmarks] = None

def __init__(self, db: Database, access_mode=None):
def __init__(
    self,
    db: Database,
    access_mode: str = None,
    parallel_runtime: bool = False,
):
    """Store the database and the settings applied when the context is entered.

    :param db: Database object the transaction is opened on.
    :param access_mode: Access mode forwarded to ``db.begin`` on entry.
    :param parallel_runtime: Whether entering the context should switch
        the database to the parallel runtime.
    """
    self.parallel_runtime = parallel_runtime
    self.access_mode = access_mode
    self.db = db

@ensure_connection
def __enter__(self):
    """Open the transaction, enabling the parallel runtime when requested.

    If the parallel runtime was requested but the database does not
    support it, a ``UserWarning`` is emitted and the default runtime is
    used instead.

    :return: This proxy, once the transaction has begun.
    """
    if self.parallel_runtime and not self.db.parallel_runtime_available():
        warnings.warn(
            "Parallel runtime is only available in Neo4j Enterprise Edition 5.13 and above. "
            "Reverting to default runtime.",
            UserWarning,
        )
        self.parallel_runtime = False
    self.db._parallel_runtime = self.parallel_runtime
    try:
        self.db.begin(access_mode=self.access_mode, bookmarks=self.bookmarks)
    except BaseException:
        # __exit__ is not called when __enter__ raises, so the flag must be
        # cleared here; otherwise later queries would keep the parallel prefix.
        self.db._parallel_runtime = False
        raise
    self.bookmarks = None
    return self

def __exit__(self, exc_type, exc_value, traceback):
self.db._parallel_runtime = False
if exc_value:
self.db.rollback()

Expand Down
5 changes: 4 additions & 1 deletion neomodel/sync_/match.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import inspect
import re
import string
import warnings
from dataclasses import dataclass
from typing import Any, Dict, List
from typing import Optional as TOptional
Expand Down Expand Up @@ -973,7 +974,9 @@ def _execute(self, lazy: bool = False, dict_output: bool = False):
]
query = self.build_query()
results, prop_names = db.cypher_query(
query, self._query_params, resolve_objects=True
query,
self._query_params,
resolve_objects=True,
)
if dict_output:
for item in results:
Expand Down
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ dev = [
"pytest>=7.1",
"pytest-asyncio",
"pytest-cov>=4.0",
"pytest-mock",
"pre-commit",
"black",
"isort",
Expand Down
1 change: 1 addition & 0 deletions requirements-dev.txt
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ unasync>=0.5.0
pytest>=7.1
pytest-asyncio>=0.19.0
pytest-cov>=4.0
pytest-mock
pre-commit
black
isort
Expand Down
63 changes: 61 additions & 2 deletions test/async_/test_match_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
from datetime import datetime
from test._async_compat import mark_async_test

from pytest import raises
from pytest import raises, skip, warns

from neomodel import (
INCOMING,
Expand Down Expand Up @@ -31,7 +31,11 @@
RawCypher,
RelationNameResolver,
)
from neomodel.exceptions import MultipleNodesReturned, RelationshipClassNotDefined
from neomodel.exceptions import (
FeatureNotSupported,
MultipleNodesReturned,
RelationshipClassNotDefined,
)


class SupplierRel(AsyncStructuredRel):
Expand Down Expand Up @@ -1113,3 +1117,58 @@ async def test_async_iterator():

# assert that generator runs loop above
assert counter == n


def assert_last_query_startswith(mock_func, query) -> bool:
    """Tell whether the latest recorded call began with ``query``.

    Looks at the last entry of ``mock_func.call_args_list`` and checks the
    first positional argument of that call. Despite its name, this helper
    does not assert anything itself; callers assert on the returned bool.
    """
    last_call = mock_func.call_args_list[-1]
    first_positional = last_call.args[0]
    return first_positional.startswith(query)


@mark_async_test
async def test_parallel_runtime(mocker):
    """Check that queries inside ``parallel_read_transaction`` get the parallel prefix.

    Skipped unless the database passes both the 5.13 version check and the
    Enterprise edition check.
    """
    if (
        not await adb.version_is_higher_than("5.13")
        or not await adb.edition_is_enterprise()
    ):
        skip("Only supported for Enterprise 5.13 and above.")

    assert await adb.parallel_runtime_available()

    # Parallel should be applied to custom Cypher query
    async with adb.parallel_read_transaction:
        # Entering the context switches the flag on
        assert adb._parallel_runtime is True
        # Mock transaction.run to access the executed query
        mock_transaction_run = mocker.patch("neo4j.AsyncTransaction.run")
        await adb.cypher_query("MATCH (n:Coffee) RETURN n")
        # Assert query starts with CYPHER runtime=parallel
        assert assert_last_query_startswith(
            mock_transaction_run, "CYPHER runtime=parallel"
        )
    # Test exiting the context sets the parallel_runtime to False
    assert adb._parallel_runtime is False

    # Parallel should be applied to neomodel queries
    async with adb.parallel_read_transaction:
        mock_transaction_run_2 = mocker.patch("neo4j.AsyncTransaction.run")
        await Coffee.nodes.all()
        assert assert_last_query_startswith(
            mock_transaction_run_2, "CYPHER runtime=parallel"
        )


@mark_async_test
async def test_parallel_runtime_conflict(mocker):
    """Check the fallback when the parallel runtime is not available.

    Skipped when the database passes both the 5.13 version check and the
    Enterprise edition check. Otherwise, entering
    ``parallel_read_transaction`` must warn and leave queries unprefixed.
    """
    version_ok = await adb.version_is_higher_than("5.13")
    if version_ok and await adb.edition_is_enterprise():
        skip("Test for unavailable parallel runtime.")

    runtime_available = await adb.parallel_runtime_available()
    assert not runtime_available

    patched_run = mocker.patch("neo4j.AsyncTransaction.run")
    expected_warning = (
        "Parallel runtime is only available in Neo4j Enterprise Edition 5.13"
    )
    with warns(UserWarning, match=expected_warning):
        async with adb.parallel_read_transaction:
            await Coffee.nodes.all()
            # The proxy must have fallen back to the default runtime
            assert not adb._parallel_runtime
            used_parallel_prefix = assert_last_query_startswith(
                patched_run, "CYPHER runtime=parallel"
            )
            assert not used_parallel_prefix
Loading
Loading