Skip to content

Commit

Permalink
use ticker in reRunWorkflow
Browse files Browse the repository at this point in the history
  • Loading branch information
gamoutatsumi committed Nov 10, 2023
1 parent c1a49c2 commit 3c10021
Showing 1 changed file with 78 additions and 77 deletions.
155 changes: 78 additions & 77 deletions pkg/starter/starter.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,16 @@ func (s *Starter) Loop(ctx context.Context) error {
eg, ctx := errgroup.WithContext(ctx)

eg.Go(func() error {
s.reRunWorkflow(ctx)
return nil
ticker := time.NewTicker(10 * time.Second)
defer ticker.Stop()
for {
select {
case <-ticker.C:
s.reRunWorkflow(ctx)
case <-ctx.Done():
return nil
}
}
})

eg.Go(func() error {
Expand Down Expand Up @@ -341,86 +349,79 @@ func (s *Starter) checkRegisteredRunner(ctx context.Context, runnerName string,
}

func (s *Starter) reRunWorkflow(ctx context.Context) {
for {
select {
case <-ctx.Done():
return
default:
gh.PendingRuns.Range(func(key, value any) bool {
installationID := key.(int64)
run := value.(*github.WorkflowRun)
client, err := gh.NewClientInstallation(installationID)
if err != nil {
logger.Logf(false, "failed to create GitHub client: %+v", err)
return true
}
gh.PendingRuns.Range(func(key, value any) bool {
installationID := key.(int64)
run := value.(*github.WorkflowRun)
client, err := gh.NewClientInstallation(installationID)
if err != nil {
logger.Logf(false, "failed to create GitHub client: %+v", err)
return true
}

owner := run.GetRepository().GetOwner().GetLogin()
repo := run.GetRepository().GetName()
repoName := run.GetRepository().GetFullName()

owner := run.GetRepository().GetOwner().GetLogin()
repo := run.GetRepository().GetName()
repoName := run.GetRepository().GetFullName()
jobs, _, err := client.Actions.ListWorkflowJobs(ctx, owner, repo, run.GetID(), &github.ListWorkflowJobsOptions{
Filter: "latest",
})
if err != nil {
logger.Logf(false, "failed to get workflow jobs: %+v", err)
return true
}

jobs, _, err := client.Actions.ListWorkflowJobs(ctx, owner, repo, run.GetID(), &github.ListWorkflowJobsOptions{
Filter: "latest",
})
for _, j := range jobs.Jobs {
if value, ok := reQueuedJobs.Load(j.GetID()); ok {
expired := value.(time.Time)
if time.Until(expired) <= 0 {
reQueuedJobs.Delete(j.GetID())
}
continue
}
if !slices.Contains(j.Labels, "self-hosted") && !slices.Contains(j.Labels, "myshoes") {
continue
}
if j.GetStatus() == "queued" {
repoURL := run.GetRepository().GetHTMLURL()
u, err := url.Parse(repoURL)
if err != nil {
logger.Logf(false, "failed to get workflow jobs: %+v", err)
return true
logger.Logf(false, "failed to parse repository url from event: %+v", err)
continue
}
var domain string
gheDomain := ""
if u.Host != "github.com" {
gheDomain = fmt.Sprintf("%s://%s", u.Scheme, u.Host)
domain = gheDomain
} else {
domain = "https://github.com"
}

for _, j := range jobs.Jobs {
if value, ok := reQueuedJobs.Load(j.GetID()); ok {
expired := value.(time.Time)
if time.Until(expired) <= 0 {
reQueuedJobs.Delete(j.GetID())
}
continue
}
if !slices.Contains(j.Labels, "self-hosted") && !slices.Contains(j.Labels, "myshoes") {
continue
}
if j.GetStatus() == "queued" {
repoURL := run.GetRepository().GetHTMLURL()
u, err := url.Parse(repoURL)
if err != nil {
logger.Logf(false, "failed to parse repository url from event: %+v", err)
continue
}
var domain string
gheDomain := ""
if u.Host != "github.com" {
gheDomain = fmt.Sprintf("%s://%s", u.Scheme, u.Host)
domain = gheDomain
} else {
domain = "https://github.com"
}

logger.Logf(false, "receive webhook repository: %s/%s", domain, repoName)
target, err := datastore.SearchRepo(ctx, s.ds, repoName)
if err != nil {
logger.Logf(false, "failed to search registered target: %+v", err)
continue
}
jobID := uuid.NewV4()
jobJSON, _ := json.Marshal(j)
job := datastore.Job{
UUID: jobID,
TargetID: target.UUID,
Repository: repoName,
CheckEventJSON: string(jobJSON),
}
if err := s.ds.EnqueueJob(ctx, job); err != nil {
logger.Logf(false, "failed to enqueue job: %+v", err)
continue
}
reQueuedJobs.Store(j.GetID(), time.Now().Add(30*time.Minute))
countRecovered, _ := CountRecovered.LoadOrStore(target.Scope, 0)
CountRecovered.Store(target.Scope, countRecovered.(int)+1)
}
logger.Logf(false, "receive webhook repository: %s/%s", domain, repoName)
target, err := datastore.SearchRepo(ctx, s.ds, repoName)
if err != nil {
logger.Logf(false, "failed to search registered target: %+v", err)
continue
}
jobID := uuid.NewV4()
jobJSON, _ := json.Marshal(j)
job := datastore.Job{
UUID: jobID,
TargetID: target.UUID,
Repository: repoName,
CheckEventJSON: string(jobJSON),
}
if err := s.ds.EnqueueJob(ctx, job); err != nil {
logger.Logf(false, "failed to enqueue job: %+v", err)
continue
}
gh.PendingRuns.Delete(installationID)
gh.ClearRunsCache(owner, repo)
return true
})
reQueuedJobs.Store(j.GetID(), time.Now().Add(30*time.Minute))
countRecovered, _ := CountRecovered.LoadOrStore(target.Scope, 0)
CountRecovered.Store(target.Scope, countRecovered.(int)+1)
}
}
}
gh.PendingRuns.Delete(installationID)
gh.ClearRunsCache(owner, repo)
return true
})
}

0 comments on commit 3c10021

Please sign in to comment.