Skip to content

Commit

Permalink
tidy and documentation
Browse files Browse the repository at this point in the history
tidy

tidy

tidy and documentation
  • Loading branch information
renatillas committed Aug 4, 2024
1 parent 8549a7f commit 87970af
Show file tree
Hide file tree
Showing 5 changed files with 160 additions and 210 deletions.
5 changes: 5 additions & 0 deletions birdie_snapshots/memory_store.accepted
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
version: 1.1.8
title: memory store
---
BankAccount(True, 4.01)
48 changes: 28 additions & 20 deletions src/eventsourcing.gleam
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import gleam/list
import gleam/result

// TYPES ----

/// Used by the EventStore implementations
pub type Aggregate(entity, command, event, error) {
Aggregate(
Expand All @@ -19,6 +21,25 @@ pub type AggregateContext(entity, command, event, error) {
)
}

/// An EventEnvelop is a wrapper around your domain events
/// used by the Event Stores. You can use this type constructor
/// if the event store provides a `load_events` function.
pub type EventEnvelop(event) {
MemoryStoreEventEnvelop(
aggregate_id: AggregateId,
sequence: Int,
payload: event,
)
SerializedEventEnvelop(
aggregate_id: AggregateId,
sequence: Int,
payload: event,
event_type: String,
event_version: String,
aggregate_type: String,
)
}

type AggregateId =
String

Expand All @@ -31,6 +52,9 @@ type Apply(entity, event) =
type Query(event) =
fn(AggregateId, List(EventEnvelop(event))) -> Nil

/// The main record of the library.
/// It holds everything together and serves as a reference point
/// for other functions such as execute, load_aggregate_entity, and load_events.
pub opaque type EventSourcing(
eventstore,
entity,
Expand Down Expand Up @@ -60,6 +84,8 @@ pub type EventStore(eventstore, entity, command, event, error) {
)
}

// CONSTRUCTORS ----

/// Create a new EventSourcing instance providing
/// an Event Store and a list of queries you want
/// run whenever events are commited.
Expand Down Expand Up @@ -134,6 +160,8 @@ pub fn new(event_store, queries) {
EventSourcing(event_store:, queries:)
}

// PUBLIC FUNCTIONS ----

/// The main function of the package.
/// Run execute with your event_sourcing instance and the command you want to apply.
/// It will return a Result with Ok(Nil) or Error(your domain error) if the command failed.
Expand All @@ -156,7 +184,6 @@ pub fn execute(
)
let aggregate = aggregate_context.aggregate
let entity = aggregate.entity

use events <- result.map(aggregate.handle(entity, command))
events |> list.map(aggregate.apply(entity, _))
let commited_events =
Expand All @@ -169,22 +196,3 @@ pub fn execute(
|> list.map(fn(query) { query(aggregate_id, commited_events) })
Nil
}

/// An EventEnvelop is a wrapper around your domain events
/// used by the Event Stores. You can use this type constructor
/// if the event store provides a `load_events` function.
pub type EventEnvelop(event) {
MemoryStoreEventEnvelop(
aggregate_id: AggregateId,
sequence: Int,
payload: event,
)
SerializedEventEnvelop(
aggregate_id: AggregateId,
sequence: Int,
payload: event,
event_type: String,
event_version: String,
aggregate_type: String,
)
}
73 changes: 39 additions & 34 deletions src/eventsourcing/memory_store.gleam
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,27 @@ import gleam/otp/actor
import gleam/pair
import gleam/result

type State(event) =
Dict(eventsourcing.AggregateId, List(eventsourcing.EventEnvelop(event)))

pub opaque type MemoryStore(entity, command, event, error) {
MemoryStore(
subject: process.Subject(Message(event)),
empty_aggregate: eventsourcing.Aggregate(entity, command, event, error),
)
}

type State(event) =
Dict(eventsourcing.AggregateId, List(eventsourcing.EventEnvelop(event)))

type Message(event) {
Set(key: String, value: List(eventsourcing.EventEnvelop(event)))
Get(
key: String,
response: process.Subject(
Result(List(eventsourcing.EventEnvelop(event)), Nil),
),
)
}

/// Create a new memory store record.
pub fn new(
emtpy_entity empty_entity: entity,
handle_command_function handle: eventsourcing.Handle(
Expand All @@ -43,37 +54,8 @@ pub fn new(
)
}

type Message(event) {
Set(key: String, value: List(eventsourcing.EventEnvelop(event)))
Get(
key: String,
response: process.Subject(
Result(List(eventsourcing.EventEnvelop(event)), Nil),
),
)
}

fn handle_message(message: Message(event), state: State(event)) {
case message {
Set(key, value) -> {
state |> dict.insert(key, value) |> actor.continue
}
Get(key, response) -> {
let value = state |> dict.get(key)
actor.send(response, value)
actor.continue(state)
}
}
}

fn load_commited_events(
memory_store: MemoryStore(entity, command, event, error),
aggregate_id: eventsourcing.AggregateId,
) {
actor.call(memory_store.subject, Get(aggregate_id, _), 10_000)
|> result.unwrap([])
}

/// Load the currently commited events from the memory store.
/// They are wrapped in a MemoryStoreEventEnvelop variant from the EventEnvelop type.
pub fn load_events(
memory_store: MemoryStore(entity, command, event, error),
aggregate_id: eventsourcing.AggregateId,
Expand All @@ -91,13 +73,36 @@ pub fn load_events(
}
}

/// Load a aggregate based on a aggregate_id.
/// If the aggregate is not present, it returns an emtpy aggregate.
pub fn load_aggregate_entity(
memory_store: MemoryStore(entity, command, event, error),
aggregate_id: eventsourcing.AggregateId,
) -> entity {
load_aggregate(memory_store, aggregate_id).aggregate.entity
}

fn handle_message(message: Message(event), state: State(event)) {
case message {
Set(key, value) -> {
state |> dict.insert(key, value) |> actor.continue
}
Get(key, response) -> {
let value = state |> dict.get(key)
actor.send(response, value)
actor.continue(state)
}
}
}

fn load_commited_events(
memory_store: MemoryStore(entity, command, event, error),
aggregate_id: eventsourcing.AggregateId,
) {
actor.call(memory_store.subject, Get(aggregate_id, _), 10_000)
|> result.unwrap([])
}

fn load_aggregate(
memory_store: MemoryStore(entity, command, event, error),
aggregate_id: eventsourcing.AggregateId,
Expand Down
120 changes: 76 additions & 44 deletions src/eventsourcing/postgres_store.gleam
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,37 @@ import gleam/pair
import gleam/pgo
import gleam/result

// CONSTANTS ----

const insert_event_query = "
INSERT INTO event
(aggregate_type, aggregate_id, sequence, event_type, event_version, payload)
VALUES
($1, $2, $3, $4, $5, $6)
"

const select_events_query = "
SELECT aggregate_type, aggregate_id, sequence, event_type, event_version, payload
FROM event
WHERE aggregate_type = $1 AND aggregate_id = $2
ORDER BY sequence
"

const create_event_table_query = "
CREATE TABLE IF NOT EXISTS event
(
aggregate_type text NOT NULL,
aggregate_id text NOT NULL,
sequence bigint CHECK (sequence >= 0) NOT NULL,
event_type text NOT NULL,
event_version text NOT NULL,
payload text NOT NULL,
PRIMARY KEY (aggregate_type, aggregate_id, sequence)
);
"

// TYPES ----

pub opaque type PostgresStore(entity, command, event, error) {
PostgresStore(
db: pgo.Connection,
Expand All @@ -19,6 +50,8 @@ pub opaque type PostgresStore(entity, command, event, error) {
)
}

// CONSTRUCTORS ----

pub fn new(
pgo_config pgo_config: pgo.Config,
emtpy_entity empty_entity: entity,
Expand All @@ -35,6 +68,12 @@ pub fn new(
event_type event_type: String,
event_version event_version: String,
aggregate_type aggregate_type: String,
) -> eventsourcing.EventStore(
PostgresStore(entity, command, event, error),
entity,
command,
event,
error,
) {
let db = pgo.connect(pgo_config)

Expand All @@ -56,43 +95,30 @@ pub fn new(
)
}

pub fn load_aggregate_entity(
pub fn create_event_table(
postgres_store: PostgresStore(entity, command, event, error),
aggregate_id: eventsourcing.AggregateId,
) -> entity {
load_aggregate(postgres_store, aggregate_id).aggregate.entity
) -> Result(pgo.Returned(dynamic.Dynamic), pgo.QueryError) {
pgo.execute(
query: create_event_table_query,
on: postgres_store.db,
with: [],
expecting: dynamic.dynamic,
)
}

fn load_aggregate(
pub fn load_aggregate_entity(
postgres_store: PostgresStore(entity, command, event, error),
aggregate_id: eventsourcing.AggregateId,
) -> eventsourcing.AggregateContext(entity, command, event, error) {
let assert Ok(commited_events) = load_events(postgres_store, aggregate_id)

let #(aggregate, sequence) =
list.fold(
over: commited_events,
from: #(postgres_store.empty_aggregate, 0),
with: fn(aggregate_and_sequence, event_envelop) {
let #(aggregate, _) = aggregate_and_sequence
#(
eventsourcing.Aggregate(
..aggregate,
entity: aggregate.apply(aggregate.entity, event_envelop.payload),
),
event_envelop.sequence,
)
},
)
eventsourcing.AggregateContext(aggregate_id:, aggregate:, sequence:)
) -> entity {
load_aggregate(postgres_store, aggregate_id).aggregate.entity
}

pub fn load_events(
postgres_store: PostgresStore(entity, command, event, error),
aggregate_id: eventsourcing.AggregateId,
) -> Result(List(eventsourcing.EventEnvelop(event)), pgo.QueryError) {
use resulted <- result.map(pgo.execute(
select_events(),
select_events_query,
on: postgres_store.db,
with: [pgo.text(postgres_store.aggregate_type), pgo.text(aggregate_id)],
expecting: dynamic.decode6(
Expand All @@ -112,6 +138,30 @@ pub fn load_events(
resulted.rows
}

fn load_aggregate(
postgres_store: PostgresStore(entity, command, event, error),
aggregate_id: eventsourcing.AggregateId,
) -> eventsourcing.AggregateContext(entity, command, event, error) {
let assert Ok(commited_events) = load_events(postgres_store, aggregate_id)

let #(aggregate, sequence) =
list.fold(
over: commited_events,
from: #(postgres_store.empty_aggregate, 0),
with: fn(aggregate_and_sequence, event_envelop) {
let #(aggregate, _) = aggregate_and_sequence
#(
eventsourcing.Aggregate(
..aggregate,
entity: aggregate.apply(aggregate.entity, event_envelop.payload),
),
event_envelop.sequence,
)
},
)
eventsourcing.AggregateContext(aggregate_id:, aggregate:, sequence:)
}

fn commit(
postgres_store: PostgresStore(entity, command, event, error),
context: eventsourcing.AggregateContext(entity, command, event, error),
Expand Down Expand Up @@ -174,7 +224,7 @@ fn persist_events(
) = event

pgo.execute(
query: insert_event(),
query: insert_event_query,
on: postgres_store.db,
with: [
pgo.text(aggregate_type),
Expand All @@ -188,21 +238,3 @@ fn persist_events(
)
})
}

fn insert_event() {
"
INSERT INTO event
(aggregate_type, aggregate_id, sequence, event_type, event_version, payload)
VALUES
($1, $2, $3, $4, $5, $6)
"
}

fn select_events() {
"
SELECT aggregate_type, aggregate_id, sequence, event_type, event_version, payload
FROM event
WHERE aggregate_type = $1 AND aggregate_id = $2
ORDER BY sequence
"
}
Loading

0 comments on commit 87970af

Please sign in to comment.