Skip to content

Commit

Permalink
Merge branch 'master' into rich/pulsar-3.0
Browse files Browse the repository at this point in the history
  • Loading branch information
kannon92 authored Jun 21, 2023
2 parents fb39869 + 7167c57 commit 66ea1d8
Show file tree
Hide file tree
Showing 17 changed files with 635 additions and 484 deletions.
42 changes: 22 additions & 20 deletions .github/workflows/airflow-operator.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,39 +5,41 @@ on:
branches-ignore:
- master
paths:
- 'client/python/**'
- 'build/python-client/**'
- 'pkg/api/*.proto'
- '.github/workflows/airflow-operator.yml'
- '.github/workflows/python-client.yml'
- 'docs/python_armada_client.md'
- 'scripts/build-python-client.sh'
- 'third_party/airflow/**'
- '.github/workflows/python-tests/*'
- 'build/airflow-operator/**'
- 'pkg/api/jobservice/*.proto'
- '.github/workflows/airflow-operator.yml'
- 'build/python-client/**'
- 'client/python/**'
- 'docs/python_airflow_operator.md'
- 'scripts/build-airflow-operator.sh'
- 'docs/python_armada_client.md'
- 'internal/jobservice/*'
- 'makefile'
- '.github/workflows/python-tests/*'
- 'pkg/api/*.proto'
- 'pkg/api/jobservice/*.proto'
- 'scripts/build-airflow-operator.sh'
- 'scripts/build-python-client.sh'
- 'third_party/airflow/**'

pull_request:
branches-ignore:
- gh-pages
paths:
- 'client/python/**'
- 'build/python-client/**'
- 'pkg/api/*.proto'
- '.github/workflows/airflow-operator.yml'
- '.github/workflows/python-client.yml'
- 'docs/python_armada_client.md'
- 'scripts/build-python-client.sh'
- 'third_party/airflow/**'
- '.github/workflows/python-tests/*'
- 'build/airflow-operator/**'
- 'pkg/api/jobservice/*.proto'
- '.github/workflows/airflow-operator.yml'
- 'build/python-client/**'
- 'client/python/**'
- 'docs/python_airflow_operator.md'
- 'scripts/build-airflow-operator.sh'
- 'docs/python_armada_client.md'
- 'internal/jobservice/*'
- 'makefile'
- '.github/workflows/python-tests/*'
- 'pkg/api/*.proto'
- 'pkg/api/jobservice/*.proto'
- 'scripts/build-airflow-operator.sh'
- 'scripts/build-python-client.sh'
- 'third_party/airflow/**'

jobs:
airflow-tox:
Expand Down
14 changes: 14 additions & 0 deletions .mergify.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
pull_request_rules:
- name: Require approval from Armada maintainers
conditions:
- "#approved-reviews-by>=1"
actions:
post_check:
success_conditions:
- or:
- "#approved-reviews-by>=2"
- and:
- "#approved-reviews-by>=1"
- "author~=^(JamesMurkin|severinson|d80tb7|carlocamurri|kannon92|dejanzele|Sharpz7|ClifHouck|robertdavidsmith|theAntiYeti|richscott|suprjinx|zuqq)"
title:
Two are checks required.
10 changes: 1 addition & 9 deletions cmd/armada-load-tester/cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,7 @@ var rootCmd = &cobra.Command{
Command line utility to submit many jobs to armada
Persistent config can be saved in a config file so it doesn't have to be specified every command.
Example structure:
armadaUrl: localhost:50051
basicAuth:
username: user1
password: password123
The location of this file can be passed in using --config argument or picked from $HOME/.armadactl.yaml.
`,
The location of this file can be passed in using --config argument or picked from $HOME/.armadactl.yaml.`,
}

