diff --git a/src/broker/broker.c b/src/broker/broker.c index 971b48732745..ee7d91a02e7a 100644 --- a/src/broker/broker.c +++ b/src/broker/broker.c @@ -1434,6 +1434,38 @@ static void broker_lsmod_cb (flux_t *h, flux_log_error (h, "%s: flux_respond_error", __FUNCTION__); } +static void broker_module_debug_cb (flux_t *h, + flux_msg_handler_t *mh, + const flux_msg_t *msg, + void *arg) +{ + broker_ctx_t *ctx = arg; + const char *name; + int defer = -1; + module_t *p; + + if (flux_request_unpack (msg, + NULL, + "{s:s s?b}", + "name", &name, + "defer", &defer) < 0) + goto error; + if (!(p = modhash_lookup_byname (ctx->modhash, name))) { + errno = ENOENT; + goto error; + } + if (defer != -1) { + if (module_set_defer (p, defer) < 0) + goto error; + } + if (flux_respond (h, msg, NULL) < 0) + flux_log_error (h, "error responding to module-debug request"); + return; +error: + if (flux_respond_error (h, msg, errno, NULL) < 0) + flux_log_error (h, "error responding to module-debug request"); +} + /* This is a message handler for status messages from modules, not to be * confused with module_status_cb(). */ @@ -1632,6 +1664,12 @@ static const struct flux_msg_handler_spec htab[] = { broker_lsmod_cb, FLUX_ROLE_USER, }, + { + FLUX_MSGTYPE_REQUEST, + "broker.module-debug", + broker_module_debug_cb, + 0, + }, { FLUX_MSGTYPE_REQUEST, "broker.module-status", diff --git a/src/broker/module.c b/src/broker/module.c index e932dfc4830d..686f54d8ca34 100644 --- a/src/broker/module.c +++ b/src/broker/module.c @@ -77,6 +77,7 @@ struct broker_module { struct flux_msglist *rmmod_requests; struct flux_msglist *insmod_requests; + struct flux_msglist *deferred_messages; flux_t *h; /* module's handle */ struct subhash *sub; @@ -439,6 +440,13 @@ int module_sendmsg_new (module_t *p, flux_msg_t **msg) return -1; } } + if (p->deferred_messages) { + if (flux_msglist_append (p->deferred_messages, *msg) < 0) + return -1; + flux_msg_decref (*msg); + *msg = NULL; + return 0; + } return flux_send_new (p->h_broker, msg, 0); } @@ -498,6 +506,7 @@ void module_destroy (module_t *p) json_decref (p->attr_cache); flux_msglist_destroy (p->rmmod_requests); flux_msglist_destroy (p->insmod_requests); + flux_msglist_destroy (p->deferred_messages); subhash_destroy (p->sub); free (p); errno = saved_errno; @@ -531,6 +540,26 @@ void module_mute (module_t *p) p->muted = true; } +int module_set_defer (module_t *p, bool flag) +{ + if (flag && !p->deferred_messages) { + if (!(p->deferred_messages = flux_msglist_create ())) + return -1; + } + if (!flag && p->deferred_messages) { + const flux_msg_t *msg; + while ((msg = flux_msglist_pop (p->deferred_messages))) { + if (flux_send_new (p->h_broker, (flux_msg_t **)&msg, 0) < 0) { + flux_msg_decref (msg); + return -1; + } + } + flux_msglist_destroy (p->deferred_messages); + p->deferred_messages = NULL; + } + return 0; +} + int module_start (module_t *p) { int errnum; diff --git a/src/broker/module.h b/src/broker/module.h index ace51051475f..b5c3f91f112c 100644 --- a/src/broker/module.h +++ b/src/broker/module.h @@ -77,6 +77,11 @@ int module_start (module_t *p); */ int module_stop (module_t *p, flux_t *h); +/* Defer all messages that would be sent to module if flag=true. + * Stop deferring them and send backlog if flag=false. + */ +int module_set_defer (module_t *p, bool flag); + /* Mute module. Do not send any more messages. */ void module_mute (module_t *p); diff --git a/src/modules/job-manager/alloc.c b/src/modules/job-manager/alloc.c index bdfff713927b..4f534db05c75 100644 --- a/src/modules/job-manager/alloc.c +++ b/src/modules/job-manager/alloc.c @@ -60,7 +60,6 @@ static void requeue_pending (struct alloc *alloc, struct job *job) { struct job_manager *ctx = alloc->ctx; bool fwd = job->priority > (FLUX_JOB_PRIORITY_MAX / 2); - bool cleared = false; assert (job->alloc_pending); if (job->handle) { @@ -74,18 +73,7 @@ static void requeue_pending (struct alloc *alloc, struct job *job) flux_log (ctx->h, LOG_ERR, "failed to enqueue job for scheduling"); job->alloc_queued = 1; } - annotations_sched_clear (job, &cleared); - if (cleared) { - if (event_job_post_pack (ctx->event, - job, - "annotations", - EVENT_NO_COMMIT, - "{s:n}", - "annotations") < 0) - flux_log_error (ctx->h, - "%s: event_job_post_pack", - __FUNCTION__); - } + annotations_clear_and_publish (ctx, job, "sched"); } /* Initiate teardown. Clear any alloc/free requests, and clear @@ -120,7 +108,7 @@ static void interface_teardown (struct alloc *alloc, char *s, int errnum) /* Send sched.free request for job. * Update flags. */ -int free_request (struct alloc *alloc, struct job *job) +int free_request (struct alloc *alloc, flux_jobid_t id, json_t *R) { flux_msg_t *msg; @@ -128,8 +116,8 @@ int free_request (struct alloc *alloc, struct job *job) return -1; if (flux_msg_pack (msg, "{s:I s:O}", - "id", job->id, - "R", job->R_redacted) < 0) + "id", id, + "R", R) < 0) goto error; if (flux_send (alloc->ctx->h, msg, 0) < 0) goto error; @@ -176,7 +164,6 @@ static void alloc_response_cb (flux_t *h, json_t *annotations = NULL; json_t *R = NULL; struct job *job; - bool cleared = false; if (flux_response_decode (msg, NULL, NULL) < 0) goto teardown; // ENOSYS here if scheduler not loaded/shutting down @@ -188,20 +175,25 @@ static void alloc_response_cb (flux_t *h, "annotations", &annotations, "R", &R) < 0) goto teardown; - if (!(job = zhashx_lookup (ctx->active_jobs, &id))) { - flux_log (h, LOG_ERR, "sched.alloc-response: id=%s not active", - idf58 (id)); - errno = EINVAL; - goto teardown; - } - if (!job->alloc_pending) { - flux_log (h, LOG_ERR, "sched.alloc-response: id=%s not requested", - idf58 (id)); - errno = EINVAL; - goto teardown; - } + + job = zhashx_lookup (ctx->active_jobs, &id); + if (job && !job->alloc_pending) + job = NULL; + switch (type) { case FLUX_SCHED_ALLOC_SUCCESS: + if (!R) { + flux_log (h, LOG_ERR, "sched.alloc-response: protocol error"); + errno = EPROTO; + goto teardown; + } + (void)json_object_del (R, "scheduling"); + alloc->alloc_pending_count--; + + if (!job) { + (void)free_request (alloc, id, R); + break; + } if (alloc->alloc_limit) { if (zlistx_delete (alloc->pending_jobs, job->handle) < 0) flux_log (ctx->h, LOG_ERR, "failed to dequeue pending job"); @@ -215,19 +207,12 @@ static void alloc_response_cb (flux_t *h, errno = EEXIST; goto teardown; } - if (!R) { - flux_log (h, LOG_ERR, "sched.alloc-response: protocol error"); - errno = EPROTO; - goto teardown; - } - (void)json_object_del (R, "scheduling"); job->R_redacted = json_incref (R); if (annotations_update_and_publish (ctx, job, annotations) < 0) flux_log_error (h, "annotations_update: id=%s", idf58 (id)); /* Only modify job state after annotation event is published */ - alloc->alloc_pending_count--; job->alloc_pending = 0; if (job->annotations) { if (event_job_post_pack (ctx->event, @@ -248,30 +233,22 @@ static void alloc_response_cb (flux_t *h, errno = EPROTO; goto teardown; } + if (!job) + break; if (annotations_update_and_publish (ctx, job, annotations) < 0) flux_log_error (h, "annotations_update: id=%s", idf58 (id)); break; case FLUX_SCHED_ALLOC_DENY: // error alloc->alloc_pending_count--; + if (!job) + break; job->alloc_pending = 0; if (alloc->alloc_limit) { if (zlistx_delete (alloc->pending_jobs, job->handle) < 0) flux_log (ctx->h, LOG_ERR, "failed to dequeue pending job"); job->handle = NULL; } - annotations_clear (job, &cleared); - if (cleared) { - if (event_job_post_pack (ctx->event, - job, - "annotations", - EVENT_NO_COMMIT, - "{s:n}", - "annotations") < 0) - flux_log_error (ctx->h, - "%s: event_job_post_pack: id=%s", - __FUNCTION__, - idf58 (id)); - } + annotations_clear_and_publish (ctx, job, NULL); if (raise_job_exception (ctx, job, "alloc", @@ -282,6 +259,8 @@ static void alloc_response_cb (flux_t *h, break; case FLUX_SCHED_ALLOC_CANCEL: alloc->alloc_pending_count--; + if (!job) + break; if (job->state == FLUX_JOB_STATE_SCHED) requeue_pending (alloc, job); else { @@ -290,21 +269,9 @@ static void alloc_response_cb (flux_t *h, flux_log (ctx->h, LOG_ERR, "failed to dequeue pending job"); job->handle = NULL; } - annotations_clear (job, &cleared); + annotations_clear_and_publish (ctx, job, NULL); } job->alloc_pending = 0; - if (cleared) { - if (event_job_post_pack (ctx->event, - job, - "annotations", - EVENT_NO_COMMIT, - "{s:n}", - "annotations") < 0) - flux_log_error (ctx->h, - "%s: event_job_post_pack: id=%s", - __FUNCTION__, - idf58 (id)); - } if (queue_started (alloc->ctx->queue, job)) { if (event_job_action (ctx->event, job) < 0) { flux_log_error (h, @@ -553,7 +520,7 @@ int alloc_send_free_request (struct alloc *alloc, struct job *job) { assert (job->state == FLUX_JOB_STATE_CLEANUP); if (alloc->ready) { - if (free_request (alloc, job) < 0) + if (free_request (alloc, job->id, job->R_redacted) < 0) return -1; if ((job->flags & FLUX_JOB_DEBUG)) (void)event_job_post_pack (alloc->ctx->event, @@ -595,14 +562,37 @@ void alloc_dequeue_alloc_request (struct alloc *alloc, struct job *job) } } -/* called from event_job_action() FLUX_JOB_STATE_CLEANUP - * or alloc_queue_recalc_pending() if queue order has changed. +/* Send a sched.cancel request for job. This RPC receives no direct response. + * Instead, the sched.alloc request receives a FLUX_SCHED_ALLOC_CANCEL or a + * FLUX_SCHED_ALLOC_SUCCESS response. + * + * As described in RFC 27, sched.alloc requests are canceled when: + * 1) a job in SCHED state is canceled + * 2) a queue is administratively disabled + * 3) when repriortizing jobs in limited mode + * + * The finalize flag is for the first case. It allows the job to continue + * through CLEANUP without waiting for the scheduler to respond to the cancel. + * The sched.alloc response handler must handle the case where the job is + * no longer active or its alloc_pending flag is clear. Essentially 'finalize' + * causes the job related finalization stuff to happen here rather than + * in the sched.alloc response handler. */ -int alloc_cancel_alloc_request (struct alloc *alloc, struct job *job) +int alloc_cancel_alloc_request (struct alloc *alloc, + struct job *job, + bool finalize) { if (job->alloc_pending) { if (cancel_request (alloc, job) < 0) return -1; + if (finalize) { + job->alloc_pending = 0; + if (alloc->alloc_limit) { + (void)zlistx_delete (alloc->pending_jobs, job->handle); + job->handle = NULL; + } + annotations_clear_and_publish (alloc->ctx, job, NULL); + } } return 0; } @@ -673,7 +663,7 @@ int alloc_queue_recalc_pending (struct alloc *alloc) && head && tail) { if (job_priority_comparator (head, tail) < 0) { - if (alloc_cancel_alloc_request (alloc, tail) < 0) { + if (alloc_cancel_alloc_request (alloc, tail, false) < 0) { flux_log_error (alloc->ctx->h, "%s: alloc_cancel_alloc_request", __FUNCTION__); diff --git a/src/modules/job-manager/alloc.h b/src/modules/job-manager/alloc.h index a3e00a477451..8a086c04cb98 100644 --- a/src/modules/job-manager/alloc.h +++ b/src/modules/job-manager/alloc.h @@ -31,8 +31,13 @@ void alloc_dequeue_alloc_request (struct alloc *alloc, struct job *job); /* Send a request to cancel pending alloc request. * This function is a no-op if job->alloc_pending is not set. + * If finalize is true, update the job as though the cancelation + * request has already been handled, so the job can progress through + * CLEANUP without waiting for the scheduler response. */ -int alloc_cancel_alloc_request (struct alloc *alloc, struct job *job); +int alloc_cancel_alloc_request (struct alloc *alloc, + struct job *job, + bool finalize); /* Accessor for the count of queued alloc requests. */ diff --git a/src/modules/job-manager/annotate.c b/src/modules/job-manager/annotate.c index d6fc74c10b99..22327014c18b 100644 --- a/src/modules/job-manager/annotate.c +++ b/src/modules/job-manager/annotate.c @@ -29,6 +29,7 @@ #include "src/common/libczmqcontainers/czmq_containers.h" #include "src/common/libutil/jpath.h" +#include "src/common/libjob/idf58.h" #include "job.h" #include "event.h" @@ -40,26 +41,11 @@ struct annotate { flux_msg_handler_t **handlers; }; -void annotations_clear (struct job *job, bool *cleared) +static void annotations_clear (struct job *job) { if (job->annotations) { json_decref (job->annotations); job->annotations = NULL; - if (cleared) - (*cleared) = true; - } -} - -void annotations_sched_clear (struct job *job, bool *cleared) -{ - if (job->annotations) { - if (json_object_del (job->annotations, "sched") == 0) { - /* Special case if annotations are now empty */ - if (!json_object_size (job->annotations)) - annotations_clear (job, NULL); - if (cleared) - (*cleared) = true; - } } } @@ -93,11 +79,36 @@ int annotations_update (struct job *job, const char *path, json_t *annotations) * will handle advertisement of the clear. */ if (!json_object_size (job->annotations)) - annotations_clear (job, NULL); + annotations_clear (job); } return 0; } +void annotations_clear_and_publish (struct job_manager *ctx, + struct job *job, + const char *key) +{ + if (job->annotations) { + if (key) + (void)json_object_del (job->annotations, key); + else + (void)json_object_clear (job->annotations); + if (json_object_size (job->annotations) == 0) { + annotations_clear (job); + if (event_job_post_pack (ctx->event, + job, + "annotations", + EVENT_NO_COMMIT, + "{s:n}", + "annotations") < 0) { + flux_log_error (ctx->h, + "error posting null annotations event for %s", + idf58 (job->id)); + } + } + } +} + int annotations_update_and_publish (struct job_manager *ctx, struct job *job, json_t *annotations) diff --git a/src/modules/job-manager/annotate.h b/src/modules/job-manager/annotate.h index 291f41fd3094..8e17e74abb3c 100644 --- a/src/modules/job-manager/annotate.h +++ b/src/modules/job-manager/annotate.h @@ -16,8 +16,6 @@ #include "job.h" #include "job-manager.h" -void annotations_clear (struct job *job, bool *cleared); -void annotations_sched_clear (struct job *job, bool *cleared); int annotations_update (struct job *job, const char *path, json_t *annotations); struct annotate *annotate_ctx_create (struct job_manager *ctx); @@ -30,6 +28,14 @@ int annotations_update_and_publish (struct job_manager *ctx, struct job *job, json_t *annotations); +/* clear key from annotations, or clear all annotations if key == NULL. + * If that transitioned the annotations object from non-empty to empty, + * post an annotations event with the context of {"annotations":null}. + */ +void annotations_clear_and_publish (struct job_manager *ctx, + struct job *job, + const char *key); + #endif /* ! _FLUX_JOB_MANAGER_ANNOTATE_H */ /* * vi:tabstop=4 shiftwidth=4 expandtab diff --git a/src/modules/job-manager/event.c b/src/modules/job-manager/event.c index e46428b875c1..a9a2d02f9030 100644 --- a/src/modules/job-manager/event.c +++ b/src/modules/job-manager/event.c @@ -293,7 +293,7 @@ int event_job_action (struct event *event, struct job *job) * SCHED state, dequeue the job first. */ if (job->alloc_pending) - alloc_cancel_alloc_request (ctx->alloc, job); + alloc_cancel_alloc_request (ctx->alloc, job, false); if (job->alloc_queued) alloc_dequeue_alloc_request (ctx->alloc, job); break; @@ -313,7 +313,7 @@ int event_job_action (struct event *event, struct job *job) break; case FLUX_JOB_STATE_CLEANUP: if (job->alloc_pending) - alloc_cancel_alloc_request (ctx->alloc, job); + alloc_cancel_alloc_request (ctx->alloc, job, true); if (job->alloc_queued) alloc_dequeue_alloc_request (ctx->alloc, job); diff --git a/src/modules/job-manager/prioritize.c b/src/modules/job-manager/prioritize.c index efc0b7b2a94b..c292e6100136 100644 --- a/src/modules/job-manager/prioritize.c +++ b/src/modules/job-manager/prioritize.c @@ -125,7 +125,7 @@ static int reprioritize_one (struct job_manager *ctx, } else if (job->alloc_pending) { if (job->priority == FLUX_JOB_PRIORITY_MIN) { - if (alloc_cancel_alloc_request (ctx->alloc, job) < 0) + if (alloc_cancel_alloc_request (ctx->alloc, job, false) < 0) return -1; } else if (oneshot) { diff --git a/src/modules/job-manager/queue.c b/src/modules/job-manager/queue.c index 5f847fb04739..54c7436f92e5 100644 --- a/src/modules/job-manager/queue.c +++ b/src/modules/job-manager/queue.c @@ -665,7 +665,7 @@ static void queue_stop (struct queue *queue, const char *name) if (job->alloc_queued) alloc_dequeue_alloc_request (queue->ctx->alloc, job); else if (job->alloc_pending) - alloc_cancel_alloc_request (queue->ctx->alloc, job); + alloc_cancel_alloc_request (queue->ctx->alloc, job, false); } job = zhashx_next (queue->ctx->active_jobs); } diff --git a/t/Makefile.am b/t/Makefile.am index c21362cb8bdc..65132af65e87 100644 --- a/t/Makefile.am +++ b/t/Makefile.am @@ -170,6 +170,7 @@ TESTSCRIPTS = \ t2302-sched-simple-up-down.t \ t2303-sched-hello.t \ t2304-sched-simple-alloc-check.t \ + t2305-sched-slow.t \ t2310-resource-module.t \ t2311-resource-drain.t \ t2312-resource-exclude.t \ diff --git a/t/t0003-module.t b/t/t0003-module.t index f34efff5eb9f..05be2181aef6 100755 --- a/t/t0003-module.t +++ b/t/t0003-module.t @@ -29,6 +29,12 @@ module_getinfo () { FLUX_HANDLE_TRACE=1 flux python -c "import flux; print(flux.Flux().rpc(\"$1.info\").get_str())" } +# Usage: module_debug_defer modname True|False +module_debug_defer () { + flux python -c "import flux; flux.Flux().rpc(\"broker.module-debug\",{\"name\":\"$1\",\"defer\":$2}).get()" +} + + test_expect_success 'module: load test module' ' flux module load $testmod ' @@ -287,4 +293,29 @@ test_expect_success 'module: module name is called out' ' grep ".content. was not properly shut down" nounload.err ' +test_expect_success 'module: load testmod' ' + flux module load $testmod +' +test_expect_success 'module-debug name=badname fails' ' + test_must_fail module_debug_defer badname True +' +test_expect_success 'module-debug defer=42 fails (not boolean)' ' + test_must_fail module_debug_defer testmod 42 +' +test_expect_success 'module-debug name=testmod defer=True works' ' + module_debug_defer testmod True +' +test_expect_success 'testmod does not respond to ping' ' + test_expect_code 137 run_timeout 2 flux ping --count=1 testmod +' +test_expect_success 'module-debug name=testmod defer=False works' ' + module_debug_defer testmod False +' +test_expect_success 'testmod does respond to ping' ' + run_timeout 10 flux ping --count=1 testmod +' +test_expect_success 'module: remove testmod' ' + flux module remove -f testmod +' + test_done diff --git a/t/t2305-sched-slow.t b/t/t2305-sched-slow.t new file mode 100755 index 000000000000..a9af370428b0 --- /dev/null +++ b/t/t2305-sched-slow.t @@ -0,0 +1,84 @@ +#!/bin/sh + +test_description='test a scheduler that is slow to respond +' + +# Append --logfile option if FLUX_TESTS_LOGFILE is set in environment: +test -n "$FLUX_TESTS_LOGFILE" && set -- "$@" --logfile +. $(dirname $0)/sharness.sh + +test_under_flux 1 + +# Usage: module_debug_defer modname True|False +module_debug_defer () { + flux python -c "import flux; flux.Flux().rpc(\"broker.module-debug\",{\"name\":\"$1\",\"defer\":$2}).get()" +} + +test_expect_success 'pause sched message handling' ' + module_debug_defer sched-simple True +' +test_expect_success 'a job can be submitted when the scheduler is unresponsive' ' + flux submit -N1 \ + --flags=debug --wait-event=debug.alloc-request \ + true >job1.id +' +test_expect_success 'the job is blocked on the alloc request' ' + test_expect_code 137 run_timeout 2 \ + flux job wait-event $(cat job1.id) alloc +' +test_expect_success 'unpause sched message handling' ' + module_debug_defer sched-simple False +' +test_expect_success 'the job gets its allocation and completes' ' + run_timeout 30 flux job wait-event $(cat job1.id) clean +' +test_expect_success 'submit a job and wait for it to get an alloc response' ' + flux submit -N1 --wait-event=alloc sleep 3600 >job2.id +' +test_expect_success 'pause sched message handling' ' + module_debug_defer sched-simple True +' +test_expect_success 'cancel the job' ' + flux cancel $(cat job2.id) +' +test_expect_success 'the job is able to get through CLEANUP state' ' + run_timeout 30 flux job wait-event $(cat job2.id) clean +' +test_expect_success 'unpause sched message handling' ' + module_debug_defer sched-simple False +' +test_expect_success 'another -N1 job can run so resources are free' ' + run_timeout 30 flux run -N1 true +' +test_expect_success 'drain the only node in the instance' ' + flux resource drain 0 +' +test_expect_success 'submit a job' ' + flux submit -N1 \ + --flags=debug --wait-event=debug.alloc-request \ + true >job3.id +' +test_expect_success 'the job is blocked on the alloc request' ' + test_expect_code 137 run_timeout 2 \ + flux job wait-event $(cat job3.id) alloc +' +test_expect_success 'pause sched message handling' ' + module_debug_defer sched-simple True +' +test_expect_success 'cancel the job' ' + flux cancel $(cat job3.id) +' +test_expect_success 'the job is able to get through CLEANUP state' ' + run_timeout 30 flux job wait-event $(cat job3.id) clean +' +test_expect_success 'unpause sched message handling' ' + module_debug_defer sched-simple False +' +test_expect_success 'undrain the node' ' + flux resource undrain 0 +' +test_expect_success 'another -N1 job can run so everything still works!' ' + run_timeout 30 flux run -N1 true +' + +test_done