Skip to content

Commit

Permalink
add broker to proto
Browse files Browse the repository at this point in the history
  • Loading branch information
PotatoCloud committed Aug 2, 2023
1 parent d49ce8e commit 31468ea
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 13 deletions.
76 changes: 63 additions & 13 deletions proto/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,19 @@ package proto

import (
"bytes"
"context"
qWebsocket "github.com/RealFax/q-websocket"
"github.com/gobwas/ws"
"github.com/pkg/errors"
"log"
"os"
"runtime/debug"
"sync/atomic"
)

type (
HandlerFunc[T any] func(request *Request[T])
BrokerFunc[T any] func(request *Request[T]) error
RecoveryFunc[T any, K comparable] func(protoKey K, request *Request[T], err any)
NewProtoFunc[T any, K comparable] func() Proto[T, K]
DestroyProtoFunc[T any, K comparable] func(params *qWebsocket.HandlerParams, proto Proto[T, K])
Expand All @@ -20,6 +25,8 @@ type Engine[T any, K comparable] struct {

codec Codec[T, K]

brokers []BrokerFunc[T]

handlers map[K]HandlerFunc[T]

// recovery should be called when the handler panic
Expand All @@ -32,9 +39,37 @@ type Engine[T any, K comparable] struct {
destroyProto DestroyProtoFunc[T, K]
}

func (e *Engine[T, K]) handlerError(params *qWebsocket.HandlerParams, err error) {
log.Printf("proto handler error: %s\n", err.Error())

countAddr, ok := params.WsConn.Context().Value("ERR_COUNT").(*uint32)
if !ok {
// init ERR_COUNT
addr := uint32(1)
params.WsConn.SetContext(context.WithValue(
params.WsConn.Context(),
"ERR_COUNT",
&addr,
))
return
}

count := atomic.AddUint32(countAddr, 1)
if count == 3 {
defer params.WsConn.Close()
ws.WriteFrame(
params.Writer,
ws.NewCloseFrame(ws.NewCloseFrameBody(ws.StatusGoingAway, "too many error")),
)
return
}
}

// handler
//
// impl the handler of q-websocket
//
// Codec.Unmarshal proto -> find handler -> call brokers -> call handler
func (e *Engine[T, K]) handler(params *qWebsocket.HandlerParams) {
proto := e.newProto()

Expand All @@ -43,13 +78,15 @@ func (e *Engine[T, K]) handler(params *qWebsocket.HandlerParams) {
defer e.destroyProto(params, proto)
}

if err := e.codec.Unmarshal(params.WsConn.ID, bytes.NewReader(params.Request), proto); err != nil {
log.Println("Proto engine codec error:", err)
var err error
if err = e.codec.Unmarshal(params.WsConn.ID, bytes.NewReader(params.Request), proto); err != nil {
e.handlerError(params, errors.Wrap(err, "codec error"))
return
}

handler, ok := e.handlers[proto.Key()]
if !ok {
e.handlerError(params, errors.Errorf("no %v handler", proto.Key()))
return
}

Expand All @@ -61,27 +98,37 @@ func (e *Engine[T, K]) handler(params *qWebsocket.HandlerParams) {
RawRequest: params.Request,
}

// call brokers
for _, broker := range e.brokers {
if err = broker(req); err != nil {
e.handlerError(params, errors.Wrap(err, "called broker error"))
return
}
}

// panic handler
defer func() {
var err any
if err = recover(); err == nil {
var pErr any
if pErr = recover(); pErr == nil {
return
}

if e.recovery != nil {
e.recovery(proto.Key(), req, err)
e.recovery(proto.Key(), req, pErr)
return
}

// default recovery handler
switch tErr := err.(type) {
switch val := pErr.(type) {
case error:
os.Stderr.WriteString(tErr.Error())
os.Stderr.WriteString(val.Error())
case string:
os.Stderr.WriteString(tErr)
os.Stderr.WriteString(val)
case interface{ String() string }:
os.Stderr.WriteString(tErr.String())
os.Stderr.WriteString(val.String())
}
os.Stderr.WriteString("\n")
debug.PrintStack()
os.Exit(0)
}()

Expand All @@ -96,34 +143,37 @@ func (e *Engine[T, K]) UseHandler() qWebsocket.HandlerFunc {
return e.handler
}

func (e *Engine[T, K]) UseBrokers(brokers ...BrokerFunc[T]) {
if e.state.Load() == 1 {
panic("proto: UseBrokers should be called before UseHandler")
}
e.brokers = append(e.brokers, brokers...)
}

func (e *Engine[T, K]) Register(key K, handler HandlerFunc[T]) {
if e.state.Load() == 1 {
panic("proto: Register should be called before UseHandler")
}

e.handlers[key] = handler
}

func (e *Engine[T, K]) RegisterRecovery(recovery RecoveryFunc[T, K]) {
if e.state.Load() == 1 {
panic("proto: RegisterRecovery should be called before UseHandler")
}

e.recovery = recovery
}

func (e *Engine[T, K]) RegisterCodec(codec Codec[T, K]) {
if e.state.Load() == 1 {
panic("proto: RegisterCodec should be called before UseHandler")
}

e.codec = codec
}

func (e *Engine[T, K]) RegisterDestroyProto(handler DestroyProtoFunc[T, K]) {
if e.state.Load() == 1 {
panic("proto: RegisterDestroyProto should be called before UseHandler")
}

e.destroyProto = handler
}
1 change: 1 addition & 0 deletions proto/proto.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ type Resetter interface {
func New[T any, K comparable](newProto NewProtoFunc[T, K]) *Engine[T, K] {
return &Engine[T, K]{
codec: &CodecJSON[T, K]{},
brokers: make([]BrokerFunc[T], 0),
handlers: make(map[K]HandlerFunc[T]),
newProto: newProto,
}
Expand Down

0 comments on commit 31468ea

Please sign in to comment.