From dd2d949e0f7f6be0613fe096c8beb9b5ad7a2e13 Mon Sep 17 00:00:00 2001 From: "Bogdan Dinu (Badu)" Date: Thu, 23 Mar 2023 09:56:20 +0200 Subject: [PATCH] updated Readme, expose Range --- README.md | 148 +++++++++++++++--- main.go | 37 +++-- main_test.go | 44 ++++++ .../fire-and-forget/audit/service.go | 4 +- 4 files changed, 195 insertions(+), 38 deletions(-) diff --git a/README.md b/README.md index 13db0f3..0a0ca37 100644 --- a/README.md +++ b/README.md @@ -1,33 +1,140 @@ # Bus -In it's most simple form, this event bus can be used as following. +- **Independent**: has no external dependencies +- **Probably Fast**: no reflection +- **Type Safe**: built on generics +- **Small and Simple**: can be used as following: -The listener declares it's interest for an event, by registering a handler: +Having the following event: -`bus.Sub(OnMyEventOccurred)` +```go + package events -and the handler is having the following signature: + type InterestingEvent struct { + } +``` -`func OnMyEventOccurred(event InterestingEvent)` +a listener can registering a handler by calling `Sub` method: -The event producer will simply do: +```go + package subscriber -`bus.Pub(InterestingEvent{})` + import "github.com/badu/bus" + + // ... somewhere in a setup function / constructor + bus.Sub(OnMyEventOccurred) +``` -Optional, to allow the bus to spin a goroutine for dispatching events, implement the following interface: +where the handler is having the following signature: -`func (e InterestingEvent) Async() bool{ return true }` +```go + func OnMyEventOccurred(event InterestingEvent){ + // do something with the event here + } +``` -or +The event producer / dispatcher will simply: -`func (e *InterestingEvent) Async() bool{ return true }` +```go + package dispatcher -By default, the bus is using sync events : waits for listeners to complete their jobs before calling the next listener. + import "github.com/badu/bus" + + // ...somewhere in a dispatching function + + bus.Pub(InterestingEvent{}) +``` -Or, even easier, just call `PubAsync` methods. +If the event needs to go async, in the sense that the bus package will spin up a goroutine for the caller, just +implement the following interface: + +```go + package events + + func (e InterestingEvent) Async() bool{ return true } +``` + +if the handler has the signature declared above, or + +```go + package events + + func (e *InterestingEvent) Async() bool{ return true } +``` + +if the handler has the signature as following: + +```go + func OnMyEventOccurred(event *InterestingEvent){ + // do something with the event here + } +``` + +Another way to publish an event async, is to use `PubAsync` method that package exposes. + +By default, the bus is using sync events, which means that it waits for listeners to complete their jobs before calling +the next listener. Usage : `go get github.com/badu/bus` +## F.A.Q. + +1. I want to cancel subscription at some point. How do I do that? + +Subscribing returns access to the `Cancel` method + +```go +package subscriber + +// ... somewhere in a setup function / constructor +subscription := bus.Sub(OnMyEventOccurred) +// when needed, calling cancel of subscription, so function OnMyEventOccurred won't be called anymore +subscription.Cancel() +``` + +2. Can I subscribe once? + +Yes! The event handler has to return true. + +```go +package subscriber +// ... somewhere in a setup function / constructor + +bus.SubCancel( func(event InterestingEvent) bool { + // do something with the event here + return true // returning true will cancel the subscription +}) +``` + +3. I want to inspect registered events. How do I do that? + +The events mapper is a `sync.Map`, so iterate using `Range` + +```go +bus.Range(func(k, v any)bool{ + fmt.Printf("%#v %#v\n", k, v) +}) +``` + +4. I want to use my own event names. Is that possible? + +Yes! You have to implement the following interface: + +```go +package events + +func (e InterestingEvent) EventID() string{ + return "YourInterestingEventName" +} +``` + +The event name is the key of the mapper, which means that implementing your own event names might cause panics +if you have name collisions. + +5. Will I have race conditions? + +No. The package is concurrent safe. + ## What Problem Does It Solve? Decoupling of components: publishers and subscribers can operate independently of each other, with no direct knowledge @@ -60,9 +167,10 @@ Inside the `test_scenarios` folder, you can find the following scenarios: event is [triggered](https://github.com/badu/bus/blob/master/test_scenarios/fire-and-forget/users/service.go#L23) by the user service, which performs the creation of the user account. We're using the `fire and forget` technique here, because the operation of registration should not depend on the fact that we've been able to - send an welcoming email or a sms, or the audit system malfunctions. + send a welcoming email or a sms, or the audit system malfunctions. - Simulating audit service malfunction is easy. Instead of using `Sub`, we're using `SubUnsub` to register the listener + Simulating audit service malfunctions easy. + Instead of using `Sub`, we're using `SubUnsub` to register the listener and return [`true`](https://github.com/badu/bus/blob/master/test_scenarios/fire-and-forget/audit/service.go#L36) to unsubscribe on events of that kind. @@ -76,7 +184,7 @@ Inside the `test_scenarios` folder, you can find the following scenarios: The `cart` service requires two replies from two other microservices `inventory` and `prices`. In the past, I've been using a closure function to provide the service with both real GRPC clients or with mocks and stubs. The service - signature gets complicated and large as we one service would depend on a lot of GRPC clients to aggregate data. + signature gets complicated and large as one service would depend on a lot of GRPC clients to aggregate data. As you can see the [test here](https://github.com/badu/bus/blob/master/test_scenarios/factory-request-reply/main_test.go) it's much @@ -92,7 +200,7 @@ Inside the `test_scenarios` folder, you can find the following scenarios: In this example, we wanted to achieve two things. First is that the `service` and the `repository` are decoupled by events. More than that, we wanted that the events are generic on their own. - The orders service will dispatch a generic request event, one for placing an order, which will carry an `Order` ( + The `orders` service will dispatch a generic request event, one for placing an order, which will carry an `Order` ( model) struct with that request and another `OrderStatus` (model) struct using the same generic event. We are using a channel inside the generic `RequestEvent` to signal the `reply` to the publisher, which in this case @@ -106,7 +214,7 @@ Inside the `test_scenarios` folder, you can find the following scenarios: The `repository` is simulating a long database call, longer than the context's cancellation, so the service gets the deadline exceeded error. - Note that this final example is not using pointer to the event's struct, but it contains two properties which have + Note that this final example is not using a pointer to the event struct, but it contains two properties which have pointers, so the `service` can access the altered `reply`. ## Recommendations @@ -114,8 +222,8 @@ Inside the `test_scenarios` folder, you can find the following scenarios: 1. always place your events inside a separate `events` package, avoiding circular dependencies. 2. in general, in `request-reply` scenarios, the events should be passed as pointers (even if it's somewhat slower), because changing properties that represents the `reply` would not be reflected. Also, when using `sync.WaitGroup` - inside your event struct, always use method receivers and pass the event as pointer, otherwise you will be passing a - lock by value (which is `sync.Locker`). + inside your event struct, always use method receivers and pass the event as a pointer — otherwise you will be passing + a lock by value (which is `sync.Locker`). 3. be careful if you don't want to use pointers for events, but you still need to pass values from the listener to the dispatcher. You should still have at least one property of that event that is a pointer (see events in `request reply with cancellation` for example). Same technique can be applied when you need `sync.Waitgroup` to be diff --git a/main.go b/main.go index f281e4e..3a6db21 100644 --- a/main.go +++ b/main.go @@ -55,8 +55,8 @@ func (b *Topic[T]) Sub(callback func(v T)) *Listener[T] { return result } -// unsub is private to the topic, but can be accessed via Listener -func (b *Topic[T]) unsub(who *Listener[T]) { +// cancel is private to the topic, but can be accessed via Listener +func (b *Topic[T]) cancel(who *Listener[T]) { b.rwMu.Lock() for i := range b.subs { if b.subs[i] != who { @@ -82,9 +82,9 @@ func (b *Topic[T]) NumSubs() int { return result } -// Unsub forgets the indicated callback -func (s *Listener[T]) Unsub() { - s.parent.unsub(s) +// Cancel forgets the indicated callback +func (s *Listener[T]) Cancel() { + s.parent.cancel(s) } // Topic gives access to the underlying topic @@ -124,21 +124,21 @@ func (b *Topic[T]) PubAsync(event T) { b.rwMu.RUnlock() } -// Bus is being returned when you subscribe, so you can manually Unsub +// Bus is being returned when you subscribe, so you can manually Cancel type Bus[T any] struct { listener *Listener[T] stop atomic.Uint32 // flag for unsubscribing after receiving one event } -// Unsub allows caller to manually unsubscribe, in case they don't want to use SubUnsub -func (o *Bus[T]) Unsub() { +// Cancel allows callers to manually unsubscribe, in case they don't want to use SubCancel +func (o *Bus[T]) Cancel() { if o.stop.CompareAndSwap(0, 1) { - go o.listener.Unsub() + go o.listener.Cancel() } } -// SubUnsub can be used if you need to unsubscribe immediately after receiving an event, by making your function return true -func SubUnsub[T any](callback func(event T) bool) *Bus[T] { +// SubCancel can be used if you need to unsubscribe immediately after receiving an event, by making your function return true +func SubCancel[T any](callback func(event T) bool) *Bus[T] { var ( event T key string @@ -163,9 +163,9 @@ func SubUnsub[T any](callback func(event T) bool) *Bus[T] { return } - unsub := callback(v) - if unsub { - result.Unsub() + shouldCancel := callback(v) + if shouldCancel { + result.Cancel() } }) @@ -216,7 +216,7 @@ func Pub[T any](event T) { } topic, ok := mapper.Load(key) - if !ok || topic == nil { // create new topic, even if there are no listeners (otherwise we will have to panic) + if !ok || topic == nil { // create a new topic, even if there are no listeners (otherwise we will have to panic) topic, _ = mapper.LoadOrStore(key, NewTopic[T]()) } @@ -235,8 +235,13 @@ func PubAsync[T any](event T) { } topic, ok := mapper.Load(key) - if !ok || topic == nil { // create new topic, even if there are no listeners (otherwise we will have to panic) + if !ok || topic == nil { // create a new topic, even if there are no listeners (otherwise we will have to panic) topic, _ = mapper.LoadOrStore(key, NewTopic[T]()) } topic.(*Topic[T]).PubAsync(event) } + +// Range gives access to mapper Range +func Range(f func(k, v any) bool) { + mapper.Range(f) +} diff --git a/main_test.go b/main_test.go index 9af9060..61b6a88 100644 --- a/main_test.go +++ b/main_test.go @@ -122,3 +122,47 @@ func TestAsyncBus(t *testing.T) { t.Logf("%d", c) } + +func TestRange(t *testing.T) { + + type Event1 struct{} + type Event2 struct{} + type Event3 struct{} + type Event4 struct{} + type Event5 struct{} + + bus.Sub(func(e Event1) {}) + bus.Sub(func(e Event2) {}) + bus.Sub(func(e Event2) {}) + bus.Sub(func(e Event3) {}) + bus.Sub(func(e Event3) {}) + bus.Sub(func(e Event3) {}) + bus.Sub(func(e Event4) {}) + bus.Sub(func(e Event4) {}) + bus.Sub(func(e Event4) {}) + bus.Sub(func(e Event4) {}) + bus.Sub(func(e Event5) {}) + bus.Sub(func(e Event5) {}) + bus.Sub(func(e Event5) {}) + bus.Sub(func(e Event5) {}) + bus.Sub(func(e Event5) {}) + + seen := map[string]struct{}{ + "bus_test.Event2": {}, + "bus_test.Event3": {}, + "bus_test.Event1": {}, + "bus_test.Event5": {}, + "bus_test.Event4": {}, + } + + bus.Range(func(k, _ any) bool { + if _, has := seen[k.(string)]; has { + delete(seen, k.(string)) + } + return true + }) + + if len(seen) > 0 { + t.Fatalf("error : not all events were seen") + } +} diff --git a/test_scenarios/fire-and-forget/audit/service.go b/test_scenarios/fire-and-forget/audit/service.go index 90f435b..a1a51e0 100644 --- a/test_scenarios/fire-and-forget/audit/service.go +++ b/test_scenarios/fire-and-forget/audit/service.go @@ -15,8 +15,8 @@ type ServiceImpl struct { func NewAuditService(sb *strings.Builder) ServiceImpl { result := ServiceImpl{sb: sb} bus.Sub(result.OnUserRegisteredEvent) - bus.SubUnsub(result.OnSMSRequestEvent) - bus.SubUnsub(result.OnSMSSentEvent) + bus.SubCancel(result.OnSMSRequestEvent) + bus.SubCancel(result.OnSMSSentEvent) return result }