Skip to content

Commit

Permalink
initial version
Browse files Browse the repository at this point in the history
  • Loading branch information
muir committed Jul 19, 2024
0 parents commit 8a59da0
Show file tree
Hide file tree
Showing 10 changed files with 426 additions and 0 deletions.
16 changes: 16 additions & 0 deletions .github/dependabot.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# To get started with Dependabot version updates, you'll need to specify which
# package ecosystems to update and where the package manifests are located.
# Please see the documentation for all configuration options:
# https://help.github.com/github/administering-a-repository/configuration-options-for-dependency-updates

version: 2
updates:
- package-ecosystem: "gomod" # See documentation for possible values
directory: "/" # Location of package manifests
schedule:
interval: "daily"

- package-ecosystem: github-actions
directory: /
schedule:
interval: daily
26 changes: 26 additions & 0 deletions .github/workflows/codecov.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
name: Test and coverage

on: [push, pull_request]

permissions: # added using https://github.com/step-security/secure-workflows
contents: read

jobs:
build:
runs-on: ubuntu-latest
steps:
- name: Harden Runner
uses: step-security/harden-runner@17d0e2bd7d51742c71671bd19fa12bdc9d40a3d6
with:
egress-policy: audit # TODO: change to 'egress-policy: block' after couple of runs

