Skip to content

Commit

Permalink
Merge pull request #130 from whywaita/fix/fast-deleting
Browse files Browse the repository at this point in the history
Implement fast-deleting
  • Loading branch information
whywaita authored Jan 25, 2022
2 parents 1089ac1 + 8b2a712 commit e1eeca0
Show file tree
Hide file tree
Showing 11 changed files with 130 additions and 34 deletions.
3 changes: 3 additions & 0 deletions docs/01_01_for_admin_setup.md
Original file line number Diff line number Diff line change
Expand Up @@ -96,5 +96,8 @@ $ ./myshoes
- `MAX_CONNECTIONS_TO_BACKEND`
- default: 50
- The number of max connections to shoes-provider
- `MAX_CONCURRENCY_DELETING`
- default: 1
- The number of max concurrency of deleting

and more some env values from [shoes provider](https://github.com/whywaita/myshoes-providers).
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,10 @@ require (
github.com/go-sql-driver/mysql v1.5.0
github.com/google/go-cmp v0.5.6
github.com/google/go-github/v35 v35.2.0
github.com/gregjones/httpcache v0.0.0-20190611155906-901d90724c79
github.com/hashicorp/go-plugin v1.4.0
github.com/hashicorp/go-version v1.3.0
github.com/jmoiron/sqlx v1.2.0
github.com/m4ns0ur/httpcache v0.0.0-20200426190423-1040e2e8823f
github.com/ory/dockertest/v3 v3.8.0
github.com/patrickmn/go-cache v2.1.0+incompatible
github.com/prometheus/client_golang v1.11.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -107,8 +107,6 @@ github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/
github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510 h1:El6M4kTTCOh6aBiKaUGG7oYTSPP8MxqL4YI3kZKwcP4=
github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510/go.mod h1:pupxD2MaaD3pAXIBCelhxNneeOaAeabZDe5s4K6zSpQ=
github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/gregjones/httpcache v0.0.0-20190611155906-901d90724c79 h1:+ngKgrYPPJrOjhax5N+uePQ0Fh1Z7PheYoUI/0nzkPA=
github.com/gregjones/httpcache v0.0.0-20190611155906-901d90724c79/go.mod h1:FecbI9+v66THATjSRHfNgh1IVFe/9kFxbXtjV0ctIMA=
github.com/hashicorp/go-hclog v0.14.1 h1:nQcJDQwIAGnmoUWp8ubocEX40cCml/17YkF6csQLReU=
github.com/hashicorp/go-hclog v0.14.1/go.mod h1:whpDNt7SSdeAju8AWKIWsul05p54N/39EeqMAyrmvFQ=
github.com/hashicorp/go-plugin v1.4.0 h1:b0O7rs5uiJ99Iu9HugEzsM67afboErkHUWddUSpUO3A=
Expand Down Expand Up @@ -143,6 +141,8 @@ github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/lib/pq v0.0.0-20180327071824-d34b9ff171c2/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo=
github.com/lib/pq v1.0.0 h1:X5PMW56eZitiTeO7tKzZxFCSpbFZJtkMMooicw2us9A=
github.com/lib/pq v1.0.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo=
github.com/m4ns0ur/httpcache v0.0.0-20200426190423-1040e2e8823f h1:MBcrTbmCf7CZa9yAwcB7ArveQb9TPVy4zFnQGz/LiUU=
github.com/m4ns0ur/httpcache v0.0.0-20200426190423-1040e2e8823f/go.mod h1:UawoqorwkpZ58qWiL+nVJM0Po7FrzAdCxYVh9GgTTaA=
github.com/mattn/go-colorable v0.1.4 h1:snbPLB8fVfU9iwbbo30TPtbLRzwWu6aJS6Xh4eaaviA=
github.com/mattn/go-colorable v0.1.4/go.mod h1:U0ppj6V5qS13XJ6of8GYAs25YV2eR4EVcfRqFIhoBtE=
github.com/mattn/go-isatty v0.0.8/go.mod h1:Iq45c/XA43vh69/j3iqttzPXn0bhXyGjM0Hdxcsrc5s=
Expand Down
2 changes: 2 additions & 0 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ type conf struct {
Debug bool
Strict bool // check to registered runner before delete job
MaxConnectionsToBackend int64
MaxConcurrencyDeleting int64
}

// Config Environment keys
Expand All @@ -34,4 +35,5 @@ const (
EnvDebug = "DEBUG"
EnvStrict = "STRICT"
EnvMaxConnectionsToBackend = "MAX_CONNECTIONS_TO_BACKEND"
EnvMaxConcurrencyDeleting = "MAX_CONCURRENCY_DELETING"
)
8 changes: 8 additions & 0 deletions internal/config/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,14 @@ func Load() {
}
Config.MaxConnectionsToBackend = numberPB
}
Config.MaxConcurrencyDeleting = 1
if os.Getenv(EnvMaxConcurrencyDeleting) != "" {
numberCD, err := strconv.ParseInt(os.Getenv(EnvMaxConcurrencyDeleting), 10, 64)
if err != nil {
log.Panicf("failed to convert int64 %s: %+v", EnvMaxConcurrencyDeleting, err)
}
Config.MaxConcurrencyDeleting = numberCD
}
}

