diff --git a/src/client/dfuse/dfuse.h b/src/client/dfuse/dfuse.h
index f239f2354930..d4c43ef6b6b5 100644
--- a/src/client/dfuse/dfuse.h
+++ b/src/client/dfuse/dfuse.h
@@ -396,6 +396,7 @@ struct dfuse_event {
 	union {
 		struct dfuse_obj_hdl     *de_oh;
 		struct dfuse_inode_entry *de_ie;
+		struct read_chunk_data   *de_cd;
 	};
 	off_t de_req_position; /**< The file position requested by fuse */
 	union {
@@ -1001,6 +1002,8 @@ struct dfuse_inode_entry {
 
 	/* Entry on the evict list */
 	d_list_t ie_evict_entry;
+
+	struct read_chunk_core *ie_chunk;
 };
 
 /* Flush write-back cache writes to a inode. It does this by waiting for and then releasing an
@@ -1098,6 +1101,10 @@ dfuse_compute_inode(struct dfuse_cont *dfs,
 void
 dfuse_cache_evict_dir(struct dfuse_info *dfuse_info, struct dfuse_inode_entry *ie);
 
+/* Free any read chunk data for an inode */
+void
+read_chunk_close(struct dfuse_inode_entry *ie);
+
 /* Metadata caching functions. */
 
 /* Mark the cache as up-to-date from now */
diff --git a/src/client/dfuse/ops/open.c b/src/client/dfuse/ops/open.c
index 42a7d6e10fdf..4d52fdc84fb1 100644
--- a/src/client/dfuse/ops/open.c
+++ b/src/client/dfuse/ops/open.c
@@ -137,6 +137,7 @@ dfuse_cb_release(fuse_req_t req, fuse_ino_t ino, struct fuse_file_info *fi)
 	struct dfuse_obj_hdl     *oh = (struct dfuse_obj_hdl *)fi->fh;
 	struct dfuse_inode_entry *ie = NULL;
 	int                       rc;
+	uint32_t                  oc;
 	uint32_t                  il_calls;
 
 	/* Perform the opposite of what the ioctl call does, always change the open handle count
@@ -204,7 +205,10 @@ dfuse_cb_release(fuse_req_t req, fuse_ino_t ino, struct fuse_file_info *fi)
 	if (il_calls != 0) {
 		atomic_fetch_sub_relaxed(&oh->doh_ie->ie_il_count, 1);
 	}
-	atomic_fetch_sub_relaxed(&oh->doh_ie->ie_open_count, 1);
+	oc = atomic_fetch_sub_relaxed(&oh->doh_ie->ie_open_count, 1);
+	if (oc == 1) {
+		read_chunk_close(oh->doh_ie);
+	}
 
 	if (oh->doh_evict_on_close) {
 		ie = oh->doh_ie;
diff --git a/src/client/dfuse/ops/read.c b/src/client/dfuse/ops/read.c
index 0ea1d61ef39d..5914ab0b63fe 100644
--- a/src/client/dfuse/ops/read.c
+++ b/src/client/dfuse/ops/read.c
@@ -1,5 +1,5 @@
 /**
- * (C) Copyright 2016-2023 Intel Corporation.
+ * (C) Copyright 2016-2024 Intel Corporation.
  *
  * SPDX-License-Identifier: BSD-2-Clause-Patent
  */
@@ -100,6 +100,309 @@ dfuse_readahead_reply(fuse_req_t req, size_t len, off_t position, struct dfuse_o
 	return true;
 }
 
+static struct dfuse_eq *
+pick_eqt(struct dfuse_info *dfuse_info)
+{
+	uint64_t eqt_idx;
+
+	eqt_idx = atomic_fetch_add_relaxed(&dfuse_info->di_eqt_idx, 1);
+	return &dfuse_info->di_eqt[eqt_idx % dfuse_info->di_eq_count];
+}
+
+/* Readahead and coalescing
+ *
+ * This code attempts to predict application and kernel I/O patterns and preemptively read file
+ * data ahead of when it's requested.
+ *
+ * For some kernels the read I/O size is limited to 128k when using the page cache, or 1MB when
+ * using direct I/O. To avoid the performance impact of this, detect when well-aligned 128k reads
+ * are received and read an entire buffer's worth; subsequent requests should then be served from
+ * data that is already in cache.
+ *
+ * TODO: Ensure that the linear_read flag remains set correctly by this code.
+ */
+
+#define K128       (1024 * 128)
+#define CHUNK_SIZE (1024 * 1024)
+
+struct read_chunk_data {
+	struct dfuse_event   *ev;
+	fuse_req_t            reqs[8];
+	struct dfuse_obj_hdl *ohs[8];
+	bool                  done[8];
+	d_list_t              list;
+	int                   bucket;
+	bool                  complete;
+	struct dfuse_eq      *eqt;
+	bool                  exiting;
+	int                   rc;
+};
+
+struct read_chunk_core {
+	d_list_t entries;
+};
+
+/* Global lock for all readahead operations. Each inode has a struct read_chunk_core * entry
+ * which is checked for NULL and set whilst holding this lock. Each read_chunk_core then has
+ * a list of read_chunk_data and again, this lock protects all lists on all inodes. This avoids
+ * the need for a per-inode lock, which for many files would consume considerable memory, but it
+ * does mean there is potentially some lock contention. The lock, however, is only held for list
+ * manipulation; no dfs or kernel calls are made whilst holding the lock.
+ */
+static pthread_mutex_t rc_lock = PTHREAD_MUTEX_INITIALIZER;
+
+static void
+chunk_free(struct read_chunk_data *cd)
+{
+	d_list_del(&cd->list);
+	d_slab_release(cd->eqt->de_read_slab, cd->ev);
+	D_FREE(cd);
+}
+
+/* Called when the last open file handle on an inode is closed. This needs to free everything
+ * which is complete, and flag anything that is not yet complete for deletion in the callback.
+ */
+void
+read_chunk_close(struct dfuse_inode_entry *ie)
+{
+	struct read_chunk_data *cd, *cdn;
+
+	D_MUTEX_LOCK(&rc_lock);
+	if (!ie->ie_chunk)
+		goto out;
+
+	d_list_for_each_entry_safe(cd, cdn, &ie->ie_chunk->entries, list) {
+		if (cd->complete) {
+			chunk_free(cd);
+		} else {
+			cd->exiting = true;
+		}
+	}
+	D_FREE(ie->ie_chunk);
+out:
+	D_MUTEX_UNLOCK(&rc_lock);
+}
+
+static void
+chunk_cb(struct dfuse_event *ev)
+{
+	struct read_chunk_data *cd = ev->de_cd;
+	fuse_req_t              req;
+
+	cd->rc = ev->de_ev.ev_error;
+	daos_event_fini(&ev->de_ev);
+
+	do {
+		int i;
+		int done_count = 0;
+
+		req = 0;
+
+		D_MUTEX_LOCK(&rc_lock);
+
+		if (cd->exiting) {
+			chunk_free(cd);
+			D_MUTEX_UNLOCK(&rc_lock);
+			return;
+		}
+
+		cd->complete = true;
+		for (i = 0; i < 8; i++) {
+			if (cd->reqs[i]) {
+				req         = cd->reqs[i];
+				cd->reqs[i] = 0;
+				cd->done[i] = true;
+				break;
+			} else if (cd->done[i]) {
+				done_count++;
+			}
+		}
+
+		if (done_count == 8) {
+			D_ASSERT(!req);
+			d_slab_release(cd->eqt->de_read_slab, cd->ev);
+			cd->ev = NULL;
+		}
+		D_MUTEX_UNLOCK(&rc_lock);
+
+		if (req) {
+			size_t position = (cd->bucket * CHUNK_SIZE) + (i * K128);
+
+			if (cd->rc != 0) {
+				DFUSE_REPLY_ERR_RAW(cd->ohs[i], req, cd->rc);
+			} else {
+				DFUSE_TRA_DEBUG(cd->ohs[i], "%#zx-%#zx read", position,
+						position + K128 - 1);
+				DFUSE_REPLY_BUFQ(cd->ohs[i], req, ev->de_iov.iov_buf + (i * K128),
+						 K128);
+			}
+		}
+	} while (req);
+}
+
+/* Submit a read to dfs.
+ *
+ * Returns true on success.
+ */
+static bool
+chunk_fetch(fuse_req_t req, struct dfuse_obj_hdl *oh, struct read_chunk_data *cd, int bucket,
+	    int slot, struct dfuse_inode_entry *ie)
+{
+	struct dfuse_info  *dfuse_info = fuse_req_userdata(req);
+	struct dfuse_event *ev;
+	struct dfuse_eq    *eqt;
+	int                 rc;
+
+	eqt = pick_eqt(dfuse_info);
+
+	ev = d_slab_acquire(eqt->de_read_slab);
+	if (ev == NULL) {
+		cd->rc = ENOMEM;
+		return false;
+	}
+
+	ev->de_iov.iov_len  = CHUNK_SIZE;
+	ev->de_req          = req;
+	ev->de_cd           = cd;
+	ev->de_sgl.sg_nr    = 1;
+	ev->de_req_len      = CHUNK_SIZE;
+	ev->de_req_position = CHUNK_SIZE * bucket;
+
+	ev->de_complete_cb = chunk_cb;
+
+	cd->ev         = ev;
+	cd->eqt        = eqt;
+	cd->reqs[slot] = req;
+	cd->ohs[slot]  = oh;
+
+	rc = dfs_read(ie->ie_dfs->dfs_ns, ie->ie_obj, &ev->de_sgl, ev->de_req_position, &ev->de_len,
+		      &ev->de_ev);
+	if (rc != 0)
+		goto err;
+
+	/* Send a message to the async thread to wake it up and poll for events */
+	sem_post(&eqt->de_sem);
+
+	/* Now ensure there are more descriptors for the next request */
+	d_slab_restock(eqt->de_read_slab);
+
+	return true;
+
+err:
+	daos_event_fini(&ev->de_ev);
+	d_slab_release(eqt->de_read_slab, ev);
+	cd->rc = rc;
+	return false;
+}
+
+/* Try and do a bulk read.
+ *
+ * Returns true if it was able to handle the read.
+ */
+static bool
+chunk_read(fuse_req_t req, size_t len, off_t position, struct dfuse_obj_hdl *oh)
+{
+	struct dfuse_inode_entry *ie = oh->doh_ie;
+	struct read_chunk_core   *cc;
+	struct read_chunk_data   *cd;
+	off_t                     last;
+	int                       bucket;
+	int                       slot;
+	bool                      submit = false;
+	bool                      rcb    = false;
+
+	if (len != K128)
+		return false;
+
+	if ((position % K128) != 0)
+		return false;
+
+	last = D_ALIGNUP(position + len - 1, CHUNK_SIZE);
+
+	if (last > oh->doh_ie->ie_stat.st_size)
+		return false;
+
+	bucket = D_ALIGNUP(position + len, CHUNK_SIZE);
+	bucket = (bucket / CHUNK_SIZE) - 1;
+
+	slot = (position / K128) % 8;
+
+	DFUSE_TRA_DEBUG(oh, "read bucket %#zx-%#zx last %#zx size %#zx bucket %d slot %d", position,
+			position + len - 1, last, ie->ie_stat.st_size, bucket, slot);
+
+	D_MUTEX_LOCK(&rc_lock);
+	if (ie->ie_chunk == NULL) {
+		D_ALLOC_PTR(ie->ie_chunk);
+		if (ie->ie_chunk == NULL)
+			goto err;
+		D_INIT_LIST_HEAD(&ie->ie_chunk->entries);
+	}
+	cc = ie->ie_chunk;
+
+	d_list_for_each_entry(cd, &cc->entries, list)
+		if (cd->bucket == bucket)
+			goto found;
+
+	D_ALLOC_PTR(cd);
+	if (cd == NULL)
+		goto err;
+
+	d_list_add_tail(&cd->list, &cc->entries);
+	cd->bucket = bucket;
+	submit     = true;
+
+found:
+	D_MUTEX_UNLOCK(&rc_lock);
+
+	if (submit) {
+		DFUSE_TRA_DEBUG(oh, "submit for bucket %d[%d]", bucket, slot);
+		rcb = chunk_fetch(req, oh, cd, bucket, slot, ie);
+	} else {
+		struct dfuse_event *ev = NULL;
+
+		/* Now check whether this read request is complete or not; if it isn't then just
+		 * save req in the right slot, however if it is then reply here. After the call to
+		 * DFUSE_REPLY_* no reference is held on either the open file or the inode so at
+		 * that point they could be closed, so decide if ev needs to be released whilst
+		 * holding the lock and keep a copy of all data structures needed.
+		 *
+		 * TODO: Implement this in a safe way. A reference count on ev is probably
+		 * required. For now this is safe but will keep all previously read events
+		 * in memory.
+		 */
+		D_MUTEX_LOCK(&rc_lock);
+		if (cd->complete) {
+			cd->done[slot] = true;
+			ev             = cd->ev;
+		} else {
+			cd->reqs[slot] = req;
+			cd->ohs[slot]  = oh;
+		}
+		D_MUTEX_UNLOCK(&rc_lock);
+
+		if (ev) {
+			if (cd->rc != 0) {
+				DFUSE_REPLY_ERR_RAW(oh, req, cd->rc);
+			} else {
+				DFUSE_TRA_DEBUG(oh, "%#zx-%#zx read", position,
+						position + K128 - 1);
+				DFUSE_REPLY_BUFQ(oh, req, ev->de_iov.iov_buf + (slot * K128), K128);
+			}
+		}
+		rcb = true;
+	}
+
+	return rcb;
+
+err:
+	D_MUTEX_UNLOCK(&rc_lock);
+	return false;
+}
+
 void
 dfuse_cb_read(fuse_req_t req, fuse_ino_t ino, size_t len, off_t position, struct fuse_file_info *fi)
 {
@@ -109,7 +412,6 @@ dfuse_cb_read(fuse_req_t req, fuse_ino_t ino, size_t len, off_t position, struct
 	struct dfuse_eq     *eqt;
 	int                  rc;
 	struct dfuse_event  *ev;
-	uint64_t             eqt_idx;
 
 	DFUSE_IE_STAT_ADD(oh->doh_ie, DS_READ);
 
@@ -146,8 +448,10 @@ dfuse_cb_read(fuse_req_t req, fuse_ino_t ino, size_t len, off_t position, struct
 		}
 	}
 
-	eqt_idx = atomic_fetch_add_relaxed(&dfuse_info->di_eqt_idx, 1);
-	eqt     = &dfuse_info->di_eqt[eqt_idx % dfuse_info->di_eq_count];
+	if (oh->doh_ie->ie_dfs->dfc_data_timeout != 0 && chunk_read(req, len, position, oh))
+		return;
+
+	eqt = pick_eqt(dfuse_info);
 
 	ev = d_slab_acquire(eqt->de_read_slab);
 	if (ev == NULL)
@@ -242,13 +546,10 @@ dfuse_pre_read(struct dfuse_info *dfuse_info, struct dfuse_obj_hdl *oh)
 	struct dfuse_eq     *eqt;
 	int                  rc;
 	struct dfuse_event  *ev;
-	uint64_t             eqt_idx;
 	size_t               len = oh->doh_ie->ie_stat.st_size;
 
-	eqt_idx = atomic_fetch_add_relaxed(&dfuse_info->di_eqt_idx, 1);
-	eqt     = &dfuse_info->di_eqt[eqt_idx % dfuse_info->di_eq_count];
-
-	ev = d_slab_acquire(eqt->de_read_slab);
+	eqt = pick_eqt(dfuse_info);
+	ev  = d_slab_acquire(eqt->de_read_slab);
 	if (ev == NULL)
 		D_GOTO(err, rc = ENOMEM);
diff --git a/utils/node_local_test.py b/utils/node_local_test.py
index b6018e90b3e3..16c762e4582d 100755
--- a/utils/node_local_test.py
+++ b/utils/node_local_test.py
@@ -5004,13 +5004,14 @@ def run_in_fg(server, conf, args):
 
     # Set to False to run dfuse without a pool.
     pool_on_cmd_line = True
+    cont_on_cmd_line = True
 
     if not container:
         container = create_cont(conf, pool, label=label, ctype="POSIX")
 
         # Only set the container cache attributes when the container is initially created so they
        # can be modified later.
-        cont_attrs = {'dfuse-data-cache': False,
+        cont_attrs = {'dfuse-data-cache': 120,
                       'dfuse-attr-time': 67,
                       'dfuse-dentry-time': 19,
                       'dfuse-dentry-dir-time': 31,
@@ -5026,6 +5027,9 @@
     if pool_on_cmd_line:
         dargs['pool'] = pool.uuid
 
+    if cont_on_cmd_line:
+        dargs["container"] = container
+
     dfuse = DFuse(server, conf, **dargs)
 
@@ -5034,7 +5038,11 @@
     dfuse.start()
 
     if pool_on_cmd_line:
-        t_dir = join(dfuse.dir, container)
+        if cont_on_cmd_line:
+            t_dir = dfuse.dir
+        else:
+            t_dir = join(dfuse.dir, container)
+
     else:
         t_dir = join(dfuse.dir, pool.uuid, container)
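
Note on the chunk/slot mapping (not part of the patch): each well-aligned 128k read maps to one of eight slots inside a 1MB chunk, and the chunk index ("bucket") selects the read_chunk_data entry that a single dfs_read() of CHUNK_SIZE bytes fills. The standalone sketch below just exercises that arithmetic in isolation so the expected bucket/slot values can be checked; ALIGNUP is a stand-in for DAOS's D_ALIGNUP macro, the constants mirror those added to read.c, and the helper names are illustrative only.

/* Standalone illustration of the bucket/slot arithmetic used by chunk_read().
 * Not part of the patch: ALIGNUP is a stand-in for DAOS's D_ALIGNUP macro.
 */
#include <assert.h>
#include <stdio.h>
#include <sys/types.h>

#define K128       (1024 * 128)
#define CHUNK_SIZE (1024 * 1024)

/* Round x up to the next multiple of a (a must be a power of two). */
#define ALIGNUP(x, a) (((x) + ((off_t)(a) - 1)) & ~((off_t)(a) - 1))

/* Return the 1MB chunk index holding an aligned 128k read, or -1 if the read
 * is not a candidate for coalescing (wrong size or not 128k aligned).
 */
static int
chunk_bucket(off_t position, size_t len)
{
	if (len != K128 || (position % K128) != 0)
		return -1;
	return (int)(ALIGNUP(position + (off_t)len, CHUNK_SIZE) / CHUNK_SIZE) - 1;
}

/* Return which of the eight 128k slots within its chunk the read occupies. */
static int
chunk_slot(off_t position)
{
	return (int)((position / K128) % 8);
}

int
main(void)
{
	/* First 128k of the file: chunk 0, slot 0. */
	assert(chunk_bucket(0, K128) == 0 && chunk_slot(0) == 0);

	/* Last 128k of the first 1MB chunk: still chunk 0, slot 7. */
	assert(chunk_bucket(7 * K128, K128) == 0 && chunk_slot(7 * K128) == 7);

	/* First 128k of the second chunk: chunk 1, slot 0. */
	assert(chunk_bucket(CHUNK_SIZE, K128) == 1 && chunk_slot(CHUNK_SIZE) == 0);

	/* Unaligned or differently sized reads fall back to the normal read path. */
	assert(chunk_bucket(4096, K128) == -1);
	assert(chunk_bucket(0, 4096) == -1);

	printf("bucket/slot mapping checks passed\n");
	return 0;
}

Because eight kernel-sized 128k requests land in the same bucket, one dfs_read of a full chunk can satisfy up to eight fuse requests, which is the coalescing the patch is after.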