Use query parameters wherever possible in Neo4jStore (#330)
* Add validation for GufeKey format and characters to prevent Cypher injection

Introduce a validator for GufeKeys to ensure they follow the <prefix>-<token> format.
The validator restricts characters to ASCII letters (A-Za-z), digits (0-9), underscores (_), and hyphens (-).

* Add tests for GufeKey validation

Add tests to verify that GufeKeys are restricted to allowed characters.

* Refactor _query method to use Cypher parameters

Update `_query()` method in Neo4jStore to use Cypher parameters instead of f-strings, reducing the risk of injection attacks.
Also add a test demonstrating how previous versions were vulnerable.
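
For illustration only, the two ideas in this message (validating the key format and passing values as Cypher parameters rather than interpolating them) can be sketched as below. This is not the code from the commit: the regular expression, the function names, the `Node` label, and the `key` property are all hypothetical, and the pattern is somewhat stricter than the validator in the diff because it requires a non-empty prefix and token.

```python
import re

# Assumed pattern for '<prefix>-<token>': exactly one hyphen, with ASCII
# letters, digits, or underscores on each side (hypothetical, for illustration).
_KEY_PATTERN = re.compile(r"[A-Za-z0-9_]+-[A-Za-z0-9_]+")


def is_valid_key(key: str) -> bool:
    """Return True if `key` matches the assumed '<prefix>-<token>' shape."""
    return _KEY_PATTERN.fullmatch(key) is not None


def unsafe_query(key: str) -> str:
    """Old style: the key is interpolated directly into the Cypher text."""
    return f"MATCH (n:Node {{key: '{key}'}}) RETURN n"


def safe_query(key: str) -> tuple[str, dict[str, str]]:
    """New style: the Cypher text is constant; the key travels as a parameter."""
    return "MATCH (n:Node {key: $key}) RETURN n", {"key": key}


# A key crafted to break out of the quoted string in the f-string version.
malicious = "Transformation-abc'}) DETACH DELETE n //"

print(is_valid_key("Transformation-abc123"))  # well-formed key
print(is_valid_key(malicious))                # rejected by the pattern
print(unsafe_query(malicious))                # injected clause ends up in the query text
print(safe_query(malicious))                  # query text is unchanged by the input
```

With the neo4j Python driver, the text and parameter dictionary returned by `safe_query` would typically be passed together to something like `session.run(text, params)`, so the database treats the key purely as data.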

---------

Co-authored-by: Ian Kenney <ianmichaelkenney@gmail.com>
LilDojd and ianmkenney authored Nov 19, 2024
1 parent 4926573 commit 49182dc
Showing 4 changed files with 332 additions and 213 deletions.
27 changes: 25 additions & 2 deletions alchemiscale/models.py
@@ -8,6 +8,8 @@
 from pydantic import BaseModel, Field, validator, root_validator
 from gufe.tokenization import GufeKey
 from re import fullmatch
+import unicodedata
+import string


 class Scope(BaseModel):
@@ -114,6 +116,9 @@ def specific(self) -> bool:
         return all(self.to_tuple())


+class InvalidGufeKeyError(ValueError): ...
+
+
 class ScopedKey(BaseModel):
     """Unique identifier for GufeTokenizables in state store.
@@ -131,8 +136,26 @@ class Config:
         frozen = True

     @validator("gufe_key")
-    def cast_gufe_key(cls, v):
-        return GufeKey(v)
+    def gufe_key_validator(cls, v):
+        v = str(v)
+
+        # GufeKey is of form <prefix>-<hex>
+        try:
+            _prefix, _token = v.split("-")
+        except ValueError:
+            raise InvalidGufeKeyError("gufe_key must be of the form '<prefix>-<hex>'")
+
+        # Normalize the input to NFC form
+        v_normalized = unicodedata.normalize("NFC", v)
+
+        # Allowed characters: letters, numbers, underscores, hyphens
+        allowed_chars = set(string.ascii_letters + string.digits + "_-")
+
+        if not set(v_normalized).issubset(allowed_chars):
+            raise InvalidGufeKeyError("gufe_key contains invalid characters")
+
+        # Cast to GufeKey
+        return GufeKey(v_normalized)

     def __repr__(self):  # pragma: no cover
         return f"<ScopedKey('{str(self)}')>"