From 5a2ca114f46a1d69c829fb2d82bb85a6f70c13aa Mon Sep 17 00:00:00 2001 From: Ashley Pittman Date: Mon, 22 Apr 2024 17:12:11 +0000 Subject: [PATCH 01/14] DAOS-15682 dfuse: Perform reads in larger chunks. When dfuse sees I/O as well-aligned 128k reads then read MB at a time and cache the result allowing for faster read bandwidth for well behaved applications and large files. Create a new in-memory descriptor for file contents, pull in a whole descriptor on first read and perform all other reads from the same result. This should give much higher bandwidth for well behaved applications and should be easy to extend to proper readahead in future. Test-tag: test_dfuse_caching_check Signed-off-by: Ashley Pittman --- src/client/dfuse/dfuse.h | 7 + src/client/dfuse/ops/open.c | 6 +- src/client/dfuse/ops/read.c | 317 +++++++++++++++++++++++++++++++++++- utils/node_local_test.py | 12 +- 4 files changed, 330 insertions(+), 12 deletions(-) diff --git a/src/client/dfuse/dfuse.h b/src/client/dfuse/dfuse.h index f239f235493..d4c43ef6b6b 100644 --- a/src/client/dfuse/dfuse.h +++ b/src/client/dfuse/dfuse.h @@ -396,6 +396,7 @@ struct dfuse_event { union { struct dfuse_obj_hdl *de_oh; struct dfuse_inode_entry *de_ie; + struct read_chunk_data *de_cd; }; off_t de_req_position; /**< The file position requested by fuse */ union { @@ -1001,6 +1002,8 @@ struct dfuse_inode_entry { /* Entry on the evict list */ d_list_t ie_evict_entry; + + struct read_chunk_core *ie_chunk; }; /* Flush write-back cache writes to a inode. It does this by waiting for and then releasing an @@ -1098,6 +1101,10 @@ dfuse_compute_inode(struct dfuse_cont *dfs, void dfuse_cache_evict_dir(struct dfuse_info *dfuse_info, struct dfuse_inode_entry *ie); +/* Free any read chunk data for an inode */ +void +read_chunk_close(struct dfuse_inode_entry *ie); + /* Metadata caching functions. 
*/ /* Mark the cache as up-to-date from now */ diff --git a/src/client/dfuse/ops/open.c b/src/client/dfuse/ops/open.c index 42a7d6e10fd..4d52fdc84fb 100644 --- a/src/client/dfuse/ops/open.c +++ b/src/client/dfuse/ops/open.c @@ -137,6 +137,7 @@ dfuse_cb_release(fuse_req_t req, fuse_ino_t ino, struct fuse_file_info *fi) struct dfuse_obj_hdl *oh = (struct dfuse_obj_hdl *)fi->fh; struct dfuse_inode_entry *ie = NULL; int rc; + uint32_t oc; uint32_t il_calls; /* Perform the opposite of what the ioctl call does, always change the open handle count @@ -204,7 +205,10 @@ dfuse_cb_release(fuse_req_t req, fuse_ino_t ino, struct fuse_file_info *fi) if (il_calls != 0) { atomic_fetch_sub_relaxed(&oh->doh_ie->ie_il_count, 1); } - atomic_fetch_sub_relaxed(&oh->doh_ie->ie_open_count, 1); + oc = atomic_fetch_sub_relaxed(&oh->doh_ie->ie_open_count, 1); + if (oc == 1) { + read_chunk_close(oh->doh_ie); + } if (oh->doh_evict_on_close) { ie = oh->doh_ie; diff --git a/src/client/dfuse/ops/read.c b/src/client/dfuse/ops/read.c index 0ea1d61ef39..e6cfa781fe0 100644 --- a/src/client/dfuse/ops/read.c +++ b/src/client/dfuse/ops/read.c @@ -1,5 +1,5 @@ /** - * (C) Copyright 2016-2023 Intel Corporation. + * (C) Copyright 2016-2024 Intel Corporation. * * SPDX-License-Identifier: BSD-2-Clause-Patent */ @@ -100,6 +100,307 @@ dfuse_readahead_reply(fuse_req_t req, size_t len, off_t position, struct dfuse_o return true; } +static struct dfuse_eq * +pick_eqt(struct dfuse_info *dfuse_info) +{ + uint64_t eqt_idx; + + eqt_idx = atomic_fetch_add_relaxed(&dfuse_info->di_eqt_idx, 1); + return &dfuse_info->di_eqt[eqt_idx % dfuse_info->di_eq_count]; +} + +/* Readahead and coalescing + * + * This code attempts to predict application and kernel I/O patterns and preemptively read file + * data ahead of when it's requested. + * + * For some kernels read I/O size is limited to 128k when using the page cache or 1Mb when using + * direct I/O. 
To get around the performance impact of them detect when well aligned 128k reads + * are received and read an entire buffers worth, then for future requests the data should already + * be in cache. + * + * TODO: Ensure that the linear_read flag remains set correctly by this code. + */ + +#define K128 (1024 * 128) +#define CHUNK_SIZE (1024 * 1024) + +struct read_chunk_data { + struct dfuse_event *ev; + fuse_req_t reqs[8]; + struct dfuse_obj_hdl *ohs[8]; + bool done[8]; + d_list_t list; + int bucket; + bool complete; + struct dfuse_eq *eqt; + bool exiting; + int rc; +}; + +struct read_chunk_core { + d_list_t entries; +}; + +/* Global lock for all readahead operations. Each inode has a struct read_chunk_core * entry + * which is checked for NULL and set whilst holding this lock. Each read_chunk_core then has + * a list of read_chunk_data and again, this lock protects all lists on all inodes. This avoids + * the need for a per-inode lock which for many files would consume considerable memory but does + * mean there is potentially some lock contention. The lock however is only held for list + * manipulation, no dfs or kernel calls are made whilst holding the lock. + */ +static pthread_mutex_t rc_lock = PTHREAD_MUTEX_INITIALIZER; + +static void +chunk_free(struct read_chunk_data *cd) +{ + d_list_del(&cd->list); + d_slab_release(cd->eqt->de_read_slab, cd->ev); + D_FREE(cd); +} + +/* Called when the last open file handle on a inode is closed. This needs to free everything which + * is complete and for anything that isn't flag it for deletion in the callback. 
+ */ +void +read_chunk_close(struct dfuse_inode_entry *ie) +{ + struct read_chunk_data *cd, *cdn; + + D_MUTEX_LOCK(&rc_lock); + if (!ie->ie_chunk) + goto out; + + d_list_for_each_entry_safe(cd, cdn, &ie->ie_chunk->entries, list) { + if (cd->complete) { + chunk_free(cd); + } else { + cd->exiting = true; + } + } + D_FREE(ie->ie_chunk); +out: + D_MUTEX_UNLOCK(&rc_lock); +} + +static void +chunk_cb(struct dfuse_event *ev) +{ + struct read_chunk_data *cd = ev->de_cd; + fuse_req_t req; + + cd->rc = ev->de_ev.ev_error; + daos_event_fini(&ev->de_ev); + + do { + int i; + int done_count = 0; + + req = 0; + + D_MUTEX_LOCK(&rc_lock); + + if (cd->exiting) { + chunk_free(cd); + D_MUTEX_UNLOCK(&rc_lock); + return; + } + + cd->complete = true; + for (i = 0; i < 8; i++) { + if (cd->reqs[i]) { + req = cd->reqs[i]; + cd->reqs[i] = 0; + cd->done[i] = true; + break; + } else if (cd->done[i]) { + done_count++; + } + } + + if (done_count == 8) { + D_ASSERT(!req); + d_slab_release(cd->eqt->de_read_slab, cd->ev); + cd->ev = NULL; + } + D_MUTEX_UNLOCK(&rc_lock); + + if (req) { + size_t position = (cd->bucket * CHUNK_SIZE) + (i * K128); + + if (cd->rc != 0) { + DFUSE_REPLY_ERR_RAW(cd->ohs[i], req, cd->rc); + } else { + DFUSE_TRA_DEBUG(cd->ohs[i], "%#zx-%#zx read", position, + position + K128 - 1); + DFUSE_REPLY_BUFQ(cd->ohs[i], req, ev->de_iov.iov_buf + (i * K128), + K128); + } + } + } while (req); +} + +/* Submut a read to dfs. + * + * Returns true on success. 
+ */ +static bool +chunk_fetch(fuse_req_t req, struct dfuse_obj_hdl *oh, struct read_chunk_data *cd, int bucket, + int slot, struct dfuse_inode_entry *ie) +{ + struct dfuse_info *dfuse_info = fuse_req_userdata(req); + struct dfuse_event *ev; + struct dfuse_eq *eqt; + int rc; + + eqt = pick_eqt(dfuse_info); + + ev = d_slab_acquire(eqt->de_read_slab); + if (ev == NULL) { + cd->rc = ENOMEM; + return false; + } + + ev->de_iov.iov_len = CHUNK_SIZE; + ev->de_req = req; + ev->de_cd = cd; + ev->de_sgl.sg_nr = 1; + ev->de_req_len = CHUNK_SIZE; + ev->de_req_position = CHUNK_SIZE * bucket; + + ev->de_complete_cb = chunk_cb; + + cd->ev = ev; + cd->eqt = eqt; + cd->reqs[slot] = req; + cd->ohs[slot] = oh; + + rc = dfs_read(ie->ie_dfs->dfs_ns, ie->ie_obj, &ev->de_sgl, ev->de_req_position, &ev->de_len, + &ev->de_ev); + if (rc != 0) + goto err; + + /* Send a message to the async thread to wake it up and poll for events */ + sem_post(&eqt->de_sem); + + /* Now ensure there are more descriptors for the next request */ + d_slab_restock(eqt->de_read_slab); + + return true; + +err: + daos_event_fini(&ev->de_ev); + d_slab_release(eqt->de_read_slab, ev); + cd->rc = rc; + return false; +} + +/* Try and do a bulk read. + * + * Returns true if it was able to handle the read. 
+ */ +static bool +chunk_read(fuse_req_t req, size_t len, off_t position, struct dfuse_obj_hdl *oh) +{ + struct dfuse_inode_entry *ie = oh->doh_ie; + struct read_chunk_core *cc; + struct read_chunk_data *cd; + off_t last; + int bucket; + int slot; + bool submit = false; + bool rcb = false; + + last = position = (position + (K128 - 1)) & -K128; + + if (len != K128) + return false; + + if ((position % K128) != 0) + return false; + + last = D_ALIGNUP(position + len - 1, CHUNK_SIZE); + + if (last > oh->doh_ie->ie_stat.st_size) + return false; + + bucket = D_ALIGNUP(position + len, CHUNK_SIZE); + bucket = (bucket / CHUNK_SIZE) - 1; + + slot = (position / K128) % 8; + + DFUSE_TRA_DEBUG(oh, "read bucket %#zx-%#zx last %#zx size %#zx bucket %d slot %d", position, + position + len - 1, last, ie->ie_stat.st_size, bucket, slot); + + D_MUTEX_LOCK(&rc_lock); + if (ie->ie_chunk == NULL) { + D_ALLOC_PTR(ie->ie_chunk); + if (ie->ie_chunk == NULL) + goto err; + D_INIT_LIST_HEAD(&ie->ie_chunk->entries); + } + cc = ie->ie_chunk; + + d_list_for_each_entry(cd, &cc->entries, list) + if (cd->bucket == bucket) + goto found; + + D_ALLOC_PTR(cd); + if (cd == NULL) + goto err; + + d_list_add_tail(&cd->list, &cc->entries); + cd->bucket = bucket; + submit = true; + +found: + D_MUTEX_UNLOCK(&rc_lock); + + if (submit) { + DFUSE_TRA_DEBUG(oh, "submit for bucket %d[%d]", bucket, slot); + rcb = chunk_fetch(req, oh, cd, bucket, slot, ie); + } else { + struct dfuse_event *ev = NULL; + + /* Not check if this read request is complete or not yet, if it isn't then just + * save req in the right slot however if it is then reply here. After the call to + * DFUSE_REPLY_* then no reference is held on either the open file or the inode so + * at that point they could be closed, so decided if ev needs to be released whilst + * holding the lock and keep a copy of all data structures needed. + * + * TODO: Implement this in a safe way. A reference count on ev is probably + * required. 
For now this is safe but will maintain all previously read events + * in memory. + */ + D_MUTEX_LOCK(&rc_lock); + if (cd->complete) { + cd->done[slot] = true; + ev = cd->ev; + } else { + cd->reqs[slot] = req; + cd->ohs[slot] = oh; + } + D_MUTEX_UNLOCK(&rc_lock); + + if (ev) { + if (cd->rc != 0) { + DFUSE_REPLY_ERR_RAW(oh, req, cd->rc); + } else { + DFUSE_TRA_DEBUG(oh, "%#zx-%#zx read", position, + position + K128 - 1); + DFUSE_REPLY_BUFQ(oh, req, ev->de_iov.iov_buf + (slot * K128), K128); + } + } + rcb = true; + } + + return rcb; + +err: + D_MUTEX_UNLOCK(&rc_lock); + return false; +} + void dfuse_cb_read(fuse_req_t req, fuse_ino_t ino, size_t len, off_t position, struct fuse_file_info *fi) { @@ -109,7 +410,6 @@ dfuse_cb_read(fuse_req_t req, fuse_ino_t ino, size_t len, off_t position, struct struct dfuse_eq *eqt; int rc; struct dfuse_event *ev; - uint64_t eqt_idx; DFUSE_IE_STAT_ADD(oh->doh_ie, DS_READ); @@ -146,8 +446,10 @@ dfuse_cb_read(fuse_req_t req, fuse_ino_t ino, size_t len, off_t position, struct } } - eqt_idx = atomic_fetch_add_relaxed(&dfuse_info->di_eqt_idx, 1); - eqt = &dfuse_info->di_eqt[eqt_idx % dfuse_info->di_eq_count]; + if (oh->doh_ie->ie_dfs->dfc_data_timeout != 0 && chunk_read(req, len, position, oh)) + return; + + eqt = pick_eqt(dfuse_info); ev = d_slab_acquire(eqt->de_read_slab); if (ev == NULL) @@ -242,13 +544,10 @@ dfuse_pre_read(struct dfuse_info *dfuse_info, struct dfuse_obj_hdl *oh) struct dfuse_eq *eqt; int rc; struct dfuse_event *ev; - uint64_t eqt_idx; size_t len = oh->doh_ie->ie_stat.st_size; - eqt_idx = atomic_fetch_add_relaxed(&dfuse_info->di_eqt_idx, 1); - eqt = &dfuse_info->di_eqt[eqt_idx % dfuse_info->di_eq_count]; - - ev = d_slab_acquire(eqt->de_read_slab); + eqt = pick_eqt(dfuse_info); + ev = d_slab_acquire(eqt->de_read_slab); if (ev == NULL) D_GOTO(err, rc = ENOMEM); diff --git a/utils/node_local_test.py b/utils/node_local_test.py index b6018e90b3e..16c762e4582 100755 --- a/utils/node_local_test.py +++ b/utils/node_local_test.py 
@@ -5004,13 +5004,14 @@ def run_in_fg(server, conf, args): # Set to False to run dfuse without a pool. pool_on_cmd_line = True + cont_on_cmd_line = True if not container: container = create_cont(conf, pool, label=label, ctype="POSIX") # Only set the container cache attributes when the container is initially created so they # can be modified later. - cont_attrs = {'dfuse-data-cache': False, + cont_attrs = {'dfuse-data-cache': 120, 'dfuse-attr-time': 67, 'dfuse-dentry-time': 19, 'dfuse-dentry-dir-time': 31, @@ -5026,6 +5027,9 @@ def run_in_fg(server, conf, args): if pool_on_cmd_line: dargs['pool'] = pool.uuid + if cont_on_cmd_line: + dargs["container"] = container + dfuse = DFuse(server, conf, **dargs) @@ -5034,7 +5038,11 @@ def run_in_fg(server, conf, args): dfuse.start() if pool_on_cmd_line: - t_dir = join(dfuse.dir, container) + if cont_on_cmd_line: + t_dir = dfuse.dir + else: + t_dir = join(dfuse.dir, container) + else: t_dir = join(dfuse.dir, pool.uuid, container) From 5ca6b95246a8d1465b9f2075e08a6de539b6c702 Mon Sep 17 00:00:00 2001 From: Ashley Pittman Date: Fri, 26 Apr 2024 09:01:59 +0000 Subject: [PATCH 02/14] Check the size after read. Signed-off-by: Ashley Pittman --- src/client/dfuse/ops/read.c | 34 +++++++++++++++++++--------------- 1 file changed, 19 insertions(+), 15 deletions(-) diff --git a/src/client/dfuse/ops/read.c b/src/client/dfuse/ops/read.c index c2aa00b9668..a84fec023e6 100644 --- a/src/client/dfuse/ops/read.c +++ b/src/client/dfuse/ops/read.c @@ -191,6 +191,9 @@ chunk_cb(struct dfuse_event *ev) fuse_req_t req; cd->rc = ev->de_ev.ev_error; + + if (ev->de_len != CHUNK_SIZE) + cd->rc = EIO; daos_event_fini(&ev->de_ev); do { @@ -246,13 +249,14 @@ chunk_cb(struct dfuse_event *ev) * Returns true on success. 
*/ static bool -chunk_fetch(fuse_req_t req, struct dfuse_obj_hdl *oh, struct read_chunk_data *cd, int bucket, - int slot, struct dfuse_inode_entry *ie) +chunk_fetch(fuse_req_t req, struct dfuse_obj_hdl *oh, struct read_chunk_data *cd, int slot) { - struct dfuse_info *dfuse_info = fuse_req_userdata(req); - struct dfuse_event *ev; - struct dfuse_eq *eqt; - int rc; + struct dfuse_info *dfuse_info = fuse_req_userdata(req); + struct dfuse_inode_entry *ie = oh->doh_ie; + struct dfuse_event *ev; + struct dfuse_eq *eqt; + int rc; + daos_off_t position = cd->bucket * CHUNK_SIZE; eqt = pick_eqt(dfuse_info); @@ -262,12 +266,10 @@ chunk_fetch(fuse_req_t req, struct dfuse_obj_hdl *oh, struct read_chunk_data *cd return false; } - ev->de_iov.iov_len = CHUNK_SIZE; - ev->de_req = req; - ev->de_cd = cd; - ev->de_sgl.sg_nr = 1; - ev->de_req_len = CHUNK_SIZE; - ev->de_req_position = CHUNK_SIZE * bucket; + ev->de_iov.iov_len = CHUNK_SIZE; + ev->de_req = req; + ev->de_cd = cd; + ev->de_sgl.sg_nr = 1; ev->de_complete_cb = chunk_cb; @@ -276,7 +278,7 @@ chunk_fetch(fuse_req_t req, struct dfuse_obj_hdl *oh, struct read_chunk_data *cd cd->reqs[slot] = req; cd->ohs[slot] = oh; - rc = dfs_read(ie->ie_dfs->dfs_ns, ie->ie_obj, &ev->de_sgl, ev->de_req_position, &ev->de_len, + rc = dfs_read(ie->ie_dfs->dfs_ns, ie->ie_obj, &ev->de_sgl, position, &ev->de_len, &ev->de_ev); if (rc != 0) goto err; @@ -359,7 +361,7 @@ chunk_read(fuse_req_t req, size_t len, off_t position, struct dfuse_obj_hdl *oh) if (submit) { DFUSE_TRA_DEBUG(oh, "submit for bucket %d[%d]", bucket, slot); - rcb = chunk_fetch(req, oh, cd, bucket, slot, ie); + rcb = chunk_fetch(req, oh, cd, slot); } else { struct dfuse_event *ev = NULL; @@ -449,7 +451,9 @@ dfuse_cb_read(fuse_req_t req, fuse_ino_t ino, size_t len, off_t position, struct } } - if (oh->doh_ie->ie_dfs->dfc_data_timeout != 0 && chunk_read(req, len, position, oh)) + if (oh->doh_ie->ie_dfs->dfc_data_timeout != 0 && + ((atomic_fetch_add(&oh->doh_ie->ie_il_count, 0) == 0)) && + 
chunk_read(req, len, position, oh)) return; eqt = pick_eqt(dfuse_info); From 3e98bf393613713fb080af87f234c57ad765237c Mon Sep 17 00:00:00 2001 From: Ashley Pittman Date: Fri, 26 Apr 2024 09:44:57 +0000 Subject: [PATCH 03/14] Set the len to 0. Signed-off-by: Ashley Pittman --- src/client/dfuse/ops/read.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/client/dfuse/ops/read.c b/src/client/dfuse/ops/read.c index a84fec023e6..9e8ce659ead 100644 --- a/src/client/dfuse/ops/read.c +++ b/src/client/dfuse/ops/read.c @@ -270,7 +270,7 @@ chunk_fetch(fuse_req_t req, struct dfuse_obj_hdl *oh, struct read_chunk_data *cd ev->de_req = req; ev->de_cd = cd; ev->de_sgl.sg_nr = 1; - + ev->de_len = 0; ev->de_complete_cb = chunk_cb; cd->ev = ev; From be55b2820dc1f2279a61df56a8e9f7763fe341af Mon Sep 17 00:00:00 2001 From: Ashley Pittman Date: Fri, 26 Apr 2024 10:13:33 +0000 Subject: [PATCH 04/14] Fix an issue with position. Test-tag: test_dfuse_caching_check Signed-off-by: Ashley Pittman --- src/client/dfuse/ops/read.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/client/dfuse/ops/read.c b/src/client/dfuse/ops/read.c index 9e8ce659ead..ec871bfc771 100644 --- a/src/client/dfuse/ops/read.c +++ b/src/client/dfuse/ops/read.c @@ -314,7 +314,7 @@ chunk_read(fuse_req_t req, size_t len, off_t position, struct dfuse_obj_hdl *oh) bool submit = false; bool rcb = false; - last = position = (position + (K128 - 1)) & -K128; + last = position + ((position + (K128 - 1)) & -K128); if (len != K128) return false; From 2a4173afa55e140f94ed069712c465d967a6cea9 Mon Sep 17 00:00:00 2001 From: Ashley Pittman Date: Fri, 26 Apr 2024 15:19:55 +0000 Subject: [PATCH 05/14] Log when there is a short read. 
Skip-unit-tests: true Skip-fault-injection-test: true Test-tag: test_dfuse_caching_check Signed-off-by: Ashley Pittman --- src/client/dfuse/ops/read.c | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/src/client/dfuse/ops/read.c b/src/client/dfuse/ops/read.c index ec871bfc771..f7ab60e419a 100644 --- a/src/client/dfuse/ops/read.c +++ b/src/client/dfuse/ops/read.c @@ -192,8 +192,12 @@ chunk_cb(struct dfuse_event *ev) cd->rc = ev->de_ev.ev_error; - if (ev->de_len != CHUNK_SIZE) + if (cd->rc == 0 && (ev->de_len != CHUNK_SIZE)) { cd->rc = EIO; + DS_WARN(cd->rc, "Unexpected short read expected %i got %zi", CHUNK_SIZE, + ev->de_len); + } + daos_event_fini(&ev->de_ev); do { @@ -312,7 +316,7 @@ chunk_read(fuse_req_t req, size_t len, off_t position, struct dfuse_obj_hdl *oh) int bucket; int slot; bool submit = false; - bool rcb = false; + bool rcb; last = position + ((position + (K128 - 1)) & -K128); @@ -375,6 +379,8 @@ chunk_read(fuse_req_t req, size_t len, off_t position, struct dfuse_obj_hdl *oh) * required. For now this is safe but will maintain all previously read events * in memory. */ + rcb = true; + D_MUTEX_LOCK(&rc_lock); if (cd->complete) { cd->done[slot] = true; @@ -387,14 +393,16 @@ chunk_read(fuse_req_t req, size_t len, off_t position, struct dfuse_obj_hdl *oh) if (ev) { if (cd->rc != 0) { - DFUSE_REPLY_ERR_RAW(oh, req, cd->rc); + /* Don't pass fuse an error here, rather return false and the read + * will be tried over the network. + */ + rcb = false; } else { DFUSE_TRA_DEBUG(oh, "%#zx-%#zx read", position, position + K128 - 1); DFUSE_REPLY_BUFQ(oh, req, ev->de_iov.iov_buf + (slot * K128), K128); } } - rcb = true; } return rcb; From 1dd78eaa6d48153a0411bb0ddba712d37285a36d Mon Sep 17 00:00:00 2001 From: Ashley Pittman Date: Fri, 26 Apr 2024 19:06:31 +0000 Subject: [PATCH 06/14] Improve error logging. 
Skip-unit-tests: true Skip-fault-injection-test: true Test-tag: test_dfuse_caching_check Signed-off-by: Ashley Pittman --- src/client/dfuse/ops/read.c | 10 +++------- src/tests/ftest/dfuse/caching_check.py | 7 ++++++- src/tests/ftest/dfuse/caching_check.yaml | 5 +++++ 3 files changed, 14 insertions(+), 8 deletions(-) diff --git a/src/client/dfuse/ops/read.c b/src/client/dfuse/ops/read.c index f7ab60e419a..eba7f86a06a 100644 --- a/src/client/dfuse/ops/read.c +++ b/src/client/dfuse/ops/read.c @@ -194,8 +194,8 @@ chunk_cb(struct dfuse_event *ev) if (cd->rc == 0 && (ev->de_len != CHUNK_SIZE)) { cd->rc = EIO; - DS_WARN(cd->rc, "Unexpected short read expected %i got %zi", CHUNK_SIZE, - ev->de_len); + DS_WARN(cd->rc, "Unexpected short read bucket %d (%#zx) expected %i got %zi", + cd->bucket, (off_t)cd->bucket * CHUNK_SIZE, CHUNK_SIZE, ev->de_len); } daos_event_fini(&ev->de_ev); @@ -318,8 +318,6 @@ chunk_read(fuse_req_t req, size_t len, off_t position, struct dfuse_obj_hdl *oh) bool submit = false; bool rcb; - last = position + ((position + (K128 - 1)) & -K128); - if (len != K128) return false; @@ -459,9 +457,7 @@ dfuse_cb_read(fuse_req_t req, fuse_ino_t ino, size_t len, off_t position, struct } } - if (oh->doh_ie->ie_dfs->dfc_data_timeout != 0 && - ((atomic_fetch_add(&oh->doh_ie->ie_il_count, 0) == 0)) && - chunk_read(req, len, position, oh)) + if (chunk_read(req, len, position, oh)) return; eqt = pick_eqt(dfuse_info); diff --git a/src/tests/ftest/dfuse/caching_check.py b/src/tests/ftest/dfuse/caching_check.py index 852a24f0dfd..c32627a741a 100644 --- a/src/tests/ftest/dfuse/caching_check.py +++ b/src/tests/ftest/dfuse/caching_check.py @@ -45,13 +45,16 @@ def test_dfuse_caching_check(self): self.log_step('Write to the dfuse mount point') self.run_ior_with_pool(fail_on_warning=False, stop_dfuse=False) + self.dfuse.get_stats() self.log_step('Get baseline read performance from dfuse with caching disabled') self.ior_cmd.update_params(flags=flags[1]) base_read_arr = [] out = 
self.run_ior_with_pool(fail_on_warning=False, stop_dfuse=False) + self.dfuse.get_stats() base_read_arr.append(IorCommand.get_ior_metrics(out)) out = self.run_ior_with_pool(fail_on_warning=False, stop_dfuse=False) + self.dfuse.get_stats() base_read_arr.append(IorCommand.get_ior_metrics(out)) # the index of max_mib @@ -64,10 +67,12 @@ def test_dfuse_caching_check(self): self.log_step('Get first read performance with caching enabled') out = self.run_ior_with_pool(fail_on_warning=False, stop_dfuse=False) + self.dfuse.get_stats() base_read_arr.append(IorCommand.get_ior_metrics(out)) self.log_step('Get cached read performance') - out = self.run_ior_with_pool(fail_on_warning=False) + out = self.run_ior_with_pool(fail_on_warning=False, stop_dfuse=False) + self.dfuse.get_stats() with_caching = IorCommand.get_ior_metrics(out) self.log_step('Verify cached read performance is greater than first read') diff --git a/src/tests/ftest/dfuse/caching_check.yaml b/src/tests/ftest/dfuse/caching_check.yaml index 42a9b3f3dc3..0fe79a57c84 100644 --- a/src/tests/ftest/dfuse/caching_check.yaml +++ b/src/tests/ftest/dfuse/caching_check.yaml @@ -31,3 +31,8 @@ ior: dfuse: disable_caching: true disable_wb_caching: true + client: + env_vars: + - D_LOG_MASK=INFO,DFUSE=DEBUG,DFS=DEBUG + - DD_MASK=ALL + - DD_SUBSYS=ALL From 8691c6749a3949590a4dbd68e84773e0782e114e Mon Sep 17 00:00:00 2001 From: Ashley Pittman Date: Sun, 28 Apr 2024 14:10:06 +0000 Subject: [PATCH 07/14] Make bucket larger, 64 bit type. 
Skip-unit-tests: true Skip-fault-injection-test: true Test-tag: test_dfuse_caching_check Signed-off-by: Ashley Pittman --- src/client/dfuse/ops/read.c | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/src/client/dfuse/ops/read.c b/src/client/dfuse/ops/read.c index eba7f86a06a..c6060d8f93e 100644 --- a/src/client/dfuse/ops/read.c +++ b/src/client/dfuse/ops/read.c @@ -132,7 +132,7 @@ struct read_chunk_data { struct dfuse_obj_hdl *ohs[8]; bool done[8]; d_list_t list; - int bucket; + uint64_t bucket; bool complete; struct dfuse_eq *eqt; bool exiting; @@ -194,8 +194,8 @@ chunk_cb(struct dfuse_event *ev) if (cd->rc == 0 && (ev->de_len != CHUNK_SIZE)) { cd->rc = EIO; - DS_WARN(cd->rc, "Unexpected short read bucket %d (%#zx) expected %i got %zi", - cd->bucket, (off_t)cd->bucket * CHUNK_SIZE, CHUNK_SIZE, ev->de_len); + DS_WARN(cd->rc, "Unexpected short read bucket %ld (%#zx) expected %i got %zi", + cd->bucket, cd->bucket * CHUNK_SIZE, CHUNK_SIZE, ev->de_len); } daos_event_fini(&ev->de_ev); @@ -313,7 +313,7 @@ chunk_read(fuse_req_t req, size_t len, off_t position, struct dfuse_obj_hdl *oh) struct read_chunk_core *cc; struct read_chunk_data *cd; off_t last; - int bucket; + uint64_t bucket; int slot; bool submit = false; bool rcb; @@ -334,8 +334,8 @@ chunk_read(fuse_req_t req, size_t len, off_t position, struct dfuse_obj_hdl *oh) slot = (position / K128) % 8; - DFUSE_TRA_DEBUG(oh, "read bucket %#zx-%#zx last %#zx size %#zx bucket %d slot %d", position, - position + len - 1, last, ie->ie_stat.st_size, bucket, slot); + DFUSE_TRA_DEBUG(oh, "read bucket %#zx-%#zx last %#zx size %#zx bucket %ld slot %d", + position, position + len - 1, last, ie->ie_stat.st_size, bucket, slot); D_MUTEX_LOCK(&rc_lock); if (ie->ie_chunk == NULL) { @@ -362,7 +362,7 @@ chunk_read(fuse_req_t req, size_t len, off_t position, struct dfuse_obj_hdl *oh) D_MUTEX_UNLOCK(&rc_lock); if (submit) { - DFUSE_TRA_DEBUG(oh, "submit for bucket %d[%d]", bucket, slot); + DFUSE_TRA_DEBUG(oh, 
"submit for bucket %ld[%d]", bucket, slot); rcb = chunk_fetch(req, oh, cd, slot); } else { struct dfuse_event *ev = NULL; From fa5689f6bda71503dabf9980fa9b5d3d018a547e Mon Sep 17 00:00:00 2001 From: Ashley Pittman Date: Sun, 28 Apr 2024 16:37:57 +0000 Subject: [PATCH 08/14] Turn off debugging again. Skip-unit-tests: true Skip-fault-injection-test: true Test-tag: test_dfuse_caching_check Signed-off-by: Ashley Pittman --- src/tests/ftest/dfuse/caching_check.yaml | 5 ----- 1 file changed, 5 deletions(-) diff --git a/src/tests/ftest/dfuse/caching_check.yaml b/src/tests/ftest/dfuse/caching_check.yaml index 0fe79a57c84..42a9b3f3dc3 100644 --- a/src/tests/ftest/dfuse/caching_check.yaml +++ b/src/tests/ftest/dfuse/caching_check.yaml @@ -31,8 +31,3 @@ ior: dfuse: disable_caching: true disable_wb_caching: true - client: - env_vars: - - D_LOG_MASK=INFO,DFUSE=DEBUG,DFS=DEBUG - - DD_MASK=ALL - - DD_SUBSYS=ALL From 6ada20deab0e50d027a27731f83a94baffc638cc Mon Sep 17 00:00:00 2001 From: Ashley Pittman Date: Mon, 29 Apr 2024 07:27:54 +0000 Subject: [PATCH 09/14] Move test to vm. Skip-unit-tests: true Skip-fault-injection-test: true Test-tag: test_dfuse_caching_check Signed-off-by: Ashley Pittman --- src/tests/ftest/dfuse/caching_check.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/tests/ftest/dfuse/caching_check.py b/src/tests/ftest/dfuse/caching_check.py index c32627a741a..2751c9778fa 100644 --- a/src/tests/ftest/dfuse/caching_check.py +++ b/src/tests/ftest/dfuse/caching_check.py @@ -1,5 +1,5 @@ """ - (C) Copyright 2019-2023 Intel Corporation. + (C) Copyright 2019-2024 Intel Corporation. SPDX-License-Identifier: BSD-2-Clause-Patent """ @@ -32,7 +32,7 @@ def test_dfuse_caching_check(self): higher than with caching disabled. 
:avocado: tags=all,full_regression - :avocado: tags=hw,medium + :avocado: tags=vm :avocado: tags=daosio,dfuse :avocado: tags=DfuseCachingCheck,test_dfuse_caching_check """ From cf31ce9b02c4d901a1f2c6e10439b4dec8f26af0 Mon Sep 17 00:00:00 2001 From: Ashley Pittman Date: Mon, 29 Apr 2024 12:58:14 +0000 Subject: [PATCH 10/14] Reference count the descriptors and free when done. Skip-unit-tests: true Skip-fault-injection-test: true Test-tag: dfuse Signed-off-by: Ashley Pittman --- src/client/dfuse/dfuse.h | 7 ++- src/client/dfuse/ops/open.c | 3 +- src/client/dfuse/ops/read.c | 75 ++++++++++++++++---------- src/tests/ftest/dfuse/caching_check.py | 2 +- 4 files changed, 55 insertions(+), 32 deletions(-) diff --git a/src/client/dfuse/dfuse.h b/src/client/dfuse/dfuse.h index 25eb4637054..da429605aef 100644 --- a/src/client/dfuse/dfuse.h +++ b/src/client/dfuse/dfuse.h @@ -1111,8 +1111,11 @@ dfuse_compute_inode(struct dfuse_cont *dfs, void dfuse_cache_evict_dir(struct dfuse_info *dfuse_info, struct dfuse_inode_entry *ie); -/* Free any read chunk data for an inode */ -void +/* Free any read chunk data for an inode. + * + * Returns true if feature was used. + */ +bool read_chunk_close(struct dfuse_inode_entry *ie); /* Metadata caching functions. 
*/ diff --git a/src/client/dfuse/ops/open.c b/src/client/dfuse/ops/open.c index 8c9dd4158d4..772e92e1c13 100644 --- a/src/client/dfuse/ops/open.c +++ b/src/client/dfuse/ops/open.c @@ -209,7 +209,8 @@ dfuse_cb_release(fuse_req_t req, fuse_ino_t ino, struct fuse_file_info *fi) } oc = atomic_fetch_sub_relaxed(&oh->doh_ie->ie_open_count, 1); if (oc == 1) { - read_chunk_close(oh->doh_ie); + if (read_chunk_close(oh->doh_ie)) + oh->doh_linear_read = true; } if (oh->doh_evict_on_close) { diff --git a/src/client/dfuse/ops/read.c b/src/client/dfuse/ops/read.c index c6060d8f93e..92afdad951e 100644 --- a/src/client/dfuse/ops/read.c +++ b/src/client/dfuse/ops/read.c @@ -110,7 +110,7 @@ pick_eqt(struct dfuse_info *dfuse_info) return &dfuse_info->di_eqt[eqt_idx % dfuse_info->di_eq_count]; } -/* Readahead and coalescing +/* Chunk read and coalescing * * This code attempts to predict application and kernel I/O patterns and preemptively read file * data ahead of when it's requested. @@ -120,7 +120,14 @@ pick_eqt(struct dfuse_info *dfuse_info) * are received and read an entire buffers worth, then for future requests the data should already * be in cache. * - * TODO: Ensure that the linear_read flag remains set correctly by this code. + * This code is entered when caching is enabled and reads are correctly size/aligned and not in the + * last CHUNK_SIZE of a file. When open then the inode contains a single read_chunk_core pointer + * and this contains a list of read_chunk_data entries, one for each bucket. Buckets where all + * slots have been requested are remove from the list and closed when the last request is completed. + * + * TODO: Currently there is no code to remove partially read buckets from the list so reading + * one slot every chunk would leave the entire file contents in memory until close and mean long + * list traversal times. 
*/ #define K128 (1024 * 128) @@ -130,20 +137,21 @@ struct read_chunk_data { struct dfuse_event *ev; fuse_req_t reqs[8]; struct dfuse_obj_hdl *ohs[8]; - bool done[8]; d_list_t list; uint64_t bucket; - bool complete; struct dfuse_eq *eqt; - bool exiting; int rc; + int entered; + ATOMIC int exited; + bool exiting; + bool complete; }; struct read_chunk_core { d_list_t entries; }; -/* Global lock for all readahead operations. Each inode has a struct read_chunk_core * entry +/* Global lock for all chunk read operations. Each inode has a struct read_chunk_core * entry * which is checked for NULL and set whilst holding this lock. Each read_chunk_core then has * a list of read_chunk_data and again, this lock protects all lists on all inodes. This avoids * the need for a per-inode lock which for many files would consume considerable memory but does @@ -162,16 +170,21 @@ chunk_free(struct read_chunk_data *cd) /* Called when the last open file handle on a inode is closed. This needs to free everything which * is complete and for anything that isn't flag it for deletion in the callback. + * + * Returns true if the feature was used. 
*/ -void +bool read_chunk_close(struct dfuse_inode_entry *ie) { struct read_chunk_data *cd, *cdn; + bool rcb = false; D_MUTEX_LOCK(&rc_lock); if (!ie->ie_chunk) goto out; + rcb = true; + d_list_for_each_entry_safe(cd, cdn, &ie->ie_chunk->entries, list) { if (cd->complete) { chunk_free(cd); @@ -182,6 +195,7 @@ read_chunk_close(struct dfuse_inode_entry *ie) D_FREE(ie->ie_chunk); out: D_MUTEX_UNLOCK(&rc_lock); + return rcb; } static void @@ -189,6 +203,7 @@ chunk_cb(struct dfuse_event *ev) { struct read_chunk_data *cd = ev->de_cd; fuse_req_t req; + bool done = false; cd->rc = ev->de_ev.ev_error; @@ -202,8 +217,6 @@ chunk_cb(struct dfuse_event *ev) do { int i; - int done_count = 0; - req = 0; D_MUTEX_LOCK(&rc_lock); @@ -219,18 +232,10 @@ chunk_cb(struct dfuse_event *ev) if (cd->reqs[i]) { req = cd->reqs[i]; cd->reqs[i] = 0; - cd->done[i] = true; break; - } else if (cd->done[i]) { - done_count++; } } - if (done_count == 8) { - D_ASSERT(!req); - d_slab_release(cd->eqt->de_read_slab, cd->ev); - cd->ev = NULL; - } D_MUTEX_UNLOCK(&rc_lock); if (req) { @@ -244,8 +249,16 @@ chunk_cb(struct dfuse_event *ev) DFUSE_REPLY_BUFQ(cd->ohs[i], req, ev->de_iov.iov_buf + (i * K128), K128); } + + if (atomic_fetch_add_relaxed(&cd->exited, 1) == 7) + done = true; } - } while (req); + } while (req && !done); + + if (done) { + d_slab_release(cd->eqt->de_read_slab, cd->ev); + D_FREE(cd); + } } /* Submut a read to dfs. @@ -347,18 +360,26 @@ chunk_read(fuse_req_t req, size_t len, off_t position, struct dfuse_obj_hdl *oh) cc = ie->ie_chunk; d_list_for_each_entry(cd, &cc->entries, list) - if (cd->bucket == bucket) + if (cd->bucket == bucket) { + /* Remove from list to re-add again later. 
*/ + d_list_del(&cd->list); goto found; + } D_ALLOC_PTR(cd); if (cd == NULL) goto err; - d_list_add_tail(&cd->list, &cc->entries); cd->bucket = bucket; submit = true; found: + + if (++cd->entered < 8) { + /* Put on front of list for efficient searching */ + d_list_add(&cd->list, &cc->entries); + } + D_MUTEX_UNLOCK(&rc_lock); if (submit) { @@ -367,21 +388,15 @@ chunk_read(fuse_req_t req, size_t len, off_t position, struct dfuse_obj_hdl *oh) } else { struct dfuse_event *ev = NULL; - /* Not check if this read request is complete or not yet, if it isn't then just + /* Now check if this read request is complete or not yet, if it isn't then just * save req in the right slot however if it is then reply here. After the call to * DFUSE_REPLY_* then no reference is held on either the open file or the inode so - * at that point they could be closed, so decided if ev needs to be released whilst - * holding the lock and keep a copy of all data structures needed. - * - * TODO: Implement this in a safe way. A reference count on ev is probably - * required. For now this is safe but will maintain all previously read events - * in memory. + * at that point they could be closed. */ rcb = true; D_MUTEX_LOCK(&rc_lock); if (cd->complete) { - cd->done[slot] = true; ev = cd->ev; } else { cd->reqs[slot] = req; @@ -400,6 +415,10 @@ chunk_read(fuse_req_t req, size_t len, off_t position, struct dfuse_obj_hdl *oh) position + K128 - 1); DFUSE_REPLY_BUFQ(oh, req, ev->de_iov.iov_buf + (slot * K128), K128); } + if (atomic_fetch_add_relaxed(&cd->exited, 1) == 7) { + d_slab_release(cd->eqt->de_read_slab, cd->ev); + D_FREE(cd); + } } } diff --git a/src/tests/ftest/dfuse/caching_check.py b/src/tests/ftest/dfuse/caching_check.py index 2751c9778fa..2d9332a6dc1 100644 --- a/src/tests/ftest/dfuse/caching_check.py +++ b/src/tests/ftest/dfuse/caching_check.py @@ -32,7 +32,7 @@ def test_dfuse_caching_check(self): higher than with caching disabled. 
:avocado: tags=all,full_regression - :avocado: tags=vm + :avocado: tags=hw,medium :avocado: tags=daosio,dfuse :avocado: tags=DfuseCachingCheck,test_dfuse_caching_check """ From a43df4932e6cc7ef2baa39efd3229ae8fbe860a7 Mon Sep 17 00:00:00 2001 From: Ashley Pittman Date: Tue, 30 Apr 2024 07:55:49 +0000 Subject: [PATCH 11/14] Fix formatting. Features: dfuse,-test_dfuse_find,-test_dfuse_daos_build_wt_pil4dfs Signed-off-by: Ashley Pittman --- src/client/dfuse/ops/read.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/client/dfuse/ops/read.c b/src/client/dfuse/ops/read.c index 271d10e28a4..1ac00a5b7f4 100644 --- a/src/client/dfuse/ops/read.c +++ b/src/client/dfuse/ops/read.c @@ -398,7 +398,7 @@ chunk_read(fuse_req_t req, size_t len, off_t position, struct dfuse_obj_hdl *oh) D_MUTEX_LOCK(&rc_lock); if (cd->complete) { - ev = cd->ev; + ev = cd->ev; } else { cd->reqs[slot] = req; cd->ohs[slot] = oh; From 2bd425693d510b7cfabb6859cfd495f3b2677ba5 Mon Sep 17 00:00:00 2001 From: Ashley Pittman Date: Thu, 2 May 2024 07:15:51 +0000 Subject: [PATCH 12/14] Fix merge. Features: dfuse Signed-off-by: Ashley Pittman --- src/client/dfuse/ops/read.c | 1 - src/tests/ftest/dfuse/caching_check.py | 9 ++------- utils/node_local_test.py | 12 ++---------- 3 files changed, 4 insertions(+), 18 deletions(-) diff --git a/src/client/dfuse/ops/read.c b/src/client/dfuse/ops/read.c index 8185b8a57de..43ce9328230 100644 --- a/src/client/dfuse/ops/read.c +++ b/src/client/dfuse/ops/read.c @@ -140,7 +140,6 @@ pick_eqt(struct dfuse_info *dfuse_info) * list traversal times. */ -#define K128 (1024 * 128) #define CHUNK_SIZE (1024 * 1024) struct read_chunk_data { diff --git a/src/tests/ftest/dfuse/caching_check.py b/src/tests/ftest/dfuse/caching_check.py index 2d9332a6dc1..852a24f0dfd 100644 --- a/src/tests/ftest/dfuse/caching_check.py +++ b/src/tests/ftest/dfuse/caching_check.py @@ -1,5 +1,5 @@ """ - (C) Copyright 2019-2024 Intel Corporation. + (C) Copyright 2019-2023 Intel Corporation. 
SPDX-License-Identifier: BSD-2-Clause-Patent """ @@ -45,16 +45,13 @@ def test_dfuse_caching_check(self): self.log_step('Write to the dfuse mount point') self.run_ior_with_pool(fail_on_warning=False, stop_dfuse=False) - self.dfuse.get_stats() self.log_step('Get baseline read performance from dfuse with caching disabled') self.ior_cmd.update_params(flags=flags[1]) base_read_arr = [] out = self.run_ior_with_pool(fail_on_warning=False, stop_dfuse=False) - self.dfuse.get_stats() base_read_arr.append(IorCommand.get_ior_metrics(out)) out = self.run_ior_with_pool(fail_on_warning=False, stop_dfuse=False) - self.dfuse.get_stats() base_read_arr.append(IorCommand.get_ior_metrics(out)) # the index of max_mib @@ -67,12 +64,10 @@ def test_dfuse_caching_check(self): self.log_step('Get first read performance with caching enabled') out = self.run_ior_with_pool(fail_on_warning=False, stop_dfuse=False) - self.dfuse.get_stats() base_read_arr.append(IorCommand.get_ior_metrics(out)) self.log_step('Get cached read performance') - out = self.run_ior_with_pool(fail_on_warning=False, stop_dfuse=False) - self.dfuse.get_stats() + out = self.run_ior_with_pool(fail_on_warning=False) with_caching = IorCommand.get_ior_metrics(out) self.log_step('Verify cached read performance is greater than first read') diff --git a/utils/node_local_test.py b/utils/node_local_test.py index cfe5c0072c5..095a2b7c58c 100755 --- a/utils/node_local_test.py +++ b/utils/node_local_test.py @@ -5004,14 +5004,13 @@ def run_in_fg(server, conf, args): # Set to False to run dfuse without a pool. pool_on_cmd_line = True - cont_on_cmd_line = True if not container: container = create_cont(conf, pool, label=label, ctype="POSIX") # Only set the container cache attributes when the container is initially created so they # can be modified later. 
- cont_attrs = {'dfuse-data-cache': 120, + cont_attrs = {'dfuse-data-cache': False, 'dfuse-attr-time': 67, 'dfuse-dentry-time': 19, 'dfuse-dentry-dir-time': 31, @@ -5027,9 +5026,6 @@ def run_in_fg(server, conf, args): if pool_on_cmd_line: dargs['pool'] = pool.uuid - if cont_on_cmd_line: - dargs["container"] = container - dfuse = DFuse(server, conf, **dargs) @@ -5038,11 +5034,7 @@ def run_in_fg(server, conf, args): dfuse.start() if pool_on_cmd_line: - if cont_on_cmd_line: - t_dir = dfuse.dir - else: - t_dir = join(dfuse.dir, container) - + t_dir = join(dfuse.dir, container) else: t_dir = join(dfuse.dir, pool.uuid, container) From b035b8a7ca92e526725c5006e355794dc8b67201 Mon Sep 17 00:00:00 2001 From: Jeff Olivier Date: Thu, 30 May 2024 09:23:42 -0600 Subject: [PATCH 13/14] Features: dfuse Required-githooks: true Signed-off-by: Jeff Olivier From 89822d9ad30f11fc1c55d5bf59190121a2d2c2d0 Mon Sep 17 00:00:00 2001 From: Jeff Olivier Date: Fri, 31 May 2024 12:41:13 -0600 Subject: [PATCH 14/14] Features: dfuse Allow-unstable-test: true Required-githooks: true Signed-off-by: Jeff Olivier