Skip to content

Commit

Permalink
Merge pull request #165 from buildkite/triarius/golines
Browse files Browse the repository at this point in the history
  • Loading branch information
triarius authored May 1, 2023
2 parents 6b143fb + 0977524 commit dd2f4e0
Show file tree
Hide file tree
Showing 10 changed files with 141 additions and 47 deletions.
15 changes: 10 additions & 5 deletions cmd/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,23 +30,28 @@ func addFlags(cmd *cobra.Command) {
cmd.Flags().StringVarP(&configFile, "config", "f", "", "config file path")

// not in the config file
cmd.Flags().String("agent-token-secret", "buildkite-agent-token", "name of the Buildkite agent token secret")
cmd.Flags().
String("agent-token-secret", "buildkite-agent-token", "name of the Buildkite agent token secret")
cmd.Flags().String("buildkite-token", "", "Buildkite API token with GraphQL scopes")

// in the config file
cmd.Flags().String("org", "", "Buildkite organization name to watch")
cmd.Flags().String("image", config.DefaultAgentImage, "The image to use for the Buildkite agent")
cmd.Flags().
String("image", config.DefaultAgentImage, "The image to use for the Buildkite agent")
cmd.Flags().StringSlice(
"tags", []string{"queue=kubernetes"}, `A comma-separated list of tags for the agent (for example, "linux" or "mac,xcode=8")`,
)
cmd.Flags().String("namespace", config.DefaultNamespace, "kubernetes namespace to create resources in")
cmd.Flags().
String("namespace", config.DefaultNamespace, "kubernetes namespace to create resources in")
cmd.Flags().Bool("debug", false, "debug logs")
cmd.Flags().Int("max-in-flight", 25, "max jobs in flight, 0 means no max")
cmd.Flags().Duration("job-ttl", 10*time.Minute, "time to retain kubernetes jobs after completion")
cmd.Flags().
Duration("job-ttl", 10*time.Minute, "time to retain kubernetes jobs after completion")
cmd.Flags().String(
"cluster-uuid", "", "UUID of the Cluster. The agent token must be for the Cluster.",
)
cmd.Flags().String("profiler-address", "", "Bind address to expose the pprof profiler (e.g. localhost:6060)")
cmd.Flags().
String("profiler-address", "", "Bind address to expose the pprof profiler (e.g. localhost:6060)")
}

func ParseConfig(cmd *cobra.Command, args []string) (config.Config, error) {
Expand Down
5 changes: 4 additions & 1 deletion cmd/linter/linter.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,10 @@ func Lint(ctx context.Context, options *Options) error {
}
var plugin buildkite.Plugin
if err := json.Unmarshal(bs, &plugin); err != nil {
return fmt.Errorf("failed to unmarshal Kubernetes plugin back to buildkite plugin: %w", err)
return fmt.Errorf(
"failed to unmarshal Kubernetes plugin back to buildkite plugin: %w",
err,
)
}
step.Plugins[name] = plugin
}
Expand Down
20 changes: 10 additions & 10 deletions internal/controller/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,17 +38,17 @@ func JobName(uuid string) string {
}

type Config struct {
AgentTokenSecret string `mapstructure:"agent-token-secret" validate:"required"`
BuildkiteToken string `mapstructure:"buildkite-token" validate:"required"`
Debug bool
Image string `validate:"required"`
Debug bool `mapstructure:"debug"`
AgentTokenSecret string `mapstructure:"agent-token-secret" validate:"required"`
BuildkiteToken string `mapstructure:"buildkite-token" validate:"required"`
Image string `mapstructure:"image" validate:"required"`
JobTTL time.Duration `mapstructure:"job-ttl"`
MaxInFlight int `mapstructure:"max-in-flight" validate:"min=0"`
Namespace string `validate:"required"`
Org string `validate:"required"`
Tags stringSlice `validate:"min=1"`
ProfilerAddress string `mapstructure:"profiler-address" validate:"omitempty,hostname_port"`
ClusterUUID string `mapstructure:"cluster-uuid" validate:"omitempty"`
MaxInFlight int `mapstructure:"max-in-flight" validate:"min=0"`
Namespace string `mapstructure:"namespace" validate:"required"`
Org string `mapstructure:"org" validate:"required"`
Tags stringSlice `mapstructure:"tags" validate:"min=1"`
ProfilerAddress string `mapstructure:"profiler-address" validate:"omitempty,hostname_port"`
ClusterUUID string `mapstructure:"cluster-uuid" validate:"omitempty"`
}

