Skip to content

Commit

Permalink
Merge pull request #173 from gamoutatsumi/feat/recover-pending-job
Browse files Browse the repository at this point in the history
Add collecting metrics for pending (queued) workflow runs
  • Loading branch information
whywaita authored Aug 2, 2023
2 parents cc55714 + 3ad247f commit 2e1b8d8
Show file tree
Hide file tree
Showing 9 changed files with 304 additions and 35 deletions.
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ require (
github.com/r3labs/diff/v2 v2.15.1
github.com/satori/go.uuid v1.2.0
goji.io v2.0.2+incompatible
golang.org/x/exp v0.0.0-20230626212559-97b1e661b5df
golang.org/x/oauth2 v0.0.0-20220411215720-9780585627b5
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c
google.golang.org/grpc v1.45.0
Expand Down Expand Up @@ -66,7 +67,7 @@ require (
github.com/xeipuuv/gojsonschema v1.2.0 // indirect
golang.org/x/crypto v0.0.0-20210817164053-32db794688a5 // indirect
golang.org/x/net v0.0.0-20220127200216-cd36cc0744dd // indirect
golang.org/x/sys v0.0.0-20220412211240-33da011f77ad // indirect
golang.org/x/sys v0.1.0 // indirect
golang.org/x/text v0.3.7 // indirect
google.golang.org/appengine v1.6.7 // indirect
google.golang.org/genproto v0.0.0-20200825200019-8632dd797987 // indirect
Expand Down
6 changes: 4 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -357,6 +357,8 @@ golang.org/x/exp v0.0.0-20191227195350-da58074b4299/go.mod h1:2RIsYlXP63K8oxa1u0
golang.org/x/exp v0.0.0-20200119233911-0405dc783f0a/go.mod h1:2RIsYlXP63K8oxa1u096TMicItID8zy7Y6sNkU49FU4=
golang.org/x/exp v0.0.0-20200207192155-f17229e696bd/go.mod h1:J/WKrq2StrnmMY6+EHIKF9dgMWnmCNThgcyBT1FY9mM=
golang.org/x/exp v0.0.0-20200224162631-6cc2880d07d6/go.mod h1:3jZMyOhIsHpP37uCMkUooju7aAi5cS1Q23tOzKc+0MU=
golang.org/x/exp v0.0.0-20230626212559-97b1e661b5df h1:UA2aFVmmsIlefxMk29Dp2juaUSth8Pyn3Tq5Y5mJGME=
golang.org/x/exp v0.0.0-20230626212559-97b1e661b5df/go.mod h1:FXUEEKJgO7OQYeo8N01OfiKP8RXMtf6e8aTskBGqWdc=
golang.org/x/image v0.0.0-20190227222117-0694c2d4d067/go.mod h1:kZ7UVZpmo3dzQBMxlp+ypCbDeSB+sBbTgSJuh5dn5js=
golang.org/x/image v0.0.0-20190802002840-cff245a6509b/go.mod h1:FeLwcggjj3mMvU+oOTbSwawSJRM1uh48EjtB4UJZlP0=
golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
Expand Down Expand Up @@ -480,8 +482,8 @@ golang.org/x/sys v0.0.0-20211025201205-69cdffdb9359/go.mod h1:oPkhp1MJrh7nUepCBc
golang.org/x/sys v0.0.0-20211116061358-0a5406a5449c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220114195835-da31bd327af9/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220412211240-33da011f77ad h1:ntjMns5wyP/fN65tdBD4g8J5w8n015+iIIs9rtjXkY0=
golang.org/x/sys v0.0.0-20220412211240-33da011f77ad/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.1.0 h1:kunALQeHf1/185U1i0GOB/fy1IPRDDpuoOOqRReG57U=
golang.org/x/sys v0.1.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
Expand Down
26 changes: 26 additions & 0 deletions pkg/datastore/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,32 @@ func UpdateTargetStatus(ctx context.Context, ds Datastore, targetID uuid.UUID, n
return nil
}

// SearchRepo search datastore.Target from datastore
// format of repo is "orgs/repos"
func SearchRepo(ctx context.Context, ds Datastore, repo string) (*Target, error) {
sep := strings.Split(repo, "/")
if len(sep) != 2 {
return nil, fmt.Errorf("incorrect repo format ex: orgs/repo")
}

// use repo scope if set repo
repoTarget, err := ds.GetTargetByScope(ctx, repo)
if err == nil && repoTarget.CanReceiveJob() {
return repoTarget, nil
} else if err != nil && err != ErrNotFound {
return nil, fmt.Errorf("failed to get target from repo: %w", err)
}

// repo is not found, so search org target
org := sep[0]
orgTarget, err := ds.GetTargetByScope(ctx, org)
if err != nil || !orgTarget.CanReceiveJob() {
return nil, fmt.Errorf("failed to get target from organization: %w", err)
}

return orgTarget, nil
}

// TargetStatus is status for target
type TargetStatus string

Expand Down
76 changes: 76 additions & 0 deletions pkg/gh/run.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package gh

import (
"context"
"fmt"
"sync"
"time"

"github.com/google/go-github/v47/github"
"github.com/whywaita/myshoes/pkg/logger"
)

// ActiveTargets stores targets by recently received webhook
var ActiveTargets = sync.Map{}

// PendingRuns stores queued / pending workflow runs
var PendingRuns = sync.Map{}

func listRuns(ctx context.Context, client *github.Client, owner, repo string, opts *github.ListWorkflowRunsOptions) (*github.WorkflowRuns, *github.Response, error) {
runs, resp, err := client.Actions.ListRepositoryWorkflowRuns(ctx, owner, repo, opts)
if err != nil {
return nil, nil, fmt.Errorf("failed to list workflow runs: %w", err)
}
return runs, resp, nil
}

// ListRuns get workflow runs that registered repository
func ListRuns(owner, repo string) ([]*github.WorkflowRun, error) {
if cachedRs, expiration, found := responseCache.GetWithExpiration(getRunsCacheKey(owner, repo)); found {
if time.Until(expiration).Minutes() <= 1 {
go updateCache(context.Background(), owner, repo)
}
logger.Logf(true, "found workflow runs (cache hit: expiration: %s) in %s/%s", expiration.Format("2006/01/02 15:04:05.000 -0700"), owner, repo)
return cachedRs.([]*github.WorkflowRun), nil
}
go updateCache(context.Background(), owner, repo)
return []*github.WorkflowRun{}, nil
}

func getRunsCacheKey(owner, repo string) string {
return fmt.Sprintf("runs-owner-%s-repo-%s", owner, repo)
}

// ClearRunsCache clear github workflow run caches
func ClearRunsCache(owner, repo string) {
responseCache.Delete(getRunsCacheKey(owner, repo))
}

func updateCache(ctx context.Context, owner, repo string) {
var opts = &github.ListWorkflowRunsOptions{
ListOptions: github.ListOptions{
Page: 0,
PerPage: 10,
},
}
logger.Logf(true, "get workflow runs of %s/%s, recent %d runs", owner, repo, opts.PerPage)
activeTarget, ok := ActiveTargets.Load(fmt.Sprintf("%s/%s", owner, repo))
if !ok {
logger.Logf(true, "%s/%s is not active target", owner, repo)
return
}
installationID := activeTarget.(int64)
client, err := NewClientInstallation(installationID)
if err != nil {
logger.Logf(false, "failed to list workflow runs (%s/%s): %+v", owner, repo, err)
return
}
runs, resp, err := listRuns(ctx, client, owner, repo, opts)
if err != nil {
logger.Logf(false, "failed to list workflow runs (%s/%s): %+v", owner, repo, err)
return
}
storeRateLimit(getRateLimitKey(owner, repo), resp.Rate)
responseCache.Set(getRunsCacheKey(owner, repo), runs.WorkflowRuns, 15*time.Minute)
logger.Logf(true, "found %d workflow runs in %s/%s", len(runs.WorkflowRuns), owner, repo)
}
12 changes: 10 additions & 2 deletions pkg/gh/webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,15 @@ func parseEventJSON(in []byte) (interface{}, error) {
return checkRun, nil
}

var workflowJob *github.WorkflowJobEvent
var workflowJobEvent *github.WorkflowJobEvent
err = json.Unmarshal(in, &workflowJobEvent)
if err == nil && workflowJobEvent.GetWorkflowJob() != nil {
return workflowJobEvent, nil
}

var workflowJob *github.WorkflowJob
err = json.Unmarshal(in, &workflowJob)
if err == nil && workflowJob.GetWorkflowJob() != nil {
if err == nil && workflowJob != nil {
return workflowJob, nil
}

Expand All @@ -36,6 +42,8 @@ func ExtractRunsOnLabels(in []byte) ([]string, error) {
case *github.WorkflowJobEvent:
// workflow_job has labels, can extract labels
return t.GetWorkflowJob().Labels, nil
case *github.WorkflowJob:
return t.Labels, nil
}

return []string{}, nil
Expand Down
1 change: 1 addition & 0 deletions pkg/metric/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ func NewScrapers() []Scraper {
return []Scraper{
ScraperDatastore{},
ScraperMemory{},
ScraperGitHub{},
}
}

Expand Down
77 changes: 77 additions & 0 deletions pkg/metric/scrape_github.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
package metric

import (
"context"
"fmt"
"time"

"github.com/prometheus/client_golang/prometheus"
"github.com/whywaita/myshoes/pkg/datastore"
"github.com/whywaita/myshoes/pkg/gh"
"github.com/whywaita/myshoes/pkg/logger"
)

const githubName = "github"

var (
githubPendingRunsDesc = prometheus.NewDesc(
prometheus.BuildFQName(namespace, githubName, "pending_runs"),
"Number of pending runs",
[]string{"target_id", "scope"}, nil,
)
)

// ScraperGitHub is scraper implement for GitHub
type ScraperGitHub struct{}

// Name return name
func (ScraperGitHub) Name() string {
return githubName
}

// Help return help
func (ScraperGitHub) Help() string {
return "Collect from GitHub"
}

// Scrape scrape metrics
func (s ScraperGitHub) Scrape(ctx context.Context, ds datastore.Datastore, ch chan<- prometheus.Metric) error {
if err := scrapePendingRuns(ctx, ds, ch); err != nil {
return fmt.Errorf("failed to scrape pending runs: %w", err)
}
return nil
}

func scrapePendingRuns(ctx context.Context, ds datastore.Datastore, ch chan<- prometheus.Metric) error {
gh.ActiveTargets.Range(func(key, value any) bool {
var pendings float64
scope := key.(string)
installationID := value.(int64)
target, err := ds.GetTargetByScope(ctx, scope)
if err != nil {
logger.Logf(false, "failed to get target by scope (%s): %+v", scope, err)
return true
}
owner, repo := target.OwnerRepo()
if repo == "" {
return true
}
runs, err := gh.ListRuns(owner, repo)
if err != nil {
logger.Logf(false, "failed to list pending runs: %+v", err)
return true
}

for _, r := range runs {
if r.GetStatus() == "queued" || r.GetStatus() == "pending" {
if time.Since(r.CreatedAt.Time).Minutes() >= 30 {
pendings++
gh.PendingRuns.Store(installationID, r)
}
}
}
ch <- prometheus.MustNewConstMetric(githubPendingRunsDesc, prometheus.GaugeValue, pendings, target.UUID.String(), target.Scope)
return true
})
return nil
}
95 changes: 95 additions & 0 deletions pkg/starter/starter.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,19 @@ package starter
import (
"context"
"database/sql"
"encoding/json"
"errors"
"fmt"
"net/url"
"sync"
"time"

"golang.org/x/exp/slices"
"golang.org/x/sync/errgroup"
"golang.org/x/sync/semaphore"

"github.com/google/go-github/v47/github"
uuid "github.com/satori/go.uuid"
"github.com/whywaita/myshoes/internal/config"
"github.com/whywaita/myshoes/pkg/datastore"
"github.com/whywaita/myshoes/pkg/gh"
Expand All @@ -27,6 +32,8 @@ var (
CountWaiting = 0

inProgress = sync.Map{}

reQueuedJobs = sync.Map{}
)

// Starter is dispatcher for running job
Expand All @@ -52,6 +59,11 @@ func (s *Starter) Loop(ctx context.Context) error {

eg, ctx := errgroup.WithContext(ctx)

eg.Go(func() error {
s.reRunWorkflow(ctx)
return nil
})

eg.Go(func() error {
if err := s.run(ctx, ch); err != nil {
return fmt.Errorf("faied to start processor: %w", err)
Expand Down Expand Up @@ -314,3 +326,86 @@ func (s *Starter) checkRegisteredRunner(ctx context.Context, runnerName string,
}
}
}

func (s *Starter) reRunWorkflow(ctx context.Context) {
for {
select {
case <-ctx.Done():
return
default:
gh.PendingRuns.Range(func(key, value any) bool {
installationID := key.(int64)
run := value.(*github.WorkflowRun)
client, err := gh.NewClientInstallation(installationID)
if err != nil {
logger.Logf(false, "failed to create GitHub client: %+v", err)
return true
}

owner := run.GetRepository().GetOwner().GetLogin()
repo := run.GetRepository().GetName()
repoName := run.GetRepository().GetFullName()

jobs, _, err := client.Actions.ListWorkflowJobs(ctx, owner, repo, run.GetID(), &github.ListWorkflowJobsOptions{
Filter: "latest",
})
if err != nil {
logger.Logf(false, "failed to get workflow jobs: %+v", err)
return true
}

for _, j := range jobs.Jobs {
if value, ok := reQueuedJobs.Load(j.GetID()); ok {
expired := value.(time.Time)
if time.Until(expired) <= 0 {
reQueuedJobs.Delete(run.GetID())
}
continue
}
if !slices.Contains(j.Labels, "self-hosted") && !slices.Contains(j.Labels, "myshoes") {
continue
}
if j.GetStatus() == "queued" {
repoURL := run.GetRepository().GetHTMLURL()
u, err := url.Parse(repoURL)
if err != nil {
logger.Logf(false, "failed to parse repository url from event: %+v", err)
continue
}
var domain string
gheDomain := ""
if u.Host != "github.com" {
gheDomain = fmt.Sprintf("%s://%s", u.Scheme, u.Host)
domain = gheDomain
} else {
domain = "https://github.com"
}

logger.Logf(false, "receive webhook repository: %s/%s", domain, repoName)
target, err := datastore.SearchRepo(ctx, s.ds, repoName)
if err != nil {
logger.Logf(false, "failed to search registered target: %+v", err)
continue
}
jobID := uuid.NewV4()
jobJSON, _ := json.Marshal(j)
job := datastore.Job{
UUID: jobID,
TargetID: target.UUID,
Repository: repoName,
CheckEventJSON: string(jobJSON),
}
if err := s.ds.EnqueueJob(ctx, job); err != nil {
logger.Logf(false, "failed to enqueue job: %+v", err)
continue
}
reQueuedJobs.Store(j.GetID(), time.Now().Add(30*time.Minute))
}
}
gh.PendingRuns.Delete(installationID)
gh.ClearRunsCache(owner, repo)
return true
})
}
}
}
Loading

0 comments on commit 2e1b8d8

Please sign in to comment.