Skip to content

Commit

Permalink
Merge pull request #79 from nitrictech/feat/api-listing
Browse files Browse the repository at this point in the history
Add API listing for config as code
  • Loading branch information
tjholm authored Feb 22, 2022
2 parents 8b4f1b1 + d53d566 commit aacc733
Show file tree
Hide file tree
Showing 8 changed files with 266 additions and 34 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ require (
github.com/moby/buildkit v0.9.3 // indirect
github.com/moby/moby v20.10.12+incompatible
github.com/nitrictech/boxygen v0.0.1-rc.7.0.20211212231606-62c668408f91
github.com/nitrictech/nitric v0.13.0-rc.11
github.com/nitrictech/nitric v0.13.0-rc.17
github.com/pkg/errors v0.9.1
github.com/pterm/pterm v0.12.34
github.com/pulumi/pulumi-aws/sdk/v4 v4.33.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1287,8 +1287,8 @@ github.com/nishanths/predeclared v0.2.1 h1:1TXtjmy4f3YCFjTxRd8zcFHOmoUir+gp0ESzj
github.com/nishanths/predeclared v0.2.1/go.mod h1:HvkGJcA3naj4lOwnFXFDkFxVtSqQMB9sbB1usJ+xjQE=
github.com/nitrictech/boxygen v0.0.1-rc.7.0.20211212231606-62c668408f91 h1:gtZZJc7l5pML1eRsqyXe0U7NdQxSa7u/cbyEvnGLBpc=
github.com/nitrictech/boxygen v0.0.1-rc.7.0.20211212231606-62c668408f91/go.mod h1:2XXi1xEwqitH4/gus1bHyG/IQe8WOniK+pybGTz2y/Y=
github.com/nitrictech/nitric v0.13.0-rc.11 h1:nn4j71Wk32KV0j3zN5y/S8eCpVYjeR/E3PYOkFgt1lY=
github.com/nitrictech/nitric v0.13.0-rc.11/go.mod h1:XC6DG1/NrMc59Jzq/1h6SLn6L4foSS67pCqyTpauT3o=
github.com/nitrictech/nitric v0.13.0-rc.17 h1:Pv6aGNP/+kHNVt87QfT06bqpbKIy2s3SYcqYXDIRKE4=
github.com/nitrictech/nitric v0.13.0-rc.17/go.mod h1:XC6DG1/NrMc59Jzq/1h6SLn6L4foSS67pCqyTpauT3o=
github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A=
github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE=
github.com/nxadm/tail v1.4.8/go.mod h1:+ncqLTQzXmGhMZNUePPaPqPvBxHAIsmXswZKocGu+AU=
Expand Down
52 changes: 33 additions & 19 deletions pkg/cmd/run/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,12 @@ import (
"fmt"
"os"
"os/signal"
"sync"
"syscall"
"time"

"github.com/pkg/errors"
"github.com/pterm/pterm"
"github.com/spf13/cobra"

"github.com/nitrictech/cli/pkg/build"
Expand Down Expand Up @@ -85,11 +87,13 @@ nitric run -s ../projectX/ "functions/*.ts"`,
ls := run.NewLocalServices(s.Name, s.Dir)
memerr := make(chan error)

pool := run.NewRunProcessPool()

startLocalServices := tasklet.Runner{
StartMsg: "Starting Local Services",
Runner: func(progress output.Progress) error {
go func(errch chan error) {
errch <- ls.Start()
errch <- ls.Start(pool)
}(memerr)

for {
Expand Down Expand Up @@ -137,25 +141,34 @@ nitric run -s ../projectX/ "functions/*.ts"`,
}
tasklet.MustRun(startFunctions, tasklet.Opts{Signal: term})

fmt.Println("Local running, use ctrl-C to stop")

type apiendpoint struct {
Api string `yaml:"api"`
Endpoint string `yaml:"endpoint"`
}
apis := []apiendpoint{}

for a := range s.ApiDocs {
apis = append(apis, apiendpoint{Api: a, Endpoint: fmt.Sprintf("http://127.0.0.1:9001/apis/%s", a)})
}
pterm.DefaultBasicText.Println("Local running, use ctrl-C to stop")

stackState := run.NewStackState()

area, _ := pterm.DefaultArea.Start()
lck := sync.Mutex{}
// React to worker pool state and update services table
pool.Listen(func(we run.WorkerEvent) {
lck.Lock()
defer lck.Unlock()
stackState.UpdateFromWorkerEvent(we)
area.Update(
stackState.ApiTable(9001),
"\n\n",
stackState.TopicTable(9001),
"\n\n",
stackState.SchedulesTable(9001),
)
})

if len(apis) == 0 {
// if we have a nitric.yaml then ApiDocs will be empty
for a := range s.Apis {
apis = append(apis, apiendpoint{Api: a, Endpoint: fmt.Sprintf("http://127.0.0.1:9001/apis/%s", a)})
}
}
output.Print(apis)
// TODO: revisit nitric.yaml support for this output
// if len(apis) == 0 {
// // if we have a nitric.yaml then ApiDocs will be empty
// for a := range s.Apis {
// apis[a] = fmt.Sprintf("http://127.0.0.1:9001/apis/%s", a)
// }
// }
// output.Print(apis)

select {
case membraneError := <-memerr:
Expand All @@ -170,6 +183,7 @@ nitric run -s ../projectX/ "functions/*.ts"`,
}
}

_ = area.Stop()
_ = logger.Stop()
// Stop the membrane
cobra.CheckErr(ls.Stop())
Expand Down
82 changes: 82 additions & 0 deletions pkg/run/pool.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
// Copyright Nitric Pty Ltd.
//
// SPDX-License-Identifier: Apache-2.0
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at:
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package run

import "github.com/nitrictech/nitric/pkg/worker"

type WorkerEventType string

const (
WorkerEventType_Add WorkerEventType = "add"
WorkerEventType_Remove WorkerEventType = "remove"
)

type WorkerEvent struct {
Type WorkerEventType
Worker worker.Worker
}

type WorkerListener = func(WorkerEvent)

type RunProcessPool struct {
worker.WorkerPool
listeners []WorkerListener
}

func (r *RunProcessPool) notifyListeners(evt WorkerEvent) {
for _, l := range r.listeners {
l(evt)
}
}

func (r *RunProcessPool) AddWorker(w worker.Worker) error {
if err := r.WorkerPool.AddWorker(w); err != nil {
return err
}
// notify listener of successfully added worker
r.notifyListeners(WorkerEvent{
Type: WorkerEventType_Add,
Worker: w,
})
return nil
}

func (r *RunProcessPool) RemoveWorker(w worker.Worker) error {
if err := r.WorkerPool.RemoveWorker(w); err != nil {
return err
}
// notify listener of successfully removed worker
r.notifyListeners(WorkerEvent{
Type: WorkerEventType_Remove,
Worker: w,
})
return nil
}

func (r *RunProcessPool) Listen(l WorkerListener) {
r.listeners = append(r.listeners, l)
}

func NewRunProcessPool() *RunProcessPool {
return &RunProcessPool{
listeners: make([]WorkerListener, 0),
WorkerPool: worker.NewProcessPool(&worker.ProcessPoolOptions{
MinWorkers: 0,
MaxWorkers: 100,
}),
}
}
12 changes: 2 additions & 10 deletions pkg/run/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ import (
)

type LocalServices interface {
Start() error
Start(pool worker.WorkerPool) error
Stop() error
Running() bool
Status() *LocalServicesStatus
Expand Down Expand Up @@ -85,7 +85,7 @@ func (l *localServices) Status() *LocalServicesStatus {
return l.status
}

func (l *localServices) Start() error {
func (l *localServices) Start(pool worker.WorkerPool) error {
var err error
l.mio, err = NewMinio(l.status.RunDir, "minio")
if err != nil {
Expand Down Expand Up @@ -129,14 +129,6 @@ func (l *localServices) Start() error {
return err
}

// Create a new Worker Pool
// TODO: We may want to override GetWorker on the default ProcessPool
// For now we'll use the default and expand from there
pool := worker.NewProcessPool(&worker.ProcessPoolOptions{
MinWorkers: 0,
MaxWorkers: 100,
})

ev, err := NewEvents(pool)
if err != nil {
return err
Expand Down
144 changes: 144 additions & 0 deletions pkg/run/stack.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
// Copyright Nitric Pty Ltd.
//
// SPDX-License-Identifier: Apache-2.0
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at:
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package run

import (
"fmt"
"strings"

"github.com/pterm/pterm"

"github.com/nitrictech/nitric/pkg/worker"
)

type RunStackState struct {
apis map[string]int
subs map[string]int
schedules map[string]int
}

func (r *RunStackState) UpdateFromWorkerEvent(evt WorkerEvent) {
if evt.Type == WorkerEventType_Add {
switch evt.Worker.(type) {
case *worker.RouteWorker:
w := evt.Worker.(*worker.RouteWorker)

if _, ok := r.apis[w.Api()]; !ok {
r.apis[w.Api()] = 1
} else {
r.apis[w.Api()] = r.apis[w.Api()] + 1
}
case *worker.SubscriptionWorker:
w := evt.Worker.(*worker.SubscriptionWorker)

if _, ok := r.subs[w.Topic()]; !ok {
r.subs[w.Topic()] = 1
} else {
r.subs[w.Topic()] = r.subs[w.Topic()] + 1
}
case *worker.ScheduleWorker:
w := evt.Worker.(*worker.ScheduleWorker)

if _, ok := r.schedules[w.Key()]; !ok {
r.schedules[w.Key()] = 1
} else {
r.schedules[w.Key()] = r.schedules[w.Key()] + 1
}
}
} else if evt.Type == WorkerEventType_Remove {
switch evt.Worker.(type) {
case *worker.RouteWorker:
w := evt.Worker.(*worker.RouteWorker)

r.apis[w.Api()] = r.apis[w.Api()] - 1

if r.apis[w.Api()] <= 0 {
// Remove the key if the reference count is 0 or less
delete(r.apis, w.Api())
}
case *worker.SubscriptionWorker:
w := evt.Worker.(*worker.SubscriptionWorker)

r.subs[w.Topic()] = r.subs[w.Topic()] - 1

if r.subs[w.Topic()] <= 0 {
// Remove the key if the reference count is 0 or less
delete(r.subs, w.Topic())
}
case *worker.ScheduleWorker:
w := evt.Worker.(*worker.ScheduleWorker)

r.schedules[w.Key()] = r.schedules[w.Key()] - 1

if r.schedules[w.Key()] <= 0 {
// Remove the key if the reference count is 0 or less
delete(r.schedules, w.Key())
}
}
}
}

func (r *RunStackState) ApiTable(port int) string {
tableData := pterm.TableData{{"Api", "Endpoint"}}

for k := range r.apis {
tableData = append(tableData, []string{
k, fmt.Sprintf("http://localhost:%d/apis/%s", port, k),
})
}

str, _ := pterm.DefaultTable.WithHasHeader().WithData(tableData).Srender()

return str
}

func (r *RunStackState) TopicTable(port int) string {
tableData := pterm.TableData{{"Topic", "Endpoint"}}

for k := range r.subs {
tableData = append(tableData, []string{
k, fmt.Sprintf("http://localhost:%d/topics/%s", port, k),
})
}

str, _ := pterm.DefaultTable.WithHasHeader().WithData(tableData).Srender()

return str
}

func (r *RunStackState) SchedulesTable(port int) string {
tableData := pterm.TableData{{"Schedule", "Endpoint"}}

for k := range r.schedules {
nKey := strings.ToLower(strings.ReplaceAll(k, " ", "-"))
tableData = append(tableData, []string{
k, fmt.Sprintf("http://localhost:%d/topics/%s", port, nKey),
})
}

str, _ := pterm.DefaultTable.WithHasHeader().WithData(tableData).Srender()

return str
}

func NewStackState() *RunStackState {
return &RunStackState{
apis: map[string]int{},
subs: map[string]int{},
schedules: map[string]int{},
}
}
2 changes: 1 addition & 1 deletion pkg/stack/function_helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func TestFunctionVersionString(t *testing.T) {
}{
{
name: "from embed",
want: "v0.13.0-rc.11",
want: "v0.13.0-rc.17",
},
{
name: "from function",
Expand Down
2 changes: 1 addition & 1 deletion pkg/stack/membraneversion.txt
Original file line number Diff line number Diff line change
@@ -1 +1 @@
v0.13.0-rc.11
v0.13.0-rc.17

0 comments on commit aacc733

Please sign in to comment.