Skip to content

Commit

Permalink
Merge pull request #38 from piercefreeman/feature/for-update-tests
Browse files Browse the repository at this point in the history
Add db-level tests for updates
  • Loading branch information
piercefreeman authored Dec 20, 2024
2 parents c28eb5a + 6f0d898 commit f26f001
Showing 1 changed file with 144 additions and 0 deletions.
144 changes: 144 additions & 0 deletions iceaxe/__tests__/test_session.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from enum import StrEnum

import asyncpg
import pytest

from iceaxe.__tests__.conf_models import ArtifactDemo, ComplexDemo, UserDemo
Expand Down Expand Up @@ -763,3 +764,146 @@ async def test_upsert_multiple_conflict_fields(db_connection: DBConnection):
assert result is not None
assert len(result) == 2
assert {r[1] for r in result} == {"john@example.com", "jane@example.com"}


@pytest.mark.asyncio
async def test_for_update_prevents_concurrent_modification(db_connection: DBConnection):
"""
Test that FOR UPDATE actually locks the row for concurrent modifications.
"""
# Create initial user
user = UserDemo(name="John Doe", email="john@example.com")
await db_connection.insert([user])

async with db_connection.transaction():
# Lock the row with FOR UPDATE
[locked_user] = await db_connection.exec(
QueryBuilder().select(UserDemo).where(UserDemo.id == user.id).for_update()
)
assert locked_user.name == "John Doe"

# Try to update from another connection - this should block
# until our transaction is done
other_conn = DBConnection(
await asyncpg.connect(
host="localhost",
port=5438,
user="iceaxe",
password="mysecretpassword",
database="iceaxe_test_db",
)
)
try:
with pytest.raises(asyncpg.exceptions.LockNotAvailableError):
# This should raise an error since we're using NOWAIT
await other_conn.exec(
QueryBuilder()
.select(UserDemo)
.where(UserDemo.id == user.id)
.for_update(nowait=True)
)
finally:
await other_conn.conn.close()


@pytest.mark.asyncio
async def test_for_update_skip_locked(db_connection: DBConnection):
"""
Test that SKIP LOCKED works as expected.
"""
# Create test users
users = [
UserDemo(name="User 1", email="user1@example.com"),
UserDemo(name="User 2", email="user2@example.com"),
]
await db_connection.insert(users)

async with db_connection.transaction():
# Lock the first user
[locked_user] = await db_connection.exec(
QueryBuilder()
.select(UserDemo)
.where(UserDemo.id == users[0].id)
.for_update()
)
assert locked_user.name == "User 1"

# From another connection, try to select both users with SKIP LOCKED
other_conn = DBConnection(
await asyncpg.connect(
host="localhost",
port=5438,
user="iceaxe",
password="mysecretpassword",
database="iceaxe_test_db",
)
)
try:
# This should only return User 2 since User 1 is locked
result = await other_conn.exec(
QueryBuilder()
.select(UserDemo)
.order_by(UserDemo.id, "ASC")
.for_update(skip_locked=True)
)
assert len(result) == 1
assert result[0].name == "User 2"
finally:
await other_conn.conn.close()


@pytest.mark.asyncio
async def test_for_update_of_with_join(db_connection: DBConnection):
"""
Test FOR UPDATE OF with JOINed tables.
"""
# Create test data
user = UserDemo(name="John Doe", email="john@example.com")
await db_connection.insert([user])

artifact = ArtifactDemo(title="Test Artifact", user_id=user.id)
await db_connection.insert([artifact])

async with db_connection.transaction():
# Lock only the artifacts table in a join query
[(selected_artifact, selected_user)] = await db_connection.exec(
QueryBuilder()
.select((ArtifactDemo, UserDemo))
.join(UserDemo, UserDemo.id == ArtifactDemo.user_id)
.for_update(of=(ArtifactDemo,))
)
assert selected_artifact.title == "Test Artifact"
assert selected_user.name == "John Doe"

# In another connection, we should be able to lock the user
# but not the artifact
other_conn = DBConnection(
await asyncpg.connect(
host="localhost",
port=5438,
user="iceaxe",
password="mysecretpassword",
database="iceaxe_test_db",
)
)
try:
# Should succeed since user table isn't locked
[other_user] = await other_conn.exec(
QueryBuilder()
.select(UserDemo)
.where(UserDemo.id == user.id)
.for_update(nowait=True)
)
assert other_user.name == "John Doe"

# Should fail since artifact table is locked
with pytest.raises(asyncpg.exceptions.LockNotAvailableError):
await other_conn.exec(
QueryBuilder()
.select(ArtifactDemo)
.where(ArtifactDemo.id == artifact.id)
.for_update(nowait=True)
)
pytest.fail("Should have raised an error")
finally:
await other_conn.conn.close()

0 comments on commit f26f001

Please sign in to comment.