Skip to content

Commit

Permalink
fix: reuse cd manager job schema
Browse files Browse the repository at this point in the history
  • Loading branch information
smrz2001 committed Oct 23, 2023
1 parent de57db4 commit f3e06cd
Show file tree
Hide file tree
Showing 7 changed files with 31 additions and 49 deletions.
14 changes: 8 additions & 6 deletions common/aws/ddb/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ import (
"github.com/aws/aws-sdk-go-v2/service/dynamodb"
"github.com/aws/aws-sdk-go-v2/service/dynamodb/types"

"github.com/3box/pipeline-tools/cd/manager"

"github.com/ceramicnetwork/go-cas"
"github.com/ceramicnetwork/go-cas/common"
"github.com/ceramicnetwork/go-cas/models"
Expand Down Expand Up @@ -90,17 +92,17 @@ func (jdb *JobDatabase) createJobTable(ctx context.Context) error {

func (jdb *JobDatabase) CreateJob(ctx context.Context) (string, error) {
jobParams := map[string]interface{}{
models.JobParam_Version: models.WorkerVersion, // this will launch a CASv5 Worker
models.JobParam_Overrides: map[string]string{
manager.JobParam_Version: models.WorkerVersion, // this will launch a CASv5 Worker
manager.JobParam_Overrides: map[string]string{
models.AnchorOverrides_AppMode: models.AnchorAppMode_ContinualAnchoring,
models.AnchorOverrides_SchedulerStopAfterNoOp: "true",
},
}
// If an override anchor contract address is available, pass it through to the job.
if contractAddress, found := os.LookupEnv(models.Env_AnchorContractAddress); found {
jobParams[models.JobParam_Overrides].(map[string]string)[models.AnchorOverrides_ContractAddress] = contractAddress
jobParams[manager.JobParam_Overrides].(map[string]string)[models.AnchorOverrides_ContractAddress] = contractAddress
}
newJob := models.NewJob(models.JobType_Anchor, jobParams)
newJob := models.NewJob(manager.JobType_Anchor, jobParams)
attributeValues, err := attributevalue.MarshalMapWithOptions(newJob, func(options *attributevalue.EncoderOptions) {
options.EncodeTime = func(time time.Time) (types.AttributeValue, error) {
return &types.AttributeValueMemberN{Value: strconv.FormatInt(time.UnixNano(), 10)}, nil
Expand All @@ -124,7 +126,7 @@ func (jdb *JobDatabase) CreateJob(ctx context.Context) (string, error) {
}
}

func (jdb *JobDatabase) QueryJob(ctx context.Context, id string) (*models.JobState, error) {
func (jdb *JobDatabase) QueryJob(ctx context.Context, id string) (*manager.JobState, error) {
queryInput := dynamodb.QueryInput{
TableName: aws.String(jdb.jobTable),
IndexName: aws.String(jobTsIndex),
Expand All @@ -140,7 +142,7 @@ func (jdb *JobDatabase) QueryJob(ctx context.Context, id string) (*models.JobSta
if queryOutput, err := jdb.ddbClient.Query(httpCtx, &queryInput); err != nil {
return nil, err
} else if queryOutput.Count > 0 {
job := new(models.JobState)
job := new(manager.JobState)
if err = attributevalue.UnmarshalMapWithOptions(queryOutput.Items[0], job, func(options *attributevalue.DecoderOptions) {
options.DecodeTime = attributevalue.DecodeTimeAttributes{
S: tsDecode,
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ replace github.com/abevier/go-sqs v0.0.0-20230602180220-c0264a2611a3 => github.c

require (
dagger.io/dagger v0.7.1
github.com/3box/pipeline-tools/cd/manager v0.0.0-20231023205204-e46c925731ad
github.com/abevier/go-sqs v0.0.0-20230602180220-c0264a2611a3
github.com/abevier/tsk v0.0.0-20230712145722-249b1e98b01c
github.com/alexflint/go-arg v1.4.2
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ cloud.google.com/go/storage v1.10.0/go.mod h1:FLPqc6j+Ki4BU591ie1oL6qBQGu2Bl/tZ9
dagger.io/dagger v0.7.1 h1:DVRMjMCsbsE/XhF5yAaDsUepgy9z8oGglaEiXvBbelw=
dagger.io/dagger v0.7.1/go.mod h1:CL/Vw6H6oOqXNh8tVk2p6xaSXYgktosMXNUwIfQqKhg=
dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU=
github.com/3box/pipeline-tools/cd/manager v0.0.0-20231023205204-e46c925731ad h1:uIFInU7YOvO1+/QHZ90mnw0lOv+YT1JAvSy6X076QZM=
github.com/3box/pipeline-tools/cd/manager v0.0.0-20231023205204-e46c925731ad/go.mod h1:gtA3sWQni3fE0pjUmi3xb+R2mRakG1IQxgLY6CHJ5Pk=
github.com/99designs/gqlgen v0.17.2 h1:yczvlwMsfcVu/JtejqfrLwXuSP0yZFhmcss3caEvHw8=
github.com/99designs/gqlgen v0.17.2/go.mod h1:K5fzLKwtph+FFgh9j7nFbRUdBKvTcGnsta51fsMTn3o=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
Expand Down
39 changes: 5 additions & 34 deletions models/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,33 +4,14 @@ import (
"time"

"github.com/google/uuid"

"github.com/3box/pipeline-tools/cd/manager"
)

const DeployComponent = "casv5"
const WorkerVersion = "5"
const DefaultJobTtl = 2 * 7 * 24 * time.Hour

type JobType string

const (
JobType_Deploy JobType = "deploy"
JobType_Anchor JobType = "anchor"
)

type JobStage string

const (
JobStage_Queued JobStage = "queued"
JobStage_Waiting JobStage = "waiting"
JobStage_Failed JobStage = "failed"
JobStage_Completed JobStage = "completed"
)

const (
JobParam_Overrides = "overrides"
JobParam_Version = "version"
)

const (
AnchorOverrides_AppMode = "APP_MODE"
AnchorOverrides_ContractAddress = "ETH_CONTRACT_ADDRESS"
Expand All @@ -42,20 +23,10 @@ const (
AnchorAppMode_ContinualAnchoring = "continual-anchoring"
)

type JobState struct {
Job string `dynamodbav:"job"`
Stage JobStage `dynamodbav:"stage"`
Type JobType `dynamodbav:"type"`
Ts time.Time `dynamodbav:"ts"`
Params map[string]interface{} `dynamodbav:"params"`
Id string `dynamodbav:"id" json:"-"`
Ttl time.Time `dynamodbav:"ttl,unixtime" json:"-"`
}

func NewJob(jobType JobType, params map[string]interface{}) JobState {
return JobState{
func NewJob(jobType manager.JobType, params map[string]interface{}) manager.JobState {
return manager.JobState{
Job: uuid.New().String(),
Stage: JobStage_Queued,
Stage: manager.JobStage_Queued,
Type: jobType,
Ts: time.Now(),
Params: params,
Expand Down
4 changes: 3 additions & 1 deletion models/services.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"time"

"github.com/google/uuid"

"github.com/3box/pipeline-tools/cd/manager"
)

type AnchorRepository interface {
Expand All @@ -21,7 +23,7 @@ type StateRepository interface {

type JobRepository interface {
CreateJob(ctx context.Context) (string, error)
QueryJob(ctx context.Context, id string) (*JobState, error)
QueryJob(ctx context.Context, id string) (*manager.JobState, error)
}

type QueuePublisher interface {
Expand Down
12 changes: 7 additions & 5 deletions services/test_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import (

"github.com/google/uuid"

"github.com/3box/pipeline-tools/cd/manager"

"github.com/ceramicnetwork/go-cas/models"
)

Expand Down Expand Up @@ -98,24 +100,24 @@ func (m *MockStateRepository) UpdateTip(_ context.Context, newTip *models.Stream
}

type MockJobRepository struct {
jobStore map[string]*models.JobState
jobStore map[string]*manager.JobState
failCount int
}

func (m *MockJobRepository) CreateJob(_ context.Context) (string, error) {
if m.jobStore == nil {
m.jobStore = make(map[string]*models.JobState, 1)
m.jobStore = make(map[string]*manager.JobState, 1)
}
if m.failCount > 0 {
m.failCount--
return "", fmt.Errorf("failed to create job")
}
newJob := models.NewJob(models.JobType_Anchor, nil)
newJob := models.NewJob(manager.JobType_Anchor, nil)
m.jobStore[newJob.Job] = &newJob
return newJob.Job, nil
}

func (m *MockJobRepository) QueryJob(_ context.Context, id string) (*models.JobState, error) {
func (m *MockJobRepository) QueryJob(_ context.Context, id string) (*manager.JobState, error) {
if jobState, found := m.jobStore[id]; found {
return jobState, nil
}
Expand All @@ -125,7 +127,7 @@ func (m *MockJobRepository) QueryJob(_ context.Context, id string) (*models.JobS
func (m *MockJobRepository) finishJobs(count int) {
for _, js := range m.jobStore {
if count > 0 {
js.Stage = models.JobStage_Completed
js.Stage = manager.JobStage_Completed
count--
}
}
Expand Down
8 changes: 5 additions & 3 deletions services/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
"strconv"
"time"

"github.com/3box/pipeline-tools/cd/manager"

"github.com/ceramicnetwork/go-cas/models"
)

Expand All @@ -21,7 +23,7 @@ type WorkerService struct {
monitorTick time.Duration
maxAnchorWorkers int
amortizationFactor float64
anchorJobs map[string]*models.JobState
anchorJobs map[string]*manager.JobState
logger models.Logger
}

Expand Down Expand Up @@ -51,7 +53,7 @@ func NewWorkerService(logger models.Logger, batchMonitor models.QueueMonitor, jo
batchMonitorTick,
maxAnchorWorkers,
amortizationFactor,
make(map[string]*models.JobState),
make(map[string]*manager.JobState),
logger,
}
}
Expand Down Expand Up @@ -114,7 +116,7 @@ func (w WorkerService) calculateLoad(ctx context.Context) (int, int, error) {
for jobId, _ := range w.anchorJobs {
if jobState, err := w.jobDb.QueryJob(ctx, jobId); err != nil {
return 0, 0, err
} else if (jobState.Stage == models.JobStage_Completed) || (jobState.Stage == models.JobStage_Failed) {
} else if (jobState.Stage == manager.JobStage_Completed) || (jobState.Stage == manager.JobStage_Failed) {
// Clean out finished jobs - "completed" and "failed" are the only possible terminal stages for anchor jobs.
delete(w.anchorJobs, jobId)
} else {
Expand Down

0 comments on commit f3e06cd

Please sign in to comment.