Skip to content

Commit

Permalink
Merge pull request #16 from fraktalio/feature/delegation
Browse files Browse the repository at this point in the history
implementing delegation pattern on the application layer components.
  • Loading branch information
idugalic authored Dec 14, 2023
2 parents 1eb073e + 7085ea2 commit 1231bcd
Show file tree
Hide file tree
Showing 3 changed files with 172 additions and 20 deletions.
106 changes: 94 additions & 12 deletions src/aggregate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,54 @@ where
_marker: PhantomData<(C, S, E, Version, Error)>,
}

impl<C, S, E, Repository, Decider, Version, Error>
EventSourcedAggregate<C, S, E, Repository, Decider, Version, Error>
impl<C, S, E, Repository, Decider, Version, Error> EventComputation<C, S, E>
for EventSourcedAggregate<C, S, E, Repository, Decider, Version, Error>
where
Repository: EventRepository<C, E, Version, Error>,
Decider: EventComputation<C, S, E>,
{
/// Computes new events based on the current events and the command.
fn compute_new_events(&self, current_events: &[E], command: &C) -> Vec<E> {
self.decider.compute_new_events(current_events, command)
}
}

#[async_trait]
impl<C, S, E, Repository, Decider, Version, Error> EventRepository<C, E, Version, Error>
for EventSourcedAggregate<C, S, E, Repository, Decider, Version, Error>
where
Repository: EventRepository<C, E, Version, Error> + Sync,
Decider: EventComputation<C, S, E> + Sync,
C: Sync,
S: Sync,
E: Sync,
Version: Sync,
Error: Sync,
{
/// Fetches current events, based on the command.
async fn fetch_events(&self, command: &C) -> Result<Vec<(E, Version)>, Error> {
self.repository.fetch_events(command).await
}
/// Saves events.
async fn save(
&self,
events: &[E],
latest_version: &Option<Version>,
) -> Result<Vec<(E, Version)>, Error> {
self.repository.save(events, latest_version).await
}
}

impl<C, S, E, Repository, Decider, Version, Error>
EventSourcedAggregate<C, S, E, Repository, Decider, Version, Error>
where
Repository: EventRepository<C, E, Version, Error> + Sync,
Decider: EventComputation<C, S, E> + Sync,
C: Sync,
S: Sync,
E: Sync,
Version: Sync,
Error: Sync,
{
/// Creates a new instance of [EventSourcedAggregate].
pub fn new(repository: Repository, decider: Decider) -> Self {
Expand All @@ -64,15 +107,15 @@ where
}
/// Handles the command by fetching the events from the repository, computing new events based on the current events and the command, and saving the new events to the repository.
pub async fn handle(&self, command: &C) -> Result<Vec<(E, Version)>, Error> {
let events: Vec<(E, Version)> = self.repository.fetch_events(command).await?;
let events: Vec<(E, Version)> = self.fetch_events(command).await?;
let mut version: Option<Version> = None;
let mut current_events: Vec<E> = vec![];
for (event, ver) in events {
version = Some(ver);
current_events.push(event);
}
let new_events = self.decider.compute_new_events(&current_events, command);
let saved_events = self.repository.save(&new_events, &version).await?;
let new_events = self.compute_new_events(&current_events, command);
let saved_events = self.save(&new_events, &version).await?;
Ok(saved_events)
}
}
Expand Down Expand Up @@ -117,11 +160,50 @@ where
_marker: PhantomData<(C, S, E, Version, Error)>,
}

impl<C, S, E, Repository, Decider, Version, Error>
StateStoredAggregate<C, S, E, Repository, Decider, Version, Error>
impl<C, S, E, Repository, Decider, Version, Error> StateComputation<C, S, E>
for StateStoredAggregate<C, S, E, Repository, Decider, Version, Error>
where
Repository: StateRepository<C, S, Version, Error>,
Decider: StateComputation<C, S, E>,
{
/// Computes new state based on the current state and the command.
fn compute_new_state(&self, current_state: Option<S>, command: &C) -> S {
self.decider.compute_new_state(current_state, command)
}
}

#[async_trait]
impl<C, S, E, Repository, Decider, Version, Error> StateRepository<C, S, Version, Error>
for StateStoredAggregate<C, S, E, Repository, Decider, Version, Error>
where
Repository: StateRepository<C, S, Version, Error> + Sync,
Decider: StateComputation<C, S, E> + Sync,
C: Sync,
S: Sync,
E: Sync,
Version: Sync,
Error: Sync,
{
/// Fetches current state, based on the command.
async fn fetch_state(&self, command: &C) -> Result<Option<(S, Version)>, Error> {
self.repository.fetch_state(command).await
}
/// Saves state.
async fn save(&self, state: &S, version: &Option<Version>) -> Result<(S, Version), Error> {
self.repository.save(state, version).await
}
}

