Skip to content

Commit

Permalink
DAOS-15914: crt_reply_send_input_free()
Browse files Browse the repository at this point in the history
- New crt_reply_send_input_free() API added which releases input buffer right
  after HG_Respond() instead of waiting until the handle is destroyed.

- srv_obj.c calls changed to use new crt_reply_send_input_free()

Required-githooks: true

Signed-off-by: Alexander A Oganezov <alexander.a.oganezov@intel.com>
  • Loading branch information
frostedcmos authored and mjean308 committed Sep 23, 2024
1 parent 8965f54 commit 6c48981
Show file tree
Hide file tree
Showing 5 changed files with 77 additions and 31 deletions.
10 changes: 10 additions & 0 deletions src/cart/crt_hg.c
Original file line number Diff line number Diff line change
Expand Up @@ -1479,6 +1479,16 @@ crt_hg_reply_send(struct crt_rpc_priv *rpc_priv)
rc = crt_hgret_2_der(hg_ret);
}

/* Release input buffer */
if (rpc_priv->crp_release_input_early && !rpc_priv->crp_forward) {
hg_ret = HG_Release_input_buf(rpc_priv->crp_hg_hdl);
if (hg_ret != HG_SUCCESS) {
RPC_ERROR(rpc_priv, "HG_Release_input_buf failed, hg_ret: " DF_HG_RC "\n",
DP_HG_RC(hg_ret));
/* Fall through */
}
}

return rc;
}

Expand Down
20 changes: 20 additions & 0 deletions src/cart/crt_rpc.c
Original file line number Diff line number Diff line change
Expand Up @@ -1550,6 +1550,26 @@ crt_req_send(crt_rpc_t *req, crt_cb_t complete_cb, void *arg)
return rc;
}

int
crt_reply_send_input_free(crt_rpc_t *req)
{
struct crt_rpc_priv *rpc_priv = NULL;
int rc = 0;

if (req == NULL) {
D_ERROR("invalid parameter (NULL req).\n");
D_GOTO(out, rc = -DER_INVAL);
}

rpc_priv = container_of(req, struct crt_rpc_priv, crp_pub);
rpc_priv->crp_release_input_early = 1;

return crt_reply_send(req);

out:
return rc;
}

