From d877767aee3073791708888a20fedab0b98a34b4 Mon Sep 17 00:00:00 2001 From: Maksim Shcherbo Date: Mon, 3 Apr 2023 00:54:32 +0100 Subject: [PATCH] refactor aggregate package (#9) * rename advanced aggregate to EventSourced * change aggregate factory to get rid of extra dependencies * update tests with better examples and test case names * provide excessive documentation and examples --- .golangci.yml | 9 +- Makefile | 10 +- aggregate/advanced.go | 64 ------ aggregate/advanced_test.go | 160 --------------- aggregate/aggregate.go | 35 ++++ aggregate/aggtest/aggregate.go | 29 ++- aggregate/aggtest/domain.go | 13 ++ aggregate/aggtest/factory.go | 5 - aggregate/aggtest/testdsl/testdsl.go | 29 ++- aggregate/command_handler.go | 59 ++++-- aggregate/event_applier.go | 13 +- aggregate/event_sourced.go | 79 ++++++++ aggregate/event_sourced_test.go | 193 +++++++++++++++++++ aggregate/example_test.go | 98 ++++++++++ aggregate/factory.go | 48 +++-- aggregate/factory_test.go | 60 +++--- cqrs.go | 82 +++----- examples/bank/bank_test.go | 6 +- examples/bank/domain/account/account_test.go | 53 ++--- matcher.go | 1 + matcher_test.go | 28 ++- x/aggstore/aggregate_store.go | 9 +- x/aggstore/aggregate_store_test.go | 35 ++-- x/aggstore/aggstoretest/aggregate_store.go | 8 +- x/cqrs.go | 29 +++ x/dispatcher/dispatcher.go | 5 +- x/dispatcher/dispatcher_test.go | 18 +- x/eventbus/eventbus.go | 11 +- x/eventbus/eventbus_test.go | 3 +- x/eventhandler/event_handler.go | 7 +- x/eventhandler/event_handler_test.go | 3 +- x/eventstore/in_memory_event_store.go | 5 +- x/eventstore/in_memory_event_store_test.go | 3 +- 33 files changed, 736 insertions(+), 474 deletions(-) delete mode 100644 aggregate/advanced.go delete mode 100644 aggregate/advanced_test.go create mode 100644 aggregate/aggregate.go create mode 100644 aggregate/aggtest/domain.go delete mode 100644 aggregate/aggtest/factory.go create mode 100644 aggregate/event_sourced.go create mode 100644 aggregate/event_sourced_test.go create mode 100644 aggregate/example_test.go create mode 100644 x/cqrs.go diff --git a/.golangci.yml b/.golangci.yml index 884f84c..9cfc836 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -31,10 +31,9 @@ linters: - durationcheck - errcheck - errchkjson - #- errname + - errname - errorlint - execinquery - #- errorsas - exhaustive - exportloopref #- forbidigo @@ -68,7 +67,7 @@ linters: #- ireturn - ineffassign - lll - #- logrlint + - logrlint - maintidx - makezero - misspell @@ -80,7 +79,7 @@ linters: - nolintlint - nonamedreturns - nosprintfhostport - #- paralleltest + - paralleltest - prealloc - predeclared - promlinter @@ -94,7 +93,7 @@ linters: - tenv - testpackage - thelper - #- tparallel + - tparallel - typecheck - unconvert - unparam diff --git a/Makefile b/Makefile index c5785c4..a5af1e5 100644 --- a/Makefile +++ b/Makefile @@ -24,13 +24,17 @@ tools: ## install dev tools, linters, code generators, etc.. @echo -e "$(OK_COLOR)--> Installing tools from tools/tools.go$(NO_COLOR)" @export GOBIN=$$PWD/tools/bin; export PATH=$$GOBIN:$$PATH; cat tools/tools.go | grep _ | awk -F'"' '{print $$2}' | xargs -tI % go install % -lint: ## run linters +lint: ## run linters for the current changes @echo -e "$(OK_COLOR)--> Running linters$(NO_COLOR)" @tools/bin/golangci-lint run +lint-all: ## run linters + @echo -e "$(OK_COLOR)==> Linting$(NO_COLOR)" + golangci-lint run ./... --new-from-rev="" + test: ## run tests @echo -e "$(OK_COLOR)--> Running unit tests$(NO_COLOR)" - go test -v --race --count=1 -coverprofile=coverage.tmp ./... + go test -v --race --count=1 -covermode atomic -coverprofile=coverage.tmp ./... @set -euo pipefail && cat coverage.tmp | grep -v $(IGNORE_COVERAGE_FOR) > coverage.out && rm coverage.tmp coverage: test ## show test coverage report @@ -55,4 +59,4 @@ help: ## show this help screen # To avoid unintended conflicts with file names, always add to .PHONY # unless there is a reason not to. # https://www.gnu.org/software/make/manual/html_node/Phony-Targets.html -.PHONY: all deps tools lint test coverage fmt clean help \ No newline at end of file +.PHONY: all deps tools lint lint-all test coverage fmt clean help \ No newline at end of file diff --git a/aggregate/advanced.go b/aggregate/advanced.go deleted file mode 100644 index 8d9dfb4..0000000 --- a/aggregate/advanced.go +++ /dev/null @@ -1,64 +0,0 @@ -package aggregate - -import ( - "github.com/screwyprof/cqrs" -) - -// Advanced implements an advanced aggregate root. -type Advanced struct { - cqrs.Aggregate - version int - - commandHandler cqrs.CommandHandler - eventApplier cqrs.EventApplier -} - -// NewAdvanced creates a new instance of Advanced. -func NewAdvanced(pureAgg cqrs.Aggregate, commandHandler cqrs.CommandHandler, eventApplier cqrs.EventApplier) *Advanced { - if pureAgg == nil { - panic("pureAgg is required") - } - - if commandHandler == nil { - panic("commandHandler is required") - } - - if eventApplier == nil { - panic("eventApplier is required") - } - - return &Advanced{ - Aggregate: pureAgg, - commandHandler: commandHandler, - eventApplier: eventApplier, - } -} - -// Version implements cqrs.Versionable interface. -func (b *Advanced) Version() int { - return b.version -} - -// Handle implements cqrs.CommandHandler. -func (b *Advanced) Handle(c cqrs.Command) ([]cqrs.DomainEvent, error) { - events, err := b.commandHandler.Handle(c) - if err != nil { - return nil, err - } - - if applierErr := b.eventApplier.Apply(events...); applierErr != nil { - return nil, applierErr - } - - return events, nil -} - -// Apply implements cqrs.EventApplier interface. -func (b *Advanced) Apply(e ...cqrs.DomainEvent) error { - if err := b.eventApplier.Apply(e...); err != nil { - return err - } - b.version += len(e) - - return nil -} diff --git a/aggregate/advanced_test.go b/aggregate/advanced_test.go deleted file mode 100644 index 88fd12c..0000000 --- a/aggregate/advanced_test.go +++ /dev/null @@ -1,160 +0,0 @@ -package aggregate_test - -import ( - "testing" - - "github.com/go-faker/faker/v4" - "github.com/stretchr/testify/assert" - - "github.com/screwyprof/cqrs" - "github.com/screwyprof/cqrs/aggregate" - . "github.com/screwyprof/cqrs/aggregate/aggtest" - . "github.com/screwyprof/cqrs/aggregate/aggtest/testdsl" -) - -// ensure that Advanced implements cqrs.AdvancedAggregate interface. -var _ cqrs.AdvancedAggregate = (*aggregate.Advanced)(nil) - -func TestNewBase(t *testing.T) { - t.Run("ItPanicsIfThePureAggregateIsNotGiven", func(t *testing.T) { - factory := func() { - aggregate.NewAdvanced(nil, nil, nil) - } - assert.Panics(t, factory) - }) - - t.Run("ItPanicsIfCommandHandlerIsNotGiven", func(t *testing.T) { - factory := func() { - aggregate.NewAdvanced(NewTestAggregate(StringIdentifier(faker.UUIDHyphenated())), nil, nil) - } - assert.Panics(t, factory) - }) - - t.Run("ItPanicsIfEventApplierIsNotGiven", func(t *testing.T) { - factory := func() { - aggregate.NewAdvanced( - NewTestAggregate(StringIdentifier(faker.UUIDHyphenated())), - aggregate.NewCommandHandler(), - nil, - ) - } - assert.Panics(t, factory) - }) -} - -func TestBaseHandle(t *testing.T) { - t.Run("ItUsesCustomCommandHandlerAndEventApplierWhenProvided", func(t *testing.T) { - Test(t)( - Given(createTestAggregateWithCustomCommandHandlerAndEventApplier()), - When(MakeSomethingHappen{}), - Then(SomethingHappened{}), - ) - }) - - t.Run("ItReturnsAnErrorIfTheHandlerIsNotFound", func(t *testing.T) { - Test(t)( - Given(createTestAggWithEmptyCommandHandler()), - When(MakeSomethingHappen{}), - ThenFailWith(ErrMakeSomethingHandlerNotFound), - ) - }) - - t.Run("ItReturnsAnErrorIfTheEventAppliersNotFound", func(t *testing.T) { - Test(t)( - Given(createTestAggWithEmptyEventApplier()), - When(MakeSomethingHappen{}), - ThenFailWith(ErrOnSomethingHappenedApplierNotFound), - ) - }) -} - -func TestBaseVersion(t *testing.T) { - t.Run("ItReturnsVersion", func(t *testing.T) { - agg := createTestAggWithDefaultCommandHandlerAndEventApplier() - - assert.Equal(t, 0, agg.Version()) - }) -} - -func TestBaseApply(t *testing.T) { - t.Run("ItAppliesEventsAndReturnsSomeBusinessError", func(t *testing.T) { - Test(t)( - Given(createTestAggWithDefaultCommandHandlerAndEventApplier(), SomethingHappened{}), - When(MakeSomethingHappen{}), - ThenFailWith(ErrItCanHappenOnceOnly), - ) - }) - - t.Run("ItReturnsAnErrorIfTheEventAppliersNotFound", func(t *testing.T) { - Test(t)( - Given(createTestAggWithEmptyEventApplier(), SomethingHappened{}), - When(MakeSomethingHappen{}), - ThenFailWith(ErrOnSomethingHappenedApplierNotFound), - ) - }) - - t.Run("ItIncrementsVersion", func(t *testing.T) { - agg := createTestAggWithEmptyCommandHandler() - - err := agg.Apply(SomethingHappened{}) - - assert.NoError(t, err) - assert.Equal(t, 1, agg.Version()) - }) -} - -func createTestAggWithDefaultCommandHandlerAndEventApplier() *aggregate.Advanced { - ID := StringIdentifier(faker.UUIDHyphenated()) - pureAgg := NewTestAggregate(ID) - - handler := aggregate.NewCommandHandler() - handler.RegisterHandlers(pureAgg) - - applier := aggregate.NewEventApplier() - applier.RegisterAppliers(pureAgg) - - return aggregate.NewAdvanced(pureAgg, handler, applier) -} - -func createTestAggregateWithCustomCommandHandlerAndEventApplier() *aggregate.Advanced { - ID := StringIdentifier(faker.UUIDHyphenated()) - a := NewTestAggregate(ID) - - return aggregate.NewAdvanced(a, createCommandHandler(a), createEventApplier(a)) -} - -func createTestAggWithEmptyCommandHandler() *aggregate.Advanced { - ID := StringIdentifier(faker.UUIDHyphenated()) - pureAgg := NewTestAggregate(ID) - - applier := aggregate.NewEventApplier() - applier.RegisterAppliers(pureAgg) - - return aggregate.NewAdvanced(pureAgg, aggregate.NewCommandHandler(), applier) -} - -func createTestAggWithEmptyEventApplier() *aggregate.Advanced { - ID := StringIdentifier(faker.UUIDHyphenated()) - pureAgg := NewTestAggregate(ID) - - handler := aggregate.NewCommandHandler() - handler.RegisterHandlers(pureAgg) - - return aggregate.NewAdvanced(pureAgg, handler, aggregate.NewEventApplier()) -} - -func createEventApplier(pureAgg *TestAggregate) *aggregate.EventApplier { - eventApplier := aggregate.NewEventApplier() - eventApplier.RegisterApplier("OnSomethingHappened", func(e cqrs.DomainEvent) { - pureAgg.OnSomethingHappened(e.(SomethingHappened)) - }) - return eventApplier -} - -func createCommandHandler(pureAgg *TestAggregate) *aggregate.CommandHandler { - commandHandler := aggregate.NewCommandHandler() - commandHandler.RegisterHandler("MakeSomethingHappen", func(c cqrs.Command) ([]cqrs.DomainEvent, error) { - return pureAgg.MakeSomethingHappen(c.(MakeSomethingHappen)) - }) - return commandHandler -} diff --git a/aggregate/aggregate.go b/aggregate/aggregate.go new file mode 100644 index 0000000..82fd059 --- /dev/null +++ b/aggregate/aggregate.go @@ -0,0 +1,35 @@ +// Package aggregate provides a base implementation for event sourced aggregates. +// +// To create an event sourced aggregate, a user must define their own domain +// aggregate that implements the cqrs.Aggregate interface. The user's domain +// aggregate should define its own identifier type and event type. +// +// Additionally, command handlers and event appliers should be defined within +// the user's domain aggregate. The command handlers handle commands and produce +// events, while the event appliers apply events to update the aggregate's state. +// +// Once the user's domain aggregate is defined, it can be transformed into +// a cqrs.ESAggregate using the aggregate.FromAggregate function. This function +// automatically registers the command handlers and event appliers defined in +// the user's domain aggregate. +// +// For a detailed example of how to use the aggregate package, please refer to +// the Example function in the example_test.go file. +// +// More examples can be found in the examples directory. +package aggregate + +import ( + "errors" +) + +var ( + // ErrCommandHandlerNotFound is returned when a command handler is not found. + ErrCommandHandlerNotFound = errors.New("command handler not found") + + // ErrEventApplierNotFound is returned when an event applier is not found. + ErrEventApplierNotFound = errors.New("event applier not found") + + // ErrAggregateNotRegistered is returned when an aggregate is not registered in the factory. + ErrAggregateNotRegistered = errors.New("aggregate is not registered") +) diff --git a/aggregate/aggtest/aggregate.go b/aggregate/aggtest/aggregate.go index 6fcb6ea..8df9abc 100644 --- a/aggregate/aggtest/aggregate.go +++ b/aggregate/aggtest/aggregate.go @@ -7,11 +7,9 @@ import ( ) var ( - ErrItCanHappenOnceOnly = errors.New("some business rule error occurred") - ErrMakeSomethingHandlerNotFound = errors.New("handler for MakeSomethingHappen command is not found") - ErrOnSomethingHappenedApplierNotFound = errors.New("event applier for OnSomethingHappened event is not found") + ErrItCanHappenOnceOnly = errors.New("some business rule error occurred") - TestAggregateType = "mock.TestAggregate" + TestAggregateType = "mock.TestAggregate" //nolint:gochecknoglobals ) type StringIdentifier string @@ -20,40 +18,39 @@ func (i StringIdentifier) String() string { return string(i) } -// TestAggregate a pure aggregate (has no external dependencies or dark magic method) used for testing. +// TestAggregate is a user-defined aggregate (has no external dependencies or dark magic methods) used for testing. type TestAggregate struct { - id cqrs.Identifier - version int - aggType string + id Identifier alreadyHappened bool } // NewTestAggregate creates a new instance of TestAggregate. -func NewTestAggregate(ID cqrs.Identifier) *TestAggregate { - return &TestAggregate{id: ID} +func NewTestAggregate(id cqrs.Identifier) *TestAggregate { + return &TestAggregate{id: id} } // AggregateID implements cqrs.Aggregate interface. -func (a *TestAggregate) AggregateID() cqrs.Identifier { +func (a *TestAggregate) AggregateID() Identifier { return a.id } // AggregateType implements cqrs.Aggregate interface. func (a *TestAggregate) AggregateType() string { - return "mock.TestAggregate" + return TestAggregateType } -func (a *TestAggregate) MakeSomethingHappen(c MakeSomethingHappen) ([]cqrs.DomainEvent, error) { +func (a *TestAggregate) MakeSomethingHappen(_ MakeSomethingHappen) ([]Event, error) { if a.alreadyHappened { return nil, ErrItCanHappenOnceOnly } - return []cqrs.DomainEvent{SomethingHappened{}}, nil + + return []Event{SomethingHappened{}}, nil } -func (a *TestAggregate) OnSomethingHappened(e SomethingHappened) { +func (a *TestAggregate) OnSomethingHappened(_ SomethingHappened) { a.alreadyHappened = true } -func (a *TestAggregate) OnSomethingElseHappened(e SomethingElseHappened) { +func (a *TestAggregate) OnSomethingElseHappened(_ SomethingElseHappened) { } diff --git a/aggregate/aggtest/domain.go b/aggregate/aggtest/domain.go new file mode 100644 index 0000000..990a821 --- /dev/null +++ b/aggregate/aggtest/domain.go @@ -0,0 +1,13 @@ +package aggtest + +import "fmt" + +// Identifier an object identifier. +type Identifier = fmt.Stringer + +// Event represents something that took place in the domain. +// +// Events are always named with a past-participle verb, such as OrderConfirmed. +type Event interface { + EventType() string +} diff --git a/aggregate/aggtest/factory.go b/aggregate/aggtest/factory.go deleted file mode 100644 index c03f59a..0000000 --- a/aggregate/aggtest/factory.go +++ /dev/null @@ -1,5 +0,0 @@ -package aggtest - -import "errors" - -var ErrAggIsNotRegistered = errors.New("mock.TestAggregate is not registered") diff --git a/aggregate/aggtest/testdsl/testdsl.go b/aggregate/aggtest/testdsl/testdsl.go index b5c2dca..9607e3e 100644 --- a/aggregate/aggtest/testdsl/testdsl.go +++ b/aggregate/aggtest/testdsl/testdsl.go @@ -9,10 +9,10 @@ import ( ) // GivenFn is a test init function. -type GivenFn func() (cqrs.AdvancedAggregate, []cqrs.DomainEvent) +type GivenFn func() (cqrs.ESAggregate, []cqrs.DomainEvent) // WhenFn is a command handler function. -type WhenFn func(agg cqrs.AdvancedAggregate, err error) ([]cqrs.DomainEvent, error) +type WhenFn func(agg cqrs.ESAggregate, err error) ([]cqrs.DomainEvent, error) // ThenFn prepares the Checker. type ThenFn func(t *testing.T) Checker @@ -32,26 +32,30 @@ type AggregateTester func(given GivenFn, when WhenFn, then ThenFn) // When(testdata.TestCommand{Param: "param"}), // Then(testdata.TestEvent{Data: "param"}), // ) -func Test(t *testing.T) AggregateTester { +func Test(t *testing.T) AggregateTester { //nolint:tparallel,paralleltest + t.Helper() + return func(given GivenFn, when WhenFn, then ThenFn) { t.Helper() + then(t)(when(applyEvents(given))) } } // Given prepares the given aggregate for testing. -func Given(agg cqrs.AdvancedAggregate, events ...cqrs.DomainEvent) GivenFn { - return func() (cqrs.AdvancedAggregate, []cqrs.DomainEvent) { +func Given(agg cqrs.ESAggregate, events ...cqrs.DomainEvent) GivenFn { + return func() (cqrs.ESAggregate, []cqrs.DomainEvent) { return agg, events } } // When prepares the command handler for the given command. func When(c cqrs.Command) WhenFn { - return func(agg cqrs.AdvancedAggregate, err error) ([]cqrs.DomainEvent, error) { + return func(agg cqrs.ESAggregate, err error) ([]cqrs.DomainEvent, error) { if err != nil { return nil, err } + return agg.Handle(c) } } @@ -59,8 +63,11 @@ func When(c cqrs.Command) WhenFn { // Then asserts that the expected events are applied. func Then(want ...cqrs.DomainEvent) ThenFn { return func(t *testing.T) Checker { + t.Helper() + return func(got []cqrs.DomainEvent, err error) { t.Helper() + assert.NoError(t, err) assert.Equal(t, want, got) } @@ -70,17 +77,19 @@ func Then(want ...cqrs.DomainEvent) ThenFn { // ThenFailWith asserts that the expected error occurred. func ThenFailWith(want error) ThenFn { return func(t *testing.T) Checker { + t.Helper() + return func(got []cqrs.DomainEvent, err error) { t.Helper() - assert.Equal(t, want, err) + + assert.ErrorIs(t, err, want) } } } -func applyEvents(given GivenFn) (cqrs.AdvancedAggregate, error) { +func applyEvents(given GivenFn) (cqrs.ESAggregate, error) { agg, events := given() - err := agg.Apply(events...) - if err != nil { + if err := agg.Apply(events...); err != nil { return nil, err } diff --git a/aggregate/command_handler.go b/aggregate/command_handler.go index 9be26d4..9da2bf9 100644 --- a/aggregate/command_handler.go +++ b/aggregate/command_handler.go @@ -3,15 +3,13 @@ package aggregate import ( "fmt" "reflect" - "sync" "github.com/screwyprof/cqrs" ) // CommandHandler registers and handles commands. type CommandHandler struct { - handlers map[string]cqrs.CommandHandlerFunc - handlersMu sync.RWMutex + handlers map[string]cqrs.CommandHandlerFunc } // NewCommandHandler creates a new instance of CommandHandler. @@ -23,12 +21,9 @@ func NewCommandHandler() *CommandHandler { // Handle implements cqrs.CommandHandler interface. func (h *CommandHandler) Handle(c cqrs.Command) ([]cqrs.DomainEvent, error) { - h.handlersMu.RLock() - defer h.handlersMu.RUnlock() - handler, ok := h.handlers[c.CommandType()] if !ok { - return nil, fmt.Errorf("handler for %s command is not found", c.CommandType()) + return nil, fmt.Errorf("%w: %s", ErrCommandHandlerNotFound, c.CommandType()) } return handler(c) @@ -36,8 +31,6 @@ func (h *CommandHandler) Handle(c cqrs.Command) ([]cqrs.DomainEvent, error) { // RegisterHandler registers a command handler for the given method. func (h *CommandHandler) RegisterHandler(method string, handler cqrs.CommandHandlerFunc) { - h.handlersMu.Lock() - defer h.handlersMu.Unlock() h.handlers[method] = handler } @@ -46,7 +39,8 @@ func (h *CommandHandler) RegisterHandlers(aggregate cqrs.Aggregate) { aggregateType := reflect.TypeOf(aggregate) for i := 0; i < aggregateType.NumMethod(); i++ { method := aggregateType.Method(i) - if !h.methodHasValidSignature(method) { + + if !h.isCommandHandler(method) { continue } @@ -56,19 +50,51 @@ func (h *CommandHandler) RegisterHandlers(aggregate cqrs.Aggregate) { } } -func (h *CommandHandler) methodHasValidSignature(method reflect.Method) bool { +func (h *CommandHandler) isCommandHandler(method reflect.Method) bool { + return h.commandHandlerHasExpectedInputs(method) && h.commandHandlerHasExpectedOutputs(method) +} + +func (h *CommandHandler) commandHandlerHasExpectedInputs(method reflect.Method) bool { if method.Type.NumIn() != 2 { return false } - // ensure that the method has a cqrs.Command as a parameter. cmdIntfType := reflect.TypeOf((*cqrs.Command)(nil)).Elem() - cmdType := method.Type.In(1) return cmdType.Implements(cmdIntfType) } +func (h *CommandHandler) commandHandlerHasExpectedOutputs(method reflect.Method) bool { + return method.Type.NumOut() == 2 && + h.commandHandlerReturnsDomainEvents(method) && + h.commandHandlerReturnsAnError(method) +} + +func (h *CommandHandler) commandHandlerReturnsDomainEvents(method reflect.Method) bool { + eventSliceType := method.Type.Out(0) + + return eventSliceType.Kind() == reflect.Slice && h.isDomainEvent(eventSliceType.Elem()) +} + +func (h *CommandHandler) isDomainEvent(eventType reflect.Type) bool { + method, ok := eventType.MethodByName("EventType") + + return ok && h.eventTypeMethodHasNoInputs(method) && h.eventTypeMethodReturnsString(method) +} + +func (h *CommandHandler) commandHandlerReturnsAnError(method reflect.Method) bool { + return method.Type.Out(1) == reflect.TypeOf((*error)(nil)).Elem() +} + +func (h *CommandHandler) eventTypeMethodHasNoInputs(method reflect.Method) bool { + return method.Type.NumIn() == 0 +} + +func (h *CommandHandler) eventTypeMethodReturnsString(method reflect.Method) bool { + return method.Type.NumOut() == 1 && method.Type.Out(0) == reflect.TypeOf("") +} + func (h *CommandHandler) invokeCommandHandler( method reflect.Method, aggregate cqrs.Aggregate, c cqrs.Command, ) ([]cqrs.DomainEvent, error) { @@ -76,11 +102,12 @@ func (h *CommandHandler) invokeCommandHandler( resErr := result[1].Interface() if resErr != nil { - return nil, resErr.(error) + return nil, resErr.(error) //nolint:forcetypeassert } - eventsIntf := result[0].Interface() + eventsIntf := result[0].Interface() events := h.convertDomainEvents(eventsIntf) + return events, nil } @@ -89,7 +116,7 @@ func (h *CommandHandler) convertDomainEvents(eventsIntf interface{}) []cqrs.Doma events := make([]cqrs.DomainEvent, 0, len(eventsIntfs)) for _, eventIntf := range eventsIntfs { - events = append(events, eventIntf.(cqrs.DomainEvent)) + events = append(events, eventIntf.(cqrs.DomainEvent)) //nolint:forcetypeassert } return events diff --git a/aggregate/event_applier.go b/aggregate/event_applier.go index f29d671..38d90d9 100644 --- a/aggregate/event_applier.go +++ b/aggregate/event_applier.go @@ -4,15 +4,13 @@ import ( "fmt" "reflect" "strings" - "sync" "github.com/screwyprof/cqrs" ) // EventApplier applies events for the registered appliers. type EventApplier struct { - appliers map[string]cqrs.EventApplierFunc - appliersMu sync.RWMutex + appliers map[string]cqrs.EventApplierFunc } // NewEventApplier creates a new instance of EventApplier. @@ -39,8 +37,6 @@ func (a *EventApplier) RegisterAppliers(aggregate cqrs.Aggregate) { // RegisterApplier registers an event applier for the given method. func (a *EventApplier) RegisterApplier(method string, applier cqrs.EventApplierFunc) { - a.appliersMu.Lock() - defer a.appliersMu.Unlock() a.appliers[method] = applier } @@ -51,17 +47,16 @@ func (a *EventApplier) Apply(events ...cqrs.DomainEvent) error { return err } } + return nil } func (a *EventApplier) apply(event cqrs.DomainEvent) error { - a.appliersMu.RLock() - defer a.appliersMu.RUnlock() - applierID := "On" + event.EventType() + applier, ok := a.appliers[applierID] if !ok { - return fmt.Errorf("event applier for %s event is not found", applierID) + return fmt.Errorf("%w: %s", ErrEventApplierNotFound, applierID) } applier(event) diff --git a/aggregate/event_sourced.go b/aggregate/event_sourced.go new file mode 100644 index 0000000..90273aa --- /dev/null +++ b/aggregate/event_sourced.go @@ -0,0 +1,79 @@ +package aggregate + +import ( + "github.com/screwyprof/cqrs" +) + +// EventSourced is an aggregate that implements CQRS and Event Sourcing. +// +// It composes cqrs.Aggregate, cqrs.CommandHandler, and cqrs.EventApplier interfaces. +type EventSourced struct { + cqrs.Aggregate + version int + + commandHandler cqrs.CommandHandler + eventApplier cqrs.EventApplier +} + +// New creates a new instance of EventSourced. +// +// It requires a basic cqrs.Aggregate, a cqrs.CommandHandler, and a cqrs.EventApplier as parameters. +// It returns a pointer to an EventSourced instance. +func New(aggregate cqrs.Aggregate, commandHandler cqrs.CommandHandler, eventApplier cqrs.EventApplier) *EventSourced { + if aggregate == nil { + panic("aggregate is required") + } + + if commandHandler == nil { + panic("commandHandler is required") + } + + if eventApplier == nil { + panic("eventApplier is required") + } + + return &EventSourced{ + Aggregate: aggregate, + commandHandler: commandHandler, + eventApplier: eventApplier, + } +} + +// Version returns the current version of the aggregate. +// +// It implements the cqrs.Versionable interface. +func (b *EventSourced) Version() int { + return b.version +} + +// Handle processes the given command and produces relevant domain events. +// +// It handles this given command and applies the produced events. +// +// It implements the cqrs.CommandHandler interface. +func (b *EventSourced) Handle(c cqrs.Command) ([]cqrs.DomainEvent, error) { + events, err := b.commandHandler.Handle(c) + if err != nil { + return nil, err + } + + if applierErr := b.eventApplier.Apply(events...); applierErr != nil { + return nil, applierErr + } + + return events, nil +} + +// Apply applies the given domain events to the aggregate. +// +// It applies the events and updates the aggregate version. +// It implements the cqrs.EventApplier interface. +func (b *EventSourced) Apply(e ...cqrs.DomainEvent) error { + if err := b.eventApplier.Apply(e...); err != nil { + return err + } + + b.version += len(e) + + return nil +} diff --git a/aggregate/event_sourced_test.go b/aggregate/event_sourced_test.go new file mode 100644 index 0000000..cf95ae3 --- /dev/null +++ b/aggregate/event_sourced_test.go @@ -0,0 +1,193 @@ +package aggregate_test + +import ( + "testing" + + "github.com/go-faker/faker/v4" + "github.com/stretchr/testify/assert" + + "github.com/screwyprof/cqrs" + "github.com/screwyprof/cqrs/aggregate" + domain "github.com/screwyprof/cqrs/aggregate/aggtest" + . "github.com/screwyprof/cqrs/aggregate/aggtest/testdsl" +) + +// ensure that EventSourced implements cqrs.ESAggregate interface. +var _ cqrs.ESAggregate = (*aggregate.EventSourced)(nil) + +func TestEventSourced(t *testing.T) { + t.Parallel() + + t.Run("creating an event sourced aggregate", func(t *testing.T) { + t.Parallel() + + t.Run("it panics if the aggregate is not provided", func(t *testing.T) { + t.Parallel() + + factory := func() { + aggregate.New(nil, nil, nil) + } + + assert.Panics(t, factory) + }) + + t.Run("it panics if the command handler is not provided", func(t *testing.T) { + t.Parallel() + + factory := func() { + aggregate.New(domain.NewTestAggregate(domain.StringIdentifier(faker.UUIDHyphenated())), nil, nil) + } + + assert.Panics(t, factory) + }) + + t.Run("it panics if the event applier is not provided", func(t *testing.T) { + t.Parallel() + + factory := func() { + aggregate.New( + domain.NewTestAggregate(domain.StringIdentifier(faker.UUIDHyphenated())), + aggregate.NewCommandHandler(), + nil, + ) + } + + assert.Panics(t, factory) + }) + }) + + t.Run("handling commands", func(t *testing.T) { + t.Parallel() + + t.Run("it uses custom command handler and event applier when provided", func(t *testing.T) { + t.Parallel() + + Test(t)( + Given(createTestAggregateWithCustomCommandHandlerAndEventApplier()), + When(domain.MakeSomethingHappen{}), + Then(domain.SomethingHappened{}), + ) + }) + + t.Run("it returns an error if it cannot apply events", func(t *testing.T) { + t.Parallel() + + Test(t)( + Given(createTestAggWithEmptyEventApplier()), + When(domain.MakeSomethingHappen{}), + ThenFailWith(aggregate.ErrEventApplierNotFound), + ) + }) + + t.Run("it returns an error if the handler is not found", func(t *testing.T) { + t.Parallel() + + Test(t)( + Given(createTestAggWithEmptyCommandHandler()), + When(domain.MakeSomethingHappen{}), + ThenFailWith(aggregate.ErrCommandHandlerNotFound), + ) + }) + + t.Run("it returns an error if the command fails", func(t *testing.T) { + t.Parallel() + + Test(t)( + Given(createTestAggWithDefaultCommandHandlerAndEventApplier(), domain.SomethingHappened{}), + When(domain.MakeSomethingHappen{}), + ThenFailWith(domain.ErrItCanHappenOnceOnly), + ) + }) + }) + + t.Run("aggregate version", func(t *testing.T) { + t.Parallel() + + t.Run("it returns the aggregate version", func(t *testing.T) { + t.Parallel() + + agg := createTestAggWithDefaultCommandHandlerAndEventApplier() + + assert.Equal(t, 0, agg.Version()) + }) + }) + + t.Run("applying events", func(t *testing.T) { + t.Parallel() + + t.Run("it returns an error if the event appliers not found", func(t *testing.T) { + agg := createTestAggWithEmptyEventApplier() + + err := agg.Apply(domain.SomethingHappened{}) + + assert.ErrorIs(t, err, aggregate.ErrEventApplierNotFound) + }) + + t.Run("it increments the aggregate version", func(t *testing.T) { + t.Parallel() + + agg := createTestAggWithEmptyCommandHandler() + + err := agg.Apply(domain.SomethingHappened{}) + + assert.NoError(t, err) + assert.Equal(t, 1, agg.Version()) + }) + }) +} + +func createTestAggWithDefaultCommandHandlerAndEventApplier() *aggregate.EventSourced { + ID := domain.StringIdentifier(faker.UUIDHyphenated()) + agg := domain.NewTestAggregate(ID) + + handler := aggregate.NewCommandHandler() + handler.RegisterHandlers(agg) + + applier := aggregate.NewEventApplier() + applier.RegisterAppliers(agg) + + return aggregate.New(agg, handler, applier) +} + +func createTestAggregateWithCustomCommandHandlerAndEventApplier() *aggregate.EventSourced { + ID := domain.StringIdentifier(faker.UUIDHyphenated()) + agg := domain.NewTestAggregate(ID) + + return aggregate.New(agg, createCommandHandler(agg), createEventApplier(agg)) +} + +func createTestAggWithEmptyCommandHandler() *aggregate.EventSourced { + ID := domain.StringIdentifier(faker.UUIDHyphenated()) + agg := domain.NewTestAggregate(ID) + + applier := aggregate.NewEventApplier() + applier.RegisterAppliers(agg) + + return aggregate.New(agg, aggregate.NewCommandHandler(), applier) +} + +func createTestAggWithEmptyEventApplier() *aggregate.EventSourced { + ID := domain.StringIdentifier(faker.UUIDHyphenated()) + agg := domain.NewTestAggregate(ID) + + handler := aggregate.NewCommandHandler() + handler.RegisterHandlers(agg) + + return aggregate.New(agg, handler, aggregate.NewEventApplier()) +} + +func createEventApplier(agg *domain.TestAggregate) *aggregate.EventApplier { + eventApplier := aggregate.NewEventApplier() + eventApplier.RegisterApplier("OnSomethingHappened", func(e cqrs.DomainEvent) { + agg.OnSomethingHappened(e.(domain.SomethingHappened)) //nolint:forcetypeassert + }) + + return eventApplier +} + +func createCommandHandler(agg *domain.TestAggregate) *aggregate.CommandHandler { + commandHandler := aggregate.NewCommandHandler() + commandHandler.RegisterHandlers(agg) + + return commandHandler +} diff --git a/aggregate/example_test.go b/aggregate/example_test.go new file mode 100644 index 0000000..28c8af3 --- /dev/null +++ b/aggregate/example_test.go @@ -0,0 +1,98 @@ +package aggregate_test + +import ( + "fmt" + + "github.com/screwyprof/cqrs" + "github.com/screwyprof/cqrs/aggregate" +) + +const AggregateType = "MyAggregate" + +// Identifier is a user-defined type that implements the fmt.Stringer interface. +type Identifier = fmt.Stringer + +// Event is a user-defined interface that represents events in the domain. +type Event interface { + EventType() string +} + +// MyIdentifier is a user-defined identifier that implements the Identifier interface. +type MyIdentifier string + +func (id MyIdentifier) String() string { + return string(id) +} + +// SomethingHappened is a user-defined event that implements the Event interface. +type SomethingHappened struct { + ID MyIdentifier + Changed bool +} + +func (e SomethingHappened) EventType() string { + return "SomethingHappened" +} + +// MyAggregate is a user-defined aggregate that will handle commands and apply events. +type MyAggregate struct { + id MyIdentifier + isModified bool +} + +func (a *MyAggregate) AggregateID() Identifier { + return a.id +} + +func (a *MyAggregate) AggregateType() string { + return AggregateType +} + +// DoSomething is an example of a command. +type DoSomething struct { + ID MyIdentifier +} + +func (c DoSomething) AggregateID() cqrs.Identifier { + return c.ID +} + +func (c DoSomething) AggregateType() string { + return AggregateType +} + +func (c DoSomething) CommandType() string { + return "DoSomething" +} + +// DoSomething is a command handler method for the MyAggregate. +func (a *MyAggregate) DoSomething(c DoSomething) ([]Event, error) { + e := SomethingHappened{ + ID: c.ID, + Changed: true, + } + + return []Event{e}, nil +} + +// OnSomethingHappened is an event applier method for the MyAggregate. +func (a *MyAggregate) OnSomethingHappened(e SomethingHappened) { + a.isModified = e.Changed +} + +// Example demonstrates the use of the aggregate package to handle a command and produce events. +func Example() { + id := MyIdentifier("123") + agg := &MyAggregate{id: id} + esAgg := aggregate.FromAggregate(agg) + + events, err := esAgg.Handle(DoSomething{ID: id}) + if err != nil { + fmt.Printf("an error occurred: %v\n", err) + + return + } + + fmt.Printf("Produced events: %v\n", events) + // Output: Produced events: [{123 true}] +} diff --git a/aggregate/factory.go b/aggregate/factory.go index d718752..c5d613f 100644 --- a/aggregate/factory.go +++ b/aggregate/factory.go @@ -1,22 +1,19 @@ package aggregate import ( - "errors" - "sync" - - "github.com/go-faker/faker/v4" + "fmt" "github.com/screwyprof/cqrs" - "github.com/screwyprof/cqrs/aggregate/aggtest" ) -// Factory handles aggregate creation. +// Factory is responsible for creating aggregates by their type. +// It maintains a registry of factory functions for different aggregate types. type Factory struct { - factories map[string]cqrs.FactoryFn - factoriesMu sync.RWMutex + factories map[string]cqrs.FactoryFn } -// NewFactory creates a new instance of Factory. +// NewFactory creates a new instance of Factory and initializes its internal factory registry. +// It returns a pointer to the created Factory instance. func NewFactory() *Factory { return &Factory{ factories: make(map[string]cqrs.FactoryFn), @@ -24,23 +21,34 @@ func NewFactory() *Factory { } // RegisterAggregate registers an aggregate factory method. -func (f *Factory) RegisterAggregate(factory cqrs.FactoryFn) { - f.factoriesMu.Lock() - defer f.factoriesMu.Unlock() - - agg := factory(aggtest.StringIdentifier(faker.UUIDHyphenated())) - f.factories[agg.AggregateType()] = factory +// +// The factory function is used to create aggregates of a specific type. +func (f *Factory) RegisterAggregate(aggregateType string, factory cqrs.FactoryFn) { + f.factories[aggregateType] = factory } // CreateAggregate creates an aggregate of a given type. -func (f *Factory) CreateAggregate(aggregateType string, id cqrs.Identifier) (cqrs.AdvancedAggregate, error) { - f.factoriesMu.Lock() - defer f.factoriesMu.Unlock() - +// +// It uses the registered factory function to create an instance of the aggregate. +// It returns an error if the requested aggregate type is not registered in the factory. +func (f *Factory) CreateAggregate(aggregateType string, id cqrs.Identifier) (cqrs.ESAggregate, error) { factory, ok := f.factories[aggregateType] if !ok { - return nil, errors.New(aggregateType + " is not registered") + return nil, fmt.Errorf("%w: %s", ErrAggregateNotRegistered, aggregateType) } return factory(id), nil } + +// FromAggregate takes a cqrs.Aggregate and returns a cqrs.ESAggregate. +// +// It automatically registers all the command handlers and event appliers found in the aggregate. +func FromAggregate(agg cqrs.Aggregate) *EventSourced { + handler := NewCommandHandler() + handler.RegisterHandlers(agg) + + eventApplier := NewEventApplier() + eventApplier.RegisterAppliers(agg) + + return New(agg, handler, eventApplier) +} diff --git a/aggregate/factory_test.go b/aggregate/factory_test.go index d4006f7..441c5ca 100644 --- a/aggregate/factory_test.go +++ b/aggregate/factory_test.go @@ -14,51 +14,51 @@ import ( // ensure that factory implements cqrs.AggregateFactory interface. var _ cqrs.AggregateFactory = (*aggregate.Factory)(nil) -func TestNewFactory(t *testing.T) { - t.Run("ItReturnsNewFactoryInstance", func(t *testing.T) { - f := aggregate.NewFactory() - assert.True(t, f != nil) - }) -} +func TestFactory(t *testing.T) { + t.Parallel() + + t.Run("it panics if an aggregate is not registered", func(t *testing.T) { + t.Parallel() -func TestFactoryCreateAggregate(t *testing.T) { - t.Run("ItPanicsIfTheAggregateIsNotRegistered", func(t *testing.T) { f := aggregate.NewFactory() _, err := f.CreateAggregate(aggtest.TestAggregateType, aggtest.StringIdentifier(faker.UUIDHyphenated())) - assert.Equal(t, aggtest.ErrAggIsNotRegistered, err) + assert.ErrorIs(t, err, aggregate.ErrAggregateNotRegistered) }) -} -func TestFactoryRegisterAggregate(t *testing.T) { - t.Run("ItRegistersAnAggregateFactory", func(t *testing.T) { + t.Run("it creates an aggregate", func(t *testing.T) { + t.Parallel() + // arrange - ID := aggtest.StringIdentifier(faker.UUIDHyphenated()) - agg := aggtest.NewTestAggregate(ID) + id := aggtest.StringIdentifier(faker.UUIDHyphenated()) + f := aggregate.NewFactory() + + // act + f.RegisterAggregate(aggtest.TestAggregateType, func(ID cqrs.Identifier) cqrs.ESAggregate { + agg := aggtest.NewTestAggregate(id) - commandHandler := aggregate.NewCommandHandler() - commandHandler.RegisterHandlers(agg) + return aggregate.FromAggregate(agg) + }) - eventApplier := aggregate.NewEventApplier() - eventApplier.RegisterAppliers(agg) + agg, err := f.CreateAggregate(aggtest.TestAggregateType, id) - expected := aggregate.NewAdvanced( - agg, - commandHandler, - eventApplier, - ) + // assert + assert.NoError(t, err) + assert.Implements(t, (*cqrs.ESAggregate)(nil), agg) + }) - f := aggregate.NewFactory() + t.Run("it converts a cqrs.Aggregate to a cqrs.ESAggregate", func(t *testing.T) { + t.Parallel() + + // arrange + id := aggtest.StringIdentifier(faker.UUIDHyphenated()) + agg := aggtest.NewTestAggregate(id) // act - f.RegisterAggregate(func(ID cqrs.Identifier) cqrs.AdvancedAggregate { - return expected - }) - newAgg, err := f.CreateAggregate(aggtest.TestAggregateType, ID) + esAgg := aggregate.FromAggregate(agg) // assert - assert.NoError(t, err) - assert.Equal(t, expected, newAgg) + assert.Implements(t, (*cqrs.ESAggregate)(nil), esAgg) }) } diff --git a/cqrs.go b/cqrs.go index 178981c..bb69371 100644 --- a/cqrs.go +++ b/cqrs.go @@ -2,106 +2,76 @@ package cqrs import "fmt" -// Identifier an object identifier. +// Identifier represents an aggregate identifier. type Identifier = fmt.Stringer -// Command is an object that is sent to the cqrs to change state. +// Command is sent to the domain to change the state of an aggregate. // -// People request changes to the cqrs by sending commands. -// Command are named with a verb in the imperative mood, for example ConfirmOrder. +// Commands are named with a verb in the imperative mood, e.g., ConfirmOrder. type Command interface { AggregateID() Identifier AggregateType() string CommandType() string } -// CommandHandler executes commands. +// CommandHandler is responsible for executing commands. // -// A command handler receives a command and brokers a result from the appropriate aggregate. -// "A result" is either a successful application of the command, or an error. +// It processes a command, produces relevant domain events. // -// Could be implemented like a method on the aggregate. +// It returns a list of domain events on success +// It returns an error if the command cannot be executed. type CommandHandler interface { Handle(c Command) ([]DomainEvent, error) } -// CommandHandlerFunc is a function that can be used as a command handler. +// CommandHandlerFunc is a function type that can be used as a command handler. type CommandHandlerFunc func(Command) ([]DomainEvent, error) -// DomainEvent represents something that took place in the cqrs. +// DomainEvent represents an event that has occurred in the domain. // -// Events are always named with a past-participle verb, such as OrderConfirmed. +// Events are named with a past-participle verb, e.g., OrderConfirmed. type DomainEvent interface { EventType() string } -// EventApplier applies the given events to an aggregate. +// EventApplier is responsible for applying domain events to an aggregate. type EventApplier interface { Apply(e ...DomainEvent) error } -// EventApplierFunc is a function that can be used as an event applier. +// EventApplierFunc is a function type that can be used as an event applier. type EventApplierFunc func(DomainEvent) -// Aggregate is a cluster of cqrs objects that can be treated as a single unit. +// Aggregate represents a cluster of related objects that can be treated as a single unit. // -// Every transaction is scoped to a single aggregate. -// The lifetimes of the components of an aggregate are bounded by -// the lifetime of the entire aggregate. -// -// Concretely, an aggregate will handle commands, apply events, -// and have a state model encapsulated within it that allows it to implement the required command validation, -// thus upholding the invariants (business rules) of the aggregate. +// This basic interface is intended for simple aggregates that may not follow CQRS or event sourcing patterns. type Aggregate interface { AggregateID() Identifier AggregateType() string } -// Versionable used to say that an object can support different versions. +// Versionable indicates that an object can support different versions. type Versionable interface { Version() int } -// AdvancedAggregate is an aggregate which handles commands -// and applies events after it automatically. -type AdvancedAggregate interface { +// ESAggregate represents an aggregate that is designed with CQRS and event sourcing in mind. +// +// It extends the basic Aggregate interface and includes additional responsibilities such as +// command handling, event application, and versioning. +type ESAggregate interface { Aggregate Versionable CommandHandler EventApplier } -// EventStore stores and loads events. -type EventStore interface { - LoadEventsFor(aggregateID Identifier) ([]DomainEvent, error) - StoreEventsFor(aggregateID Identifier, version int, events []DomainEvent) error -} - -// FactoryFn aggregate factory function. -type FactoryFn func(Identifier) AdvancedAggregate +// FactoryFn is a function type for an aggregate factory function. +type FactoryFn func(Identifier) ESAggregate -// AggregateFactory creates aggregates. +// AggregateFactory is responsible for creating aggregates. +// It registers aggregate factory functions and creates aggregates based on a given aggregate type and identifier. type AggregateFactory interface { - RegisterAggregate(factory FactoryFn) - CreateAggregate(aggregateType string, ID Identifier) (AdvancedAggregate, error) -} - -// EventPublisher publishes events. -type EventPublisher interface { - Publish(e ...DomainEvent) error -} - -// EventHandler handles events that were published though EventPublisher. -type EventHandler interface { - SubscribedTo() EventMatcher - Handle(DomainEvent) error -} - -// EventHandlerFunc is a function that can be used as an event handler. -type EventHandlerFunc func(DomainEvent) error - -// AggregateStore loads and stores the aggregate. -type AggregateStore interface { - Load(aggregateID Identifier, aggregateType string) (AdvancedAggregate, error) - Store(aggregate AdvancedAggregate, events ...DomainEvent) error + RegisterAggregate(aggregateType string, factory FactoryFn) + CreateAggregate(aggregateType string, ID Identifier) (ESAggregate, error) } diff --git a/examples/bank/bank_test.go b/examples/bank/bank_test.go index c319136..d3c179d 100644 --- a/examples/bank/bank_test.go +++ b/examples/bank/bank_test.go @@ -47,7 +47,7 @@ func Example() { func createDispatcher(accountReporter eh.AccountReporting) *dispatcher.Dispatcher { aggregateFactory := aggregate.NewFactory() - aggregateFactory.RegisterAggregate(createAggregate) + aggregateFactory.RegisterAggregate("account.Aggregate", createAggregate) accountDetailsProjector := eventhandler.New() accountDetailsProjector.RegisterHandlers(eh.NewAccountDetailsProjector(accountReporter)) @@ -63,7 +63,7 @@ func createDispatcher(accountReporter eh.AccountReporting) *dispatcher.Dispatche return dispatcher.NewDispatcher(aggregateStore) } -func createAggregate(ID cqrs.Identifier) cqrs.AdvancedAggregate { +func createAggregate(ID cqrs.Identifier) cqrs.ESAggregate { acc := account.NewAggregate(ID) commandHandler := aggregate.NewCommandHandler() @@ -72,7 +72,7 @@ func createAggregate(ID cqrs.Identifier) cqrs.AdvancedAggregate { eventApplier := aggregate.NewEventApplier() eventApplier.RegisterAppliers(acc) - return aggregate.NewAdvanced(acc, commandHandler, eventApplier) + return aggregate.New(acc, commandHandler, eventApplier) } func failCommandOnError(_ []cqrs.DomainEvent, err error) { diff --git a/examples/bank/domain/account/account_test.go b/examples/bank/domain/account/account_test.go index e17a65a..0895294 100644 --- a/examples/bank/domain/account/account_test.go +++ b/examples/bank/domain/account/account_test.go @@ -16,38 +16,41 @@ import ( "github.com/screwyprof/cqrs/examples/bank/domain/event" ) -// ensure that game aggregate implements cqrs.Aggregate interface. +// ensure that the account aggregate implements cqrs.Aggregate interface. var _ cqrs.Aggregate = (*account.Aggregate)(nil) -func TestNewAggregate(t *testing.T) { - t.Run("ItPanicsIfIDIsNotGiven", func(t *testing.T) { +func TestAggregate(t *testing.T) { + t.Run("panics if ID is not given", func(t *testing.T) { + t.Parallel() + factory := func() { account.NewAggregate(nil) } + assert.Panics(t, factory) }) -} -func TestAggregateAggregateID(t *testing.T) { - t.Run("ItReturnsAggregateID", func(t *testing.T) { + t.Run("returns aggregate ID", func(t *testing.T) { + t.Parallel() + ID := aggtest.StringIdentifier(faker.UUIDHyphenated()) agg := account.NewAggregate(ID) assert.Equal(t, ID, agg.AggregateID()) }) -} -func TestAggregateAggregateType(t *testing.T) { - t.Run("ItReturnsAggregateType", func(t *testing.T) { + t.Run("returns aggregate type", func(t *testing.T) { + t.Parallel() + ID := aggtest.StringIdentifier(faker.UUIDHyphenated()) agg := account.NewAggregate(ID) assert.Equal(t, "account.Aggregate", agg.AggregateType()) }) -} -func TestAggregate(t *testing.T) { - t.Run("ItOpensAnAccount", func(t *testing.T) { + t.Run("opens an account", func(t *testing.T) { + t.Parallel() + ID := aggtest.StringIdentifier(faker.UUIDHyphenated()) number := faker.Word() @@ -58,7 +61,9 @@ func TestAggregate(t *testing.T) { ) }) - t.Run("ItDepositsAnEmptyAccount", func(t *testing.T) { + t.Run("deposits to an empty account", func(t *testing.T) { + t.Parallel() + ID := aggtest.StringIdentifier(faker.UUIDHyphenated()) number := faker.Word() amount := faker.UnixTime() @@ -70,7 +75,9 @@ func TestAggregate(t *testing.T) { ) }) - t.Run("ItDepositsAnAccountWithInitialFunds", func(t *testing.T) { + t.Run("deposits to an account with initial funds", func(t *testing.T) { + t.Parallel() + ID := aggtest.StringIdentifier(faker.UUIDHyphenated()) number := faker.Word() @@ -88,7 +95,9 @@ func TestAggregate(t *testing.T) { ) }) - t.Run("ItWithdrawsSomeFunds", func(t *testing.T) { + t.Run("withdraws some funds", func(t *testing.T) { + t.Parallel() + ID := aggtest.StringIdentifier(faker.UUIDHyphenated()) number := faker.Word() @@ -105,7 +114,9 @@ func TestAggregate(t *testing.T) { ) }) - t.Run("ItCannotWithdrawMoneyIfBalanceIsNotHighEnough", func(t *testing.T) { + t.Run("cannot withdraw money if balance is not high enough", func(t *testing.T) { + t.Parallel() + ID := aggtest.StringIdentifier(faker.UUIDHyphenated()) number := faker.Word() amount := faker.UnixTime() @@ -118,14 +129,8 @@ func TestAggregate(t *testing.T) { }) } -func createTestAggregate(ID domain.Identifier) *aggregate.Advanced { +func createTestAggregate(ID domain.Identifier) *aggregate.EventSourced { accAgg := account.NewAggregate(ID) - commandHandler := aggregate.NewCommandHandler() - commandHandler.RegisterHandlers(accAgg) - - eventApplier := aggregate.NewEventApplier() - eventApplier.RegisterAppliers(accAgg) - - return aggregate.NewAdvanced(accAgg, commandHandler, eventApplier) + return aggregate.FromAggregate(accAgg) } diff --git a/matcher.go b/matcher.go index 7f163af..675868c 100644 --- a/matcher.go +++ b/matcher.go @@ -30,5 +30,6 @@ func matchAnyEvent(e DomainEvent, types ...string) bool { return true } } + return false } diff --git a/matcher_test.go b/matcher_test.go index 112627f..f21d17a 100644 --- a/matcher_test.go +++ b/matcher_test.go @@ -1,10 +1,12 @@ -package cqrs +package cqrs_test import ( "testing" "github.com/go-faker/faker/v4" "github.com/stretchr/testify/assert" + + "github.com/screwyprof/cqrs" ) type testEvent struct{} @@ -13,10 +15,26 @@ func (e testEvent) EventType() string { return "testEvent" } -func TestMatchAnyEventOf(t *testing.T) { - t.Run("ItShouldReturnFalseIfNoEventsAreMatched", func(t *testing.T) { - m := MatchAnyEventOf(faker.Word(), faker.Word()) +func TestMatcher(t *testing.T) { + t.Parallel() + + t.Run("it matches any event", func(t *testing.T) { + t.Parallel() + + t.Run("no matches found", func(t *testing.T) { + t.Parallel() + + m := cqrs.MatchAnyEventOf(faker.Word(), faker.Word()) + + assert.False(t, m(testEvent{})) + }) + + t.Run("matches found", func(t *testing.T) { + t.Parallel() + + m := cqrs.MatchAnyEventOf("testEvent") - assert.True(t, m(testEvent{}) == false) + assert.True(t, m(testEvent{})) + }) }) } diff --git a/x/aggstore/aggregate_store.go b/x/aggstore/aggregate_store.go index 26f2d9d..0f0a999 100644 --- a/x/aggstore/aggregate_store.go +++ b/x/aggstore/aggregate_store.go @@ -2,16 +2,17 @@ package aggstore import ( "github.com/screwyprof/cqrs" + "github.com/screwyprof/cqrs/x" ) // AggregateStore loads and stores aggregates. type AggregateStore struct { aggregateFactory cqrs.AggregateFactory - eventStore cqrs.EventStore + eventStore x.EventStore } // NewStore creates a new instance of AggregateStore. -func NewStore(eventStore cqrs.EventStore, aggregateFactory cqrs.AggregateFactory) *AggregateStore { +func NewStore(eventStore x.EventStore, aggregateFactory cqrs.AggregateFactory) *AggregateStore { if eventStore == nil { panic("eventStore is required") } @@ -27,7 +28,7 @@ func NewStore(eventStore cqrs.EventStore, aggregateFactory cqrs.AggregateFactory } // Load implements cqrs.AggregateStore interface. -func (s *AggregateStore) Load(aggregateID cqrs.Identifier, aggregateType string) (cqrs.AdvancedAggregate, error) { +func (s *AggregateStore) Load(aggregateID cqrs.Identifier, aggregateType string) (cqrs.ESAggregate, error) { loadedEvents, err := s.eventStore.LoadEventsFor(aggregateID) if err != nil { return nil, err @@ -47,6 +48,6 @@ func (s *AggregateStore) Load(aggregateID cqrs.Identifier, aggregateType string) } // Store implements cqrs.AggregateStore interface. -func (s *AggregateStore) Store(agg cqrs.AdvancedAggregate, events ...cqrs.DomainEvent) error { +func (s *AggregateStore) Store(agg cqrs.ESAggregate, events ...cqrs.DomainEvent) error { return s.eventStore.StoreEventsFor(agg.AggregateID(), agg.Version(), events) } diff --git a/x/aggstore/aggregate_store_test.go b/x/aggstore/aggregate_store_test.go index 1169402..789308f 100644 --- a/x/aggstore/aggregate_store_test.go +++ b/x/aggstore/aggregate_store_test.go @@ -9,12 +9,13 @@ import ( "github.com/screwyprof/cqrs" "github.com/screwyprof/cqrs/aggregate" "github.com/screwyprof/cqrs/aggregate/aggtest" + "github.com/screwyprof/cqrs/x" "github.com/screwyprof/cqrs/x/aggstore" "github.com/screwyprof/cqrs/x/eventstore/evnstoretest" ) // ensure that AggregateStore implements cqrs.AggregateStore interface. -var _ cqrs.AggregateStore = (*aggstore.AggregateStore)(nil) +var _ x.AggregateStore = (*aggstore.AggregateStore)(nil) func TestNewStore(t *testing.T) { t.Run("ItPanicsIfEventStoreIsNotGiven", func(t *testing.T) { @@ -57,7 +58,7 @@ func TestAggregateStoreLoad(t *testing.T) { _, err := s.Load(ID, aggtest.TestAggregateType) // assert - assert.Equal(t, aggtest.ErrAggIsNotRegistered, err) + assert.ErrorIs(t, err, aggregate.ErrAggregateNotRegistered) }) t.Run("ItFailsIfItCannotApplyEvents", func(t *testing.T) { @@ -73,7 +74,7 @@ func TestAggregateStoreLoad(t *testing.T) { _, err := s.Load(ID, aggtest.TestAggregateType) // assert - assert.Equal(t, aggtest.ErrOnSomethingHappenedApplierNotFound, err) + assert.ErrorIs(t, err, aggregate.ErrEventApplierNotFound) }) t.Run("ItReturnsAggregate", func(t *testing.T) { @@ -105,16 +106,16 @@ func TestAggregateStoreStore(t *testing.T) { }) } -func createAgg(id cqrs.Identifier) *aggregate.Advanced { - pureAgg := aggtest.NewTestAggregate(id) +func createAgg(id cqrs.Identifier) *aggregate.EventSourced { + agg := aggtest.NewTestAggregate(id) commandHandler := aggregate.NewCommandHandler() - commandHandler.RegisterHandlers(pureAgg) + commandHandler.RegisterHandlers(agg) eventApplier := aggregate.NewEventApplier() - eventApplier.RegisterAppliers(pureAgg) + eventApplier.RegisterAppliers(agg) - return aggregate.NewAdvanced(pureAgg, commandHandler, eventApplier) + return aggregate.New(agg, commandHandler, eventApplier) } type aggregateStoreOptions struct { @@ -164,32 +165,34 @@ func createAggregateStore(id cqrs.Identifier, opts ...option) *aggstore.Aggregat opt(config) } - pureAgg := aggtest.NewTestAggregate(id) + agg := aggtest.NewTestAggregate(id) applier := aggregate.NewEventApplier() if !config.staticEventApplier { - applier.RegisterAppliers(pureAgg) + applier.RegisterAppliers(agg) } commandHandler := aggregate.NewCommandHandler() - commandHandler.RegisterHandlers(pureAgg) + commandHandler.RegisterHandlers(agg) - agg := aggregate.NewAdvanced(pureAgg, commandHandler, applier) + esAgg := aggregate.New(agg, commandHandler, applier) if config.loadedEvents != nil { - _ = agg.Apply(config.loadedEvents...) + _ = esAgg.Apply(config.loadedEvents...) } - aggFactory := createAggFactory(agg, config.emptyFactory) + + aggFactory := createAggFactory(esAgg, config.emptyFactory) eventStore := createEventStoreMock(config.loadedEvents, config.loadErr, config.storeErr) return aggstore.NewStore(eventStore, aggFactory) } -func createAggFactory(agg *aggregate.Advanced, empty bool) *aggregate.Factory { +func createAggFactory(agg *aggregate.EventSourced, empty bool) *aggregate.Factory { f := aggregate.NewFactory() if empty { return f } - f.RegisterAggregate(func(ID cqrs.Identifier) cqrs.AdvancedAggregate { + + f.RegisterAggregate(agg.AggregateType(), func(ID cqrs.Identifier) cqrs.ESAggregate { return agg }) diff --git a/x/aggstore/aggstoretest/aggregate_store.go b/x/aggstore/aggstoretest/aggregate_store.go index a276461..0f7b5fa 100644 --- a/x/aggstore/aggstoretest/aggregate_store.go +++ b/x/aggstore/aggstoretest/aggregate_store.go @@ -15,18 +15,18 @@ var ( // AggregateStoreMock mocks event store. type AggregateStoreMock struct { - Loader func(aggregateID cqrs.Identifier, aggregateType string) (cqrs.AdvancedAggregate, error) - Saver func(aggregate cqrs.AdvancedAggregate, events ...cqrs.DomainEvent) error + Loader func(aggregateID cqrs.Identifier, aggregateType string) (cqrs.ESAggregate, error) + Saver func(aggregate cqrs.ESAggregate, events ...cqrs.DomainEvent) error } // Load implements cqrs.AggregateStore interface. func (m *AggregateStoreMock) Load( aggregateID cqrs.Identifier, aggregateType string, -) (cqrs.AdvancedAggregate, error) { +) (cqrs.ESAggregate, error) { return m.Loader(aggregateID, aggregateType) } // Store implements cqrs.AggregateStore interface. -func (m *AggregateStoreMock) Store(aggregate cqrs.AdvancedAggregate, events ...cqrs.DomainEvent) error { +func (m *AggregateStoreMock) Store(aggregate cqrs.ESAggregate, events ...cqrs.DomainEvent) error { return m.Saver(aggregate, events...) } diff --git a/x/cqrs.go b/x/cqrs.go new file mode 100644 index 0000000..2357d3c --- /dev/null +++ b/x/cqrs.go @@ -0,0 +1,29 @@ +package x + +import "github.com/screwyprof/cqrs" + +// EventStore stores and loads events. +type EventStore interface { + LoadEventsFor(aggregateID cqrs.Identifier) ([]cqrs.DomainEvent, error) + StoreEventsFor(aggregateID cqrs.Identifier, version int, events []cqrs.DomainEvent) error +} + +// EventPublisher publishes events. +type EventPublisher interface { + Publish(e ...cqrs.DomainEvent) error +} + +// EventHandler handles events that were published though EventPublisher. +type EventHandler interface { + SubscribedTo() cqrs.EventMatcher + Handle(cqrs.DomainEvent) error +} + +// EventHandlerFunc is a function that can be used as an event handler. +type EventHandlerFunc func(cqrs.DomainEvent) error + +// AggregateStore loads and stores the aggregate. +type AggregateStore interface { + Load(aggregateID cqrs.Identifier, aggregateType string) (cqrs.ESAggregate, error) + Store(aggregate cqrs.ESAggregate, events ...cqrs.DomainEvent) error +} diff --git a/x/dispatcher/dispatcher.go b/x/dispatcher/dispatcher.go index 9d7f5e8..2416867 100644 --- a/x/dispatcher/dispatcher.go +++ b/x/dispatcher/dispatcher.go @@ -2,6 +2,7 @@ package dispatcher import ( "github.com/screwyprof/cqrs" + "github.com/screwyprof/cqrs/x" ) // Dispatcher is a basic message dispatcher. @@ -11,11 +12,11 @@ import ( // at startup and keep it in memory. // Depends on some kind of event storage mechanism. type Dispatcher struct { - store cqrs.AggregateStore + store x.AggregateStore } // NewDispatcher creates a new instance of Dispatcher. -func NewDispatcher(aggregateStore cqrs.AggregateStore) *Dispatcher { +func NewDispatcher(aggregateStore x.AggregateStore) *Dispatcher { if aggregateStore == nil { panic("aggregateStore is required") } diff --git a/x/dispatcher/dispatcher_test.go b/x/dispatcher/dispatcher_test.go index 425b2d4..4dcd001 100644 --- a/x/dispatcher/dispatcher_test.go +++ b/x/dispatcher/dispatcher_test.go @@ -106,32 +106,32 @@ func createDispatcher(id cqrs.Identifier, opts ...option) *dispatcher.Dispatcher opt(config) } - pureAgg := aggtest.NewTestAggregate(id) + agg := aggtest.NewTestAggregate(id) commandHandler := aggregate.NewCommandHandler() - commandHandler.RegisterHandlers(pureAgg) + commandHandler.RegisterHandlers(agg) eventApplier := aggregate.NewEventApplier() - eventApplier.RegisterAppliers(pureAgg) + eventApplier.RegisterAppliers(agg) - agg := aggregate.NewAdvanced(pureAgg, commandHandler, eventApplier) + esAgg := aggregate.New(agg, commandHandler, eventApplier) if config.loadedEvents != nil { - _ = agg.Apply(config.loadedEvents...) + _ = esAgg.Apply(config.loadedEvents...) } return dispatcher.NewDispatcher( - createAggregateStoreMock(agg, config.loadErr, config.storeErr), + createAggregateStoreMock(esAgg, config.loadErr, config.storeErr), ) } func createAggregateStoreMock( - want cqrs.AdvancedAggregate, loadErr error, storeErr error, + want cqrs.ESAggregate, loadErr error, storeErr error, ) *aggstoretest.AggregateStoreMock { eventStore := &aggstoretest.AggregateStoreMock{ - Loader: func(aggregateID cqrs.Identifier, aggregateType string) (cqrs.AdvancedAggregate, error) { + Loader: func(aggregateID cqrs.Identifier, aggregateType string) (cqrs.ESAggregate, error) { return want, loadErr }, - Saver: func(aggregate cqrs.AdvancedAggregate, events ...cqrs.DomainEvent) error { + Saver: func(aggregate cqrs.ESAggregate, events ...cqrs.DomainEvent) error { return storeErr }, } diff --git a/x/eventbus/eventbus.go b/x/eventbus/eventbus.go index 30a058b..9cf96bd 100644 --- a/x/eventbus/eventbus.go +++ b/x/eventbus/eventbus.go @@ -4,23 +4,24 @@ import ( "sync" "github.com/screwyprof/cqrs" + "github.com/screwyprof/cqrs/x" ) // InMemoryEventBus publishes events. type InMemoryEventBus struct { - eventHandlers map[cqrs.EventHandler]struct{} + eventHandlers map[x.EventHandler]struct{} eventHandlersMu sync.RWMutex } // NewInMemoryEventBus creates a new instance of InMemoryEventBus. func NewInMemoryEventBus() *InMemoryEventBus { return &InMemoryEventBus{ - eventHandlers: make(map[cqrs.EventHandler]struct{}), + eventHandlers: make(map[x.EventHandler]struct{}), } } // Register registers event handler. -func (b *InMemoryEventBus) Register(h cqrs.EventHandler) { +func (b *InMemoryEventBus) Register(h x.EventHandler) { b.eventHandlersMu.Lock() defer b.eventHandlersMu.Unlock() @@ -41,7 +42,7 @@ func (b *InMemoryEventBus) Publish(events ...cqrs.DomainEvent) error { return nil } -func (b *InMemoryEventBus) handleEvents(h cqrs.EventHandler, events ...cqrs.DomainEvent) error { +func (b *InMemoryEventBus) handleEvents(h x.EventHandler, events ...cqrs.DomainEvent) error { for _, e := range events { err := b.handleEventIfMatches(h.SubscribedTo(), h, e) if err != nil { @@ -51,7 +52,7 @@ func (b *InMemoryEventBus) handleEvents(h cqrs.EventHandler, events ...cqrs.Doma return nil } -func (b *InMemoryEventBus) handleEventIfMatches(m cqrs.EventMatcher, h cqrs.EventHandler, e cqrs.DomainEvent) error { +func (b *InMemoryEventBus) handleEventIfMatches(m cqrs.EventMatcher, h x.EventHandler, e cqrs.DomainEvent) error { if !m(e) { return nil } diff --git a/x/eventbus/eventbus_test.go b/x/eventbus/eventbus_test.go index e4d9223..cf1c6b9 100644 --- a/x/eventbus/eventbus_test.go +++ b/x/eventbus/eventbus_test.go @@ -7,12 +7,13 @@ import ( "github.com/screwyprof/cqrs" event "github.com/screwyprof/cqrs/aggregate/aggtest" + "github.com/screwyprof/cqrs/x" "github.com/screwyprof/cqrs/x/eventbus" "github.com/screwyprof/cqrs/x/eventhandler/evnhndtest" ) // ensure that EventBus implements cqrs.EventPublisher interface. -var _ cqrs.EventPublisher = (*eventbus.InMemoryEventBus)(nil) +var _ x.EventPublisher = (*eventbus.InMemoryEventBus)(nil) func TestNewInMemoryEventBus(t *testing.T) { t.Run("ItCreatesNewInstance", func(t *testing.T) { diff --git a/x/eventhandler/event_handler.go b/x/eventhandler/event_handler.go index f23d3f8..017387e 100644 --- a/x/eventhandler/event_handler.go +++ b/x/eventhandler/event_handler.go @@ -7,23 +7,24 @@ import ( "sync" "github.com/screwyprof/cqrs" + "github.com/screwyprof/cqrs/x" ) // EventHandler handles events. type EventHandler struct { - handlers map[string]cqrs.EventHandlerFunc + handlers map[string]x.EventHandlerFunc handlersMu sync.RWMutex } // New creates new instance of New. func New() *EventHandler { return &EventHandler{ - handlers: make(map[string]cqrs.EventHandlerFunc), + handlers: make(map[string]x.EventHandlerFunc), } } // RegisterHandler registers an event handler for the given method. -func (h *EventHandler) RegisterHandler(method string, handler cqrs.EventHandlerFunc) { +func (h *EventHandler) RegisterHandler(method string, handler x.EventHandlerFunc) { h.handlersMu.Lock() defer h.handlersMu.Unlock() h.handlers[method] = handler diff --git a/x/eventhandler/event_handler_test.go b/x/eventhandler/event_handler_test.go index 3d934cd..0f5182c 100644 --- a/x/eventhandler/event_handler_test.go +++ b/x/eventhandler/event_handler_test.go @@ -8,12 +8,13 @@ import ( "github.com/screwyprof/cqrs" event "github.com/screwyprof/cqrs/aggregate/aggtest" + "github.com/screwyprof/cqrs/x" "github.com/screwyprof/cqrs/x/eventhandler" "github.com/screwyprof/cqrs/x/eventhandler/evnhndtest" ) // ensure that event handler implements cqrs.EventHandler interface. -var _ cqrs.EventHandler = (*eventhandler.EventHandler)(nil) +var _ x.EventHandler = (*eventhandler.EventHandler)(nil) func TestNew(t *testing.T) { t.Run("ItCreatesNewInstance", func(t *testing.T) { diff --git a/x/eventstore/in_memory_event_store.go b/x/eventstore/in_memory_event_store.go index 85df09b..f56a351 100644 --- a/x/eventstore/in_memory_event_store.go +++ b/x/eventstore/in_memory_event_store.go @@ -5,6 +5,7 @@ import ( "sync" "github.com/screwyprof/cqrs" + "github.com/screwyprof/cqrs/x" ) // ErrConcurrencyViolation happens if aggregate has been modified concurrently. @@ -15,11 +16,11 @@ type InMemoryEventStore struct { eventStreams map[cqrs.Identifier][]cqrs.DomainEvent eventStreamsMu sync.RWMutex - eventPublisher cqrs.EventPublisher + eventPublisher x.EventPublisher } // NewInInMemoryEventStore creates a new instance of InMemoryEventStore. -func NewInInMemoryEventStore(eventPublisher cqrs.EventPublisher) *InMemoryEventStore { +func NewInInMemoryEventStore(eventPublisher x.EventPublisher) *InMemoryEventStore { if eventPublisher == nil { panic("eventPublisher is required") } diff --git a/x/eventstore/in_memory_event_store_test.go b/x/eventstore/in_memory_event_store_test.go index 3edb43d..6be8d21 100644 --- a/x/eventstore/in_memory_event_store_test.go +++ b/x/eventstore/in_memory_event_store_test.go @@ -8,12 +8,13 @@ import ( "github.com/screwyprof/cqrs" "github.com/screwyprof/cqrs/aggregate/aggtest" + "github.com/screwyprof/cqrs/x" "github.com/screwyprof/cqrs/x/eventbus/evnbustest" "github.com/screwyprof/cqrs/x/eventstore" ) // ensure that event aggstore implements cqrs.EventStore interface. -var _ cqrs.EventStore = (*eventstore.InMemoryEventStore)(nil) +var _ x.EventStore = (*eventstore.InMemoryEventStore)(nil) func TestNewInInMemoryEventStore(t *testing.T) { t.Run("ItCreatesEventStore", func(t *testing.T) {