diff --git a/src/aggregate.rs b/src/aggregate.rs index 35516ee..e994097 100644 --- a/src/aggregate.rs +++ b/src/aggregate.rs @@ -48,11 +48,54 @@ where _marker: PhantomData<(C, S, E, Version, Error)>, } -impl - EventSourcedAggregate +impl EventComputation + for EventSourcedAggregate where Repository: EventRepository, Decider: EventComputation, +{ + /// Computes new events based on the current events and the command. + fn compute_new_events(&self, current_events: &[E], command: &C) -> Vec { + self.decider.compute_new_events(current_events, command) + } +} + +#[async_trait] +impl EventRepository + for EventSourcedAggregate +where + Repository: EventRepository + Sync, + Decider: EventComputation + Sync, + C: Sync, + S: Sync, + E: Sync, + Version: Sync, + Error: Sync, +{ + /// Fetches current events, based on the command. + async fn fetch_events(&self, command: &C) -> Result, Error> { + self.repository.fetch_events(command).await + } + /// Saves events. + async fn save( + &self, + events: &[E], + latest_version: &Option, + ) -> Result, Error> { + self.repository.save(events, latest_version).await + } +} + +impl + EventSourcedAggregate +where + Repository: EventRepository + Sync, + Decider: EventComputation + Sync, + C: Sync, + S: Sync, + E: Sync, + Version: Sync, + Error: Sync, { /// Creates a new instance of [EventSourcedAggregate]. pub fn new(repository: Repository, decider: Decider) -> Self { @@ -64,15 +107,15 @@ where } /// Handles the command by fetching the events from the repository, computing new events based on the current events and the command, and saving the new events to the repository. pub async fn handle(&self, command: &C) -> Result, Error> { - let events: Vec<(E, Version)> = self.repository.fetch_events(command).await?; + let events: Vec<(E, Version)> = self.fetch_events(command).await?; let mut version: Option = None; let mut current_events: Vec = vec![]; for (event, ver) in events { version = Some(ver); current_events.push(event); } - let new_events = self.decider.compute_new_events(¤t_events, command); - let saved_events = self.repository.save(&new_events, &version).await?; + let new_events = self.compute_new_events(¤t_events, command); + let saved_events = self.save(&new_events, &version).await?; Ok(saved_events) } } @@ -117,11 +160,50 @@ where _marker: PhantomData<(C, S, E, Version, Error)>, } -impl - StateStoredAggregate +impl StateComputation + for StateStoredAggregate where Repository: StateRepository, Decider: StateComputation, +{ + /// Computes new state based on the current state and the command. + fn compute_new_state(&self, current_state: Option, command: &C) -> S { + self.decider.compute_new_state(current_state, command) + } +} + +#[async_trait] +impl StateRepository + for StateStoredAggregate +where + Repository: StateRepository + Sync, + Decider: StateComputation + Sync, + C: Sync, + S: Sync, + E: Sync, + Version: Sync, + Error: Sync, +{ + /// Fetches current state, based on the command. + async fn fetch_state(&self, command: &C) -> Result, Error> { + self.repository.fetch_state(command).await + } + /// Saves state. + async fn save(&self, state: &S, version: &Option) -> Result<(S, Version), Error> { + self.repository.save(state, version).await + } +} + +impl + StateStoredAggregate +where + Repository: StateRepository + Sync, + Decider: StateComputation + Sync, + C: Sync, + S: Sync, + E: Sync, + Version: Sync, + Error: Sync, { /// Creates a new instance of [StateStoredAggregate]. pub fn new(repository: Repository, decider: Decider) -> Self { @@ -133,16 +215,16 @@ where } /// Handles the command by fetching the state from the repository, computing new state based on the current state and the command, and saving the new state to the repository. pub async fn handle(&self, command: &C) -> Result<(S, Version), Error> { - let state_version = self.repository.fetch_state(command).await?; + let state_version = self.fetch_state(command).await?; match state_version { None => { - let new_state = self.decider.compute_new_state(None, command); - let saved_state = self.repository.save(&new_state, &None).await?; + let new_state = self.compute_new_state(None, command); + let saved_state = self.save(&new_state, &None).await?; Ok(saved_state) } Some((state, version)) => { - let new_state = self.decider.compute_new_state(Some(state), command); - let saved_state = self.repository.save(&new_state, &Some(version)).await?; + let new_state = self.compute_new_state(Some(state), command); + let saved_state = self.save(&new_state, &Some(version)).await?; Ok(saved_state) } } diff --git a/src/materialized_view.rs b/src/materialized_view.rs index 052d677..d92aed0 100644 --- a/src/materialized_view.rs +++ b/src/materialized_view.rs @@ -41,10 +41,46 @@ where _marker: PhantomData<(S, E, Error)>, } -impl MaterializedView +impl ViewStateComputation + for MaterializedView where Repository: ViewStateRepository, View: ViewStateComputation, +{ + /// Computes new state based on the current state and the events. + fn compute_new_state(&self, current_state: Option, events: &[&E]) -> S { + self.view.compute_new_state(current_state, events) + } +} + +#[async_trait] +impl ViewStateRepository + for MaterializedView +where + Repository: ViewStateRepository + Sync, + View: ViewStateComputation + Sync, + E: Sync, + S: Sync, + Error: Sync, +{ + /// Fetches current state, based on the event. + async fn fetch_state(&self, event: &E) -> Result, Error> { + let state = self.repository.fetch_state(event).await?; + Ok(state) + } + /// Saves the new state. + async fn save(&self, state: &S) -> Result { + self.repository.save(state).await + } +} + +impl MaterializedView +where + Repository: ViewStateRepository + Sync, + View: ViewStateComputation + Sync, + E: Sync, + S: Sync, + Error: Sync, { /// Creates a new instance of [MaterializedView]. pub fn new(repository: Repository, view: View) -> Self { @@ -56,9 +92,9 @@ where } /// Handles the event by fetching the state from the repository, computing new state based on the current state and the event, and saving the new state to the repository. pub async fn handle(&self, event: &E) -> Result { - let state = self.repository.fetch_state(event).await?; - let new_state = self.view.compute_new_state(state, &[event]); - let saved_state = self.repository.save(&new_state).await?; + let state = self.fetch_state(event).await?; + let new_state = self.compute_new_state(state, &[event]); + let saved_state = self.save(&new_state).await?; Ok(saved_state) } } diff --git a/src/saga_manager.rs b/src/saga_manager.rs index 8174b4b..6df76b7 100644 --- a/src/saga_manager.rs +++ b/src/saga_manager.rs @@ -36,10 +36,41 @@ where _marker: PhantomData<(A, AR, Error)>, } -impl SagaManager +impl ActionComputation + for SagaManager where Publisher: ActionPublisher, Saga: ActionComputation, +{ + /// Computes new actions based on the action result. + fn compute_new_actions(&self, action_result: &AR) -> Vec { + self.saga.compute_new_actions(action_result) + } +} + +#[async_trait] +impl ActionPublisher + for SagaManager +where + Publisher: ActionPublisher + Sync, + Saga: ActionComputation + Sync, + A: Sync, + AR: Sync, + Error: Sync, +{ + /// Publishes the action/command to some external system, returning either the actions that are successfully published or error. + async fn publish(&self, action: &[A]) -> Result, Error> { + self.action_publisher.publish(action).await + } +} + +impl SagaManager +where + Publisher: ActionPublisher + Sync, + Saga: ActionComputation + Sync, + A: Sync, + AR: Sync, + Error: Sync, { /// Creates a new instance of [SagaManager]. pub fn new(action_publisher: Publisher, saga: Saga) -> Self { @@ -49,10 +80,13 @@ where _marker: PhantomData, } } - /// Handles the action result by publishing it to the external system. + /// Handles the `action result` by computing new `actions` based on `action result`, and publishing new `actions` to the external system. + /// In most cases: + /// - the `action result` is an `event` that you react, + /// - the `actions` are `commands` that you publish downstream. pub async fn handle(&self, action_result: &AR) -> Result, Error> { - let new_actions = self.saga.compute_new_actions(action_result); - let published_actions = self.action_publisher.publish(&new_actions).await?; + let new_actions = self.compute_new_actions(action_result); + let published_actions = self.publish(&new_actions).await?; Ok(published_actions) } }