From 8909b3ebca9db7c5503b6849aa80847914d1a54e Mon Sep 17 00:00:00 2001 From: Jan Cajthaml Date: Fri, 30 Nov 2018 17:55:53 +0100 Subject: [PATCH] using dedicated actor system library to dedup code from services (#59) * using dedicated actor system library to dedup code from services * making lint work again and fixing lint issues --- Gopkg.lock | 26 ++- Gopkg.toml | 2 +- actor/actor.go | 102 --------- actor/model.go | 28 --- daemon/actor_system.go | 290 +++++++----------------- daemon/snapshot_updater.go | 8 +- daemon/snapshot_updater_test.go | 4 +- dev/lifecycle/lint | 56 ++--- model/model.go | 20 +- packaging/debian_amd64/DEBIAN/changelog | 4 +- packaging/debian_amd64/DEBIAN/control | 2 +- packaging/debian_armhf/DEBIAN/changelog | 4 +- packaging/debian_armhf/DEBIAN/control | 2 +- 13 files changed, 140 insertions(+), 408 deletions(-) delete mode 100755 actor/actor.go delete mode 100644 actor/model.go diff --git a/Gopkg.lock b/Gopkg.lock index 41db2c3a..b58a728b 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -7,11 +7,17 @@ revision = "8991bc29aa16c548c550c7ff78260e27b9ab7c73" version = "v1.1.1" +[[projects]] + branch = "master" + name = "github.com/jancajthaml-openbank/actor-system" + packages = ["."] + revision = "ff3df8fbc5599a0c1a85636e44a09bb3890e8e1c" + [[projects]] branch = "master" name = "github.com/jancajthaml-openbank/lake-client" packages = ["go"] - revision = "2898a6572215a5a912dc975b26588d31bb0aa695" + revision = "e2d4da331f215ef86f454e362846f90dc98922c3" [[projects]] name = "github.com/json-iterator/go" @@ -20,10 +26,10 @@ version = "v1.1.5" [[projects]] - branch = "master" name = "github.com/konsorten/go-windows-terminal-sequences" packages = ["."] - revision = "b729f2633dfe35f4d1d8a32385f6685610ce1cb5" + revision = "5c8c8bd35d3832f5d134ae1e1e375b69a4d25242" + version = "v1.0.1" [[projects]] name = "github.com/modern-go/concurrent" @@ -41,7 +47,7 @@ branch = "master" name = "github.com/pebbe/zmq4" packages = ["."] - revision = "3515f4e6f439e167f92f2b99a9497cf5ea8e3cea" + revision = "e5666f371e863ac73d8fa7c15bef9c9ba0359b82" [[projects]] name = "github.com/pmezard/go-difflib" @@ -53,13 +59,13 @@ branch = "master" name = "github.com/rcrowley/go-metrics" packages = ["."] - revision = "e2704e165165ec55d062f5919b4b29494e9fa790" + revision = "3113b8401b8a98917cde58f8bbd42a1b1c03b1fd" [[projects]] name = "github.com/sirupsen/logrus" packages = ["."] - revision = "a67f783a3814b8729bd2dac5780b5f78f8dbd64d" - version = "v1.1.0" + revision = "bcd833dfe83d3cebad139e4a29ed79cb2318bf95" + version = "v1.2.0" [[projects]] name = "github.com/stretchr/testify" @@ -74,7 +80,7 @@ branch = "master" name = "golang.org/x/crypto" packages = ["ssh/terminal"] - revision = "e3636079e1a4c1f337f212cc5cd2aca108f6c900" + revision = "eb0de9b17e854e9b1ccd9963efafc79862359959" [[projects]] branch = "master" @@ -83,7 +89,7 @@ "unix", "windows" ] - revision = "e4b3c5e9061176387e7cea65e4dc5853801f3fb7" + revision = "4ed8d59d0b35e1e29334a206d1b3f38b1e5dfb31" [[projects]] name = "gopkg.in/inf.v0" @@ -94,6 +100,6 @@ [solve-meta] analyzer-name = "dep" analyzer-version = 1 - inputs-digest = "1e2e14dd961d235e2ebd2f8cd161349bc2ec3b5dec07eaabe9938e8759b189ed" + inputs-digest = "fb631edb0737a08601c6f55c7881478532d41f385b0d26698acf16854bbc2f0b" solver-name = "gps-cdcl" solver-version = 1 diff --git a/Gopkg.toml b/Gopkg.toml index f3945db4..7eec5f5b 100644 --- a/Gopkg.toml +++ b/Gopkg.toml @@ -16,4 +16,4 @@ required = [ [[constraint]] branch = "master" - name = "github.com/jancajthaml-openbank/lake-client" + name = "github.com/jancajthaml-openbank/actor-system" diff --git a/actor/actor.go b/actor/actor.go deleted file mode 100755 index 4d6537f3..00000000 --- a/actor/actor.go +++ /dev/null @@ -1,102 +0,0 @@ -// Copyright (c) 2016-2018, Jan Cajthaml -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package actor - -import ( - "fmt" - - "github.com/jancajthaml-openbank/vault/model" -) - -// Envelope represents single actor -type Envelope struct { - Name string - receive func(model.Account, Context) - Backlog chan Context - Exit chan interface{} - State model.Account -} - -// NewEnvelope returns new actor instance -func NewEnvelope(name string) *Envelope { - return &Envelope{ - Name: name, - Backlog: make(chan Context, 64), - Exit: make(chan interface{}), - State: model.NewAccount(name), - } -} - -// Tell queues message to actor -func (ref *Envelope) Tell(data interface{}, sender Coordinates) (err error) { - defer func() { - if e := recover(); e != nil { - switch x := e.(type) { - case string: - err = fmt.Errorf(x) - case error: - err = x - default: - err = fmt.Errorf("Unknown panic") - } - } - }() - - if ref == nil { - err = fmt.Errorf("actor reference %v not found", ref) - return - } - - ref.Backlog <- Context{ - Data: data, - Receiver: ref, - Sender: sender, - } - return -} - -// Become transforms actor behaviour for next message -func (ref *Envelope) Become(state model.Account, f func(model.Account, Context)) { - if ref == nil { - return - } - ref.State = state - ref.React(f) - return -} - -func (ref *Envelope) String() string { - if ref == nil { - return "Deadletter" - } - return ref.Name -} - -// React change become function -func (ref *Envelope) React(f func(model.Account, Context)) { - if ref == nil { - return - } - ref.receive = f - return -} - -// Receive dequeues message to actor -func (ref *Envelope) Receive(msg Context) { - if ref.receive == nil { - return - } - ref.receive(ref.State, msg) -} diff --git a/actor/model.go b/actor/model.go deleted file mode 100644 index c115ec9f..00000000 --- a/actor/model.go +++ /dev/null @@ -1,28 +0,0 @@ -// Copyright (c) 2016-2018, Jan Cajthaml -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package actor - -// Coordinates represents actor namespace -type Coordinates struct { - Name string - Region string -} - -// Context represents actor message envelope -type Context struct { - Data interface{} - Receiver *Envelope - Sender Coordinates -} diff --git a/daemon/actor_system.go b/daemon/actor_system.go index 44f3ae65..da48bb84 100644 --- a/daemon/actor_system.go +++ b/daemon/actor_system.go @@ -16,105 +16,76 @@ package daemon import ( "context" - "fmt" "strings" - "sync" - "github.com/jancajthaml-openbank/vault/actor" "github.com/jancajthaml-openbank/vault/config" "github.com/jancajthaml-openbank/vault/model" "github.com/jancajthaml-openbank/vault/persistence" - lake "github.com/jancajthaml-openbank/lake-client/go" + system "github.com/jancajthaml-openbank/actor-system" log "github.com/sirupsen/logrus" money "gopkg.in/inf.v0" ) -type actorsMap struct { - sync.RWMutex - underlying map[string]*actor.Envelope -} - -func (rm *actorsMap) Load(key string) (value *actor.Envelope, ok bool) { - rm.RLock() - defer rm.RUnlock() - result, ok := rm.underlying[key] - return result, ok -} - -func (rm *actorsMap) Delete(key string) { - rm.Lock() - defer rm.Unlock() - delete(rm.underlying, key) -} - -func (rm *actorsMap) Store(key string, value *actor.Envelope) { - rm.Lock() - defer rm.Unlock() - rm.underlying[key] = value -} - // ActorSystem represents actor system subroutine type ActorSystem struct { - Support + system.Support tenant string storage string metrics *Metrics - Actors *actorsMap - Client *lake.Client - Name string } // NewActorSystem returns actor system fascade func NewActorSystem(ctx context.Context, cfg config.Configuration, metrics *Metrics) ActorSystem { - lakeClient, _ := lake.NewClient(ctx, "Vault/"+cfg.Tenant, cfg.LakeHostname) - - return ActorSystem{ - Support: NewDaemonSupport(ctx), + actorSystem := ActorSystem{ + Support: system.NewSupport(ctx, "Vault/"+cfg.Tenant, cfg.LakeHostname), storage: cfg.RootStorage, tenant: cfg.Tenant, metrics: metrics, - Actors: &actorsMap{ - underlying: make(map[string]*actor.Envelope), - }, - Client: lakeClient, - Name: "Vault/" + cfg.Tenant, } + actorSystem.Support.RegisterOnLocalMessage(actorSystem.ProcessLocalMessage) + actorSystem.Support.RegisterOnRemoteMessage(actorSystem.ProcessRemoteMessage) + return actorSystem } -// ProcessLocalMessage send local message to actor by name -func (system ActorSystem) ProcessLocalMessage(msg interface{}, receiver string, sender actor.Coordinates) { - ref, err := system.ActorOf(receiver) +// ProcessLocalMessage processing of local message to this vault +func (s *ActorSystem) ProcessLocalMessage(msg interface{}, receiver string, sender system.Coordinates) { + ref, err := s.ActorOf(receiver) if err != nil { - ref, err = system.ActorOf(system.SpawnAccountActor(receiver)) + ref, err = s.ActorOf(s.SpawnAccountActor(receiver)) } if err != nil { log.Warnf("Actor not found [%s local]", receiver) return } - ref.Tell(msg, sender) } -func (system ActorSystem) processRemoteMessage(parts []string) { +// ProcessRemoteMessage processing of remote message to this vault +func (s *ActorSystem) ProcessRemoteMessage(parts []string) { + if len(parts) < 4 { + log.Warnf("invalid message received %+v", parts) + return + } + region, receiver, sender, payload := parts[0], parts[1], parts[2], parts[3] defer func() { if r := recover(); r != nil { log.Errorf("procesRemoteMessage recovered in [%s %s/%s] : %v", r, receiver, region, sender) - system.SendRemote(region, model.FatalErrorMessage(receiver, sender)) + s.SendRemote(region, model.FatalErrorMessage(receiver, sender)) } }() - ref, err := system.ActorOf(receiver) + ref, err := s.ActorOf(receiver) if err != nil { - ref, err = system.ActorOf(system.SpawnAccountActor(receiver)) + ref, err = s.ActorOf(s.SpawnAccountActor(receiver)) } if err != nil { log.Warnf("Actor not found [%s %s/%s]", receiver, region, sender) - system.SendRemote(region, model.FatalErrorMessage(receiver, sender)) + s.SendRemote(region, model.FatalErrorMessage(receiver, sender)) return } @@ -163,27 +134,29 @@ func (system ActorSystem) processRemoteMessage(parts []string) { if message == nil { log.Warnf("Deserialization of unsuported message [%s %s/%s] : %v", ref.Name, region, sender, parts) - system.SendRemote(region, model.FatalErrorMessage(receiver, sender)) + s.SendRemote(region, model.FatalErrorMessage(receiver, sender)) return } - ref.Tell(message, actor.Coordinates{ + ref.Tell(message, system.Coordinates{ Name: sender, Region: region, }) - return } // NilAccount represents account that is neither existing neither non existing -func NilAccount(system ActorSystem) func(model.Account, actor.Context) { - return func(state model.Account, context actor.Context) { - snapshotHydration := persistence.LoadAccount(system.storage, state.AccountName) +func NilAccount(s *ActorSystem) func(interface{}, system.Context) { + return func(t_state interface{}, context system.Context) { + log.Info("Nil account reacting") + state := t_state.(model.Account) + + snapshotHydration := persistence.LoadAccount(s.storage, state.AccountName) if snapshotHydration == nil { - context.Receiver.Become(state, NonExistAccount(system)) + context.Receiver.Become(state, NonExistAccount(s)) log.Debugf("%s ~ Nil -> NonExist", state.AccountName) } else { - context.Receiver.Become(*snapshotHydration, ExistAccount(system)) + context.Receiver.Become(*snapshotHydration, ExistAccount(s)) log.Debugf("%s ~ Nil -> Exist", state.AccountName) } @@ -192,34 +165,36 @@ func NilAccount(system ActorSystem) func(model.Account, actor.Context) { } // NonExistAccount represents account that does not exist -func NonExistAccount(system ActorSystem) func(model.Account, actor.Context) { - return func(state model.Account, context actor.Context) { +func NonExistAccount(s *ActorSystem) func(interface{}, system.Context) { + return func(t_state interface{}, context system.Context) { + state := t_state.(model.Account) + switch msg := context.Data.(type) { case model.CreateAccount: currency := strings.ToUpper(msg.Currency) isBalanceCheck := msg.IsBalanceCheck - snaphostResult := persistence.CreateAccount(system.storage, state.AccountName, currency, isBalanceCheck) + snaphostResult := persistence.CreateAccount(s.storage, state.AccountName, currency, isBalanceCheck) if snaphostResult == nil { - system.SendRemote(context.Sender.Region, model.FatalErrorMessage(context.Receiver.Name, context.Sender.Name)) + s.SendRemote(context.Sender.Region, model.FatalErrorMessage(context.Receiver.Name, context.Sender.Name)) log.Debugf("%s ~ (NonExist CreateAccount) Error", state.AccountName) return } - context.Receiver.Become(*snaphostResult, ExistAccount(system)) - system.SendRemote(context.Sender.Region, model.AccountCreatedMessage(context.Receiver.Name, context.Sender.Name)) + context.Receiver.Become(*snaphostResult, ExistAccount(s)) + s.SendRemote(context.Sender.Region, model.AccountCreatedMessage(context.Receiver.Name, context.Sender.Name)) log.Infof("New Account %s Created", state.AccountName) log.Debugf("%s ~ (NonExist CreateAccount) OK", state.AccountName) - system.metrics.AccountCreated() + s.metrics.AccountCreated() case model.Rollback: - system.SendRemote(context.Sender.Region, model.RollbackAcceptedMessage(context.Receiver.Name, context.Sender.Name)) + s.SendRemote(context.Sender.Region, model.RollbackAcceptedMessage(context.Receiver.Name, context.Sender.Name)) log.Debugf("%s ~ (NonExist Rollback) OK", state.AccountName) default: - system.SendRemote(context.Sender.Region, model.FatalErrorMessage(context.Receiver.Name, context.Sender.Name)) + s.SendRemote(context.Sender.Region, model.FatalErrorMessage(context.Receiver.Name, context.Sender.Name)) log.Debugf("%s ~ (NonExist Unknown Message) Error", state.AccountName) } @@ -228,27 +203,29 @@ func NonExistAccount(system ActorSystem) func(model.Account, actor.Context) { } // ExistAccount represents account that does exist -func ExistAccount(system ActorSystem) func(model.Account, actor.Context) { - return func(state model.Account, context actor.Context) { +func ExistAccount(s *ActorSystem) func(interface{}, system.Context) { + return func(t_state interface{}, context system.Context) { + state := t_state.(model.Account) + switch msg := context.Data.(type) { case model.GetAccountState: - system.SendRemote(context.Sender.Region, model.AccountStateMessage(context.Receiver.Name, context.Sender.Name, state.Currency, state.Balance.String(), state.Promised.String(), state.IsBalanceCheck)) + s.SendRemote(context.Sender.Region, model.AccountStateMessage(context.Receiver.Name, context.Sender.Name, state.Currency, state.Balance.String(), state.Promised.String(), state.IsBalanceCheck)) log.Debugf("%s ~ (Exist GetAccountState) OK", state.AccountName) case model.CreateAccount: - system.SendRemote(context.Sender.Region, model.FatalErrorMessage(context.Receiver.Name, context.Sender.Name)) + s.SendRemote(context.Sender.Region, model.FatalErrorMessage(context.Receiver.Name, context.Sender.Name)) log.Debugf("%s ~ (Exist CreateAccount) Error", state.AccountName) case model.Promise: if state.PromiseBuffer.Contains(msg.Transaction) { - system.SendRemote(context.Sender.Region, model.PromiseAcceptedMessage(context.Receiver.Name, context.Sender.Name)) + s.SendRemote(context.Sender.Region, model.PromiseAcceptedMessage(context.Receiver.Name, context.Sender.Name)) log.Debugf("%s ~ (Exist Promise) OK Already Accepted", state.AccountName) return } if state.Currency != msg.Currency { - system.SendRemote(context.Sender.Region, model.FatalErrorMessage(context.Receiver.Name, context.Sender.Name)) + s.SendRemote(context.Sender.Region, model.FatalErrorMessage(context.Receiver.Name, context.Sender.Name)) log.Warnf("%s ~ (Exist Promise) Error Currency Mismatch", state.AccountName) return } @@ -256,8 +233,8 @@ func ExistAccount(system ActorSystem) func(model.Account, actor.Context) { nextPromised := new(money.Dec).Add(state.Promised, msg.Amount) if !state.IsBalanceCheck || new(money.Dec).Add(state.Balance, nextPromised).Sign() >= 0 { - if !persistence.PersistPromise(system.storage, state.AccountName, state.Version, msg.Amount, msg.Transaction) { - system.SendRemote(context.Sender.Region, model.FatalErrorMessage(context.Receiver.Name, context.Sender.Name)) + if !persistence.PersistPromise(s.storage, state.AccountName, state.Version, msg.Amount, msg.Transaction) { + s.SendRemote(context.Sender.Region, model.FatalErrorMessage(context.Receiver.Name, context.Sender.Name)) log.Warnf("%s ~ (Exist Promise) Error Could not Persist", state.AccountName) return } @@ -266,34 +243,34 @@ func ExistAccount(system ActorSystem) func(model.Account, actor.Context) { next.Promised = nextPromised next.PromiseBuffer.Add(msg.Transaction) - context.Receiver.Become(*next, ExistAccount(system)) + context.Receiver.Become(next, ExistAccount(s)) - system.SendRemote(context.Sender.Region, model.PromiseAcceptedMessage(context.Receiver.Name, context.Sender.Name)) + s.SendRemote(context.Sender.Region, model.PromiseAcceptedMessage(context.Receiver.Name, context.Sender.Name)) log.Infof("Account %s Promised %s %s", state.AccountName, msg.Amount.String(), state.Currency) log.Debugf("%s ~ (Exist Promise) OK", state.AccountName) - system.metrics.PromiseAccepted() + s.metrics.PromiseAccepted() return } if new(money.Dec).Sub(state.Balance, msg.Amount).Sign() < 0 { - system.SendRemote(context.Sender.Region, model.FatalErrorMessage(context.Receiver.Name, context.Sender.Name)) + s.SendRemote(context.Sender.Region, model.FatalErrorMessage(context.Receiver.Name, context.Sender.Name)) log.Debugf("%s ~ (Exist Promise) Error Insufficient Funds", state.AccountName) return } // FIXME boucing not handled - system.SendRemote(context.Sender.Region, model.FatalErrorMessage(context.Receiver.Name, context.Sender.Name)) + s.SendRemote(context.Sender.Region, model.FatalErrorMessage(context.Receiver.Name, context.Sender.Name)) log.Warnf("%s ~ (Exist Promise) Error ... (Bounce?)", state.AccountName) case model.Commit: if !state.PromiseBuffer.Contains(msg.Transaction) { - system.SendRemote(context.Sender.Region, model.CommitAcceptedMessage(context.Receiver.Name, context.Sender.Name)) + s.SendRemote(context.Sender.Region, model.CommitAcceptedMessage(context.Receiver.Name, context.Sender.Name)) log.Debugf("%s ~ (Exist Commit) OK Already Accepted", state.AccountName) return } - if !persistence.PersistCommit(system.storage, state.AccountName, state.Version, msg.Amount, msg.Transaction) { - system.SendRemote(context.Sender.Region, model.FatalErrorMessage(context.Receiver.Name, context.Sender.Name)) + if !persistence.PersistCommit(s.storage, state.AccountName, state.Version, msg.Amount, msg.Transaction) { + s.SendRemote(context.Sender.Region, model.FatalErrorMessage(context.Receiver.Name, context.Sender.Name)) log.Warnf("%s ~ (Exist Commit) Error Could not Persist", state.AccountName) return } @@ -303,21 +280,21 @@ func ExistAccount(system ActorSystem) func(model.Account, actor.Context) { next.Promised = new(money.Dec).Sub(state.Promised, msg.Amount) next.PromiseBuffer.Remove(msg.Transaction) - context.Receiver.Become(*next, ExistAccount(system)) + context.Receiver.Become(next, ExistAccount(s)) - system.SendRemote(context.Sender.Region, model.CommitAcceptedMessage(context.Receiver.Name, context.Sender.Name)) + s.SendRemote(context.Sender.Region, model.CommitAcceptedMessage(context.Receiver.Name, context.Sender.Name)) log.Debugf("%s ~ (Exist Commit) OK", state.AccountName) - system.metrics.CommitAccepted() + s.metrics.CommitAccepted() case model.Rollback: if !state.PromiseBuffer.Contains(msg.Transaction) { - system.SendRemote(context.Sender.Region, model.RollbackAcceptedMessage(context.Receiver.Name, context.Sender.Name)) + s.SendRemote(context.Sender.Region, model.RollbackAcceptedMessage(context.Receiver.Name, context.Sender.Name)) log.Debugf("%s ~ (Exist Rollback) OK Already Accepted", state.AccountName) return } - if !persistence.PersistRollback(system.storage, state.AccountName, state.Version, msg.Amount, msg.Transaction) { - system.SendRemote(context.Sender.Region, model.FatalErrorMessage(context.Receiver.Name, context.Sender.Name)) + if !persistence.PersistRollback(s.storage, state.AccountName, state.Version, msg.Amount, msg.Transaction) { + s.SendRemote(context.Sender.Region, model.FatalErrorMessage(context.Receiver.Name, context.Sender.Name)) log.Warnf("%s ~ (Exist Rollback) Error Could not Persist", state.AccountName) return } @@ -326,12 +303,12 @@ func ExistAccount(system ActorSystem) func(model.Account, actor.Context) { next.Promised = new(money.Dec).Sub(state.Promised, msg.Amount) next.PromiseBuffer.Remove(msg.Transaction) - context.Receiver.Become(*next, ExistAccount(system)) + context.Receiver.Become(next, ExistAccount(s)) - system.SendRemote(context.Sender.Region, model.RollbackAcceptedMessage(context.Receiver.Name, context.Sender.Name)) + s.SendRemote(context.Sender.Region, model.RollbackAcceptedMessage(context.Receiver.Name, context.Sender.Name)) log.Infof("Account %s Rejected %s %s", state.AccountName, msg.Amount.String(), state.Currency) log.Debugf("%s ~ (Exist Rollback) OK", state.AccountName) - system.metrics.RollbackAccepted() + s.metrics.RollbackAccepted() case model.Update: if msg.Version != state.Version { @@ -339,24 +316,24 @@ func ExistAccount(system ActorSystem) func(model.Account, actor.Context) { return } - result := persistence.LoadAccount(system.storage, state.AccountName) + result := persistence.LoadAccount(s.storage, state.AccountName) if result == nil { log.Warnf("%s ~ (Exist Update) Error no existing snapshot", state.AccountName) return } - next := persistence.UpdateAccount(system.storage, state.AccountName, result) + next := persistence.UpdateAccount(s.storage, state.AccountName, result) if next == nil { log.Warnf("%s ~ (Exist Update) Error unable to update", state.AccountName) return } - context.Receiver.Become(*next, ExistAccount(system)) + context.Receiver.Become(*next, ExistAccount(s)) log.Infof("Account %s Updated Snapshot to %d", state.AccountName, next.Version) log.Debugf("%s ~ (Exist Update) OK", state.AccountName) default: - system.SendRemote(context.Sender.Region, model.FatalErrorMessage(context.Receiver.Name, context.Sender.Name)) + s.SendRemote(context.Sender.Region, model.FatalErrorMessage(context.Receiver.Name, context.Sender.Name)) log.Warnf("%s ~ (Exist Unknown Message) Error", state.AccountName) } @@ -365,126 +342,19 @@ func ExistAccount(system ActorSystem) func(model.Account, actor.Context) { } } -// RegisterActor register new actor into actor system -func (system ActorSystem) RegisterActor(ref *actor.Envelope, initialState func(model.Account, actor.Context)) (err error) { - _, exists := system.Actors.Load(ref.Name) - if exists { - return - } - - ref.React(initialState) - system.Actors.Store(ref.Name, ref) - - go func() { - defer func() { - if e := recover(); e != nil { - switch x := e.(type) { - case string: - err = fmt.Errorf(x) - case error: - err = x - default: - err = fmt.Errorf("Unknown panic") - } - } - }() - - for { - select { - case <-system.Done(): - return - case p := <-ref.Backlog: - ref.Receive(p) - case <-ref.Exit: - // FIXME check if not already closed - close(ref.Backlog) - close(ref.Exit) - return - } - } - }() - - return -} - -// UnregisterActor stops actor and removes it from actor system -func (system ActorSystem) UnregisterActor(name string) { - ref, err := system.ActorOf(name) - if err != nil { - log.Warnf("Unable to unregister actor %v", name) - return - } - - if ref.Exit != nil { - ref.Exit <- nil - } - - system.Actors.Delete(name) -} - -// SendRemote send message to remote region -func (system ActorSystem) SendRemote(destinationSystem, data string) { - system.Client.Publish <- []string{destinationSystem, data} -} - // SpawnAccountActor returns new account actor instance registered into actor // system -func (system ActorSystem) SpawnAccountActor(path string) string { +func (s *ActorSystem) SpawnAccountActor(name string) string { // FIXME split to multiple functions - envelope := actor.NewEnvelope(path) - err := system.RegisterActor(envelope, NilAccount(system)) + envelope := system.NewEnvelope(name, model.NewAccount(name)) + + err := s.RegisterActor(envelope, NilAccount(s)) if err != nil { - log.Warnf("%s ~ Spawning Actor Error unable to register", path) + log.Warnf("%s ~ Spawning Actor Error unable to register", name) return "" } - log.Debugf("%s ~ Actor Spawned", path) + log.Debugf("%s ~ Actor Spawned", name) return envelope.Name } - -// ActorOf return actor reference by name -func (system ActorSystem) ActorOf(name string) (*actor.Envelope, error) { - ref, exists := system.Actors.Load(name) - if !exists { - return nil, fmt.Errorf("actor %v not registered", name) - } - - return ref, nil -} - -// Stop actor system and flush all actors -func (system ActorSystem) Stop() { - for actorName := range system.Actors.underlying { - system.UnregisterActor(actorName) - } - system.cancel() - <-system.exitSignal -} - -// Start handles everything needed to start metrics daemon -func (system ActorSystem) Start() { - defer system.MarkDone() - - log.Info("Start actor system daemon") - - system.Client.Start() - - system.MarkReady() - - for { - select { - case message := <-system.Client.Receive: - if len(message) < 4 { - log.Warn("invalid message received") - continue - } - system.processRemoteMessage(message) - case <-system.Done(): - log.Info("Stopping actor system daemon") - system.Client.Stop() - log.Info("Stop actor system daemon") - return - } - } -} diff --git a/daemon/snapshot_updater.go b/daemon/snapshot_updater.go index 7427a7a7..e7da3bd4 100644 --- a/daemon/snapshot_updater.go +++ b/daemon/snapshot_updater.go @@ -19,18 +19,18 @@ import ( "strconv" "time" - "github.com/jancajthaml-openbank/vault/actor" "github.com/jancajthaml-openbank/vault/config" "github.com/jancajthaml-openbank/vault/model" "github.com/jancajthaml-openbank/vault/utils" + system "github.com/jancajthaml-openbank/actor-system" log "github.com/sirupsen/logrus" ) // SnapshotUpdater represents journal saturation update subroutine type SnapshotUpdater struct { Support - callback func(msg interface{}, receiver string, sender actor.Coordinates) + callback func(msg interface{}, receiver string, sender system.Coordinates) metrics *Metrics storage string scanInterval time.Duration @@ -38,7 +38,7 @@ type SnapshotUpdater struct { } // NewSnapshotUpdater returns snapshot updater fascade -func NewSnapshotUpdater(ctx context.Context, cfg config.Configuration, metrics *Metrics, callback func(msg interface{}, receiver string, sender actor.Coordinates)) SnapshotUpdater { +func NewSnapshotUpdater(ctx context.Context, cfg config.Configuration, metrics *Metrics, callback func(msg interface{}, receiver string, sender system.Coordinates)) SnapshotUpdater { return SnapshotUpdater{ Support: NewDaemonSupport(ctx), callback: callback, @@ -63,7 +63,7 @@ func (updater SnapshotUpdater) updateSaturated() { if updater.getEvents(name, version) >= updater.saturationThreshold { log.Debugf("Request %v to update snapshot version from %d to %d", name, version, version+1) msg := model.Update{Version: version} - coordinates := actor.Coordinates{Name: "snapshot_saturation_cron"} + coordinates := system.Coordinates{Name: "snapshot_saturation_cron"} updater.callback(msg, name, coordinates) numberOfSnapshotsUpdated++ diff --git a/daemon/snapshot_updater_test.go b/daemon/snapshot_updater_test.go index eaf67b7b..ab21714d 100644 --- a/daemon/snapshot_updater_test.go +++ b/daemon/snapshot_updater_test.go @@ -6,11 +6,11 @@ import ( "path/filepath" "testing" - "github.com/jancajthaml-openbank/vault/actor" "github.com/jancajthaml-openbank/vault/config" "github.com/jancajthaml-openbank/vault/model" "github.com/jancajthaml-openbank/vault/persistence" + system "github.com/jancajthaml-openbank/actor-system" money "gopkg.in/inf.v0" "github.com/stretchr/testify/assert" @@ -56,7 +56,7 @@ func TestSnapshotUpdater(t *testing.T) { callbackCalled := 0 callbackBacklog := make([]CallbackMessage, 0) - callback := func(msg interface{}, account string, sender actor.Coordinates) { + callback := func(msg interface{}, account string, sender system.Coordinates) { callbackBacklog = append(callbackBacklog, CallbackMessage{ msg: msg, account: account, diff --git a/dev/lifecycle/lint b/dev/lifecycle/lint index 046e949b..9fd31dda 100755 --- a/dev/lifecycle/lint +++ b/dev/lifecycle/lint @@ -3,47 +3,27 @@ set -Eeuo pipefail trap exit INT TERM -sem_open() { - mkfifo pipe-$$ - exec 3<>pipe-$$ - rm pipe-$$ - local parallelism=${1} - while [ $parallelism -gt 0 ] ; do - parallelism=$((parallelism-1)) - printf %s 000 >&3 - done -} - -sem_put() { - local x - read -u 3 -n 3 x && [ $x -eq 0 ] || exit $x - ( - "$@" - printf '%.3d' $? >&3 - )& -} - scan() { - if [ -d ${1} ] ; then - sem_put gofmt -s -w ${1} - sem_put golint ${1}/... - sem_put misspell ${1} - sem_put go vet ${1}/... - sem_put prealloc ${1}/... - sem_put gocyclo -over 15 ${1} - sem_put prealloc ${1}/... - sem_put maligned ${1}/... - sem_put goconst ${1}/... + if [ -f ${1} ] ; then + echo ">> gofmt ${1}" + gofmt -s -w ${1} + echo ">> golint ${1}" + /go/bin/golint ${1} + echo ">> misspell ${1}" + /go/bin/misspell ${1} + echo ">> prealloc ${1}" + /go/bin/prealloc ${1} + echo ">> gocyclo -over 15 ${1}" + /go/bin/gocyclo -over 15 ${1} + echo ">> prealloc ${1}" + /go/bin/prealloc ${1} + echo ">> goconst ${1}" + /go/bin/goconst ${1} fi } -sem_open $(getconf _NPROCESSORS_ONLN) - -find . -mindepth 2 -name "*.go" -not -path "./vendor/*" -exec dirname {} \; \ +find . -name "*.go" -not -path "*vendor/*" \ | sort -u \ -| while read package ; do - echo "processing ${package}" - scan "${package}" +| while read f ; do + scan ${f} done - -wait diff --git a/model/model.go b/model/model.go index f363ff17..d3c638e6 100644 --- a/model/model.go +++ b/model/model.go @@ -32,6 +32,19 @@ type Account struct { Version int } +// Copy returns copy of Account +func (s Account) Copy() Account { + return Account{ + AccountName: s.AccountName, + Currency: s.Currency, + IsBalanceCheck: s.IsBalanceCheck, + Balance: new(money.Dec).Set(s.Balance), + Promised: new(money.Dec).Set(s.Promised), + PromiseBuffer: s.PromiseBuffer, //.Copy(), // FIXME implement + Version: s.Version, + } +} + // CreateAccount is inbound request for creation of new account type CreateAccount struct { AccountName string @@ -96,13 +109,6 @@ func NewAccount(name string) Account { } } -// Copy returns value copy of Snapshot -func (entity *Account) Copy() *Account { - clone := new(Account) - *clone = *entity - return clone -} - // Persist serializes Account entity to persistable data func (entity *Account) Persist() []byte { var buffer bytes.Buffer diff --git a/packaging/debian_amd64/DEBIAN/changelog b/packaging/debian_amd64/DEBIAN/changelog index 3df8c38c..d9c7f170 100644 --- a/packaging/debian_amd64/DEBIAN/changelog +++ b/packaging/debian_amd64/DEBIAN/changelog @@ -1,8 +1,8 @@ -vault (v1.1.0+use-standard-map) unstable; urgency=low +vault (v1.1.0+use-provided-actor-system) unstable; urgency=low * includes non-released commits - -- Jan Cajthaml Tue, 27 Nov 2018 07:34:49 +0100 + -- Jan Cajthaml Fri, 30 Nov 2018 16:38:02 +0100 vault (1.1.0) unstable; urgency=low diff --git a/packaging/debian_amd64/DEBIAN/control b/packaging/debian_amd64/DEBIAN/control index ddf5963f..0b70e7cb 100755 --- a/packaging/debian_amd64/DEBIAN/control +++ b/packaging/debian_amd64/DEBIAN/control @@ -1,5 +1,5 @@ Package: vault -Version: 1.1.0+use-standard-map +Version: 1.1.0+use-provided-actor-system Architecture: amd64 Maintainer: Jan Cajthaml Depends: init-system-helpers (>= 1.18~), libzmq5 (= 4.2.1-4) diff --git a/packaging/debian_armhf/DEBIAN/changelog b/packaging/debian_armhf/DEBIAN/changelog index 3df8c38c..d9c7f170 100644 --- a/packaging/debian_armhf/DEBIAN/changelog +++ b/packaging/debian_armhf/DEBIAN/changelog @@ -1,8 +1,8 @@ -vault (v1.1.0+use-standard-map) unstable; urgency=low +vault (v1.1.0+use-provided-actor-system) unstable; urgency=low * includes non-released commits - -- Jan Cajthaml Tue, 27 Nov 2018 07:34:49 +0100 + -- Jan Cajthaml Fri, 30 Nov 2018 16:38:02 +0100 vault (1.1.0) unstable; urgency=low diff --git a/packaging/debian_armhf/DEBIAN/control b/packaging/debian_armhf/DEBIAN/control index de8440d1..b1cb642d 100755 --- a/packaging/debian_armhf/DEBIAN/control +++ b/packaging/debian_armhf/DEBIAN/control @@ -1,5 +1,5 @@ Package: vault -Version: 1.1.0+use-standard-map +Version: 1.1.0+use-provided-actor-system Architecture: armhf Maintainer: Jan Cajthaml Depends: init-system-helpers (>= 1.18~), libzmq5 (= 4.2.1-4)