diff --git a/.github/dependabot.yml b/.github/dependabot.yml new file mode 100644 index 0000000..a07e9ab --- /dev/null +++ b/.github/dependabot.yml @@ -0,0 +1,16 @@ +# To get started with Dependabot version updates, you'll need to specify which +# package ecosystems to update and where the package manifests are located. +# Please see the documentation for all configuration options: +# https://help.github.com/github/administering-a-repository/configuration-options-for-dependency-updates + +version: 2 +updates: + - package-ecosystem: "gomod" # See documentation for possible values + directory: "/" # Location of package manifests + schedule: + interval: "daily" + + - package-ecosystem: github-actions + directory: / + schedule: + interval: daily diff --git a/.github/workflows/codecov.yml b/.github/workflows/codecov.yml new file mode 100644 index 0000000..f9a7c5d --- /dev/null +++ b/.github/workflows/codecov.yml @@ -0,0 +1,26 @@ +name: Test and coverage + +on: [push, pull_request] + +permissions: # added using https://github.com/step-security/secure-workflows + contents: read + +jobs: + build: + runs-on: ubuntu-latest + steps: + - name: Harden Runner + uses: step-security/harden-runner@17d0e2bd7d51742c71671bd19fa12bdc9d40a3d6 + with: + egress-policy: audit # TODO: change to 'egress-policy: block' after couple of runs + + - uses: actions/checkout@692973e3d937129bcbf40652eb9f2f61becf3332 + with: + fetch-depth: 2 + - uses: actions/setup-go@0a12ed9d6a96ab950c8f026ed9f722fe0da7ef32 + with: + go-version: '1.18' + - name: Run coverage + run: go test ./... -race -coverprofile=coverage.txt -covermode=atomic + - name: Upload coverage to Codecov + run: bash <(curl -s https://codecov.io/bash) diff --git a/.github/workflows/codeql-analysis.yml b/.github/workflows/codeql-analysis.yml new file mode 100644 index 0000000..52b0fa8 --- /dev/null +++ b/.github/workflows/codeql-analysis.yml @@ -0,0 +1,78 @@ +# For most projects, this workflow file will not need changing; you simply need +# to commit it to your repository. +# +# You may wish to alter this file to override the set of languages analyzed, +# or to provide custom queries or build logic. +# +# ******** NOTE ******** +# We have attempted to detect the languages in your repository. Please check +# the `language` matrix defined below to confirm you have the correct set of +# supported CodeQL languages. +# +name: "CodeQL" + +on: + push: + branches: [ main ] + pull_request: + # The branches below must be a subset of the branches above + branches: [ main ] + schedule: + - cron: '20 3 * * 4' + +permissions: # added using https://github.com/step-security/secure-workflows + contents: read + +jobs: + analyze: + name: Analyze + runs-on: ubuntu-latest + permissions: + actions: read + contents: read + security-events: write + + strategy: + fail-fast: false + matrix: + language: [ 'go' ] + # CodeQL supports [ 'cpp', 'csharp', 'go', 'java', 'javascript', 'python', 'ruby' ] + # Learn more about CodeQL language support at https://git.io/codeql-language-support + + steps: + - name: Harden Runner + uses: step-security/harden-runner@17d0e2bd7d51742c71671bd19fa12bdc9d40a3d6 + with: + egress-policy: audit # TODO: change to 'egress-policy: block' after couple of runs + + - name: Checkout repository + uses: actions/checkout@692973e3d937129bcbf40652eb9f2f61becf3332 + + # Initializes the CodeQL tools for scanning. + - name: Initialize CodeQL + uses: github/codeql-action/init@cdcdbb579706841c47f7063dda365e292e5cad7a + with: + languages: ${{ matrix.language }} + # If you wish to specify custom queries, you can do so here or in a config file. + # By default, queries listed here will override any specified in a config file. + # Prefix the list here with "+" to use these queries and those in the config file. + # queries: ./path/to/local/query, your-org/your-repo/queries@main + + # Autobuild attempts to build any compiled languages (C/C++, C#, or Java). + # If this step fails, then you should remove it and run the build manually (see below) + - name: Autobuild + uses: github/codeql-action/autobuild@cdcdbb579706841c47f7063dda365e292e5cad7a + + # ℹī¸ Command-line programs to run using the OS shell. + # 📚 https://git.io/JvXDl + + # ✏ī¸ If the Autobuild fails above, remove it and uncomment the following three lines + # and modify them (or add more) to build your code if your project + # uses a compiled language + + #- run: | + # make bootstrap + # make release + + - name: Perform CodeQL Analysis + uses: github/codeql-action/analyze@cdcdbb579706841c47f7063dda365e292e5cad7a diff --git a/.github/workflows/go.yml b/.github/workflows/go.yml new file mode 100644 index 0000000..2445ebf --- /dev/null +++ b/.github/workflows/go.yml @@ -0,0 +1,26 @@ +on: [push, pull_request] +name: Unit Tests +permissions: # added using https://github.com/step-security/secure-workflows + contents: read + +jobs: + test: + strategy: + matrix: + go-version: [1.18.x, 1.19.x, 1.20.x, 1.21.x, 1.22.x] + os: [ubuntu-latest, macos-latest, windows-latest] + runs-on: ${{ matrix.os }} + steps: + - name: Harden Runner + uses: step-security/harden-runner@17d0e2bd7d51742c71671bd19fa12bdc9d40a3d6 + with: + egress-policy: audit # TODO: change to 'egress-policy: block' after couple of runs + + - name: Install Go + uses: actions/setup-go@0a12ed9d6a96ab950c8f026ed9f722fe0da7ef32 + with: + go-version: ${{ matrix.go-version }} + - name: Checkout code + uses: actions/checkout@692973e3d937129bcbf40652eb9f2f61becf3332 + - name: Test + run: go test ./... diff --git a/.github/workflows/golangci-lint.yml b/.github/workflows/golangci-lint.yml new file mode 100644 index 0000000..2461b71 --- /dev/null +++ b/.github/workflows/golangci-lint.yml @@ -0,0 +1,44 @@ +name: golangci-lint +on: [push ] +permissions: # added using https://github.com/step-security/secure-workflows + contents: read + +jobs: + golangci: + permissions: + contents: read # for actions/checkout to fetch code + pull-requests: read # for golangci/golangci-lint-action to fetch pull requests + name: lint + runs-on: ubuntu-latest + steps: + - name: Harden Runner + uses: step-security/harden-runner@17d0e2bd7d51742c71671bd19fa12bdc9d40a3d6 + with: + egress-policy: audit # TODO: change to 'egress-policy: block' after couple of runs + + - uses: actions/checkout@692973e3d937129bcbf40652eb9f2f61becf3332 + - name: golangci-lint + uses: golangci/golangci-lint-action@a4f60bb28d35aeee14e6880718e0c85ff1882e64 + with: + # Optional: version of golangci-lint to use in form of v1.2 or v1.2.3 or `latest` to use the latest version + version: latest + + # Optional: working directory, useful for monorepos + # working-directory: somedir + output: checkstyle + + # Optional: golangci-lint command line arguments. + # args: --issues-exit-code=0 + # args: --out-format checkstyle + + # Optional: show only new issues if it's a pull request. The default value is `false`. + # only-new-issues: true + + # Optional: if set to true then the action will use pre-installed Go. + # skip-go-installation: true + + # Optional: if set to true then the action don't cache or restore ~/go/pkg. + # skip-pkg-cache: true + + # Optional: if set to true then the action don't cache or restore ~/.cache/go-build. + # skip-build-cache: true diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..91a7e2c --- /dev/null +++ b/LICENSE @@ -0,0 +1,22 @@ +The MIT License (MIT) + +Copyright (c) 2024 SingleStore, Inc + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. + diff --git a/README.md b/README.md new file mode 100644 index 0000000..fcd271a --- /dev/null +++ b/README.md @@ -0,0 +1,30 @@ +# keyeddistributor - distribute events to listeners by key + +[![GoDoc](https://godoc.org/github.com/memsql/keyeddistributor?status.svg)](https://pkg.go.dev/github.com/memsql/keyeddistributor) +![unit tests](https://github.com/memsql/keyeddistributor/actions/workflows/go.yml/badge.svg) +[![report card](https://goreportcard.com/badge/github.com/memsql/keyeddistributor)](https://goreportcard.com/report/github.com/memsql/keyeddistributor) +[![codecov](https://codecov.io/gh/memsql/keyeddistributor/branch/main/graph/badge.svg)](https://codecov.io/gh/memsql/keyeddistributor) + +Install: + + go get github.com/memsql/keyeddistributor + +--- + +Keyeddistributor is an add-on to [eventdistributor](https://github.com/sharnoff/eventdistributor) that +allows for ad-hoc subscription to events that match a key value. + +## An example + +```go +keyed := keyeddistributor.New(func(t Thing) ThingKey) { + return t.Key // or whatever is needed to extract a ThingKey from a Thing +}) + +reader := keyed.Subscribe("some key value") +defer reader.Unsubscribe() + +<-reader.WaitChan() +thing := reader.Consume() +// do stuff with thing +``` diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..a383a52 --- /dev/null +++ b/go.mod @@ -0,0 +1,16 @@ +module github.com/memsql/keyeddistributor + +go 1.18 + +require ( + github.com/muir/nject v1.7.1 + github.com/stretchr/testify v1.8.4 +) + +require ( + github.com/davecgh/go-spew v1.1.1 // indirect + github.com/muir/reflectutils v0.7.0 // indirect + github.com/pkg/errors v0.9.1 // indirect + github.com/pmezard/go-difflib v1.0.0 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect +) diff --git a/keyed.go b/keyed.go new file mode 100644 index 0000000..156b6ca --- /dev/null +++ b/keyed.go @@ -0,0 +1,87 @@ +package keyed + +import ( + "github.com/sharnoff/eventdistributor" + + "singlestore.com/helios/util/refcountmap" +) + +/* +Package keyeddistributor provides a way to wrap [eventdistributor] +so that it is efficient to receive notification only for the +specific events you're interested in: those that have a specific +value as derived from the underlying event. + +[eventdistributor]: https://pkg.go.dev/github.com/sharnoff/eventdistributor +*/ + +var closedChannel = func() <-chan struct{} { + c := make(chan struct{}) + close(c) + return c +}() + +// Distributor supports subscribing to get a Reader +type Distributor[E any, K comparable] struct { + m *refcountmap.Map[K, *eventdistributor.Distributor[E]] + key func(E) K +} + +// Reader is used by a single subscriber to get events. Select on +// reader.WaitChan() to know when there is an event ready to consume +// and then use reader.Consume() to get the event. Use reader.Unsubscribe() +// when the reader is no longer needed. Reader embeds +// eventdistributor's Reader. +type Reader[E any] struct { + eventdistributor.Reader[E] + release func() +} + +func New[E any, K comparable](key func(E) K) *Distributor[E, K] { + return &Distributor[E, K]{ + m: refcountmap.New[K](func() *eventdistributor.Distributor[E] { + return eventdistributor.New[E]() + }), + key: key, + } +} + +// Submit pushes an event into the Distributor. The returned channel is closed +// when the event has been fully consumed. If there are no subscribers, the +// event will be considered consumed immediately. +// +// Submit is thread-safe +func (k *Distributor[E, K]) Submit(e E) <-chan struct{} { + key := k.key(e) + dist, ok := k.m.Load(key) + if !ok { + return closedChannel + } + return dist.Submit(e) +} + +// Subscribe creates a Reader that listens for events where the key matches +// a specific value. It is recommended that immediately after a Subscribe, +// that you defer the Unsubscribe: +// +// reader := distributor.Subscribe(someValue) +// defer reader.Unsubscribe() +// +// Subscribe is thread-safe +func (k *Distributor[E, K]) Subscribe(value K) *Reader[E] { + d, release, _ := k.m.Get(value) + reader := d.Subscribe() + return &Reader[E]{ + Reader: reader, + release: release, + } +} + +// Unsubscribe releases a Reader. After an Unsubscribe, the WaitChan() and +// Consume() methods should not be used. It is important to Unsubscribe() +// because otherwise the Distributor will keep buffering events that are +// meant for the reader. +func (r *Reader[E]) Unsubscribe() { + r.release() + r.Reader.Unsubscribe() +} diff --git a/keyed_test.go b/keyed_test.go new file mode 100644 index 0000000..fe0f70f --- /dev/null +++ b/keyed_test.go @@ -0,0 +1,81 @@ +package keyed_test + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "singlestore.com/helios/events/keyed" +) + +type MyEvent struct { + id int +} + +func TestKeyed(t *testing.T) { + distributor := keyed.New(func(e MyEvent) int { + return e.id % 4 + }) + + t.Log("subscribe to various keys") + r0 := distributor.Subscribe(0) + r1 := distributor.Subscribe(1) + r5 := distributor.Subscribe(5) // should get no events + + t.Log("none are ready") + require.False(t, ready(r0.WaitChan())) + require.False(t, ready(r1.WaitChan())) + require.False(t, ready(r5.WaitChan())) + + t.Log("submit an event that doesn't match any keys") + s0 := distributor.Submit(MyEvent{id: 2}) // nobody + require.False(t, ready(r0.WaitChan())) + require.False(t, ready(r1.WaitChan())) + require.False(t, ready(r5.WaitChan())) + require.True(t, ready(s0)) + + t.Log("submit an event that matches one key (0)") + s1 := distributor.Submit(MyEvent{id: 0}) + require.True(t, ready(r0.WaitChan())) + require.False(t, ready(r1.WaitChan())) + require.False(t, ready(r5.WaitChan())) + require.False(t, ready(s1)) + assert.Equal(t, MyEvent{id: 0}, r0.Consume()) + require.False(t, ready(r0.WaitChan())) + require.False(t, ready(r1.WaitChan())) + require.False(t, ready(r5.WaitChan())) + require.True(t, ready(s1)) + + t.Log("submit an event that matches one key (1)") + s2 := distributor.Submit(MyEvent{id: 1}) + require.False(t, ready(r0.WaitChan())) + require.True(t, ready(r1.WaitChan())) + require.False(t, ready(r5.WaitChan())) + require.False(t, ready(s2)) + assert.Equal(t, MyEvent{id: 1}, r1.Consume()) + require.False(t, ready(r0.WaitChan())) + require.False(t, ready(r1.WaitChan())) + require.False(t, ready(r5.WaitChan())) + require.True(t, ready(s2)) + + t.Log("note that unsubscribe releases the submission") + s3 := distributor.Submit(MyEvent{id: 1}) + require.False(t, ready(r0.WaitChan())) + require.True(t, ready(r1.WaitChan())) + require.False(t, ready(r5.WaitChan())) + require.False(t, ready(s3)) + r1.Unsubscribe() + require.False(t, ready(r0.WaitChan())) + require.False(t, ready(r5.WaitChan())) + require.True(t, ready(s3)) +} + +func ready(c <-chan struct{}) bool { + select { + case <-c: + return true + default: + return false + } +}