From b30e7605809880222973c07a56b38eef746a3b87 Mon Sep 17 00:00:00 2001 From: gamoutatsumi Date: Wed, 14 Jun 2023 18:12:21 +0900 Subject: [PATCH 1/7] add list pending jobs --- pkg/gh/job.go | 51 +++++++++++++++++++++++++++++++++++++++++++++++++++ pkg/gh/run.go | 51 +++++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 102 insertions(+) create mode 100644 pkg/gh/job.go create mode 100644 pkg/gh/run.go diff --git a/pkg/gh/job.go b/pkg/gh/job.go new file mode 100644 index 0000000..0691f09 --- /dev/null +++ b/pkg/gh/job.go @@ -0,0 +1,51 @@ +package gh + +import ( + "context" + "fmt" + "time" + + "github.com/google/go-github/v47/github" + "github.com/whywaita/myshoes/pkg/logger" +) + +func listJobs(ctx context.Context, client *github.Client, owner, repo string, runID int64, opts *github.ListWorkflowJobsOptions) (*github.Jobs, *github.Response, error) { + jobs, resp, err := client.Actions.ListWorkflowJobs(ctx, owner, repo, runID, opts) + if err != nil { + return nil, nil, fmt.Errorf("failed to list workflow jobs: %w", err) + } + return jobs, resp, nil +} + +func ListJobs(ctx context.Context, client *github.Client, owner, repo string, runID int64) ([]*github.WorkflowJob, error) { + if cachedJs, found := responseCache.Get(getJobsCacheKey(owner, repo)); found { + return cachedJs.([]*github.WorkflowJob), nil + } + var opts = &github.ListWorkflowJobsOptions{ + ListOptions: github.ListOptions{ + Page: 0, + PerPage: 100, + }, + } + var js []*github.WorkflowJob + for { + logger.Logf(true, "get jobs from GitHub, page: %d, now latest jobs: %d", opts.Page, len(js)) + jobs, resp, err := listJobs(ctx, client, owner, repo, runID, opts) + if err != nil { + return nil, fmt.Errorf("failed to list jobs: %w", err) + } + storeRateLimit(getRateLimitKey(owner, repo), resp.Rate) + js = append(js, jobs.Jobs...) + if resp.NextPage == 0 { + break + } + opts.Page = resp.NextPage + } + responseCache.Set(getJobsCacheKey(owner, repo), js, 1*time.Second) + logger.Logf(true, "found %d jobs in GitHub", len(js)) + return js, nil +} + +func getJobsCacheKey(owner, repo string) string { + return fmt.Sprintf("jobs-owner-%s-repo-%s", owner, repo) +} diff --git a/pkg/gh/run.go b/pkg/gh/run.go new file mode 100644 index 0000000..49aa28c --- /dev/null +++ b/pkg/gh/run.go @@ -0,0 +1,51 @@ +package gh + +import ( + "context" + "fmt" + "time" + + "github.com/google/go-github/v47/github" + "github.com/whywaita/myshoes/pkg/logger" +) + +func listRuns(ctx context.Context, client *github.Client, owner, repo string, opts *github.ListWorkflowRunsOptions) (*github.WorkflowRuns, *github.Response, error) { + runs, resp, err := client.Actions.ListRepositoryWorkflowRuns(ctx, owner, repo, opts) + if err != nil { + return nil, nil, fmt.Errorf("failed to list workflow runs: %w", err) + } + return runs, resp, nil +} + +func ListRuns(ctx context.Context, client *github.Client, owner, repo string) ([]*github.WorkflowRun, error) { + if cachedRs, found := responseCache.Get(getRunsCacheKey(owner, repo)); found { + return cachedRs.([]*github.WorkflowRun), nil + } + var opts = &github.ListWorkflowRunsOptions{ + ListOptions: github.ListOptions{ + Page: 0, + PerPage: 10, + }, + } + var js []*github.WorkflowRun + for { + logger.Logf(true, "get workflow runs from GitHub, now recent %d runs", len(js)) + runs, resp, err := listRuns(ctx, client, owner, repo, opts) + if err != nil { + return nil, fmt.Errorf("failed to list workflow runs: %w", err) + } + storeRateLimit(getRateLimitKey(owner, repo), resp.Rate) + js = append(js, runs.WorkflowRuns...) + if resp.NextPage == 0 { + break + } + opts.Page = resp.NextPage + } + responseCache.Set(getRunsCacheKey(owner, repo), js, 1*time.Second) + logger.Logf(true, "found %d workflow runs in GitHub", len(js)) + return js, nil +} + +func getRunsCacheKey(owner, repo string) string { + return fmt.Sprintf("runs-owner-%s-repo-%s", owner, repo) +} From a01c7a295c0b215ec92bcc30a37794a1e82df2d5 Mon Sep 17 00:00:00 2001 From: gamoutatsumi Date: Thu, 15 Jun 2023 14:52:50 +0900 Subject: [PATCH 2/7] add metrics for pending jobs --- pkg/gh/run.go | 24 ++++------ pkg/metric/scrape_github.go | 94 +++++++++++++++++++++++++++++++++++++ 2 files changed, 102 insertions(+), 16 deletions(-) create mode 100644 pkg/metric/scrape_github.go diff --git a/pkg/gh/run.go b/pkg/gh/run.go index 49aa28c..a0c65fb 100644 --- a/pkg/gh/run.go +++ b/pkg/gh/run.go @@ -27,23 +27,15 @@ func ListRuns(ctx context.Context, client *github.Client, owner, repo string) ([ PerPage: 10, }, } - var js []*github.WorkflowRun - for { - logger.Logf(true, "get workflow runs from GitHub, now recent %d runs", len(js)) - runs, resp, err := listRuns(ctx, client, owner, repo, opts) - if err != nil { - return nil, fmt.Errorf("failed to list workflow runs: %w", err) - } - storeRateLimit(getRateLimitKey(owner, repo), resp.Rate) - js = append(js, runs.WorkflowRuns...) - if resp.NextPage == 0 { - break - } - opts.Page = resp.NextPage + logger.Logf(true, "get workflow runs from GitHub, now recent %d runs", opts.PerPage) + runs, resp, err := listRuns(ctx, client, owner, repo, opts) + if err != nil { + return nil, fmt.Errorf("failed to list workflow runs: %w", err) } - responseCache.Set(getRunsCacheKey(owner, repo), js, 1*time.Second) - logger.Logf(true, "found %d workflow runs in GitHub", len(js)) - return js, nil + storeRateLimit(getRateLimitKey(owner, repo), resp.Rate) + responseCache.Set(getRunsCacheKey(owner, repo), runs.WorkflowRuns, 1*time.Second) + logger.Logf(true, "found %d workflow runs in GitHub", len(runs.WorkflowRuns)) + return runs.WorkflowRuns, nil } func getRunsCacheKey(owner, repo string) string { diff --git a/pkg/metric/scrape_github.go b/pkg/metric/scrape_github.go new file mode 100644 index 0000000..d83a4dc --- /dev/null +++ b/pkg/metric/scrape_github.go @@ -0,0 +1,94 @@ +package metric + +import ( + "context" + "fmt" + "time" + + "github.com/prometheus/client_golang/prometheus" + "github.com/whywaita/myshoes/pkg/datastore" + "github.com/whywaita/myshoes/pkg/gh" +) + +const githubName = "github" + +var ( + githubPendingJobsDesc = prometheus.NewDesc( + prometheus.BuildFQName(namespace, datastoreName, "pending_jobs"), + "Number of pending jobs", + []string{"target_id"}, nil, + ) +) + +type ScraperGitHub struct{} + +func (ScraperGitHub) Name() string { + return githubName +} + +func (ScraperGitHub) Help() string { + return "Collect from GitHub" +} + +func (s *ScraperGitHub) Scrape(ctx context.Context, ds datastore.Datastore, ch chan<- prometheus.Metric) error { + if err := scrapePendingJobs(ctx, ds, ch); err != nil { + return fmt.Errorf("failed to scrape pending jobs: %w", err) + } + return nil +} + +func scrapePendingJobs(ctx context.Context, ds datastore.Datastore, ch chan<- prometheus.Metric) error { + targets, err := ds.ListTargets(ctx) + if err != nil { + return fmt.Errorf("failed to list pending jobs: %w", err) + } + if len(targets) == 0 { + ch <- prometheus.MustNewConstMetric( + githubPendingJobsDesc, prometheus.GaugeValue, 0, "none", + ) + return nil + } + + result := map[string]float64{} + + for _, t := range targets { + client, err := gh.NewClient(t.GitHubToken) + if err != nil { + return fmt.Errorf("failed to list pending jobs: %w", err) + } + owner, repo := t.OwnerRepo() + runs, err := gh.ListRuns(ctx, client, owner, repo) + if err != nil { + return fmt.Errorf("failed to list pending jobs: %w", err) + } + if len(runs) == 0 { + ch <- prometheus.MustNewConstMetric( + githubPendingJobsDesc, prometheus.GaugeValue, 0, t.UUID.String(), + ) + continue + } + for _, r := range runs { + jobs, err := gh.ListJobs(ctx, client, owner, repo, r.GetID()) + if err != nil { + return fmt.Errorf("failed to list pending jobs: %w", err) + } + if len(jobs) == 0 { + ch <- prometheus.MustNewConstMetric( + githubPendingJobsDesc, prometheus.GaugeValue, 0, t.UUID.String(), + ) + continue + } + for _, j := range jobs { + if j.GetStatus() == "pending" && time.Since(j.GetStartedAt().Time) >= 30 { + result[t.UUID.String()]++ + } + } + } + } + for targetID, number := range result { + ch <- prometheus.MustNewConstMetric( + githubPendingJobsDesc, prometheus.GaugeValue, number, targetID, + ) + } + return nil +} From 3088f30a61a48b3141c5aac01a4a820fe4c50452 Mon Sep 17 00:00:00 2001 From: gamoutatsumi Date: Thu, 15 Jun 2023 16:27:07 +0900 Subject: [PATCH 3/7] use github scraper --- pkg/gh/run.go | 2 +- pkg/metric/collector.go | 1 + pkg/metric/scrape_github.go | 12 +++++++++--- 3 files changed, 11 insertions(+), 4 deletions(-) diff --git a/pkg/gh/run.go b/pkg/gh/run.go index a0c65fb..3dee8ab 100644 --- a/pkg/gh/run.go +++ b/pkg/gh/run.go @@ -27,7 +27,7 @@ func ListRuns(ctx context.Context, client *github.Client, owner, repo string) ([ PerPage: 10, }, } - logger.Logf(true, "get workflow runs from GitHub, now recent %d runs", opts.PerPage) + logger.Logf(true, "get workflow runs of %s/%s, now recent %d runs", owner, repo, opts.PerPage) runs, resp, err := listRuns(ctx, client, owner, repo, opts) if err != nil { return nil, fmt.Errorf("failed to list workflow runs: %w", err) diff --git a/pkg/metric/collector.go b/pkg/metric/collector.go index 03523eb..0cf726a 100644 --- a/pkg/metric/collector.go +++ b/pkg/metric/collector.go @@ -91,6 +91,7 @@ func NewScrapers() []Scraper { return []Scraper{ ScraperDatastore{}, ScraperMemory{}, + ScraperGitHub{}, } } diff --git a/pkg/metric/scrape_github.go b/pkg/metric/scrape_github.go index d83a4dc..fb6865b 100644 --- a/pkg/metric/scrape_github.go +++ b/pkg/metric/scrape_github.go @@ -6,8 +6,10 @@ import ( "time" "github.com/prometheus/client_golang/prometheus" + "github.com/whywaita/myshoes/internal/config" "github.com/whywaita/myshoes/pkg/datastore" "github.com/whywaita/myshoes/pkg/gh" + "github.com/whywaita/myshoes/pkg/logger" ) const githubName = "github" @@ -30,7 +32,7 @@ func (ScraperGitHub) Help() string { return "Collect from GitHub" } -func (s *ScraperGitHub) Scrape(ctx context.Context, ds datastore.Datastore, ch chan<- prometheus.Metric) error { +func (s ScraperGitHub) Scrape(ctx context.Context, ds datastore.Datastore, ch chan<- prometheus.Metric) error { if err := scrapePendingJobs(ctx, ds, ch); err != nil { return fmt.Errorf("failed to scrape pending jobs: %w", err) } @@ -52,14 +54,18 @@ func scrapePendingJobs(ctx context.Context, ds datastore.Datastore, ch chan<- pr result := map[string]float64{} for _, t := range targets { - client, err := gh.NewClient(t.GitHubToken) + client, err := gh.NewClientGitHubApps() if err != nil { return fmt.Errorf("failed to list pending jobs: %w", err) } owner, repo := t.OwnerRepo() + if repo == "" { + continue + } runs, err := gh.ListRuns(ctx, client, owner, repo) if err != nil { - return fmt.Errorf("failed to list pending jobs: %w", err) + logger.Logf(false, "failed to list pending jobs: %+v", err) + continue } if len(runs) == 0 { ch <- prometheus.MustNewConstMetric( From 314feeb814b7c7f30c318bbc783dbd17b072f65e Mon Sep 17 00:00:00 2001 From: gamoutatsumi Date: Fri, 16 Jun 2023 17:40:37 +0900 Subject: [PATCH 4/7] scrape workflow run from github --- pkg/gh/job.go | 51 ---------------------------------- pkg/gh/run.go | 20 ++++++++++---- pkg/metric/scrape_github.go | 55 ++++++++++++++----------------------- 3 files changed, 36 insertions(+), 90 deletions(-) delete mode 100644 pkg/gh/job.go diff --git a/pkg/gh/job.go b/pkg/gh/job.go deleted file mode 100644 index 0691f09..0000000 --- a/pkg/gh/job.go +++ /dev/null @@ -1,51 +0,0 @@ -package gh - -import ( - "context" - "fmt" - "time" - - "github.com/google/go-github/v47/github" - "github.com/whywaita/myshoes/pkg/logger" -) - -func listJobs(ctx context.Context, client *github.Client, owner, repo string, runID int64, opts *github.ListWorkflowJobsOptions) (*github.Jobs, *github.Response, error) { - jobs, resp, err := client.Actions.ListWorkflowJobs(ctx, owner, repo, runID, opts) - if err != nil { - return nil, nil, fmt.Errorf("failed to list workflow jobs: %w", err) - } - return jobs, resp, nil -} - -func ListJobs(ctx context.Context, client *github.Client, owner, repo string, runID int64) ([]*github.WorkflowJob, error) { - if cachedJs, found := responseCache.Get(getJobsCacheKey(owner, repo)); found { - return cachedJs.([]*github.WorkflowJob), nil - } - var opts = &github.ListWorkflowJobsOptions{ - ListOptions: github.ListOptions{ - Page: 0, - PerPage: 100, - }, - } - var js []*github.WorkflowJob - for { - logger.Logf(true, "get jobs from GitHub, page: %d, now latest jobs: %d", opts.Page, len(js)) - jobs, resp, err := listJobs(ctx, client, owner, repo, runID, opts) - if err != nil { - return nil, fmt.Errorf("failed to list jobs: %w", err) - } - storeRateLimit(getRateLimitKey(owner, repo), resp.Rate) - js = append(js, jobs.Jobs...) - if resp.NextPage == 0 { - break - } - opts.Page = resp.NextPage - } - responseCache.Set(getJobsCacheKey(owner, repo), js, 1*time.Second) - logger.Logf(true, "found %d jobs in GitHub", len(js)) - return js, nil -} - -func getJobsCacheKey(owner, repo string) string { - return fmt.Sprintf("jobs-owner-%s-repo-%s", owner, repo) -} diff --git a/pkg/gh/run.go b/pkg/gh/run.go index 3dee8ab..e0aa3f0 100644 --- a/pkg/gh/run.go +++ b/pkg/gh/run.go @@ -17,24 +17,34 @@ func listRuns(ctx context.Context, client *github.Client, owner, repo string, op return runs, resp, nil } -func ListRuns(ctx context.Context, client *github.Client, owner, repo string) ([]*github.WorkflowRun, error) { +// ListRuns get workflow runs that registered repository +func ListRuns(ctx context.Context, owner, repo, scope string) ([]*github.WorkflowRun, error) { if cachedRs, found := responseCache.Get(getRunsCacheKey(owner, repo)); found { + logger.Logf(true, "found workflow runs (cache hit) in %s/%s", owner, repo) return cachedRs.([]*github.WorkflowRun), nil } + installationID, err := IsInstalledGitHubApp(ctx, scope) + if err != nil { + return nil, fmt.Errorf("failed to list pending runs (%s/%s): %w", owner, repo, err) + } + client, err := NewClientInstallation(installationID) + if err != nil { + return nil, fmt.Errorf("failed to list workflow runs (%s/%s): %w", owner, repo, err) + } var opts = &github.ListWorkflowRunsOptions{ ListOptions: github.ListOptions{ Page: 0, PerPage: 10, }, } - logger.Logf(true, "get workflow runs of %s/%s, now recent %d runs", owner, repo, opts.PerPage) + logger.Logf(true, "get workflow runs of %s/%s, recent %d runs", owner, repo, opts.PerPage) runs, resp, err := listRuns(ctx, client, owner, repo, opts) if err != nil { - return nil, fmt.Errorf("failed to list workflow runs: %w", err) + return nil, fmt.Errorf("failed to list workflow runs (%s/%s): %w", owner, repo, err) } storeRateLimit(getRateLimitKey(owner, repo), resp.Rate) - responseCache.Set(getRunsCacheKey(owner, repo), runs.WorkflowRuns, 1*time.Second) - logger.Logf(true, "found %d workflow runs in GitHub", len(runs.WorkflowRuns)) + responseCache.Set(getRunsCacheKey(owner, repo), runs.WorkflowRuns, 5*time.Minute) + logger.Logf(true, "found %d workflow runs in %s/%s", len(runs.WorkflowRuns), owner, repo) return runs.WorkflowRuns, nil } diff --git a/pkg/metric/scrape_github.go b/pkg/metric/scrape_github.go index fb6865b..daa737d 100644 --- a/pkg/metric/scrape_github.go +++ b/pkg/metric/scrape_github.go @@ -6,7 +6,6 @@ import ( "time" "github.com/prometheus/client_golang/prometheus" - "github.com/whywaita/myshoes/internal/config" "github.com/whywaita/myshoes/pkg/datastore" "github.com/whywaita/myshoes/pkg/gh" "github.com/whywaita/myshoes/pkg/logger" @@ -15,85 +14,73 @@ import ( const githubName = "github" var ( - githubPendingJobsDesc = prometheus.NewDesc( - prometheus.BuildFQName(namespace, datastoreName, "pending_jobs"), - "Number of pending jobs", + githubPendingRunsDesc = prometheus.NewDesc( + prometheus.BuildFQName(namespace, githubName, "pending_runs"), + "Number of pending runs", []string{"target_id"}, nil, ) ) +// ScraperGitHub is scraper implement for GitHub type ScraperGitHub struct{} +// Name return name func (ScraperGitHub) Name() string { return githubName } +// Help return help func (ScraperGitHub) Help() string { return "Collect from GitHub" } +// Scrape scrape metrics func (s ScraperGitHub) Scrape(ctx context.Context, ds datastore.Datastore, ch chan<- prometheus.Metric) error { - if err := scrapePendingJobs(ctx, ds, ch); err != nil { - return fmt.Errorf("failed to scrape pending jobs: %w", err) + if err := scrapePendingRuns(ctx, ds, ch); err != nil { + return fmt.Errorf("failed to scrape pending runs: %w", err) } return nil } -func scrapePendingJobs(ctx context.Context, ds datastore.Datastore, ch chan<- prometheus.Metric) error { +func scrapePendingRuns(ctx context.Context, ds datastore.Datastore, ch chan<- prometheus.Metric) error { targets, err := ds.ListTargets(ctx) if err != nil { - return fmt.Errorf("failed to list pending jobs: %w", err) + return fmt.Errorf("failed to list pending runs: %w", err) } if len(targets) == 0 { ch <- prometheus.MustNewConstMetric( - githubPendingJobsDesc, prometheus.GaugeValue, 0, "none", + githubPendingRunsDesc, prometheus.GaugeValue, 0, "none", ) return nil } - result := map[string]float64{} - for _, t := range targets { - client, err := gh.NewClientGitHubApps() - if err != nil { - return fmt.Errorf("failed to list pending jobs: %w", err) - } owner, repo := t.OwnerRepo() + var pendings float64 if repo == "" { continue } - runs, err := gh.ListRuns(ctx, client, owner, repo) + runs, err := gh.ListRuns(ctx, owner, repo, t.Scope) if err != nil { - logger.Logf(false, "failed to list pending jobs: %+v", err) + logger.Logf(false, "failed to list pending runs: %+v", err) continue } + if len(runs) == 0 { ch <- prometheus.MustNewConstMetric( - githubPendingJobsDesc, prometheus.GaugeValue, 0, t.UUID.String(), + githubPendingRunsDesc, prometheus.GaugeValue, 0, t.UUID.String(), ) continue } for _, r := range runs { - jobs, err := gh.ListJobs(ctx, client, owner, repo, r.GetID()) - if err != nil { - return fmt.Errorf("failed to list pending jobs: %w", err) - } - if len(jobs) == 0 { - ch <- prometheus.MustNewConstMetric( - githubPendingJobsDesc, prometheus.GaugeValue, 0, t.UUID.String(), - ) - continue - } - for _, j := range jobs { - if j.GetStatus() == "pending" && time.Since(j.GetStartedAt().Time) >= 30 { - result[t.UUID.String()]++ + if r.GetStatus() == "queued" || r.GetStatus() == "pending" { + if time.Since(r.CreatedAt.Time).Minutes() >= 30 { + pendings++ } } } - } - for targetID, number := range result { ch <- prometheus.MustNewConstMetric( - githubPendingJobsDesc, prometheus.GaugeValue, number, targetID, + githubPendingRunsDesc, prometheus.GaugeValue, pendings, t.UUID.String(), ) } return nil From a6e590241b4c5d71f1987b6b008f650b1d22fcfe Mon Sep 17 00:00:00 2001 From: gamoutatsumi Date: Thu, 22 Jun 2023 15:03:25 +0900 Subject: [PATCH 5/7] update cache in goroutine --- pkg/gh/run.go | 37 ++++++++++++++++++++++++++----------- 1 file changed, 26 insertions(+), 11 deletions(-) diff --git a/pkg/gh/run.go b/pkg/gh/run.go index e0aa3f0..4f465e9 100644 --- a/pkg/gh/run.go +++ b/pkg/gh/run.go @@ -19,17 +19,36 @@ func listRuns(ctx context.Context, client *github.Client, owner, repo string, op // ListRuns get workflow runs that registered repository func ListRuns(ctx context.Context, owner, repo, scope string) ([]*github.WorkflowRun, error) { - if cachedRs, found := responseCache.Get(getRunsCacheKey(owner, repo)); found { - logger.Logf(true, "found workflow runs (cache hit) in %s/%s", owner, repo) + if cachedRs, expiration, found := responseCache.GetWithExpiration(getRunsCacheKey(owner, repo)); found { + if time.Until(expiration).Minutes() <= 1 { + go updateCache(context.Background(), owner, repo, scope) + } + logger.Logf(true, "found workflow runs (cache hit: expiration: %s) in %s/%s", expiration.Format("2006/01/02 15:04:05.000 -0700"), owner, repo) return cachedRs.([]*github.WorkflowRun), nil } + go updateCache(context.Background(), owner, repo, scope) + return []*github.WorkflowRun{}, nil +} + +func getRunsCacheKey(owner, repo string) string { + return fmt.Sprintf("runs-owner-%s-repo-%s", owner, repo) +} + +func updateCache(ctx context.Context, owner, repo, scope string) { installationID, err := IsInstalledGitHubApp(ctx, scope) if err != nil { - return nil, fmt.Errorf("failed to list pending runs (%s/%s): %w", owner, repo, err) + logger.Logf(false, "failed to list pending runs (%s/%s): %+v", owner, repo, err) + installationID, err = IsInstalledGitHubApp(ctx, owner) + if err != nil { + logger.Logf(false, "failed to list pending runs (%s): %+v", owner, err) + responseCache.Set(getRunsCacheKey(owner, repo), []*github.WorkflowRun{}, 1*time.Hour) + return + } } client, err := NewClientInstallation(installationID) if err != nil { - return nil, fmt.Errorf("failed to list workflow runs (%s/%s): %w", owner, repo, err) + logger.Logf(false, "failed to list workflow runs (%s/%s): %+v", owner, repo, err) + return } var opts = &github.ListWorkflowRunsOptions{ ListOptions: github.ListOptions{ @@ -40,14 +59,10 @@ func ListRuns(ctx context.Context, owner, repo, scope string) ([]*github.Workflo logger.Logf(true, "get workflow runs of %s/%s, recent %d runs", owner, repo, opts.PerPage) runs, resp, err := listRuns(ctx, client, owner, repo, opts) if err != nil { - return nil, fmt.Errorf("failed to list workflow runs (%s/%s): %w", owner, repo, err) + logger.Logf(false, "failed to list workflow runs (%s/%s): %+v", owner, repo, err) + return } storeRateLimit(getRateLimitKey(owner, repo), resp.Rate) - responseCache.Set(getRunsCacheKey(owner, repo), runs.WorkflowRuns, 5*time.Minute) + responseCache.Set(getRunsCacheKey(owner, repo), runs.WorkflowRuns, 15*time.Minute) logger.Logf(true, "found %d workflow runs in %s/%s", len(runs.WorkflowRuns), owner, repo) - return runs.WorkflowRuns, nil -} - -func getRunsCacheKey(owner, repo string) string { - return fmt.Sprintf("runs-owner-%s-repo-%s", owner, repo) } From e8f9bc91c8e4c853f739032d9f84b70067300293 Mon Sep 17 00:00:00 2001 From: gamoutatsumi Date: Wed, 28 Jun 2023 10:55:57 +0900 Subject: [PATCH 6/7] get scrape targets from ActiveTargets --- pkg/gh/run.go | 39 +++++++++++++++++++------------------ pkg/metric/scrape_github.go | 21 +++++++++++--------- pkg/web/webhook.go | 13 +++++++++++-- 3 files changed, 43 insertions(+), 30 deletions(-) diff --git a/pkg/gh/run.go b/pkg/gh/run.go index 4f465e9..176b99f 100644 --- a/pkg/gh/run.go +++ b/pkg/gh/run.go @@ -3,12 +3,16 @@ package gh import ( "context" "fmt" + "sync" "time" "github.com/google/go-github/v47/github" "github.com/whywaita/myshoes/pkg/logger" ) +// ActiveTargets stores targets by recently received webhook +var ActiveTargets = sync.Map{} + func listRuns(ctx context.Context, client *github.Client, owner, repo string, opts *github.ListWorkflowRunsOptions) (*github.WorkflowRuns, *github.Response, error) { runs, resp, err := client.Actions.ListRepositoryWorkflowRuns(ctx, owner, repo, opts) if err != nil { @@ -18,38 +22,24 @@ func listRuns(ctx context.Context, client *github.Client, owner, repo string, op } // ListRuns get workflow runs that registered repository -func ListRuns(ctx context.Context, owner, repo, scope string) ([]*github.WorkflowRun, error) { +func ListRuns(ctx context.Context, owner, repo string) ([]*github.WorkflowRun, error) { if cachedRs, expiration, found := responseCache.GetWithExpiration(getRunsCacheKey(owner, repo)); found { if time.Until(expiration).Minutes() <= 1 { - go updateCache(context.Background(), owner, repo, scope) + go updateCache(context.Background(), owner, repo) } logger.Logf(true, "found workflow runs (cache hit: expiration: %s) in %s/%s", expiration.Format("2006/01/02 15:04:05.000 -0700"), owner, repo) return cachedRs.([]*github.WorkflowRun), nil } - go updateCache(context.Background(), owner, repo, scope) + go updateCache(context.Background(), owner, repo) return []*github.WorkflowRun{}, nil } + func getRunsCacheKey(owner, repo string) string { return fmt.Sprintf("runs-owner-%s-repo-%s", owner, repo) } -func updateCache(ctx context.Context, owner, repo, scope string) { - installationID, err := IsInstalledGitHubApp(ctx, scope) - if err != nil { - logger.Logf(false, "failed to list pending runs (%s/%s): %+v", owner, repo, err) - installationID, err = IsInstalledGitHubApp(ctx, owner) - if err != nil { - logger.Logf(false, "failed to list pending runs (%s): %+v", owner, err) - responseCache.Set(getRunsCacheKey(owner, repo), []*github.WorkflowRun{}, 1*time.Hour) - return - } - } - client, err := NewClientInstallation(installationID) - if err != nil { - logger.Logf(false, "failed to list workflow runs (%s/%s): %+v", owner, repo, err) - return - } +func updateCache(ctx context.Context, owner, repo string) { var opts = &github.ListWorkflowRunsOptions{ ListOptions: github.ListOptions{ Page: 0, @@ -57,6 +47,17 @@ func updateCache(ctx context.Context, owner, repo, scope string) { }, } logger.Logf(true, "get workflow runs of %s/%s, recent %d runs", owner, repo, opts.PerPage) + activeTarget, ok := ActiveTargets.Load(fmt.Sprintf("%s/%s", owner, repo)) + if !ok { + logger.Logf(true, "%s/%s is not active target", owner, repo) + return + } + installationID := activeTarget.(int64) + client, err := NewClientInstallation(installationID) + if err != nil { + logger.Logf(false, "failed to list workflow runs (%s/%s): %+v", owner, repo, err) + return + } runs, resp, err := listRuns(ctx, client, owner, repo, opts) if err != nil { logger.Logf(false, "failed to list workflow runs (%s/%s): %+v", owner, repo, err) diff --git a/pkg/metric/scrape_github.go b/pkg/metric/scrape_github.go index daa737d..ce5b402 100644 --- a/pkg/metric/scrape_github.go +++ b/pkg/metric/scrape_github.go @@ -17,7 +17,7 @@ var ( githubPendingRunsDesc = prometheus.NewDesc( prometheus.BuildFQName(namespace, githubName, "pending_runs"), "Number of pending runs", - []string{"target_id"}, nil, + []string{"target_id", "scope"}, nil, ) ) @@ -43,13 +43,16 @@ func (s ScraperGitHub) Scrape(ctx context.Context, ds datastore.Datastore, ch ch } func scrapePendingRuns(ctx context.Context, ds datastore.Datastore, ch chan<- prometheus.Metric) error { - targets, err := ds.ListTargets(ctx) - if err != nil { - return fmt.Errorf("failed to list pending runs: %w", err) - } + var targets []*datastore.Target + gh.ActiveTargets.Range(func(key, _ any) bool { + scope := key.(string) + target, _ := ds.GetTargetByScope(ctx, scope) + targets = append(targets, target) + return true + }) if len(targets) == 0 { ch <- prometheus.MustNewConstMetric( - githubPendingRunsDesc, prometheus.GaugeValue, 0, "none", + githubPendingRunsDesc, prometheus.GaugeValue, 0, "none", "none", ) return nil } @@ -60,7 +63,7 @@ func scrapePendingRuns(ctx context.Context, ds datastore.Datastore, ch chan<- pr if repo == "" { continue } - runs, err := gh.ListRuns(ctx, owner, repo, t.Scope) + runs, err := gh.ListRuns(ctx, owner, repo) if err != nil { logger.Logf(false, "failed to list pending runs: %+v", err) continue @@ -68,7 +71,7 @@ func scrapePendingRuns(ctx context.Context, ds datastore.Datastore, ch chan<- pr if len(runs) == 0 { ch <- prometheus.MustNewConstMetric( - githubPendingRunsDesc, prometheus.GaugeValue, 0, t.UUID.String(), + githubPendingRunsDesc, prometheus.GaugeValue, 0, t.UUID.String(), t.Scope, ) continue } @@ -80,7 +83,7 @@ func scrapePendingRuns(ctx context.Context, ds datastore.Datastore, ch chan<- pr } } ch <- prometheus.MustNewConstMetric( - githubPendingRunsDesc, prometheus.GaugeValue, pendings, t.UUID.String(), + githubPendingRunsDesc, prometheus.GaugeValue, pendings, t.UUID.String(), t.Scope, ) } return nil diff --git a/pkg/web/webhook.go b/pkg/web/webhook.go index be26bf6..b9b599e 100644 --- a/pkg/web/webhook.go +++ b/pkg/web/webhook.go @@ -78,7 +78,15 @@ func handleGitHubEvent(w http.ResponseWriter, r *http.Request, ds datastore.Data } } -func receivePingWebhook(_ context.Context, _ *github.PingEvent) error { +func storeActiveTarget(repoName string, installationID int64) { + gh.ActiveTargets.Store(repoName, installationID) +} + +func receivePingWebhook(_ context.Context, event *github.PingEvent) error { + repoName := event.GetRepo().GetFullName() + installationID := event.GetInstallation().GetID() + + storeActiveTarget(repoName, installationID) return nil } @@ -99,7 +107,7 @@ func receiveCheckRunWebhook(ctx context.Context, event *github.CheckRunEvent, ds if err != nil { return fmt.Errorf("failed to json.Marshal: %w", err) } - + storeActiveTarget(repoName, installationID) return processCheckRun(ctx, ds, repoName, repoURL, installationID, jb) } @@ -188,6 +196,7 @@ func receiveWorkflowJobWebhook(ctx context.Context, event *github.WorkflowJobEve return fmt.Errorf("failed to json.Marshal: %w", err) } + storeActiveTarget(repoName, installationID) return processCheckRun(ctx, ds, repoName, repoURL, installationID, jb) } From 3ad247f7dda078af568f314d77c3e8d0c763ac12 Mon Sep 17 00:00:00 2001 From: gamoutatsumi Date: Tue, 4 Jul 2023 11:09:25 +0900 Subject: [PATCH 7/7] re-run pending jobs --- go.mod | 3 +- go.sum | 6 ++- pkg/datastore/interface.go | 26 ++++++++++ pkg/gh/run.go | 11 ++++- pkg/gh/webhook.go | 12 ++++- pkg/metric/scrape_github.go | 45 +++++++----------- pkg/starter/starter.go | 95 +++++++++++++++++++++++++++++++++++++ pkg/web/webhook.go | 30 +----------- 8 files changed, 164 insertions(+), 64 deletions(-) diff --git a/go.mod b/go.mod index 606eebb..9c56d84 100644 --- a/go.mod +++ b/go.mod @@ -17,6 +17,7 @@ require ( github.com/r3labs/diff/v2 v2.15.1 github.com/satori/go.uuid v1.2.0 goji.io v2.0.2+incompatible + golang.org/x/exp v0.0.0-20230626212559-97b1e661b5df golang.org/x/oauth2 v0.0.0-20220411215720-9780585627b5 golang.org/x/sync v0.0.0-20210220032951-036812b2e83c google.golang.org/grpc v1.45.0 @@ -66,7 +67,7 @@ require ( github.com/xeipuuv/gojsonschema v1.2.0 // indirect golang.org/x/crypto v0.0.0-20210817164053-32db794688a5 // indirect golang.org/x/net v0.0.0-20220127200216-cd36cc0744dd // indirect - golang.org/x/sys v0.0.0-20220412211240-33da011f77ad // indirect + golang.org/x/sys v0.1.0 // indirect golang.org/x/text v0.3.7 // indirect google.golang.org/appengine v1.6.7 // indirect google.golang.org/genproto v0.0.0-20200825200019-8632dd797987 // indirect diff --git a/go.sum b/go.sum index 8d11912..4e4d963 100644 --- a/go.sum +++ b/go.sum @@ -357,6 +357,8 @@ golang.org/x/exp v0.0.0-20191227195350-da58074b4299/go.mod h1:2RIsYlXP63K8oxa1u0 golang.org/x/exp v0.0.0-20200119233911-0405dc783f0a/go.mod h1:2RIsYlXP63K8oxa1u096TMicItID8zy7Y6sNkU49FU4= golang.org/x/exp v0.0.0-20200207192155-f17229e696bd/go.mod h1:J/WKrq2StrnmMY6+EHIKF9dgMWnmCNThgcyBT1FY9mM= golang.org/x/exp v0.0.0-20200224162631-6cc2880d07d6/go.mod h1:3jZMyOhIsHpP37uCMkUooju7aAi5cS1Q23tOzKc+0MU= +golang.org/x/exp v0.0.0-20230626212559-97b1e661b5df h1:UA2aFVmmsIlefxMk29Dp2juaUSth8Pyn3Tq5Y5mJGME= +golang.org/x/exp v0.0.0-20230626212559-97b1e661b5df/go.mod h1:FXUEEKJgO7OQYeo8N01OfiKP8RXMtf6e8aTskBGqWdc= golang.org/x/image v0.0.0-20190227222117-0694c2d4d067/go.mod h1:kZ7UVZpmo3dzQBMxlp+ypCbDeSB+sBbTgSJuh5dn5js= golang.org/x/image v0.0.0-20190802002840-cff245a6509b/go.mod h1:FeLwcggjj3mMvU+oOTbSwawSJRM1uh48EjtB4UJZlP0= golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= @@ -480,8 +482,8 @@ golang.org/x/sys v0.0.0-20211025201205-69cdffdb9359/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20211116061358-0a5406a5449c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220114195835-da31bd327af9/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.0.0-20220412211240-33da011f77ad h1:ntjMns5wyP/fN65tdBD4g8J5w8n015+iIIs9rtjXkY0= -golang.org/x/sys v0.0.0-20220412211240-33da011f77ad/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.1.0 h1:kunALQeHf1/185U1i0GOB/fy1IPRDDpuoOOqRReG57U= +golang.org/x/sys v0.1.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= diff --git a/pkg/datastore/interface.go b/pkg/datastore/interface.go index e589513..5bb5c29 100644 --- a/pkg/datastore/interface.go +++ b/pkg/datastore/interface.go @@ -125,6 +125,32 @@ func UpdateTargetStatus(ctx context.Context, ds Datastore, targetID uuid.UUID, n return nil } +// SearchRepo search datastore.Target from datastore +// format of repo is "orgs/repos" +func SearchRepo(ctx context.Context, ds Datastore, repo string) (*Target, error) { + sep := strings.Split(repo, "/") + if len(sep) != 2 { + return nil, fmt.Errorf("incorrect repo format ex: orgs/repo") + } + + // use repo scope if set repo + repoTarget, err := ds.GetTargetByScope(ctx, repo) + if err == nil && repoTarget.CanReceiveJob() { + return repoTarget, nil + } else if err != nil && err != ErrNotFound { + return nil, fmt.Errorf("failed to get target from repo: %w", err) + } + + // repo is not found, so search org target + org := sep[0] + orgTarget, err := ds.GetTargetByScope(ctx, org) + if err != nil || !orgTarget.CanReceiveJob() { + return nil, fmt.Errorf("failed to get target from organization: %w", err) + } + + return orgTarget, nil +} + // TargetStatus is status for target type TargetStatus string diff --git a/pkg/gh/run.go b/pkg/gh/run.go index 176b99f..26f4d5f 100644 --- a/pkg/gh/run.go +++ b/pkg/gh/run.go @@ -13,6 +13,9 @@ import ( // ActiveTargets stores targets by recently received webhook var ActiveTargets = sync.Map{} +// PendingRuns stores queued / pending workflow runs +var PendingRuns = sync.Map{} + func listRuns(ctx context.Context, client *github.Client, owner, repo string, opts *github.ListWorkflowRunsOptions) (*github.WorkflowRuns, *github.Response, error) { runs, resp, err := client.Actions.ListRepositoryWorkflowRuns(ctx, owner, repo, opts) if err != nil { @@ -22,7 +25,7 @@ func listRuns(ctx context.Context, client *github.Client, owner, repo string, op } // ListRuns get workflow runs that registered repository -func ListRuns(ctx context.Context, owner, repo string) ([]*github.WorkflowRun, error) { +func ListRuns(owner, repo string) ([]*github.WorkflowRun, error) { if cachedRs, expiration, found := responseCache.GetWithExpiration(getRunsCacheKey(owner, repo)); found { if time.Until(expiration).Minutes() <= 1 { go updateCache(context.Background(), owner, repo) @@ -34,11 +37,15 @@ func ListRuns(ctx context.Context, owner, repo string) ([]*github.WorkflowRun, e return []*github.WorkflowRun{}, nil } - func getRunsCacheKey(owner, repo string) string { return fmt.Sprintf("runs-owner-%s-repo-%s", owner, repo) } +// ClearRunsCache clear github workflow run caches +func ClearRunsCache(owner, repo string) { + responseCache.Delete(getRunsCacheKey(owner, repo)) +} + func updateCache(ctx context.Context, owner, repo string) { var opts = &github.ListWorkflowRunsOptions{ ListOptions: github.ListOptions{ diff --git a/pkg/gh/webhook.go b/pkg/gh/webhook.go index 5656c80..7800ae0 100644 --- a/pkg/gh/webhook.go +++ b/pkg/gh/webhook.go @@ -16,9 +16,15 @@ func parseEventJSON(in []byte) (interface{}, error) { return checkRun, nil } - var workflowJob *github.WorkflowJobEvent + var workflowJobEvent *github.WorkflowJobEvent + err = json.Unmarshal(in, &workflowJobEvent) + if err == nil && workflowJobEvent.GetWorkflowJob() != nil { + return workflowJobEvent, nil + } + + var workflowJob *github.WorkflowJob err = json.Unmarshal(in, &workflowJob) - if err == nil && workflowJob.GetWorkflowJob() != nil { + if err == nil && workflowJob != nil { return workflowJob, nil } @@ -36,6 +42,8 @@ func ExtractRunsOnLabels(in []byte) ([]string, error) { case *github.WorkflowJobEvent: // workflow_job has labels, can extract labels return t.GetWorkflowJob().Labels, nil + case *github.WorkflowJob: + return t.Labels, nil } return []string{}, nil diff --git a/pkg/metric/scrape_github.go b/pkg/metric/scrape_github.go index ce5b402..0f6aae5 100644 --- a/pkg/metric/scrape_github.go +++ b/pkg/metric/scrape_github.go @@ -43,48 +43,35 @@ func (s ScraperGitHub) Scrape(ctx context.Context, ds datastore.Datastore, ch ch } func scrapePendingRuns(ctx context.Context, ds datastore.Datastore, ch chan<- prometheus.Metric) error { - var targets []*datastore.Target - gh.ActiveTargets.Range(func(key, _ any) bool { - scope := key.(string) - target, _ := ds.GetTargetByScope(ctx, scope) - targets = append(targets, target) - return true - }) - if len(targets) == 0 { - ch <- prometheus.MustNewConstMetric( - githubPendingRunsDesc, prometheus.GaugeValue, 0, "none", "none", - ) - return nil - } - - for _, t := range targets { - owner, repo := t.OwnerRepo() + gh.ActiveTargets.Range(func(key, value any) bool { var pendings float64 + scope := key.(string) + installationID := value.(int64) + target, err := ds.GetTargetByScope(ctx, scope) + if err != nil { + logger.Logf(false, "failed to get target by scope (%s): %+v", scope, err) + return true + } + owner, repo := target.OwnerRepo() if repo == "" { - continue + return true } - runs, err := gh.ListRuns(ctx, owner, repo) + runs, err := gh.ListRuns(owner, repo) if err != nil { logger.Logf(false, "failed to list pending runs: %+v", err) - continue + return true } - if len(runs) == 0 { - ch <- prometheus.MustNewConstMetric( - githubPendingRunsDesc, prometheus.GaugeValue, 0, t.UUID.String(), t.Scope, - ) - continue - } for _, r := range runs { if r.GetStatus() == "queued" || r.GetStatus() == "pending" { if time.Since(r.CreatedAt.Time).Minutes() >= 30 { pendings++ + gh.PendingRuns.Store(installationID, r) } } } - ch <- prometheus.MustNewConstMetric( - githubPendingRunsDesc, prometheus.GaugeValue, pendings, t.UUID.String(), t.Scope, - ) - } + ch <- prometheus.MustNewConstMetric(githubPendingRunsDesc, prometheus.GaugeValue, pendings, target.UUID.String(), target.Scope) + return true + }) return nil } diff --git a/pkg/starter/starter.go b/pkg/starter/starter.go index 928d272..1bf4366 100644 --- a/pkg/starter/starter.go +++ b/pkg/starter/starter.go @@ -3,14 +3,19 @@ package starter import ( "context" "database/sql" + "encoding/json" "errors" "fmt" + "net/url" "sync" "time" + "golang.org/x/exp/slices" "golang.org/x/sync/errgroup" "golang.org/x/sync/semaphore" + "github.com/google/go-github/v47/github" + uuid "github.com/satori/go.uuid" "github.com/whywaita/myshoes/internal/config" "github.com/whywaita/myshoes/pkg/datastore" "github.com/whywaita/myshoes/pkg/gh" @@ -27,6 +32,8 @@ var ( CountWaiting = 0 inProgress = sync.Map{} + + reQueuedJobs = sync.Map{} ) // Starter is dispatcher for running job @@ -52,6 +59,11 @@ func (s *Starter) Loop(ctx context.Context) error { eg, ctx := errgroup.WithContext(ctx) + eg.Go(func() error { + s.reRunWorkflow(ctx) + return nil + }) + eg.Go(func() error { if err := s.run(ctx, ch); err != nil { return fmt.Errorf("faied to start processor: %w", err) @@ -314,3 +326,86 @@ func (s *Starter) checkRegisteredRunner(ctx context.Context, runnerName string, } } } + +func (s *Starter) reRunWorkflow(ctx context.Context) { + for { + select { + case <-ctx.Done(): + return + default: + gh.PendingRuns.Range(func(key, value any) bool { + installationID := key.(int64) + run := value.(*github.WorkflowRun) + client, err := gh.NewClientInstallation(installationID) + if err != nil { + logger.Logf(false, "failed to create GitHub client: %+v", err) + return true + } + + owner := run.GetRepository().GetOwner().GetLogin() + repo := run.GetRepository().GetName() + repoName := run.GetRepository().GetFullName() + + jobs, _, err := client.Actions.ListWorkflowJobs(ctx, owner, repo, run.GetID(), &github.ListWorkflowJobsOptions{ + Filter: "latest", + }) + if err != nil { + logger.Logf(false, "failed to get workflow jobs: %+v", err) + return true + } + + for _, j := range jobs.Jobs { + if value, ok := reQueuedJobs.Load(j.GetID()); ok { + expired := value.(time.Time) + if time.Until(expired) <= 0 { + reQueuedJobs.Delete(run.GetID()) + } + continue + } + if !slices.Contains(j.Labels, "self-hosted") && !slices.Contains(j.Labels, "myshoes") { + continue + } + if j.GetStatus() == "queued" { + repoURL := run.GetRepository().GetHTMLURL() + u, err := url.Parse(repoURL) + if err != nil { + logger.Logf(false, "failed to parse repository url from event: %+v", err) + continue + } + var domain string + gheDomain := "" + if u.Host != "github.com" { + gheDomain = fmt.Sprintf("%s://%s", u.Scheme, u.Host) + domain = gheDomain + } else { + domain = "https://github.com" + } + + logger.Logf(false, "receive webhook repository: %s/%s", domain, repoName) + target, err := datastore.SearchRepo(ctx, s.ds, repoName) + if err != nil { + logger.Logf(false, "failed to search registered target: %+v", err) + continue + } + jobID := uuid.NewV4() + jobJSON, _ := json.Marshal(j) + job := datastore.Job{ + UUID: jobID, + TargetID: target.UUID, + Repository: repoName, + CheckEventJSON: string(jobJSON), + } + if err := s.ds.EnqueueJob(ctx, job); err != nil { + logger.Logf(false, "failed to enqueue job: %+v", err) + continue + } + reQueuedJobs.Store(j.GetID(), time.Now().Add(30*time.Minute)) + } + } + gh.PendingRuns.Delete(installationID) + gh.ClearRunsCache(owner, repo) + return true + }) + } + } +} diff --git a/pkg/web/webhook.go b/pkg/web/webhook.go index b9b599e..97f9ed3 100644 --- a/pkg/web/webhook.go +++ b/pkg/web/webhook.go @@ -133,14 +133,14 @@ func processCheckRun(ctx context.Context, ds datastore.Datastore, repoName, repo } logger.Logf(false, "receive webhook repository: %s/%s", domain, repoName) - target, err := searchRepo(ctx, ds, repoName) + target, err := datastore.SearchRepo(ctx, ds, repoName) if err != nil { return fmt.Errorf("failed to search registered target: %w", err) } if !target.CanReceiveJob() { // do nothing if status is cannot receive - logger.Logf(false, "%s/%s is %s now, do nothing", target.Status, domain, repoName) + logger.Logf(false, "%s/%s is %s now, do nothing", domain, repoName, target.Status) return nil } @@ -208,29 +208,3 @@ func isRequestedMyshoesLabel(labels []string) bool { } return false } - -// searchRepo search datastore.Target from datastore -// format of repo is "orgs/repos" -func searchRepo(ctx context.Context, ds datastore.Datastore, repo string) (*datastore.Target, error) { - sep := strings.Split(repo, "/") - if len(sep) != 2 { - return nil, fmt.Errorf("incorrect repo format ex: orgs/repo") - } - - // use repo scope if set repo - repoTarget, err := ds.GetTargetByScope(ctx, repo) - if err == nil && repoTarget.CanReceiveJob() { - return repoTarget, nil - } else if err != nil && err != datastore.ErrNotFound { - return nil, fmt.Errorf("failed to get target from repo: %w", err) - } - - // repo is not found, so search org target - org := sep[0] - orgTarget, err := ds.GetTargetByScope(ctx, org) - if err != nil || !orgTarget.CanReceiveJob() { - return nil, fmt.Errorf("failed to get target from organization: %w", err) - } - - return orgTarget, nil -}