// Execute adds all child commands to the root command and sets flags appropriately.
Expand Down
7 changes: 0 additions & 7 deletions cmd/testsuite/cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,6 @@ func RootCmd() *cobra.Command {
Long: `testsuite is a suite of automated tests for Armada deployments.
Persistent config can be saved in a config file so it doesn't have to be specified every command.
Example structure:
armadaUrl: localhost:50051
basicAuth:
username: user1
password: password123
The location of this file can be passed in using the --config argument.
If not provided, $HOME/.armadactl.yaml is used.`,
}
Expand Down
2 changes: 1 addition & 1 deletion config/jobservice/config.yaml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
grpcPort: 60003
httpPort: 8090
purgeJobSetTime: 1000
subscribeExpirySecs: 300
subscriptionExpirySecs: 300
subscriberPoolSize: 30
# databaseType can be either 'postgres' or 'sqlite'
databaseType: "postgres"
Expand Down
48 changes: 48 additions & 0 deletions docs/python_airflow_operator.md
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,30 @@ Runs an Armada job and calls the job_service_client for polling.
None



#### render_template_fields(context, jinja_env=None)
Template all attributes listed in *self.template_fields*.

This mutates the attributes in-place and is irreversible.


* **Parameters**


* **context** (*Context*) – Context dict with values to apply on content.


* **jinja_env** (*Environment** | **None*) – Jinja’s environment to use for rendering.



* **Return type**

None



#### template_fields(_: Sequence[str_ _ = ('job_request_items',_ )
## armada.operators.armada_deferrable module


Expand Down Expand Up @@ -158,6 +182,28 @@ until the job completes.



#### render_template_fields(context, jinja_env=None)
Template all attributes listed in *self.template_fields*.

This mutates the attributes in-place and is irreversible.


* **Parameters**


* **context** (*Context*) – Context dict with values to apply on content.


* **jinja_env** (*Environment** | **None*) – Jinja’s environment to use for rendering.



* **Return type**

None



#### resume_job_complete(context, event, job_id)
Resumes this operator after deferring itself to ArmadaJobCompleteTrigger.
Only meant to be called from within Airflow.
Expand Down Expand Up @@ -191,6 +237,8 @@ Reports the result of the job and returns.



#### template_fields(_: Sequence[str_ _ = ('job_request_items',_ )

### _class_ armada.operators.armada_deferrable.ArmadaJobCompleteTrigger(job_id, job_service_channel_args, armada_queue, job_set_id, airflow_task_name)
Bases: `BaseTrigger`

Expand Down
2 changes: 1 addition & 1 deletion e2e/setup/jobservice.yaml
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
subscribeExpirySecs: 60
subscriptionExpirySecs: 60
subscriberPoolSize: 30
purgeJobSetTime: 10000
databaseType: "postgres"
Expand Down
115 changes: 57 additions & 58 deletions internal/scheduler/context/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/openconfig/goyang/pkg/indent"
"github.com/pkg/errors"
"golang.org/x/exp/maps"
"golang.org/x/exp/slices"
v1 "k8s.io/api/core/v1"

