From 49fe9643bb4defba0a33ea0347586a09c067cf92 Mon Sep 17 00:00:00 2001 From: Mohsin Zaidi <2236875+smrz2001@users.noreply.github.com> Date: Thu, 26 Oct 2023 10:51:58 -0400 Subject: [PATCH] fix: reuse cd manager utils/job modules --- ci/cmd/deploy/main.go | 4 +- common/aws/ddb/job.go | 113 +++++---------------------------------- common/aws/ddb/state.go | 14 ++--- common/aws/ddb/utils.go | 56 ------------------- go.mod | 17 +++--- go.sum | 27 ++++++---- models/job.go | 8 +-- models/services.go | 4 +- services/test_helpers.go | 12 ++--- services/worker.go | 8 +-- 10 files changed, 67 insertions(+), 196 deletions(-) delete mode 100644 common/aws/ddb/utils.go diff --git a/ci/cmd/deploy/main.go b/ci/cmd/deploy/main.go index f017c3b..0579eb4 100644 --- a/ci/cmd/deploy/main.go +++ b/ci/cmd/deploy/main.go @@ -13,7 +13,7 @@ import ( "github.com/aws/aws-sdk-go-v2/service/dynamodb" "github.com/aws/aws-sdk-go-v2/service/dynamodb/types" - "github.com/3box/pipeline-tools/cd/manager" + "github.com/3box/pipeline-tools/cd/manager/common/job" "github.com/ceramicnetwork/go-cas" "github.com/ceramicnetwork/go-cas/common/aws/config" @@ -30,7 +30,7 @@ func main() { } func createJob(ctx context.Context) (string, error) { - newJob := models.NewJob(manager.JobType_Deploy, map[string]interface{}{ + newJob := models.NewJob(job.JobType_Deploy, map[string]interface{}{ "component": models.DeployComponent, "sha": "latest", "shaTag": os.Getenv(cas.Env_ShaTag), diff --git a/common/aws/ddb/job.go b/common/aws/ddb/job.go index 2d8b4b2..8929b8a 100644 --- a/common/aws/ddb/job.go +++ b/common/aws/ddb/job.go @@ -12,17 +12,14 @@ import ( "github.com/aws/aws-sdk-go-v2/service/dynamodb" "github.com/aws/aws-sdk-go-v2/service/dynamodb/types" - "github.com/3box/pipeline-tools/cd/manager" + "github.com/3box/pipeline-tools/cd/manager/common/aws/utils" + "github.com/3box/pipeline-tools/cd/manager/common/job" "github.com/ceramicnetwork/go-cas" "github.com/ceramicnetwork/go-cas/common" "github.com/ceramicnetwork/go-cas/models" ) -const stageTsIndex = "stage-ts-index" -const typeTsIndex = "type-ts-index" -const jobTsIndex = "job-ts-index" - type JobDatabase struct { ddbClient *dynamodb.Client table string @@ -39,104 +36,22 @@ func NewJobDb(ctx context.Context, logger models.Logger, ddbClient *dynamodb.Cli } func (jdb *JobDatabase) createJobTable(ctx context.Context) error { - createJobTableInput := dynamodb.CreateTableInput{ - BillingMode: types.BillingModePayPerRequest, - AttributeDefinitions: []types.AttributeDefinition{ - { - AttributeName: aws.String("id"), - AttributeType: "S", - }, - { - AttributeName: aws.String("job"), - AttributeType: "S", - }, - { - AttributeName: aws.String("stage"), - AttributeType: "S", - }, - { - AttributeName: aws.String("type"), - AttributeType: "S", - }, - { - AttributeName: aws.String("ts"), - AttributeType: "N", - }, - }, - KeySchema: []types.KeySchemaElement{ - { - AttributeName: aws.String("id"), - KeyType: "HASH", - }, - }, - TableName: aws.String(jdb.table), - GlobalSecondaryIndexes: []types.GlobalSecondaryIndex{ - { - IndexName: aws.String(stageTsIndex), - KeySchema: []types.KeySchemaElement{ - { - AttributeName: aws.String("stage"), - KeyType: "HASH", - }, - { - AttributeName: aws.String("ts"), - KeyType: "RANGE", - }, - }, - Projection: &types.Projection{ - ProjectionType: types.ProjectionTypeAll, - }, - }, - { - IndexName: aws.String(typeTsIndex), - KeySchema: []types.KeySchemaElement{ - { - AttributeName: aws.String("type"), - KeyType: "HASH", - }, - { - AttributeName: aws.String("ts"), - KeyType: "RANGE", - }, - }, - Projection: &types.Projection{ - ProjectionType: types.ProjectionTypeAll, - }, - }, - { - IndexName: aws.String(jobTsIndex), - KeySchema: []types.KeySchemaElement{ - { - AttributeName: aws.String("job"), - KeyType: "HASH", - }, - { - AttributeName: aws.String("ts"), - KeyType: "RANGE", - }, - }, - Projection: &types.Projection{ - ProjectionType: types.ProjectionTypeAll, - }, - }, - }, - } - return createTable(ctx, jdb.logger, jdb.ddbClient, &createJobTableInput) + return job.CreateJobTable(ctx, jdb.ddbClient, jdb.table) } func (jdb *JobDatabase) CreateJob(ctx context.Context) (string, error) { jobParams := map[string]interface{}{ - manager.JobParam_Version: models.WorkerVersion, // this will launch a CASv5 Worker - manager.JobParam_Overrides: map[string]string{ + job.JobParam_Version: models.WorkerVersion, // this will launch a CASv5 Worker + job.JobParam_Overrides: map[string]string{ models.AnchorOverrides_AppMode: models.AnchorAppMode_ContinualAnchoring, models.AnchorOverrides_SchedulerStopAfterNoOp: "true", }, } // If an override anchor contract address is available, pass it through to the job. if contractAddress, found := os.LookupEnv(models.Env_AnchorContractAddress); found { - jobParams[manager.JobParam_Overrides].(map[string]string)[models.AnchorOverrides_ContractAddress] = contractAddress + jobParams[job.JobParam_Overrides].(map[string]string)[models.AnchorOverrides_ContractAddress] = contractAddress } - newJob := models.NewJob(manager.JobType_Anchor, jobParams) + newJob := models.NewJob(job.JobType_Anchor, jobParams) attributeValues, err := attributevalue.MarshalMapWithOptions(newJob, func(options *attributevalue.EncoderOptions) { options.EncodeTime = func(time time.Time) (types.AttributeValue, error) { return &types.AttributeValueMemberN{Value: strconv.FormatInt(time.UnixNano(), 10)}, nil @@ -160,10 +75,10 @@ func (jdb *JobDatabase) CreateJob(ctx context.Context) (string, error) { } } -func (jdb *JobDatabase) QueryJob(ctx context.Context, jobId string) (*manager.JobState, error) { +func (jdb *JobDatabase) QueryJob(ctx context.Context, jobId string) (*job.JobState, error) { queryInput := dynamodb.QueryInput{ TableName: aws.String(jdb.table), - IndexName: aws.String(jobTsIndex), + IndexName: aws.String(job.JobTsIndex), KeyConditionExpression: aws.String("#job = :job"), ExpressionAttributeNames: map[string]string{"#job": "job"}, ExpressionAttributeValues: map[string]types.AttributeValue{":job": &types.AttributeValueMemberS{Value: jobId}}, @@ -176,16 +91,16 @@ func (jdb *JobDatabase) QueryJob(ctx context.Context, jobId string) (*manager.Jo if queryOutput, err := jdb.ddbClient.Query(httpCtx, &queryInput); err != nil { return nil, err } else if queryOutput.Count > 0 { - job := new(manager.JobState) - if err = attributevalue.UnmarshalMapWithOptions(queryOutput.Items[0], job, func(options *attributevalue.DecoderOptions) { + j := new(job.JobState) + if err = attributevalue.UnmarshalMapWithOptions(queryOutput.Items[0], j, func(options *attributevalue.DecoderOptions) { options.DecodeTime = attributevalue.DecodeTimeAttributes{ - S: tsDecode, - N: tsDecode, + S: utils.TsDecode, + N: utils.TsDecode, } }); err != nil { return nil, err } else { - return job, nil + return j, nil } } else { // A job specifically requested must be present diff --git a/common/aws/ddb/state.go b/common/aws/ddb/state.go index f313095..035c241 100644 --- a/common/aws/ddb/state.go +++ b/common/aws/ddb/state.go @@ -12,6 +12,8 @@ import ( "github.com/aws/aws-sdk-go-v2/service/dynamodb" "github.com/aws/aws-sdk-go-v2/service/dynamodb/types" + "github.com/3box/pipeline-tools/cd/manager/common/aws/utils" + "github.com/ceramicnetwork/go-cas" "github.com/ceramicnetwork/go-cas/common" "github.com/ceramicnetwork/go-cas/models" @@ -67,7 +69,7 @@ func (sdb *StateDatabase) createCheckpointTable(ctx context.Context) error { TableName: aws.String(sdb.checkpointTable), BillingMode: types.BillingModePayPerRequest, } - return createTable(ctx, sdb.logger, sdb.client, &createStreamTableInput) + return utils.CreateTable(ctx, sdb.client, &createStreamTableInput) } func (sdb *StateDatabase) createStreamTable(ctx context.Context) error { @@ -95,7 +97,7 @@ func (sdb *StateDatabase) createStreamTable(ctx context.Context) error { TableName: aws.String(sdb.streamTable), BillingMode: types.BillingModePayPerRequest, } - return createTable(ctx, sdb.logger, sdb.client, &createTableInput) + return utils.CreateTable(ctx, sdb.client, &createTableInput) } func (sdb *StateDatabase) createTipTable(ctx context.Context) error { @@ -123,7 +125,7 @@ func (sdb *StateDatabase) createTipTable(ctx context.Context) error { TableName: aws.String(sdb.tipTable), BillingMode: types.BillingModePayPerRequest, } - return createTable(ctx, sdb.logger, sdb.client, &createTableInput) + return utils.CreateTable(ctx, sdb.client, &createTableInput) } func (sdb *StateDatabase) GetCheckpoint(ctx context.Context, ckptType models.CheckpointType) (time.Time, error) { @@ -254,11 +256,11 @@ func (sdb *StateDatabase) UpdateTip(ctx context.Context, newTip *models.StreamTi oldTip, func(options *attributevalue.DecoderOptions) { options.DecodeTime = attributevalue.DecodeTimeAttributes{ - S: tsDecode, - N: tsDecode, + S: utils.TsDecode, + N: utils.TsDecode, } }); err != nil { - sdb.logger.Errorf("error unmarshaling old tip: %v", err) + sdb.logger.Errorf("error unmarshalling old tip: %v", err) // We've written the new tip and lost the previous tip here. This means that we won't be able to mark // the old tip REPLACED. As a result, the old tip will get anchored along with the new tip, causing the // new tip to be rejected in Ceramic via conflict resolution. While not ideal, this is no worse than diff --git a/common/aws/ddb/utils.go b/common/aws/ddb/utils.go deleted file mode 100644 index 62deb55..0000000 --- a/common/aws/ddb/utils.go +++ /dev/null @@ -1,56 +0,0 @@ -package ddb - -import ( - "context" - "strconv" - "time" - - "github.com/aws/aws-sdk-go-v2/aws" - "github.com/aws/aws-sdk-go-v2/service/dynamodb" - "github.com/aws/aws-sdk-go-v2/service/dynamodb/types" - - "github.com/ceramicnetwork/go-cas/common" - "github.com/ceramicnetwork/go-cas/models" -) - -const tableCreationRetries = 3 -const tableCreationWait = 3 * time.Second - -func createTable(ctx context.Context, logger models.Logger, client *dynamodb.Client, createTableIn *dynamodb.CreateTableInput) error { - if exists, err := tableExists(ctx, logger, client, *createTableIn.TableName); !exists { - httpCtx, httpCancel := context.WithTimeout(ctx, common.DefaultRpcWaitTime) - defer httpCancel() - - if _, err = client.CreateTable(httpCtx, createTableIn); err != nil { - return err - } - for i := 0; i < tableCreationRetries; i++ { - if exists, err = tableExists(ctx, logger, client, *createTableIn.TableName); exists { - return nil - } - time.Sleep(tableCreationWait) - } - return err - } - return nil -} - -func tableExists(ctx context.Context, logger models.Logger, client *dynamodb.Client, table string) (bool, error) { - httpCtx, httpCancel := context.WithTimeout(ctx, common.DefaultRpcWaitTime) - defer httpCancel() - - if output, err := client.DescribeTable(httpCtx, &dynamodb.DescribeTableInput{TableName: aws.String(table)}); err != nil { - logger.Infof("table does not exist: %v", table) - return false, err - } else { - return output.Table.TableStatus == types.TableStatusActive, nil - } -} - -func tsDecode(ts string) (time.Time, error) { - nsec, err := strconv.ParseInt(ts, 10, 64) - if err != nil { - return time.Time{}, err - } - return time.Unix(0, nsec), nil -} diff --git a/go.mod b/go.mod index ddb7569..efc18a2 100644 --- a/go.mod +++ b/go.mod @@ -6,14 +6,15 @@ replace github.com/abevier/go-sqs v0.0.0-20230602180220-c0264a2611a3 => github.c require ( dagger.io/dagger v0.7.1 - github.com/3box/pipeline-tools/cd/manager v0.0.0-20231023205204-e46c925731ad + github.com/3box/pipeline-tools/cd/manager/common/aws/utils v0.0.0-20231026142209-de0959fd5908 + github.com/3box/pipeline-tools/cd/manager/common/job v0.0.0-20231026142209-de0959fd5908 github.com/abevier/go-sqs v0.0.0-20230602180220-c0264a2611a3 github.com/abevier/tsk v0.0.0-20230712145722-249b1e98b01c github.com/alexflint/go-arg v1.4.2 - github.com/aws/aws-sdk-go-v2 v1.18.0 + github.com/aws/aws-sdk-go-v2 v1.21.2 github.com/aws/aws-sdk-go-v2/config v1.18.4 github.com/aws/aws-sdk-go-v2/feature/dynamodb/attributevalue v1.10.7 - github.com/aws/aws-sdk-go-v2/service/dynamodb v1.17.8 + github.com/aws/aws-sdk-go-v2/service/dynamodb v1.23.0 github.com/aws/aws-sdk-go-v2/service/ecr v1.18.11 github.com/aws/aws-sdk-go-v2/service/sqs v1.22.0 github.com/disgoorg/disgo v0.16.4 @@ -37,17 +38,17 @@ require ( github.com/alexflint/go-scalar v1.0.0 // indirect github.com/aws/aws-sdk-go-v2/credentials v1.13.4 // indirect github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.12.20 // indirect - github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.33 // indirect - github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.27 // indirect + github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.43 // indirect + github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.37 // indirect github.com/aws/aws-sdk-go-v2/internal/ini v1.3.28 // indirect github.com/aws/aws-sdk-go-v2/service/dynamodbstreams v1.13.27 // indirect - github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.9.11 // indirect - github.com/aws/aws-sdk-go-v2/service/internal/endpoint-discovery v1.7.20 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.9.15 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/endpoint-discovery v1.7.37 // indirect github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.9.20 // indirect github.com/aws/aws-sdk-go-v2/service/sso v1.11.26 // indirect github.com/aws/aws-sdk-go-v2/service/ssooidc v1.13.9 // indirect github.com/aws/aws-sdk-go-v2/service/sts v1.17.6 // indirect - github.com/aws/smithy-go v1.13.5 // indirect + github.com/aws/smithy-go v1.15.0 // indirect github.com/cenkalti/backoff/v4 v4.2.1 // indirect github.com/disgoorg/json v1.0.0 // indirect github.com/disgoorg/log v1.2.0 // indirect diff --git a/go.sum b/go.sum index 1aeb0e4..693738e 100644 --- a/go.sum +++ b/go.sum @@ -33,8 +33,10 @@ cloud.google.com/go/storage v1.10.0/go.mod h1:FLPqc6j+Ki4BU591ie1oL6qBQGu2Bl/tZ9 dagger.io/dagger v0.7.1 h1:DVRMjMCsbsE/XhF5yAaDsUepgy9z8oGglaEiXvBbelw= dagger.io/dagger v0.7.1/go.mod h1:CL/Vw6H6oOqXNh8tVk2p6xaSXYgktosMXNUwIfQqKhg= dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU= -github.com/3box/pipeline-tools/cd/manager v0.0.0-20231023205204-e46c925731ad h1:uIFInU7YOvO1+/QHZ90mnw0lOv+YT1JAvSy6X076QZM= -github.com/3box/pipeline-tools/cd/manager v0.0.0-20231023205204-e46c925731ad/go.mod h1:gtA3sWQni3fE0pjUmi3xb+R2mRakG1IQxgLY6CHJ5Pk= +github.com/3box/pipeline-tools/cd/manager/common/aws/utils v0.0.0-20231026142209-de0959fd5908 h1:TMo2kFAn6p0c/S6dmnHhjKAfEH4EInjOxTTE3btOxQs= +github.com/3box/pipeline-tools/cd/manager/common/aws/utils v0.0.0-20231026142209-de0959fd5908/go.mod h1:XIstImO8QgJX/ztikxq+gvYumQc4/23+qFHdJ8H60sk= +github.com/3box/pipeline-tools/cd/manager/common/job v0.0.0-20231026142209-de0959fd5908 h1:vSJ8+L7TTDpZLUN+DSckpekBs/AAOsQ5MV0lcIHyzlE= +github.com/3box/pipeline-tools/cd/manager/common/job v0.0.0-20231026142209-de0959fd5908/go.mod h1:ADJZlCg+tCuZ23wXpw7jSGj8CZ8Ye8qi2TERpcCpfms= github.com/99designs/gqlgen v0.17.2 h1:yczvlwMsfcVu/JtejqfrLwXuSP0yZFhmcss3caEvHw8= github.com/99designs/gqlgen v0.17.2/go.mod h1:K5fzLKwtph+FFgh9j7nFbRUdBKvTcGnsta51fsMTn3o= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= @@ -60,8 +62,9 @@ github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kd github.com/arbovm/levenshtein v0.0.0-20160628152529-48b4e1c0c4d0/go.mod h1:t2tdKJDJF9BV14lnkjHmOQgcvEKgtqs5a1N3LNdJhGE= github.com/aws/aws-sdk-go-v2 v1.17.2/go.mod h1:uzbQtefpm44goOPmdKyAlXSNcwlRgF3ePWVW6EtJvvw= github.com/aws/aws-sdk-go-v2 v1.17.3/go.mod h1:uzbQtefpm44goOPmdKyAlXSNcwlRgF3ePWVW6EtJvvw= -github.com/aws/aws-sdk-go-v2 v1.18.0 h1:882kkTpSFhdgYRKVZ/VCgf7sd0ru57p2JCxz4/oN5RY= github.com/aws/aws-sdk-go-v2 v1.18.0/go.mod h1:uzbQtefpm44goOPmdKyAlXSNcwlRgF3ePWVW6EtJvvw= +github.com/aws/aws-sdk-go-v2 v1.21.2 h1:+LXZ0sgo8quN9UOKXXzAWRT3FWd4NxeXWOZom9pE7GA= +github.com/aws/aws-sdk-go-v2 v1.21.2/go.mod h1:ErQhvNuEMhJjweavOYhxVkn2RUx7kQXVATHrjKtxIpM= github.com/aws/aws-sdk-go-v2/config v1.18.4 h1:VZKhr3uAADXHStS/Gf9xSYVmmaluTUfkc0dcbPiDsKE= github.com/aws/aws-sdk-go-v2/config v1.18.4/go.mod h1:EZxMPLSdGAZ3eAmkqXfYbRppZJTzFTkv8VyEzJhKko4= github.com/aws/aws-sdk-go-v2/credentials v1.13.4 h1:nEbHIyJy7mCvQ/kzGG7VWHSBpRB4H6sJy3bWierWUtg= @@ -71,24 +74,29 @@ github.com/aws/aws-sdk-go-v2/feature/dynamodb/attributevalue v1.10.7/go.mod h1:p github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.12.20 h1:tpNOglTZ8kg9T38NpcGBxudqfUAwUzyUnLQ4XSd0CHE= github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.12.20/go.mod h1:d9xFpWd3qYwdIXM0fvu7deD08vvdRXyc/ueV+0SqaWE= github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.26/go.mod h1:2E0LdbJW6lbeU4uxjum99GZzI0ZjDpAb0CoSCM0oeEY= -github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.33 h1:kG5eQilShqmJbv11XL1VpyDbaEJzWxd4zRiCG30GSn4= github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.33/go.mod h1:7i0PF1ME/2eUPFcjkVIwq+DOygHEoK92t5cDqNgYbIw= +github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.43 h1:nFBQlGtkbPzp/NjZLuFxRqmT91rLJkgvsEQs68h962Y= +github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.43/go.mod h1:auo+PiyLl0n1l8A0e8RIeR8tOzYPfZZH/JNlrJ8igTQ= github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.20/go.mod h1:/+6lSiby8TBFpTVXZgKiN/rCfkYXEGvhlM4zCgPpt7w= -github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.27 h1:vFQlirhuM8lLlpI7imKOMsjdQLuN9CPi+k44F/OFVsk= github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.27/go.mod h1:UrHnn3QV/d0pBZ6QBAEQcqFLf8FAzLmoUfPVIueOvoM= +github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.37 h1:JRVhO25+r3ar2mKGP7E0LDl8K9/G36gjlqca5iQbaqc= +github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.37/go.mod h1:Qe+2KtKml+FEsQF/DHmDV+xjtche/hwoF75EG4UlHW8= github.com/aws/aws-sdk-go-v2/internal/ini v1.3.27/go.mod h1:RdwFVc7PBYWY33fa2+8T1mSqQ7ZEK4ILpM0wfioDC3w= github.com/aws/aws-sdk-go-v2/internal/ini v1.3.28 h1:KeTxcGdNnQudb46oOl4d90f2I33DF/c6q3RnZAmvQdQ= github.com/aws/aws-sdk-go-v2/internal/ini v1.3.28/go.mod h1:yRZVr/iT0AqyHeep00SZ4YfBAKojXz08w3XMBscdi0c= -github.com/aws/aws-sdk-go-v2/service/dynamodb v1.17.8 h1:VgdGaSIoH4JhUZIspT8UgK0aBF85TiLve7VHEx3NfqE= github.com/aws/aws-sdk-go-v2/service/dynamodb v1.17.8/go.mod h1:jvXzk+hVrlkiQOvnq6jH+F6qBK0CEceXkEWugT+4Kdc= +github.com/aws/aws-sdk-go-v2/service/dynamodb v1.23.0 h1:xmSAn14nM6IdHyuWO/bsrAagOQtnqzuUCLxdVmj9nhg= +github.com/aws/aws-sdk-go-v2/service/dynamodb v1.23.0/go.mod h1:1HkLh8vaL4obF95fne7ZOu7sxomS/+vkBt3/+gqqwE4= github.com/aws/aws-sdk-go-v2/service/dynamodbstreams v1.13.27 h1:7MhqbR+k+b0gbOxp+W8yXgsl/Z5/dtMh85K0WI8X2EA= github.com/aws/aws-sdk-go-v2/service/dynamodbstreams v1.13.27/go.mod h1:wX9QEZJ8Dw1fdAKCOAUmSvAe3wNJFxnE/4AeYc8blGA= github.com/aws/aws-sdk-go-v2/service/ecr v1.18.11 h1:wlTgmb/sCmVRJrN5De3CiHj4v/bTCgL5+qpdEd0CPtw= github.com/aws/aws-sdk-go-v2/service/ecr v1.18.11/go.mod h1:Ce1q2jlNm8BVpjLaOnwnm5v2RClAbK6txwPljFzyW6c= -github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.9.11 h1:y2+VQzC6Zh2ojtV2LoC0MNwHWc6qXv/j2vrQtlftkdA= github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.9.11/go.mod h1:iV4q2hsqtNECrfmlXyord9u4zyuFEJX9eLgLpSPzWA8= -github.com/aws/aws-sdk-go-v2/service/internal/endpoint-discovery v1.7.20 h1:kSZR22oLBDMtP8ZPGXhz649NU77xsJDG7g3xfT6nHVk= +github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.9.15 h1:7R8uRYyXzdD71KWVCL78lJZltah6VVznXBazvKjfH58= +github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.9.15/go.mod h1:26SQUPcTNgV1Tapwdt4a1rOsYRsnBsJHLMPoxK2b0d8= github.com/aws/aws-sdk-go-v2/service/internal/endpoint-discovery v1.7.20/go.mod h1:lxM5qubwGNX29Qy+xTFG8G0r2Mj/TmyC+h3hS/7E4V8= +github.com/aws/aws-sdk-go-v2/service/internal/endpoint-discovery v1.7.37 h1:4LoizcvPT9A0tiAFhepxn0bGZXkzvN0pG0epydY3Pno= +github.com/aws/aws-sdk-go-v2/service/internal/endpoint-discovery v1.7.37/go.mod h1:7xBUZyP6LeLc+5Ym9PG7atqw4sR28sBtYcHETik+bPE= github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.9.20 h1:jlgyHbkZQAgAc7VIxJDmtouH8eNjOk2REVAQfVhdaiQ= github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.9.20/go.mod h1:Xs52xaLBqDEKRcAfX/hgjmD3YQ7c/W+BEyfamlO/W2E= github.com/aws/aws-sdk-go-v2/service/sqs v1.22.0 h1:ikSvot5NdywduxtkOwOa2GJFzFuJq1ZjXsGjoIA82Ao= @@ -99,8 +107,9 @@ github.com/aws/aws-sdk-go-v2/service/ssooidc v1.13.9 h1:wihKuqYUlA2T/Rx+yu2s6NDA github.com/aws/aws-sdk-go-v2/service/ssooidc v1.13.9/go.mod h1:2E/3D/mB8/r2J7nK42daoKP/ooCwbf0q1PznNc+DZTU= github.com/aws/aws-sdk-go-v2/service/sts v1.17.6 h1:VQFOLQVL3BrKM/NLO/7FiS4vcp5bqK0mGMyk09xLoAY= github.com/aws/aws-sdk-go-v2/service/sts v1.17.6/go.mod h1:Az3OXXYGyfNwQNsK/31L4R75qFYnO641RZGAoV3uH1c= -github.com/aws/smithy-go v1.13.5 h1:hgz0X/DX0dGqTYpGALqXJoRKRj5oQ7150i5FdTePzO8= github.com/aws/smithy-go v1.13.5/go.mod h1:Tg+OJXh4MB2R/uN61Ko2f6hTZwB/ZYGOtib8J3gBHzA= +github.com/aws/smithy-go v1.15.0 h1:PS/durmlzvAFpQHDs4wi4sNNP9ExsqZh6IlfdHXgKK8= +github.com/aws/smithy-go v1.15.0/go.mod h1:Tg+OJXh4MB2R/uN61Ko2f6hTZwB/ZYGOtib8J3gBHzA= github.com/benbjohnson/clock v1.1.0 h1:Q92kusRqC1XV2MjkWETPvjJVqKetz1OzxZB7mHJLju8= github.com/bradleyjkemp/cupaloy/v2 v2.6.0/go.mod h1:bm7JXdkRd4BHJk9HpwqAI8BoAY1lps46Enkdqw6aRX0= github.com/cenkalti/backoff/v4 v4.2.1 h1:y4OZtCnogmCPw98Zjyt5a6+QwPLGkiQsYW5oUqylYbM= diff --git a/models/job.go b/models/job.go index b92d932..92200a8 100644 --- a/models/job.go +++ b/models/job.go @@ -5,7 +5,7 @@ import ( "github.com/google/uuid" - "github.com/3box/pipeline-tools/cd/manager" + "github.com/3box/pipeline-tools/cd/manager/common/job" ) const DeployComponent = "casv5" @@ -23,10 +23,10 @@ const ( AnchorAppMode_ContinualAnchoring = "continual-anchoring" ) -func NewJob(jobType manager.JobType, params map[string]interface{}) manager.JobState { - return manager.JobState{ +func NewJob(jobType job.JobType, params map[string]interface{}) job.JobState { + return job.JobState{ Job: uuid.New().String(), - Stage: manager.JobStage_Queued, + Stage: job.JobStage_Queued, Type: jobType, Ts: time.Now(), Params: params, diff --git a/models/services.go b/models/services.go index 4f17b3a..6a042a9 100644 --- a/models/services.go +++ b/models/services.go @@ -6,7 +6,7 @@ import ( "github.com/google/uuid" - "github.com/3box/pipeline-tools/cd/manager" + "github.com/3box/pipeline-tools/cd/manager/common/job" ) type AnchorRepository interface { @@ -23,7 +23,7 @@ type StateRepository interface { type JobRepository interface { CreateJob(ctx context.Context) (string, error) - QueryJob(ctx context.Context, id string) (*manager.JobState, error) + QueryJob(ctx context.Context, id string) (*job.JobState, error) } type QueuePublisher interface { diff --git a/services/test_helpers.go b/services/test_helpers.go index d4a73bc..0428c42 100644 --- a/services/test_helpers.go +++ b/services/test_helpers.go @@ -11,7 +11,7 @@ import ( "github.com/google/uuid" - "github.com/3box/pipeline-tools/cd/manager" + "github.com/3box/pipeline-tools/cd/manager/common/job" "github.com/ceramicnetwork/go-cas/models" ) @@ -100,24 +100,24 @@ func (m *MockStateRepository) UpdateTip(_ context.Context, newTip *models.Stream } type MockJobRepository struct { - jobStore map[string]*manager.JobState + jobStore map[string]*job.JobState failCount int } func (m *MockJobRepository) CreateJob(_ context.Context) (string, error) { if m.jobStore == nil { - m.jobStore = make(map[string]*manager.JobState, 1) + m.jobStore = make(map[string]*job.JobState, 1) } if m.failCount > 0 { m.failCount-- return "", fmt.Errorf("failed to create job") } - newJob := models.NewJob(manager.JobType_Anchor, nil) + newJob := models.NewJob(job.JobType_Anchor, nil) m.jobStore[newJob.Job] = &newJob return newJob.Job, nil } -func (m *MockJobRepository) QueryJob(_ context.Context, id string) (*manager.JobState, error) { +func (m *MockJobRepository) QueryJob(_ context.Context, id string) (*job.JobState, error) { if jobState, found := m.jobStore[id]; found { return jobState, nil } @@ -127,7 +127,7 @@ func (m *MockJobRepository) QueryJob(_ context.Context, id string) (*manager.Job func (m *MockJobRepository) finishJobs(count int) { for _, js := range m.jobStore { if count > 0 { - js.Stage = manager.JobStage_Completed + js.Stage = job.JobStage_Completed count-- } } diff --git a/services/worker.go b/services/worker.go index 9d0ce36..0da77bb 100644 --- a/services/worker.go +++ b/services/worker.go @@ -7,7 +7,7 @@ import ( "strconv" "time" - "github.com/3box/pipeline-tools/cd/manager" + "github.com/3box/pipeline-tools/cd/manager/common/job" "github.com/ceramicnetwork/go-cas/models" ) @@ -23,7 +23,7 @@ type WorkerService struct { monitorTick time.Duration maxAnchorWorkers int amortizationFactor float64 - anchorJobs map[string]*manager.JobState + anchorJobs map[string]*job.JobState logger models.Logger } @@ -53,7 +53,7 @@ func NewWorkerService(logger models.Logger, batchMonitor models.QueueMonitor, jo batchMonitorTick, maxAnchorWorkers, amortizationFactor, - make(map[string]*manager.JobState), + make(map[string]*job.JobState), logger, } } @@ -116,7 +116,7 @@ func (w WorkerService) calculateLoad(ctx context.Context) (int, int, error) { for jobId, _ := range w.anchorJobs { if jobState, err := w.jobDb.QueryJob(ctx, jobId); err != nil { return 0, 0, err - } else if (jobState.Stage == manager.JobStage_Completed) || (jobState.Stage == manager.JobStage_Failed) { + } else if (jobState.Stage == job.JobStage_Completed) || (jobState.Stage == job.JobStage_Failed) { // Clean out finished jobs - "completed" and "failed" are the only possible terminal stages for anchor jobs. delete(w.anchorJobs, jobId) } else {