Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(api): Update go dependencies in merlin api server #551

Merged
merged 15 commits into from
Apr 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/merlin.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ env:
ARTIFACT_RETENTION_DAYS: 7
DOCKER_BUILDKIT: 1
DOCKER_REGISTRY: ghcr.io
GO_VERSION: "1.20"
GO_VERSION: "1.22"

jobs:
create-version:
Expand Down Expand Up @@ -147,7 +147,7 @@ jobs:
uses: golangci/golangci-lint-action@v3
with:
# Ensure the same version as the one defined in Makefile
version: v1.51.2
version: v1.56.2
deadlycoconuts marked this conversation as resolved.
Show resolved Hide resolved
working-directory: api

test-api:
Expand Down
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
# ============================================================
# Build stage 1: Build API
# ============================================================
FROM golang:1.20-alpine as go-builder
FROM golang:1.22-alpine as go-builder

RUN apk update && apk add --no-cache git ca-certificates bash
RUN mkdir -p src/api
Expand Down
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ VERSION := $(or ${VERSION}, $(shell git describe --tags --always --first-parent)
LOG_URL?=localhost:8002
TEST_TAGS?=

GOLANGCI_LINT_VERSION="v1.51.2"
GOLANGCI_LINT_VERSION="v1.56.2"
PROTOC_GEN_GO_JSON_VERSION="v1.1.0"
PROTOC_GEN_GO_VERSION="v1.26"
PYTHON_VERSION ?= "39" #set as 38 39 310 for 3.8-3.10 respectively
Expand Down
9 changes: 6 additions & 3 deletions api/batch/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ func NewController(
manifestManager ManifestManager,
envMetaData cluster.Metadata,
batchJobTemplater *BatchJobTemplater,
) Controller {
) (Controller, error) {
informerFactory := externalversions.NewSharedInformerFactory(sparkClient, resyncPeriod)
informer := informerFactory.Sparkoperator().V1beta2().SparkApplications().Informer()
queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
Expand All @@ -130,10 +130,13 @@ func NewController(
ContainerFetcher: cluster.NewContainerFetcher(kubeClient.CoreV1(), envMetaData),
}

informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
_, err := informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
UpdateFunc: controller.onUpdate,
})
return controller
if err != nil {
return nil, err
}
return controller, nil
}

