Skip to content

Commit

Permalink
fix: reuse cd manager utils/job modules
Browse files Browse the repository at this point in the history
  • Loading branch information
smrz2001 committed Oct 26, 2023
1 parent 8b44edb commit 49fe964
Show file tree
Hide file tree
Showing 10 changed files with 67 additions and 196 deletions.
4 changes: 2 additions & 2 deletions ci/cmd/deploy/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
"github.com/aws/aws-sdk-go-v2/service/dynamodb"
"github.com/aws/aws-sdk-go-v2/service/dynamodb/types"

"github.com/3box/pipeline-tools/cd/manager"
"github.com/3box/pipeline-tools/cd/manager/common/job"

"github.com/ceramicnetwork/go-cas"
"github.com/ceramicnetwork/go-cas/common/aws/config"
Expand All @@ -30,7 +30,7 @@ func main() {
}

func createJob(ctx context.Context) (string, error) {
newJob := models.NewJob(manager.JobType_Deploy, map[string]interface{}{
newJob := models.NewJob(job.JobType_Deploy, map[string]interface{}{
"component": models.DeployComponent,
"sha": "latest",
"shaTag": os.Getenv(cas.Env_ShaTag),
Expand Down
113 changes: 14 additions & 99 deletions common/aws/ddb/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,14 @@ import (
"github.com/aws/aws-sdk-go-v2/service/dynamodb"
"github.com/aws/aws-sdk-go-v2/service/dynamodb/types"

"github.com/3box/pipeline-tools/cd/manager"
"github.com/3box/pipeline-tools/cd/manager/common/aws/utils"
"github.com/3box/pipeline-tools/cd/manager/common/job"

"github.com/ceramicnetwork/go-cas"
"github.com/ceramicnetwork/go-cas/common"
"github.com/ceramicnetwork/go-cas/models"
)

const stageTsIndex = "stage-ts-index"
const typeTsIndex = "type-ts-index"
const jobTsIndex = "job-ts-index"

type JobDatabase struct {
ddbClient *dynamodb.Client
table string
Expand All @@ -39,104 +36,22 @@ func NewJobDb(ctx context.Context, logger models.Logger, ddbClient *dynamodb.Cli
}

func (jdb *JobDatabase) createJobTable(ctx context.Context) error {
createJobTableInput := dynamodb.CreateTableInput{
BillingMode: types.BillingModePayPerRequest,
AttributeDefinitions: []types.AttributeDefinition{
{
AttributeName: aws.String("id"),
AttributeType: "S",
},
{
AttributeName: aws.String("job"),
AttributeType: "S",
},
{
AttributeName: aws.String("stage"),
AttributeType: "S",
},
{
AttributeName: aws.String("type"),
AttributeType: "S",
},
{
AttributeName: aws.String("ts"),
AttributeType: "N",
},
},
KeySchema: []types.KeySchemaElement{
{
AttributeName: aws.String("id"),
KeyType: "HASH",
},
},
TableName: aws.String(jdb.table),
GlobalSecondaryIndexes: []types.GlobalSecondaryIndex{
{
IndexName: aws.String(stageTsIndex),
KeySchema: []types.KeySchemaElement{
{
AttributeName: aws.String("stage"),
KeyType: "HASH",
},
{
AttributeName: aws.String("ts"),
KeyType: "RANGE",
},
},
Projection: &types.Projection{
ProjectionType: types.ProjectionTypeAll,
},
},
{
IndexName: aws.String(typeTsIndex),
KeySchema: []types.KeySchemaElement{
{
AttributeName: aws.String("type"),
KeyType: "HASH",
},
{
AttributeName: aws.String("ts"),
KeyType: "RANGE",
},
},
Projection: &types.Projection{
ProjectionType: types.ProjectionTypeAll,
},
},
{
IndexName: aws.String(jobTsIndex),
KeySchema: []types.KeySchemaElement{
{
AttributeName: aws.String("job"),
KeyType: "HASH",
},
{
AttributeName: aws.String("ts"),
KeyType: "RANGE",
},
},
Projection: &types.Projection{
ProjectionType: types.ProjectionTypeAll,
},
},
},
}
return createTable(ctx, jdb.logger, jdb.ddbClient, &createJobTableInput)
return job.CreateJobTable(ctx, jdb.ddbClient, jdb.table)
}

func (jdb *JobDatabase) CreateJob(ctx context.Context) (string, error) {
jobParams := map[string]interface{}{
manager.JobParam_Version: models.WorkerVersion, // this will launch a CASv5 Worker
manager.JobParam_Overrides: map[string]string{
job.JobParam_Version: models.WorkerVersion, // this will launch a CASv5 Worker
job.JobParam_Overrides: map[string]string{
models.AnchorOverrides_AppMode: models.AnchorAppMode_ContinualAnchoring,
models.AnchorOverrides_SchedulerStopAfterNoOp: "true",
},
}
// If an override anchor contract address is available, pass it through to the job.
if contractAddress, found := os.LookupEnv(models.Env_AnchorContractAddress); found {
jobParams[manager.JobParam_Overrides].(map[string]string)[models.AnchorOverrides_ContractAddress] = contractAddress
jobParams[job.JobParam_Overrides].(map[string]string)[models.AnchorOverrides_ContractAddress] = contractAddress
}
newJob := models.NewJob(manager.JobType_Anchor, jobParams)
newJob := models.NewJob(job.JobType_Anchor, jobParams)
attributeValues, err := attributevalue.MarshalMapWithOptions(newJob, func(options *attributevalue.EncoderOptions) {
options.EncodeTime = func(time time.Time) (types.AttributeValue, error) {
return &types.AttributeValueMemberN{Value: strconv.FormatInt(time.UnixNano(), 10)}, nil
Expand All @@ -160,10 +75,10 @@ func (jdb *JobDatabase) CreateJob(ctx context.Context) (string, error) {
}
}

func (jdb *JobDatabase) QueryJob(ctx context.Context, jobId string) (*manager.JobState, error) {
func (jdb *JobDatabase) QueryJob(ctx context.Context, jobId string) (*job.JobState, error) {
queryInput := dynamodb.QueryInput{
TableName: aws.String(jdb.table),
IndexName: aws.String(jobTsIndex),
IndexName: aws.String(job.JobTsIndex),
KeyConditionExpression: aws.String("#job = :job"),
ExpressionAttributeNames: map[string]string{"#job": "job"},
ExpressionAttributeValues: map[string]types.AttributeValue{":job": &types.AttributeValueMemberS{Value: jobId}},
Expand All @@ -176,16 +91,16 @@ func (jdb *JobDatabase) QueryJob(ctx context.Context, jobId string) (*manager.Jo
if queryOutput, err := jdb.ddbClient.Query(httpCtx, &queryInput); err != nil {
return nil, err
} else if queryOutput.Count > 0 {
job := new(manager.JobState)
if err = attributevalue.UnmarshalMapWithOptions(queryOutput.Items[0], job, func(options *attributevalue.DecoderOptions) {
j := new(job.JobState)
if err = attributevalue.UnmarshalMapWithOptions(queryOutput.Items[0], j, func(options *attributevalue.DecoderOptions) {
options.DecodeTime = attributevalue.DecodeTimeAttributes{
S: tsDecode,
N: tsDecode,
S: utils.TsDecode,
N: utils.TsDecode,
}
}); err != nil {
return nil, err
} else {
return job, nil
return j, nil
}
} else {
// A job specifically requested must be present
Expand Down
14 changes: 8 additions & 6 deletions common/aws/ddb/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ import (
"github.com/aws/aws-sdk-go-v2/service/dynamodb"
"github.com/aws/aws-sdk-go-v2/service/dynamodb/types"

"github.com/3box/pipeline-tools/cd/manager/common/aws/utils"

"github.com/ceramicnetwork/go-cas"
"github.com/ceramicnetwork/go-cas/common"
"github.com/ceramicnetwork/go-cas/models"
Expand Down Expand Up @@ -67,7 +69,7 @@ func (sdb *StateDatabase) createCheckpointTable(ctx context.Context) error {
TableName: aws.String(sdb.checkpointTable),
BillingMode: types.BillingModePayPerRequest,
}
return createTable(ctx, sdb.logger, sdb.client, &createStreamTableInput)
return utils.CreateTable(ctx, sdb.client, &createStreamTableInput)
}

func (sdb *StateDatabase) createStreamTable(ctx context.Context) error {
Expand Down Expand Up @@ -95,7 +97,7 @@ func (sdb *StateDatabase) createStreamTable(ctx context.Context) error {
TableName: aws.String(sdb.streamTable),
BillingMode: types.BillingModePayPerRequest,
}
return createTable(ctx, sdb.logger, sdb.client, &createTableInput)
return utils.CreateTable(ctx, sdb.client, &createTableInput)
}

func (sdb *StateDatabase) createTipTable(ctx context.Context) error {
Expand Down Expand Up @@ -123,7 +125,7 @@ func (sdb *StateDatabase) createTipTable(ctx context.Context) error {
TableName: aws.String(sdb.tipTable),
BillingMode: types.BillingModePayPerRequest,
}
return createTable(ctx, sdb.logger, sdb.client, &createTableInput)
return utils.CreateTable(ctx, sdb.client, &createTableInput)
}

func (sdb *StateDatabase) GetCheckpoint(ctx context.Context, ckptType models.CheckpointType) (time.Time, error) {
Expand Down Expand Up @@ -254,11 +256,11 @@ func (sdb *StateDatabase) UpdateTip(ctx context.Context, newTip *models.StreamTi
oldTip,
func(options *attributevalue.DecoderOptions) {
options.DecodeTime = attributevalue.DecodeTimeAttributes{
S: tsDecode,
N: tsDecode,
S: utils.TsDecode,
N: utils.TsDecode,
}
}); err != nil {
sdb.logger.Errorf("error unmarshaling old tip: %v", err)
sdb.logger.Errorf("error unmarshalling old tip: %v", err)
// We've written the new tip and lost the previous tip here. This means that we won't be able to mark
// the old tip REPLACED. As a result, the old tip will get anchored along with the new tip, causing the
// new tip to be rejected in Ceramic via conflict resolution. While not ideal, this is no worse than
Expand Down
56 changes: 0 additions & 56 deletions common/aws/ddb/utils.go

This file was deleted.

17 changes: 9 additions & 8 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,15 @@ replace github.com/abevier/go-sqs v0.0.0-20230602180220-c0264a2611a3 => github.c

require (
dagger.io/dagger v0.7.1
github.com/3box/pipeline-tools/cd/manager v0.0.0-20231023205204-e46c925731ad
github.com/3box/pipeline-tools/cd/manager/common/aws/utils v0.0.0-20231026142209-de0959fd5908
github.com/3box/pipeline-tools/cd/manager/common/job v0.0.0-20231026142209-de0959fd5908
github.com/abevier/go-sqs v0.0.0-20230602180220-c0264a2611a3
github.com/abevier/tsk v0.0.0-20230712145722-249b1e98b01c
github.com/alexflint/go-arg v1.4.2
github.com/aws/aws-sdk-go-v2 v1.18.0
github.com/aws/aws-sdk-go-v2 v1.21.2
github.com/aws/aws-sdk-go-v2/config v1.18.4
github.com/aws/aws-sdk-go-v2/feature/dynamodb/attributevalue v1.10.7
github.com/aws/aws-sdk-go-v2/service/dynamodb v1.17.8
github.com/aws/aws-sdk-go-v2/service/dynamodb v1.23.0
github.com/aws/aws-sdk-go-v2/service/ecr v1.18.11
github.com/aws/aws-sdk-go-v2/service/sqs v1.22.0
github.com/disgoorg/disgo v0.16.4
Expand All @@ -37,17 +38,17 @@ require (
github.com/alexflint/go-scalar v1.0.0 // indirect
github.com/aws/aws-sdk-go-v2/credentials v1.13.4 // indirect
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.12.20 // indirect
github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.33 // indirect
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.27 // indirect
github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.43 // indirect
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.37 // indirect
github.com/aws/aws-sdk-go-v2/internal/ini v1.3.28 // indirect
github.com/aws/aws-sdk-go-v2/service/dynamodbstreams v1.13.27 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.9.11 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/endpoint-discovery v1.7.20 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.9.15 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/endpoint-discovery v1.7.37 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.9.20 // indirect
github.com/aws/aws-sdk-go-v2/service/sso v1.11.26 // indirect
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.13.9 // indirect
github.com/aws/aws-sdk-go-v2/service/sts v1.17.6 // indirect
github.com/aws/smithy-go v1.13.5 // indirect
github.com/aws/smithy-go v1.15.0 // indirect
github.com/cenkalti/backoff/v4 v4.2.1 // indirect
github.com/disgoorg/json v1.0.0 // indirect
github.com/disgoorg/log v1.2.0 // indirect
Expand Down
Loading

0 comments on commit 49fe964

Please sign in to comment.