Skip to content

Commit

Permalink
sidecar: control what log levels should be repeated to stdout
Browse files Browse the repository at this point in the history
syncctl: set terminationGracePeriodSeconds for sync pods
syncctl: measure sync container resource usage
  • Loading branch information
absorbb committed Dec 5, 2024
1 parent 73ef5c1 commit f109306
Show file tree
Hide file tree
Showing 7 changed files with 144 additions and 27 deletions.
2 changes: 1 addition & 1 deletion sync-controller/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ type Config struct {
KubernetesNodeSelector string `mapstructure:"KUBERNETES_NODE_SELECTOR"`

ContainerStatusCheckSeconds int `mapstructure:"CONTAINER_STATUS_CHECK_SECONDS" default:"10"`
ContainerGraceShutdownSeconds int `mapstructure:"CONTAINER_GRACE_SHUTDOWN_SECONDS" default:"30"`
ContainerGraceShutdownSeconds int `mapstructure:"CONTAINER_GRACE_SHUTDOWN_SECONDS" default:"60"`
ContainerInitTimeoutSeconds int `mapstructure:"CONTAINER_INIT_TIMEOUT_SECONDS" default:"180"`

TaskTimeoutHours int `mapstructure:"TASK_TIMEOUT_HOURS" default:"48"`
Expand Down
96 changes: 89 additions & 7 deletions sync-controller/job_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package main

import (
"bufio"
"bytes"
"context"
"encoding/json"
"fmt"
Expand All @@ -17,8 +18,13 @@ import (
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/remotecommand"
"k8s.io/utils/ptr"
"math"
"regexp"
"strconv"
"strings"
"time"
)
Expand All @@ -33,10 +39,14 @@ const (
var labelUnsupportedChars = regexp.MustCompile(`[^a-zA-Z0-9._-]`)
var nonAlphaNum = regexp.MustCompile(`[^a-zA-Z0-9-]`)

var cgroupCPUUsage = regexp.MustCompile(`(?m:^usage_usec (\d+)$)`)
var cgroupMemUsage = regexp.MustCompile(`(?m:^(\d+)$)`)

type JobRunner struct {
appbase.Service
config *Config
namespace string
clientConfig *rest.Config
clientset *kubernetes.Clientset
closeCh chan struct{}
taskStatusCh chan *TaskStatus
Expand All @@ -46,11 +56,11 @@ type JobRunner struct {

func NewJobRunner(appContext *Context) (*JobRunner, error) {
base := appbase.NewServiceBase("job-runner")
clientset, err := GetK8SClientSet(appContext)
clientset, clientConfig, err := GetK8SClientSet(appContext)
if err != nil {
return nil, err
}
j := &JobRunner{Service: base, config: appContext.config, clientset: clientset, namespace: appContext.config.KubernetesNamespace,
j := &JobRunner{Service: base, config: appContext.config, clientset: clientset, clientConfig: clientConfig, namespace: appContext.config.KubernetesNamespace,
closeCh: make(chan struct{}),
taskStatusCh: make(chan *TaskStatus, 100),
runningPods: map[string]time.Time{},
Expand Down Expand Up @@ -111,6 +121,10 @@ func (j *JobRunner) watchPodStatuses() {
j.cleanupPod(pod.Name)
} else {
taskStatus.Status = StatusRunning
metrics := j.getPodResUsage(pod.Name, "source")
if len(metrics) > 0 {
taskStatus.Metrics = metrics
}
j.Infof("Pod %s is running", pod.Name)
j.runningPods[pod.Name] = time.Now()
}
Expand Down Expand Up @@ -165,7 +179,7 @@ func (j *JobRunner) sendStatus(taskStatus *TaskStatus) {

func (j *JobRunner) cleanupPod(name string) {
j.cleanedUpPods.Put(name)
gracePeriodSeconds := int64(j.config.ContainerGraceShutdownSeconds)
gracePeriodSeconds := int64(j.config.ContainerGraceShutdownSeconds + 5)
_ = j.clientset.CoreV1().Pods(j.namespace).Delete(context.Background(), name, metav1.DeleteOptions{GracePeriodSeconds: &gracePeriodSeconds})
_ = j.clientset.CoreV1().Secrets(j.namespace).Delete(context.Background(), name+"-config", metav1.DeleteOptions{GracePeriodSeconds: &gracePeriodSeconds})
_ = j.clientset.CoreV1().ConfigMaps(j.namespace).Delete(context.Background(), name+"-config", metav1.DeleteOptions{GracePeriodSeconds: &gracePeriodSeconds})
Expand Down Expand Up @@ -259,6 +273,72 @@ func (j *JobRunner) getPodErrorLogs(podName, container string) string {
}
}

func (j *JobRunner) getPodResUsage(podName string, container string) (metrics map[string]any) {
startedAt := time.Now()
var err error
defer func() {
if err != nil {
j.Errorf("Pod %s resource usage: %+v ms: %v error: %v", podName, metrics, time.Now().Sub(startedAt), err)
} else {
j.Infof("Pod %s resource usage: %+v ms: %v", podName, metrics, time.Now().Sub(startedAt))
}
}()
cmd := []string{
"sh",
"-c",
"cat /sys/fs/cgroup/cpu.stat && cat /sys/fs/cgroup/memory.current && cat /sys/fs/cgroup/memory.peak",
}
req := j.clientset.CoreV1().RESTClient().Post().Resource("pods").Name(podName).
Namespace(j.namespace).SubResource("exec")
option := &v1.PodExecOptions{
Container: container,
Command: cmd,
Stdout: true,
Stderr: true,
}
req.VersionedParams(
option,
scheme.ParameterCodec,
)
exec, err := remotecommand.NewSPDYExecutor(j.clientConfig, "POST", req.URL())
if err != nil {
return nil
}
var stdout, stderr bytes.Buffer
err = exec.StreamWithContext(context.Background(), remotecommand.StreamOptions{
Stdout: &stdout,
Stderr: &stderr,
})
if err != nil {
return nil
}
metrics = map[string]any{}
stdoutStr := stdout.String()
cpuUsageMatches := cgroupCPUUsage.FindStringSubmatch(stdoutStr)
if len(cpuUsageMatches) == 2 {
c, _ := strconv.Atoi(cpuUsageMatches[1])
if c > 0 {
metrics["cpu_usage"] = float64(c) / 1000000
}
}
memUsage := 0
memUsageAllMatches := cgroupMemUsage.FindAllStringSubmatch(stdoutStr, -1)
for _, memUsageMatches := range memUsageAllMatches {
if len(memUsageMatches) == 2 {
m, _ := strconv.Atoi(memUsageMatches[1])
memUsage = utils.MaxInt(memUsage, m)
}
}
if memUsage > 0 {
metrics["mem_usage"] = memUsage
}
if stderr.Len() > 0 {
err = fmt.Errorf(stderr.String())
}

return metrics
}

func (j *JobRunner) CreateCronJob(taskDescriptor TaskDescriptor, configuration *TaskConfiguration) TaskStatus {
taskStatus := TaskStatus{TaskDescriptor: taskDescriptor}
jobId := "sync-" + strings.ToLower(taskStatus.SyncID)
Expand Down Expand Up @@ -461,8 +541,9 @@ func (j *JobRunner) createCronJob(jobId string, task TaskDescriptor, configurati
Namespace: j.namespace,
},
Spec: v1.PodSpec{
RestartPolicy: v1.RestartPolicyNever,
NodeSelector: nodeSelector,
RestartPolicy: v1.RestartPolicyNever,
NodeSelector: nodeSelector,
TerminationGracePeriodSeconds: ptr.To(int64(j.config.ContainerGraceShutdownSeconds)),
Containers: []v1.Container{
{Name: "source",
Image: fmt.Sprintf("%s:%s", task.Package, task.PackageVersion),
Expand Down Expand Up @@ -643,8 +724,9 @@ func (j *JobRunner) createPod(podName string, task TaskDescriptor, configuration
Namespace: j.namespace,
},
Spec: v1.PodSpec{
RestartPolicy: v1.RestartPolicyNever,
NodeSelector: nodeSelector,
RestartPolicy: v1.RestartPolicyNever,
NodeSelector: nodeSelector,
TerminationGracePeriodSeconds: ptr.To(int64(j.config.ContainerGraceShutdownSeconds)),
Containers: []v1.Container{
{Name: "source",
Image: fmt.Sprintf("%s:%s", task.Package, task.PackageVersion),
Expand Down
22 changes: 11 additions & 11 deletions sync-controller/k8s.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,24 +9,24 @@ import (
"strings"
)

func GetK8SClientSet(appContext *Context) (*kubernetes.Clientset, error) {
func GetK8SClientSet(appContext *Context) (*kubernetes.Clientset, *rest.Config, error) {
config := appContext.config.KubernetesClientConfig
if config == "" || config == "local" {
// creates the in-cluster config
cc, err := rest.InClusterConfig()
if err != nil {
return nil, fmt.Errorf("error getting in cluster config: %v", err)
return nil, nil, fmt.Errorf("error getting in cluster config: %v", err)
}
clientset, err := kubernetes.NewForConfig(cc)
if err != nil {
return nil, fmt.Errorf("error creating kubernetes clientset: %v", err)
return nil, nil, fmt.Errorf("error creating kubernetes clientset: %v", err)
}
return clientset, nil
return clientset, cc, nil
} else if strings.ContainsRune(config, '\n') {
// suppose yaml file
clientconfig, err := clientcmd.NewClientConfigFromBytes([]byte(config))
if err != nil {
return nil, fmt.Errorf("error parsing kubernetes client config: %v", err)
return nil, nil, fmt.Errorf("error parsing kubernetes client config: %v", err)
}
rawConfig, _ := clientconfig.RawConfig()
clientconfig = clientcmd.NewNonInteractiveClientConfig(rawConfig,
Expand All @@ -35,13 +35,13 @@ func GetK8SClientSet(appContext *Context) (*kubernetes.Clientset, error) {
&clientcmd.ClientConfigLoadingRules{})
cc, err := clientconfig.ClientConfig()
if err != nil {
return nil, fmt.Errorf("error creating kubernetes client config: %v", err)
return nil, nil, fmt.Errorf("error creating kubernetes client config: %v", err)
}
clientset, err := kubernetes.NewForConfig(cc)
if err != nil {
return nil, fmt.Errorf("error creating kubernetes clientset: %v", err)
return nil, nil, fmt.Errorf("error creating kubernetes clientset: %v", err)
}
return clientset, nil
return clientset, cc, nil
} else {
// suppose kubeconfig file path
clientconfig := clientcmd.NewNonInteractiveDeferredLoadingClientConfig(
Expand All @@ -51,12 +51,12 @@ func GetK8SClientSet(appContext *Context) (*kubernetes.Clientset, error) {
})
cc, err := clientconfig.ClientConfig()
if err != nil {
return nil, fmt.Errorf("error creating kubernetes client config: %v", err)
return nil, nil, fmt.Errorf("error creating kubernetes client config: %v", err)
}
clientset, err := kubernetes.NewForConfig(cc)
if err != nil {
return nil, fmt.Errorf("error creating kubernetes clientset: %v", err)
return nil, nil, fmt.Errorf("error creating kubernetes clientset: %v", err)
}
return clientset, nil
return clientset, cc, nil
}
}
7 changes: 4 additions & 3 deletions sync-controller/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,10 @@ func (t *TaskDescriptor) ExtractAnnotations() map[string]string {

type TaskStatus struct {
TaskDescriptor `json:",inline" mapstructure:",squash" `
PodName string `json:"podName"`
Status Status `json:"status"`
Description string `json:"description"`
PodName string `json:"podName"`
Status Status `json:"status"`
Description string `json:"description"`
Metrics map[string]any `json:"metrics"`
}

type Status string
Expand Down
6 changes: 5 additions & 1 deletion sync-controller/task_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,11 @@ func (t *TaskManager) listenTaskStatus() {
case StatusCreated:
err = db.UpsertRunningTask(t.dbpool, st.SyncID, st.TaskID, st.Package, st.PackageVersion, st.StartedAtTime(), "RUNNING", strings.Join([]string{string(st.Status), st.Description}, ": "), st.StartedBy)
case StatusRunning:
err = db.UpdateRunningTaskDate(t.dbpool, st.TaskID)
if len(st.Metrics) > 0 {
err = db.UpdateRunningTaskMetrics(t.dbpool, st.TaskID, st.Metrics)
} else {
err = db.UpdateRunningTaskDate(t.dbpool, st.TaskID)
}
default:
//do nothing. sidecar manages success status.
}
Expand Down
7 changes: 7 additions & 0 deletions sync-sidecar/db/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ ON CONFLICT ON CONSTRAINT source_task_pkey DO UPDATE SET updated_at=$6, status =

updateRunningTaskDateSQL = `UPDATE source_task SET updated_at=$2 where task_id=$1 and status = 'RUNNING'`

updateRunningTaskMetricsSQL = `UPDATE source_task SET updated_at=$2, metrics=$3 where task_id=$1 and status = 'RUNNING'`

updateRunningTaskStatusSQL = `UPDATE source_task SET status=$2 where task_id=$1 and status = 'RUNNING'`

upsertCheckSQL = `INSERT INTO source_check (package, version, key, status, description, timestamp) VALUES ($1, $2, $3, $4, $5, $6)
Expand Down Expand Up @@ -91,6 +93,11 @@ func UpdateRunningTaskDate(dbpool *pgxpool.Pool, taskId string) error {
return err
}

func UpdateRunningTaskMetrics(dbpool *pgxpool.Pool, taskId string, metrics map[string]any) error {
_, err := dbpool.Exec(context.Background(), updateRunningTaskMetricsSQL, taskId, time.Now(), metrics)
return err
}

func UpdateRunningTaskStatus(dbpool *pgxpool.Pool, taskId, status string) error {
_, err := dbpool.Exec(context.Background(), updateRunningTaskStatusSQL, taskId, status)
return err
Expand Down
31 changes: 27 additions & 4 deletions sync-sidecar/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
_ "github.com/jitsucom/bulker/bulkerlib/implementations/sql"
"github.com/jitsucom/bulker/eventslog"
"github.com/jitsucom/bulker/jitsubase/logging"
"github.com/jitsucom/bulker/jitsubase/utils"
"github.com/jitsucom/bulker/sync-sidecar/db"
"os"
"os/signal"
Expand All @@ -19,6 +20,15 @@ import (
"time"
)

var logLevels = map[string]int{
"TRACE": -2,
"DEBUG": -1,
"INFO": 0,
"WARN": 1,
"ERROR": 2,
"FATAL": 3,
}

type SideCar interface {
Run()
Close()
Expand All @@ -36,6 +46,9 @@ type AbstractSideCar struct {
stdOutPipeFile string
stdErrPipeFile string

logLevel string
dbLogLevel string

databaseURL string
dbpool *pgxpool.Pool

Expand Down Expand Up @@ -79,6 +92,8 @@ func main() {
stdOutPipeFile: os.Getenv("STDOUT_PIPE_FILE"),
stdErrPipeFile: os.Getenv("STDERR_PIPE_FILE"),
databaseURL: os.Getenv("DATABASE_URL"),
logLevel: strings.ToUpper(utils.DefaultString(os.Getenv("LOG_LEVEL"), "INFO")),
dbLogLevel: strings.ToUpper(utils.DefaultString(os.Getenv("DB_LOG_LEVEL"), "INFO")),
startedAt: startedAt,
}
if command == "read" {
Expand Down Expand Up @@ -183,17 +198,25 @@ func (s *AbstractSideCar) checkJsonRow(json string) bool {
}

func (s *AbstractSideCar) _log(logger, level, message string) {
fmt.Printf("%s : %s\n", level, message)
err := s.sendLog(logger, level, message)
if err != nil {
fmt.Printf("%s: %v\n", level, err)
if shouldLog(strings.ToUpper(level), s.logLevel) {
fmt.Printf("%s : %s\n", level, message)
}
if shouldLog(strings.ToUpper(level), s.dbLogLevel) {
err := s.sendLog(logger, level, message)
if err != nil {
fmt.Printf("%s: %v\n", level, err)
}
}
}

func (s *AbstractSideCar) sendLog(logger, level string, message string) error {
return db.InsertTaskLog(s.dbpool, uuid.New().String(), level, logger, message, s.syncId, s.taskId, time.Now())
}

func shouldLog(level string, enabledLevel string) bool {
return logLevels[level] >= logLevels[enabledLevel]
}

func joinStrings(str1, str2, sep string) string {
if str1 == "" {
return str2
Expand Down

0 comments on commit f109306

Please sign in to comment.