From b743c7fef97126cc1da3639446a666eaa2460530 Mon Sep 17 00:00:00 2001 From: Jim Garlick Date: Tue, 9 Apr 2024 18:38:28 -0700 Subject: [PATCH 1/6] job-manager: canceled job need not wait for sched Problem: when a job is canceled with an outstanding sched.alloc request, the job is stuck in CLEANUP until the scheduler responds. When the scheduler is slow, this needlessly prevents the job from terminating. When the job enters CLEANUP state with a pending sched.alloc, send the cancel request as before but immediately remove the job from the pending_jobs list and clear job->alloc_pending. The job can then transition to INACTIVE after the other cleanup activities are completed. Meanwhile, when a response is received to sched.alloc, handle it internally. If the alloc was successful, immediately free the resources. If the alloc was canceled or unsuccessful, simply ignore the response. Fixes #5876 --- src/modules/job-manager/alloc.c | 73 ++++++++++++++++++---------- src/modules/job-manager/alloc.h | 7 ++- src/modules/job-manager/event.c | 4 +- src/modules/job-manager/prioritize.c | 2 +- src/modules/job-manager/queue.c | 2 +- 5 files changed, 58 insertions(+), 30 deletions(-) diff --git a/src/modules/job-manager/alloc.c b/src/modules/job-manager/alloc.c index bdfff713927b..ce2e7bd7feec 100644 --- a/src/modules/job-manager/alloc.c +++ b/src/modules/job-manager/alloc.c @@ -120,7 +120,7 @@ static void interface_teardown (struct alloc *alloc, char *s, int errnum) /* Send sched.free request for job. * Update flags. */ -int free_request (struct alloc *alloc, struct job *job) +int free_request (struct alloc *alloc, flux_jobid_t id, json_t *R) { flux_msg_t *msg; @@ -128,8 +128,8 @@ int free_request (struct alloc *alloc, struct job *job) return -1; if (flux_msg_pack (msg, "{s:I s:O}", - "id", job->id, - "R", job->R_redacted) < 0) + "id", id, + "R", R) < 0) goto error; if (flux_send (alloc->ctx->h, msg, 0) < 0) goto error; @@ -188,20 +188,25 @@ static void alloc_response_cb (flux_t *h, "annotations", &annotations, "R", &R) < 0) goto teardown; - if (!(job = zhashx_lookup (ctx->active_jobs, &id))) { - flux_log (h, LOG_ERR, "sched.alloc-response: id=%s not active", - idf58 (id)); - errno = EINVAL; - goto teardown; - } - if (!job->alloc_pending) { - flux_log (h, LOG_ERR, "sched.alloc-response: id=%s not requested", - idf58 (id)); - errno = EINVAL; - goto teardown; - } + + job = zhashx_lookup (ctx->active_jobs, &id); + if (job && !job->alloc_pending) + job = NULL; + switch (type) { case FLUX_SCHED_ALLOC_SUCCESS: + if (!R) { + flux_log (h, LOG_ERR, "sched.alloc-response: protocol error"); + errno = EPROTO; + goto teardown; + } + (void)json_object_del (R, "scheduling"); + alloc->alloc_pending_count--; + + if (!job) { + (void)free_request (alloc, id, R); + break; + } if (alloc->alloc_limit) { if (zlistx_delete (alloc->pending_jobs, job->handle) < 0) flux_log (ctx->h, LOG_ERR, "failed to dequeue pending job"); @@ -215,19 +220,12 @@ static void alloc_response_cb (flux_t *h, errno = EEXIST; goto teardown; } - if (!R) { - flux_log (h, LOG_ERR, "sched.alloc-response: protocol error"); - errno = EPROTO; - goto teardown; - } - (void)json_object_del (R, "scheduling"); job->R_redacted = json_incref (R); if (annotations_update_and_publish (ctx, job, annotations) < 0) flux_log_error (h, "annotations_update: id=%s", idf58 (id)); /* Only modify job state after annotation event is published */ - alloc->alloc_pending_count--; job->alloc_pending = 0; if (job->annotations) { if (event_job_post_pack (ctx->event, @@ -248,11 +246,15 @@ static void alloc_response_cb (flux_t *h, errno = EPROTO; goto teardown; } + if (!job) + break; if (annotations_update_and_publish (ctx, job, annotations) < 0) flux_log_error (h, "annotations_update: id=%s", idf58 (id)); break; case FLUX_SCHED_ALLOC_DENY: // error alloc->alloc_pending_count--; + if (!job) + break; job->alloc_pending = 0; if (alloc->alloc_limit) { if (zlistx_delete (alloc->pending_jobs, job->handle) < 0) @@ -282,6 +284,8 @@ static void alloc_response_cb (flux_t *h, break; case FLUX_SCHED_ALLOC_CANCEL: alloc->alloc_pending_count--; + if (!job) + break; if (job->state == FLUX_JOB_STATE_SCHED) requeue_pending (alloc, job); else { @@ -553,7 +557,7 @@ int alloc_send_free_request (struct alloc *alloc, struct job *job) { assert (job->state == FLUX_JOB_STATE_CLEANUP); if (alloc->ready) { - if (free_request (alloc, job) < 0) + if (free_request (alloc, job->id, job->R_redacted) < 0) return -1; if ((job->flags & FLUX_JOB_DEBUG)) (void)event_job_post_pack (alloc->ctx->event, @@ -598,11 +602,30 @@ void alloc_dequeue_alloc_request (struct alloc *alloc, struct job *job) /* called from event_job_action() FLUX_JOB_STATE_CLEANUP * or alloc_queue_recalc_pending() if queue order has changed. */ -int alloc_cancel_alloc_request (struct alloc *alloc, struct job *job) +int alloc_cancel_alloc_request (struct alloc *alloc, + struct job *job, + bool finalize) { if (job->alloc_pending) { if (cancel_request (alloc, job) < 0) return -1; + if (finalize) { + job->alloc_pending = 0; + if (alloc->alloc_limit) { + (void)zlistx_delete (alloc->pending_jobs, job->handle); + job->handle = NULL; + } + bool cleared = false; + annotations_clear (job, &cleared); + if (cleared) { + (void)event_job_post_pack (alloc->ctx->event, + job, + "annotations", + EVENT_NO_COMMIT, + "{s:n}", + "annotations"); + } + } } return 0; } @@ -673,7 +696,7 @@ int alloc_queue_recalc_pending (struct alloc *alloc) && head && tail) { if (job_priority_comparator (head, tail) < 0) { - if (alloc_cancel_alloc_request (alloc, tail) < 0) { + if (alloc_cancel_alloc_request (alloc, tail, false) < 0) { flux_log_error (alloc->ctx->h, "%s: alloc_cancel_alloc_request", __FUNCTION__); diff --git a/src/modules/job-manager/alloc.h b/src/modules/job-manager/alloc.h index a3e00a477451..8a086c04cb98 100644 --- a/src/modules/job-manager/alloc.h +++ b/src/modules/job-manager/alloc.h @@ -31,8 +31,13 @@ void alloc_dequeue_alloc_request (struct alloc *alloc, struct job *job); /* Send a request to cancel pending alloc request. * This function is a no-op if job->alloc_pending is not set. + * If finalize is true, update the job as though the cancelation + * request has already been handled, so the job can progress through + * CLEANUP without waiting for the scheduler response. */ -int alloc_cancel_alloc_request (struct alloc *alloc, struct job *job); +int alloc_cancel_alloc_request (struct alloc *alloc, + struct job *job, + bool finalize); /* Accessor for the count of queued alloc requests. */ diff --git a/src/modules/job-manager/event.c b/src/modules/job-manager/event.c index e46428b875c1..a9a2d02f9030 100644 --- a/src/modules/job-manager/event.c +++ b/src/modules/job-manager/event.c @@ -293,7 +293,7 @@ int event_job_action (struct event *event, struct job *job) * SCHED state, dequeue the job first. */ if (job->alloc_pending) - alloc_cancel_alloc_request (ctx->alloc, job); + alloc_cancel_alloc_request (ctx->alloc, job, false); if (job->alloc_queued) alloc_dequeue_alloc_request (ctx->alloc, job); break; @@ -313,7 +313,7 @@ int event_job_action (struct event *event, struct job *job) break; case FLUX_JOB_STATE_CLEANUP: if (job->alloc_pending) - alloc_cancel_alloc_request (ctx->alloc, job); + alloc_cancel_alloc_request (ctx->alloc, job, true); if (job->alloc_queued) alloc_dequeue_alloc_request (ctx->alloc, job); diff --git a/src/modules/job-manager/prioritize.c b/src/modules/job-manager/prioritize.c index efc0b7b2a94b..c292e6100136 100644 --- a/src/modules/job-manager/prioritize.c +++ b/src/modules/job-manager/prioritize.c @@ -125,7 +125,7 @@ static int reprioritize_one (struct job_manager *ctx, } else if (job->alloc_pending) { if (job->priority == FLUX_JOB_PRIORITY_MIN) { - if (alloc_cancel_alloc_request (ctx->alloc, job) < 0) + if (alloc_cancel_alloc_request (ctx->alloc, job, false) < 0) return -1; } else if (oneshot) { diff --git a/src/modules/job-manager/queue.c b/src/modules/job-manager/queue.c index 5f847fb04739..54c7436f92e5 100644 --- a/src/modules/job-manager/queue.c +++ b/src/modules/job-manager/queue.c @@ -665,7 +665,7 @@ static void queue_stop (struct queue *queue, const char *name) if (job->alloc_queued) alloc_dequeue_alloc_request (queue->ctx->alloc, job); else if (job->alloc_pending) - alloc_cancel_alloc_request (queue->ctx->alloc, job); + alloc_cancel_alloc_request (queue->ctx->alloc, job, false); } job = zhashx_next (queue->ctx->active_jobs); } From 459ca7c1a9742f8f2806d2579f6d5f2fc9eb5946 Mon Sep 17 00:00:00 2001 From: Jim Garlick Date: Wed, 10 Apr 2024 08:12:54 -0700 Subject: [PATCH 2/6] broker: add broker.module-debug RPC Problem: there is not an easy way to simulate an unresponsive broker module like a bogged down scheduler in test. Add a broker.module-debug RPC which lets messages be withheld from modules during a test. To defer messages send {"name":"sched-simple", "defer":true} to send the backlog and defeat message deferral send {"name":"sched-simple", "defer":false} --- src/broker/broker.c | 38 ++++++++++++++++++++++++++++++++++++++ src/broker/module.c | 29 +++++++++++++++++++++++++++++ src/broker/module.h | 5 +++++ 3 files changed, 72 insertions(+) diff --git a/src/broker/broker.c b/src/broker/broker.c index 971b48732745..ee7d91a02e7a 100644 --- a/src/broker/broker.c +++ b/src/broker/broker.c @@ -1434,6 +1434,38 @@ static void broker_lsmod_cb (flux_t *h, flux_log_error (h, "%s: flux_respond_error", __FUNCTION__); } +static void broker_module_debug_cb (flux_t *h, + flux_msg_handler_t *mh, + const flux_msg_t *msg, + void *arg) +{ + broker_ctx_t *ctx = arg; + const char *name; + int defer = -1; + module_t *p; + + if (flux_request_unpack (msg, + NULL, + "{s:s s?b}", + "name", &name, + "defer", &defer) < 0) + goto error; + if (!(p = modhash_lookup_byname (ctx->modhash, name))) { + errno = ENOENT; + goto error; + } + if (defer != -1) { + if (module_set_defer (p, defer) < 0) + goto error; + } + if (flux_respond (h, msg, NULL) < 0) + flux_log_error (h, "error responding to module-debug request"); + return; +error: + if (flux_respond_error (h, msg, errno, NULL) < 0) + flux_log_error (h, "error responding to module-debug request"); +} + /* This is a message handler for status messages from modules, not to be * confused with module_status_cb(). */ @@ -1632,6 +1664,12 @@ static const struct flux_msg_handler_spec htab[] = { broker_lsmod_cb, FLUX_ROLE_USER, }, + { + FLUX_MSGTYPE_REQUEST, + "broker.module-debug", + broker_module_debug_cb, + 0, + }, { FLUX_MSGTYPE_REQUEST, "broker.module-status", diff --git a/src/broker/module.c b/src/broker/module.c index e932dfc4830d..686f54d8ca34 100644 --- a/src/broker/module.c +++ b/src/broker/module.c @@ -77,6 +77,7 @@ struct broker_module { struct flux_msglist *rmmod_requests; struct flux_msglist *insmod_requests; + struct flux_msglist *deferred_messages; flux_t *h; /* module's handle */ struct subhash *sub; @@ -439,6 +440,13 @@ int module_sendmsg_new (module_t *p, flux_msg_t **msg) return -1; } } + if (p->deferred_messages) { + if (flux_msglist_append (p->deferred_messages, *msg) < 0) + return -1; + flux_msg_decref (*msg); + *msg = NULL; + return 0; + } return flux_send_new (p->h_broker, msg, 0); } @@ -498,6 +506,7 @@ void module_destroy (module_t *p) json_decref (p->attr_cache); flux_msglist_destroy (p->rmmod_requests); flux_msglist_destroy (p->insmod_requests); + flux_msglist_destroy (p->deferred_messages); subhash_destroy (p->sub); free (p); errno = saved_errno; @@ -531,6 +540,26 @@ void module_mute (module_t *p) p->muted = true; } +int module_set_defer (module_t *p, bool flag) +{ + if (flag && !p->deferred_messages) { + if (!(p->deferred_messages = flux_msglist_create ())) + return -1; + } + if (!flag && p->deferred_messages) { + const flux_msg_t *msg; + while ((msg = flux_msglist_pop (p->deferred_messages))) { + if (flux_send_new (p->h_broker, (flux_msg_t **)&msg, 0) < 0) { + flux_msg_decref (msg); + return -1; + } + } + flux_msglist_destroy (p->deferred_messages); + p->deferred_messages = NULL; + } + return 0; +} + int module_start (module_t *p) { int errnum; diff --git a/src/broker/module.h b/src/broker/module.h index ace51051475f..b5c3f91f112c 100644 --- a/src/broker/module.h +++ b/src/broker/module.h @@ -77,6 +77,11 @@ int module_start (module_t *p); */ int module_stop (module_t *p, flux_t *h); +/* Defer all messages that would be sent to module if flag=true. + * Stop deferring them and send backlog if flag=false. + */ +int module_set_defer (module_t *p, bool flag); + /* Mute module. Do not send any more messages. */ void module_mute (module_t *p); From d3d602ffc5b09ebe70baee8de28b716211c9b0fe Mon Sep 17 00:00:00 2001 From: Jim Garlick Date: Wed, 10 Apr 2024 09:10:25 -0700 Subject: [PATCH 3/6] testsuite: cover module-debug RPC Problem: there is no test coverage for the broker.module-debug RPC and its "defer" function. Add some tests to t0003-module.t. --- t/t0003-module.t | 31 +++++++++++++++++++++++++++++++ 1 file changed, 31 insertions(+) diff --git a/t/t0003-module.t b/t/t0003-module.t index f34efff5eb9f..05be2181aef6 100755 --- a/t/t0003-module.t +++ b/t/t0003-module.t @@ -29,6 +29,12 @@ module_getinfo () { FLUX_HANDLE_TRACE=1 flux python -c "import flux; print(flux.Flux().rpc(\"$1.info\").get_str())" } +# Usage: module_debug_defer modname True|False +module_debug_defer () { + flux python -c "import flux; flux.Flux().rpc(\"broker.module-debug\",{\"name\":\"$1\",\"defer\":$2}).get()" +} + + test_expect_success 'module: load test module' ' flux module load $testmod ' @@ -287,4 +293,29 @@ test_expect_success 'module: module name is called out' ' grep ".content. was not properly shut down" nounload.err ' +test_expect_success 'module: load testmod' ' + flux module load $testmod +' +test_expect_success 'module-debug name=badname fails' ' + test_must_fail module_debug_defer badname True +' +test_expect_success 'module-debug defer=42 fails (not boolean)' ' + test_must_fail module_debug_defer testmod 42 +' +test_expect_success 'module-debug name=testmod defer=True works' ' + module_debug_defer testmod True +' +test_expect_success 'testmod does not respond to ping' ' + test_expect_code 137 run_timeout 2 flux ping --count=1 testmod +' +test_expect_success 'module-debug name=testmod defer=False works' ' + module_debug_defer testmod False +' +test_expect_success 'testmod does respond to ping' ' + run_timeout 10 flux ping --count=1 testmod +' +test_expect_success 'module: remove testmod' ' + flux module remove -f testmod +' + test_done From 5be6906a89de1f8c5d42ae84fe2b789767c09ca6 Mon Sep 17 00:00:00 2001 From: Jim Garlick Date: Wed, 10 Apr 2024 09:40:58 -0700 Subject: [PATCH 4/6] testsuite: cover job behavior with slow scheduler Problem: there are no tests that show jobs can make progress they are not blocked on an allocation, but the scheduler is slow. Add a test that uses the new module-debug "defer" function to simulate a slow scheduler and make sure that jobs can get through CLEANUP when the scheduler is out to lunch, and that the system recovers and can run jobs aftwards. --- t/Makefile.am | 1 + t/t2305-sched-slow.t | 84 ++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 85 insertions(+) create mode 100755 t/t2305-sched-slow.t diff --git a/t/Makefile.am b/t/Makefile.am index c21362cb8bdc..65132af65e87 100644 --- a/t/Makefile.am +++ b/t/Makefile.am @@ -170,6 +170,7 @@ TESTSCRIPTS = \ t2302-sched-simple-up-down.t \ t2303-sched-hello.t \ t2304-sched-simple-alloc-check.t \ + t2305-sched-slow.t \ t2310-resource-module.t \ t2311-resource-drain.t \ t2312-resource-exclude.t \ diff --git a/t/t2305-sched-slow.t b/t/t2305-sched-slow.t new file mode 100755 index 000000000000..a9af370428b0 --- /dev/null +++ b/t/t2305-sched-slow.t @@ -0,0 +1,84 @@ +#!/bin/sh + +test_description='test a scheduler that is slow to respond +' + +# Append --logfile option if FLUX_TESTS_LOGFILE is set in environment: +test -n "$FLUX_TESTS_LOGFILE" && set -- "$@" --logfile +. $(dirname $0)/sharness.sh + +test_under_flux 1 + +# Usage: module_debug_defer modname True|False +module_debug_defer () { + flux python -c "import flux; flux.Flux().rpc(\"broker.module-debug\",{\"name\":\"$1\",\"defer\":$2}).get()" +} + +test_expect_success 'pause sched message handling' ' + module_debug_defer sched-simple True +' +test_expect_success 'a job can be submitted when the scheduler is unresponsive' ' + flux submit -N1 \ + --flags=debug --wait-event=debug.alloc-request \ + true >job1.id +' +test_expect_success 'the job is blocked on the alloc request' ' + test_expect_code 137 run_timeout 2 \ + flux job wait-event $(cat job1.id) alloc +' +test_expect_success 'unpause sched message handling' ' + module_debug_defer sched-simple False +' +test_expect_success 'the job gets its allocation and completes' ' + run_timeout 30 flux job wait-event $(cat job1.id) clean +' +test_expect_success 'submit a job and wait for it to get an alloc response' ' + flux submit -N1 --wait-event=alloc sleep 3600 >job2.id +' +test_expect_success 'pause sched message handling' ' + module_debug_defer sched-simple True +' +test_expect_success 'cancel the job' ' + flux cancel $(cat job2.id) +' +test_expect_success 'the job is able to get through CLEANUP state' ' + run_timeout 30 flux job wait-event $(cat job2.id) clean +' +test_expect_success 'unpause sched message handling' ' + module_debug_defer sched-simple False +' +test_expect_success 'another -N1 job can run so resources are free' ' + run_timeout 30 flux run -N1 true +' +test_expect_success 'drain the only node in the instance' ' + flux resource drain 0 +' +test_expect_success 'submit a job' ' + flux submit -N1 \ + --flags=debug --wait-event=debug.alloc-request \ + true >job3.id +' +test_expect_success 'the job is blocked on the alloc request' ' + test_expect_code 137 run_timeout 2 \ + flux job wait-event $(cat job3.id) alloc +' +test_expect_success 'pause sched message handling' ' + module_debug_defer sched-simple True +' +test_expect_success 'cancel the job' ' + flux cancel $(cat job3.id) +' +test_expect_success 'the job is able to get through CLEANUP state' ' + run_timeout 30 flux job wait-event $(cat job3.id) clean +' +test_expect_success 'unpause sched message handling' ' + module_debug_defer sched-simple False +' +test_expect_success 'undrain the node' ' + flux resource undrain 0 +' +test_expect_success 'another -N1 job can run so everything still works!' ' + run_timeout 30 flux run -N1 true +' + +test_done From 112da322dabeceeb5580f41dca075e2b30cba50c Mon Sep 17 00:00:00 2001 From: Jim Garlick Date: Thu, 11 Apr 2024 09:32:15 -0700 Subject: [PATCH 5/6] job-manager: improve inline docs for sched.cancel Problem: the alloc_cancel_alloc_request() function is a bit unclear. Expand the block comment above this function to explain its operation in more detail. --- src/modules/job-manager/alloc.c | 17 +++++++++++++++-- 1 file changed, 15 insertions(+), 2 deletions(-) diff --git a/src/modules/job-manager/alloc.c b/src/modules/job-manager/alloc.c index ce2e7bd7feec..44d889bf6027 100644 --- a/src/modules/job-manager/alloc.c +++ b/src/modules/job-manager/alloc.c @@ -599,8 +599,21 @@ void alloc_dequeue_alloc_request (struct alloc *alloc, struct job *job) } } -/* called from event_job_action() FLUX_JOB_STATE_CLEANUP - * or alloc_queue_recalc_pending() if queue order has changed. +/* Send a sched.cancel request for job. This RPC receives no direct response. + * Instead, the sched.alloc request receives a FLUX_SCHED_ALLOC_CANCEL or a + * FLUX_SCHED_ALLOC_SUCCESS response. + * + * As described in RFC 27, sched.alloc requests are canceled when: + * 1) a job in SCHED state is canceled + * 2) a queue is administratively disabled + * 3) when repriortizing jobs in limited mode + * + * The finalize flag is for the first case. It allows the job to continue + * through CLEANUP without waiting for the scheduler to respond to the cancel. + * The sched.alloc response handler must handle the case where the job is + * no longer active or its alloc_pending flag is clear. Essentially 'finalize' + * causes the job related finalization stuff to happen here rather than + * in the sched.alloc response handler. */ int alloc_cancel_alloc_request (struct alloc *alloc, struct job *job, From c053cabe575ab55cb18011e2140869e707529315 Mon Sep 17 00:00:00 2001 From: Jim Garlick Date: Thu, 11 Apr 2024 10:21:32 -0700 Subject: [PATCH 6/6] job-manager: refactor annotate clear interface Problem: code for clearing annotations and posting a NULL annotations event is duplicated in several places in alloc.c Replace annotations_clear() and annotations_sched_clear() with annotations_clear_and_publish(), which cuts down on the code duplication. --- src/modules/job-manager/alloc.c | 54 +++--------------------------- src/modules/job-manager/annotate.c | 45 +++++++++++++++---------- src/modules/job-manager/annotate.h | 10 ++++-- 3 files changed, 40 insertions(+), 69 deletions(-) diff --git a/src/modules/job-manager/alloc.c b/src/modules/job-manager/alloc.c index 44d889bf6027..4f534db05c75 100644 --- a/src/modules/job-manager/alloc.c +++ b/src/modules/job-manager/alloc.c @@ -60,7 +60,6 @@ static void requeue_pending (struct alloc *alloc, struct job *job) { struct job_manager *ctx = alloc->ctx; bool fwd = job->priority > (FLUX_JOB_PRIORITY_MAX / 2); - bool cleared = false; assert (job->alloc_pending); if (job->handle) { @@ -74,18 +73,7 @@ static void requeue_pending (struct alloc *alloc, struct job *job) flux_log (ctx->h, LOG_ERR, "failed to enqueue job for scheduling"); job->alloc_queued = 1; } - annotations_sched_clear (job, &cleared); - if (cleared) { - if (event_job_post_pack (ctx->event, - job, - "annotations", - EVENT_NO_COMMIT, - "{s:n}", - "annotations") < 0) - flux_log_error (ctx->h, - "%s: event_job_post_pack", - __FUNCTION__); - } + annotations_clear_and_publish (ctx, job, "sched"); } /* Initiate teardown. Clear any alloc/free requests, and clear @@ -176,7 +164,6 @@ static void alloc_response_cb (flux_t *h, json_t *annotations = NULL; json_t *R = NULL; struct job *job; - bool cleared = false; if (flux_response_decode (msg, NULL, NULL) < 0) goto teardown; // ENOSYS here if scheduler not loaded/shutting down @@ -261,19 +248,7 @@ static void alloc_response_cb (flux_t *h, flux_log (ctx->h, LOG_ERR, "failed to dequeue pending job"); job->handle = NULL; } - annotations_clear (job, &cleared); - if (cleared) { - if (event_job_post_pack (ctx->event, - job, - "annotations", - EVENT_NO_COMMIT, - "{s:n}", - "annotations") < 0) - flux_log_error (ctx->h, - "%s: event_job_post_pack: id=%s", - __FUNCTION__, - idf58 (id)); - } + annotations_clear_and_publish (ctx, job, NULL); if (raise_job_exception (ctx, job, "alloc", @@ -294,21 +269,9 @@ static void alloc_response_cb (flux_t *h, flux_log (ctx->h, LOG_ERR, "failed to dequeue pending job"); job->handle = NULL; } - annotations_clear (job, &cleared); + annotations_clear_and_publish (ctx, job, NULL); } job->alloc_pending = 0; - if (cleared) { - if (event_job_post_pack (ctx->event, - job, - "annotations", - EVENT_NO_COMMIT, - "{s:n}", - "annotations") < 0) - flux_log_error (ctx->h, - "%s: event_job_post_pack: id=%s", - __FUNCTION__, - idf58 (id)); - } if (queue_started (alloc->ctx->queue, job)) { if (event_job_action (ctx->event, job) < 0) { flux_log_error (h, @@ -628,16 +591,7 @@ int alloc_cancel_alloc_request (struct alloc *alloc, (void)zlistx_delete (alloc->pending_jobs, job->handle); job->handle = NULL; } - bool cleared = false; - annotations_clear (job, &cleared); - if (cleared) { - (void)event_job_post_pack (alloc->ctx->event, - job, - "annotations", - EVENT_NO_COMMIT, - "{s:n}", - "annotations"); - } + annotations_clear_and_publish (alloc->ctx, job, NULL); } } return 0; diff --git a/src/modules/job-manager/annotate.c b/src/modules/job-manager/annotate.c index d6fc74c10b99..22327014c18b 100644 --- a/src/modules/job-manager/annotate.c +++ b/src/modules/job-manager/annotate.c @@ -29,6 +29,7 @@ #include "src/common/libczmqcontainers/czmq_containers.h" #include "src/common/libutil/jpath.h" +#include "src/common/libjob/idf58.h" #include "job.h" #include "event.h" @@ -40,26 +41,11 @@ struct annotate { flux_msg_handler_t **handlers; }; -void annotations_clear (struct job *job, bool *cleared) +static void annotations_clear (struct job *job) { if (job->annotations) { json_decref (job->annotations); job->annotations = NULL; - if (cleared) - (*cleared) = true; - } -} - -void annotations_sched_clear (struct job *job, bool *cleared) -{ - if (job->annotations) { - if (json_object_del (job->annotations, "sched") == 0) { - /* Special case if annotations are now empty */ - if (!json_object_size (job->annotations)) - annotations_clear (job, NULL); - if (cleared) - (*cleared) = true; - } } } @@ -93,11 +79,36 @@ int annotations_update (struct job *job, const char *path, json_t *annotations) * will handle advertisement of the clear. */ if (!json_object_size (job->annotations)) - annotations_clear (job, NULL); + annotations_clear (job); } return 0; } +void annotations_clear_and_publish (struct job_manager *ctx, + struct job *job, + const char *key) +{ + if (job->annotations) { + if (key) + (void)json_object_del (job->annotations, key); + else + (void)json_object_clear (job->annotations); + if (json_object_size (job->annotations) == 0) { + annotations_clear (job); + if (event_job_post_pack (ctx->event, + job, + "annotations", + EVENT_NO_COMMIT, + "{s:n}", + "annotations") < 0) { + flux_log_error (ctx->h, + "error posting null annotations event for %s", + idf58 (job->id)); + } + } + } +} + int annotations_update_and_publish (struct job_manager *ctx, struct job *job, json_t *annotations) diff --git a/src/modules/job-manager/annotate.h b/src/modules/job-manager/annotate.h index 291f41fd3094..8e17e74abb3c 100644 --- a/src/modules/job-manager/annotate.h +++ b/src/modules/job-manager/annotate.h @@ -16,8 +16,6 @@ #include "job.h" #include "job-manager.h" -void annotations_clear (struct job *job, bool *cleared); -void annotations_sched_clear (struct job *job, bool *cleared); int annotations_update (struct job *job, const char *path, json_t *annotations); struct annotate *annotate_ctx_create (struct job_manager *ctx); @@ -30,6 +28,14 @@ int annotations_update_and_publish (struct job_manager *ctx, struct job *job, json_t *annotations); +/* clear key from annotations, or clear all annotations if key == NULL. + * If that transitioned the annotations object from non-empty to empty, + * post an annotations event with the context of {"annotations":null}. + */ +void annotations_clear_and_publish (struct job_manager *ctx, + struct job *job, + const char *key); + #endif /* ! _FLUX_JOB_MANAGER_ANNOTATE_H */ /* * vi:tabstop=4 shiftwidth=4 expandtab