Skip to content

Commit

Permalink
Implement delete queue functionality. (#426)
Browse files Browse the repository at this point in the history
* Implement delete queue functionality.

* Permission everyone by default to delete queue.

* Improve wording.
  • Loading branch information
jankaspar authored Sep 10, 2020
1 parent 41e6d19 commit 545e7c6
Show file tree
Hide file tree
Showing 16 changed files with 612 additions and 66 deletions.
71 changes: 71 additions & 0 deletions client/DotNet/Armada.Client/ClientGenerated.cs
Original file line number Diff line number Diff line change
Expand Up @@ -411,6 +411,77 @@ public async System.Threading.Tasks.Task<object> CreateQueueAsync(string name, A
}
}

/// <returns>A successful response.</returns>
/// <exception cref="ApiException">A server side error occurred.</exception>
public System.Threading.Tasks.Task<object> DeleteQueueAsync(string name)
{
return DeleteQueueAsync(name, System.Threading.CancellationToken.None);
}

/// <param name="cancellationToken">A cancellation token that can be used by other objects or threads to receive notice of cancellation.</param>
/// <returns>A successful response.</returns>
/// <exception cref="ApiException">A server side error occurred.</exception>
public async System.Threading.Tasks.Task<object> DeleteQueueAsync(string name, System.Threading.CancellationToken cancellationToken)
{
if (name == null)
throw new System.ArgumentNullException("name");

var urlBuilder_ = new System.Text.StringBuilder();
urlBuilder_.Append(BaseUrl != null ? BaseUrl.TrimEnd('/') : "").Append("/v1/queue/{Name}");
urlBuilder_.Replace("{Name}", System.Uri.EscapeDataString(ConvertToString(name, System.Globalization.CultureInfo.InvariantCulture)));

var client_ = _httpClient;
try
{
using (var request_ = new System.Net.Http.HttpRequestMessage())
{
request_.Method = new System.Net.Http.HttpMethod("DELETE");
request_.Headers.Accept.Add(System.Net.Http.Headers.MediaTypeWithQualityHeaderValue.Parse("application/json"));

PrepareRequest(client_, request_, urlBuilder_);
var url_ = urlBuilder_.ToString();
request_.RequestUri = new System.Uri(url_, System.UriKind.RelativeOrAbsolute);
PrepareRequest(client_, request_, url_);

var response_ = await client_.SendAsync(request_, System.Net.Http.HttpCompletionOption.ResponseHeadersRead, cancellationToken).ConfigureAwait(false);
try
{
var headers_ = System.Linq.Enumerable.ToDictionary(response_.Headers, h_ => h_.Key, h_ => h_.Value);
if (response_.Content != null && response_.Content.Headers != null)
{
foreach (var item_ in response_.Content.Headers)
headers_[item_.Key] = item_.Value;
}

ProcessResponse(client_, response_);

var status_ = ((int)response_.StatusCode).ToString();
if (status_ == "200")
{
var objectResponse_ = await ReadObjectResponseAsync<object>(response_, headers_).ConfigureAwait(false);
return objectResponse_.Object;
}
else
if (status_ != "200" && status_ != "204")
{
var responseData_ = response_.Content == null ? null : await response_.Content.ReadAsStringAsync().ConfigureAwait(false);
throw new ApiException("The HTTP status code of the response was not expected (" + (int)response_.StatusCode + ").", (int)response_.StatusCode, responseData_, headers_, null);
}

return default(object);
}
finally
{
if (response_ != null)
response_.Dispose();
}
}
}
finally
{
}
}

protected struct ObjectResponseResult<T>
{
public ObjectResponseResult(T responseObject, string responseText)
Expand Down
37 changes: 37 additions & 0 deletions cmd/armadactl/cmd/deleteQueue.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package cmd

import (
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
"google.golang.org/grpc"

"github.com/G-Research/armada/pkg/api"
"github.com/G-Research/armada/pkg/client"
)

func init() {
rootCmd.AddCommand(deleteQueueCmd)
}

var deleteQueueCmd = &cobra.Command{
Use: "delete-queue name",
Short: "Delete existing queue",
Long: `This commands removes queue if it exists, the queue needs to be empty at the time of deletion.`,

Args: cobra.ExactArgs(1),
Run: func(cmd *cobra.Command, args []string) {
queue := args[0]

apiConnectionDetails := client.ExtractCommandlineArmadaApiConnectionDetails()

client.WithConnection(apiConnectionDetails, func(conn *grpc.ClientConn) {
submissionClient := api.NewSubmitClient(conn)
e := client.DeleteQueue(submissionClient, queue)
if e != nil {
log.Error(e)
return
}
log.Infof("Queue %s deleted or did not exist.", queue)
})
},
}
1 change: 1 addition & 0 deletions config/armada/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ permissionGroupMapping:
submit_jobs: ["everyone"]
submit_any_jobs: ["everyone"]
create_queue: ["everyone"]
delete_queue: ["everyone"]
cancel_jobs: ["everyone"]
cancel_any_jobs: ["everyone"]
watch_all_events: ["everyone"]
Expand Down
17 changes: 17 additions & 0 deletions e2e/test/basic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,23 @@ func TestCanSubmitJob_IncorrectJobMountFails(t *testing.T) {
})
}

