diff --git a/circuitbreaker/circuitbreaker.go b/circuitbreaker/circuitbreaker.go index 5199fbb..47e692e 100644 --- a/circuitbreaker/circuitbreaker.go +++ b/circuitbreaker/circuitbreaker.go @@ -2,114 +2,14 @@ package circuitbreaker import ( "errors" - "sync" - "sync/atomic" ) // ErrNotAllowed error not allowed. var ErrNotAllowed = errors.New("circuitbreaker: not allowed for circuit open") -// State . -type State int - -const ( - // StateOpen when circuit breaker open, request not allowed, after sleep - // some duration, allow one single request for testing the health, if ok - // then state reset to closed, if not continue the step. - StateOpen State = iota - // StateClosed when circuit breaker closed, request allowed, the breaker - // calc the succeed ratio, if request num greater request setting and - // ratio lower than the setting ratio, then reset state to open. - StateClosed - // StateHalfopen when circuit breaker open, after slepp some duration, allow - // one request, but not state closed. - StateHalfopen -) - // CircuitBreaker is a circuit breaker. type CircuitBreaker interface { Allow() error MarkSuccess() MarkFailed() } - -// Group . -type Group struct { - mutex sync.Mutex - val atomic.Value - - New func() CircuitBreaker -} - -// Get . -func (g *Group) Get(name string) CircuitBreaker { - m, ok := g.val.Load().(map[string]CircuitBreaker) - if ok { - breaker, ok := m[name] - if ok { - return breaker - } - } - // slowpath for group don`t have specified name breaker. - g.mutex.Lock() - nm := make(map[string]CircuitBreaker, len(m)+1) - for k, v := range m { - nm[k] = v - } - breaker := g.New() - nm[name] = breaker - g.val.Store(nm) - g.mutex.Unlock() - return breaker -} - -// Do runs your function in a synchronous manner, blocking until either your -// function succeeds or an error is returned, including circuit errors. -func (g *Group) Do(name string, fn func() error, fbs ...func(error) error) error { - breaker := g.Get(name) - err := breaker.Allow() - if err == nil { - if err = fn(); err == nil { - breaker.MarkSuccess() - return nil - } - switch v := err.(type) { - case ignore: - breaker.MarkSuccess() - err = v.error - case drop: - breaker.MarkFailed() - err = v.error - default: - breaker.MarkFailed() - } - } - // fallback the request - if err != nil { - oe := err // save origin error - for _, fb := range fbs { - if err = fb(oe); err == nil { - return nil - } - } - } - return err -} - -type ignore struct { - error -} - -// Ignore ignore the error. -func Ignore(err error) error { - return ignore{err} -} - -type drop struct { - error -} - -// Drop drop the error. -func Drop(err error) error { - return drop{err} -} diff --git a/circuitbreaker/example_test.go b/circuitbreaker/example_test.go index e984dd8..fc97ce3 100644 --- a/circuitbreaker/example_test.go +++ b/circuitbreaker/example_test.go @@ -1,52 +1,22 @@ package circuitbreaker_test import ( - "errors" "fmt" - "github.com/go-kratos/aegis/circuitbreaker" "github.com/go-kratos/aegis/circuitbreaker/sre" ) // This is a example of using a circuit breaker Do() when return nil. func Example() { - g := &circuitbreaker.Group{New: func() circuitbreaker.CircuitBreaker { - return sre.NewBreaker() - }} - err := g.Do("do", func() error { - // dosomething - return nil - }) - + b := sre.NewBreaker() + for i := 0; i < 1000; i++ { + b.MarkSuccess() + } + for i := 0; i < 100; i++ { + b.MarkFailed() + } + + err := b.Allow() fmt.Printf("err=%v", err) // Output: err= } - -// This is a example of using a circuit breaker fn failed then call fallback. -func Example_fallback() { - g := &circuitbreaker.Group{New: func() circuitbreaker.CircuitBreaker { - return sre.NewBreaker() - }} - err := g.Do("do", func() error { - // dosomething - return errors.New("fallback") - }) - - fmt.Printf("err=%v", err) - // Output: err=fallback -} - -// This is a example of using a circuit breaker fn failed but ignore error mark -// as success. -func Example_ignore() { - g := &circuitbreaker.Group{New: func() circuitbreaker.CircuitBreaker { - return sre.NewBreaker() - }} - err := g.Do("do", func() error { - // dosomething - return circuitbreaker.Ignore(errors.New("ignore")) - }) - - fmt.Printf("err=%v", err) - // Output: err=ignore -} diff --git a/circuitbreaker/sre/sre.go b/circuitbreaker/sre/sre.go index a76a638..238c69a 100644 --- a/circuitbreaker/sre/sre.go +++ b/circuitbreaker/sre/sre.go @@ -85,7 +85,7 @@ type Breaker struct { // NewBreaker return a sreBresker with options func NewBreaker(opts ...Option) circuitbreaker.CircuitBreaker { opt := options{ - success: 0.5, + success: 0.6, request: 100, bucket: 10, window: 3 * time.Second, @@ -123,11 +123,12 @@ func (b *Breaker) summary() (success int64, total int64) { // Allow request if error returns nil. func (b *Breaker) Allow() error { - success, total := b.summary() - k := b.k * float64(success) - - // check overflow requests = K * success - if total < b.request || float64(total) < k { + // The number of requests accepted by the backend + accepts, total := b.summary() + // The number of requests attempted by the application layer(at the client, on top of the adaptive throttling system) + requests := b.k * float64(accepts) + // check overflow requests = K * accepts + if total < b.request || float64(total) < requests { if atomic.LoadInt32(&b.state) == StateOpen { atomic.CompareAndSwapInt32(&b.state, StateOpen, StateClosed) } @@ -136,9 +137,8 @@ func (b *Breaker) Allow() error { if atomic.LoadInt32(&b.state) == StateClosed { atomic.CompareAndSwapInt32(&b.state, StateClosed, StateOpen) } - dr := math.Max(0, (float64(total)-k)/float64(total+1)) + dr := math.Max(0, (float64(total)-requests)/float64(total+1)) drop := b.trueOnProba(dr) - if drop { return circuitbreaker.ErrNotAllowed } diff --git a/ratelimit/bbr/bbr.go b/ratelimit/bbr/bbr.go index c22fb9c..aadebdf 100644 --- a/ratelimit/bbr/bbr.go +++ b/ratelimit/bbr/bbr.go @@ -267,7 +267,7 @@ func (l *BBR) Stat() Stat { // Allow checks all inbound traffic. // Once overload is detected, it raises limit.ErrLimitExceed error. -func (l *BBR) Allow() (ratelimit.Done, error) { +func (l *BBR) Allow() (ratelimit.DoneFunc, error) { if l.shouldDrop() { return nil, ratelimit.ErrLimitExceed } diff --git a/ratelimit/ratelimit.go b/ratelimit/ratelimit.go index cc99580..847ec0f 100644 --- a/ratelimit/ratelimit.go +++ b/ratelimit/ratelimit.go @@ -2,8 +2,6 @@ package ratelimit import ( "errors" - "sync" - "sync/atomic" ) var ( @@ -12,8 +10,8 @@ var ( ErrLimitExceed = errors.New("rate limit exceeded") ) -// Done is done function. -type Done func(DoneInfo) +// DoneFunc is done function. +type DoneFunc func(DoneInfo) // DoneInfo is done info. type DoneInfo struct { @@ -22,55 +20,5 @@ type DoneInfo struct { // Limiter is a rate limiter. type Limiter interface { - Allow() (Done, error) -} - -// Group . -type Group struct { - mutex sync.Mutex - val atomic.Value - - New func() Limiter -} - -// Get . -func (g *Group) Get(name string) Limiter { - m, ok := g.val.Load().(map[string]Limiter) - if ok { - limiter, ok := m[name] - if ok { - return limiter - } - } - // slowpath for group don`t have specified name breaker. - g.mutex.Lock() - nm := make(map[string]Limiter, len(m)+1) - for k, v := range m { - nm[k] = v - } - limiter := g.New() - nm[name] = limiter - g.val.Store(nm) - g.mutex.Unlock() - return limiter -} - -// Do runs your function in a synchronous manner, blocking until either your -// function succeeds or an error is returned, including circuit errors. -func (g *Group) Do(name string, fn func() error, fbs ...func(error) error) error { - limit := g.Get(name) - done, err := limit.Allow() - if err == nil { - done(DoneInfo{Err: fn()}) - } - // fallback the request - if err != nil { - oe := err // save origin error - for _, fb := range fbs { - if err = fb(oe); err == nil { - return nil - } - } - } - return err + Allow() (DoneFunc, error) }