- uses: actions/checkout@692973e3d937129bcbf40652eb9f2f61becf3332
with:
fetch-depth: 2
- uses: actions/setup-go@0a12ed9d6a96ab950c8f026ed9f722fe0da7ef32
with:
go-version: '1.18'
- name: Run coverage
run: go test ./... -race -coverprofile=coverage.txt -covermode=atomic
- name: Upload coverage to Codecov
run: bash <(curl -s https://codecov.io/bash)
78 changes: 78 additions & 0 deletions .github/workflows/codeql-analysis.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
# For most projects, this workflow file will not need changing; you simply need
# to commit it to your repository.
#
# You may wish to alter this file to override the set of languages analyzed,
# or to provide custom queries or build logic.
#
# ******** NOTE ********
# We have attempted to detect the languages in your repository. Please check
# the `language` matrix defined below to confirm you have the correct set of
# supported CodeQL languages.
#
name: "CodeQL"

on:
push:
branches: [ main ]
pull_request:
# The branches below must be a subset of the branches above
branches: [ main ]
schedule:
- cron: '20 3 * * 4'

permissions: # added using https://github.com/step-security/secure-workflows
contents: read

jobs:
analyze:
name: Analyze
runs-on: ubuntu-latest
permissions:
actions: read
contents: read
security-events: write

strategy:
fail-fast: false
matrix:
language: [ 'go' ]
# CodeQL supports [ 'cpp', 'csharp', 'go', 'java', 'javascript', 'python', 'ruby' ]
# Learn more about CodeQL language support at https://git.io/codeql-language-support

steps:
- name: Harden Runner
uses: step-security/harden-runner@17d0e2bd7d51742c71671bd19fa12bdc9d40a3d6
with:
egress-policy: audit # TODO: change to 'egress-policy: block' after couple of runs

- name: Checkout repository
uses: actions/checkout@692973e3d937129bcbf40652eb9f2f61becf3332

# Initializes the CodeQL tools for scanning.
- name: Initialize CodeQL
uses: github/codeql-action/init@cdcdbb579706841c47f7063dda365e292e5cad7a
with:
languages: ${{ matrix.language }}
# If you wish to specify custom queries, you can do so here or in a config file.
# By default, queries listed here will override any specified in a config file.
# Prefix the list here with "+" to use these queries and those in the config file.
# queries: ./path/to/local/query, your-org/your-repo/queries@main

# Autobuild attempts to build any compiled languages (C/C++, C#, or Java).
# If this step fails, then you should remove it and run the build manually (see below)
- name: Autobuild
uses: github/codeql-action/autobuild@cdcdbb579706841c47f7063dda365e292e5cad7a

# ℹ️ Command-line programs to run using the OS shell.
# 📚 https://git.io/JvXDl

# ✏️ If the Autobuild fails above, remove it and uncomment the following three lines
# and modify them (or add more) to build your code if your project
# uses a compiled language

#- run: |
# make bootstrap
# make release

- name: Perform CodeQL Analysis
uses: github/codeql-action/analyze@cdcdbb579706841c47f7063dda365e292e5cad7a
26 changes: 26 additions & 0 deletions .github/workflows/go.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
on: [push, pull_request]
name: Unit Tests
permissions: # added using https://github.com/step-security/secure-workflows
contents: read

jobs:
test:
strategy:
matrix:
go-version: [1.18.x, 1.19.x, 1.20.x, 1.21.x, 1.22.x]
os: [ubuntu-latest, macos-latest, windows-latest]
runs-on: ${{ matrix.os }}
steps:
- name: Harden Runner
uses: step-security/harden-runner@17d0e2bd7d51742c71671bd19fa12bdc9d40a3d6
with:
egress-policy: audit # TODO: change to 'egress-policy: block' after couple of runs

- name: Install Go
uses: actions/setup-go@0a12ed9d6a96ab950c8f026ed9f722fe0da7ef32
with:
go-version: ${{ matrix.go-version }}
- name: Checkout code
uses: actions/checkout@692973e3d937129bcbf40652eb9f2f61becf3332
- name: Test
run: go test ./...
44 changes: 44 additions & 0 deletions .github/workflows/golangci-lint.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
name: golangci-lint
on: [push ]
permissions: # added using https://github.com/step-security/secure-workflows
contents: read

jobs:
golangci:
permissions:
contents: read # for actions/checkout to fetch code
pull-requests: read # for golangci/golangci-lint-action to fetch pull requests
name: lint
runs-on: ubuntu-latest
steps:
- name: Harden Runner
uses: step-security/harden-runner@17d0e2bd7d51742c71671bd19fa12bdc9d40a3d6
with:
egress-policy: audit # TODO: change to 'egress-policy: block' after couple of runs

- uses: actions/checkout@692973e3d937129bcbf40652eb9f2f61becf3332
- name: golangci-lint
uses: golangci/golangci-lint-action@a4f60bb28d35aeee14e6880718e0c85ff1882e64
with:
# Optional: version of golangci-lint to use in form of v1.2 or v1.2.3 or `latest` to use the latest version
version: latest

# Optional: working directory, useful for monorepos
# working-directory: somedir
output: checkstyle

# Optional: golangci-lint command line arguments.
# args: --issues-exit-code=0
# args: --out-format checkstyle

# Optional: show only new issues if it's a pull request. The default value is `false`.
# only-new-issues: true

# Optional: if set to true then the action will use pre-installed Go.
# skip-go-installation: true

# Optional: if set to true then the action don't cache or restore ~/go/pkg.
# skip-pkg-cache: true

# Optional: if set to true then the action don't cache or restore ~/.cache/go-build.
# skip-build-cache: true
22 changes: 22 additions & 0 deletions LICENSE
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
The MIT License (MIT)

Copyright (c) 2024 SingleStore, Inc

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

30 changes: 30 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
# keyeddistributor - distribute events to listeners by key

[![GoDoc](https://godoc.org/github.com/memsql/keyeddistributor?status.svg)](https://pkg.go.dev/github.com/memsql/keyeddistributor)
![unit tests](https://github.com/memsql/keyeddistributor/actions/workflows/go.yml/badge.svg)
[![report card](https://goreportcard.com/badge/github.com/memsql/keyeddistributor)](https://goreportcard.com/report/github.com/memsql/keyeddistributor)
[![codecov](https://codecov.io/gh/memsql/keyeddistributor/branch/main/graph/badge.svg)](https://codecov.io/gh/memsql/keyeddistributor)

Install:

go get github.com/memsql/keyeddistributor

---

Keyeddistributor is an add-on to [eventdistributor](https://github.com/sharnoff/eventdistributor) that
allows for ad-hoc subscription to events that match a key value.

## An example

```go
keyed := keyeddistributor.New(func(t Thing) ThingKey) {
return t.Key // or whatever is needed to extract a ThingKey from a Thing
})

reader := keyed.Subscribe("some key value")
defer reader.Unsubscribe()

<-reader.WaitChan()
thing := reader.Consume()
// do stuff with thing
```
16 changes: 16 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
module github.com/memsql/keyeddistributor

go 1.18

require (
github.com/muir/nject v1.7.1
github.com/stretchr/testify v1.8.4
)

require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/muir/reflectutils v0.7.0 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
87 changes: 87 additions & 0 deletions keyed.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
package keyed

import (
"github.com/sharnoff/eventdistributor"

Check failure on line 4 in keyed.go

View workflow job for this annotation

GitHub Actions / test (1.20.x, macos-latest)

github.com/davecgh/go-spew@v1.1.1: missing go.sum entry for go.mod file; to add it:

Check failure on line 4 in keyed.go

View workflow job for this annotation

GitHub Actions / build

github.com/davecgh/go-spew@v1.1.1: missing go.sum entry; to add it:

Check failure on line 4 in keyed.go

View workflow job for this annotation

GitHub Actions / test (1.20.x, ubuntu-latest)

github.com/davecgh/go-spew@v1.1.1: missing go.sum entry for go.mod file; to add it:

Check failure on line 4 in keyed.go

View workflow job for this annotation

GitHub Actions / test (1.18.x, ubuntu-latest)

github.com/davecgh/go-spew@v1.1.1: missing go.sum entry; to add it:

Check failure on line 4 in keyed.go

View workflow job for this annotation

GitHub Actions / test (1.21.x, macos-latest)

github.com/davecgh/go-spew@v1.1.1: missing go.sum entry for go.mod file; to add it:

Check failure on line 4 in keyed.go

View workflow job for this annotation

GitHub Actions / test (1.22.x, macos-latest)

github.com/davecgh/go-spew@v1.1.1: missing go.sum entry for go.mod file; to add it:

Check failure on line 4 in keyed.go

View workflow job for this annotation

GitHub Actions / test (1.20.x, ubuntu-latest)

github.com/davecgh/go-spew@v1.1.1: missing go.sum entry for go.mod file; to add it:

Check failure on line 4 in keyed.go

View workflow job for this annotation

GitHub Actions / test (1.21.x, ubuntu-latest)

github.com/davecgh/go-spew@v1.1.1: missing go.sum entry for go.mod file; to add it:

Check failure on line 4 in keyed.go

View workflow job for this annotation

GitHub Actions / test (1.20.x, macos-latest)

github.com/davecgh/go-spew@v1.1.1: missing go.sum entry for go.mod file; to add it:

Check failure on line 4 in keyed.go

View workflow job for this annotation

GitHub Actions / test (1.22.x, ubuntu-latest)

github.com/davecgh/go-spew@v1.1.1: missing go.sum entry for go.mod file; to add it:

Check failure on line 4 in keyed.go

View workflow job for this annotation

GitHub Actions / build

github.com/davecgh/go-spew@v1.1.1: missing go.sum entry; to add it:

"singlestore.com/helios/util/refcountmap"

Check failure on line 6 in keyed.go

View workflow job for this annotation

GitHub Actions / test (1.20.x, macos-latest)

github.com/davecgh/go-spew@v1.1.1: missing go.sum entry for go.mod file; to add it:

Check failure on line 6 in keyed.go

View workflow job for this annotation

GitHub Actions / build

github.com/davecgh/go-spew@v1.1.1: missing go.sum entry; to add it:

Check failure on line 6 in keyed.go

View workflow job for this annotation

GitHub Actions / test (1.20.x, ubuntu-latest)

github.com/davecgh/go-spew@v1.1.1: missing go.sum entry for go.mod file; to add it:

Check failure on line 6 in keyed.go

View workflow job for this annotation

GitHub Actions / test (1.18.x, ubuntu-latest)

github.com/davecgh/go-spew@v1.1.1: missing go.sum entry; to add it:

Check failure on line 6 in keyed.go

View workflow job for this annotation

GitHub Actions / test (1.21.x, macos-latest)

github.com/davecgh/go-spew@v1.1.1: missing go.sum entry for go.mod file; to add it:

Check failure on line 6 in keyed.go

View workflow job for this annotation

GitHub Actions / test (1.22.x, macos-latest)

github.com/davecgh/go-spew@v1.1.1: missing go.sum entry for go.mod file; to add it:

Check failure on line 6 in keyed.go

View workflow job for this annotation

GitHub Actions / test (1.20.x, ubuntu-latest)

github.com/davecgh/go-spew@v1.1.1: missing go.sum entry for go.mod file; to add it:

Check failure on line 6 in keyed.go

View workflow job for this annotation

GitHub Actions / test (1.21.x, ubuntu-latest)

github.com/davecgh/go-spew@v1.1.1: missing go.sum entry for go.mod file; to add it:

Check failure on line 6 in keyed.go

View workflow job for this annotation

GitHub Actions / test (1.20.x, macos-latest)

github.com/davecgh/go-spew@v1.1.1: missing go.sum entry for go.mod file; to add it:

Check failure on line 6 in keyed.go

View workflow job for this annotation

GitHub Actions / test (1.22.x, ubuntu-latest)

github.com/davecgh/go-spew@v1.1.1: missing go.sum entry for go.mod file; to add it:

Check failure on line 6 in keyed.go

View workflow job for this annotation

GitHub Actions / build

github.com/davecgh/go-spew@v1.1.1: missing go.sum entry; to add it:
)

/*
Package keyeddistributor provides a way to wrap [eventdistributor]
so that it is efficient to receive notification only for the
specific events you're interested in: those that have a specific
value as derived from the underlying event.
[eventdistributor]: https://pkg.go.dev/github.com/sharnoff/eventdistributor
*/

var closedChannel = func() <-chan struct{} {
c := make(chan struct{})
close(c)
return c
}()

// Distributor supports subscribing to get a Reader
type Distributor[E any, K comparable] struct {
m *refcountmap.Map[K, *eventdistributor.Distributor[E]]
key func(E) K
}

// Reader is used by a single subscriber to get events. Select on
// reader.WaitChan() to know when there is an event ready to consume
// and then use reader.Consume() to get the event. Use reader.Unsubscribe()
// when the reader is no longer needed. Reader embeds
// eventdistributor's Reader.
type Reader[E any] struct {
eventdistributor.Reader[E]
release func()
}

func New[E any, K comparable](key func(E) K) *Distributor[E, K] {
return &Distributor[E, K]{
m: refcountmap.New[K](func() *eventdistributor.Distributor[E] {
return eventdistributor.New[E]()
}),
key: key,
}
}

// Submit pushes an event into the Distributor. The returned channel is closed
// when the event has been fully consumed. If there are no subscribers, the
// event will be considered consumed immediately.
//
// Submit is thread-safe
func (k *Distributor[E, K]) Submit(e E) <-chan struct{} {
key := k.key(e)
dist, ok := k.m.Load(key)
if !ok {
return closedChannel
}
return dist.Submit(e)
}

// Subscribe creates a Reader that listens for events where the key matches
// a specific value. It is recommended that immediately after a Subscribe,
// that you defer the Unsubscribe:
//
// reader := distributor.Subscribe(someValue)
// defer reader.Unsubscribe()
//
// Subscribe is thread-safe
func (k *Distributor[E, K]) Subscribe(value K) *Reader[E] {
d, release, _ := k.m.Get(value)
reader := d.Subscribe()
return &Reader[E]{
Reader: reader,
release: release,
}
}

// Unsubscribe releases a Reader. After an Unsubscribe, the WaitChan() and
// Consume() methods should not be used. It is important to Unsubscribe()
// because otherwise the Distributor will keep buffering events that are
// meant for the reader.
func (r *Reader[E]) Unsubscribe() {
r.release()
r.Reader.Unsubscribe()
}
Loading

0 comments on commit 8a59da0

Please sign in to comment.