Commit

Added example "content management system".

johnbywater committed Oct 16, 2023
1 parent befbc46 commit 7d61ae2
Showing 21 changed files with 716 additions and 224 deletions.
2 changes: 1 addition & 1 deletion Makefile
@@ -148,7 +148,7 @@ create_postgres_db:
.PHONY: updatetools
updatetools:
pip install -U pip
pip install -U black mypy flake8 flake8-bugbear isort orjson python-coveralls coverage
pip install -U black mypy flake8 flake8-bugbear isort python-coveralls coverage orjson pydantic

.PHONY: docs
docs:
8 changes: 8 additions & 0 deletions docs/topics/examples.rst
@@ -42,3 +42,11 @@ Example applications
examples/content-management
examples/searchable-timestamps
examples/searchable-content

Example systems
===============

.. toctree::
:maxdepth: 2

examples/content-management-system
89 changes: 89 additions & 0 deletions docs/topics/examples/content-management-system.rst
@@ -0,0 +1,89 @@
System 1 - Content management system
====================================

In this example, event notifications from the ``ContentManagementApplication`` from
:doc:`/topics/examples/content-management` are processed and projected into an
eventually-consistent full text search index, a searchable "materialized view" of
the pages' body text just like :doc:`/topics/examples/searchable-content`.

This is an example of CQRS. By separating the search engine "read model" from the content management
"write model", the commands that update pages will perform faster. More importantly, because the
search index is updated by processing events, it can be redesigned and rebuilt by reprocessing those
events: the projected searchable content can be deleted and reconstructed, perhaps extended to include
page titles, timestamps, or other information carried by the domain events, such as the authors.
This is the main advantage of CQRS over the "inline" technique used in :doc:`/topics/examples/searchable-content`,
where the search index is simply updated whenever new events are recorded. Please note, since the
domain model is already event sourced, it is possible to migrate from the "inline" technique to CQRS
by adding the downstream processing and then removing the inline updating. Similarly, other projections
can be added to work alongside and concurrently with the updating of the search engine.
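The rebuild property described above can be sketched with plain Python. The names below are illustrative only, not the library's API: the point is that the read model is a fold over the event log, so it can be deleted and reproduced, in the same or a new shape, by reprocessing the log.

```python
from dataclasses import dataclass
from typing import Dict, List, Sequence, Union


@dataclass(frozen=True)
class PageCreated:
    page_id: str
    body: str


@dataclass(frozen=True)
class BodyUpdated:
    page_id: str
    body: str  # simplified: the full new text rather than a diff


Event = Union[PageCreated, BodyUpdated]


def project(events: Sequence[Event]) -> Dict[str, str]:
    """Fold the event log into a page_id -> searchable-text read model."""
    index: Dict[str, str] = {}
    for e in events:
        index[e.page_id] = e.body
    return index


def search(index: Dict[str, str], query: str) -> List[str]:
    return [pid for pid, text in index.items() if query in text]


log: List[Event] = [
    PageCreated("p1", "cats and dogs"),
    BodyUpdated("p1", "cats, dogs and ducks"),
]
index = project(log)  # the read model can be rebuilt at any time by reprocessing the log
```

Because `project()` depends only on the recorded events, changing what the index contains is a matter of changing the fold and replaying.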

Application
-----------

The ``SearchIndexApplication`` defined below is a :class:`~eventsourcing.system.ProcessApplication`.
Its ``policy()`` function is coded to process the ``Page.Created`` and ``Page.BodyUpdated`` domain
events of the ``ContentManagementApplication``. It also has a ``search()`` method that returns
a list of page IDs.

It works in a similar way to the ``SearchableContentApplication`` class in
:doc:`/topics/examples/searchable-content`, by setting the variable keyword arguments
``insert_pages`` and ``update_pages``. However, rather than populating these arguments
in the ``save()`` method, it populates them within its ``policy()`` function. The
``insert_pages`` and ``update_pages`` arguments are set on the
:class:`~eventsourcing.application.ProcessingEvent` object passed into the ``policy()``
function, which carries an event notification ID indicating the position in the
application sequence of the domain event that is being processed.

The application will be configured to run with a custom :class:`~eventsourcing.persistence.ProcessRecorder`
so that search index records will be updated atomically with the inserting of a tracking record which
indicates which upstream event notification has been processed.
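As a sketch of that atomicity (the table and function names here are illustrative, not the library's), both the search-index row and the tracking record can be written in one database transaction:

```python
import sqlite3

db = sqlite3.connect(":memory:")
db.executescript(
    """
    CREATE TABLE pages (page_id TEXT PRIMARY KEY, page_body TEXT);
    CREATE TABLE tracking (upstream_name TEXT PRIMARY KEY, notification_id INTEGER);
    """
)


def process_notification(notification_id: int, page_id: str, body: str) -> None:
    # One transaction: the index update and the tracking record either both
    # commit or both roll back, so a crash cannot record the notification as
    # processed without its effect, or apply the effect twice on restart.
    with db:
        db.execute("INSERT OR REPLACE INTO pages VALUES (?, ?)", (page_id, body))
        db.execute(
            "INSERT OR REPLACE INTO tracking VALUES (?, ?)",
            ("content_management", notification_id),
        )


process_notification(1, "p1", "cats and dogs")
```

On restart, the processor reads `tracking` to find the last processed notification ID and resumes from the next one.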

Because the ``Page.BodyUpdated`` event carries only the ``diff`` of the page body, the
``policy()`` function must first select the current page body from its own records
and then apply the diff as a patch. The "exactly once" semantics provided by the library's
system module guarantees that the diffs will always be applied in the correct order. Without
this guarantee, a diff might be applied to the wrong version of the text, failing to apply
cleanly and leaving the projection inconsistent.
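The ordering requirement can be seen with a simplified stand-in for the example's ``create_diff()`` and ``apply_patch()`` helpers. This opcode-based pair is an illustration only, not the helpers the example actually uses:

```python
from difflib import SequenceMatcher
from typing import List, Tuple

Diff = List[Tuple[str, int, int, str]]


def create_diff(old: str, new: str) -> Diff:
    # Record each opcode against positions in the *old* text.
    sm = SequenceMatcher(a=old, b=new)
    return [(tag, i1, i2, new[j1:j2]) for tag, i1, i2, j1, j2 in sm.get_opcodes()]


def apply_patch(old: str, diff: Diff) -> str:
    # Replay the opcodes: "equal" spans are copied from the old text, so the
    # patch only makes sense against the exact version it was made from.
    return "".join(
        old[i1:i2] if tag == "equal" else repl for tag, i1, i2, repl in diff
    )


v0, v1, v2 = "", "cats", "cats and dogs"
d1 = create_diff(v0, v1)
d2 = create_diff(v1, v2)

assert apply_patch(apply_patch(v0, d1), d2) == v2  # correct order reconstructs the body
assert apply_patch(v0, d2) != v2  # out of order: the patch no longer applies correctly
```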

.. literalinclude:: ../../../eventsourcing/examples/contentmanagementsystem/application.py

System
------

A :class:`~eventsourcing.system.System` of applications is defined, in which the
``SearchIndexApplication`` follows the ``ContentManagementApplication``. This system
can then be used in any :class:`~eventsourcing.system.Runner`.

.. literalinclude:: ../../../eventsourcing/examples/contentmanagementsystem/system.py

PostgreSQL
----------

The ``PostgresSearchableContentRecorder`` from :doc:`/topics/examples/searchable-content`
is used to define a custom :class:`~eventsourcing.persistence.ProcessRecorder` for PostgreSQL.
The PostgreSQL :class:`~eventsourcing.postgres.Factory` class is extended to construct this custom
recorder, in a custom persistence module that can be used by the ``SearchIndexApplication``.


.. literalinclude:: ../../../eventsourcing/examples/contentmanagementsystem/postgres.py

SQLite
------

The ``SqliteSearchableContentRecorder`` from :doc:`/topics/examples/searchable-content`
is used to define a custom :class:`~eventsourcing.persistence.ProcessRecorder` for SQLite.
The SQLite :class:`~eventsourcing.sqlite.Factory` class is extended to construct this custom
recorder, in a custom persistence module that can be used by the ``SearchIndexApplication``.

.. literalinclude:: ../../../eventsourcing/examples/contentmanagementsystem/sqlite.py


Test case
---------

The test case ``ContentManagementSystemTestCase`` creates three pages, for 'animals', 'plants'
and 'minerals'. Content is added to the pages. The content is searched with various queries and
the search results are checked. The test is executed twice, once with the application configured
for PostgreSQL, and once for SQLite.

.. literalinclude:: ../../../eventsourcing/examples/contentmanagementsystem/test_system.py
47 changes: 22 additions & 25 deletions docs/topics/examples/searchable-content.rst
@@ -11,60 +11,57 @@ to support full text search queries in an event-sourced application with both
Application
-----------

The application class ``SearchableContentApplication`` extends the ``WikiApplication``
class presented in the :doc:`content management example </topics/examples/content-management>`.
It extends the :func:`~eventsourcing.application.Application.save` method by using the variable keyword parameters (``**kwargs``)
of the application :func:`~eventsourcing.application.Application.save` method to pass down to the recorder extra
information that will be used to update a searchable index of the event-sourced
content. It also introduces a ``search()`` method that expects a ``query``
argument and returns a list of pages.
The application class ``SearchableContentApplication`` extends the ``ContentManagementApplication``
class presented in :doc:`/topics/examples/content-management`.
Its :func:`~eventsourcing.application.Application.save` method sets the variable keyword
parameters ``insert_pages`` and ``update_pages``. It also introduces a ``search()`` method that
expects a ``query`` argument and returns a list of pages. The application's recorders are expected
to be receptive to these variable keyword parameters and to support the ``search_pages()`` function.

.. literalinclude:: ../../../eventsourcing/examples/searchablecontent/application.py


Persistence
-----------

The recorder classes ``SearchableContentApplicationRecorder`` extend the PostgreSQL
and SQLite ``ApplicationRecorder`` classes by creating a table that contains the current
page body text. They define SQL statements that insert, update, and search the rows
of the table using search query syntax similar to the one used by web search engines.
They define a ``search_page_bodies()`` method which returns the page slugs for page
bodies that match the given search query.
The recorder class ``SearchableContentRecorder`` extends the ``AggregateRecorder`` by
defining abstract methods to search and select pages. These methods will be implemented
for both PostgreSQL and SQLite, which will also create custom tables for page content with
full text search indexes.

.. literalinclude:: ../../../eventsourcing/examples/searchablecontent/persistence.py

The application recorder classes extend the ``_insert_events()`` method by inserting
and updating rows, according to the information passed down from the application
through the :func:`~eventsourcing.application.Application.save` method's variable keyword parameters.

The infrastructure factory classes ``SearchableContentInfrastructureFactory`` extend the
PostgreSQL and SQLite ``Factory`` class by overriding the ``application_recorder()`` method
so that a ``SearchableContentApplicationRecorder`` is constructed as the application recorder.
The ``_insert_events()`` methods of the PostgreSQL and SQLite recorders are extended, so that
rows are inserted and updated, according to the information passed down from the application
in the variable keyword arguments ``insert_pages`` and ``update_pages``.
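A minimal in-memory sketch of this pass-through (the class names echo the example, but the bodies are illustrative only, not the real recorders):

```python
from typing import Any, Dict, List, Sequence


class ApplicationRecorder:
    """Stands in for the real recorder: stores events, ignores extra kwargs."""

    def __init__(self) -> None:
        self.events: List[Any] = []

    def _insert_events(self, events: Sequence[Any], **kwargs: Any) -> None:
        self.events.extend(events)


class SearchableContentApplicationRecorder(ApplicationRecorder):
    """Extends the insert hook so page rows ride along with the events."""

    def __init__(self) -> None:
        super().__init__()
        self.pages: Dict[str, str] = {}  # slug -> body

    def _insert_events(self, events: Sequence[Any], **kwargs: Any) -> None:
        super()._insert_events(events, **kwargs)
        # Rows passed down from the application's save() call are written
        # in the same operation as the events themselves.
        for slug, body in kwargs.get("insert_pages", ()):
            self.pages[slug] = body
        for slug, body in kwargs.get("update_pages", ()):
            self.pages[slug] = body


recorder = SearchableContentApplicationRecorder()
recorder._insert_events(["page-created"], insert_pages=[("animals", "cats and dogs")])
recorder._insert_events(["body-updated"], update_pages=[("animals", "cats, dogs and ducks")])
```

In the real recorders the page rows and the events are written inside one database transaction, which is what keeps the index consistent with the event store.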


PostgreSQL
----------

The PostgreSQL recorder uses a GIN index and the ``websearch_to_tsquery()`` function.
The PostgreSQL :class:`~eventsourcing.postgres.Factory` class is extended to construct this custom
recorder, in a custom PostgreSQL persistence module that can be used by the ``ContentManagementApplication``.

.. literalinclude:: ../../../eventsourcing/examples/searchablecontent/postgres.py


SQLite
------

The SQLite recorder uses a virtual table and the ``MATCH`` operator.
The SQLite :class:`~eventsourcing.sqlite.Factory` class is extended to construct this custom
recorder, in a custom SQLite persistence module that can be used by the ``ContentManagementApplication``.
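For illustration outside the example's classes, the core of the SQLite technique can be shown with the standard library's ``sqlite3`` module, assuming the SQLite build includes the FTS5 extension (as CPython's bundled SQLite normally does):

```python
import sqlite3

db = sqlite3.connect(":memory:")
# An FTS5 virtual table indexes its columns for full text search.
db.execute("CREATE VIRTUAL TABLE page_bodies USING fts5(page_slug, page_body)")
db.executemany(
    "INSERT INTO page_bodies VALUES (?, ?)",
    [("animals", "cats and dogs"), ("plants", "ferns and roses")],
)
# The MATCH operator runs a full text query against the virtual table.
rows = db.execute(
    "SELECT page_slug FROM page_bodies WHERE page_bodies MATCH ?", ("dogs",)
).fetchall()
```

A query can be restricted to one column with FTS5 column-filter syntax, for example ``'page_body: dogs'``.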

.. literalinclude:: ../../../eventsourcing/examples/searchablecontent/sqlite.py


Test case
---------

The test case ``SearchableContentTestCase`` uses the application to create three
pages, for 'animals', 'plants' and 'minerals'. Content is added to the pages. The
The test case ``SearchableContentApplicationTestCase`` uses the ``SearchableContentApplication`` to
create three pages, for 'animals', 'plants' and 'minerals'. Content is added to the pages. The
content is searched with various queries and the search results are checked. The
test is executed twice, with the application configured for both PostgreSQL and SQLite.
test case is executed twice, once with the PostgreSQL persistence module, and once with the
SQLite persistence module.

.. literalinclude:: ../../../eventsourcing/examples/searchablecontent/test_searchablecontent.py
.. literalinclude:: ../../../eventsourcing/examples/searchablecontent/test_application.py
6 changes: 5 additions & 1 deletion eventsourcing/examples/contentmanagement/application.py
@@ -24,10 +24,14 @@ def create_page(self, title: str, slug: str) -> None:
index_entry = Index(slug, ref=page.id)
self.save(page, page_logged, index_entry)

def get_page_details(self, slug: str) -> PageDetailsType:
def get_page_by_slug(self, slug: str) -> PageDetailsType:
page = self._get_page_by_slug(slug)
return self._details_from_page(page)

def get_page_by_id(self, page_id: UUID) -> PageDetailsType:
page = self._get_page_by_id(page_id)
return self._details_from_page(page)

def _details_from_page(self, page: Page) -> PageDetailsType:
return {
"title": page.title,
22 changes: 15 additions & 7 deletions eventsourcing/examples/contentmanagement/domainmodel.py
@@ -11,19 +11,24 @@
user_id_cvar: ContextVar[Optional[UUID]] = ContextVar("user_id", default=None)


@dataclass
class Page(Aggregate):
title: str
slug: str
body: str = ""
modified_by: Optional[UUID] = field(default=None, init=False)

class Event(Aggregate.Event):
user_id: Optional[UUID] = field(default_factory=user_id_cvar.get, init=False)

def apply(self, aggregate: Aggregate) -> None:
cast(Page, aggregate).modified_by = self.user_id

class Created(Event, Aggregate.Created):
title: str
slug: str
body: str

def __init__(self, title: str, slug: str, body: str = ""):
self.title = title
self.slug = slug
self.body = body
self.modified_by: Optional[UUID] = None

@event("SlugUpdated")
def update_slug(self, slug: str) -> None:
self.slug = slug
@@ -35,7 +40,10 @@ def update_title(self, title: str) -> None:
def update_body(self, body: str) -> None:
self._update_body(create_diff(old=self.body, new=body))

@event("BodyUpdated")
class BodyUpdated(Event):
diff: str

@event(BodyUpdated)
def _update_body(self, diff: str) -> None:
self.body = apply_patch(old=self.body, diff=diff)

20 changes: 10 additions & 10 deletions eventsourcing/examples/contentmanagement/test.py
@@ -26,7 +26,7 @@ def test(self) -> None:

# Check the page doesn't exist.
with self.assertRaises(PageNotFound):
app.get_page_details(slug="welcome")
app.get_page_by_slug(slug="welcome")

# Check the list of pages is empty.
pages = list(app.get_pages())
@@ -36,7 +36,7 @@ def test(self) -> None:
app.create_page(title="Welcome", slug="welcome")

# Present page identified by the given slug.
page = app.get_page_details(slug="welcome")
page = app.get_page_by_slug(slug="welcome")

# Check we got a dict that has the given title and slug.
self.assertEqual(page["title"], "Welcome")
@@ -48,7 +48,7 @@ def test(self) -> None:
app.update_title(slug="welcome", title="Welcome Visitors")

# Check the title was updated.
page = app.get_page_details(slug="welcome")
page = app.get_page_by_slug(slug="welcome")
self.assertEqual(page["title"], "Welcome Visitors")
self.assertEqual(page["modified_by"], user_id)

@@ -57,25 +57,25 @@ def test(self) -> None:

# Check the index was updated.
with self.assertRaises(PageNotFound):
app.get_page_details(slug="welcome")
app.get_page_by_slug(slug="welcome")

# Check we can get the page by the new slug.
page = app.get_page_details(slug="welcome-visitors")
page = app.get_page_by_slug(slug="welcome-visitors")
self.assertEqual(page["title"], "Welcome Visitors")
self.assertEqual(page["slug"], "welcome-visitors")

# Update the body.
app.update_body(slug="welcome-visitors", body="Welcome to my wiki")

# Check the body was updated.
page = app.get_page_details(slug="welcome-visitors")
page = app.get_page_by_slug(slug="welcome-visitors")
self.assertEqual(page["body"], "Welcome to my wiki")

# Update the body.
app.update_body(slug="welcome-visitors", body="Welcome to this wiki")

# Check the body was updated.
page = app.get_page_details(slug="welcome-visitors")
page = app.get_page_by_slug(slug="welcome-visitors")
self.assertEqual(page["body"], "Welcome to this wiki")

# Update the body.
@@ -89,7 +89,7 @@ def test(self) -> None:
)

# Check the body was updated.
page = app.get_page_details(slug="welcome-visitors")
page = app.get_page_by_slug(slug="welcome-visitors")
self.assertEqual(
page["body"],
"""
@@ -120,7 +120,7 @@ def test(self) -> None:
)

# Check 'modified_by' changed.
page = app.get_page_details(slug="welcome-visitors")
page = app.get_page_by_slug(slug="welcome-visitors")
self.assertEqual(page["title"], "Welcome Visitors")
self.assertEqual(page["modified_by"], user_id)

@@ -173,6 +173,6 @@ def test(self) -> None:
# that was previously being used.
app.update_slug("welcome-visitors", "welcome")

page = app.get_page_details(slug="welcome")
page = app.get_page_by_slug(slug="welcome")
self.assertEqual(page["title"], "Welcome Visitors")
self.assertEqual(page["modified_by"], user_id)
Empty file.
49 changes: 49 additions & 0 deletions eventsourcing/examples/contentmanagementsystem/application.py
@@ -0,0 +1,49 @@
from typing import List, cast
from uuid import UUID

from eventsourcing.application import ProcessingEvent
from eventsourcing.domain import DomainEventProtocol
from eventsourcing.examples.contentmanagement.domainmodel import Page
from eventsourcing.examples.contentmanagement.utils import apply_patch
from eventsourcing.examples.searchablecontent.persistence import (
SearchableContentRecorder,
)
from eventsourcing.system import ProcessApplication


class SearchIndexApplication(ProcessApplication):
env = {
"COMPRESSOR_TOPIC": "gzip",
}

def policy(
self,
domain_event: DomainEventProtocol,
processing_event: ProcessingEvent,
) -> None:
if isinstance(domain_event, Page.Created):
processing_event.saved_kwargs["insert_pages"] = [
(
domain_event.originator_id,
domain_event.slug,
domain_event.title,
domain_event.body,
)
]
elif isinstance(domain_event, Page.BodyUpdated):
recorder = cast(SearchableContentRecorder, self.recorder)
page_id = domain_event.originator_id
page_slug, page_title, page_body = recorder.select_page(page_id)
page_body = apply_patch(page_body, domain_event.diff)
processing_event.saved_kwargs["update_pages"] = [
(
page_id,
page_slug,
page_title,
page_body,
)
]

def search(self, query: str) -> List[UUID]:
recorder = cast(SearchableContentRecorder, self.recorder)
return recorder.search_pages(query)