func TestCanNotSubmitJobToDeletedQueue(t *testing.T) {
skipIfIntegrationEnvNotPresent(t)

client.WithConnection(connectionDetails(), func(connection *grpc.ClientConn) {
submitClient := api.NewSubmitClient(connection)

jobRequest := createJobRequest("personal-anonymous")
createQueue(submitClient, jobRequest, t)

err := client.DeleteQueue(submitClient, jobRequest.Queue)
assert.NoError(t, err)

_, err = client.SubmitJobs(submitClient, jobRequest)
assert.Error(t, err)
})
}

func TestCanSubmitJob_ArmdactlWatchExitOnInactive(t *testing.T) {
skipIfIntegrationEnvNotPresent(t)
connDetails := connectionDetails()
Expand Down
1 change: 1 addition & 0 deletions internal/armada/authorization/permissions/permissions.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ const (
SubmitJobs Permission = "submit_jobs"
SubmitAnyJobs = "submit_any_jobs"
CreateQueue = "create_queue"
DeleteQueue = "delete_queue"
CancelJobs = "cancel_jobs"
CancelAnyJobs = "cancel_any_jobs"
WatchAllEvents = "watch_all_events"
Expand Down
7 changes: 6 additions & 1 deletion internal/armada/repository/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ type QueueRepository interface {
GetAllQueues() ([]*api.Queue, error)
GetQueue(name string) (*api.Queue, error)
CreateQueue(queue *api.Queue) error
DeleteQueue(name string) error
}

type RedisQueueRepository struct {
Expand Down Expand Up @@ -55,11 +56,15 @@ func (r *RedisQueueRepository) GetQueue(name string) (*api.Queue, error) {
}

func (r *RedisQueueRepository) CreateQueue(queue *api.Queue) error {

data, e := proto.Marshal(queue)
if e != nil {
return e
}
result := r.db.HSet(queueHashKey, queue.Name, data)
return result.Err()
}

func (r *RedisQueueRepository) DeleteQueue(name string) error {
result := r.db.HDel(queueHashKey, name)
return result.Err()
}
2 changes: 1 addition & 1 deletion internal/armada/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ func Serve(config *configuration.ArmadaConfig) (func(), *sync.WaitGroup) {
permissions := authorization.NewPrincipalPermissionChecker(config.PermissionGroupMapping, config.PermissionScopeMapping)

submitServer := server.NewSubmitServer(permissions, jobRepository, queueRepository, eventStore, schedulingInfoRepository)
usageServer := server.NewUsageServer(permissions, config.PriorityHalfTime, usageRepository)
usageServer := server.NewUsageServer(permissions, config.PriorityHalfTime, usageRepository, queueRepository)
aggregatedQueueServer := server.NewAggregatedQueueServer(permissions, config.Scheduling, jobRepository, queueRepository, usageRepository, eventStore, schedulingInfoRepository)
eventServer := server.NewEventServer(permissions, redisEventRepository, eventStore)
leaseManager := scheduling.NewLeaseManager(jobRepository, queueRepository, eventStore, config.Scheduling.Lease.ExpireAfter)
Expand Down
21 changes: 21 additions & 0 deletions internal/armada/server/submit.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,27 @@ func (server *SubmitServer) CreateQueue(ctx context.Context, queue *api.Queue) (
return &types.Empty{}, nil
}

func (server *SubmitServer) DeleteQueue(ctx context.Context, request *api.QueueDeleteRequest) (*types.Empty, error) {
if e := checkPermission(server.permissions, ctx, permissions.DeleteQueue); e != nil {
return nil, e
}

active, e := server.jobRepository.GetQueueActiveJobSets(request.Name)
if e != nil {
return nil, status.Errorf(codes.InvalidArgument, e.Error())
}
if len(active) > 0 {
return nil, status.Errorf(codes.FailedPrecondition, "Queue is not empty.")
}

e = server.queueRepository.DeleteQueue(request.Name)
if e != nil {
return nil, status.Errorf(codes.InvalidArgument, e.Error())
}

return &types.Empty{}, nil
}

func (server *SubmitServer) SubmitJobs(ctx context.Context, req *api.JobSubmitRequest) (*api.JobSubmitResponse, error) {
if e := server.checkQueuePermission(ctx, req.Queue, permissions.SubmitJobs, permissions.SubmitAnyJobs); e != nil {
return nil, e
Expand Down
26 changes: 23 additions & 3 deletions internal/armada/server/usage.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,24 +17,32 @@ type UsageServer struct {
permissions authorization.PermissionChecker
priorityHalfTime time.Duration
usageRepository repository.UsageRepository
queueRepository repository.QueueRepository
}

func NewUsageServer(
permissions authorization.PermissionChecker,
priorityHalfTime time.Duration,
usageRepository repository.UsageRepository) *UsageServer {
usageRepository repository.UsageRepository,
queueRepository repository.QueueRepository) *UsageServer {

return &UsageServer{
permissions: permissions,
priorityHalfTime: priorityHalfTime,
usageRepository: usageRepository}
usageRepository: usageRepository,
queueRepository: queueRepository}
}

func (s *UsageServer) ReportUsage(ctx context.Context, report *api.ClusterUsageReport) (*types.Empty, error) {
if e := checkPermission(s.permissions, ctx, permissions.ExecuteJobs); e != nil {
return nil, e
}

queues, err := s.queueRepository.GetAllQueues()
if err != nil {
return nil, err
}

reports, err := s.usageRepository.GetClusterUsageReports()
if err != nil {
return nil, err
Expand All @@ -46,10 +54,22 @@ func (s *UsageServer) ReportUsage(ctx context.Context, report *api.ClusterUsageR
}

newPriority := scheduling.CalculatePriorityUpdateFromReports(reports, report, previousPriority, s.priorityHalfTime)
filteredPriority := filterPriority(queues, newPriority)

err = s.usageRepository.UpdateCluster(report, newPriority)
err = s.usageRepository.UpdateCluster(report, filteredPriority)
if err != nil {
return nil, err
}
return &types.Empty{}, nil
}

func filterPriority(queues []*api.Queue, priority map[string]float64) map[string]float64 {
filteredPriority := map[string]float64{}
for _, q := range queues {
priority, ok := priority[q.Name]
if ok {
filteredPriority[q.Name] = priority
}
}
return filteredPriority
}
12 changes: 9 additions & 3 deletions internal/armada/server/usage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,10 @@ func TestUsageServer_ReportUsage(t *testing.T) {
cpu, _ := resource.ParseQuantity("10")
memory, _ := resource.ParseQuantity("360Gi")

_, err := s.ReportUsage(context.Background(), oneQueueReport(now, cpu, memory))
err := s.queueRepository.CreateQueue(&api.Queue{Name: "q1", PriorityFactor: 1})
assert.Nil(t, err)

_, err = s.ReportUsage(context.Background(), oneQueueReport(now, cpu, memory))
assert.Nil(t, err)

priority, err := s.usageRepository.GetClusterPriority("clusterA")
Expand Down Expand Up @@ -58,8 +61,11 @@ func withUsageServer(action func(s *UsageServer)) {
}
defer db.Close()

repo := repository.NewRedisUsageRepository(redis.NewClient(&redis.Options{Addr: db.Addr()}))
server := NewUsageServer(&fakePermissionChecker{}, time.Minute, repo)
redisClient := redis.NewClient(&redis.Options{Addr: db.Addr()})

repo := repository.NewRedisUsageRepository(redisClient)
queueRepo := repository.NewRedisQueueRepository(redisClient)
server := NewUsageServer(&fakePermissionChecker{}, time.Minute, repo, queueRepo)

action(server)
}
20 changes: 20 additions & 0 deletions pkg/api/api.swagger.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,26 @@ func SwaggerJsonTemplate() string {
" \"schema\": {}\n" +
" }\n" +
" }\n" +
" },\n" +
" \"delete\": {\n" +
" \"tags\": [\n" +
" \"Submit\"\n" +
" ],\n" +
" \"operationId\": \"DeleteQueue\",\n" +
" \"parameters\": [\n" +
" {\n" +
" \"type\": \"string\",\n" +
" \"name\": \"Name\",\n" +
" \"in\": \"path\",\n" +
" \"required\": true\n" +
" }\n" +
" ],\n" +
" \"responses\": {\n" +
" \"200\": {\n" +
" \"description\": \"A successful response.\",\n" +
" \"schema\": {}\n" +
" }\n" +
" }\n" +
" }\n" +
" }\n" +
" },\n" +
Expand Down
20 changes: 20 additions & 0 deletions pkg/api/api.swagger.json
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,26 @@
"schema": {}
}
}
},
"delete": {
"tags": [
"Submit"
],
"operationId": "DeleteQueue",
"parameters": [
{
"type": "string",
"name": "Name",
"in": "path",
"required": true
}
],
"responses": {
"200": {
"description": "A successful response.",
"schema": {}
}
}
}
}
},
Expand Down
Loading

0 comments on commit 545e7c6

Please sign in to comment.