From 3979f23bc0c2ece65db0f757d079aad3452cdbf6 Mon Sep 17 00:00:00 2001 From: Tim Holm Date: Mon, 21 Feb 2022 14:47:07 +1100 Subject: [PATCH 1/8] fix: Query worker pool for config as code apis. --- go.mod | 2 +- go.sum | 4 ++-- pkg/cmd/run/root.go | 24 +++++++++++++++++++----- pkg/run/run.go | 12 ++++++------ pkg/stack/membraneversion.txt | 2 +- 5 files changed, 29 insertions(+), 15 deletions(-) diff --git a/go.mod b/go.mod index df13b244d..c339b9e88 100644 --- a/go.mod +++ b/go.mod @@ -25,7 +25,7 @@ require ( github.com/moby/buildkit v0.9.3 // indirect github.com/moby/moby v20.10.12+incompatible github.com/nitrictech/boxygen v0.0.1-rc.7.0.20211212231606-62c668408f91 - github.com/nitrictech/nitric v0.13.0-rc.11 + github.com/nitrictech/nitric v0.13.0-rc.14 github.com/pkg/errors v0.9.1 github.com/pterm/pterm v0.12.34 github.com/pulumi/pulumi-aws/sdk/v4 v4.33.0 diff --git a/go.sum b/go.sum index b8af6de67..ef5ef1dbe 100644 --- a/go.sum +++ b/go.sum @@ -1287,8 +1287,8 @@ github.com/nishanths/predeclared v0.2.1 h1:1TXtjmy4f3YCFjTxRd8zcFHOmoUir+gp0ESzj github.com/nishanths/predeclared v0.2.1/go.mod h1:HvkGJcA3naj4lOwnFXFDkFxVtSqQMB9sbB1usJ+xjQE= github.com/nitrictech/boxygen v0.0.1-rc.7.0.20211212231606-62c668408f91 h1:gtZZJc7l5pML1eRsqyXe0U7NdQxSa7u/cbyEvnGLBpc= github.com/nitrictech/boxygen v0.0.1-rc.7.0.20211212231606-62c668408f91/go.mod h1:2XXi1xEwqitH4/gus1bHyG/IQe8WOniK+pybGTz2y/Y= -github.com/nitrictech/nitric v0.13.0-rc.11 h1:nn4j71Wk32KV0j3zN5y/S8eCpVYjeR/E3PYOkFgt1lY= -github.com/nitrictech/nitric v0.13.0-rc.11/go.mod h1:XC6DG1/NrMc59Jzq/1h6SLn6L4foSS67pCqyTpauT3o= +github.com/nitrictech/nitric v0.13.0-rc.14 h1:jxQaKQRV4cejMVKZ2OiqS5xRwRoN4z+96bETqRmvTmc= +github.com/nitrictech/nitric v0.13.0-rc.14/go.mod h1:XC6DG1/NrMc59Jzq/1h6SLn6L4foSS67pCqyTpauT3o= github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A= github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE= github.com/nxadm/tail v1.4.8/go.mod h1:+ncqLTQzXmGhMZNUePPaPqPvBxHAIsmXswZKocGu+AU= diff --git a/pkg/cmd/run/root.go b/pkg/cmd/run/root.go index 54a9b1cb1..311264d89 100644 --- a/pkg/cmd/run/root.go +++ b/pkg/cmd/run/root.go @@ -33,6 +33,7 @@ import ( "github.com/nitrictech/cli/pkg/run" "github.com/nitrictech/cli/pkg/stack" "github.com/nitrictech/cli/pkg/tasklet" + "github.com/nitrictech/nitric/pkg/worker" ) var runCmd = &cobra.Command{ @@ -85,11 +86,16 @@ nitric run -s ../projectX/ "functions/*.ts"`, ls := run.NewLocalServices(s.Name, s.Dir) memerr := make(chan error) + pool := worker.NewProcessPool(&worker.ProcessPoolOptions{ + MinWorkers: 0, + MaxWorkers: 100, + }) + startLocalServices := tasklet.Runner{ StartMsg: "Starting Local Services", Runner: func(progress output.Progress) error { go func(errch chan error) { - errch <- ls.Start() + errch <- ls.Start(pool) }(memerr) for { @@ -143,16 +149,24 @@ nitric run -s ../projectX/ "functions/*.ts"`, Api string `yaml:"api"` Endpoint string `yaml:"endpoint"` } - apis := []apiendpoint{} + apis := make(map[string]string) + + <-time.NewTimer(time.Second * 2).C - for a := range s.ApiDocs { - apis = append(apis, apiendpoint{Api: a, Endpoint: fmt.Sprintf("http://127.0.0.1:9001/apis/%s", a)}) + workers := pool.GetWorkers(&worker.GetWorkerOptions{}) + + for _, w := range workers { + if apiW, ok := w.(*worker.RouteWorker); ok { + if _, ok := apis[apiW.Api()]; !ok { + apis[apiW.Api()] = fmt.Sprintf("http://127.0.0.1:9001/apis/%s", apiW.Api()) + } + } } if len(apis) == 0 { // if we have a nitric.yaml then ApiDocs will be empty for a := range s.Apis { - apis = append(apis, apiendpoint{Api: a, Endpoint: fmt.Sprintf("http://127.0.0.1:9001/apis/%s", a)}) + apis[a] = fmt.Sprintf("http://127.0.0.1:9001/apis/%s", a) } } output.Print(apis) diff --git a/pkg/run/run.go b/pkg/run/run.go index ec1fb7d64..b72077b88 100644 --- a/pkg/run/run.go +++ b/pkg/run/run.go @@ -34,7 +34,7 @@ import ( ) type LocalServices interface { - Start() error + Start(pool worker.WorkerPool) error Stop() error Running() bool Status() *LocalServicesStatus @@ -85,7 +85,7 @@ func (l *localServices) Status() *LocalServicesStatus { return l.status } -func (l *localServices) Start() error { +func (l *localServices) Start(pool worker.WorkerPool) error { var err error l.mio, err = NewMinio(l.status.RunDir, "minio") if err != nil { @@ -132,10 +132,10 @@ func (l *localServices) Start() error { // Create a new Worker Pool // TODO: We may want to override GetWorker on the default ProcessPool // For now we'll use the default and expand from there - pool := worker.NewProcessPool(&worker.ProcessPoolOptions{ - MinWorkers: 0, - MaxWorkers: 100, - }) + // pool := worker.NewProcessPool(&worker.ProcessPoolOptions{ + // MinWorkers: 0, + // MaxWorkers: 100, + // }) ev, err := NewEvents(pool) if err != nil { diff --git a/pkg/stack/membraneversion.txt b/pkg/stack/membraneversion.txt index 63d0d9f1e..5a339538a 100644 --- a/pkg/stack/membraneversion.txt +++ b/pkg/stack/membraneversion.txt @@ -1 +1 @@ -v0.13.0-rc.11 \ No newline at end of file +v0.13.0-rc.14 \ No newline at end of file From a51665d4515290e7cea227388da6f2246ec079ca Mon Sep 17 00:00:00 2001 From: Tim Holm Date: Mon, 21 Feb 2022 15:09:18 +1100 Subject: [PATCH 2/8] feat: Add reactive worker pool. --- pkg/run/pool.go | 66 +++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 66 insertions(+) create mode 100644 pkg/run/pool.go diff --git a/pkg/run/pool.go b/pkg/run/pool.go new file mode 100644 index 000000000..547760b80 --- /dev/null +++ b/pkg/run/pool.go @@ -0,0 +1,66 @@ +package run + +import "github.com/nitrictech/nitric/pkg/worker" + +type WorkerEventType string + +const ( + WorkerEventType_Add WorkerEventType = "add" + WorkerEventType_Remove WorkerEventType = "remove" +) + +type WorkerEvent struct { + Type WorkerEventType + Worker worker.Worker +} + +type WorkerListener = func(WorkerEvent) + +type RunProcessPool struct { + worker.WorkerPool + listeners []WorkerListener +} + +func (r *RunProcessPool) notifyListeners(evt WorkerEvent) { + for _, l := range r.listeners { + l(evt) + } +} + +func (r *RunProcessPool) AddWorker(w worker.Worker) error { + if err := r.WorkerPool.AddWorker(w); err != nil { + return err + } + // notify listener of successfully added worker + r.notifyListeners(WorkerEvent{ + Type: WorkerEventType_Add, + Worker: w, + }) + return nil +} + +func (r *RunProcessPool) RemoveWorker(w worker.Worker) error { + if err := r.WorkerPool.RemoveWorker(w); err != nil { + return err + } + // notify listener of successfully removed worker + r.notifyListeners(WorkerEvent{ + Type: WorkerEventType_Remove, + Worker: w, + }) + return nil +} + +func (r *RunProcessPool) Listen(l WorkerListener) { + r.listeners = append(r.listeners, l) +} + +func NewRunProcessPool() *RunProcessPool { + return &RunProcessPool{ + listeners: make([]WorkerListener, 0), + WorkerPool: worker.NewProcessPool(&worker.ProcessPoolOptions{ + MinWorkers: 0, + MaxWorkers: 100, + }), + } +} From d454fd3026854bb4e7d53da2da7d4503dc6b7a5a Mon Sep 17 00:00:00 2001 From: Tim Holm Date: Mon, 21 Feb 2022 16:44:37 +1100 Subject: [PATCH 3/8] feat: Add reactive api table updates. --- go.mod | 2 +- go.sum | 4 +-- pkg/cmd/run/root.go | 48 ++++++++++++---------------- pkg/run/stack.go | 59 +++++++++++++++++++++++++++++++++++ pkg/stack/membraneversion.txt | 2 +- 5 files changed, 83 insertions(+), 32 deletions(-) create mode 100644 pkg/run/stack.go diff --git a/go.mod b/go.mod index c339b9e88..59b4bc527 100644 --- a/go.mod +++ b/go.mod @@ -25,7 +25,7 @@ require ( github.com/moby/buildkit v0.9.3 // indirect github.com/moby/moby v20.10.12+incompatible github.com/nitrictech/boxygen v0.0.1-rc.7.0.20211212231606-62c668408f91 - github.com/nitrictech/nitric v0.13.0-rc.14 + github.com/nitrictech/nitric v0.13.0-rc.16 github.com/pkg/errors v0.9.1 github.com/pterm/pterm v0.12.34 github.com/pulumi/pulumi-aws/sdk/v4 v4.33.0 diff --git a/go.sum b/go.sum index ef5ef1dbe..b5e6b6c88 100644 --- a/go.sum +++ b/go.sum @@ -1287,8 +1287,8 @@ github.com/nishanths/predeclared v0.2.1 h1:1TXtjmy4f3YCFjTxRd8zcFHOmoUir+gp0ESzj github.com/nishanths/predeclared v0.2.1/go.mod h1:HvkGJcA3naj4lOwnFXFDkFxVtSqQMB9sbB1usJ+xjQE= github.com/nitrictech/boxygen v0.0.1-rc.7.0.20211212231606-62c668408f91 h1:gtZZJc7l5pML1eRsqyXe0U7NdQxSa7u/cbyEvnGLBpc= github.com/nitrictech/boxygen v0.0.1-rc.7.0.20211212231606-62c668408f91/go.mod h1:2XXi1xEwqitH4/gus1bHyG/IQe8WOniK+pybGTz2y/Y= -github.com/nitrictech/nitric v0.13.0-rc.14 h1:jxQaKQRV4cejMVKZ2OiqS5xRwRoN4z+96bETqRmvTmc= -github.com/nitrictech/nitric v0.13.0-rc.14/go.mod h1:XC6DG1/NrMc59Jzq/1h6SLn6L4foSS67pCqyTpauT3o= +github.com/nitrictech/nitric v0.13.0-rc.16 h1:YZ/DZThhRohjcmuHh+R7YFFd5DvfN9ufbs60kuNfTyE= +github.com/nitrictech/nitric v0.13.0-rc.16/go.mod h1:XC6DG1/NrMc59Jzq/1h6SLn6L4foSS67pCqyTpauT3o= github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A= github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE= github.com/nxadm/tail v1.4.8/go.mod h1:+ncqLTQzXmGhMZNUePPaPqPvBxHAIsmXswZKocGu+AU= diff --git a/pkg/cmd/run/root.go b/pkg/cmd/run/root.go index 311264d89..382ddbda3 100644 --- a/pkg/cmd/run/root.go +++ b/pkg/cmd/run/root.go @@ -24,6 +24,7 @@ import ( "time" "github.com/pkg/errors" + "github.com/pterm/pterm" "github.com/spf13/cobra" "github.com/nitrictech/cli/pkg/build" @@ -33,7 +34,6 @@ import ( "github.com/nitrictech/cli/pkg/run" "github.com/nitrictech/cli/pkg/stack" "github.com/nitrictech/cli/pkg/tasklet" - "github.com/nitrictech/nitric/pkg/worker" ) var runCmd = &cobra.Command{ @@ -86,10 +86,7 @@ nitric run -s ../projectX/ "functions/*.ts"`, ls := run.NewLocalServices(s.Name, s.Dir) memerr := make(chan error) - pool := worker.NewProcessPool(&worker.ProcessPoolOptions{ - MinWorkers: 0, - MaxWorkers: 100, - }) + pool := run.NewRunProcessPool() startLocalServices := tasklet.Runner{ StartMsg: "Starting Local Services", @@ -143,33 +140,27 @@ nitric run -s ../projectX/ "functions/*.ts"`, } tasklet.MustRun(startFunctions, tasklet.Opts{Signal: term}) - fmt.Println("Local running, use ctrl-C to stop") - - type apiendpoint struct { - Api string `yaml:"api"` - Endpoint string `yaml:"endpoint"` - } - apis := make(map[string]string) + pterm.DefaultBasicText.Println("Local running, use ctrl-C to stop") - <-time.NewTimer(time.Second * 2).C + stackState := run.NewStackState() - workers := pool.GetWorkers(&worker.GetWorkerOptions{}) + area, _ := pterm.DefaultArea.Start() + // React to worker pool state and update services table + pool.Listen(func(we run.WorkerEvent) { + stackState.UpdateFromWorkerEvent(we) + apiTable := stackState.ApiTable(9001) + area.Update(apiTable) - for _, w := range workers { - if apiW, ok := w.(*worker.RouteWorker); ok { - if _, ok := apis[apiW.Api()]; !ok { - apis[apiW.Api()] = fmt.Sprintf("http://127.0.0.1:9001/apis/%s", apiW.Api()) - } - } - } + }) - if len(apis) == 0 { - // if we have a nitric.yaml then ApiDocs will be empty - for a := range s.Apis { - apis[a] = fmt.Sprintf("http://127.0.0.1:9001/apis/%s", a) - } - } - output.Print(apis) + // TODO: revisit nitric.yaml support for this output + // if len(apis) == 0 { + // // if we have a nitric.yaml then ApiDocs will be empty + // for a := range s.Apis { + // apis[a] = fmt.Sprintf("http://127.0.0.1:9001/apis/%s", a) + // } + // } + // output.Print(apis) select { case membraneError := <-memerr: @@ -184,6 +175,7 @@ nitric run -s ../projectX/ "functions/*.ts"`, } } + area.Stop() _ = logger.Stop() // Stop the membrane cobra.CheckErr(ls.Stop()) diff --git a/pkg/run/stack.go b/pkg/run/stack.go new file mode 100644 index 000000000..54de56d07 --- /dev/null +++ b/pkg/run/stack.go @@ -0,0 +1,59 @@ +package run + +import ( + "fmt" + + "github.com/nitrictech/nitric/pkg/worker" + "github.com/pterm/pterm" +) + +type RunStackState struct { + apis map[string]int +} + +func (r *RunStackState) UpdateFromWorkerEvent(evt WorkerEvent) { + if evt.Type == WorkerEventType_Add { + switch evt.Worker.(type) { + case *worker.RouteWorker: + w := evt.Worker.(*worker.RouteWorker) + + if _, ok := r.apis[w.Api()]; !ok { + r.apis[w.Api()] = 1 + } else { + r.apis[w.Api()] = r.apis[w.Api()] + 1 + } + } + } else if evt.Type == WorkerEventType_Remove { + switch evt.Worker.(type) { + case *worker.RouteWorker: + w := evt.Worker.(*worker.RouteWorker) + + r.apis[w.Api()] = r.apis[w.Api()] - 1 + + if r.apis[w.Api()] <= 0 { + // Remove the key if the reference count is 0 or less + delete(r.apis, w.Api()) + } + } + } +} + +func (r *RunStackState) ApiTable(port int) string { + tableData := pterm.TableData{{"Api", "Endpoint"}} + + for k := range r.apis { + tableData = append(tableData, []string{ + k, fmt.Sprintf("http://localhost:%d/apis/%s", port, k), + }) + } + + str, _ := pterm.DefaultTable.WithHasHeader().WithData(tableData).Srender() + + return str +} + +func NewStackState() *RunStackState { + return &RunStackState{ + apis: map[string]int{}, + } +} diff --git a/pkg/stack/membraneversion.txt b/pkg/stack/membraneversion.txt index 5a339538a..ef30c8083 100644 --- a/pkg/stack/membraneversion.txt +++ b/pkg/stack/membraneversion.txt @@ -1 +1 @@ -v0.13.0-rc.14 \ No newline at end of file +v0.13.0-rc.16 \ No newline at end of file From 9ddeed67aa7b4b11635793a21a91134890d0e978 Mon Sep 17 00:00:00 2001 From: Tim Holm Date: Tue, 22 Feb 2022 09:21:13 +1100 Subject: [PATCH 4/8] chore: Remove obsolete commented out code. --- pkg/run/run.go | 8 -------- 1 file changed, 8 deletions(-) diff --git a/pkg/run/run.go b/pkg/run/run.go index b72077b88..48c07ef53 100644 --- a/pkg/run/run.go +++ b/pkg/run/run.go @@ -129,14 +129,6 @@ func (l *localServices) Start(pool worker.WorkerPool) error { return err } - // Create a new Worker Pool - // TODO: We may want to override GetWorker on the default ProcessPool - // For now we'll use the default and expand from there - // pool := worker.NewProcessPool(&worker.ProcessPoolOptions{ - // MinWorkers: 0, - // MaxWorkers: 100, - // }) - ev, err := NewEvents(pool) if err != nil { return err From 10f83c480d9c346cdb0c9d74bd53167064842b44 Mon Sep 17 00:00:00 2001 From: Tim Holm Date: Tue, 22 Feb 2022 15:26:27 +1100 Subject: [PATCH 5/8] chore: Update function_helpers tests. --- pkg/stack/function_helpers_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/stack/function_helpers_test.go b/pkg/stack/function_helpers_test.go index 37cf4c907..8d4d042ea 100644 --- a/pkg/stack/function_helpers_test.go +++ b/pkg/stack/function_helpers_test.go @@ -29,7 +29,7 @@ func TestFunctionVersionString(t *testing.T) { }{ { name: "from embed", - want: "v0.13.0-rc.11", + want: "v0.13.0-rc.17", }, { name: "from function", From d2a375bc0ee6518a5f2e16becec85e8373c8cd98 Mon Sep 17 00:00:00 2001 From: Tim Holm Date: Tue, 22 Feb 2022 12:11:10 +1100 Subject: [PATCH 6/8] chore: address lint errors --- pkg/cmd/run/root.go | 3 +-- pkg/run/pool.go | 16 ++++++++++++++++ pkg/run/stack.go | 19 ++++++++++++++++++- 3 files changed, 35 insertions(+), 3 deletions(-) diff --git a/pkg/cmd/run/root.go b/pkg/cmd/run/root.go index 382ddbda3..f216a4ecf 100644 --- a/pkg/cmd/run/root.go +++ b/pkg/cmd/run/root.go @@ -150,7 +150,6 @@ nitric run -s ../projectX/ "functions/*.ts"`, stackState.UpdateFromWorkerEvent(we) apiTable := stackState.ApiTable(9001) area.Update(apiTable) - }) // TODO: revisit nitric.yaml support for this output @@ -175,7 +174,7 @@ nitric run -s ../projectX/ "functions/*.ts"`, } } - area.Stop() + _ = area.Stop() _ = logger.Stop() // Stop the membrane cobra.CheckErr(ls.Stop()) diff --git a/pkg/run/pool.go b/pkg/run/pool.go index 547760b80..4153ad565 100644 --- a/pkg/run/pool.go +++ b/pkg/run/pool.go @@ -1,3 +1,19 @@ +// Copyright Nitric Pty Ltd. +// +// SPDX-License-Identifier: Apache-2.0 +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + package run import "github.com/nitrictech/nitric/pkg/worker" diff --git a/pkg/run/stack.go b/pkg/run/stack.go index 54de56d07..d67b0a0b5 100644 --- a/pkg/run/stack.go +++ b/pkg/run/stack.go @@ -1,10 +1,27 @@ +// Copyright Nitric Pty Ltd. +// +// SPDX-License-Identifier: Apache-2.0 +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + package run import ( "fmt" - "github.com/nitrictech/nitric/pkg/worker" "github.com/pterm/pterm" + + "github.com/nitrictech/nitric/pkg/worker" ) type RunStackState struct { From 127c686ba4b31049347b6df6d307efac7e777752 Mon Sep 17 00:00:00 2001 From: Tim Holm Date: Tue, 22 Feb 2022 13:19:56 +1100 Subject: [PATCH 7/8] feat: Add logging for subscribers and schedules. --- go.mod | 2 +- go.sum | 4 +- pkg/cmd/run/root.go | 8 +++- pkg/run/stack.go | 72 ++++++++++++++++++++++++++++++++++- pkg/stack/membraneversion.txt | 2 +- 5 files changed, 81 insertions(+), 7 deletions(-) diff --git a/go.mod b/go.mod index 59b4bc527..6630279fe 100644 --- a/go.mod +++ b/go.mod @@ -25,7 +25,7 @@ require ( github.com/moby/buildkit v0.9.3 // indirect github.com/moby/moby v20.10.12+incompatible github.com/nitrictech/boxygen v0.0.1-rc.7.0.20211212231606-62c668408f91 - github.com/nitrictech/nitric v0.13.0-rc.16 + github.com/nitrictech/nitric v0.13.0-rc.17 github.com/pkg/errors v0.9.1 github.com/pterm/pterm v0.12.34 github.com/pulumi/pulumi-aws/sdk/v4 v4.33.0 diff --git a/go.sum b/go.sum index b5e6b6c88..ad08ebefd 100644 --- a/go.sum +++ b/go.sum @@ -1287,8 +1287,8 @@ github.com/nishanths/predeclared v0.2.1 h1:1TXtjmy4f3YCFjTxRd8zcFHOmoUir+gp0ESzj github.com/nishanths/predeclared v0.2.1/go.mod h1:HvkGJcA3naj4lOwnFXFDkFxVtSqQMB9sbB1usJ+xjQE= github.com/nitrictech/boxygen v0.0.1-rc.7.0.20211212231606-62c668408f91 h1:gtZZJc7l5pML1eRsqyXe0U7NdQxSa7u/cbyEvnGLBpc= github.com/nitrictech/boxygen v0.0.1-rc.7.0.20211212231606-62c668408f91/go.mod h1:2XXi1xEwqitH4/gus1bHyG/IQe8WOniK+pybGTz2y/Y= -github.com/nitrictech/nitric v0.13.0-rc.16 h1:YZ/DZThhRohjcmuHh+R7YFFd5DvfN9ufbs60kuNfTyE= -github.com/nitrictech/nitric v0.13.0-rc.16/go.mod h1:XC6DG1/NrMc59Jzq/1h6SLn6L4foSS67pCqyTpauT3o= +github.com/nitrictech/nitric v0.13.0-rc.17 h1:Pv6aGNP/+kHNVt87QfT06bqpbKIy2s3SYcqYXDIRKE4= +github.com/nitrictech/nitric v0.13.0-rc.17/go.mod h1:XC6DG1/NrMc59Jzq/1h6SLn6L4foSS67pCqyTpauT3o= github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A= github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE= github.com/nxadm/tail v1.4.8/go.mod h1:+ncqLTQzXmGhMZNUePPaPqPvBxHAIsmXswZKocGu+AU= diff --git a/pkg/cmd/run/root.go b/pkg/cmd/run/root.go index f216a4ecf..8389cbf73 100644 --- a/pkg/cmd/run/root.go +++ b/pkg/cmd/run/root.go @@ -20,6 +20,7 @@ import ( "fmt" "os" "os/signal" + "sync" "syscall" "time" @@ -145,11 +146,16 @@ nitric run -s ../projectX/ "functions/*.ts"`, stackState := run.NewStackState() area, _ := pterm.DefaultArea.Start() + lck := sync.Mutex{} // React to worker pool state and update services table pool.Listen(func(we run.WorkerEvent) { + lck.Lock() + defer lck.Unlock() stackState.UpdateFromWorkerEvent(we) apiTable := stackState.ApiTable(9001) - area.Update(apiTable) + topicsTable := stackState.TopicTable(9001) + schedTable := stackState.Schedules(9001) + area.Update(apiTable, "\n\n", topicsTable, "\n\n", schedTable) }) // TODO: revisit nitric.yaml support for this output diff --git a/pkg/run/stack.go b/pkg/run/stack.go index d67b0a0b5..51554c508 100644 --- a/pkg/run/stack.go +++ b/pkg/run/stack.go @@ -18,6 +18,7 @@ package run import ( "fmt" + "strings" "github.com/pterm/pterm" @@ -25,7 +26,9 @@ import ( ) type RunStackState struct { - apis map[string]int + apis map[string]int + subs map[string]int + schedules map[string]int } func (r *RunStackState) UpdateFromWorkerEvent(evt WorkerEvent) { @@ -39,6 +42,22 @@ func (r *RunStackState) UpdateFromWorkerEvent(evt WorkerEvent) { } else { r.apis[w.Api()] = r.apis[w.Api()] + 1 } + case *worker.SubscriptionWorker: + w := evt.Worker.(*worker.SubscriptionWorker) + + if _, ok := r.subs[w.Topic()]; !ok { + r.subs[w.Topic()] = 1 + } else { + r.subs[w.Topic()] = r.subs[w.Topic()] + 1 + } + case *worker.ScheduleWorker: + w := evt.Worker.(*worker.ScheduleWorker) + + if _, ok := r.schedules[w.Key()]; !ok { + r.schedules[w.Key()] = 1 + } else { + r.schedules[w.Key()] = r.schedules[w.Key()] + 1 + } } } else if evt.Type == WorkerEventType_Remove { switch evt.Worker.(type) { @@ -51,6 +70,24 @@ func (r *RunStackState) UpdateFromWorkerEvent(evt WorkerEvent) { // Remove the key if the reference count is 0 or less delete(r.apis, w.Api()) } + case *worker.SubscriptionWorker: + w := evt.Worker.(*worker.SubscriptionWorker) + + r.subs[w.Topic()] = r.subs[w.Topic()] - 1 + + if r.subs[w.Topic()] <= 0 { + // Remove the key if the reference count is 0 or less + delete(r.subs, w.Topic()) + } + case *worker.ScheduleWorker: + w := evt.Worker.(*worker.ScheduleWorker) + + r.schedules[w.Key()] = r.schedules[w.Key()] - 1 + + if r.schedules[w.Key()] <= 0 { + // Remove the key if the reference count is 0 or less + delete(r.schedules, w.Key()) + } } } } @@ -69,8 +106,39 @@ func (r *RunStackState) ApiTable(port int) string { return str } +func (r *RunStackState) TopicTable(port int) string { + tableData := pterm.TableData{{"Topic", "Endpoint"}} + + for k := range r.subs { + tableData = append(tableData, []string{ + k, fmt.Sprintf("http://localhost:%d/topics/%s", port, k), + }) + } + + str, _ := pterm.DefaultTable.WithHasHeader().WithData(tableData).Srender() + + return str +} + +func (r *RunStackState) Schedules(port int) string { + tableData := pterm.TableData{{"Schedule", "Endpoint"}} + + for k := range r.schedules { + nKey := strings.ToLower(strings.ReplaceAll(k, " ", "-")) + tableData = append(tableData, []string{ + k, fmt.Sprintf("http://localhost:%d/topics/%s", port, nKey), + }) + } + + str, _ := pterm.DefaultTable.WithHasHeader().WithData(tableData).Srender() + + return str +} + func NewStackState() *RunStackState { return &RunStackState{ - apis: map[string]int{}, + apis: map[string]int{}, + subs: map[string]int{}, + schedules: map[string]int{}, } } diff --git a/pkg/stack/membraneversion.txt b/pkg/stack/membraneversion.txt index ef30c8083..3f46135c5 100644 --- a/pkg/stack/membraneversion.txt +++ b/pkg/stack/membraneversion.txt @@ -1 +1 @@ -v0.13.0-rc.16 \ No newline at end of file +v0.13.0-rc.17 \ No newline at end of file From d53d566efbe45a4447d2ab8f258e4c69242fd326 Mon Sep 17 00:00:00 2001 From: Tim Holm Date: Tue, 22 Feb 2022 13:38:51 +1100 Subject: [PATCH 8/8] chore: cleanup. --- pkg/cmd/run/root.go | 11 +++++++---- pkg/run/stack.go | 2 +- 2 files changed, 8 insertions(+), 5 deletions(-) diff --git a/pkg/cmd/run/root.go b/pkg/cmd/run/root.go index 8389cbf73..2836f4a0b 100644 --- a/pkg/cmd/run/root.go +++ b/pkg/cmd/run/root.go @@ -152,10 +152,13 @@ nitric run -s ../projectX/ "functions/*.ts"`, lck.Lock() defer lck.Unlock() stackState.UpdateFromWorkerEvent(we) - apiTable := stackState.ApiTable(9001) - topicsTable := stackState.TopicTable(9001) - schedTable := stackState.Schedules(9001) - area.Update(apiTable, "\n\n", topicsTable, "\n\n", schedTable) + area.Update( + stackState.ApiTable(9001), + "\n\n", + stackState.TopicTable(9001), + "\n\n", + stackState.SchedulesTable(9001), + ) }) // TODO: revisit nitric.yaml support for this output diff --git a/pkg/run/stack.go b/pkg/run/stack.go index 51554c508..b094e4761 100644 --- a/pkg/run/stack.go +++ b/pkg/run/stack.go @@ -120,7 +120,7 @@ func (r *RunStackState) TopicTable(port int) string { return str } -func (r *RunStackState) Schedules(port int) string { +func (r *RunStackState) SchedulesTable(port int) string { tableData := pterm.TableData{{"Schedule", "Endpoint"}} for k := range r.schedules {