diff --git a/internal/scheduler/database/redis_executor_repository.go b/internal/scheduler/database/redis_executor_repository.go deleted file mode 100644 index 6b84dbe382a..00000000000 --- a/internal/scheduler/database/redis_executor_repository.go +++ /dev/null @@ -1,68 +0,0 @@ -package database - -import ( - "fmt" - "time" - - "github.com/gogo/protobuf/proto" - "github.com/pkg/errors" - "github.com/redis/go-redis/v9" - - "github.com/armadaproject/armada/internal/common/armadacontext" - protoutil "github.com/armadaproject/armada/internal/common/proto" - "github.com/armadaproject/armada/internal/scheduler/schedulerobjects" -) - -const ( - executorsPrefix = "executors" -) - -type RedisExecutorRepository struct { - db redis.UniversalClient - executorsKey string -} - -func NewRedisExecutorRepository(db redis.UniversalClient, schedulerName string) *RedisExecutorRepository { - return &RedisExecutorRepository{ - db: db, - executorsKey: fmt.Sprintf("%s_%s", executorsPrefix, schedulerName), - } -} - -func (r *RedisExecutorRepository) GetExecutors(ctx *armadacontext.Context) ([]*schedulerobjects.Executor, error) { - result, err := r.db.HGetAll(ctx, r.executorsKey).Result() - if err != nil { - return nil, errors.Wrap(err, "Error retrieving executors from redis") - } - executors := make([]*schedulerobjects.Executor, len(result)) - i := 0 - for _, v := range result { - executor, err := protoutil.Unmarshall([]byte(v), &schedulerobjects.Executor{}) - if err != nil { - return nil, err - } - executors[i] = executor - i++ - } - return executors, nil -} - -func (r *RedisExecutorRepository) GetLastUpdateTimes(_ *armadacontext.Context) (map[string]time.Time, error) { - // We could implement this in a very inefficient way, but I don't believe it's needed so panic for now - panic("GetLastUpdateTimes is not implemented") -} - -func (r *RedisExecutorRepository) StoreExecutor(ctx *armadacontext.Context, executor *schedulerobjects.Executor) error { - data, err := proto.Marshal(executor) - if err != nil { - return errors.Wrap(err, "Error marshalling executor proto") - } - - pipe := r.db.TxPipeline() - pipe.HSet(ctx, r.executorsKey, executor.Id, data) - _, err = pipe.Exec(ctx) - if err != nil { - return errors.Wrap(err, "Error storing executor in redis") - } - return nil -} diff --git a/internal/scheduler/database/redis_executor_repository_test.go b/internal/scheduler/database/redis_executor_repository_test.go deleted file mode 100644 index 95288496259..00000000000 --- a/internal/scheduler/database/redis_executor_repository_test.go +++ /dev/null @@ -1,92 +0,0 @@ -package database - -import ( - "testing" - "time" - - "github.com/redis/go-redis/v9" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - "golang.org/x/exp/slices" - - "github.com/armadaproject/armada/internal/common/armadacontext" - "github.com/armadaproject/armada/internal/scheduler/schedulerobjects" -) - -func TestRedisExecutorRepository_LoadAndSave(t *testing.T) { - t1 := time.Now().UTC().Round(1 * time.Microsecond) - tests := map[string]struct { - executors []*schedulerobjects.Executor - }{ - "not empty": { - executors: []*schedulerobjects.Executor{ - { - Id: "test-executor-1", - Pool: "test-pool-1", - Nodes: []*schedulerobjects.Node{ - { - Id: "test-node-1", - LastSeen: t1, - }, - }, - LastUpdateTime: t1, - UnassignedJobRuns: []string{"run1", "run2"}, - }, - { - Id: "test-executor-2", - Pool: "test-pool-2", - Nodes: []*schedulerobjects.Node{ - { - Id: "test-node-2", - LastSeen: t1, - }, - }, - LastUpdateTime: t1, - UnassignedJobRuns: []string{"run3", "run4"}, - }, - }, - }, - "empty": { - executors: []*schedulerobjects.Executor{}, - }, - } - for name, tc := range tests { - t.Run(name, func(t *testing.T) { - ctx, cancel := armadacontext.WithTimeout(armadacontext.Background(), 5*time.Second) - defer cancel() - withRedisExecutorRepository(ctx, func(repo *RedisExecutorRepository) { - for _, executor := range tc.executors { - err := repo.StoreExecutor(ctx, executor) - require.NoError(t, err) - } - retrievedExecutors, err := repo.GetExecutors(ctx) - require.NoError(t, err) - executorSort := func(a *schedulerobjects.Executor, b *schedulerobjects.Executor) int { - if a.Id > b.Id { - return -1 - } else if a.Id < b.Id { - return 1 - } else { - return 0 - } - } - slices.SortFunc(retrievedExecutors, executorSort) - slices.SortFunc(tc.executors, executorSort) - require.Equal(t, len(tc.executors), len(retrievedExecutors)) - for i, expected := range tc.executors { - assert.Equal(t, expected, retrievedExecutors[i]) - } - }) - }) - } -} - -func withRedisExecutorRepository(ctx *armadacontext.Context, action func(repository *RedisExecutorRepository)) { - client := redis.NewClient(&redis.Options{Addr: "localhost:6379", DB: 10}) - defer client.FlushDB(ctx) - defer client.Close() - - client.FlushDB(ctx) - repo := NewRedisExecutorRepository(client, "pulsar") - action(repo) -}