Skip to content

Commit

Permalink
fix: use new job schema
Browse files Browse the repository at this point in the history
  • Loading branch information
smrz2001 committed Oct 23, 2023
1 parent 06b45ed commit 42f52ff
Show file tree
Hide file tree
Showing 4 changed files with 31 additions and 29 deletions.
2 changes: 1 addition & 1 deletion ci/cmd/deploy/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,6 @@ func createJob(ctx context.Context) (string, error) {
return "", err
}
}
return newJob.Id, nil
return newJob.Job, nil
}
}
41 changes: 19 additions & 22 deletions common/aws/ddb/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import (
"github.com/ceramicnetwork/go-cas/models"
)

const idTsIndex = "id-ts-index"
const jobTsIndex = "job-ts-index"

type JobDatabase struct {
ddbClient *dynamodb.Client
Expand All @@ -36,41 +36,42 @@ func NewJobDb(ctx context.Context, logger models.Logger, ddbClient *dynamodb.Cli

func (jdb *JobDatabase) createJobTable(ctx context.Context) error {
createJobTableInput := dynamodb.CreateTableInput{
BillingMode: types.BillingModePayPerRequest,
AttributeDefinitions: []types.AttributeDefinition{
{
AttributeName: aws.String("stage"),
AttributeName: aws.String("id"),
AttributeType: "S",
},
{
AttributeName: aws.String("ts"),
AttributeType: "N",
AttributeName: aws.String("job"),
AttributeType: "S",
},
{
AttributeName: aws.String("id"),
AttributeName: aws.String("stage"),
AttributeType: "S",
},
},
KeySchema: []types.KeySchemaElement{
{
AttributeName: aws.String("stage"),
KeyType: "HASH",
AttributeName: aws.String("type"),
AttributeType: "S",
},
{
AttributeName: aws.String("ts"),
KeyType: "RANGE",
AttributeType: "N",
},
},
TableName: aws.String(jdb.jobTable),
ProvisionedThroughput: &types.ProvisionedThroughput{
ReadCapacityUnits: aws.Int64(1),
WriteCapacityUnits: aws.Int64(1),
KeySchema: []types.KeySchemaElement{
{
AttributeName: aws.String("id"),
KeyType: "HASH",
},
},
TableName: aws.String(jdb.jobTable),
GlobalSecondaryIndexes: []types.GlobalSecondaryIndex{
{
IndexName: aws.String(idTsIndex),
IndexName: aws.String(jobTsIndex),
KeySchema: []types.KeySchemaElement{
{
AttributeName: aws.String("id"),
AttributeName: aws.String("job"),
KeyType: "HASH",
},
{
Expand All @@ -81,10 +82,6 @@ func (jdb *JobDatabase) createJobTable(ctx context.Context) error {
Projection: &types.Projection{
ProjectionType: types.ProjectionTypeAll,
},
ProvisionedThroughput: &types.ProvisionedThroughput{
ReadCapacityUnits: aws.Int64(1),
WriteCapacityUnits: aws.Int64(1),
},
},
},
}
Expand Down Expand Up @@ -122,15 +119,15 @@ func (jdb *JobDatabase) CreateJob(ctx context.Context) (string, error) {
if err != nil {
return "", err
} else {
return newJob.Id, nil
return newJob.Job, nil
}
}
}

func (jdb *JobDatabase) QueryJob(ctx context.Context, id string) (*models.JobState, error) {
queryInput := dynamodb.QueryInput{
TableName: aws.String(jdb.jobTable),
IndexName: aws.String(idTsIndex),
IndexName: aws.String(jobTsIndex),
KeyConditionExpression: aws.String("#id = :id"),
ExpressionAttributeNames: map[string]string{"#id": "id"},
ExpressionAttributeValues: map[string]types.AttributeValue{":id": &types.AttributeValueMemberS{Value: id}},
Expand Down
13 changes: 9 additions & 4 deletions models/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

const DeployComponent = "casv5"
const WorkerVersion = "5"
const DefaultJobTtl = 2 * 7 * 24 * time.Hour

type JobType string

Expand Down Expand Up @@ -42,19 +43,23 @@ const (
)

type JobState struct {
Job string `dynamodbav:"job"`
Stage JobStage `dynamodbav:"stage"`
Ts time.Time `dynamodbav:"ts"`
Id string `dynamodbav:"id"`
Type JobType `dynamodbav:"type"`
Ts time.Time `dynamodbav:"ts"`
Params map[string]interface{} `dynamodbav:"params"`
Id string `dynamodbav:"id" json:"-"`
Ttl time.Time `dynamodbav:"ttl,unixtime" json:"-"`
}

func NewJob(jobType JobType, params map[string]interface{}) JobState {
return JobState{
Job: uuid.New().String(),
Stage: JobStage_Queued,
Ts: time.Now(),
Id: uuid.New().String(),
Type: jobType,
Ts: time.Now(),
Params: params,
Id: uuid.New().String(),
Ttl: time.Now().Add(DefaultJobTtl),
}
}
4 changes: 2 additions & 2 deletions services/test_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,8 +111,8 @@ func (m *MockJobRepository) CreateJob(_ context.Context) (string, error) {
return "", fmt.Errorf("failed to create job")
}
newJob := models.NewJob(models.JobType_Anchor, nil)
m.jobStore[newJob.Id] = &newJob
return newJob.Id, nil
m.jobStore[newJob.Job] = &newJob
return newJob.Job, nil
}

func (m *MockJobRepository) QueryJob(_ context.Context, id string) (*models.JobState, error) {
Expand Down

0 comments on commit 42f52ff

Please sign in to comment.