From 6c48981ec4c17a192b753a1f7382725c88726179 Mon Sep 17 00:00:00 2001 From: Alexander A Oganezov Date: Thu, 25 Jul 2024 04:34:56 +0000 Subject: [PATCH] DAOS-15914: crt_reply_send_input_free() - New crt_reply_send_input_free() API added which releases input buffer right after HG_Respond() instead of waiting until the handle is destroyed. - srv_obj.c calls changed to use new crt_reply_send_input_free() Required-githooks: true Signed-off-by: Alexander A Oganezov --- src/cart/crt_hg.c | 10 +++++++++ src/cart/crt_rpc.c | 20 ++++++++++++++++++ src/cart/crt_rpc.h | 47 +++++++++++++++++++++--------------------- src/include/cart/api.h | 15 ++++++++++++++ src/object/srv_obj.c | 16 +++++++------- 5 files changed, 77 insertions(+), 31 deletions(-) diff --git a/src/cart/crt_hg.c b/src/cart/crt_hg.c index 789d75ceb31..33ef9a6c9b4 100644 --- a/src/cart/crt_hg.c +++ b/src/cart/crt_hg.c @@ -1479,6 +1479,16 @@ crt_hg_reply_send(struct crt_rpc_priv *rpc_priv) rc = crt_hgret_2_der(hg_ret); } + /* Release input buffer */ + if (rpc_priv->crp_release_input_early && !rpc_priv->crp_forward) { + hg_ret = HG_Release_input_buf(rpc_priv->crp_hg_hdl); + if (hg_ret != HG_SUCCESS) { + RPC_ERROR(rpc_priv, "HG_Release_input_buf failed, hg_ret: " DF_HG_RC "\n", + DP_HG_RC(hg_ret)); + /* Fall through */ + } + } + return rc; } diff --git a/src/cart/crt_rpc.c b/src/cart/crt_rpc.c index 72d8214aa09..ead03d1bf29 100644 --- a/src/cart/crt_rpc.c +++ b/src/cart/crt_rpc.c @@ -1550,6 +1550,26 @@ crt_req_send(crt_rpc_t *req, crt_cb_t complete_cb, void *arg) return rc; } +int +crt_reply_send_input_free(crt_rpc_t *req) +{ + struct crt_rpc_priv *rpc_priv = NULL; + int rc = 0; + + if (req == NULL) { + D_ERROR("invalid parameter (NULL req).\n"); + D_GOTO(out, rc = -DER_INVAL); + } + + rpc_priv = container_of(req, struct crt_rpc_priv, crp_pub); + rpc_priv->crp_release_input_early = 1; + + return crt_reply_send(req); + +out: + return rc; +} + int crt_reply_send(crt_rpc_t *req) { diff --git a/src/cart/crt_rpc.h b/src/cart/crt_rpc.h index a5e6afee780..3a590933103 100644 --- a/src/cart/crt_rpc.h +++ b/src/cart/crt_rpc.h @@ -166,29 +166,30 @@ struct crt_rpc_priv { * match with crp_req_hdr.cch_flags. */ uint32_t crp_flags; - uint32_t crp_srv:1, /* flag of server received request */ - crp_output_got:1, - crp_input_got:1, - /* flag of collective RPC request */ - crp_coll:1, - /* flag of crp_tgt_uri need to be freed */ - crp_uri_free:1, - /* flag of forwarded rpc for corpc */ - crp_forward:1, - /* flag of in timeout binheap */ - crp_in_binheap:1, - /* set if a call to crt_req_reply pending */ - crp_reply_pending:1, - /* set to 1 if target ep is set */ - crp_have_ep:1, - /* RPC is tracked by the context */ - crp_ctx_tracked:1, - /* 1 if RPC fails HLC epsilon check */ - crp_fail_hlc:1, - /* RPC completed flag */ - crp_completed:1, - /* RPC originated from a primary provider */ - crp_src_is_primary:1; + uint32_t crp_srv : 1, /* flag of server received request */ + crp_output_got : 1, crp_input_got : 1, + /* flag of collective RPC request */ + crp_coll : 1, + /* flag of crp_tgt_uri need to be freed */ + crp_uri_free : 1, + /* flag of forwarded rpc for corpc */ + crp_forward : 1, + /* flag of in timeout binheap */ + crp_in_binheap : 1, + /* set if a call to crt_req_reply pending */ + crp_reply_pending : 1, + /* set to 1 if target ep is set */ + crp_have_ep : 1, + /* RPC is tracked by the context */ + crp_ctx_tracked : 1, + /* 1 if RPC fails HLC epsilon check */ + crp_fail_hlc : 1, + /* RPC completed flag */ + crp_completed : 1, + /* RPC originated from a primary provider */ + crp_src_is_primary : 1, + /* release input buffer early */ + crp_release_input_early : 1; struct crt_opc_info *crp_opc_info; /* corpc info, only valid when (crp_coll == 1) */ diff --git a/src/include/cart/api.h b/src/include/cart/api.h index 21941d7cb98..16fc2091d4b 100644 --- a/src/include/cart/api.h +++ b/src/include/cart/api.h @@ -474,6 +474,21 @@ crt_req_send(crt_rpc_t *req, crt_cb_t complete_cb, void *arg); int crt_reply_send(crt_rpc_t *req); +/** + * Send an RPC reply and free the input buffer immediately. + * Only to be called on the server side. + * + * \param[in] req pointer to RPC request + * + * \return DER_SUCCESS on success, negative value if error + * + * \note the crt_rpc_t is exported to user, caller should fill the + * crt_rpc_t::cr_output before sending the RPC reply. + * See \ref crt_req_create. + */ +int +crt_reply_send_input_free(crt_rpc_t *req); + /** * Return request buffer * diff --git a/src/object/srv_obj.c b/src/object/srv_obj.c index a51682b4785..c62012205d7 100644 --- a/src/object/srv_obj.c +++ b/src/object/srv_obj.c @@ -187,7 +187,7 @@ obj_rw_reply(crt_rpc_t *rpc, int status, uint64_t epoch, ioc->ioc_map_ver, orwo->orw_epoch, status); if (!ioc->ioc_lost_reply) { - rc = crt_reply_send(rpc); + rc = crt_reply_send_input_free(rpc); if (rc != 0) D_ERROR("send reply failed: "DF_RC"\n", DP_RC(rc)); } else { @@ -3093,7 +3093,7 @@ obj_enum_complete(crt_rpc_t *rpc, int status, int map_version, obj_reply_map_version_set(rpc, map_version); oeo->oeo_epoch = epoch; - rc = crt_reply_send(rpc); + rc = crt_reply_send_input_free(rpc); if (rc != 0) D_ERROR("send reply failed: "DF_RC"\n", DP_RC(rc)); @@ -3454,7 +3454,7 @@ obj_punch_complete(crt_rpc_t *rpc, int status, uint32_t map_version) obj_reply_set_status(rpc, status); obj_reply_map_version_set(rpc, map_version); - rc = crt_reply_send(rpc); + rc = crt_reply_send_input_free(rpc); if (rc != 0) D_ERROR("send reply failed: "DF_RC"\n", DP_RC(rc)); } @@ -4243,7 +4243,7 @@ ds_obj_query_key_handler(crt_rpc_t *rpc) obj_reply_map_version_set(rpc, version); okqo->okqo_epoch = okqi->okqi_epoch; - rc = crt_reply_send(rpc); + rc = crt_reply_send_input_free(rpc); if (rc != 0) D_ERROR("send reply failed: "DF_RC"\n", DP_RC(rc)); } @@ -4287,7 +4287,7 @@ ds_obj_sync_handler(crt_rpc_t *rpc) D_DEBUG(DB_IO, "obj_sync stop: "DF_UOID", epc "DF_X64", rd = %d\n", DP_UOID(osi->osi_oid), oso->oso_epoch, rc); - rc = crt_reply_send(rpc); + rc = crt_reply_send_input_free(rpc); if (rc != 0) D_ERROR("send reply failed: "DF_RC"\n", DP_RC(rc)); } @@ -4379,7 +4379,7 @@ obj_cpd_reply(crt_rpc_t *rpc, int status, uint32_t map_version) D_DEBUG(DB_TRACE, "CPD rpc %p send reply, pmv %d, status %d.\n", rpc, map_version, status); - rc = crt_reply_send(rpc); + rc = crt_reply_send_input_free(rpc); if (rc != 0) D_ERROR("Send CPD reply failed: "DF_RC"\n", DP_RC(rc)); @@ -5573,7 +5573,7 @@ ds_obj_key2anchor_handler(crt_rpc_t *rpc) obj_reply_set_status(rpc, rc); obj_reply_map_version_set(rpc, ioc.ioc_map_ver); obj_ioc_end(&ioc, rc); - rc = crt_reply_send(rpc); + rc = crt_reply_send_input_free(rpc); if (rc != 0) D_ERROR("send reply failed: "DF_RC"\n", DP_RC(rc)); } @@ -5911,7 +5911,7 @@ ds_obj_coll_query_handler(crt_rpc_t *rpc) ocqo->ocqo_flags |= OCRF_RAW_RECX; } - rc = crt_reply_send(rpc); + rc = crt_reply_send_input_free(rpc); if (rc != 0) D_ERROR("send reply failed: "DF_RC"\n", DP_RC(rc));