diff --git a/dev/lifecycle/debian b/dev/lifecycle/debian index 3cbf207e..1bde2872 100755 --- a/dev/lifecycle/debian +++ b/dev/lifecycle/debian @@ -100,6 +100,10 @@ generate_changelog() { rm -f ${target}.gz fi + if [ -f ${target}.Debian.gz ] ; then + rm -f ${target}.Debian.gz + fi + touch ${target} VER="${VERSION}" @@ -149,7 +153,15 @@ generate_changelog() { fi done - cat ${target} | gzip -n -9 > ${target}.gz + case "${VERSION}" in + *+*) + cat ${target} | gzip -n -9 > ${target}.Debian.gz + ;; + *) + cat ${target} | gzip -n -9 > ${target}.gz + ;; + esac + rm ${target} } diff --git a/packaging/debian_amd64/DEBIAN/control b/packaging/debian_amd64/DEBIAN/control index c4675064..fed1ae37 100755 --- a/packaging/debian_amd64/DEBIAN/control +++ b/packaging/debian_amd64/DEBIAN/control @@ -1,5 +1,5 @@ Package: vault -Version: 1.2.6+bbtest-message-expectation-flake +Version: 1.2.6+single-app-struct Section: misc Priority: extra Architecture: amd64 diff --git a/packaging/debian_amd64/usr/share/doc/vault/changelog.gz b/packaging/debian_amd64/usr/share/doc/vault/changelog.gz deleted file mode 100644 index 7c84f7e1..00000000 Binary files a/packaging/debian_amd64/usr/share/doc/vault/changelog.gz and /dev/null differ diff --git a/services/vault-rest/actor/account.go b/services/vault-rest/actor/account.go index e6299004..ea38b529 100644 --- a/services/vault-rest/actor/account.go +++ b/services/vault-rest/actor/account.go @@ -17,7 +17,6 @@ package actor import ( "time" - "github.com/jancajthaml-openbank/vault-rest/daemon" "github.com/jancajthaml-openbank/vault-rest/model" "github.com/rs/xid" @@ -27,8 +26,8 @@ import ( ) // CreateAccount creates new account for target tenant vault -func CreateAccount(s *daemon.ActorSystem, tenant string, account model.Account) (result interface{}) { - s.Metrics.TimeCreateAccount(func() { +func CreateAccount(sys *ActorSystem, tenant string, account model.Account) (result interface{}) { + sys.Metrics.TimeCreateAccount(func() { // FIXME properly determine fail states // input validation -> input error // system in invalid state (and panics) -> fatal error @@ -46,13 +45,13 @@ func CreateAccount(s *daemon.ActorSystem, tenant string, account model.Account) defer close(ch) envelope := system.NewEnvelope("relay/"+xid.New().String(), nil) - defer s.UnregisterActor(envelope.Name) + defer sys.UnregisterActor(envelope.Name) - s.RegisterActor(envelope, func(state interface{}, context system.Context) { + sys.RegisterActor(envelope, func(state interface{}, context system.Context) { ch <- context.Data }) - s.SendRemote(CreateAccountMessage(tenant, envelope.Name, account.Name, account.Currency, account.IsBalanceCheck)) + sys.SendRemote(CreateAccountMessage(tenant, envelope.Name, account.Name, account.Currency, account.IsBalanceCheck)) select { @@ -68,8 +67,8 @@ func CreateAccount(s *daemon.ActorSystem, tenant string, account model.Account) } // GetAccount retrives account state from target tenant vault -func GetAccount(s *daemon.ActorSystem, tenant string, name string) (result interface{}) { - s.Metrics.TimeGetAccount(func() { +func GetAccount(sys *ActorSystem, tenant string, name string) (result interface{}) { + sys.Metrics.TimeGetAccount(func() { // FIXME properly determine fail states // input validation -> input error // system in invalid state (and panics) -> fatal error @@ -87,13 +86,13 @@ func GetAccount(s *daemon.ActorSystem, tenant string, name string) (result inter defer close(ch) envelope := system.NewEnvelope("relay/"+xid.New().String(), nil) - defer s.UnregisterActor(envelope.Name) + defer sys.UnregisterActor(envelope.Name) - s.RegisterActor(envelope, func(state interface{}, context system.Context) { + sys.RegisterActor(envelope, func(state interface{}, context system.Context) { ch <- context.Data }) - s.SendRemote(GetAccountMessage(tenant, envelope.Name, name)) + sys.SendRemote(GetAccountMessage(tenant, envelope.Name, name)) select { diff --git a/services/vault-rest/actor/common.go b/services/vault-rest/actor/common.go index b21da346..025a75f6 100644 --- a/services/vault-rest/actor/common.go +++ b/services/vault-rest/actor/common.go @@ -18,20 +18,17 @@ import ( "fmt" "strings" - "github.com/jancajthaml-openbank/vault-rest/daemon" "github.com/jancajthaml-openbank/vault-rest/model" system "github.com/jancajthaml-openbank/actor-system" log "github.com/sirupsen/logrus" ) -var nilCoordinates = system.Coordinates{} - -func asEnvelopes(s *daemon.ActorSystem, msg string) (system.Coordinates, system.Coordinates, []string, error) { +func asEnvelopes(s *ActorSystem, msg string) (system.Coordinates, system.Coordinates, []string, error) { parts := strings.Split(msg, " ") if len(parts) < 5 { - return nilCoordinates, nilCoordinates, nil, fmt.Errorf("invalid message received %+v", parts) + return system.Coordinates{}, system.Coordinates{}, nil, fmt.Errorf("invalid message received %+v", parts) } recieverRegion, senderRegion, receiverName, senderName := parts[0], parts[1], parts[2], parts[3] @@ -50,7 +47,7 @@ func asEnvelopes(s *daemon.ActorSystem, msg string) (system.Coordinates, system. } // ProcessRemoteMessage processing of remote message to this wall -func ProcessRemoteMessage(s *daemon.ActorSystem) system.ProcessRemoteMessage { +func ProcessRemoteMessage(s *ActorSystem) system.ProcessRemoteMessage { return func(msg string) { from, to, parts, err := asEnvelopes(s, msg) if err != nil { diff --git a/services/vault-rest/daemon/actor_system.go b/services/vault-rest/actor/system.go similarity index 78% rename from services/vault-rest/daemon/actor_system.go rename to services/vault-rest/actor/system.go index 69379614..746ca6c9 100644 --- a/services/vault-rest/daemon/actor_system.go +++ b/services/vault-rest/actor/system.go @@ -12,14 +12,14 @@ // See the License for the specific language governing permissions and // limitations under the License. -package daemon +package actor import ( "context" "fmt" "time" - "github.com/jancajthaml-openbank/vault-rest/config" + "github.com/jancajthaml-openbank/vault-rest/metrics" system "github.com/jancajthaml-openbank/actor-system" ) @@ -27,15 +27,19 @@ import ( // ActorSystem represents actor system subroutine type ActorSystem struct { system.Support - Metrics *Metrics + Metrics *metrics.Metrics } // NewActorSystem returns actor system fascade -func NewActorSystem(ctx context.Context, cfg config.Configuration, metrics *Metrics) ActorSystem { - return ActorSystem{ - Support: system.NewSupport(ctx, "VaultRest", cfg.LakeHostname), +func NewActorSystem(ctx context.Context, lakeEndpoint string, metrics *metrics.Metrics) ActorSystem { + result := ActorSystem{ + Support: system.NewSupport(ctx, "VaultRest", lakeEndpoint), Metrics: metrics, } + + result.Support.RegisterOnRemoteMessage(ProcessRemoteMessage(&result)) + + return result } // GreenLight daemon noop @@ -60,7 +64,7 @@ func (system ActorSystem) WaitReady(deadline time.Duration) (err error) { ticker := time.NewTicker(deadline) select { - case <-system.IsReady: + case <-system.Support.IsReady: ticker.Stop() err = nil return diff --git a/services/vault-rest/api/account.go b/services/vault-rest/api/account.go index b1f234f4..7d6ffc4c 100644 --- a/services/vault-rest/api/account.go +++ b/services/vault-rest/api/account.go @@ -19,17 +19,15 @@ import ( "net/http" "github.com/jancajthaml-openbank/vault-rest/actor" - "github.com/jancajthaml-openbank/vault-rest/daemon" "github.com/jancajthaml-openbank/vault-rest/model" "github.com/jancajthaml-openbank/vault-rest/persistence" "github.com/jancajthaml-openbank/vault-rest/utils" "github.com/gorilla/mux" - localfs "github.com/jancajthaml-openbank/local-fs" ) // AccountPartial returns http handler for single account -func AccountPartial(system *daemon.ActorSystem) func(w http.ResponseWriter, r *http.Request) { +func AccountPartial(server *Server) func(w http.ResponseWriter, r *http.Request) { return func(w http.ResponseWriter, r *http.Request) { vars := mux.Vars(r) @@ -46,7 +44,7 @@ func AccountPartial(system *daemon.ActorSystem) func(w http.ResponseWriter, r *h switch r.Method { case "GET": - GetAccount(system, tenant, id, w, r) + GetAccount(server, tenant, id, w, r) return default: @@ -60,7 +58,7 @@ func AccountPartial(system *daemon.ActorSystem) func(w http.ResponseWriter, r *h } // AccountsPartial returns http handler for accounts -func AccountsPartial(system *daemon.ActorSystem, storage *localfs.Storage) func(w http.ResponseWriter, r *http.Request) { +func AccountsPartial(server *Server) func(w http.ResponseWriter, r *http.Request) { return func(w http.ResponseWriter, r *http.Request) { vars := mux.Vars(r) @@ -76,11 +74,11 @@ func AccountsPartial(system *daemon.ActorSystem, storage *localfs.Storage) func( switch r.Method { case "GET": - GetAccounts(storage, tenant, w, r) + GetAccounts(server, tenant, w, r) return case "POST": - CreateAccount(system, tenant, w, r) + CreateAccount(server, tenant, w, r) return default: @@ -95,7 +93,7 @@ func AccountsPartial(system *daemon.ActorSystem, storage *localfs.Storage) func( } // CreateAccount creates new account -func CreateAccount(system *daemon.ActorSystem, tenant string, w http.ResponseWriter, r *http.Request) { +func CreateAccount(server *Server, tenant string, w http.ResponseWriter, r *http.Request) { b, err := ioutil.ReadAll(r.Body) defer r.Body.Close() if err != nil { @@ -114,7 +112,7 @@ func CreateAccount(system *daemon.ActorSystem, tenant string, w http.ResponseWri return } - switch actor.CreateAccount(system, tenant, *req).(type) { + switch actor.CreateAccount(server.ActorSystem, tenant, *req).(type) { case *model.AccountCreated: resp, err := utils.JSON.Marshal(req) @@ -146,8 +144,8 @@ func CreateAccount(system *daemon.ActorSystem, tenant string, w http.ResponseWri } // GetAccounts returns list of existing accounts -func GetAccounts(storage *localfs.Storage, tenant string, w http.ResponseWriter, r *http.Request) { - accounts, err := persistence.LoadAccounts(storage, tenant) +func GetAccounts(server *Server, tenant string, w http.ResponseWriter, r *http.Request) { + accounts, err := persistence.LoadAccounts(server.Storage, tenant) if err != nil { w.Header().Set("Content-Type", "application/json") w.WriteHeader(http.StatusInternalServerError) @@ -168,8 +166,8 @@ func GetAccounts(storage *localfs.Storage, tenant string, w http.ResponseWriter, } // GetAccount returns snapshot existing account -func GetAccount(system *daemon.ActorSystem, tenant string, id string, w http.ResponseWriter, r *http.Request) { - switch result := actor.GetAccount(system, tenant, id).(type) { +func GetAccount(server *Server, tenant string, id string, w http.ResponseWriter, r *http.Request) { + switch result := actor.GetAccount(server.ActorSystem, tenant, id).(type) { case *model.AccountMissing: w.Header().Set("Content-Type", "application/json") diff --git a/services/vault-rest/api/health.go b/services/vault-rest/api/health.go index 6fc9e4a5..02752277 100644 --- a/services/vault-rest/api/health.go +++ b/services/vault-rest/api/health.go @@ -17,8 +17,10 @@ package api import "net/http" // HealtCheck returns 200 OK -func HealtCheck(w http.ResponseWriter, r *http.Request) { - w.Header().Set("Content-Type", "application/json") - w.WriteHeader(http.StatusOK) - w.Write(emptyJSONObject) +func HealtCheck(server *Server) func(w http.ResponseWriter, r *http.Request) { + return func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + w.Write(emptyJSONObject) + } } diff --git a/services/vault-rest/daemon/server.go b/services/vault-rest/api/server.go similarity index 71% rename from services/vault-rest/daemon/server.go rename to services/vault-rest/api/server.go index 34d8c081..761ebc4b 100644 --- a/services/vault-rest/daemon/server.go +++ b/services/vault-rest/api/server.go @@ -12,34 +12,41 @@ // See the License for the specific language governing permissions and // limitations under the License. -package daemon +package api import ( "context" "crypto/tls" "fmt" + "io/ioutil" "net" "net/http" "time" - "github.com/jancajthaml-openbank/vault-rest/config" + "github.com/jancajthaml-openbank/vault-rest/actor" + "github.com/jancajthaml-openbank/vault-rest/systemd" "github.com/jancajthaml-openbank/vault-rest/utils" "github.com/gorilla/mux" - + localfs "github.com/jancajthaml-openbank/local-fs" log "github.com/sirupsen/logrus" ) // Server is a fascade for http-server following handler api of Gin and lifecycle // api of http type Server struct { - Support - underlying *http.Server - router *mux.Router - key []byte - cert []byte + utils.DaemonSupport + Storage *localfs.Storage + SystemControl *systemd.SystemControl + ActorSystem *actor.ActorSystem + underlying *http.Server + router *mux.Router + key []byte + cert []byte } +type Endpoint func(*Server) func(http.ResponseWriter, *http.Request) + type tcpKeepAliveListener struct { *net.TCPListener } @@ -72,14 +79,27 @@ func cloneTLSConfig(cfg *tls.Config) *tls.Config { } // NewServer returns new secure server instance -func NewServer(ctx context.Context, cfg config.Configuration) Server { +func NewServer(ctx context.Context, port int, secretsPath string, actorSystem *actor.ActorSystem, systemControl *systemd.SystemControl, storage *localfs.Storage) Server { router := mux.NewRouter() - return Server{ - Support: NewDaemonSupport(ctx), - router: router, + cert, err := ioutil.ReadFile(secretsPath + "/domain.local.crt") + if err != nil { + log.Fatalf("unable to load certificate %s/domain.local.crt", secretsPath) + } + + key, err := ioutil.ReadFile(secretsPath + "/domain.local.key") + if err != nil { + log.Fatalf("unable to load certificate %s/domain.local.key", secretsPath) + } + + result := Server{ + DaemonSupport: utils.NewDaemonSupport(ctx), + Storage: storage, + ActorSystem: actorSystem, + router: router, + SystemControl: systemControl, underlying: &http.Server{ - Addr: fmt.Sprintf(":%d", cfg.ServerPort), + Addr: fmt.Sprintf(":%d", port), ReadTimeout: 5 * time.Second, WriteTimeout: 5 * time.Second, Handler: router, @@ -96,15 +116,23 @@ func NewServer(ctx context.Context, cfg config.Configuration) Server { }, TLSNextProto: make(map[string]func(*http.Server, *tls.Conn, http.Handler), 0), }, - key: cfg.SecretKey, - cert: cfg.SecretCert, + key: key, + cert: cert, } + + result.HandleFunc("/health", HealtCheck, "GET", "HEAD") + result.HandleFunc("/tenant/{tenant}", TenantPartial, "POST", "DELETE") + result.HandleFunc("/tenant", TenantsPartial, "GET") + result.HandleFunc("/account/{tenant}/{id}", AccountPartial, "GET") + result.HandleFunc("/account/{tenant}", AccountsPartial, "POST", "GET") + + return result } // HandleFunc registers route -func (server Server) HandleFunc(path string, handle func(w http.ResponseWriter, r *http.Request), methods ...string) *mux.Route { +func (server Server) HandleFunc(path string, handle Endpoint, methods ...string) *mux.Route { log.Debugf("HTTP route %+v %+v registered", methods, path) - return server.router.HandleFunc(path, handle).Methods(methods...) + return server.router.HandleFunc(path, handle(&server)).Methods(methods...) } // WaitReady wait for server to be ready @@ -168,7 +196,7 @@ func (server Server) Start() { server.MarkReady() select { - case <-server.canStart: + case <-server.CanStart: break case <-server.Done(): return @@ -176,7 +204,7 @@ func (server Server) Start() { log.Infof("Start http-server daemon, listening on :%d", ln.Addr().(*net.TCPAddr).Port) - <-server.exitSignal + <-server.ExitSignal } // Stop tries to shut down http-server daemon gracefully within 5 seconds @@ -185,6 +213,6 @@ func (server Server) Stop() { ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() server.underlying.Shutdown(ctx) - server.cancel() + server.Cancel() return } diff --git a/services/vault-rest/api/tenant.go b/services/vault-rest/api/tenant.go index 39c01552..c0c19bf5 100644 --- a/services/vault-rest/api/tenant.go +++ b/services/vault-rest/api/tenant.go @@ -17,14 +17,14 @@ package api import ( "net/http" - "github.com/gorilla/mux" - "github.com/jancajthaml-openbank/vault-rest/daemon" "github.com/jancajthaml-openbank/vault-rest/utils" + + "github.com/gorilla/mux" "github.com/labstack/gommon/log" ) // TenantPartial returns http handler for single tenant -func TenantPartial(system *daemon.SystemControl) func(w http.ResponseWriter, r *http.Request) { +func TenantPartial(server *Server) func(w http.ResponseWriter, r *http.Request) { return func(w http.ResponseWriter, r *http.Request) { vars := mux.Vars(r) @@ -40,11 +40,11 @@ func TenantPartial(system *daemon.SystemControl) func(w http.ResponseWriter, r * switch r.Method { case "POST": - EnableUnit(system, tenant, w, r) + EnableUnit(server, tenant, w, r) return case "DELETE": - DisableUnit(system, tenant, w, r) + DisableUnit(server, tenant, w, r) return default: @@ -58,10 +58,10 @@ func TenantPartial(system *daemon.SystemControl) func(w http.ResponseWriter, r * } // TenantsPartial returns http handler for tenants -func TenantsPartial(system *daemon.SystemControl) func(w http.ResponseWriter, r *http.Request) { +func TenantsPartial(server *Server) func(w http.ResponseWriter, r *http.Request) { return func(w http.ResponseWriter, r *http.Request) { - units, err := system.ListUnits("vault-unit@") + units, err := server.SystemControl.ListUnits("vault-unit@") if err != nil { log.Errorf("Error when listing units, %+v", err) w.Header().Set("Content-Type", "application/json") @@ -86,8 +86,8 @@ func TenantsPartial(system *daemon.SystemControl) func(w http.ResponseWriter, r } // EnableUnit enables tenant unit -func EnableUnit(system *daemon.SystemControl, tenant string, w http.ResponseWriter, r *http.Request) { - err := system.EnableUnit("vault-unit@" + tenant + ".service") +func EnableUnit(server *Server, tenant string, w http.ResponseWriter, r *http.Request) { + err := server.SystemControl.EnableUnit("vault-unit@" + tenant + ".service") if err != nil { log.Errorf("Error when enabling unit, %+v", err) w.Header().Set("Content-Type", "application/json") @@ -103,8 +103,8 @@ func EnableUnit(system *daemon.SystemControl, tenant string, w http.ResponseWrit } // DisableUnit disables tenant unit -func DisableUnit(system *daemon.SystemControl, tenant string, w http.ResponseWriter, r *http.Request) { - err := system.DisableUnit("vault-unit@" + tenant + ".service") +func DisableUnit(server *Server, tenant string, w http.ResponseWriter, r *http.Request) { + err := server.SystemControl.DisableUnit("vault-unit@" + tenant + ".service") if err != nil { log.Errorf("Error when disabling unit, %+v", err) w.Header().Set("Content-Type", "application/json") diff --git a/services/vault-rest/boot/init.go b/services/vault-rest/boot/init.go index 0a9e545a..97fc778f 100644 --- a/services/vault-rest/boot/init.go +++ b/services/vault-rest/boot/init.go @@ -21,54 +21,47 @@ import ( "github.com/jancajthaml-openbank/vault-rest/actor" "github.com/jancajthaml-openbank/vault-rest/api" "github.com/jancajthaml-openbank/vault-rest/config" - "github.com/jancajthaml-openbank/vault-rest/daemon" + "github.com/jancajthaml-openbank/vault-rest/metrics" + "github.com/jancajthaml-openbank/vault-rest/systemd" "github.com/jancajthaml-openbank/vault-rest/utils" localfs "github.com/jancajthaml-openbank/local-fs" ) -// Application encapsulate initialized application -type Application struct { +// Program encapsulate initialized application +type Program struct { cfg config.Configuration interrupt chan os.Signal - actorSystem daemon.ActorSystem - metrics daemon.Metrics - rest daemon.Server - systemControl daemon.SystemControl + actorSystem actor.ActorSystem + metrics metrics.Metrics + rest api.Server + systemControl systemd.SystemControl cancel context.CancelFunc } // Initialize application -func Initialize() Application { +func Initialize() Program { ctx, cancel := context.WithCancel(context.Background()) cfg := config.GetConfig() utils.SetupLogger(cfg.LogLevel) - systemControl := daemon.NewSystemControl(ctx, cfg) + systemControlDaemon := systemd.NewSystemControl(ctx) storage := localfs.NewStorage(cfg.RootStorage) + metricsDaemon := metrics.NewMetrics(ctx, cfg.MetricsOutput, cfg.MetricsRefreshRate) - metrics := daemon.NewMetrics(ctx, cfg) + actorSystemDaemon := actor.NewActorSystem(ctx, cfg.LakeHostname, &metricsDaemon) + restDaemon := api.NewServer(ctx, cfg.ServerPort, cfg.SecretsPath, &actorSystemDaemon, &systemControlDaemon, &storage) - actorSystem := daemon.NewActorSystem(ctx, cfg, &metrics) - actorSystem.Support.RegisterOnRemoteMessage(actor.ProcessRemoteMessage(&actorSystem)) - - rest := daemon.NewServer(ctx, cfg) - rest.HandleFunc("/health", api.HealtCheck, "GET", "HEAD") - rest.HandleFunc("/tenant/{tenant}", api.TenantPartial(&systemControl), "POST", "DELETE") - rest.HandleFunc("/tenant", api.TenantsPartial(&systemControl), "GET") - rest.HandleFunc("/account/{tenant}/{id}", api.AccountPartial(&actorSystem), "GET") - rest.HandleFunc("/account/{tenant}", api.AccountsPartial(&actorSystem, &storage), "POST", "GET") - - return Application{ + return Program{ cfg: cfg, interrupt: make(chan os.Signal, 1), - metrics: metrics, - actorSystem: actorSystem, - rest: rest, - systemControl: systemControl, + metrics: metricsDaemon, + actorSystem: actorSystemDaemon, + rest: restDaemon, + systemControl: systemControlDaemon, cancel: cancel, } } diff --git a/services/vault-rest/boot/run.go b/services/vault-rest/boot/run.go index 31114200..8f0b0fd3 100644 --- a/services/vault-rest/boot/run.go +++ b/services/vault-rest/boot/run.go @@ -21,24 +21,23 @@ import ( "syscall" "time" - "github.com/jancajthaml-openbank/vault-rest/daemon" "github.com/jancajthaml-openbank/vault-rest/utils" log "github.com/sirupsen/logrus" ) // Stop stops the application -func (app Application) Stop() { - close(app.interrupt) +func (prog Program) Stop() { + close(prog.interrupt) } // WaitReady wait for daemons to be ready -func (app Application) WaitReady(deadline time.Duration) error { +func (prog Program) WaitReady(deadline time.Duration) error { errors := make([]error, 0) mux := new(sync.Mutex) var wg sync.WaitGroup - waitWithDeadline := func(support daemon.Daemon) { + waitWithDeadline := func(support utils.Daemon) { go func() { err := support.WaitReady(deadline) if err != nil { @@ -51,10 +50,10 @@ func (app Application) WaitReady(deadline time.Duration) error { } wg.Add(4) - waitWithDeadline(app.actorSystem) - waitWithDeadline(app.rest) - waitWithDeadline(app.systemControl) - waitWithDeadline(app.metrics) + waitWithDeadline(prog.actorSystem) + waitWithDeadline(prog.rest) + waitWithDeadline(prog.systemControl) + waitWithDeadline(prog.metrics) wg.Wait() if len(errors) > 0 { @@ -65,45 +64,45 @@ func (app Application) WaitReady(deadline time.Duration) error { } // GreenLight daemons -func (app Application) GreenLight() { - app.metrics.GreenLight() - app.actorSystem.GreenLight() - app.systemControl.GreenLight() - app.rest.GreenLight() +func (prog Program) GreenLight() { + prog.metrics.GreenLight() + prog.actorSystem.GreenLight() + prog.systemControl.GreenLight() + prog.rest.GreenLight() } // WaitInterrupt wait for signal -func (app Application) WaitInterrupt() { - <-app.interrupt +func (prog Program) WaitInterrupt() { + <-prog.interrupt } // Run runs the application -func (app Application) Run() { +func (prog Program) Run() { log.Info(">>> Start <<<") - go app.metrics.Start() - go app.actorSystem.Start() - go app.systemControl.Start() - go app.rest.Start() + go prog.metrics.Start() + go prog.actorSystem.Start() + go prog.systemControl.Start() + go prog.rest.Start() - if err := app.WaitReady(5 * time.Second); err != nil { + if err := prog.WaitReady(5 * time.Second); err != nil { log.Errorf("Error when starting daemons: %+v", err) } else { log.Info(">>> Started <<<") utils.NotifyServiceReady() - app.GreenLight() - signal.Notify(app.interrupt, syscall.SIGINT, syscall.SIGTERM) - app.WaitInterrupt() + prog.GreenLight() + signal.Notify(prog.interrupt, syscall.SIGINT, syscall.SIGTERM) + prog.WaitInterrupt() } log.Info(">>> Stopping <<<") utils.NotifyServiceStopping() - app.rest.Stop() - app.actorSystem.Stop() - app.systemControl.Stop() - app.metrics.Stop() - app.cancel() + prog.rest.Stop() + prog.actorSystem.Stop() + prog.systemControl.Stop() + prog.metrics.Stop() + prog.cancel() log.Info(">>> Stop <<<") } diff --git a/services/vault-rest/config/config.go b/services/vault-rest/config/config.go index 00b686e7..89d1231e 100644 --- a/services/vault-rest/config/config.go +++ b/services/vault-rest/config/config.go @@ -22,10 +22,8 @@ type Configuration struct { RootStorage string // ServerPort is port which server is bound to ServerPort int - // Secrets represents cerificate .key - SecretKey []byte - // Secrets represents cerificate .crt - SecretCert []byte + // SecretsPath directory where .key and .crt is stored + SecretsPath string // LakeHostname represent hostname of openbank lake service LakeHostname string // LogLevel ignorecase log level diff --git a/services/vault-rest/config/environment.go b/services/vault-rest/config/environment.go index 2f30256a..82f46d9c 100644 --- a/services/vault-rest/config/environment.go +++ b/services/vault-rest/config/environment.go @@ -15,7 +15,6 @@ package config import ( - "io/ioutil" "os" "path/filepath" "strconv" @@ -42,21 +41,10 @@ func loadConfFromEnv() Configuration { log.Fatal("unable to assert metrics output") } - cert, err := ioutil.ReadFile(secrets + "/domain.local.crt") - if err != nil { - log.Fatalf("unable to load certificate %s/domain.local.crt", secrets) - } - - key, err := ioutil.ReadFile(secrets + "/domain.local.key") - if err != nil { - log.Fatalf("unable to load certificate %s/domain.local.key", secrets) - } - return Configuration{ RootStorage: rootStorage, ServerPort: port, - SecretKey: key, - SecretCert: cert, + SecretsPath: secrets, LakeHostname: lakeHostname, LogLevel: logLevel, MetricsRefreshRate: metricsRefreshRate, diff --git a/services/vault-rest/main.go b/services/vault-rest/main.go index b8611019..27e32d52 100644 --- a/services/vault-rest/main.go +++ b/services/vault-rest/main.go @@ -19,7 +19,7 @@ import ( ) func main() { - application := boot.Initialize() - defer application.Stop() - application.Run() + program := boot.Initialize() + defer program.Stop() + program.Run() } diff --git a/services/vault-rest/daemon/metrics.go b/services/vault-rest/metrics/metrics.go similarity index 90% rename from services/vault-rest/daemon/metrics.go rename to services/vault-rest/metrics/metrics.go index 7d403308..f0e2223b 100644 --- a/services/vault-rest/daemon/metrics.go +++ b/services/vault-rest/metrics/metrics.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package daemon +package metrics import ( "context" @@ -21,7 +21,6 @@ import ( "path/filepath" "time" - "github.com/jancajthaml-openbank/vault-rest/config" "github.com/jancajthaml-openbank/vault-rest/utils" metrics "github.com/rcrowley/go-metrics" @@ -30,7 +29,7 @@ import ( // Metrics represents metrics subroutine type Metrics struct { - Support + utils.DaemonSupport output string refreshRate time.Duration getAccountLatency metrics.Timer @@ -39,11 +38,11 @@ type Metrics struct { } // NewMetrics returns metrics fascade -func NewMetrics(ctx context.Context, cfg config.Configuration) Metrics { +func NewMetrics(ctx context.Context, output string, refreshRate time.Duration) Metrics { return Metrics{ - Support: NewDaemonSupport(ctx), - output: cfg.MetricsOutput, - refreshRate: cfg.MetricsRefreshRate, + DaemonSupport: utils.NewDaemonSupport(ctx), + output: output, + refreshRate: refreshRate, getAccountLatency: metrics.NewTimer(), createAccountLatency: metrics.NewTimer(), } @@ -147,30 +146,31 @@ func (metrics Metrics) Start() { return } - output := getFilename(metrics.output) + metricsOutput := getFilename(metrics.output) + ticker := time.NewTicker(metrics.refreshRate) defer ticker.Stop() metrics.MarkReady() select { - case <-metrics.canStart: + case <-metrics.CanStart: break case <-metrics.Done(): return } - log.Infof("Start metrics daemon, update each %v into %v", metrics.refreshRate, output) + log.Infof("Start metrics daemon, update each %v into %v", metrics.refreshRate, metricsOutput) for { select { case <-metrics.Done(): log.Info("Stopping metrics daemon") - metrics.persist(output) + metrics.persist(metricsOutput) log.Info("Stop metrics daemon") return case <-ticker.C: - metrics.persist(output) + metrics.persist(metricsOutput) } } } diff --git a/services/vault-rest/daemon/metrics_test.go b/services/vault-rest/metrics/metrics_test.go similarity index 89% rename from services/vault-rest/daemon/metrics_test.go rename to services/vault-rest/metrics/metrics_test.go index 077ad2f2..4b81df41 100644 --- a/services/vault-rest/daemon/metrics_test.go +++ b/services/vault-rest/metrics/metrics_test.go @@ -1,12 +1,10 @@ -package daemon +package metrics import ( "context" "testing" "time" - "github.com/jancajthaml-openbank/vault-rest/config" - "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -16,12 +14,10 @@ func TestGetFilename(t *testing.T) { } func TestMetricsPersist(t *testing.T) { - cfg := config.Configuration{} - ctx, cancel := context.WithCancel(context.Background()) defer cancel() - entity := NewMetrics(ctx, cfg) + entity := NewMetrics(ctx, "", time.Hour) delay := 1e8 delta := 1e8 diff --git a/services/vault-rest/daemon/system_control.go b/services/vault-rest/systemd/system.go similarity index 93% rename from services/vault-rest/daemon/system_control.go rename to services/vault-rest/systemd/system.go index 2d80dbf1..9fc7b440 100644 --- a/services/vault-rest/daemon/system_control.go +++ b/services/vault-rest/systemd/system.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package daemon +package systemd import ( "context" @@ -21,7 +21,7 @@ import ( "strings" "time" - "github.com/jancajthaml-openbank/vault-rest/config" + "github.com/jancajthaml-openbank/vault-rest/utils" "github.com/coreos/go-systemd/dbus" @@ -30,20 +30,20 @@ import ( // SystemControl represents systemctl subroutine type SystemControl struct { - Support + utils.DaemonSupport underlying *dbus.Conn } // NewSystemControl returns new systemctl fascade -func NewSystemControl(ctx context.Context, cfg config.Configuration) SystemControl { +func NewSystemControl(ctx context.Context) SystemControl { conn, err := dbus.New() if err != nil { panic(fmt.Sprintf("Unable to obtain dbus connection because %+v", err)) } return SystemControl{ - Support: NewDaemonSupport(ctx), - underlying: conn, + DaemonSupport: utils.NewDaemonSupport(ctx), + underlying: conn, } } @@ -168,7 +168,7 @@ func (sys SystemControl) Start() { sys.MarkReady() select { - case <-sys.canStart: + case <-sys.CanStart: break case <-sys.Done(): return @@ -176,12 +176,12 @@ func (sys SystemControl) Start() { log.Info("Start system-control daemon") - <-sys.exitSignal + <-sys.ExitSignal } // Stop shutdowns systemctl fascade func (sys *SystemControl) Stop() { log.Info("Stopping system-control daemon") - sys.cancel() + sys.Cancel() return } diff --git a/services/vault-unit/daemon/daemon.go b/services/vault-rest/utils/daemon.go similarity index 63% rename from services/vault-unit/daemon/daemon.go rename to services/vault-rest/utils/daemon.go index 56fd7636..9218c3c4 100644 --- a/services/vault-unit/daemon/daemon.go +++ b/services/vault-rest/utils/daemon.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package daemon +package utils import ( "context" @@ -24,55 +24,55 @@ type Daemon interface { WaitReady(deadline time.Duration) error } +// Support provides support for graceful shutdown +type DaemonSupport struct { + ctx context.Context + Cancel context.CancelFunc + ExitSignal chan struct{} + IsReady chan interface{} + CanStart chan interface{} +} + // NewDaemonSupport constructor -func NewDaemonSupport(parentCtx context.Context) Support { +func NewDaemonSupport(parentCtx context.Context) DaemonSupport { ctx, cancel := context.WithCancel(parentCtx) - return Support{ + return DaemonSupport{ ctx: ctx, - cancel: cancel, - exitSignal: make(chan struct{}), + Cancel: cancel, + ExitSignal: make(chan struct{}), IsReady: make(chan interface{}), - canStart: make(chan interface{}), + CanStart: make(chan interface{}), } } -// Support provides support for graceful shutdown -type Support struct { - ctx context.Context - cancel context.CancelFunc - exitSignal chan struct{} - IsReady chan interface{} - canStart chan interface{} -} - // GreenLight signals daemon to start work -func (s Support) GreenLight() { - s.canStart <- nil +func (daemon DaemonSupport) GreenLight() { + daemon.CanStart <- nil } // MarkDone signals daemon is finished -func (s Support) MarkDone() { - close(s.exitSignal) +func (daemon DaemonSupport) MarkDone() { + close(daemon.ExitSignal) } // MarkReady signals daemon is ready -func (s Support) MarkReady() { - s.IsReady <- nil +func (daemon DaemonSupport) MarkReady() { + daemon.IsReady <- nil } // Done cancel channel -func (s Support) Done() <-chan struct{} { - return s.ctx.Done() +func (daemon DaemonSupport) Done() <-chan struct{} { + return daemon.ctx.Done() } // Stop daemon and wait for graceful shutdown -func (s Support) Stop() { - s.cancel() - <-s.exitSignal +func (daemon DaemonSupport) Stop() { + daemon.Cancel() + <-daemon.ExitSignal } // Start daemon and wait for it to be ready -func (s Support) Start() { - s.MarkReady() - <-s.IsReady +func (daemon DaemonSupport) Start() { + daemon.MarkReady() + <-daemon.IsReady } diff --git a/services/vault-unit/actor/account.go b/services/vault-unit/actor/account.go index 9be72e7d..1d602087 100644 --- a/services/vault-unit/actor/account.go +++ b/services/vault-unit/actor/account.go @@ -17,7 +17,6 @@ package actor import ( "strings" - "github.com/jancajthaml-openbank/vault-unit/daemon" "github.com/jancajthaml-openbank/vault-unit/model" "github.com/jancajthaml-openbank/vault-unit/persistence" @@ -27,7 +26,7 @@ import ( ) // NilAccount represents account that is neither existing neither non existing -func NilAccount(s *daemon.ActorSystem) func(interface{}, system.Context) { +func NilAccount(s *ActorSystem) func(interface{}, system.Context) { return func(t_state interface{}, context system.Context) { state := t_state.(model.Account) @@ -46,7 +45,7 @@ func NilAccount(s *daemon.ActorSystem) func(interface{}, system.Context) { } // NonExistAccount represents account that does not exist -func NonExistAccount(s *daemon.ActorSystem) func(interface{}, system.Context) { +func NonExistAccount(s *ActorSystem) func(interface{}, system.Context) { return func(t_state interface{}, context system.Context) { state := t_state.(model.Account) @@ -90,7 +89,7 @@ func NonExistAccount(s *daemon.ActorSystem) func(interface{}, system.Context) { } // ExistAccount represents account that does exist -func ExistAccount(s *daemon.ActorSystem) func(interface{}, system.Context) { +func ExistAccount(s *ActorSystem) func(interface{}, system.Context) { return func(t_state interface{}, context system.Context) { state := t_state.(model.Account) diff --git a/services/vault-unit/actor/common.go b/services/vault-unit/actor/common.go index 85a08137..f8b82f5e 100644 --- a/services/vault-unit/actor/common.go +++ b/services/vault-unit/actor/common.go @@ -18,7 +18,6 @@ import ( "fmt" "strings" - "github.com/jancajthaml-openbank/vault-unit/daemon" "github.com/jancajthaml-openbank/vault-unit/model" system "github.com/jancajthaml-openbank/actor-system" @@ -29,7 +28,7 @@ import ( var nilCoordinates = system.Coordinates{} // ProcessLocalMessage processing of local message to this vault -func ProcessLocalMessage(s *daemon.ActorSystem) system.ProcessLocalMessage { +func ProcessLocalMessage(s *ActorSystem) system.ProcessLocalMessage { return func(message interface{}, to system.Coordinates, from system.Coordinates) { if to.Region != "" && to.Region != s.Name { log.Warnf("Invalid region received [local %s -> local %s]", from, to) @@ -49,7 +48,7 @@ func ProcessLocalMessage(s *daemon.ActorSystem) system.ProcessLocalMessage { } } -func asEnvelopes(s *daemon.ActorSystem, msg string) (system.Coordinates, system.Coordinates, []string, error) { +func asEnvelopes(s *ActorSystem, msg string) (system.Coordinates, system.Coordinates, []string, error) { parts := strings.Split(msg, " ") if len(parts) < 5 { @@ -71,7 +70,7 @@ func asEnvelopes(s *daemon.ActorSystem, msg string) (system.Coordinates, system. return from, to, parts, nil } -func spawnAccountActor(s *daemon.ActorSystem, name string) (*system.Envelope, error) { +func spawnAccountActor(s *ActorSystem, name string) (*system.Envelope, error) { envelope := system.NewEnvelope(name, model.NewAccount(name)) err := s.RegisterActor(envelope, NilAccount(s)) @@ -85,7 +84,7 @@ func spawnAccountActor(s *daemon.ActorSystem, name string) (*system.Envelope, er } // ProcessRemoteMessage processing of remote message to this vault -func ProcessRemoteMessage(s *daemon.ActorSystem) system.ProcessRemoteMessage { +func ProcessRemoteMessage(s *ActorSystem) system.ProcessRemoteMessage { return func(msg string) { from, to, parts, err := asEnvelopes(s, msg) if err != nil { diff --git a/services/vault-unit/daemon/actor_system.go b/services/vault-unit/actor/system.go similarity index 76% rename from services/vault-unit/daemon/actor_system.go rename to services/vault-unit/actor/system.go index 8f7c6973..57d53dc4 100644 --- a/services/vault-unit/daemon/actor_system.go +++ b/services/vault-unit/actor/system.go @@ -12,14 +12,14 @@ // See the License for the specific language governing permissions and // limitations under the License. -package daemon +package actor import ( "context" "fmt" "time" - "github.com/jancajthaml-openbank/vault-unit/config" + "github.com/jancajthaml-openbank/vault-unit/metrics" system "github.com/jancajthaml-openbank/actor-system" localfs "github.com/jancajthaml-openbank/local-fs" @@ -29,16 +29,21 @@ import ( type ActorSystem struct { system.Support Storage *localfs.Storage - Metrics *Metrics + Metrics *metrics.Metrics } // NewActorSystem returns actor system fascade -func NewActorSystem(ctx context.Context, cfg config.Configuration, metrics *Metrics, storage *localfs.Storage) ActorSystem { - return ActorSystem{ - Support: system.NewSupport(ctx, "VaultUnit/"+cfg.Tenant, cfg.LakeHostname), +func NewActorSystem(ctx context.Context, tenant string, lakeEndpoint string, metrics *metrics.Metrics, storage *localfs.Storage) ActorSystem { + result := ActorSystem{ + Support: system.NewSupport(ctx, "VaultUnit/"+tenant, lakeEndpoint), Storage: storage, Metrics: metrics, } + + result.Support.RegisterOnLocalMessage(ProcessLocalMessage(&result)) + result.Support.RegisterOnRemoteMessage(ProcessRemoteMessage(&result)) + + return result } // GreenLight daemon noop diff --git a/services/vault-unit/boot/init.go b/services/vault-unit/boot/init.go index 324dad61..2471675e 100644 --- a/services/vault-unit/boot/init.go +++ b/services/vault-unit/boot/init.go @@ -22,7 +22,8 @@ import ( "github.com/jancajthaml-openbank/vault-unit/actor" "github.com/jancajthaml-openbank/vault-unit/config" - "github.com/jancajthaml-openbank/vault-unit/daemon" + "github.com/jancajthaml-openbank/vault-unit/metrics" + "github.com/jancajthaml-openbank/vault-unit/persistence" "github.com/jancajthaml-openbank/vault-unit/utils" ) @@ -30,9 +31,9 @@ import ( type Application struct { cfg config.Configuration interrupt chan os.Signal - metrics daemon.Metrics - actorSystem daemon.ActorSystem - snapshotUpdater daemon.SnapshotUpdater + metrics metrics.Metrics + actorSystem actor.ActorSystem + snapshotUpdater persistence.SnapshotUpdater cancel context.CancelFunc } @@ -45,21 +46,17 @@ func Initialize() Application { utils.SetupLogger(cfg.LogLevel) storage := localfs.NewStorage(cfg.RootStorage) + metricsDaemon := metrics.NewMetrics(ctx, cfg.Tenant, cfg.MetricsOutput, cfg.MetricsRefreshRate) - metrics := daemon.NewMetrics(ctx, cfg) - - actorSystem := daemon.NewActorSystem(ctx, cfg, &metrics, &storage) - actorSystem.Support.RegisterOnLocalMessage(actor.ProcessLocalMessage(&actorSystem)) - actorSystem.Support.RegisterOnRemoteMessage(actor.ProcessRemoteMessage(&actorSystem)) - - snapshotUpdater := daemon.NewSnapshotUpdater(ctx, cfg, &metrics, &storage, actor.ProcessLocalMessage(&actorSystem)) + actorSystemDaemon := actor.NewActorSystem(ctx, cfg.Tenant, cfg.LakeHostname, &metricsDaemon, &storage) + snapshotUpdaterDaemon := persistence.NewSnapshotUpdater(ctx, cfg.JournalSaturation, cfg.SnapshotScanInterval, &metricsDaemon, &storage, actor.ProcessLocalMessage(&actorSystemDaemon)) return Application{ cfg: cfg, interrupt: make(chan os.Signal, 1), - metrics: metrics, - actorSystem: actorSystem, - snapshotUpdater: snapshotUpdater, + metrics: metricsDaemon, + actorSystem: actorSystemDaemon, + snapshotUpdater: snapshotUpdaterDaemon, cancel: cancel, } } diff --git a/services/vault-unit/boot/run.go b/services/vault-unit/boot/run.go index 0ca6913c..12436b10 100644 --- a/services/vault-unit/boot/run.go +++ b/services/vault-unit/boot/run.go @@ -21,7 +21,6 @@ import ( "syscall" "time" - "github.com/jancajthaml-openbank/vault-unit/daemon" "github.com/jancajthaml-openbank/vault-unit/utils" log "github.com/sirupsen/logrus" @@ -38,7 +37,7 @@ func (app Application) WaitReady(deadline time.Duration) error { mux := new(sync.Mutex) var wg sync.WaitGroup - waitWithDeadline := func(support daemon.Daemon) { + waitWithDeadline := func(support utils.Daemon) { go func() { err := support.WaitReady(deadline) if err != nil { diff --git a/services/vault-unit/daemon/metrics.go b/services/vault-unit/metrics/metrics.go similarity index 94% rename from services/vault-unit/daemon/metrics.go rename to services/vault-unit/metrics/metrics.go index 48e28984..6104262f 100644 --- a/services/vault-unit/daemon/metrics.go +++ b/services/vault-unit/metrics/metrics.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package daemon +package metrics import ( "context" @@ -21,7 +21,6 @@ import ( "path/filepath" "time" - "github.com/jancajthaml-openbank/vault-unit/config" "github.com/jancajthaml-openbank/vault-unit/utils" metrics "github.com/rcrowley/go-metrics" @@ -40,7 +39,7 @@ type Snapshot struct { // Metrics represents metrics subroutine type Metrics struct { - Support + utils.DaemonSupport output string tenant string refreshRate time.Duration @@ -53,12 +52,12 @@ type Metrics struct { } // NewMetrics returns metrics fascade -func NewMetrics(ctx context.Context, cfg config.Configuration) Metrics { +func NewMetrics(ctx context.Context, tenant string, output string, refreshRate time.Duration) Metrics { return Metrics{ - Support: NewDaemonSupport(ctx), - output: cfg.MetricsOutput, - tenant: cfg.Tenant, - refreshRate: cfg.MetricsRefreshRate, + DaemonSupport: utils.NewDaemonSupport(ctx), + output: output, + tenant: tenant, + refreshRate: refreshRate, promisesAccepted: metrics.NewCounter(), commitsAccepted: metrics.NewCounter(), rollbacksAccepted: metrics.NewCounter(), @@ -195,7 +194,7 @@ func (metrics Metrics) Start() { metrics.MarkReady() select { - case <-metrics.canStart: + case <-metrics.CanStart: break case <-metrics.Done(): return diff --git a/services/vault-unit/daemon/metrics_test.go b/services/vault-unit/metrics/metrics_test.go similarity index 93% rename from services/vault-unit/daemon/metrics_test.go rename to services/vault-unit/metrics/metrics_test.go index 741faacf..e0cfda4b 100644 --- a/services/vault-unit/daemon/metrics_test.go +++ b/services/vault-unit/metrics/metrics_test.go @@ -1,12 +1,10 @@ -package daemon +package metrics import ( "context" "testing" "time" - "github.com/jancajthaml-openbank/vault-unit/config" - "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -17,12 +15,10 @@ func TestGetFilename(t *testing.T) { } func TestMetricsPersist(t *testing.T) { - cfg := config.Configuration{} - ctx, cancel := context.WithCancel(context.Background()) defer cancel() - entity := NewMetrics(ctx, cfg) + entity := NewMetrics(ctx, "tenant", "output", time.Hour) delay := 1e8 delta := 1e8 diff --git a/services/vault-unit/daemon/snapshot_updater.go b/services/vault-unit/persistence/snapshot_updater.go similarity index 88% rename from services/vault-unit/daemon/snapshot_updater.go rename to services/vault-unit/persistence/snapshot_updater.go index 07caed80..78101a09 100644 --- a/services/vault-unit/daemon/snapshot_updater.go +++ b/services/vault-unit/persistence/snapshot_updater.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package daemon +package persistence import ( "context" @@ -20,7 +20,7 @@ import ( "strconv" "time" - "github.com/jancajthaml-openbank/vault-unit/config" + "github.com/jancajthaml-openbank/vault-unit/metrics" "github.com/jancajthaml-openbank/vault-unit/model" "github.com/jancajthaml-openbank/vault-unit/utils" @@ -31,23 +31,23 @@ import ( // SnapshotUpdater represents journal saturation update subroutine type SnapshotUpdater struct { - Support + utils.DaemonSupport callback func(msg interface{}, to system.Coordinates, from system.Coordinates) - metrics *Metrics + metrics *metrics.Metrics storage *localfs.Storage scanInterval time.Duration saturationThreshold int } // NewSnapshotUpdater returns snapshot updater fascade -func NewSnapshotUpdater(ctx context.Context, cfg config.Configuration, metrics *Metrics, storage *localfs.Storage, callback func(msg interface{}, to system.Coordinates, from system.Coordinates)) SnapshotUpdater { +func NewSnapshotUpdater(ctx context.Context, saturation int, scanInterval time.Duration, metrics *metrics.Metrics, storage *localfs.Storage, callback func(msg interface{}, to system.Coordinates, from system.Coordinates)) SnapshotUpdater { return SnapshotUpdater{ - Support: NewDaemonSupport(ctx), + DaemonSupport: utils.NewDaemonSupport(ctx), callback: callback, metrics: metrics, storage: storage, - scanInterval: cfg.SnapshotScanInterval, - saturationThreshold: cfg.JournalSaturation, + scanInterval: scanInterval, + saturationThreshold: saturation, } } @@ -144,7 +144,7 @@ func (updater SnapshotUpdater) Start() { updater.MarkReady() select { - case <-updater.canStart: + case <-updater.CanStart: break case <-updater.Done(): return diff --git a/services/vault-unit/daemon/snapshot_updater_test.go b/services/vault-unit/persistence/snapshot_updater_test.go similarity index 67% rename from services/vault-unit/daemon/snapshot_updater_test.go rename to services/vault-unit/persistence/snapshot_updater_test.go index bef5177d..2eeadc23 100644 --- a/services/vault-unit/daemon/snapshot_updater_test.go +++ b/services/vault-unit/persistence/snapshot_updater_test.go @@ -1,14 +1,14 @@ -package daemon +package persistence import ( "context" "io/ioutil" "os" "testing" + "time" - "github.com/jancajthaml-openbank/vault-unit/config" + "github.com/jancajthaml-openbank/vault-unit/metrics" "github.com/jancajthaml-openbank/vault-unit/model" - "github.com/jancajthaml-openbank/vault-unit/persistence" system "github.com/jancajthaml-openbank/actor-system" localfs "github.com/jancajthaml-openbank/local-fs" @@ -28,13 +28,7 @@ func TestSnapshotUpdater(t *testing.T) { require.Nil(t, err) defer os.RemoveAll(tmpdir) - cfg := config.Configuration{ - Tenant: "tenant", - RootStorage: tmpdir, - JournalSaturation: 1, - } - - storage := localfs.NewStorage(cfg.RootStorage) + storage := localfs.NewStorage(tmpdir) ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -50,18 +44,18 @@ func TestSnapshotUpdater(t *testing.T) { callbackCalled++ } - metrics := NewMetrics(ctx, cfg) - su := NewSnapshotUpdater(ctx, cfg, &metrics, &storage, callback) + metrics := metrics.NewMetrics(ctx, "tenant", "", time.Hour) + su := NewSnapshotUpdater(ctx, 1, time.Hour, &metrics, &storage, callback) - s := persistence.CreateAccount(&storage, "account_1", "EUR", true) + s := CreateAccount(&storage, "account_1", "EUR", true) require.NotNil(t, s) - require.True(t, persistence.PersistPromise(&storage, "account_1", 0, new(money.Dec), "transaction_1")) - s = persistence.UpdateAccount(&storage, "account_1", s) - require.True(t, persistence.PersistPromise(&storage, "account_1", 1, new(money.Dec), "transaction_2")) - require.True(t, persistence.PersistCommit(&storage, "account_1", 1, new(money.Dec), "transaction_2")) + require.True(t, PersistPromise(&storage, "account_1", 0, new(money.Dec), "transaction_1")) + s = UpdateAccount(&storage, "account_1", s) + require.True(t, PersistPromise(&storage, "account_1", 1, new(money.Dec), "transaction_2")) + require.True(t, PersistCommit(&storage, "account_1", 1, new(money.Dec), "transaction_2")) require.NotNil(t, s) - require.NotNil(t, persistence.CreateAccount(&storage, "account_2", "EUR", true)) + require.NotNil(t, CreateAccount(&storage, "account_2", "EUR", true)) t.Log("return valid accounts") { diff --git a/services/vault-rest/daemon/daemon.go b/services/vault-unit/utils/daemon.go similarity index 63% rename from services/vault-rest/daemon/daemon.go rename to services/vault-unit/utils/daemon.go index 56fd7636..9218c3c4 100644 --- a/services/vault-rest/daemon/daemon.go +++ b/services/vault-unit/utils/daemon.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package daemon +package utils import ( "context" @@ -24,55 +24,55 @@ type Daemon interface { WaitReady(deadline time.Duration) error } +// Support provides support for graceful shutdown +type DaemonSupport struct { + ctx context.Context + Cancel context.CancelFunc + ExitSignal chan struct{} + IsReady chan interface{} + CanStart chan interface{} +} + // NewDaemonSupport constructor -func NewDaemonSupport(parentCtx context.Context) Support { +func NewDaemonSupport(parentCtx context.Context) DaemonSupport { ctx, cancel := context.WithCancel(parentCtx) - return Support{ + return DaemonSupport{ ctx: ctx, - cancel: cancel, - exitSignal: make(chan struct{}), + Cancel: cancel, + ExitSignal: make(chan struct{}), IsReady: make(chan interface{}), - canStart: make(chan interface{}), + CanStart: make(chan interface{}), } } -// Support provides support for graceful shutdown -type Support struct { - ctx context.Context - cancel context.CancelFunc - exitSignal chan struct{} - IsReady chan interface{} - canStart chan interface{} -} - // GreenLight signals daemon to start work -func (s Support) GreenLight() { - s.canStart <- nil +func (daemon DaemonSupport) GreenLight() { + daemon.CanStart <- nil } // MarkDone signals daemon is finished -func (s Support) MarkDone() { - close(s.exitSignal) +func (daemon DaemonSupport) MarkDone() { + close(daemon.ExitSignal) } // MarkReady signals daemon is ready -func (s Support) MarkReady() { - s.IsReady <- nil +func (daemon DaemonSupport) MarkReady() { + daemon.IsReady <- nil } // Done cancel channel -func (s Support) Done() <-chan struct{} { - return s.ctx.Done() +func (daemon DaemonSupport) Done() <-chan struct{} { + return daemon.ctx.Done() } // Stop daemon and wait for graceful shutdown -func (s Support) Stop() { - s.cancel() - <-s.exitSignal +func (daemon DaemonSupport) Stop() { + daemon.Cancel() + <-daemon.ExitSignal } // Start daemon and wait for it to be ready -func (s Support) Start() { - s.MarkReady() - <-s.IsReady +func (daemon DaemonSupport) Start() { + daemon.MarkReady() + <-daemon.IsReady }