type stringSlice []string
Expand Down
7 changes: 6 additions & 1 deletion internal/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,12 @@ import (
"k8s.io/client-go/kubernetes"
)

func Run(ctx context.Context, logger *zap.Logger, k8sClient kubernetes.Interface, cfg config.Config) {
func Run(
ctx context.Context,
logger *zap.Logger,
k8sClient kubernetes.Interface,
cfg config.Config,
) {
if cfg.ProfilerAddress != "" {
logger.Info("profiler listening for requests")
go func() {
Expand Down
11 changes: 9 additions & 2 deletions internal/controller/scheduler/completions.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,10 @@ func NewPodCompletionWatcher(logger *zap.Logger, k8s kubernetes.Interface) *comp
}

// Creates a Pods informer and registers the handler on it
func (w *completionsWatcher) RegisterInformer(ctx context.Context, factory informers.SharedInformerFactory) error {
func (w *completionsWatcher) RegisterInformer(
ctx context.Context,
factory informers.SharedInformerFactory,
) error {
informer := factory.Core().V1().Pods().Informer()
informer.AddEventHandler(w)
go factory.Start(ctx.Done())
Expand Down Expand Up @@ -69,7 +72,11 @@ func (w *completionsWatcher) cleanupSidecars(pod *v1.Pod) {
}); err != nil {
w.logger.Error("failed to update job", zap.Error(err))
}
w.logger.Debug("agent finished", zap.String("uuid", pod.Labels[config.UUIDLabel]), zap.Int32("exit code", terminated.ExitCode))
w.logger.Debug(
"agent finished",
zap.String("uuid", pod.Labels[config.UUIDLabel]),
zap.Int32("exit code", terminated.ExitCode),
)
}
}

Expand Down
5 changes: 4 additions & 1 deletion internal/controller/scheduler/imagePullBackOffWatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,10 @@ func NewImagePullBackOffWatcher(
}

// Creates a Pods informer and registers the handler on it
func (w *imagePullBackOffWatcher) RegisterInformer(ctx context.Context, factory informers.SharedInformerFactory) error {
func (w *imagePullBackOffWatcher) RegisterInformer(
ctx context.Context,
factory informers.SharedInformerFactory,
) error {
informer := factory.Core().V1().Pods().Informer()
informer.AddEventHandler(w)
go factory.Start(ctx.Done())
Expand Down
23 changes: 19 additions & 4 deletions internal/controller/scheduler/limiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,11 @@ type MaxInFlightLimiter struct {
completions *sync.Cond
}

func NewLimiter(logger *zap.Logger, scheduler monitor.JobHandler, maxInFlight int) *MaxInFlightLimiter {
func NewLimiter(
logger *zap.Logger,
scheduler monitor.JobHandler,
maxInFlight int,
) *MaxInFlightLimiter {
l := &MaxInFlightLimiter{
scheduler: scheduler,
MaxInFlight: maxInFlight,
Expand All @@ -35,7 +39,10 @@ func NewLimiter(logger *zap.Logger, scheduler monitor.JobHandler, maxInFlight in
}

// Creates a Jobs informer, registers the handler on it, and waits for cache sync
func (l *MaxInFlightLimiter) RegisterInformer(ctx context.Context, factory informers.SharedInformerFactory) error {
func (l *MaxInFlightLimiter) RegisterInformer(
ctx context.Context,
factory informers.SharedInformerFactory,
) error {
informer := factory.Batch().V1().Jobs()
jobInformer := informer.Informer()
jobInformer.AddEventHandler(l)
Expand Down Expand Up @@ -85,7 +92,11 @@ func (l *MaxInFlightLimiter) OnAdd(obj interface{}) {
if !jobFinished(job) {
uuid := job.Labels[config.UUIDLabel]
if _, alreadyInFlight := l.inFlight[uuid]; !alreadyInFlight {
l.logger.Debug("adding in-flight job", zap.String("uuid", uuid), zap.Int("in-flight", len(l.inFlight)))
l.logger.Debug(
"adding in-flight job",
zap.String("uuid", uuid),
zap.Int("in-flight", len(l.inFlight)),
)
l.inFlight[uuid] = struct{}{}
}
}
Expand Down Expand Up @@ -120,7 +131,11 @@ func (l *MaxInFlightLimiter) markComplete(job *batchv1.Job) {
uuid := job.Labels[config.UUIDLabel]
if _, alreadyInFlight := l.inFlight[uuid]; alreadyInFlight {
delete(l.inFlight, uuid)
l.logger.Debug("job complete", zap.String("uuid", uuid), zap.Int("in-flight", len(l.inFlight)))
l.logger.Debug(
"job complete",
zap.String("uuid", uuid),
zap.Int("in-flight", len(l.inFlight)),
)
l.completions.Signal()
}
}
Expand Down
32 changes: 24 additions & 8 deletions internal/controller/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,10 @@ func New(logger *zap.Logger, client kubernetes.Interface, cfg Config) *worker {
}

// returns an informer factory configured to watch resources (pods, jobs) created by the scheduler
func NewInformerFactory(k8s kubernetes.Interface, tags []string) (informers.SharedInformerFactory, error) {
func NewInformerFactory(
k8s kubernetes.Interface,
tags []string,
) (informers.SharedInformerFactory, error) {
hasTag, err := labels.NewRequirement(config.TagLabel, selection.In, config.TagsToLabels(tags))
if err != nil {
return nil, fmt.Errorf("failed to build tag label selector for job manager: %w", err)
Expand All @@ -55,9 +58,13 @@ func NewInformerFactory(k8s kubernetes.Interface, tags []string) (informers.Shar
if err != nil {
return nil, fmt.Errorf("failed to build uuid label selector for job manager: %w", err)
}
factory := informers.NewSharedInformerFactoryWithOptions(k8s, 0, informers.WithTweakListOptions(func(opt *metav1.ListOptions) {
opt.LabelSelector = labels.NewSelector().Add(*hasTag, *hasUUID).String()
}))
factory := informers.NewSharedInformerFactoryWithOptions(
k8s,
0,
informers.WithTweakListOptions(func(opt *metav1.ListOptions) {
opt.LabelSelector = labels.NewSelector().Add(*hasTag, *hasUUID).String()
}),
)
return factory, nil
}

Expand Down Expand Up @@ -97,7 +104,9 @@ func (w *worker) Create(ctx context.Context, job *monitor.Job) error {
if err != nil {
return fmt.Errorf("failed to create job: %w", err)
}
_, err = w.client.BatchV1().Jobs(w.cfg.Namespace).Create(ctx, kjob, metav1.CreateOptions{})
_, err = w.client.BatchV1().
Jobs(w.cfg.Namespace).
Create(ctx, kjob, metav1.CreateOptions{})
if err != nil {
return fmt.Errorf("failed to create job: %w", err)
}
Expand Down Expand Up @@ -229,7 +238,7 @@ func (w *jobWrapper) Build() (*batchv1.Job, error) {
}
for k, v := range w.envMap {
switch k {
case "BUILDKITE_COMMAND", "BUILDKITE_ARTIFACT_PATHS", "BUILDKITE_PLUGINS": //noop
case "BUILDKITE_COMMAND", "BUILDKITE_ARTIFACT_PATHS", "BUILDKITE_PLUGINS": // noop
default:
env = append(env, corev1.EnvVar{Name: k, Value: v})
}
Expand Down Expand Up @@ -420,7 +429,11 @@ func (w *jobWrapper) Build() (*batchv1.Job, error) {
Image: w.cfg.Image,
ImagePullPolicy: corev1.PullAlways,
Command: []string{"cp"},
Args: []string{"/usr/local/bin/buildkite-agent", "/usr/local/bin/ssh-env-config.sh", "/workspace"},
Args: []string{
"/usr/local/bin/buildkite-agent",
"/usr/local/bin/ssh-env-config.sh",
"/workspace",
},
VolumeMounts: []corev1.VolumeMount{
{
Name: "workspace",
Expand Down Expand Up @@ -458,7 +471,10 @@ func (w *jobWrapper) annotateWithJobURL() {
buildURL := w.envMap["BUILDKITE_BUILD_URL"]
u, err := url.Parse(buildURL)
if err != nil {
w.logger.Warn("could not parse BuildURL when annotating with JobURL", zap.String("buildURL", buildURL))
w.logger.Warn(
"could not parse BuildURL when annotating with JobURL",
zap.String("buildURL", buildURL),
)
return
}
u.Fragment = w.job.Uuid
Expand Down
24 changes: 18 additions & 6 deletions internal/controller/scheduler/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@ func TestJobPluginConversion(t *testing.T) {
EnvFrom: []corev1.EnvFromSource{
{
ConfigMapRef: &corev1.ConfigMapEnvSource{
LocalObjectReference: corev1.LocalObjectReference{Name: "some-configmap"},
LocalObjectReference: corev1.LocalObjectReference{
Name: "some-configmap",
},
},
},
},
Expand All @@ -46,7 +48,9 @@ func TestJobPluginConversion(t *testing.T) {
"github.com/buildkite-plugins/kubernetes-buildkite-plugin": pluginConfig,
},
{
"github.com/buildkite-plugins/some-other-buildkite-plugin": map[string]interface{}{"foo": "bar"},
"github.com/buildkite-plugins/some-other-buildkite-plugin": map[string]interface{}{
"foo": "bar",
},
},
})
require.NoError(t, err)
Expand All @@ -58,7 +62,11 @@ func TestJobPluginConversion(t *testing.T) {
},
Tag: "queue=kubernetes",
}
wrapper := scheduler.NewJobWrapper(zaptest.NewLogger(t), input, scheduler.Config{AgentToken: "token-secret"})
wrapper := scheduler.NewJobWrapper(
zaptest.NewLogger(t),
input,
scheduler.Config{AgentToken: "token-secret"},
)
result, err := wrapper.ParsePlugins().Build()
require.NoError(t, err)

Expand Down Expand Up @@ -86,7 +94,11 @@ func TestJobPluginConversion(t *testing.T) {
require.Equal(t, config.TagToLabel(input.Tag), tagLabel)

pluginsEnv := findEnv(t, commandContainer.Env, "BUILDKITE_PLUGINS")
require.Equal(t, pluginsEnv.Value, `[{"github.com/buildkite-plugins/some-other-buildkite-plugin":{"foo":"bar"}}]`)
require.Equal(
t,
pluginsEnv.Value,
`[{"github.com/buildkite-plugins/some-other-buildkite-plugin":{"foo":"bar"}}]`,
)
}

func TestTagEnv(t *testing.T) {
Expand Down Expand Up @@ -138,7 +150,6 @@ func assertEnvFieldPath(t *testing.T, container corev1.Container, envVarName, fi
assert.Equal(t, env.ValueFrom.FieldRef.FieldPath, fieldPath)
}
}

}

func TestJobWithNoKubernetesPlugin(t *testing.T) {
Expand Down Expand Up @@ -187,7 +198,8 @@ func TestFailureJobs(t *testing.T) {

commandContainer := findContainer(t, result.Spec.Template.Spec.Containers, "container-0")
commandEnv := findEnv(t, commandContainer.Env, "BUILDKITE_COMMAND")
require.Equal(t,
require.Equal(
t,
`echo "failed parsing Kubernetes plugin: json: cannot unmarshal string into Go value of type scheduler.KubernetesPlugin" && exit 1`,
commandEnv.Value,
)
Expand Down
Loading

0 comments on commit dd2f4e0

Please sign in to comment.