impl<C, S, E, Repository, Decider, Version, Error>
StateStoredAggregate<C, S, E, Repository, Decider, Version, Error>
where
Repository: StateRepository<C, S, Version, Error> + Sync,
Decider: StateComputation<C, S, E> + Sync,
C: Sync,
S: Sync,
E: Sync,
Version: Sync,
Error: Sync,
{
/// Creates a new instance of [StateStoredAggregate].
pub fn new(repository: Repository, decider: Decider) -> Self {
Expand All @@ -133,16 +215,16 @@ where
}
/// Handles the command by fetching the state from the repository, computing new state based on the current state and the command, and saving the new state to the repository.
pub async fn handle(&self, command: &C) -> Result<(S, Version), Error> {
let state_version = self.repository.fetch_state(command).await?;
let state_version = self.fetch_state(command).await?;
match state_version {
None => {
let new_state = self.decider.compute_new_state(None, command);
let saved_state = self.repository.save(&new_state, &None).await?;
let new_state = self.compute_new_state(None, command);
let saved_state = self.save(&new_state, &None).await?;
Ok(saved_state)
}
Some((state, version)) => {
let new_state = self.decider.compute_new_state(Some(state), command);
let saved_state = self.repository.save(&new_state, &Some(version)).await?;
let new_state = self.compute_new_state(Some(state), command);
let saved_state = self.save(&new_state, &Some(version)).await?;
Ok(saved_state)
}
}
Expand Down
44 changes: 40 additions & 4 deletions src/materialized_view.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,46 @@ where
_marker: PhantomData<(S, E, Error)>,
}

impl<S, E, Repository, View, Error> MaterializedView<S, E, Repository, View, Error>
impl<S, E, Repository, View, Error> ViewStateComputation<E, S>
for MaterializedView<S, E, Repository, View, Error>
where
Repository: ViewStateRepository<E, S, Error>,
View: ViewStateComputation<E, S>,
{
/// Computes new state based on the current state and the events.
fn compute_new_state(&self, current_state: Option<S>, events: &[&E]) -> S {
self.view.compute_new_state(current_state, events)
}
}

#[async_trait]
impl<S, E, Repository, View, Error> ViewStateRepository<E, S, Error>
for MaterializedView<S, E, Repository, View, Error>
where
Repository: ViewStateRepository<E, S, Error> + Sync,
View: ViewStateComputation<E, S> + Sync,
E: Sync,
S: Sync,
Error: Sync,
{
/// Fetches current state, based on the event.
async fn fetch_state(&self, event: &E) -> Result<Option<S>, Error> {
let state = self.repository.fetch_state(event).await?;
Ok(state)
}
/// Saves the new state.
async fn save(&self, state: &S) -> Result<S, Error> {
self.repository.save(state).await
}
}

impl<S, E, Repository, View, Error> MaterializedView<S, E, Repository, View, Error>
where
Repository: ViewStateRepository<E, S, Error> + Sync,
View: ViewStateComputation<E, S> + Sync,
E: Sync,
S: Sync,
Error: Sync,
{
/// Creates a new instance of [MaterializedView].
pub fn new(repository: Repository, view: View) -> Self {
Expand All @@ -56,9 +92,9 @@ where
}
/// Handles the event by fetching the state from the repository, computing new state based on the current state and the event, and saving the new state to the repository.
pub async fn handle(&self, event: &E) -> Result<S, Error> {
let state = self.repository.fetch_state(event).await?;
let new_state = self.view.compute_new_state(state, &[event]);
let saved_state = self.repository.save(&new_state).await?;
let state = self.fetch_state(event).await?;
let new_state = self.compute_new_state(state, &[event]);
let saved_state = self.save(&new_state).await?;
Ok(saved_state)
}
}
42 changes: 38 additions & 4 deletions src/saga_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,41 @@ where
_marker: PhantomData<(A, AR, Error)>,
}

impl<A, AR, Publisher, Saga, Error> SagaManager<A, AR, Publisher, Saga, Error>
impl<A, AR, Publisher, Saga, Error> ActionComputation<AR, A>
for SagaManager<A, AR, Publisher, Saga, Error>
where
Publisher: ActionPublisher<A, Error>,
Saga: ActionComputation<AR, A>,
{
/// Computes new actions based on the action result.
fn compute_new_actions(&self, action_result: &AR) -> Vec<A> {
self.saga.compute_new_actions(action_result)
}
}

#[async_trait]
impl<A, AR, Publisher, Saga, Error> ActionPublisher<A, Error>
for SagaManager<A, AR, Publisher, Saga, Error>
where
Publisher: ActionPublisher<A, Error> + Sync,
Saga: ActionComputation<AR, A> + Sync,
A: Sync,
AR: Sync,
Error: Sync,
{
/// Publishes the action/command to some external system, returning either the actions that are successfully published or error.
async fn publish(&self, action: &[A]) -> Result<Vec<A>, Error> {
self.action_publisher.publish(action).await
}
}

impl<A, AR, Publisher, Saga, Error> SagaManager<A, AR, Publisher, Saga, Error>
where
Publisher: ActionPublisher<A, Error> + Sync,
Saga: ActionComputation<AR, A> + Sync,
A: Sync,
AR: Sync,
Error: Sync,
{
/// Creates a new instance of [SagaManager].
pub fn new(action_publisher: Publisher, saga: Saga) -> Self {
Expand All @@ -49,10 +80,13 @@ where
_marker: PhantomData,
}
}
/// Handles the action result by publishing it to the external system.
/// Handles the `action result` by computing new `actions` based on `action result`, and publishing new `actions` to the external system.
/// In most cases:
/// - the `action result` is an `event` that you react,
/// - the `actions` are `commands` that you publish downstream.
pub async fn handle(&self, action_result: &AR) -> Result<Vec<A>, Error> {
let new_actions = self.saga.compute_new_actions(action_result);
let published_actions = self.action_publisher.publish(&new_actions).await?;
let new_actions = self.compute_new_actions(action_result);
let published_actions = self.publish(&new_actions).await?;
Ok(published_actions)
}
}

0 comments on commit 1231bcd

Please sign in to comment.