func checkBinary(p string) (string, error) {
Expand Down
21 changes: 20 additions & 1 deletion pkg/gh/github.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,12 @@ import (
"path"
"strings"
"sync"
"time"

"github.com/bradleyfalzon/ghinstallation/v2"
"github.com/google/go-github/v35/github"
"github.com/gregjones/httpcache"
"github.com/m4ns0ur/httpcache"
"github.com/patrickmn/go-cache"
"github.com/whywaita/myshoes/pkg/logger"
"golang.org/x/oauth2"
)
Expand All @@ -21,6 +23,9 @@ var (
// ErrNotFound is error for not found
ErrNotFound = fmt.Errorf("not found")

// ResponseCache is cache variable
responseCache = cache.New(5*time.Minute, 10*time.Minute)

// rateLimitRemain is remaining of Rate limit, for metrics
rateLimitRemain = sync.Map{}
// rateLimitLimit is limit of Rate limit, for metrics
Expand Down Expand Up @@ -168,6 +173,11 @@ func ExistGitHubRunner(ctx context.Context, client *github.Client, owner, repo,
return nil, fmt.Errorf("failed to get list of runners: %w", err)
}

return ExistGitHubRunnerWithRunner(runners, runnerName)
}

// ExistGitHubRunnerWithRunner check exist registered of GitHub runner from a list of runner
func ExistGitHubRunnerWithRunner(runners []*github.Runner, runnerName string) (*github.Runner, error) {
for _, r := range runners {
if strings.EqualFold(r.GetName(), runnerName) {
return r, nil
Expand All @@ -179,6 +189,10 @@ func ExistGitHubRunner(ctx context.Context, client *github.Client, owner, repo,

// ListRunners get runners that registered repository or org
func ListRunners(ctx context.Context, client *github.Client, owner, repo string) ([]*github.Runner, error) {
if cachedRs, found := responseCache.Get(getCacheKey(owner, repo)); found {
return cachedRs.([]*github.Runner), nil
}

var opts = &github.ListOptions{
Page: 0,
PerPage: 10,
Expand All @@ -200,11 +214,16 @@ func ListRunners(ctx context.Context, client *github.Client, owner, repo string)
opts.Page = resp.NextPage
}

responseCache.Set(getCacheKey(owner, repo), rs, 1*time.Second)
logger.Logf(true, "found %d runners in GitHub", len(rs))

return rs, nil
}

func getCacheKey(owner, repo string) string {
return fmt.Sprintf("owner-%s-repo-%s", owner, repo)
}

func listRunners(ctx context.Context, client *github.Client, owner, repo string, opts *github.ListOptions) (*github.Runners, *github.Response, error) {
if repo == "" {
runners, resp, err := client.Actions.ListOrganizationRunners(ctx, owner, opts)
Expand Down
21 changes: 20 additions & 1 deletion pkg/metric/scrape_memory.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@ import (
"fmt"

"github.com/prometheus/client_golang/prometheus"

"github.com/whywaita/myshoes/internal/config"
"github.com/whywaita/myshoes/pkg/datastore"
"github.com/whywaita/myshoes/pkg/gh"
"github.com/whywaita/myshoes/pkg/runner"
"github.com/whywaita/myshoes/pkg/starter"
)

Expand Down Expand Up @@ -40,6 +40,16 @@ var (
"The number of rate limit max",
[]string{"scope"}, nil,
)
memoryRunnerMaxConcurrencyDeleting = prometheus.NewDesc(
prometheus.BuildFQName(namespace, memoryName, "runner_max_concurrency_deleting"),
"The number of max concurrency deleting in runner (Config)",
[]string{"runner"}, nil,
)
memoryRunnerQueueConcurrencyDeleting = prometheus.NewDesc(
prometheus.BuildFQName(namespace, memoryName, "runner_queue_concurrency_deleting"),
"deleting concurrency in runner",
[]string{"runner"}, nil,
)
)

// ScraperMemory is scraper implement for memory
Expand Down Expand Up @@ -83,6 +93,15 @@ func scrapeStarterValues(ch chan<- prometheus.Metric) error {
ch <- prometheus.MustNewConstMetric(
memoryStarterQueueWaiting, prometheus.GaugeValue, float64(countWaiting), labelStarter)

const labelRunner = "runner"
configRunnerDeletingMax := config.Config.MaxConcurrencyDeleting
countRunnerDeletingNow := runner.ConcurrencyDeleting

ch <- prometheus.MustNewConstMetric(
memoryRunnerMaxConcurrencyDeleting, prometheus.GaugeValue, float64(configRunnerDeletingMax), labelRunner)
ch <- prometheus.MustNewConstMetric(
memoryRunnerQueueConcurrencyDeleting, prometheus.GaugeValue, float64(countRunnerDeletingNow), labelRunner)

return nil
}

Expand Down
91 changes: 68 additions & 23 deletions pkg/runner/runner_delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,13 @@ import (
"time"

"github.com/google/go-github/v35/github"
"github.com/whywaita/myshoes/internal/config"
"github.com/whywaita/myshoes/pkg/datastore"
"github.com/whywaita/myshoes/pkg/gh"
"github.com/whywaita/myshoes/pkg/logger"
"github.com/whywaita/myshoes/pkg/shoes"
"golang.org/x/sync/errgroup"
"golang.org/x/sync/semaphore"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
Expand All @@ -22,6 +25,11 @@ type Runner struct {
ds *datastore.Runner
}

var (
// ConcurrencyDeleting is value of concurrency
ConcurrencyDeleting int64 = 0
)

func (m *Manager) do(ctx context.Context) error {
logger.Logf(true, "start runner manager")

Expand All @@ -47,7 +55,7 @@ func (m *Manager) removeRunners(ctx context.Context, t datastore.Target) error {
return fmt.Errorf("failed to retrieve list of running runner: %w", err)
}

isZero, err := isRegisteredRunnerZeroInGitHub(ctx, t)
isZero, ghRunners, err := isRegisteredRunnerZeroInGitHub(ctx, t)
if err != nil {
return fmt.Errorf("failed to check number of registerd runner: %w", err)
} else if isZero && len(runners) == 0 {
Expand All @@ -58,48 +66,81 @@ func (m *Manager) removeRunners(ctx context.Context, t datastore.Target) error {
return nil
}

sem := semaphore.NewWeighted(config.Config.MaxConcurrencyDeleting)
var eg errgroup.Group
ConcurrencyDeleting = 0

for _, runner := range runners {
_, mode, err := datastore.GetRunnerTemporaryMode(runner.RunnerVersion)
if err != nil {
logger.Logf(false, "failed to get runner temporary mode: %+v", err)
continue
runner := runner

if err := sem.Acquire(ctx, 1); err != nil {
return fmt.Errorf("failed to Acquire: %w", err)
}
ConcurrencyDeleting++

switch mode {
case datastore.RunnerTemporaryOnce:
if err := m.removeRunnerModeOnce(ctx, t, runner); err != nil {
logger.Logf(false, "failed to remove runner (mode once): %+v", err)
}
case datastore.RunnerTemporaryEphemeral:
if err := m.removeRunnerModeEphemeral(ctx, t, runner); err != nil {
logger.Logf(false, "failed to remove runner (mode ephemeral): %+v", err)
eg.Go(func() error {
defer func() {
sem.Release(1)
ConcurrencyDeleting--
}()

if err := m.removeRunner(ctx, t, runner, ghRunners); err != nil {
logger.Logf(false, "failed to delete runner: %+v", err)
}
}
return nil
})
}

if err := eg.Wait(); err != nil {
return fmt.Errorf("failed to wait errgroup.Wait(): %w", err)
}

return nil
}

func (m *Manager) removeRunner(ctx context.Context, t datastore.Target, runner datastore.Runner, ghRunners []*github.Runner) error {
if err := sanitizeRunnerMustRunningTime(runner); errors.Is(err, ErrNotWillDeleteRunner) {
return nil
}

_, mode, err := datastore.GetRunnerTemporaryMode(runner.RunnerVersion)
if err != nil {
return fmt.Errorf("failed to get runner temporary mode: %w", err)
}

switch mode {
case datastore.RunnerTemporaryOnce:
if err := m.removeRunnerModeOnce(ctx, t, runner, ghRunners); err != nil {
return fmt.Errorf("failed to remove runner (mode once): %w", err)
}
case datastore.RunnerTemporaryEphemeral:
if err := m.removeRunnerModeEphemeral(ctx, t, runner, ghRunners); err != nil {
return fmt.Errorf("failed to remove runner (mode ephemeral): %w", err)
}
}
return nil
}

func isRegisteredRunnerZeroInGitHub(ctx context.Context, t datastore.Target) (bool, error) {
func isRegisteredRunnerZeroInGitHub(ctx context.Context, t datastore.Target) (bool, []*github.Runner, error) {
owner, repo := t.OwnerRepo()
client, err := gh.NewClient(t.GitHubToken, t.GHEDomain.String)
if err != nil {
return false, fmt.Errorf("failed to create github client: %w", err)
return false, nil, fmt.Errorf("failed to create github client: %w", err)
}

ghRunners, err := gh.ListRunners(ctx, client, owner, repo)
if err != nil {
return false, fmt.Errorf("failed to get list of runner in GitHub: %w", err)
return false, nil, fmt.Errorf("failed to get list of runner in GitHub: %w", err)
}

if len(ghRunners) == 0 {
return true, nil
return true, nil, nil
}
return false, nil
return false, ghRunners, nil
}

// Error values
var (
// ErrNotWillDeleteRunner is error message for "not will delete runner"
ErrNotWillDeleteRunner = fmt.Errorf("not will delete runner")
)

Expand All @@ -113,13 +154,13 @@ var (
func sanitizeGitHubRunner(ghRunner github.Runner, dsRunner datastore.Runner) error {
switch ghRunner.GetStatus() {
case StatusWillDelete:
if err := sanitizeRunner(&dsRunner, MustRunningTime); err != nil {
if err := sanitizeRunner(dsRunner, MustRunningTime); err != nil {
logger.Logf(false, "%s is offline and not running %s, so not will delete (created_at: %s, now: %s)", dsRunner.UUID, MustRunningTime, dsRunner.CreatedAt, time.Now().UTC())
return fmt.Errorf("failed to sanitize will delete runner: %w", err)
}
return nil
case StatusSleep:
if err := sanitizeRunner(&dsRunner, MustGoalTime); err != nil {
if err := sanitizeRunner(dsRunner, MustGoalTime); err != nil {
logger.Logf(false, "%s is idle and not running %s, so not will delete (created_at: %s, now: %s)", dsRunner.UUID, MustGoalTime, dsRunner.CreatedAt, time.Now().UTC())
return fmt.Errorf("failed to sanitize idle runner: %w", err)
}
Expand All @@ -129,7 +170,11 @@ func sanitizeGitHubRunner(ghRunner github.Runner, dsRunner datastore.Runner) err
return ErrNotWillDeleteRunner
}

func sanitizeRunner(runner *datastore.Runner, needTime time.Duration) error {
func sanitizeRunnerMustRunningTime(runner datastore.Runner) error {
return sanitizeRunner(runner, MustRunningTime)
}

func sanitizeRunner(runner datastore.Runner, needTime time.Duration) error {
spent := runner.CreatedAt.Add(needTime)
now := time.Now().UTC()
if !spent.Before(now) {
Expand Down
5 changes: 3 additions & 2 deletions pkg/runner/runner_delete_ephemeral.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,21 +5,22 @@ import (
"errors"
"fmt"

"github.com/google/go-github/v35/github"
"github.com/whywaita/myshoes/pkg/datastore"
"github.com/whywaita/myshoes/pkg/gh"
"github.com/whywaita/myshoes/pkg/logger"
)

// removeRunnerModeEphemeral remove runner that created by --ephemeral flag.
// --ephemeral flag is delete self-hosted runner when end of job. So, The origin list of runner from datastore.
func (m *Manager) removeRunnerModeEphemeral(ctx context.Context, t datastore.Target, runner datastore.Runner) error {
func (m *Manager) removeRunnerModeEphemeral(ctx context.Context, t datastore.Target, runner datastore.Runner, ghRunners []*github.Runner) error {
owner, repo := t.OwnerRepo()
client, err := gh.NewClient(t.GitHubToken, t.GHEDomain.String)
if err != nil {
return fmt.Errorf("failed to create github client: %w", err)
}

ghRunner, err := gh.ExistGitHubRunner(ctx, client, owner, repo, ToName(runner.UUID.String()))
ghRunner, err := gh.ExistGitHubRunnerWithRunner(ghRunners, ToName(runner.UUID.String()))
switch {
case errors.Is(err, gh.ErrNotFound):
// deleted in GitHub, It's completed
Expand Down
5 changes: 3 additions & 2 deletions pkg/runner/runner_delete_once.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,21 +5,22 @@ import (
"errors"
"fmt"

"github.com/google/go-github/v35/github"
"github.com/whywaita/myshoes/pkg/datastore"
"github.com/whywaita/myshoes/pkg/gh"
"github.com/whywaita/myshoes/pkg/logger"
)

// removeRunnerModeOnce remove runner that created by --once flag.
// --once flag is not delete self-hosted runner when end of job. So, The origin list of runner from GitHub.
func (m *Manager) removeRunnerModeOnce(ctx context.Context, t datastore.Target, runner datastore.Runner) error {
func (m *Manager) removeRunnerModeOnce(ctx context.Context, t datastore.Target, runner datastore.Runner, ghRunners []*github.Runner) error {
owner, repo := t.OwnerRepo()
client, err := gh.NewClient(t.GitHubToken, t.GHEDomain.String)
if err != nil {
return fmt.Errorf("failed to create github client: %w", err)
}

ghRunner, err := gh.ExistGitHubRunner(ctx, client, owner, repo, ToName(runner.UUID.String()))
ghRunner, err := gh.ExistGitHubRunnerWithRunner(ghRunners, ToName(runner.UUID.String()))
switch {
case errors.Is(err, gh.ErrNotFound):
logger.Logf(false, "NotFound in GitHub, so will delete in datastore without GitHub (runner: %s)", runner.UUID.String())
Expand Down
Loading

0 comments on commit e1eeca0

Please sign in to comment.