Skip to content

Commit

Permalink
remove group
Browse files Browse the repository at this point in the history
  • Loading branch information
tonybase committed Sep 8, 2021
1 parent ed7048a commit 9c51406
Show file tree
Hide file tree
Showing 5 changed files with 21 additions and 203 deletions.
100 changes: 0 additions & 100 deletions circuitbreaker/circuitbreaker.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,114 +2,14 @@ package circuitbreaker

import (
"errors"
"sync"
"sync/atomic"
)

// ErrNotAllowed error not allowed.
var ErrNotAllowed = errors.New("circuitbreaker: not allowed for circuit open")

// State .
type State int

const (
// StateOpen when circuit breaker open, request not allowed, after sleep
// some duration, allow one single request for testing the health, if ok
// then state reset to closed, if not continue the step.
StateOpen State = iota
// StateClosed when circuit breaker closed, request allowed, the breaker
// calc the succeed ratio, if request num greater request setting and
// ratio lower than the setting ratio, then reset state to open.
StateClosed
// StateHalfopen when circuit breaker open, after slepp some duration, allow
// one request, but not state closed.
StateHalfopen
)

// CircuitBreaker is a circuit breaker.
type CircuitBreaker interface {
Allow() error
MarkSuccess()
MarkFailed()
}

// Group .
type Group struct {
mutex sync.Mutex
val atomic.Value

New func() CircuitBreaker
}

// Get .
func (g *Group) Get(name string) CircuitBreaker {
m, ok := g.val.Load().(map[string]CircuitBreaker)
if ok {
breaker, ok := m[name]
if ok {
return breaker
}
}
// slowpath for group don`t have specified name breaker.
g.mutex.Lock()
nm := make(map[string]CircuitBreaker, len(m)+1)
for k, v := range m {
nm[k] = v
}
breaker := g.New()
nm[name] = breaker
g.val.Store(nm)
g.mutex.Unlock()
return breaker
}

// Do runs your function in a synchronous manner, blocking until either your
// function succeeds or an error is returned, including circuit errors.
func (g *Group) Do(name string, fn func() error, fbs ...func(error) error) error {
breaker := g.Get(name)
err := breaker.Allow()
if err == nil {
if err = fn(); err == nil {
breaker.MarkSuccess()
return nil
}
switch v := err.(type) {
case ignore:
breaker.MarkSuccess()
err = v.error
case drop:
breaker.MarkFailed()
err = v.error
default:
breaker.MarkFailed()
}
}
// fallback the request
if err != nil {
oe := err // save origin error
for _, fb := range fbs {
if err = fb(oe); err == nil {
return nil
}
}
}
return err
}

type ignore struct {
error
}

// Ignore ignore the error.
func Ignore(err error) error {
return ignore{err}
}

type drop struct {
error
}

// Drop drop the error.
func Drop(err error) error {
return drop{err}
}
48 changes: 9 additions & 39 deletions circuitbreaker/example_test.go
Original file line number Diff line number Diff line change
@@ -1,52 +1,22 @@
package circuitbreaker_test

import (
"errors"
"fmt"

"github.com/go-kratos/aegis/circuitbreaker"
"github.com/go-kratos/aegis/circuitbreaker/sre"
)

// This is a example of using a circuit breaker Do() when return nil.
func Example() {
g := &circuitbreaker.Group{New: func() circuitbreaker.CircuitBreaker {
return sre.NewBreaker()
}}
err := g.Do("do", func() error {
// dosomething
return nil
})

b := sre.NewBreaker()
for i := 0; i < 1000; i++ {
b.MarkSuccess()
}
for i := 0; i < 100; i++ {
b.MarkFailed()
}

err := b.Allow()
fmt.Printf("err=%v", err)
// Output: err=<nil>
}

// This is a example of using a circuit breaker fn failed then call fallback.
func Example_fallback() {
g := &circuitbreaker.Group{New: func() circuitbreaker.CircuitBreaker {
return sre.NewBreaker()
}}
err := g.Do("do", func() error {
// dosomething
return errors.New("fallback")
})

fmt.Printf("err=%v", err)
// Output: err=fallback
}

