diff --git a/ci/cmd/deploy/main.go b/ci/cmd/deploy/main.go index c3988eb..f017c3b 100644 --- a/ci/cmd/deploy/main.go +++ b/ci/cmd/deploy/main.go @@ -13,6 +13,8 @@ import ( "github.com/aws/aws-sdk-go-v2/service/dynamodb" "github.com/aws/aws-sdk-go-v2/service/dynamodb/types" + "github.com/3box/pipeline-tools/cd/manager" + "github.com/ceramicnetwork/go-cas" "github.com/ceramicnetwork/go-cas/common/aws/config" "github.com/ceramicnetwork/go-cas/models" @@ -28,7 +30,7 @@ func main() { } func createJob(ctx context.Context) (string, error) { - newJob := models.NewJob(models.JobType_Deploy, map[string]interface{}{ + newJob := models.NewJob(manager.JobType_Deploy, map[string]interface{}{ "component": models.DeployComponent, "sha": "latest", "shaTag": os.Getenv(cas.Env_ShaTag), diff --git a/common/aws/ddb/job.go b/common/aws/ddb/job.go index 60631ef..0228876 100644 --- a/common/aws/ddb/job.go +++ b/common/aws/ddb/job.go @@ -12,6 +12,8 @@ import ( "github.com/aws/aws-sdk-go-v2/service/dynamodb" "github.com/aws/aws-sdk-go-v2/service/dynamodb/types" + "github.com/3box/pipeline-tools/cd/manager" + "github.com/ceramicnetwork/go-cas" "github.com/ceramicnetwork/go-cas/common" "github.com/ceramicnetwork/go-cas/models" @@ -90,17 +92,17 @@ func (jdb *JobDatabase) createJobTable(ctx context.Context) error { func (jdb *JobDatabase) CreateJob(ctx context.Context) (string, error) { jobParams := map[string]interface{}{ - models.JobParam_Version: models.WorkerVersion, // this will launch a CASv5 Worker - models.JobParam_Overrides: map[string]string{ + manager.JobParam_Version: models.WorkerVersion, // this will launch a CASv5 Worker + manager.JobParam_Overrides: map[string]string{ models.AnchorOverrides_AppMode: models.AnchorAppMode_ContinualAnchoring, models.AnchorOverrides_SchedulerStopAfterNoOp: "true", }, } // If an override anchor contract address is available, pass it through to the job. if contractAddress, found := os.LookupEnv(models.Env_AnchorContractAddress); found { - jobParams[models.JobParam_Overrides].(map[string]string)[models.AnchorOverrides_ContractAddress] = contractAddress + jobParams[manager.JobParam_Overrides].(map[string]string)[models.AnchorOverrides_ContractAddress] = contractAddress } - newJob := models.NewJob(models.JobType_Anchor, jobParams) + newJob := models.NewJob(manager.JobType_Anchor, jobParams) attributeValues, err := attributevalue.MarshalMapWithOptions(newJob, func(options *attributevalue.EncoderOptions) { options.EncodeTime = func(time time.Time) (types.AttributeValue, error) { return &types.AttributeValueMemberN{Value: strconv.FormatInt(time.UnixNano(), 10)}, nil @@ -124,7 +126,7 @@ func (jdb *JobDatabase) CreateJob(ctx context.Context) (string, error) { } } -func (jdb *JobDatabase) QueryJob(ctx context.Context, id string) (*models.JobState, error) { +func (jdb *JobDatabase) QueryJob(ctx context.Context, id string) (*manager.JobState, error) { queryInput := dynamodb.QueryInput{ TableName: aws.String(jdb.jobTable), IndexName: aws.String(jobTsIndex), @@ -140,7 +142,7 @@ func (jdb *JobDatabase) QueryJob(ctx context.Context, id string) (*models.JobSta if queryOutput, err := jdb.ddbClient.Query(httpCtx, &queryInput); err != nil { return nil, err } else if queryOutput.Count > 0 { - job := new(models.JobState) + job := new(manager.JobState) if err = attributevalue.UnmarshalMapWithOptions(queryOutput.Items[0], job, func(options *attributevalue.DecoderOptions) { options.DecodeTime = attributevalue.DecodeTimeAttributes{ S: tsDecode, diff --git a/go.mod b/go.mod index 835f0f5..ddb7569 100644 --- a/go.mod +++ b/go.mod @@ -6,6 +6,7 @@ replace github.com/abevier/go-sqs v0.0.0-20230602180220-c0264a2611a3 => github.c require ( dagger.io/dagger v0.7.1 + github.com/3box/pipeline-tools/cd/manager v0.0.0-20231023205204-e46c925731ad github.com/abevier/go-sqs v0.0.0-20230602180220-c0264a2611a3 github.com/abevier/tsk v0.0.0-20230712145722-249b1e98b01c github.com/alexflint/go-arg v1.4.2 diff --git a/go.sum b/go.sum index 62d831a..1aeb0e4 100644 --- a/go.sum +++ b/go.sum @@ -33,6 +33,8 @@ cloud.google.com/go/storage v1.10.0/go.mod h1:FLPqc6j+Ki4BU591ie1oL6qBQGu2Bl/tZ9 dagger.io/dagger v0.7.1 h1:DVRMjMCsbsE/XhF5yAaDsUepgy9z8oGglaEiXvBbelw= dagger.io/dagger v0.7.1/go.mod h1:CL/Vw6H6oOqXNh8tVk2p6xaSXYgktosMXNUwIfQqKhg= dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU= +github.com/3box/pipeline-tools/cd/manager v0.0.0-20231023205204-e46c925731ad h1:uIFInU7YOvO1+/QHZ90mnw0lOv+YT1JAvSy6X076QZM= +github.com/3box/pipeline-tools/cd/manager v0.0.0-20231023205204-e46c925731ad/go.mod h1:gtA3sWQni3fE0pjUmi3xb+R2mRakG1IQxgLY6CHJ5Pk= github.com/99designs/gqlgen v0.17.2 h1:yczvlwMsfcVu/JtejqfrLwXuSP0yZFhmcss3caEvHw8= github.com/99designs/gqlgen v0.17.2/go.mod h1:K5fzLKwtph+FFgh9j7nFbRUdBKvTcGnsta51fsMTn3o= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= diff --git a/models/job.go b/models/job.go index 4c9ba37..b92d932 100644 --- a/models/job.go +++ b/models/job.go @@ -4,33 +4,14 @@ import ( "time" "github.com/google/uuid" + + "github.com/3box/pipeline-tools/cd/manager" ) const DeployComponent = "casv5" const WorkerVersion = "5" const DefaultJobTtl = 2 * 7 * 24 * time.Hour -type JobType string - -const ( - JobType_Deploy JobType = "deploy" - JobType_Anchor JobType = "anchor" -) - -type JobStage string - -const ( - JobStage_Queued JobStage = "queued" - JobStage_Waiting JobStage = "waiting" - JobStage_Failed JobStage = "failed" - JobStage_Completed JobStage = "completed" -) - -const ( - JobParam_Overrides = "overrides" - JobParam_Version = "version" -) - const ( AnchorOverrides_AppMode = "APP_MODE" AnchorOverrides_ContractAddress = "ETH_CONTRACT_ADDRESS" @@ -42,20 +23,10 @@ const ( AnchorAppMode_ContinualAnchoring = "continual-anchoring" ) -type JobState struct { - Job string `dynamodbav:"job"` - Stage JobStage `dynamodbav:"stage"` - Type JobType `dynamodbav:"type"` - Ts time.Time `dynamodbav:"ts"` - Params map[string]interface{} `dynamodbav:"params"` - Id string `dynamodbav:"id" json:"-"` - Ttl time.Time `dynamodbav:"ttl,unixtime" json:"-"` -} - -func NewJob(jobType JobType, params map[string]interface{}) JobState { - return JobState{ +func NewJob(jobType manager.JobType, params map[string]interface{}) manager.JobState { + return manager.JobState{ Job: uuid.New().String(), - Stage: JobStage_Queued, + Stage: manager.JobStage_Queued, Type: jobType, Ts: time.Now(), Params: params, diff --git a/models/services.go b/models/services.go index 9cf6731..4f17b3a 100644 --- a/models/services.go +++ b/models/services.go @@ -5,6 +5,8 @@ import ( "time" "github.com/google/uuid" + + "github.com/3box/pipeline-tools/cd/manager" ) type AnchorRepository interface { @@ -21,7 +23,7 @@ type StateRepository interface { type JobRepository interface { CreateJob(ctx context.Context) (string, error) - QueryJob(ctx context.Context, id string) (*JobState, error) + QueryJob(ctx context.Context, id string) (*manager.JobState, error) } type QueuePublisher interface { diff --git a/services/test_helpers.go b/services/test_helpers.go index bb678c2..d4a73bc 100644 --- a/services/test_helpers.go +++ b/services/test_helpers.go @@ -11,6 +11,8 @@ import ( "github.com/google/uuid" + "github.com/3box/pipeline-tools/cd/manager" + "github.com/ceramicnetwork/go-cas/models" ) @@ -98,24 +100,24 @@ func (m *MockStateRepository) UpdateTip(_ context.Context, newTip *models.Stream } type MockJobRepository struct { - jobStore map[string]*models.JobState + jobStore map[string]*manager.JobState failCount int } func (m *MockJobRepository) CreateJob(_ context.Context) (string, error) { if m.jobStore == nil { - m.jobStore = make(map[string]*models.JobState, 1) + m.jobStore = make(map[string]*manager.JobState, 1) } if m.failCount > 0 { m.failCount-- return "", fmt.Errorf("failed to create job") } - newJob := models.NewJob(models.JobType_Anchor, nil) + newJob := models.NewJob(manager.JobType_Anchor, nil) m.jobStore[newJob.Job] = &newJob return newJob.Job, nil } -func (m *MockJobRepository) QueryJob(_ context.Context, id string) (*models.JobState, error) { +func (m *MockJobRepository) QueryJob(_ context.Context, id string) (*manager.JobState, error) { if jobState, found := m.jobStore[id]; found { return jobState, nil } @@ -125,7 +127,7 @@ func (m *MockJobRepository) QueryJob(_ context.Context, id string) (*models.JobS func (m *MockJobRepository) finishJobs(count int) { for _, js := range m.jobStore { if count > 0 { - js.Stage = models.JobStage_Completed + js.Stage = manager.JobStage_Completed count-- } } diff --git a/services/worker.go b/services/worker.go index 946b00f..9d0ce36 100644 --- a/services/worker.go +++ b/services/worker.go @@ -7,6 +7,8 @@ import ( "strconv" "time" + "github.com/3box/pipeline-tools/cd/manager" + "github.com/ceramicnetwork/go-cas/models" ) @@ -21,7 +23,7 @@ type WorkerService struct { monitorTick time.Duration maxAnchorWorkers int amortizationFactor float64 - anchorJobs map[string]*models.JobState + anchorJobs map[string]*manager.JobState logger models.Logger } @@ -51,7 +53,7 @@ func NewWorkerService(logger models.Logger, batchMonitor models.QueueMonitor, jo batchMonitorTick, maxAnchorWorkers, amortizationFactor, - make(map[string]*models.JobState), + make(map[string]*manager.JobState), logger, } } @@ -114,7 +116,7 @@ func (w WorkerService) calculateLoad(ctx context.Context) (int, int, error) { for jobId, _ := range w.anchorJobs { if jobState, err := w.jobDb.QueryJob(ctx, jobId); err != nil { return 0, 0, err - } else if (jobState.Stage == models.JobStage_Completed) || (jobState.Stage == models.JobStage_Failed) { + } else if (jobState.Stage == manager.JobStage_Completed) || (jobState.Stage == manager.JobStage_Failed) { // Clean out finished jobs - "completed" and "failed" are the only possible terminal stages for anchor jobs. delete(w.anchorJobs, jobId) } else {