From 01890b8c393dde9bdc11626febc697d224a228cf Mon Sep 17 00:00:00 2001 From: whywaita Date: Wed, 29 Sep 2021 10:44:11 +0900 Subject: [PATCH 1/8] Switch to dispatcher-worker method --- pkg/starter/starter.go | 226 ++++++++++++++++++++++++----------------- 1 file changed, 130 insertions(+), 96 deletions(-) diff --git a/pkg/starter/starter.go b/pkg/starter/starter.go index 7383788..d4eba95 100644 --- a/pkg/starter/starter.go +++ b/pkg/starter/starter.go @@ -4,13 +4,14 @@ import ( "context" "errors" "fmt" - "sync" "time" - "github.com/whywaita/myshoes/internal/config" + "golang.org/x/sync/errgroup" uuid "github.com/satori/go.uuid" + "golang.org/x/sync/semaphore" + "github.com/whywaita/myshoes/internal/config" "github.com/whywaita/myshoes/pkg/datastore" "github.com/whywaita/myshoes/pkg/gh" "github.com/whywaita/myshoes/pkg/logger" @@ -20,8 +21,6 @@ import ( ) var ( - // PistolInterval is interval of bung (a.k.a. create instance) - PistolInterval = 10 * time.Second // DefaultRunnerVersion is default value of actions/runner DefaultRunnerVersion = "v2.275.1" ) @@ -32,7 +31,7 @@ type Starter struct { safety safety.Safety } -// New is create starter instance +// New is creating starter instance func New(ds datastore.Datastore, s safety.Safety) *Starter { return &Starter{ ds: ds, @@ -43,24 +42,33 @@ func New(ds datastore.Datastore, s safety.Safety) *Starter { // Loop is main loop for starter func (s *Starter) Loop(ctx context.Context) error { logger.Logf(false, "start starter loop") + ch := make(chan datastore.Job) + + eg, ctx := errgroup.WithContext(ctx) + + eg.Go(func() error { + if err := s.run(ctx, ch); err != nil { + return fmt.Errorf("faied to start processor: %w", err) + } + return nil + }) - ticker := time.NewTicker(PistolInterval) + ticker := time.NewTicker(1 * time.Second) defer ticker.Stop() for { select { case <-ticker.C: - if err := s.do(ctx); err != nil { + if err := s.dispatcher(ctx, ch); err != nil { logger.Logf(false, "failed to starter: %+v", err) } - case <-ctx.Done(): return nil } } } -func (s *Starter) do(ctx context.Context) error { +func (s *Starter) dispatcher(ctx context.Context, ch chan datastore.Job) error { logger.Logf(true, "start to check starter") jobs, err := s.ds.ListJobs(ctx) if err != nil { @@ -68,101 +76,118 @@ func (s *Starter) do(ctx context.Context) error { } logger.Logf(true, "found %d jobs", len(jobs)) - wg := &sync.WaitGroup{} for _, j := range jobs { - wg.Add(1) - job := j + // send to processor + ch <- j + } - go func() { - defer wg.Done() - logger.Logf(false, "start job (job id: %s)\n", job.UUID.String()) + return nil +} - isOK, err := s.safety.Check(&job) - if err != nil { - logger.Logf(false, "failed to check safety: %+v\n", err) - return - } - if !isOK { - // is not ok, save job - return - } - if err := datastore.UpdateTargetStatus(ctx, s.ds, job.TargetID, datastore.TargetStatusRunning, fmt.Sprintf("job id: %s", job.UUID)); err != nil { - logger.Logf(false, "failed to update target status (target ID: %s, job ID: %s): %+v\n", job.TargetID, job.UUID, err) - return - } +func (s *Starter) run(ctx context.Context, ch chan datastore.Job) error { + sem := semaphore.NewWeighted(50) - target, err := s.ds.GetTarget(ctx, job.TargetID) - if err != nil { - logger.Logf(false, "failed to retrieve relational target: (target ID: %s, job ID: %s): %+v\n", job.TargetID, job.UUID, err) - return + // Processor + for { + select { + case job := <-ch: + // receive job from dispatcher + if err := sem.Acquire(ctx, 1); err != nil { + return fmt.Errorf("failed to Acquire: %w", err) } - cctx, cancel := context.WithTimeout(ctx, runner.MustRunningTime) - defer cancel() - cloudID, ipAddress, shoesType, err := s.bung(cctx, job.UUID, *target) - if err != nil { - logger.Logf(false, "failed to bung (target ID: %s, job ID: %s): %+v\n", job.TargetID, job.UUID, err) - - if err := datastore.UpdateTargetStatus(ctx, s.ds, job.TargetID, datastore.TargetStatusErr, fmt.Sprintf("failed to create an instance (job ID: %s)", job.UUID)); err != nil { - logger.Logf(false, "failed to update target status (target ID: %s, job ID: %s): %+v\n", job.TargetID, job.UUID, err) - return + go func(job datastore.Job) { + defer sem.Release(1) + if err := s.processJob(ctx, job); err != nil { + logger.Logf(false, "failed to process job: %+v\n", err) } + }(job) - return - } + case <-ctx.Done(): + return nil + } + } +} - if config.Config.Strict { - if err := s.checkRegisteredRunner(ctx, cloudID, *target); err != nil { - logger.Logf(false, "failed to check to register runner (target ID: %s, job ID: %s): %+v\n", job.TargetID, job.UUID, err) +func (s *Starter) processJob(ctx context.Context, job datastore.Job) error { + logger.Logf(false, "start job (job id: %s)\n", job.UUID.String()) - if err := deleteInstance(ctx, cloudID); err != nil { - logger.Logf(false, "failed to delete an instance that not registered instance (target ID: %s, cloud ID: %s): %+v\n", job.TargetID, cloudID, err) - // not return, need to update target status if err. - } + isOK, err := s.safety.Check(&job) + if err != nil { + return fmt.Errorf("failed to check safety: %w", err) + } + if !isOK { + // is not ok, save job + return nil + } + if err := datastore.UpdateTargetStatus(ctx, s.ds, job.TargetID, datastore.TargetStatusRunning, fmt.Sprintf("job id: %s", job.UUID)); err != nil { + return fmt.Errorf("failed to update target status (target ID: %s, job ID: %s): %w", job.TargetID, job.UUID, err) + } - if err := datastore.UpdateTargetStatus(ctx, s.ds, job.TargetID, datastore.TargetStatusErr, fmt.Sprintf("cannot register runner to GitHub (job ID: %s)", job.UUID)); err != nil { - logger.Logf(false, "failed to update target status (target ID: %s, job ID: %s): %+v\n", job.TargetID, job.UUID, err) - return - } + target, err := s.ds.GetTarget(ctx, job.TargetID) + if err != nil { + return fmt.Errorf("failed to retrieve relational target: (target ID: %s, job ID: %s): %w", job.TargetID, job.UUID, err) + } - return - } + cctx, cancel := context.WithTimeout(ctx, runner.MustRunningTime) + defer cancel() + cloudID, ipAddress, shoesType, err := s.bung(cctx, job.UUID, *target) + if err != nil { + logger.Logf(false, "failed to bung (target ID: %s, job ID: %s): %+v\n", job.TargetID, job.UUID, err) + + if err := datastore.UpdateTargetStatus(ctx, s.ds, job.TargetID, datastore.TargetStatusErr, fmt.Sprintf("failed to create an instance (job ID: %s)", job.UUID)); err != nil { + return fmt.Errorf("failed to update target status (target ID: %s, job ID: %s): %w", job.TargetID, job.UUID, err) + } + + return fmt.Errorf("failed to bung (target ID: %s, job ID: %s): %w", job.TargetID, job.UUID, err) + } + + if config.Config.Strict { + if err := s.checkRegisteredRunner(ctx, cloudID, *target); err != nil { + logger.Logf(false, "failed to check to register runner (target ID: %s, job ID: %s): %+v\n", job.TargetID, job.UUID, err) + + if err := deleteInstance(ctx, cloudID); err != nil { + logger.Logf(false, "failed to delete an instance that not registered instance (target ID: %s, cloud ID: %s): %+v\n", job.TargetID, cloudID, err) + // not return, need to update target status if err. } - r := datastore.Runner{ - UUID: job.UUID, - ShoesType: shoesType, - IPAddress: ipAddress, - TargetID: job.TargetID, - CloudID: cloudID, - ResourceType: target.ResourceType, - RepositoryURL: job.RepoURL(), - RequestWebhook: job.CheckEventJSON, + if err := datastore.UpdateTargetStatus(ctx, s.ds, job.TargetID, datastore.TargetStatusErr, fmt.Sprintf("cannot register runner to GitHub (job ID: %s)", job.UUID)); err != nil { + return fmt.Errorf("failed to update target status (target ID: %s, job ID: %s): %w", job.TargetID, job.UUID, err) } - if err := s.ds.CreateRunner(ctx, r); err != nil { - logger.Logf(false, "failed to save runner to datastore (target ID: %s, job ID: %s): %+v\n", job.TargetID, job.UUID, err) - if err := datastore.UpdateTargetStatus(ctx, s.ds, job.TargetID, datastore.TargetStatusErr, fmt.Sprintf("job id: %s", job.UUID)); err != nil { - logger.Logf(false, "failed to update target status (target ID: %s, job ID: %s): %+v\n", job.TargetID, job.UUID, err) - return - } + return fmt.Errorf("failed to check to register runner (target ID: %s, job ID: %s): %w", job.TargetID, job.UUID, err) + } + } - return - } + r := datastore.Runner{ + UUID: job.UUID, + ShoesType: shoesType, + IPAddress: ipAddress, + TargetID: job.TargetID, + CloudID: cloudID, + ResourceType: target.ResourceType, + RepositoryURL: job.RepoURL(), + RequestWebhook: job.CheckEventJSON, + } + if err := s.ds.CreateRunner(ctx, r); err != nil { + logger.Logf(false, "failed to save runner to datastore (target ID: %s, job ID: %s): %+v\n", job.TargetID, job.UUID, err) - if err := s.ds.DeleteJob(ctx, job.UUID); err != nil { - logger.Logf(false, "failed to delete job: %+v\n", err) + if err := datastore.UpdateTargetStatus(ctx, s.ds, job.TargetID, datastore.TargetStatusErr, fmt.Sprintf("job id: %s", job.UUID)); err != nil { + return fmt.Errorf("failed to update target status (target ID: %s, job ID: %s): %w", job.TargetID, job.UUID, err) + } - if err := datastore.UpdateTargetStatus(ctx, s.ds, job.TargetID, datastore.TargetStatusErr, fmt.Sprintf("job id: %s", job.UUID)); err != nil { - logger.Logf(false, "failed to update target status (target ID: %s, job ID: %s): %+v\n", job.TargetID, job.UUID, err) - return - } + return fmt.Errorf("failed to save runner to datastore (target ID: %s, job ID: %s): %w", job.TargetID, job.UUID, err) + } - return - } - }() + if err := s.ds.DeleteJob(ctx, job.UUID); err != nil { + logger.Logf(false, "failed to delete job: %+v\n", err) + + if err := datastore.UpdateTargetStatus(ctx, s.ds, job.TargetID, datastore.TargetStatusErr, fmt.Sprintf("job id: %s", job.UUID)); err != nil { + return fmt.Errorf("failed to update target status (target ID: %s, job ID: %s): %w", job.TargetID, job.UUID, err) + } + + return fmt.Errorf("failed to delete job: %w", err) } - wg.Wait() return nil } @@ -213,21 +238,30 @@ func (s *Starter) checkRegisteredRunner(ctx context.Context, cloudID string, tar if err != nil { return fmt.Errorf("failed to create github client: %w", err) } - owner, repo := gh.DivideScope(target.Scope) - for i := 0; float64(i) < runner.MustRunningTime.Seconds(); i++ { - if _, err := gh.ExistGitHubRunner(ctx, client, owner, repo, cloudID); err == nil { - // success to register runner to GitHub - return nil - } else if !errors.Is(err, gh.ErrNotFound) { - // not retryable error - return fmt.Errorf("failed to check existing runner in GitHub: %w", err) - } + cctx, cancel := context.WithTimeout(ctx, runner.MustRunningTime) + defer cancel() - logger.Logf(true, "%s is not found in GitHub, will retry... (second: %ds)", cloudID, i) - time.Sleep(1 * time.Second) - } + ticker := time.NewTicker(1 * time.Second) + defer ticker.Stop() - return fmt.Errorf("faied to to check existing runner in GitHub: timeout in %s", runner.MustRunningTime) + count := 0 + for { + select { + case <-cctx.Done(): + // timeout + return fmt.Errorf("faied to to check existing runner in GitHub: timeout in %s", runner.MustRunningTime) + case <-ticker.C: + if _, err := gh.ExistGitHubRunner(cctx, client, owner, repo, cloudID); err == nil { + // success to register runner to GitHub + return nil + } else if !errors.Is(err, gh.ErrNotFound) { + // not retryable error + return fmt.Errorf("failed to check existing runner in GitHub: %w", err) + } + count++ + logger.Logf(true, "%s is not found in GitHub, will retry... (second: %ds)", cloudID, count) + } + } } From eb3b1eb2db48d1fe6c811bb87b791ff41b95da3d Mon Sep 17 00:00:00 2001 From: whywaita Date: Wed, 29 Sep 2021 11:01:09 +0900 Subject: [PATCH 2/8] check duplicate job --- pkg/starter/starter.go | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/pkg/starter/starter.go b/pkg/starter/starter.go index d4eba95..cb6cb77 100644 --- a/pkg/starter/starter.go +++ b/pkg/starter/starter.go @@ -4,6 +4,7 @@ import ( "context" "errors" "fmt" + "sync" "time" "golang.org/x/sync/errgroup" @@ -23,6 +24,8 @@ import ( var ( // DefaultRunnerVersion is default value of actions/runner DefaultRunnerVersion = "v2.275.1" + + inProgress = sync.Map{} ) // Starter is dispatcher for running job @@ -92,12 +95,21 @@ func (s *Starter) run(ctx context.Context, ch chan datastore.Job) error { select { case job := <-ch: // receive job from dispatcher + + if _, ok := inProgress.Load(job.UUID); ok { + // this job is in progress, skip + continue + } + if err := sem.Acquire(ctx, 1); err != nil { return fmt.Errorf("failed to Acquire: %w", err) } + inProgress.Store(job.UUID, struct{}{}) + go func(job datastore.Job) { defer sem.Release(1) + defer inProgress.Delete(job.UUID) if err := s.processJob(ctx, job); err != nil { logger.Logf(false, "failed to process job: %+v\n", err) } From dfd21fca738d08c8e8d409c5c3d6d725ab63df6f Mon Sep 17 00:00:00 2001 From: whywaita Date: Wed, 29 Sep 2021 11:12:56 +0900 Subject: [PATCH 3/8] use errgroup --- pkg/starter/starter.go | 27 +++++++++++++++++---------- 1 file changed, 17 insertions(+), 10 deletions(-) diff --git a/pkg/starter/starter.go b/pkg/starter/starter.go index cb6cb77..be391af 100644 --- a/pkg/starter/starter.go +++ b/pkg/starter/starter.go @@ -56,19 +56,26 @@ func (s *Starter) Loop(ctx context.Context) error { return nil }) - ticker := time.NewTicker(1 * time.Second) - defer ticker.Stop() - - for { - select { - case <-ticker.C: - if err := s.dispatcher(ctx, ch); err != nil { - logger.Logf(false, "failed to starter: %+v", err) + eg.Go(func() error { + ticker := time.NewTicker(1 * time.Second) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + if err := s.dispatcher(ctx, ch); err != nil { + logger.Logf(false, "failed to starter: %+v", err) + } + case <-ctx.Done(): + return nil } - case <-ctx.Done(): - return nil } + }) + + if err := eg.Wait(); err != nil { + return fmt.Errorf("failed to errgroup wait: %w", err) } + return nil } func (s *Starter) dispatcher(ctx context.Context, ch chan datastore.Job) error { From 1ad97a1d2aa2812c6db2e4aaf0f07e09194622bf Mon Sep 17 00:00:00 2001 From: whywaita Date: Thu, 30 Sep 2021 10:55:24 +0900 Subject: [PATCH 4/8] Add MAX_CONNECTIONS_TO_BACKEND --- docs/01_01_for_admin_setup.md | 3 +++ internal/config/config.go | 6 ++++-- internal/config/init.go | 9 +++++++++ pkg/starter/starter.go | 2 +- 4 files changed, 17 insertions(+), 3 deletions(-) diff --git a/docs/01_01_for_admin_setup.md b/docs/01_01_for_admin_setup.md index 2f20587..10a1e99 100644 --- a/docs/01_01_for_admin_setup.md +++ b/docs/01_01_for_admin_setup.md @@ -93,5 +93,8 @@ $ ./myshoes - `STRICT` - default: true - set strict mode +- `MAX_CONNECTIONS_TO_BACKEND` + - default: 50 + - The number of max connections to shoes-provider and more some env values from [shoes provider](https://github.com/whywaita/myshoes-providers). diff --git a/internal/config/config.go b/internal/config/config.go index 63a0f6a..3b4313e 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -18,8 +18,9 @@ type conf struct { ShoesPluginPath string RunnerUser string - Debug bool - Strict bool // check to registered runner before delete job + Debug bool + Strict bool // check to registered runner before delete job + MaxConnectionsToBackend int64 } // Config Environment keys @@ -32,4 +33,5 @@ const ( EnvShoesPluginPath = "PLUGIN" EnvDebug = "DEBUG" EnvStrict = "STRICT" + EnvMaxConnectionsToBackend = "MAX_CONNECTIONS_TO_BACKEND" ) diff --git a/internal/config/init.go b/internal/config/init.go index 8964404..8d54382 100644 --- a/internal/config/init.go +++ b/internal/config/init.go @@ -90,6 +90,15 @@ func Load() { if os.Getenv(EnvStrict) == "false" { Config.Strict = false } + + Config.MaxConnectionsToBackend = 50 + if os.Getenv(EnvMaxConnectionsToBackend) != "" { + numberPB, err := strconv.ParseInt(os.Getenv(EnvMaxConnectionsToBackend), 10, 64) + if err != nil { + log.Panicf("failed to convert int64 %s: %+v", EnvMaxConnectionsToBackend, err) + } + Config.MaxConnectionsToBackend = numberPB + } } func checkBinary(p string) (string, error) { diff --git a/pkg/starter/starter.go b/pkg/starter/starter.go index be391af..95be3db 100644 --- a/pkg/starter/starter.go +++ b/pkg/starter/starter.go @@ -95,7 +95,7 @@ func (s *Starter) dispatcher(ctx context.Context, ch chan datastore.Job) error { } func (s *Starter) run(ctx context.Context, ch chan datastore.Job) error { - sem := semaphore.NewWeighted(50) + sem := semaphore.NewWeighted(config.Config.MaxConnectionsToBackend) // Processor for { From 30351cfe16f519ca750194deb35e86b6447f4ffd Mon Sep 17 00:00:00 2001 From: whywaita Date: Thu, 30 Sep 2021 11:28:21 +0900 Subject: [PATCH 5/8] Add metrics semaphore value --- pkg/metric/collector.go | 1 + pkg/metric/scrape_memory.go | 61 +++++++++++++++++++++++++++++++++++++ pkg/starter/starter.go | 14 ++++++--- 3 files changed, 72 insertions(+), 4 deletions(-) create mode 100644 pkg/metric/scrape_memory.go diff --git a/pkg/metric/collector.go b/pkg/metric/collector.go index 33e5ac7..03523eb 100644 --- a/pkg/metric/collector.go +++ b/pkg/metric/collector.go @@ -90,6 +90,7 @@ type Scraper interface { func NewScrapers() []Scraper { return []Scraper{ ScraperDatastore{}, + ScraperMemory{}, } } diff --git a/pkg/metric/scrape_memory.go b/pkg/metric/scrape_memory.go new file mode 100644 index 0000000..33a1b39 --- /dev/null +++ b/pkg/metric/scrape_memory.go @@ -0,0 +1,61 @@ +package metric + +import ( + "context" + "fmt" + + "github.com/whywaita/myshoes/internal/config" + "github.com/whywaita/myshoes/pkg/starter" + + "github.com/prometheus/client_golang/prometheus" + "github.com/whywaita/myshoes/pkg/datastore" +) + +const memoryName = "memory" + +var ( + memoryStarterStatus = prometheus.NewDesc( + prometheus.BuildFQName(namespace, memoryName, "starter_status"), + "status values from starter", + []string{"type"}, nil, + ) +) + +// ScraperMemory is scraper implement for memory +type ScraperMemory struct{} + +// Name return name +func (ScraperMemory) Name() string { + return memoryName +} + +// Help return help +func (ScraperMemory) Help() string { + return "Collect from memory" +} + +// Scrape scrape metrics +func (ScraperMemory) Scrape(ctx context.Context, ds datastore.Datastore, ch chan<- prometheus.Metric) error { + if err := scrapeStarterValues(ch); err != nil { + return fmt.Errorf("failed to scrape starter values: %w", err) + } + + return nil +} + +func scrapeStarterValues(ch chan<- prometheus.Metric) error { + maxConnections := config.Config.MaxConnectionsToBackend + connections := starter.ConnectionsSemaphore + + result := map[string]float64{ + "max_connections": float64(maxConnections), + "semaphore_number": float64(connections), + } + for key, n := range result { + ch <- prometheus.MustNewConstMetric( + memoryStarterStatus, prometheus.GaugeValue, n, key) + } + return nil +} + +var _ Scraper = ScraperMemory{} diff --git a/pkg/starter/starter.go b/pkg/starter/starter.go index 95be3db..898a00a 100644 --- a/pkg/starter/starter.go +++ b/pkg/starter/starter.go @@ -7,9 +7,8 @@ import ( "sync" "time" - "golang.org/x/sync/errgroup" - uuid "github.com/satori/go.uuid" + "golang.org/x/sync/errgroup" "golang.org/x/sync/semaphore" "github.com/whywaita/myshoes/internal/config" @@ -24,6 +23,8 @@ import ( var ( // DefaultRunnerVersion is default value of actions/runner DefaultRunnerVersion = "v2.275.1" + // ConnectionsSemaphore is number of semaphore connections + ConnectionsSemaphore = 0 inProgress = sync.Map{} ) @@ -111,12 +112,17 @@ func (s *Starter) run(ctx context.Context, ch chan datastore.Job) error { if err := sem.Acquire(ctx, 1); err != nil { return fmt.Errorf("failed to Acquire: %w", err) } + ConnectionsSemaphore++ inProgress.Store(job.UUID, struct{}{}) go func(job datastore.Job) { - defer sem.Release(1) - defer inProgress.Delete(job.UUID) + defer func() { + sem.Release(1) + inProgress.Delete(job.UUID) + ConnectionsSemaphore-- + }() + if err := s.processJob(ctx, job); err != nil { logger.Logf(false, "failed to process job: %+v\n", err) } From f835f77515963009a343653a852c56da56f60c2b Mon Sep 17 00:00:00 2001 From: whywaita Date: Thu, 30 Sep 2021 14:27:22 +0900 Subject: [PATCH 6/8] Separate metrics --- pkg/metric/scrape_memory.go | 48 ++++++++++++++++++++++++------------- pkg/starter/starter.go | 12 ++++++---- 2 files changed, 39 insertions(+), 21 deletions(-) diff --git a/pkg/metric/scrape_memory.go b/pkg/metric/scrape_memory.go index 33a1b39..832e31a 100644 --- a/pkg/metric/scrape_memory.go +++ b/pkg/metric/scrape_memory.go @@ -4,20 +4,30 @@ import ( "context" "fmt" - "github.com/whywaita/myshoes/internal/config" - "github.com/whywaita/myshoes/pkg/starter" - "github.com/prometheus/client_golang/prometheus" + + "github.com/whywaita/myshoes/internal/config" "github.com/whywaita/myshoes/pkg/datastore" + "github.com/whywaita/myshoes/pkg/starter" ) const memoryName = "memory" var ( - memoryStarterStatus = prometheus.NewDesc( - prometheus.BuildFQName(namespace, memoryName, "starter_status"), - "status values from starter", - []string{"type"}, nil, + memoryStarterMaxRunning = prometheus.NewDesc( + prometheus.BuildFQName(namespace, memoryName, "starter_max_running"), + "The number of max running in starter (Config)", + []string{"starter"}, nil, + ) + memoryStarterQueueRunning = prometheus.NewDesc( + prometheus.BuildFQName(namespace, memoryName, "starter_queue_running"), + "running queue in starter", + []string{"starter"}, nil, + ) + memoryStarterQueueWaiting = prometheus.NewDesc( + prometheus.BuildFQName(namespace, memoryName, "starter_queue_waiting"), + "waiting queue in starter", + []string{"starter"}, nil, ) ) @@ -44,17 +54,21 @@ func (ScraperMemory) Scrape(ctx context.Context, ds datastore.Datastore, ch chan } func scrapeStarterValues(ch chan<- prometheus.Metric) error { - maxConnections := config.Config.MaxConnectionsToBackend - connections := starter.ConnectionsSemaphore + configMax := config.Config.MaxConnectionsToBackend + + const labelStarter = "starter" + + ch <- prometheus.MustNewConstMetric( + memoryStarterMaxRunning, prometheus.GaugeValue, float64(configMax), labelStarter) + + countRunning := starter.CountRunning + countWaiting := starter.CountWaiting + + ch <- prometheus.MustNewConstMetric( + memoryStarterQueueRunning, prometheus.GaugeValue, float64(countRunning), labelStarter) + ch <- prometheus.MustNewConstMetric( + memoryStarterQueueWaiting, prometheus.GaugeValue, float64(countWaiting), labelStarter) - result := map[string]float64{ - "max_connections": float64(maxConnections), - "semaphore_number": float64(connections), - } - for key, n := range result { - ch <- prometheus.MustNewConstMetric( - memoryStarterStatus, prometheus.GaugeValue, n, key) - } return nil } diff --git a/pkg/starter/starter.go b/pkg/starter/starter.go index 898a00a..e36cd29 100644 --- a/pkg/starter/starter.go +++ b/pkg/starter/starter.go @@ -23,8 +23,10 @@ import ( var ( // DefaultRunnerVersion is default value of actions/runner DefaultRunnerVersion = "v2.275.1" - // ConnectionsSemaphore is number of semaphore connections - ConnectionsSemaphore = 0 + // CountRunning is count of running semaphore + CountRunning = 0 + // CountWaiting is count of waiting job + CountWaiting = 0 inProgress = sync.Map{} ) @@ -109,10 +111,12 @@ func (s *Starter) run(ctx context.Context, ch chan datastore.Job) error { continue } + CountWaiting++ if err := sem.Acquire(ctx, 1); err != nil { return fmt.Errorf("failed to Acquire: %w", err) } - ConnectionsSemaphore++ + CountWaiting-- + CountRunning++ inProgress.Store(job.UUID, struct{}{}) @@ -120,7 +124,7 @@ func (s *Starter) run(ctx context.Context, ch chan datastore.Job) error { defer func() { sem.Release(1) inProgress.Delete(job.UUID) - ConnectionsSemaphore-- + CountRunning-- }() if err := s.processJob(ctx, job); err != nil { From a8d2a826494cba465dbae5fd4d643c6ee2f496ac Mon Sep 17 00:00:00 2001 From: whywaita Date: Thu, 30 Sep 2021 15:07:30 +0900 Subject: [PATCH 7/8] more slowly --- pkg/starter/starter.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/starter/starter.go b/pkg/starter/starter.go index e36cd29..4d68995 100644 --- a/pkg/starter/starter.go +++ b/pkg/starter/starter.go @@ -60,7 +60,7 @@ func (s *Starter) Loop(ctx context.Context) error { }) eg.Go(func() error { - ticker := time.NewTicker(1 * time.Second) + ticker := time.NewTicker(10 * time.Second) defer ticker.Stop() for { @@ -88,7 +88,6 @@ func (s *Starter) dispatcher(ctx context.Context, ch chan datastore.Job) error { return fmt.Errorf("failed to get jobs: %w", err) } - logger.Logf(true, "found %d jobs", len(jobs)) for _, j := range jobs { // send to processor ch <- j @@ -111,6 +110,7 @@ func (s *Starter) run(ctx context.Context, ch chan datastore.Job) error { continue } + logger.Logf(true, "found new job: %s", job.UUID) CountWaiting++ if err := sem.Acquire(ctx, 1); err != nil { return fmt.Errorf("failed to Acquire: %w", err) From de6e82cca6a97c29b51b53425d81a528b433d7d9 Mon Sep 17 00:00:00 2001 From: whywaita Date: Thu, 30 Sep 2021 15:13:40 +0900 Subject: [PATCH 8/8] Fix typo --- pkg/starter/starter.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/starter/starter.go b/pkg/starter/starter.go index 4d68995..2fad092 100644 --- a/pkg/starter/starter.go +++ b/pkg/starter/starter.go @@ -37,7 +37,7 @@ type Starter struct { safety safety.Safety } -// New is creating starter instance +// New create starter instance func New(ds datastore.Datastore, s safety.Safety) *Starter { return &Starter{ ds: ds,