From 3ad247f7dda078af568f314d77c3e8d0c763ac12 Mon Sep 17 00:00:00 2001 From: gamoutatsumi Date: Tue, 4 Jul 2023 11:09:25 +0900 Subject: [PATCH] re-run pending jobs --- go.mod | 3 +- go.sum | 6 ++- pkg/datastore/interface.go | 26 ++++++++++ pkg/gh/run.go | 11 ++++- pkg/gh/webhook.go | 12 ++++- pkg/metric/scrape_github.go | 45 +++++++----------- pkg/starter/starter.go | 95 +++++++++++++++++++++++++++++++++++++ pkg/web/webhook.go | 30 +----------- 8 files changed, 164 insertions(+), 64 deletions(-) diff --git a/go.mod b/go.mod index 606eebb..9c56d84 100644 --- a/go.mod +++ b/go.mod @@ -17,6 +17,7 @@ require ( github.com/r3labs/diff/v2 v2.15.1 github.com/satori/go.uuid v1.2.0 goji.io v2.0.2+incompatible + golang.org/x/exp v0.0.0-20230626212559-97b1e661b5df golang.org/x/oauth2 v0.0.0-20220411215720-9780585627b5 golang.org/x/sync v0.0.0-20210220032951-036812b2e83c google.golang.org/grpc v1.45.0 @@ -66,7 +67,7 @@ require ( github.com/xeipuuv/gojsonschema v1.2.0 // indirect golang.org/x/crypto v0.0.0-20210817164053-32db794688a5 // indirect golang.org/x/net v0.0.0-20220127200216-cd36cc0744dd // indirect - golang.org/x/sys v0.0.0-20220412211240-33da011f77ad // indirect + golang.org/x/sys v0.1.0 // indirect golang.org/x/text v0.3.7 // indirect google.golang.org/appengine v1.6.7 // indirect google.golang.org/genproto v0.0.0-20200825200019-8632dd797987 // indirect diff --git a/go.sum b/go.sum index 8d11912..4e4d963 100644 --- a/go.sum +++ b/go.sum @@ -357,6 +357,8 @@ golang.org/x/exp v0.0.0-20191227195350-da58074b4299/go.mod h1:2RIsYlXP63K8oxa1u0 golang.org/x/exp v0.0.0-20200119233911-0405dc783f0a/go.mod h1:2RIsYlXP63K8oxa1u096TMicItID8zy7Y6sNkU49FU4= golang.org/x/exp v0.0.0-20200207192155-f17229e696bd/go.mod h1:J/WKrq2StrnmMY6+EHIKF9dgMWnmCNThgcyBT1FY9mM= golang.org/x/exp v0.0.0-20200224162631-6cc2880d07d6/go.mod h1:3jZMyOhIsHpP37uCMkUooju7aAi5cS1Q23tOzKc+0MU= +golang.org/x/exp v0.0.0-20230626212559-97b1e661b5df h1:UA2aFVmmsIlefxMk29Dp2juaUSth8Pyn3Tq5Y5mJGME= +golang.org/x/exp v0.0.0-20230626212559-97b1e661b5df/go.mod h1:FXUEEKJgO7OQYeo8N01OfiKP8RXMtf6e8aTskBGqWdc= golang.org/x/image v0.0.0-20190227222117-0694c2d4d067/go.mod h1:kZ7UVZpmo3dzQBMxlp+ypCbDeSB+sBbTgSJuh5dn5js= golang.org/x/image v0.0.0-20190802002840-cff245a6509b/go.mod h1:FeLwcggjj3mMvU+oOTbSwawSJRM1uh48EjtB4UJZlP0= golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= @@ -480,8 +482,8 @@ golang.org/x/sys v0.0.0-20211025201205-69cdffdb9359/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20211116061358-0a5406a5449c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220114195835-da31bd327af9/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.0.0-20220412211240-33da011f77ad h1:ntjMns5wyP/fN65tdBD4g8J5w8n015+iIIs9rtjXkY0= -golang.org/x/sys v0.0.0-20220412211240-33da011f77ad/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.1.0 h1:kunALQeHf1/185U1i0GOB/fy1IPRDDpuoOOqRReG57U= +golang.org/x/sys v0.1.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= diff --git a/pkg/datastore/interface.go b/pkg/datastore/interface.go index e589513..5bb5c29 100644 --- a/pkg/datastore/interface.go +++ b/pkg/datastore/interface.go @@ -125,6 +125,32 @@ func UpdateTargetStatus(ctx context.Context, ds Datastore, targetID uuid.UUID, n return nil } +// SearchRepo search datastore.Target from datastore +// format of repo is "orgs/repos" +func SearchRepo(ctx context.Context, ds Datastore, repo string) (*Target, error) { + sep := strings.Split(repo, "/") + if len(sep) != 2 { + return nil, fmt.Errorf("incorrect repo format ex: orgs/repo") + } + + // use repo scope if set repo + repoTarget, err := ds.GetTargetByScope(ctx, repo) + if err == nil && repoTarget.CanReceiveJob() { + return repoTarget, nil + } else if err != nil && err != ErrNotFound { + return nil, fmt.Errorf("failed to get target from repo: %w", err) + } + + // repo is not found, so search org target + org := sep[0] + orgTarget, err := ds.GetTargetByScope(ctx, org) + if err != nil || !orgTarget.CanReceiveJob() { + return nil, fmt.Errorf("failed to get target from organization: %w", err) + } + + return orgTarget, nil +} + // TargetStatus is status for target type TargetStatus string diff --git a/pkg/gh/run.go b/pkg/gh/run.go index 176b99f..26f4d5f 100644 --- a/pkg/gh/run.go +++ b/pkg/gh/run.go @@ -13,6 +13,9 @@ import ( // ActiveTargets stores targets by recently received webhook var ActiveTargets = sync.Map{} +// PendingRuns stores queued / pending workflow runs +var PendingRuns = sync.Map{} + func listRuns(ctx context.Context, client *github.Client, owner, repo string, opts *github.ListWorkflowRunsOptions) (*github.WorkflowRuns, *github.Response, error) { runs, resp, err := client.Actions.ListRepositoryWorkflowRuns(ctx, owner, repo, opts) if err != nil { @@ -22,7 +25,7 @@ func listRuns(ctx context.Context, client *github.Client, owner, repo string, op } // ListRuns get workflow runs that registered repository -func ListRuns(ctx context.Context, owner, repo string) ([]*github.WorkflowRun, error) { +func ListRuns(owner, repo string) ([]*github.WorkflowRun, error) { if cachedRs, expiration, found := responseCache.GetWithExpiration(getRunsCacheKey(owner, repo)); found { if time.Until(expiration).Minutes() <= 1 { go updateCache(context.Background(), owner, repo) @@ -34,11 +37,15 @@ func ListRuns(ctx context.Context, owner, repo string) ([]*github.WorkflowRun, e return []*github.WorkflowRun{}, nil } - func getRunsCacheKey(owner, repo string) string { return fmt.Sprintf("runs-owner-%s-repo-%s", owner, repo) } +// ClearRunsCache clear github workflow run caches +func ClearRunsCache(owner, repo string) { + responseCache.Delete(getRunsCacheKey(owner, repo)) +} + func updateCache(ctx context.Context, owner, repo string) { var opts = &github.ListWorkflowRunsOptions{ ListOptions: github.ListOptions{ diff --git a/pkg/gh/webhook.go b/pkg/gh/webhook.go index 5656c80..7800ae0 100644 --- a/pkg/gh/webhook.go +++ b/pkg/gh/webhook.go @@ -16,9 +16,15 @@ func parseEventJSON(in []byte) (interface{}, error) { return checkRun, nil } - var workflowJob *github.WorkflowJobEvent + var workflowJobEvent *github.WorkflowJobEvent + err = json.Unmarshal(in, &workflowJobEvent) + if err == nil && workflowJobEvent.GetWorkflowJob() != nil { + return workflowJobEvent, nil + } + + var workflowJob *github.WorkflowJob err = json.Unmarshal(in, &workflowJob) - if err == nil && workflowJob.GetWorkflowJob() != nil { + if err == nil && workflowJob != nil { return workflowJob, nil } @@ -36,6 +42,8 @@ func ExtractRunsOnLabels(in []byte) ([]string, error) { case *github.WorkflowJobEvent: // workflow_job has labels, can extract labels return t.GetWorkflowJob().Labels, nil + case *github.WorkflowJob: + return t.Labels, nil } return []string{}, nil diff --git a/pkg/metric/scrape_github.go b/pkg/metric/scrape_github.go index ce5b402..0f6aae5 100644 --- a/pkg/metric/scrape_github.go +++ b/pkg/metric/scrape_github.go @@ -43,48 +43,35 @@ func (s ScraperGitHub) Scrape(ctx context.Context, ds datastore.Datastore, ch ch } func scrapePendingRuns(ctx context.Context, ds datastore.Datastore, ch chan<- prometheus.Metric) error { - var targets []*datastore.Target - gh.ActiveTargets.Range(func(key, _ any) bool { - scope := key.(string) - target, _ := ds.GetTargetByScope(ctx, scope) - targets = append(targets, target) - return true - }) - if len(targets) == 0 { - ch <- prometheus.MustNewConstMetric( - githubPendingRunsDesc, prometheus.GaugeValue, 0, "none", "none", - ) - return nil - } - - for _, t := range targets { - owner, repo := t.OwnerRepo() + gh.ActiveTargets.Range(func(key, value any) bool { var pendings float64 + scope := key.(string) + installationID := value.(int64) + target, err := ds.GetTargetByScope(ctx, scope) + if err != nil { + logger.Logf(false, "failed to get target by scope (%s): %+v", scope, err) + return true + } + owner, repo := target.OwnerRepo() if repo == "" { - continue + return true } - runs, err := gh.ListRuns(ctx, owner, repo) + runs, err := gh.ListRuns(owner, repo) if err != nil { logger.Logf(false, "failed to list pending runs: %+v", err) - continue + return true } - if len(runs) == 0 { - ch <- prometheus.MustNewConstMetric( - githubPendingRunsDesc, prometheus.GaugeValue, 0, t.UUID.String(), t.Scope, - ) - continue - } for _, r := range runs { if r.GetStatus() == "queued" || r.GetStatus() == "pending" { if time.Since(r.CreatedAt.Time).Minutes() >= 30 { pendings++ + gh.PendingRuns.Store(installationID, r) } } } - ch <- prometheus.MustNewConstMetric( - githubPendingRunsDesc, prometheus.GaugeValue, pendings, t.UUID.String(), t.Scope, - ) - } + ch <- prometheus.MustNewConstMetric(githubPendingRunsDesc, prometheus.GaugeValue, pendings, target.UUID.String(), target.Scope) + return true + }) return nil } diff --git a/pkg/starter/starter.go b/pkg/starter/starter.go index 928d272..1bf4366 100644 --- a/pkg/starter/starter.go +++ b/pkg/starter/starter.go @@ -3,14 +3,19 @@ package starter import ( "context" "database/sql" + "encoding/json" "errors" "fmt" + "net/url" "sync" "time" + "golang.org/x/exp/slices" "golang.org/x/sync/errgroup" "golang.org/x/sync/semaphore" + "github.com/google/go-github/v47/github" + uuid "github.com/satori/go.uuid" "github.com/whywaita/myshoes/internal/config" "github.com/whywaita/myshoes/pkg/datastore" "github.com/whywaita/myshoes/pkg/gh" @@ -27,6 +32,8 @@ var ( CountWaiting = 0 inProgress = sync.Map{} + + reQueuedJobs = sync.Map{} ) // Starter is dispatcher for running job @@ -52,6 +59,11 @@ func (s *Starter) Loop(ctx context.Context) error { eg, ctx := errgroup.WithContext(ctx) + eg.Go(func() error { + s.reRunWorkflow(ctx) + return nil + }) + eg.Go(func() error { if err := s.run(ctx, ch); err != nil { return fmt.Errorf("faied to start processor: %w", err) @@ -314,3 +326,86 @@ func (s *Starter) checkRegisteredRunner(ctx context.Context, runnerName string, } } } + +func (s *Starter) reRunWorkflow(ctx context.Context) { + for { + select { + case <-ctx.Done(): + return + default: + gh.PendingRuns.Range(func(key, value any) bool { + installationID := key.(int64) + run := value.(*github.WorkflowRun) + client, err := gh.NewClientInstallation(installationID) + if err != nil { + logger.Logf(false, "failed to create GitHub client: %+v", err) + return true + } + + owner := run.GetRepository().GetOwner().GetLogin() + repo := run.GetRepository().GetName() + repoName := run.GetRepository().GetFullName() + + jobs, _, err := client.Actions.ListWorkflowJobs(ctx, owner, repo, run.GetID(), &github.ListWorkflowJobsOptions{ + Filter: "latest", + }) + if err != nil { + logger.Logf(false, "failed to get workflow jobs: %+v", err) + return true + } + + for _, j := range jobs.Jobs { + if value, ok := reQueuedJobs.Load(j.GetID()); ok { + expired := value.(time.Time) + if time.Until(expired) <= 0 { + reQueuedJobs.Delete(run.GetID()) + } + continue + } + if !slices.Contains(j.Labels, "self-hosted") && !slices.Contains(j.Labels, "myshoes") { + continue + } + if j.GetStatus() == "queued" { + repoURL := run.GetRepository().GetHTMLURL() + u, err := url.Parse(repoURL) + if err != nil { + logger.Logf(false, "failed to parse repository url from event: %+v", err) + continue + } + var domain string + gheDomain := "" + if u.Host != "github.com" { + gheDomain = fmt.Sprintf("%s://%s", u.Scheme, u.Host) + domain = gheDomain + } else { + domain = "https://github.com" + } + + logger.Logf(false, "receive webhook repository: %s/%s", domain, repoName) + target, err := datastore.SearchRepo(ctx, s.ds, repoName) + if err != nil { + logger.Logf(false, "failed to search registered target: %+v", err) + continue + } + jobID := uuid.NewV4() + jobJSON, _ := json.Marshal(j) + job := datastore.Job{ + UUID: jobID, + TargetID: target.UUID, + Repository: repoName, + CheckEventJSON: string(jobJSON), + } + if err := s.ds.EnqueueJob(ctx, job); err != nil { + logger.Logf(false, "failed to enqueue job: %+v", err) + continue + } + reQueuedJobs.Store(j.GetID(), time.Now().Add(30*time.Minute)) + } + } + gh.PendingRuns.Delete(installationID) + gh.ClearRunsCache(owner, repo) + return true + }) + } + } +} diff --git a/pkg/web/webhook.go b/pkg/web/webhook.go index b9b599e..97f9ed3 100644 --- a/pkg/web/webhook.go +++ b/pkg/web/webhook.go @@ -133,14 +133,14 @@ func processCheckRun(ctx context.Context, ds datastore.Datastore, repoName, repo } logger.Logf(false, "receive webhook repository: %s/%s", domain, repoName) - target, err := searchRepo(ctx, ds, repoName) + target, err := datastore.SearchRepo(ctx, ds, repoName) if err != nil { return fmt.Errorf("failed to search registered target: %w", err) } if !target.CanReceiveJob() { // do nothing if status is cannot receive - logger.Logf(false, "%s/%s is %s now, do nothing", target.Status, domain, repoName) + logger.Logf(false, "%s/%s is %s now, do nothing", domain, repoName, target.Status) return nil } @@ -208,29 +208,3 @@ func isRequestedMyshoesLabel(labels []string) bool { } return false } - -// searchRepo search datastore.Target from datastore -// format of repo is "orgs/repos" -func searchRepo(ctx context.Context, ds datastore.Datastore, repo string) (*datastore.Target, error) { - sep := strings.Split(repo, "/") - if len(sep) != 2 { - return nil, fmt.Errorf("incorrect repo format ex: orgs/repo") - } - - // use repo scope if set repo - repoTarget, err := ds.GetTargetByScope(ctx, repo) - if err == nil && repoTarget.CanReceiveJob() { - return repoTarget, nil - } else if err != nil && err != datastore.ErrNotFound { - return nil, fmt.Errorf("failed to get target from repo: %w", err) - } - - // repo is not found, so search org target - org := sep[0] - orgTarget, err := ds.GetTargetByScope(ctx, org) - if err != nil || !orgTarget.CanReceiveJob() { - return nil, fmt.Errorf("failed to get target from organization: %w", err) - } - - return orgTarget, nil -}