Skip to content

Commit

Permalink
fix: reuse cd manager job schema
Browse files Browse the repository at this point in the history
  • Loading branch information
smrz2001 committed Oct 25, 2023
1 parent de57db4 commit 8b44edb
Show file tree
Hide file tree
Showing 9 changed files with 82 additions and 73 deletions.
4 changes: 3 additions & 1 deletion ci/cmd/deploy/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ import (
"github.com/aws/aws-sdk-go-v2/service/dynamodb"
"github.com/aws/aws-sdk-go-v2/service/dynamodb/types"

"github.com/3box/pipeline-tools/cd/manager"

"github.com/ceramicnetwork/go-cas"
"github.com/ceramicnetwork/go-cas/common/aws/config"
"github.com/ceramicnetwork/go-cas/models"
Expand All @@ -28,7 +30,7 @@ func main() {
}

func createJob(ctx context.Context) (string, error) {
newJob := models.NewJob(models.JobType_Deploy, map[string]interface{}{
newJob := models.NewJob(manager.JobType_Deploy, map[string]interface{}{
"component": models.DeployComponent,
"sha": "latest",
"shaTag": os.Getenv(cas.Env_ShaTag),
Expand Down
64 changes: 50 additions & 14 deletions common/aws/ddb/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,20 @@ import (
"github.com/aws/aws-sdk-go-v2/service/dynamodb"
"github.com/aws/aws-sdk-go-v2/service/dynamodb/types"

"github.com/3box/pipeline-tools/cd/manager"

"github.com/ceramicnetwork/go-cas"
"github.com/ceramicnetwork/go-cas/common"
"github.com/ceramicnetwork/go-cas/models"
)

const stageTsIndex = "stage-ts-index"
const typeTsIndex = "type-ts-index"
const jobTsIndex = "job-ts-index"

type JobDatabase struct {
ddbClient *dynamodb.Client
jobTable string
table string
logger models.Logger
}

Expand Down Expand Up @@ -65,8 +69,40 @@ func (jdb *JobDatabase) createJobTable(ctx context.Context) error {
KeyType: "HASH",
},
},
TableName: aws.String(jdb.jobTable),
TableName: aws.String(jdb.table),
GlobalSecondaryIndexes: []types.GlobalSecondaryIndex{
{
IndexName: aws.String(stageTsIndex),
KeySchema: []types.KeySchemaElement{
{
AttributeName: aws.String("stage"),
KeyType: "HASH",
},
{
AttributeName: aws.String("ts"),
KeyType: "RANGE",
},
},
Projection: &types.Projection{
ProjectionType: types.ProjectionTypeAll,
},
},
{
IndexName: aws.String(typeTsIndex),
KeySchema: []types.KeySchemaElement{
{
AttributeName: aws.String("type"),
KeyType: "HASH",
},
{
AttributeName: aws.String("ts"),
KeyType: "RANGE",
},
},
Projection: &types.Projection{
ProjectionType: types.ProjectionTypeAll,
},
},
{
IndexName: aws.String(jobTsIndex),
KeySchema: []types.KeySchemaElement{
Expand All @@ -90,17 +126,17 @@ func (jdb *JobDatabase) createJobTable(ctx context.Context) error {

func (jdb *JobDatabase) CreateJob(ctx context.Context) (string, error) {
jobParams := map[string]interface{}{
models.JobParam_Version: models.WorkerVersion, // this will launch a CASv5 Worker
models.JobParam_Overrides: map[string]string{
manager.JobParam_Version: models.WorkerVersion, // this will launch a CASv5 Worker
manager.JobParam_Overrides: map[string]string{
models.AnchorOverrides_AppMode: models.AnchorAppMode_ContinualAnchoring,
models.AnchorOverrides_SchedulerStopAfterNoOp: "true",
},
}
// If an override anchor contract address is available, pass it through to the job.
if contractAddress, found := os.LookupEnv(models.Env_AnchorContractAddress); found {
jobParams[models.JobParam_Overrides].(map[string]string)[models.AnchorOverrides_ContractAddress] = contractAddress
jobParams[manager.JobParam_Overrides].(map[string]string)[models.AnchorOverrides_ContractAddress] = contractAddress
}
newJob := models.NewJob(models.JobType_Anchor, jobParams)
newJob := models.NewJob(manager.JobType_Anchor, jobParams)
attributeValues, err := attributevalue.MarshalMapWithOptions(newJob, func(options *attributevalue.EncoderOptions) {
options.EncodeTime = func(time time.Time) (types.AttributeValue, error) {
return &types.AttributeValueMemberN{Value: strconv.FormatInt(time.UnixNano(), 10)}, nil
Expand All @@ -113,7 +149,7 @@ func (jdb *JobDatabase) CreateJob(ctx context.Context) (string, error) {
defer httpCancel()

_, err = jdb.ddbClient.PutItem(httpCtx, &dynamodb.PutItemInput{
TableName: aws.String(jdb.jobTable),
TableName: aws.String(jdb.table),
Item: attributeValues,
})
if err != nil {
Expand All @@ -124,13 +160,13 @@ func (jdb *JobDatabase) CreateJob(ctx context.Context) (string, error) {
}
}

func (jdb *JobDatabase) QueryJob(ctx context.Context, id string) (*models.JobState, error) {
func (jdb *JobDatabase) QueryJob(ctx context.Context, jobId string) (*manager.JobState, error) {
queryInput := dynamodb.QueryInput{
TableName: aws.String(jdb.jobTable),
TableName: aws.String(jdb.table),
IndexName: aws.String(jobTsIndex),
KeyConditionExpression: aws.String("#id = :id"),
ExpressionAttributeNames: map[string]string{"#id": "id"},
ExpressionAttributeValues: map[string]types.AttributeValue{":id": &types.AttributeValueMemberS{Value: id}},
KeyConditionExpression: aws.String("#job = :job"),
ExpressionAttributeNames: map[string]string{"#job": "job"},
ExpressionAttributeValues: map[string]types.AttributeValue{":job": &types.AttributeValueMemberS{Value: jobId}},
ScanIndexForward: aws.Bool(false), // descending order so we get the latest job state
}

Expand All @@ -140,7 +176,7 @@ func (jdb *JobDatabase) QueryJob(ctx context.Context, id string) (*models.JobSta
if queryOutput, err := jdb.ddbClient.Query(httpCtx, &queryInput); err != nil {
return nil, err
} else if queryOutput.Count > 0 {
job := new(models.JobState)
job := new(manager.JobState)
if err = attributevalue.UnmarshalMapWithOptions(queryOutput.Items[0], job, func(options *attributevalue.DecoderOptions) {
options.DecodeTime = attributevalue.DecodeTimeAttributes{
S: tsDecode,
Expand All @@ -153,6 +189,6 @@ func (jdb *JobDatabase) QueryJob(ctx context.Context, id string) (*models.JobSta
}
} else {
// A job specifically requested must be present
return nil, fmt.Errorf("job %s not found", id)
return nil, fmt.Errorf("job %s not found", jobId)
}
}
21 changes: 6 additions & 15 deletions common/aws/ddb/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,11 +64,8 @@ func (sdb *StateDatabase) createCheckpointTable(ctx context.Context) error {
KeyType: "HASH",
},
},
TableName: aws.String(sdb.checkpointTable),
ProvisionedThroughput: &types.ProvisionedThroughput{
ReadCapacityUnits: aws.Int64(1),
WriteCapacityUnits: aws.Int64(1),
},
TableName: aws.String(sdb.checkpointTable),
BillingMode: types.BillingModePayPerRequest,
}
return createTable(ctx, sdb.logger, sdb.client, &createStreamTableInput)
}
Expand All @@ -95,11 +92,8 @@ func (sdb *StateDatabase) createStreamTable(ctx context.Context) error {
KeyType: "RANGE",
},
},
TableName: aws.String(sdb.streamTable),
ProvisionedThroughput: &types.ProvisionedThroughput{
ReadCapacityUnits: aws.Int64(1),
WriteCapacityUnits: aws.Int64(1),
},
TableName: aws.String(sdb.streamTable),
BillingMode: types.BillingModePayPerRequest,
}
return createTable(ctx, sdb.logger, sdb.client, &createTableInput)
}
Expand All @@ -126,11 +120,8 @@ func (sdb *StateDatabase) createTipTable(ctx context.Context) error {
KeyType: "RANGE",
},
},
TableName: aws.String(sdb.tipTable),
ProvisionedThroughput: &types.ProvisionedThroughput{
ReadCapacityUnits: aws.Int64(1),
WriteCapacityUnits: aws.Int64(1),
},
TableName: aws.String(sdb.tipTable),
BillingMode: types.BillingModePayPerRequest,
}
return createTable(ctx, sdb.logger, sdb.client, &createTableInput)
}
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ replace github.com/abevier/go-sqs v0.0.0-20230602180220-c0264a2611a3 => github.c

require (
dagger.io/dagger v0.7.1
github.com/3box/pipeline-tools/cd/manager v0.0.0-20231023205204-e46c925731ad
github.com/abevier/go-sqs v0.0.0-20230602180220-c0264a2611a3
github.com/abevier/tsk v0.0.0-20230712145722-249b1e98b01c
github.com/alexflint/go-arg v1.4.2
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ cloud.google.com/go/storage v1.10.0/go.mod h1:FLPqc6j+Ki4BU591ie1oL6qBQGu2Bl/tZ9
dagger.io/dagger v0.7.1 h1:DVRMjMCsbsE/XhF5yAaDsUepgy9z8oGglaEiXvBbelw=
dagger.io/dagger v0.7.1/go.mod h1:CL/Vw6H6oOqXNh8tVk2p6xaSXYgktosMXNUwIfQqKhg=
dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU=
github.com/3box/pipeline-tools/cd/manager v0.0.0-20231023205204-e46c925731ad h1:uIFInU7YOvO1+/QHZ90mnw0lOv+YT1JAvSy6X076QZM=
github.com/3box/pipeline-tools/cd/manager v0.0.0-20231023205204-e46c925731ad/go.mod h1:gtA3sWQni3fE0pjUmi3xb+R2mRakG1IQxgLY6CHJ5Pk=
github.com/99designs/gqlgen v0.17.2 h1:yczvlwMsfcVu/JtejqfrLwXuSP0yZFhmcss3caEvHw8=
github.com/99designs/gqlgen v0.17.2/go.mod h1:K5fzLKwtph+FFgh9j7nFbRUdBKvTcGnsta51fsMTn3o=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
Expand Down
39 changes: 5 additions & 34 deletions models/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,33 +4,14 @@ import (
"time"

"github.com/google/uuid"

"github.com/3box/pipeline-tools/cd/manager"
)

const DeployComponent = "casv5"
const WorkerVersion = "5"
const DefaultJobTtl = 2 * 7 * 24 * time.Hour

type JobType string

const (
JobType_Deploy JobType = "deploy"
JobType_Anchor JobType = "anchor"
)

type JobStage string

const (
JobStage_Queued JobStage = "queued"
JobStage_Waiting JobStage = "waiting"
JobStage_Failed JobStage = "failed"
JobStage_Completed JobStage = "completed"
)

const (
JobParam_Overrides = "overrides"
JobParam_Version = "version"
)

const (
AnchorOverrides_AppMode = "APP_MODE"
AnchorOverrides_ContractAddress = "ETH_CONTRACT_ADDRESS"
Expand All @@ -42,20 +23,10 @@ const (
AnchorAppMode_ContinualAnchoring = "continual-anchoring"
)

type JobState struct {
Job string `dynamodbav:"job"`
Stage JobStage `dynamodbav:"stage"`
Type JobType `dynamodbav:"type"`
Ts time.Time `dynamodbav:"ts"`
Params map[string]interface{} `dynamodbav:"params"`
Id string `dynamodbav:"id" json:"-"`
Ttl time.Time `dynamodbav:"ttl,unixtime" json:"-"`
}

func NewJob(jobType JobType, params map[string]interface{}) JobState {
return JobState{
func NewJob(jobType manager.JobType, params map[string]interface{}) manager.JobState {
return manager.JobState{
Job: uuid.New().String(),
Stage: JobStage_Queued,
Stage: manager.JobStage_Queued,
Type: jobType,
Ts: time.Now(),
Params: params,
Expand Down
4 changes: 3 additions & 1 deletion models/services.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"time"

"github.com/google/uuid"

"github.com/3box/pipeline-tools/cd/manager"
)

type AnchorRepository interface {
Expand All @@ -21,7 +23,7 @@ type StateRepository interface {

type JobRepository interface {
CreateJob(ctx context.Context) (string, error)
QueryJob(ctx context.Context, id string) (*JobState, error)
QueryJob(ctx context.Context, id string) (*manager.JobState, error)
}

type QueuePublisher interface {
Expand Down
12 changes: 7 additions & 5 deletions services/test_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import (

"github.com/google/uuid"

"github.com/3box/pipeline-tools/cd/manager"

"github.com/ceramicnetwork/go-cas/models"
)

Expand Down Expand Up @@ -98,24 +100,24 @@ func (m *MockStateRepository) UpdateTip(_ context.Context, newTip *models.Stream
}

type MockJobRepository struct {
jobStore map[string]*models.JobState
jobStore map[string]*manager.JobState
failCount int
}

func (m *MockJobRepository) CreateJob(_ context.Context) (string, error) {
if m.jobStore == nil {
m.jobStore = make(map[string]*models.JobState, 1)
m.jobStore = make(map[string]*manager.JobState, 1)
}
if m.failCount > 0 {
m.failCount--
return "", fmt.Errorf("failed to create job")
}
newJob := models.NewJob(models.JobType_Anchor, nil)
newJob := models.NewJob(manager.JobType_Anchor, nil)
m.jobStore[newJob.Job] = &newJob
return newJob.Job, nil
}

func (m *MockJobRepository) QueryJob(_ context.Context, id string) (*models.JobState, error) {
func (m *MockJobRepository) QueryJob(_ context.Context, id string) (*manager.JobState, error) {
if jobState, found := m.jobStore[id]; found {
return jobState, nil
}
Expand All @@ -125,7 +127,7 @@ func (m *MockJobRepository) QueryJob(_ context.Context, id string) (*models.JobS
func (m *MockJobRepository) finishJobs(count int) {
for _, js := range m.jobStore {
if count > 0 {
js.Stage = models.JobStage_Completed
js.Stage = manager.JobStage_Completed
count--
}
}
Expand Down
8 changes: 5 additions & 3 deletions services/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
"strconv"
"time"

"github.com/3box/pipeline-tools/cd/manager"

"github.com/ceramicnetwork/go-cas/models"
)

Expand All @@ -21,7 +23,7 @@ type WorkerService struct {
monitorTick time.Duration
maxAnchorWorkers int
amortizationFactor float64
anchorJobs map[string]*models.JobState
anchorJobs map[string]*manager.JobState
logger models.Logger
}

Expand Down Expand Up @@ -51,7 +53,7 @@ func NewWorkerService(logger models.Logger, batchMonitor models.QueueMonitor, jo
batchMonitorTick,
maxAnchorWorkers,
amortizationFactor,
make(map[string]*models.JobState),
make(map[string]*manager.JobState),
logger,
}
}
Expand Down Expand Up @@ -114,7 +116,7 @@ func (w WorkerService) calculateLoad(ctx context.Context) (int, int, error) {
for jobId, _ := range w.anchorJobs {
if jobState, err := w.jobDb.QueryJob(ctx, jobId); err != nil {
return 0, 0, err
} else if (jobState.Stage == models.JobStage_Completed) || (jobState.Stage == models.JobStage_Failed) {
} else if (jobState.Stage == manager.JobStage_Completed) || (jobState.Stage == manager.JobStage_Failed) {
// Clean out finished jobs - "completed" and "failed" are the only possible terminal stages for anchor jobs.
delete(w.anchorJobs, jobId)
} else {
Expand Down

0 comments on commit 8b44edb

Please sign in to comment.