func (c *controller) Submit(ctx context.Context, predictionJob *models.PredictionJob, namespace string) error {
Expand Down
34 changes: 24 additions & 10 deletions api/batch/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -328,8 +328,10 @@ func TestSubmit(t *testing.T) {
mockManifestManager := &batchMock.ManifestManager{}
clusterMetadata := cluster.Metadata{GcpProject: "my-gcp", ClusterName: "my-cluster"}
batchJobTemplater := NewBatchJobTemplater(defaultBatchConfig)
ctl := NewController(mockStorage, mockMlpAPIClient, mockSparkClient, mockKubeClient, mockManifestManager,
ctl, err := NewController(mockStorage, mockMlpAPIClient, mockSparkClient, mockKubeClient,
mockManifestManager,
clusterMetadata, batchJobTemplater)
assert.NoError(t, err)

mockKubeClient.PrependReactor("get", "namespaces", func(action ktesting.Action) (handled bool, ret runtime.Object, err error) {
return true, nil, kerrors.NewNotFound(schema.GroupResource{}, action.(ktesting.GetAction).GetName())
Expand Down Expand Up @@ -358,7 +360,7 @@ func TestSubmit(t *testing.T) {
mockManifestManager.On("DeleteJobSpec", context.Background(), jobName, defaultNamespace).Return(nil)
}

err := ctl.Submit(context.Background(), predictionJob, test.namespace)
err = ctl.Submit(context.Background(), predictionJob, test.namespace)
if test.wantError {
assert.Error(t, err)
assert.Equal(t, test.wantErrorMsg, err.Error())
Expand Down Expand Up @@ -396,13 +398,14 @@ func TestCleanupAfterSubmitFailed(t *testing.T) {
mockManifestManager := &batchMock.ManifestManager{}
clusterMetadata := cluster.Metadata{GcpProject: "my-gcp", ClusterName: "my-cluster"}
batchJobTemplater := NewBatchJobTemplater(defaultBatchConfig)
ctl := NewController(mockStorage, mockMlpAPIClient, mockSparkClient, mockKubeClient, mockManifestManager,
ctl, err := NewController(mockStorage, mockMlpAPIClient, mockSparkClient, mockKubeClient, mockManifestManager,
clusterMetadata, batchJobTemplater)
assert.NoError(t, err)

mockManifestManager.On("DeleteSecret", context.Background(), jobName, defaultNamespace).Return(nil)
mockManifestManager.On("DeleteJobSpec", context.Background(), jobName, defaultNamespace).Return(nil)

err := ctl.Submit(context.Background(), predictionJob, defaultNamespace)
err = ctl.Submit(context.Background(), predictionJob, defaultNamespace)
assert.Error(t, err)
mockManifestManager.AssertExpectations(t)
}
Expand All @@ -422,8 +425,12 @@ func TestOnUpdate(t *testing.T) {
mockManifestManager := &batchMock.ManifestManager{}
clusterMetadata := cluster.Metadata{GcpProject: "my-gcp", ClusterName: "my-cluster"}
batchJobTemplater := NewBatchJobTemplater(defaultBatchConfig)
ctl := NewController(mockStorage, mockMlpAPIClient, mockSparkClient, mockKubeClient, mockManifestManager,
clusterMetadata, batchJobTemplater).(*controller)
newController, err := NewController(mockStorage, mockMlpAPIClient, mockSparkClient, mockKubeClient,
mockManifestManager,
clusterMetadata, batchJobTemplater)
assert.NoError(t, err)

ctl := newController.(*controller)
stopCh := make(chan struct{})
defer close(stopCh)
go ctl.Run(stopCh)
Expand Down Expand Up @@ -499,8 +506,12 @@ func TestUpdateStatus(t *testing.T) {
mockManifestManager := &batchMock.ManifestManager{}
clusterMetadata := cluster.Metadata{GcpProject: "my-gcp", ClusterName: "my-cluster"}
batchJobTemplater := NewBatchJobTemplater(defaultBatchConfig)
ctl := NewController(mockStorage, mockMlpAPIClient, mockSparkClient, mockKubeClient, mockManifestManager,
clusterMetadata, batchJobTemplater).(*controller)
newController, err := NewController(mockStorage, mockMlpAPIClient, mockSparkClient, mockKubeClient,
mockManifestManager,
clusterMetadata, batchJobTemplater)
assert.NoError(t, err)

ctl := newController.(*controller)
stopCh := make(chan struct{})
defer close(stopCh)
go ctl.Run(stopCh)
Expand Down Expand Up @@ -597,9 +608,12 @@ func TestStop(t *testing.T) {
mockManifestManager := &batchMock.ManifestManager{}
clusterMetadata := cluster.Metadata{GcpProject: "my-gcp", ClusterName: "my-cluster"}
batchJobTemplater := NewBatchJobTemplater(defaultBatchConfig)
ctl := NewController(mockStorage, mockMlpAPIClient, mockSparkClient, mockKubeClient, mockManifestManager,
newController, err := NewController(mockStorage, mockMlpAPIClient, mockSparkClient, mockKubeClient,
mockManifestManager,
clusterMetadata, batchJobTemplater)
assert.NoError(t, err)

ctl := newController.(*controller)
mockKubeClient.PrependReactor("get", "namespaces", func(action ktesting.Action) (handled bool, ret runtime.Object, err error) {
return true, nil, kerrors.NewNotFound(schema.GroupResource{}, action.(ktesting.GetAction).GetName())
})
Expand All @@ -616,7 +630,7 @@ func TestStop(t *testing.T) {
})
mockStorage.On("Delete", predictionJob).Return(nil)

err := ctl.Stop(context.Background(), predictionJob, namespace.Name)
err = ctl.Stop(context.Background(), predictionJob, namespace.Name)
if test.wantError {
assert.Error(t, err)
assert.Equal(t, test.wantErrorMsg, err.Error())
Expand Down
23 changes: 13 additions & 10 deletions api/cluster/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,8 @@ func (c *controller) Deploy(ctx context.Context, modelService *models.Service) (
if modelService.CurrentIsvcName != "" {
if modelService.DeploymentMode == deployment.ServerlessDeploymentMode ||
modelService.DeploymentMode == deployment.EmptyDeploymentMode {
currentIsvc, err := c.kserveClient.InferenceServices(modelService.Namespace).Get(modelService.CurrentIsvcName, metav1.GetOptions{})
currentIsvc, err := c.kserveClient.InferenceServices(modelService.Namespace).Get(ctx,
modelService.CurrentIsvcName, metav1.GetOptions{})
deadlycoconuts marked this conversation as resolved.
Show resolved Hide resolved
if err != nil && !kerrors.IsNotFound(err) {
return nil, errors.Wrapf(err, fmt.Sprintf("%v (%s)", ErrUnableToGetInferenceServiceStatus, isvcName))
}
Expand All @@ -234,10 +235,10 @@ func (c *controller) Deploy(ctx context.Context, modelService *models.Service) (
}

// check the cluster to see if the inference service has already been deployed
s, err := c.kserveClient.InferenceServices(modelService.Namespace).Get(modelService.Name, metav1.GetOptions{})
s, err := c.kserveClient.InferenceServices(modelService.Namespace).Get(ctx, modelService.Name, metav1.GetOptions{})
if err != nil {
if kerrors.IsNotFound(err) {
s, err = c.kserveClient.InferenceServices(modelService.Namespace).Create(spec)
s, err = c.kserveClient.InferenceServices(modelService.Namespace).Create(ctx, spec, metav1.CreateOptions{})
if err != nil {
log.Errorf("unable to create inference service %s: %v", isvcName, err)
return nil, errors.Wrapf(err, fmt.Sprintf("%v (%s)", ErrUnableToCreateInferenceService, isvcName))
Expand All @@ -260,7 +261,7 @@ func (c *controller) Deploy(ctx context.Context, modelService *models.Service) (
s, err = c.waitInferenceServiceReady(s)
if err != nil {
// remove created inferenceservice when got error
if err := c.deleteInferenceService(isvcName, modelService.Namespace); err != nil {
if err := c.deleteInferenceService(ctx, isvcName, modelService.Namespace); err != nil {
log.Errorf("unable to delete inference service %s with error %v", isvcName, err)
}

Expand Down Expand Up @@ -288,7 +289,7 @@ func (c *controller) Deploy(ctx context.Context, modelService *models.Service) (

// Delete previous inference service
if modelService.CurrentIsvcName != "" {
if err := c.deleteInferenceService(modelService.CurrentIsvcName, modelService.Namespace); err != nil {
if err := c.deleteInferenceService(ctx, modelService.CurrentIsvcName, modelService.Namespace); err != nil {
log.Errorf("unable to delete prevision revision %s with error %v", modelService.CurrentIsvcName, err)
return nil, errors.Wrapf(err, fmt.Sprintf("%v (%s)", ErrUnableToDeletePreviousInferenceService, modelService.CurrentIsvcName))
}
Expand All @@ -305,15 +306,16 @@ func (c *controller) Deploy(ctx context.Context, modelService *models.Service) (
}

func (c *controller) Delete(ctx context.Context, modelService *models.Service) (*models.Service, error) {
infSvc, err := c.kserveClient.InferenceServices(modelService.Namespace).Get(modelService.Name, metav1.GetOptions{})
infSvc, err := c.kserveClient.InferenceServices(modelService.Namespace).Get(ctx, modelService.Name,
metav1.GetOptions{})
if err != nil {
if !kerrors.IsNotFound(err) {
return nil, errors.Wrapf(err, "unable to check status of inference service: %s", infSvc.Name)
}
return modelService, nil
}

if err := c.deleteInferenceService(modelService.Name, modelService.Namespace); err != nil {
if err := c.deleteInferenceService(ctx, modelService.Name, modelService.Namespace); err != nil {
return nil, err
}

Expand All @@ -336,9 +338,10 @@ func (c *controller) Delete(ctx context.Context, modelService *models.Service) (
return modelService, nil
}

func (c *controller) deleteInferenceService(serviceName string, namespace string) error {
func (c *controller) deleteInferenceService(ctx context.Context, serviceName string, namespace string) error {
gracePeriod := int64(deletionGracePeriodSecond)
err := c.kserveClient.InferenceServices(namespace).Delete(serviceName, &metav1.DeleteOptions{GracePeriodSeconds: &gracePeriod})
err := c.kserveClient.InferenceServices(namespace).Delete(ctx, serviceName,
metav1.DeleteOptions{GracePeriodSeconds: &gracePeriod})
if client.IgnoreNotFound(err) != nil {
return errors.Wrapf(err, "unable to delete inference service: %s %v", serviceName, err)
}
Expand Down Expand Up @@ -373,7 +376,7 @@ func (c *controller) waitInferenceServiceReady(service *kservev1beta1.InferenceS
}
}()

isvcWatcher, err := c.kserveClient.InferenceServices(service.Namespace).Watch(metav1.ListOptions{
isvcWatcher, err := c.kserveClient.InferenceServices(service.Namespace).Watch(ctx, metav1.ListOptions{
FieldSelector: fmt.Sprintf("metadata.name=%s", service.Name),
})
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions api/cluster/resource/templater.go
Original file line number Diff line number Diff line change
Expand Up @@ -556,7 +556,7 @@ func (t *InferenceServiceTemplater) enrichStandardTransformerEnvVars(modelServic

func createHTTPGetLivenessProbe(httpPath string, port int) *corev1.Probe {
return &corev1.Probe{
Handler: corev1.Handler{
ProbeHandler: corev1.ProbeHandler{
deadlycoconuts marked this conversation as resolved.
Show resolved Hide resolved
HTTPGet: &corev1.HTTPGetAction{
Path: httpPath,
Scheme: "HTTP",
Expand All @@ -575,7 +575,7 @@ func createHTTPGetLivenessProbe(httpPath string, port int) *corev1.Probe {

func createGRPCLivenessProbe(port int) *corev1.Probe {
return &corev1.Probe{
Handler: corev1.Handler{
ProbeHandler: corev1.ProbeHandler{
Exec: &corev1.ExecAction{
Command: []string{grpcHealthProbeCommand, fmt.Sprintf("-addr=:%d", port)},
},
Expand Down
7 changes: 5 additions & 2 deletions api/cmd/api/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@ import (
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"gorm.io/gorm"
"k8s.io/apimachinery/pkg/util/clock"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/utils/clock"
deadlycoconuts marked this conversation as resolved.
Show resolved Hide resolved

"github.com/caraml-dev/merlin/api"
"github.com/caraml-dev/merlin/batch"
Expand Down Expand Up @@ -403,8 +403,11 @@ func initBatchControllers(cfg *config.Config, db *gorm.DB, mlpAPIClient mlp.APIC

batchJobTemplator := batch.NewBatchJobTemplater(cfg.BatchConfig)

ctl := batch.NewController(predictionJobStorage, mlpAPIClient, sparkClient, kubeClient, manifestManager,
ctl, err := batch.NewController(predictionJobStorage, mlpAPIClient, sparkClient, kubeClient, manifestManager,
envMetadata, batchJobTemplator)
if err != nil {
log.Panicf("unable to create batch controller: %v", err)
}
stopCh := make(chan struct{})
go ctl.Run(stopCh)

Expand Down
17 changes: 8 additions & 9 deletions api/cmd/inference-logger/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,22 +13,22 @@ import (
"strings"
"time"

"github.com/caraml-dev/merlin/pkg/inference-logger/liveness"
merlinlogger "github.com/caraml-dev/merlin/pkg/inference-logger/logger"
"github.com/confluentinc/confluent-kafka-go/v2/kafka"
"github.com/kelseyhightower/envconfig"
nrconfig "github.com/newrelic/newrelic-client-go/v2/pkg/config"
nrlog "github.com/newrelic/newrelic-client-go/v2/pkg/logs"
"github.com/pkg/errors"
"go.uber.org/zap"
network "knative.dev/networking/pkg"
"knative.dev/networking/pkg/http/header"
"knative.dev/networking/pkg/http/proxy"
pkgnet "knative.dev/pkg/network"
pkghandler "knative.dev/pkg/network/handlers"
"knative.dev/pkg/signals"
"knative.dev/serving/pkg/queue"
"knative.dev/serving/pkg/queue/health"
"knative.dev/serving/pkg/queue/readiness"

"github.com/caraml-dev/merlin/pkg/inference-logger/liveness"
merlinlogger "github.com/caraml-dev/merlin/pkg/inference-logger/logger"
)

var (
Expand Down Expand Up @@ -216,10 +216,9 @@ func buildServer(target *url.URL, dispatcher *merlinlogger.Dispatcher, loggingMo

httpProxy := httputil.NewSingleHostReverseProxy(target)
httpProxy.Transport = pkgnet.NewAutoTransport(maxIdleConns /* max-idle */, maxIdleConns /* max-idle-per-host */)
// nolint:staticcheck
httpProxy.ErrorHandler = pkgnet.ErrorHandler(log)
httpProxy.BufferPool = network.NewBufferPool()
httpProxy.FlushInterval = network.FlushInterval
httpProxy.ErrorHandler = pkghandler.Error(log)
httpProxy.BufferPool = proxy.NewBufferPool()
httpProxy.FlushInterval = proxy.FlushInterval

var composedHandler http.Handler = httpProxy
composedHandler = merlinlogger.NewLoggerHandler(dispatcher, loggingMode, composedHandler, log)
Expand All @@ -230,7 +229,7 @@ func buildServer(target *url.URL, dispatcher *merlinlogger.Dispatcher, loggingMo
drainer := &pkghandler.Drainer{
QuietPeriod: drainSleepDuration,
// Add Activator probe header to the drainer so it can handle probes directly from activator
HealthCheckUAPrefixes: []string{network.ActivatorUserAgent},
HealthCheckUAPrefixes: []string{header.ActivatorUserAgent},
deadlycoconuts marked this conversation as resolved.
Show resolved Hide resolved
Inner: composedHandler,
HealthCheck: health.ProbeHandler(probe, false),
}
Expand Down
2 changes: 1 addition & 1 deletion api/cmd/transformer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
"github.com/kelseyhightower/envconfig"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/version"
"github.com/prometheus/client_golang/prometheus/collectors/version"
"go.opentelemetry.io/contrib/propagators/b3"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/exporters/jaeger"
Expand Down
Loading
Loading