"github.com/armadaproject/armada/internal/armada/configuration"
Expand Down Expand Up @@ -154,36 +155,34 @@ func (sctx *SchedulingContext) ReportString(verbosity int32) string {
fmt.Fprintf(w, "Number of gangs scheduled:\t%d\n", sctx.NumScheduledGangs)
fmt.Fprintf(w, "Number of jobs scheduled:\t%d\n", sctx.NumScheduledJobs)
fmt.Fprintf(w, "Number of jobs preempted:\t%d\n", sctx.NumEvictedJobs)
scheduled := armadamaps.Filter(
sctx.QueueSchedulingContexts,
func(_ string, qctx *QueueSchedulingContext) bool {
return len(qctx.SuccessfulJobSchedulingContexts) > 0
},
)
if verbosity <= 0 {
fmt.Fprintf(w, "Scheduled queues:\t%v\n", maps.Keys(scheduled))
} else {
fmt.Fprint(w, "Scheduled queues:\n")
for queueName, qctx := range scheduled {
fmt.Fprintf(w, "\t%s:\n", queueName)
fmt.Fprintf(w, indent.String("\t\t", qctx.ReportString(verbosity-2)))
}
}
preempted := armadamaps.Filter(
sctx.QueueSchedulingContexts,
func(_ string, qctx *QueueSchedulingContext) bool {
return len(qctx.EvictedJobsById) > 0
},
)
if verbosity <= 0 {
fmt.Fprintf(
w,
"Scheduled queues:\t%v\n",
maps.Keys(
armadamaps.Filter(
sctx.QueueSchedulingContexts,
func(_ string, qctx *QueueSchedulingContext) bool {
return len(qctx.SuccessfulJobSchedulingContexts) > 0
},
),
),
)
fmt.Fprintf(
w,
"Preempted queues:\t%v\n",
maps.Keys(
armadamaps.Filter(
sctx.QueueSchedulingContexts,
func(_ string, qctx *QueueSchedulingContext) bool {
return len(qctx.EvictedJobsById) > 0
},
),
),
)
fmt.Fprintf(w, "Preempted queues:\t%v\n", maps.Keys(preempted))
} else {
fmt.Fprint(w, "Queues:\n")
for queueName, qctx := range sctx.QueueSchedulingContexts {
fmt.Fprint(w, "Preempted queues:\n")
for queueName, qctx := range preempted {
fmt.Fprintf(w, "\t%s:\n", queueName)
fmt.Fprintf(w, indent.String("\t\t", qctx.ReportString(verbosity-1)))
fmt.Fprintf(w, indent.String("\t\t", qctx.ReportString(verbosity-2)))
}
}
w.Flush()
Expand Down Expand Up @@ -342,28 +341,29 @@ func (qctx *QueueSchedulingContext) String() string {
return qctx.ReportString(0)
}

const maxPrintedJobIdsByReason = 1
const maxJobIdsToPrint = 1

func (qctx *QueueSchedulingContext) ReportString(verbosity int32) string {
var sb strings.Builder
w := tabwriter.NewWriter(&sb, 1, 1, 1, ' ', 0)
if verbosity > 0 {
fmt.Fprintf(w, "Created:\t%s\n", qctx.Created)
if verbosity >= 0 {
fmt.Fprintf(w, "Time:\t%s\n", qctx.Created)
fmt.Fprintf(w, "Queue:\t%s\n", qctx.Queue)
}
fmt.Fprintf(w, "Scheduled resources:\t%s\n", qctx.ScheduledResourcesByPriority.AggregateByResource().CompactString())
fmt.Fprintf(w, "Scheduled resources (by priority):\t%s\n", qctx.ScheduledResourcesByPriority.String())
fmt.Fprintf(w, "Preempted resources:\t%s\n", qctx.EvictedResourcesByPriority.AggregateByResource().CompactString())
fmt.Fprintf(w, "Preempted resources (by priority):\t%s\n", qctx.EvictedResourcesByPriority.String())
if verbosity > 0 {
if verbosity >= 0 {
fmt.Fprintf(w, "Total allocated resources after scheduling:\t%s\n", qctx.AllocatedByPriority.AggregateByResource().CompactString())
fmt.Fprintf(w, "Total allocated resources after scheduling (by priority):\t%s\n", qctx.AllocatedByPriority.String())
fmt.Fprintf(w, "Number of jobs scheduled:\t%d\n", len(qctx.SuccessfulJobSchedulingContexts))
fmt.Fprintf(w, "Number of jobs that could not be scheduled:\t%d\n", len(qctx.UnsuccessfulJobSchedulingContexts))
fmt.Fprintf(w, "Number of jobs preempted:\t%d\n", len(qctx.EvictedJobsById))
fmt.Fprintf(w, "Number of jobs that could not be scheduled:\t%d\n", len(qctx.UnsuccessfulJobSchedulingContexts))
if len(qctx.SuccessfulJobSchedulingContexts) > 0 {
jobIdsToPrint := maps.Keys(qctx.SuccessfulJobSchedulingContexts)
if verbosity <= 1 && len(jobIdsToPrint) > maxPrintedJobIdsByReason {
jobIdsToPrint = jobIdsToPrint[0:maxPrintedJobIdsByReason]
if len(jobIdsToPrint) > maxJobIdsToPrint {
jobIdsToPrint = jobIdsToPrint[0:maxJobIdsToPrint]
}
fmt.Fprintf(w, "Scheduled jobs:\t%v", jobIdsToPrint)
if len(jobIdsToPrint) != len(qctx.SuccessfulJobSchedulingContexts) {
Expand All @@ -372,39 +372,38 @@ func (qctx *QueueSchedulingContext) ReportString(verbosity int32) string {
fmt.Fprint(w, "\n")
}
}
if len(qctx.EvictedJobsById) > 0 {
jobIdsToPrint := maps.Keys(qctx.EvictedJobsById)
if len(jobIdsToPrint) > maxJobIdsToPrint {
jobIdsToPrint = jobIdsToPrint[0:maxJobIdsToPrint]
}
fmt.Fprintf(w, "Preempted jobs:\t%v", jobIdsToPrint)
if len(jobIdsToPrint) != len(qctx.EvictedJobsById) {
fmt.Fprintf(w, " (and %d others not shown)\n", len(qctx.EvictedJobsById)-len(jobIdsToPrint))
} else {
fmt.Fprint(w, "\n")
}
}
if len(qctx.UnsuccessfulJobSchedulingContexts) > 0 {
fmt.Fprint(w, "Unschedulable jobs:\n")
for reason, jobIds := range armadaslices.MapAndGroupByFuncs(
jobIdsByReason := armadaslices.MapAndGroupByFuncs(
maps.Values(qctx.UnsuccessfulJobSchedulingContexts),
func(jctx *JobSchedulingContext) string {
return jctx.UnschedulableReason
},
func(jctx *JobSchedulingContext) string {
return jctx.JobId
},
) {
jobIdsToPrint := jobIds
if verbosity <= 1 && len(jobIdsToPrint) > maxPrintedJobIdsByReason {
jobIdsToPrint = jobIds[0:maxPrintedJobIdsByReason]
)
reasons := maps.Keys(jobIdsByReason)
slices.SortFunc(reasons, func(a, b string) bool { return len(jobIdsByReason[a]) < len(jobIdsByReason[b]) })
for i := len(reasons) - 1; i >= 0; i-- {
reason := reasons[i]
jobIds := jobIdsByReason[reason]
if len(jobIds) <= 0 {
continue
}
fmt.Fprintf(w, "\t%d:\t%s jobs\t%v", len(qctx.UnsuccessfulJobSchedulingContexts), reason, jobIdsToPrint)
if len(jobIdsToPrint) != len(jobIds) {
fmt.Fprintf(w, " (and %d others not shown)\n", len(jobIds)-len(jobIdsToPrint))
} else {
fmt.Fprint(w, "\n")
}
}
}
if len(qctx.EvictedJobsById) > 0 {
jobIdsToPrint := maps.Keys(qctx.EvictedJobsById)
if verbosity <= 1 && len(jobIdsToPrint) > maxPrintedJobIdsByReason {
jobIdsToPrint = jobIdsToPrint[0:maxPrintedJobIdsByReason]
}
fmt.Fprintf(w, "Preempted jobs:\t%v", jobIdsToPrint)
if len(jobIdsToPrint) != len(qctx.EvictedJobsById) {
fmt.Fprintf(w, " (and %d others not shown)\n", len(qctx.EvictedJobsById)-len(jobIdsToPrint))
} else {
fmt.Fprint(w, "\n")
fmt.Fprintf(w, "\t%d:\t%s (e.g., %s)\n", len(jobIds), reason, jobIds[0])
}
}
}
Expand Down Expand Up @@ -573,7 +572,7 @@ func (jctx *JobSchedulingContext) String() string {
var sb strings.Builder
w := tabwriter.NewWriter(&sb, 1, 1, 1, ' ', 0)
fmt.Fprintf(w, "Time:\t%s\n", jctx.Created)
fmt.Fprintf(w, "Job id:\t%s\n", jctx.JobId)
fmt.Fprintf(w, "Job ID:\t%s\n", jctx.JobId)
fmt.Fprintf(w, "Number of nodes in cluster:\t%d\n", jctx.NumNodes)
if jctx.UnschedulableReason != "" {
fmt.Fprintf(w, "UnschedulableReason:\t%s\n", jctx.UnschedulableReason)
Expand Down
Loading

0 comments on commit 66ea1d8

Please sign in to comment.