Skip to content

Commit

Permalink
Game server rewrites; somewhat ingame now
Browse files Browse the repository at this point in the history
  • Loading branch information
jchv committed Jun 19, 2023
1 parent 01f1456 commit e628a8a
Show file tree
Hide file tree
Showing 38 changed files with 3,481 additions and 1,158 deletions.
4 changes: 2 additions & 2 deletions cmd/gameserver/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (
"github.com/pangbox/server/common/topology"
"github.com/pangbox/server/database"
"github.com/pangbox/server/database/accounts"
"github.com/pangbox/server/game"
gameserver "github.com/pangbox/server/game/server"
log "github.com/sirupsen/logrus"
"github.com/xo/dburl"
)
Expand Down Expand Up @@ -66,7 +66,7 @@ func main() {
}

log.Println("Listening for game server on", listenAddr)
gameServer := game.New(game.Options{
gameServer := gameserver.New(gameserver.Options{
TopologyClient: topologyClient,
AccountsService: accounts.NewService(accounts.Options{
Database: db,
Expand Down
6 changes: 3 additions & 3 deletions cmd/packetparse/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import (
"github.com/davecgh/go-spew/spew"
"github.com/go-restruct/restruct"
"github.com/pangbox/server/common"
"github.com/pangbox/server/game"
gamepacket "github.com/pangbox/server/game/packet"
"github.com/pangbox/server/login"
"github.com/pangbox/server/message"
)
Expand All @@ -49,9 +49,9 @@ func GetMessageTable(server string, origin string) (common.AnyMessageTable, erro
case "game":
switch origin {
case "server":
return game.ServerMessageTable.Any(), nil
return gamepacket.ServerMessageTable.Any(), nil
case "client":
return game.ClientMessageTable.Any(), nil
return gamepacket.ClientMessageTable.Any(), nil
default:
return nil, fmt.Errorf("unexpected origin %q; valid values are server, client", origin)
}
Expand Down
223 changes: 223 additions & 0 deletions common/actor/actor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,223 @@
// Copyright (C) 2018-2023, John Chadwick <john@jchw.io>
//
// Permission to use, copy, modify, and/or distribute this software for any purpose
// with or without fee is hereby granted, provided that the above copyright notice
// and this permission notice appear in all copies.
//
// THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES WITH
// REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND
// FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL, DIRECT,
// INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS
// OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER
// TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF
// THIS SOFTWARE.
//
// SPDX-FileCopyrightText: Copyright (c) 2018-2023 John Chadwick
// SPDX-License-Identifier: ISC

package actor

import (
"context"
"errors"
"sync"
)

var ErrActorDead = errors.New("actor dead")

type Message[T any] struct {
Context context.Context
Value T
Promise *Promise[any]
}

type Base[T any] struct {
mutex sync.RWMutex
task *Task[T]
err error
}

type Task[T any] struct {
wg sync.WaitGroup
msgCh chan Message[T]
doneCh chan struct{}
cancel context.CancelFunc
ctx context.Context
}

// TryStart returns true if started, or false if there's already a running task.
func (b *Base[T]) TryStart(ctx context.Context, callback func(context.Context, *Task[T]) error) bool {
// Fast/low contention path
b.mutex.RLock()
task := b.task
b.mutex.RUnlock()

if task != nil {
return false
}

b.mutex.Lock()
defer b.mutex.Unlock()

// Need to re-check now.
if b.task != nil {
return false
}

ctx, cancel := context.WithCancel(ctx)
task = &Task[T]{
msgCh: make(chan Message[T]),
doneCh: make(chan struct{}),
ctx: ctx,
cancel: cancel,
}

// These signal the status to elsewhere.
b.task = task
b.err = nil

go func() {
defer func() {
b.mutex.Lock()
b.task = nil
b.mutex.Unlock()

task.wg.Wait()

// At this point, nothing can see this task anymore.
// It should be safe to close the message channel.
close(task.msgCh)
close(task.doneCh)
}()
defer cancel()
if err := callback(ctx, task); err != nil {
b.mutex.Lock()
b.err = err
b.mutex.Unlock()
}
}()

return true
}

// Shutdown shuts down the task, if it's running. Shutdown will wait for the
// current instance of the task to fully shut down.
func (b *Base[T]) Shutdown(ctx context.Context) error {
b.mutex.RLock()
task := b.task
b.mutex.RUnlock()

if task != nil {
task.cancel()
select {
case <-task.doneCh:
return nil
case <-ctx.Done():
return ctx.Err()
}
}
return nil
}

// Active returns true if the task is currently running. Note that by the time
// this function returns, the value it returns may already be stale.
func (b *Base[T]) Active() bool {
b.mutex.RLock()
task := b.task
b.mutex.RUnlock()

return task != nil
}

// Err returns the last error, if a task returns one.
func (b *Base[T]) Err() error {
b.mutex.RLock()
err := b.err
b.mutex.RUnlock()

return err
}

func (b *Base[T]) acquireTask() *Task[T] {
b.mutex.RLock()
defer b.mutex.RUnlock()

task := b.task
if task != nil {
task.wg.Add(1)
}

return task
}

func (t *Task[T]) release() {
if t != nil {
t.wg.Done()
}
}

// TrySend tries to send if it wouldn't block. The boolean result is set to true
// when the message is successfully sent, false otherwise.
func (b *Base[T]) TrySend(ctx context.Context, value T) (*Promise[any], bool) {
msg := Message[T]{
Context: ctx,
Value: value,
Promise: NewPromise[any](),
}

task := b.acquireTask()
defer task.release()

if task == nil {
return nil, false
}

defer task.wg.Done()

select {
case task.msgCh <- msg:
return msg.Promise, true
default:
msg.Promise.Close()
return nil, false
}
}

// Send will block until the message is sent or the context is cancelled.
func (b *Base[T]) Send(ctx context.Context, value T) (*Promise[any], error) {
msg := Message[T]{
Context: ctx,
Value: value,
Promise: NewPromise[any](),
}

task := b.acquireTask()
defer task.release()

if task == nil {
return nil, ErrActorDead
}

select {
case task.msgCh <- msg:
return msg.Promise, nil
case <-ctx.Done():
msg.Promise.Close()
return nil, ctx.Err()
}
}

// Receive receives the next message in the mailbox. Note that the promise needs
// to be resolved or rejected for every message.
func (t *Task[T]) Receive() (Message[T], error) {
var msg Message[T]

msgCh := t.msgCh

select {
case msg = <-msgCh:
return msg, nil
case <-t.ctx.Done():
return msg, t.ctx.Err()
}
}
71 changes: 71 additions & 0 deletions common/actor/promise.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
// Copyright (C) 2018-2023, John Chadwick <john@jchw.io>
//
// Permission to use, copy, modify, and/or distribute this software for any purpose
// with or without fee is hereby granted, provided that the above copyright notice
// and this permission notice appear in all copies.
//
// THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES WITH
// REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND
// FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL, DIRECT,
// INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS
// OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER
// TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF
// THIS SOFTWARE.
//
// SPDX-FileCopyrightText: Copyright (c) 2018-2023 John Chadwick
// SPDX-License-Identifier: ISC

package actor

import (
"context"
"errors"
"sync"
)

var ErrClosed = errors.New("promise closed")

// Promise is a simple promise-like object.
type Promise[T any] struct {
once sync.Once
doneCh chan struct{}
value T
err error
}

func NewPromise[T any]() *Promise[T] {
return &Promise[T]{
doneCh: make(chan struct{}),
}
}

func (p *Promise[T]) Resolve(value T) {
p.once.Do(func() {
p.value = value
close(p.doneCh)
})
}

func (p *Promise[T]) Reject(err error) {
p.once.Do(func() {
p.err = err
close(p.doneCh)
})
}

func (p *Promise[T]) Close() {
p.Reject(ErrClosed)
}

func (p *Promise[T]) Wait(ctx context.Context) (T, error) {
var t T
select {
case <-p.doneCh:
if p.err != nil {
return t, p.err
}
return p.value, nil
case <-ctx.Done():
return t, ctx.Err()
}
}
26 changes: 20 additions & 6 deletions common/bufconn/bufconn.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@ func (e netErrorTimeout) Timeout() bool { return true }
func (e netErrorTimeout) Temporary() bool { return false }

var errClosed = fmt.Errorf("closed")
var errTimeout net.Error = netErrorTimeout{error: fmt.Errorf("i/o timeout")}

var errTimeout net.Error = netErrorTimeout{error: fmt.Errorf("i/o timeout")} // +checklocksignore

// Listen returns a Listener that can only be contacted by its own Dialers and
// creates buffered connections between the two.
Expand Down Expand Up @@ -117,20 +118,33 @@ type pipe struct {
//
// w and r are always in the range [0, cap(buf)) and [0, len(buf)].
buf []byte
w, r int
w, r int // +checklocksignore

// +checklocks:mu
wwait sync.Cond

// +checklocks:mu
rwait sync.Cond

// Indicate that a write/read timeout has occurred

// +checklocks:mu
wtimedout bool

// +checklocks:mu
rtimedout bool

// +checklocks:mu
wtimer *time.Timer

// +checklocks:mu
rtimer *time.Timer

closed bool
writeClosed bool
// +checklocks:mu
closed bool

// +checklocks:mu
writeClosed bool // +checklocksignore
}

func newPipe(sz int) *pipe {
Expand All @@ -144,11 +158,11 @@ func newPipe(sz int) *pipe {
}

func (p *pipe) empty() bool {
return p.r == len(p.buf)
return p.r == len(p.buf) // +checklocksignore
}

func (p *pipe) full() bool {
return p.r < len(p.buf) && p.r == p.w
return p.r < len(p.buf) && p.r == p.w // +checklocksignore
}

func (p *pipe) Read(b []byte) (n int, err error) {
Expand Down
Loading

0 comments on commit e628a8a

Please sign in to comment.