From 3c10021b012688fa64e3d41e2d5fc8261a60db7c Mon Sep 17 00:00:00 2001 From: gamoutatsumi Date: Fri, 10 Nov 2023 12:08:17 +0900 Subject: [PATCH] use ticker in reRunWorkflow --- pkg/starter/starter.go | 155 +++++++++++++++++++++-------------------- 1 file changed, 78 insertions(+), 77 deletions(-) diff --git a/pkg/starter/starter.go b/pkg/starter/starter.go index 5fd2429..b904ad9 100644 --- a/pkg/starter/starter.go +++ b/pkg/starter/starter.go @@ -66,8 +66,16 @@ func (s *Starter) Loop(ctx context.Context) error { eg, ctx := errgroup.WithContext(ctx) eg.Go(func() error { - s.reRunWorkflow(ctx) - return nil + ticker := time.NewTicker(10 * time.Second) + defer ticker.Stop() + for { + select { + case <-ticker.C: + s.reRunWorkflow(ctx) + case <-ctx.Done(): + return nil + } + } }) eg.Go(func() error { @@ -341,86 +349,79 @@ func (s *Starter) checkRegisteredRunner(ctx context.Context, runnerName string, } func (s *Starter) reRunWorkflow(ctx context.Context) { - for { - select { - case <-ctx.Done(): - return - default: - gh.PendingRuns.Range(func(key, value any) bool { - installationID := key.(int64) - run := value.(*github.WorkflowRun) - client, err := gh.NewClientInstallation(installationID) - if err != nil { - logger.Logf(false, "failed to create GitHub client: %+v", err) - return true - } + gh.PendingRuns.Range(func(key, value any) bool { + installationID := key.(int64) + run := value.(*github.WorkflowRun) + client, err := gh.NewClientInstallation(installationID) + if err != nil { + logger.Logf(false, "failed to create GitHub client: %+v", err) + return true + } + + owner := run.GetRepository().GetOwner().GetLogin() + repo := run.GetRepository().GetName() + repoName := run.GetRepository().GetFullName() - owner := run.GetRepository().GetOwner().GetLogin() - repo := run.GetRepository().GetName() - repoName := run.GetRepository().GetFullName() + jobs, _, err := client.Actions.ListWorkflowJobs(ctx, owner, repo, run.GetID(), &github.ListWorkflowJobsOptions{ + Filter: "latest", + }) + if err != nil { + logger.Logf(false, "failed to get workflow jobs: %+v", err) + return true + } - jobs, _, err := client.Actions.ListWorkflowJobs(ctx, owner, repo, run.GetID(), &github.ListWorkflowJobsOptions{ - Filter: "latest", - }) + for _, j := range jobs.Jobs { + if value, ok := reQueuedJobs.Load(j.GetID()); ok { + expired := value.(time.Time) + if time.Until(expired) <= 0 { + reQueuedJobs.Delete(j.GetID()) + } + continue + } + if !slices.Contains(j.Labels, "self-hosted") && !slices.Contains(j.Labels, "myshoes") { + continue + } + if j.GetStatus() == "queued" { + repoURL := run.GetRepository().GetHTMLURL() + u, err := url.Parse(repoURL) if err != nil { - logger.Logf(false, "failed to get workflow jobs: %+v", err) - return true + logger.Logf(false, "failed to parse repository url from event: %+v", err) + continue + } + var domain string + gheDomain := "" + if u.Host != "github.com" { + gheDomain = fmt.Sprintf("%s://%s", u.Scheme, u.Host) + domain = gheDomain + } else { + domain = "https://github.com" } - for _, j := range jobs.Jobs { - if value, ok := reQueuedJobs.Load(j.GetID()); ok { - expired := value.(time.Time) - if time.Until(expired) <= 0 { - reQueuedJobs.Delete(j.GetID()) - } - continue - } - if !slices.Contains(j.Labels, "self-hosted") && !slices.Contains(j.Labels, "myshoes") { - continue - } - if j.GetStatus() == "queued" { - repoURL := run.GetRepository().GetHTMLURL() - u, err := url.Parse(repoURL) - if err != nil { - logger.Logf(false, "failed to parse repository url from event: %+v", err) - continue - } - var domain string - gheDomain := "" - if u.Host != "github.com" { - gheDomain = fmt.Sprintf("%s://%s", u.Scheme, u.Host) - domain = gheDomain - } else { - domain = "https://github.com" - } - - logger.Logf(false, "receive webhook repository: %s/%s", domain, repoName) - target, err := datastore.SearchRepo(ctx, s.ds, repoName) - if err != nil { - logger.Logf(false, "failed to search registered target: %+v", err) - continue - } - jobID := uuid.NewV4() - jobJSON, _ := json.Marshal(j) - job := datastore.Job{ - UUID: jobID, - TargetID: target.UUID, - Repository: repoName, - CheckEventJSON: string(jobJSON), - } - if err := s.ds.EnqueueJob(ctx, job); err != nil { - logger.Logf(false, "failed to enqueue job: %+v", err) - continue - } - reQueuedJobs.Store(j.GetID(), time.Now().Add(30*time.Minute)) - countRecovered, _ := CountRecovered.LoadOrStore(target.Scope, 0) - CountRecovered.Store(target.Scope, countRecovered.(int)+1) - } + logger.Logf(false, "receive webhook repository: %s/%s", domain, repoName) + target, err := datastore.SearchRepo(ctx, s.ds, repoName) + if err != nil { + logger.Logf(false, "failed to search registered target: %+v", err) + continue + } + jobID := uuid.NewV4() + jobJSON, _ := json.Marshal(j) + job := datastore.Job{ + UUID: jobID, + TargetID: target.UUID, + Repository: repoName, + CheckEventJSON: string(jobJSON), + } + if err := s.ds.EnqueueJob(ctx, job); err != nil { + logger.Logf(false, "failed to enqueue job: %+v", err) + continue } - gh.PendingRuns.Delete(installationID) - gh.ClearRunsCache(owner, repo) - return true - }) + reQueuedJobs.Store(j.GetID(), time.Now().Add(30*time.Minute)) + countRecovered, _ := CountRecovered.LoadOrStore(target.Scope, 0) + CountRecovered.Store(target.Scope, countRecovered.(int)+1) + } } - } + gh.PendingRuns.Delete(installationID) + gh.ClearRunsCache(owner, repo) + return true + }) }