int
crt_reply_send(crt_rpc_t *req)
{
Expand Down
47 changes: 24 additions & 23 deletions src/cart/crt_rpc.h
Original file line number Diff line number Diff line change
Expand Up @@ -166,29 +166,30 @@ struct crt_rpc_priv {
* match with crp_req_hdr.cch_flags.
*/
uint32_t crp_flags;
uint32_t crp_srv:1, /* flag of server received request */
crp_output_got:1,
crp_input_got:1,
/* flag of collective RPC request */
crp_coll:1,
/* flag of crp_tgt_uri need to be freed */
crp_uri_free:1,
/* flag of forwarded rpc for corpc */
crp_forward:1,
/* flag of in timeout binheap */
crp_in_binheap:1,
/* set if a call to crt_req_reply pending */
crp_reply_pending:1,
/* set to 1 if target ep is set */
crp_have_ep:1,
/* RPC is tracked by the context */
crp_ctx_tracked:1,
/* 1 if RPC fails HLC epsilon check */
crp_fail_hlc:1,
/* RPC completed flag */
crp_completed:1,
/* RPC originated from a primary provider */
crp_src_is_primary:1;
uint32_t crp_srv : 1, /* flag of server received request */
crp_output_got : 1, crp_input_got : 1,
/* flag of collective RPC request */
crp_coll : 1,
/* flag of crp_tgt_uri need to be freed */
crp_uri_free : 1,
/* flag of forwarded rpc for corpc */
crp_forward : 1,
/* flag of in timeout binheap */
crp_in_binheap : 1,
/* set if a call to crt_req_reply pending */
crp_reply_pending : 1,
/* set to 1 if target ep is set */
crp_have_ep : 1,
/* RPC is tracked by the context */
crp_ctx_tracked : 1,
/* 1 if RPC fails HLC epsilon check */
crp_fail_hlc : 1,
/* RPC completed flag */
crp_completed : 1,
/* RPC originated from a primary provider */
crp_src_is_primary : 1,
/* release input buffer early */
crp_release_input_early : 1;

struct crt_opc_info *crp_opc_info;
/* corpc info, only valid when (crp_coll == 1) */
Expand Down
15 changes: 15 additions & 0 deletions src/include/cart/api.h
Original file line number Diff line number Diff line change
Expand Up @@ -474,6 +474,21 @@ crt_req_send(crt_rpc_t *req, crt_cb_t complete_cb, void *arg);
int
crt_reply_send(crt_rpc_t *req);

/**
* Send an RPC reply and free the input buffer immediately.
* Only to be called on the server side.
*
* \param[in] req pointer to RPC request
*
* \return DER_SUCCESS on success, negative value if error
*
* \note the crt_rpc_t is exported to user, caller should fill the
* crt_rpc_t::cr_output before sending the RPC reply.
* See \ref crt_req_create.
*/
int
crt_reply_send_input_free(crt_rpc_t *req);

/**
* Return request buffer
*
Expand Down
16 changes: 8 additions & 8 deletions src/object/srv_obj.c
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ obj_rw_reply(crt_rpc_t *rpc, int status, uint64_t epoch,
ioc->ioc_map_ver, orwo->orw_epoch, status);

if (!ioc->ioc_lost_reply) {
rc = crt_reply_send(rpc);
rc = crt_reply_send_input_free(rpc);
if (rc != 0)
D_ERROR("send reply failed: "DF_RC"\n", DP_RC(rc));
} else {
Expand Down Expand Up @@ -3093,7 +3093,7 @@ obj_enum_complete(crt_rpc_t *rpc, int status, int map_version,
obj_reply_map_version_set(rpc, map_version);
oeo->oeo_epoch = epoch;

rc = crt_reply_send(rpc);
rc = crt_reply_send_input_free(rpc);
if (rc != 0)
D_ERROR("send reply failed: "DF_RC"\n", DP_RC(rc));

Expand Down Expand Up @@ -3454,7 +3454,7 @@ obj_punch_complete(crt_rpc_t *rpc, int status, uint32_t map_version)
obj_reply_set_status(rpc, status);
obj_reply_map_version_set(rpc, map_version);

rc = crt_reply_send(rpc);
rc = crt_reply_send_input_free(rpc);
if (rc != 0)
D_ERROR("send reply failed: "DF_RC"\n", DP_RC(rc));
}
Expand Down Expand Up @@ -4243,7 +4243,7 @@ ds_obj_query_key_handler(crt_rpc_t *rpc)
obj_reply_map_version_set(rpc, version);
okqo->okqo_epoch = okqi->okqi_epoch;

rc = crt_reply_send(rpc);
rc = crt_reply_send_input_free(rpc);
if (rc != 0)
D_ERROR("send reply failed: "DF_RC"\n", DP_RC(rc));
}
Expand Down Expand Up @@ -4287,7 +4287,7 @@ ds_obj_sync_handler(crt_rpc_t *rpc)
D_DEBUG(DB_IO, "obj_sync stop: "DF_UOID", epc "DF_X64", rd = %d\n",
DP_UOID(osi->osi_oid), oso->oso_epoch, rc);

rc = crt_reply_send(rpc);
rc = crt_reply_send_input_free(rpc);
if (rc != 0)
D_ERROR("send reply failed: "DF_RC"\n", DP_RC(rc));
}
Expand Down Expand Up @@ -4379,7 +4379,7 @@ obj_cpd_reply(crt_rpc_t *rpc, int status, uint32_t map_version)
D_DEBUG(DB_TRACE, "CPD rpc %p send reply, pmv %d, status %d.\n",
rpc, map_version, status);

rc = crt_reply_send(rpc);
rc = crt_reply_send_input_free(rpc);
if (rc != 0)
D_ERROR("Send CPD reply failed: "DF_RC"\n", DP_RC(rc));

Expand Down Expand Up @@ -5573,7 +5573,7 @@ ds_obj_key2anchor_handler(crt_rpc_t *rpc)
obj_reply_set_status(rpc, rc);
obj_reply_map_version_set(rpc, ioc.ioc_map_ver);
obj_ioc_end(&ioc, rc);
rc = crt_reply_send(rpc);
rc = crt_reply_send_input_free(rpc);
if (rc != 0)
D_ERROR("send reply failed: "DF_RC"\n", DP_RC(rc));
}
Expand Down Expand Up @@ -5911,7 +5911,7 @@ ds_obj_coll_query_handler(crt_rpc_t *rpc)
ocqo->ocqo_flags |= OCRF_RAW_RECX;
}

rc = crt_reply_send(rpc);
rc = crt_reply_send_input_free(rpc);
if (rc != 0)
D_ERROR("send reply failed: "DF_RC"\n", DP_RC(rc));

Expand Down

0 comments on commit 6c48981

Please sign in to comment.