// This is a example of using a circuit breaker fn failed but ignore error mark
// as success.
func Example_ignore() {
g := &circuitbreaker.Group{New: func() circuitbreaker.CircuitBreaker {
return sre.NewBreaker()
}}
err := g.Do("do", func() error {
// dosomething
return circuitbreaker.Ignore(errors.New("ignore"))
})

fmt.Printf("err=%v", err)
// Output: err=ignore
}
16 changes: 8 additions & 8 deletions circuitbreaker/sre/sre.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ type Breaker struct {
// NewBreaker return a sreBresker with options
func NewBreaker(opts ...Option) circuitbreaker.CircuitBreaker {
opt := options{
success: 0.5,
success: 0.6,
request: 100,
bucket: 10,
window: 3 * time.Second,
Expand Down Expand Up @@ -123,11 +123,12 @@ func (b *Breaker) summary() (success int64, total int64) {

// Allow request if error returns nil.
func (b *Breaker) Allow() error {
success, total := b.summary()
k := b.k * float64(success)

// check overflow requests = K * success
if total < b.request || float64(total) < k {
// The number of requests accepted by the backend
accepts, total := b.summary()
// The number of requests attempted by the application layer(at the client, on top of the adaptive throttling system)
requests := b.k * float64(accepts)
// check overflow requests = K * accepts
if total < b.request || float64(total) < requests {
if atomic.LoadInt32(&b.state) == StateOpen {
atomic.CompareAndSwapInt32(&b.state, StateOpen, StateClosed)
}
Expand All @@ -136,9 +137,8 @@ func (b *Breaker) Allow() error {
if atomic.LoadInt32(&b.state) == StateClosed {
atomic.CompareAndSwapInt32(&b.state, StateClosed, StateOpen)
}
dr := math.Max(0, (float64(total)-k)/float64(total+1))
dr := math.Max(0, (float64(total)-requests)/float64(total+1))
drop := b.trueOnProba(dr)

if drop {
return circuitbreaker.ErrNotAllowed
}
Expand Down
2 changes: 1 addition & 1 deletion ratelimit/bbr/bbr.go
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,7 @@ func (l *BBR) Stat() Stat {

// Allow checks all inbound traffic.
// Once overload is detected, it raises limit.ErrLimitExceed error.
func (l *BBR) Allow() (ratelimit.Done, error) {
func (l *BBR) Allow() (ratelimit.DoneFunc, error) {
if l.shouldDrop() {
return nil, ratelimit.ErrLimitExceed
}
Expand Down
58 changes: 3 additions & 55 deletions ratelimit/ratelimit.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@ package ratelimit

import (
"errors"
"sync"
"sync/atomic"
)

var (
Expand All @@ -12,8 +10,8 @@ var (
ErrLimitExceed = errors.New("rate limit exceeded")
)

// Done is done function.
type Done func(DoneInfo)
// DoneFunc is done function.
type DoneFunc func(DoneInfo)

// DoneInfo is done info.
type DoneInfo struct {
Expand All @@ -22,55 +20,5 @@ type DoneInfo struct {

// Limiter is a rate limiter.
type Limiter interface {
Allow() (Done, error)
}

// Group .
type Group struct {
mutex sync.Mutex
val atomic.Value

New func() Limiter
}

// Get .
func (g *Group) Get(name string) Limiter {
m, ok := g.val.Load().(map[string]Limiter)
if ok {
limiter, ok := m[name]
if ok {
return limiter
}
}
// slowpath for group don`t have specified name breaker.
g.mutex.Lock()
nm := make(map[string]Limiter, len(m)+1)
for k, v := range m {
nm[k] = v
}
limiter := g.New()
nm[name] = limiter
g.val.Store(nm)
g.mutex.Unlock()
return limiter
}

// Do runs your function in a synchronous manner, blocking until either your
// function succeeds or an error is returned, including circuit errors.
func (g *Group) Do(name string, fn func() error, fbs ...func(error) error) error {
limit := g.Get(name)
done, err := limit.Allow()
if err == nil {
done(DoneInfo{Err: fn()})
}
// fallback the request
if err != nil {
oe := err // save origin error
for _, fb := range fbs {
if err = fb(oe); err == nil {
return nil
}
}
}
return err
Allow() (DoneFunc, error)
}

0 comments on commit 9c51406

Please sign in to comment.