From 20fc5490b62e0ddc7014a3822304b32592ec4cb4 Mon Sep 17 00:00:00 2001 From: Mohamad Chaarawi Date: Mon, 22 Jul 2024 19:54:35 +0000 Subject: [PATCH] DAOS-15800 client: create cart context on specific interface Cart has added the ability to select network interface on context creation. The daos_agent also added a numa-fabric map that can be queried at init time. Update the DAOS client to query from the agent a map of numa to network interface on daos_init(), and on EQ creation, select the best interface for the network context based on the numa of the calling thread. Quick-Functional: true Test-tag: test_pil4dfs_vs_dfs Required-githooks: true Signed-off-by: Mohamad Chaarawi --- src/cart/README.env | 2 +- src/cart/crt_internal_types.h | 2 +- src/client/api/SConscript | 2 +- src/client/api/event.c | 39 +++++++- src/engine/SConscript | 2 +- src/include/daos/mgmt.h | 6 ++ src/mgmt/cli_mgmt.c | 131 +++++++++++++++++++++++++-- src/rdb/tests/SConscript | 2 +- src/tests/SConscript | 2 +- src/tests/ftest/dfuse/pil4dfs_fio.py | 2 + 10 files changed, 169 insertions(+), 21 deletions(-) diff --git a/src/cart/README.env b/src/cart/README.env index d8e08ef546be..e30fd66b0ad5 100644 --- a/src/cart/README.env +++ b/src/cart/README.env @@ -155,7 +155,7 @@ This file lists the environment variables used in CaRT. . CRT_CTX_NUM If set, specifies the limit of number of allowed CaRT contexts to be created. - Valid range is [1, 64], with default being 64 if unset. + Valid range is [1, 128], with default being 128 if unset. . D_FI_CONFIG Specifies the fault injection configuration file. If this variable is not set diff --git a/src/cart/crt_internal_types.h b/src/cart/crt_internal_types.h index 02b58fec3a58..e51b34e100da 100644 --- a/src/cart/crt_internal_types.h +++ b/src/cart/crt_internal_types.h @@ -14,7 +14,7 @@ #define CRT_CONTEXT_NULL (NULL) #ifndef CRT_SRV_CONTEXT_NUM -#define CRT_SRV_CONTEXT_NUM (64) /* Maximum number of contexts */ +#define CRT_SRV_CONTEXT_NUM (128) /* Maximum number of contexts */ #endif diff --git a/src/client/api/SConscript b/src/client/api/SConscript index 8c71ceb42732..9ba2f2185c19 100644 --- a/src/client/api/SConscript +++ b/src/client/api/SConscript @@ -17,7 +17,7 @@ def scons(): if prereqs.client_requested(): libdaos = env.d_library('daos', libdaos_tgts, SHLIBVERSION=API_VERSION, - LIBS=['daos_common']) + LIBS=['daos_common', 'numa']) if hasattr(env, 'InstallVersionedLib'): env.InstallVersionedLib('$PREFIX/lib64/', libdaos, SHLIBVERSION=API_VERSION) else: diff --git a/src/client/api/event.c b/src/client/api/event.c index 2bbd63653e73..8db85ad179e8 100644 --- a/src/client/api/event.c +++ b/src/client/api/event.c @@ -83,11 +83,24 @@ daos_eq_lib_init(crt_init_options_t *crt_info) D_GOTO(unlock, rc); } - /* use a global shared context for all eq for now */ - rc = crt_context_create(&daos_eq_ctx); + if (d_dynamic_ctx_g) { + char iface[DAOS_SYS_INFO_STRING_MAX]; + + rc = dc_mgmt_get_iface(&iface[0]); + if (rc && rc != -DER_NONEXIST) { + D_ERROR("failed to get iface: " DF_RC "\n", DP_RC(rc)); + D_GOTO(crt, rc); + } + /** if no interface returned, use the default */ + if (rc == -DER_NONEXIST) + rc = crt_context_create(&daos_eq_ctx); + else + rc = crt_context_create_on_iface(iface, &daos_eq_ctx); + } else { + rc = crt_context_create(&daos_eq_ctx); + } if (rc != 0) { - D_ERROR("failed to create client context: "DF_RC"\n", - DP_RC(rc)); + D_ERROR("failed to create client context: " DF_RC "\n", DP_RC(rc)); D_GOTO(crt, rc); } @@ -656,7 +669,23 @@ daos_eq_create(daos_handle_t *eqh) eqx = daos_eq2eqx(eq); - rc = crt_context_create(&eqx->eqx_ctx); + if (d_dynamic_ctx_g) { + char iface[DAOS_SYS_INFO_STRING_MAX]; + + rc = dc_mgmt_get_iface(&iface[0]); + if (rc) { + D_ERROR("failed to get iface: " DF_RC "\n", DP_RC(rc)); + daos_eq_free(&eqx->eqx_hlink); + return rc; + } + /** if no interface returned, use the default */ + if (rc == -DER_NONEXIST) + rc = crt_context_create(&eqx->eqx_ctx); + else + rc = crt_context_create_on_iface(iface, &eqx->eqx_ctx); + } else { + rc = crt_context_create(&eqx->eqx_ctx); + } if (rc) { D_WARN("Failed to create CART context; using the global one, "DF_RC"\n", DP_RC(rc)); eqx->eqx_ctx = daos_eq_ctx; diff --git a/src/engine/SConscript b/src/engine/SConscript index 15d3385485d0..c837e83022f8 100644 --- a/src/engine/SConscript +++ b/src/engine/SConscript @@ -14,7 +14,7 @@ def scons(): denv.Append(CPPDEFINES=['-DDAOS_PMEM_BUILD']) libraries = ['daos_common_pmem', 'gurt', 'cart', 'vos_srv'] libraries += ['bio', 'dl', 'uuid', 'pthread', 'abt'] - libraries += ['hwloc', 'pmemobj', 'protobuf-c', 'isal'] + libraries += ['hwloc', 'pmemobj', 'protobuf-c', 'isal', 'numa'] denv.require('argobots', 'protobufc', 'pmdk', 'isal') diff --git a/src/include/daos/mgmt.h b/src/include/daos/mgmt.h index 8113a65ad98b..0e4b623bd40b 100644 --- a/src/include/daos/mgmt.h +++ b/src/include/daos/mgmt.h @@ -17,6 +17,8 @@ #include #include "svc.pb-c.h" +extern bool d_dynamic_ctx_g; + int dc_mgmt_init(void); void dc_mgmt_fini(void); @@ -41,6 +43,8 @@ struct dc_mgmt_sys_info { d_rank_list_t *ms_ranks; char system_name[DAOS_SYS_INFO_STRING_MAX + 1]; uint32_t provider_idx; /* Provider index (if more than one available) */ + daos_size_t numa_entries_nr; + daos_size_t *numa_iface_idx_rr; }; /** Client system handle */ @@ -78,5 +82,7 @@ int dc_get_attach_info(const char *name, bool all_ranks, struct dc_mgmt_sys_info void dc_put_attach_info(struct dc_mgmt_sys_info *info, Mgmt__GetAttachInfoResp *resp); int dc_mgmt_cache_attach_info(const char *name); void dc_mgmt_drop_attach_info(void); +int + dc_mgmt_get_iface(char *iface); int dc_mgmt_tm_register(const char *sys, const char *jobid, key_t shm_key, uid_t *owner_uid); #endif diff --git a/src/mgmt/cli_mgmt.c b/src/mgmt/cli_mgmt.c index c24f4802171f..11fe16de9208 100644 --- a/src/mgmt/cli_mgmt.c +++ b/src/mgmt/cli_mgmt.c @@ -12,17 +12,17 @@ #define D_LOGFAC DD_FAC(mgmt) -#include - #include #include #include #include +#include #include #include #include "svc.pb-c.h" #include "rpc.h" #include +#include #include #include @@ -31,6 +31,7 @@ char agent_sys_name[DAOS_SYS_NAME_MAX + 1] = DAOS_DEFAULT_SYS_NAME; static struct dc_mgmt_sys_info info_g; static Mgmt__GetAttachInfoResp *resp_g; +bool d_dynamic_ctx_g; int dc_mgmt_proto_version; int @@ -241,6 +242,7 @@ put_attach_info(struct dc_mgmt_sys_info *info, Mgmt__GetAttachInfoResp *resp) if (resp != NULL) free_get_attach_info_resp(resp); d_rank_list_free(info->ms_ranks); + D_FREE(info->numa_iface_idx_rr); } void @@ -413,9 +415,23 @@ dc_get_attach_info(const char *name, bool all_ranks, struct dc_mgmt_sys_info *in int dc_mgmt_cache_attach_info(const char *name) { + int rc; + if (name != NULL && strcmp(name, agent_sys_name) != 0) return -DER_INVAL; - return get_attach_info(name, true, &info_g, &resp_g); + rc = get_attach_info(name, true, &info_g, &resp_g); + if (rc) + return rc; + + info_g.numa_entries_nr = resp_g->n_numa_fabric_interfaces; + D_ALLOC_ARRAY(info_g.numa_iface_idx_rr, info_g.numa_entries_nr); + if (info_g.numa_iface_idx_rr == NULL) + D_GOTO(err_rank_list, rc = -DER_NOMEM); + return 0; + +err_rank_list: + d_rank_list_free(info_g.ms_ranks); + return rc; } static void @@ -625,14 +641,51 @@ dc_mgmt_net_cfg(const char *name, crt_init_options_t *crt_info) D_STRNDUP(crt_info->cio_provider, info->provider, DAOS_SYS_INFO_STRING_MAX); if (NULL == crt_info->cio_provider) D_GOTO(cleanup, rc = -DER_NOMEM); - D_STRNDUP(crt_info->cio_interface, info->interface, DAOS_SYS_INFO_STRING_MAX); - if (NULL == crt_info->cio_interface) - D_GOTO(cleanup, rc = -DER_NOMEM); - D_STRNDUP(crt_info->cio_domain, info->domain, DAOS_SYS_INFO_STRING_MAX); - if (NULL == crt_info->cio_domain) - D_GOTO(cleanup, rc = -DER_NOMEM); - D_INFO("Network interface: %s, Domain: %s\n", info->interface, info->domain); + d_getenv_bool("D_DYNAMIC_CTX", &d_dynamic_ctx_g); + if (d_dynamic_ctx_g) { + int i; + + D_ALLOC(crt_info->cio_interface, + DAOS_SYS_INFO_STRING_MAX * resp->n_numa_fabric_interfaces); + if (crt_info->cio_interface == NULL) + D_GOTO(cleanup, rc = -DER_NOMEM); + D_ALLOC(crt_info->cio_domain, + DAOS_SYS_INFO_STRING_MAX * resp->n_numa_fabric_interfaces); + if (crt_info->cio_domain == NULL) + D_GOTO(cleanup, rc = -DER_NOMEM); + + for (i = 0; i < resp->n_numa_fabric_interfaces; i++) { + Mgmt__FabricInterfaces *numa_ifaces = resp_g->numa_fabric_interfaces[i]; + int j; + + for (j = 0; j < numa_ifaces->n_ifaces; j++) { + if (i != 0 || j != 0) { + strcat(crt_info->cio_interface, ","); + strcat(crt_info->cio_domain, ","); + } + strcat(crt_info->cio_interface, numa_ifaces->ifaces[j]->interface); + strcat(crt_info->cio_domain, numa_ifaces->ifaces[j]->domain); + } + /* + * If we have multiple interfaces per numa node, we want to randomize the + * first interface selected in case we have multiple processes running + * there. So initialize the index array at that interface to -1 to know that + * this is the first selection later. + */ + if (numa_ifaces->n_ifaces) + info_g.numa_iface_idx_rr[i] = -1; + } + } else { + D_STRNDUP(crt_info->cio_interface, info->interface, DAOS_SYS_INFO_STRING_MAX); + if (NULL == crt_info->cio_interface) + D_GOTO(cleanup, rc = -DER_NOMEM); + D_STRNDUP(crt_info->cio_domain, info->domain, DAOS_SYS_INFO_STRING_MAX); + if (NULL == crt_info->cio_domain) + D_GOTO(cleanup, rc = -DER_NOMEM); + } + D_INFO("Network interface: %s, Domain: %s, Provider: %s\n", crt_info->cio_interface, + crt_info->cio_domain, crt_info->cio_provider); D_DEBUG(DB_MGMT, "CaRT initialization with:\n" "\tD_PROVIDER: %s, CRT_TIMEOUT: %d, CRT_SECONDARY_PROVIDER: %s\n", @@ -667,6 +720,64 @@ int dc_mgmt_net_cfg_check(const char *name) return 0; } +int +dc_mgmt_get_iface(char *iface) +{ + int cpu; + int numa; + int i; + + cpu = sched_getcpu(); + if (cpu < 0) { + D_ERROR("sched_getcpu() failed: %d (%s)\n", errno, strerror(errno)); + return d_errno2der(errno); + } + + numa = numa_node_of_cpu(cpu); + if (numa < 0) { + D_ERROR("numa_node_of_cpu() failed: %d (%s)\n", errno, strerror(errno)); + return d_errno2der(errno); + } + + if (resp_g->n_numa_fabric_interfaces <= 0) { + D_ERROR("No fabric interfaces initialized.\n"); + return -DER_INVAL; + } + + for (i = 0; i < resp_g->n_numa_fabric_interfaces; i++) { + Mgmt__FabricInterfaces *numa_ifaces = resp_g->numa_fabric_interfaces[i]; + int idx; + + if (numa_ifaces->numa_node != numa) + continue; + + /* + * Randomize the first interface used to avoid multiple processes starting on the + * first interface (if there is more than 1). + */ + if (info_g.numa_iface_idx_rr[i] == -1) { + d_srand(getpid()); + info_g.numa_iface_idx_rr[i] = d_rand() % numa_ifaces->n_ifaces; + } + idx = info_g.numa_iface_idx_rr[i] % numa_ifaces->n_ifaces; + D_ASSERT(numa_ifaces->ifaces[idx]->numa_node == numa); + info_g.numa_iface_idx_rr[i]++; + + if (copy_str(iface, numa_ifaces->ifaces[idx]->interface) != 0) { + D_ERROR("Interface string too long.\n"); + return -DER_INVAL; + } + D_DEBUG(DB_MGMT, "Numa: %d, Interface Selected: IDX: %d, Name = %s\n", numa, idx, + iface); + break; + } + if (i == resp_g->n_numa_fabric_interfaces) { + D_DEBUG(DB_MGMT, "No iface on numa %d\n", numa); + return -DER_NONEXIST; + } + return 0; +} + static int send_monitor_request(struct dc_pool *pool, int request_type) { struct drpc *ctx; diff --git a/src/rdb/tests/SConscript b/src/rdb/tests/SConscript index cd46e41355fb..b1919aac2ff9 100644 --- a/src/rdb/tests/SConscript +++ b/src/rdb/tests/SConscript @@ -15,7 +15,7 @@ def scons(): # rdbt client rdbt = tenv.d_program('rdbt', ['rdbt.c', 'rpc.c'] + libdaos_tgts, LIBS=['daos_common_pmem', 'cart', 'gurt', 'uuid', 'isal', 'protobuf-c', - 'pthread']) + 'pthread', 'numa']) tenv.Install('$PREFIX/bin', rdbt) diff --git a/src/tests/SConscript b/src/tests/SConscript index 0c620376c217..d29273979812 100644 --- a/src/tests/SConscript +++ b/src/tests/SConscript @@ -35,7 +35,7 @@ def build_tests(env): daos_perf = denv.d_program('daos_perf', ['daos_perf.c', perf_common], LIBS=libs_client) denv.Install('$PREFIX/bin/', daos_perf) - libs_server += ['vos', 'bio', 'abt'] + libs_server += ['vos', 'bio', 'abt', 'numa'] vos_engine = denv.StaticObject(['vos_engine.c']) if denv["STACK_MMAP"] == 1: diff --git a/src/tests/ftest/dfuse/pil4dfs_fio.py b/src/tests/ftest/dfuse/pil4dfs_fio.py index 2aa3cd1b9523..d66d4db7810a 100644 --- a/src/tests/ftest/dfuse/pil4dfs_fio.py +++ b/src/tests/ftest/dfuse/pil4dfs_fio.py @@ -115,6 +115,7 @@ def _run_fio_pil4dfs(self, ioengine): "global", "cpus_allowed", self.fio_cpus_allowed, f"fio --name=global --cpus_allowed={self.fio_cpus_allowed}") fio_cmd.env['LD_PRELOAD'] = os.path.join(self.prefix, 'lib64', 'libpil4dfs.so') + fio_cmd.env['D_DYNAMIC_CTX'] = 1 fio_cmd.hosts = self.hostlist_clients bws = {} @@ -154,6 +155,7 @@ def _run_fio_dfs(self): fio_cmd.update( "job", "pool", container.pool.uuid, f"fio --name=job --pool={container.pool.uuid}") fio_cmd.update("job", "cont", container.uuid, f"fio --name=job --cont={container.uuid}") + fio_cmd.env['D_DYNAMIC_CTX'] = 1 fio_cmd.hosts = self.hostlist_clients bws = {}