diff --git a/src/client/dfuse/dfuse.h b/src/client/dfuse/dfuse.h
index 4c4d88bbbdb..369ab21fa09 100644
--- a/src/client/dfuse/dfuse.h
+++ b/src/client/dfuse/dfuse.h
@@ -89,15 +89,59 @@ dfuse_launch_fuse(struct dfuse_info *dfuse_info, struct fuse_args *args);
 
 struct dfuse_inode_entry;
 
+/* Preread.
+ *
+ * DFuse can start a pre-read of the file on open, then when reads do occur they can happen
+ * directly from the buffer.  For 'linear reads' of a file this means that the read can be
+ * triggered sooner and performed as one dfs request.  Use the pre-read code only for trivial
+ * reads; if a file is not read linearly or it's written to then back off to the regular
+ * behavior, which will likely use the kernel cache.
+ *
+ * Pre-read is enabled when:
+ *  Caching is enabled
+ *  The file is not cached
+ *  The file is small enough to fit in one buffer (1MiB)
+ *  The previous file from the same directory was read linearly.
+ *
+ * Similar to the READDIR_PLUS_AUTO logic this feature is enabled based on the I/O pattern of the
+ * most recent access to the parent directory.  General I/O workloads or interception library use
+ * are unlikely to trigger this code, however something that is reading the entire contents of a
+ * directory tree should.
+ *
+ * This works by creating a new descriptor which is pointed to by the open handle: on open dfuse
+ * decides if it will use pre-read and if so allocates a new descriptor, adds it to the open
+ * handle and then, once it has replied to the open, immediately issues a read.  The new
+ * descriptor includes a lock which is locked by open before it replies to the kernel request and
+ * unlocked by the dfs read callback.  Read requests then take the lock to ensure the dfs read is
+ * complete and reply directly with the data in the buffer.
+ *
+ * This works up to the buffer size; the pre-read tries to read the expected file size.  If the
+ * file is smaller than expected then dfuse will detect this and back off to a regular read,
+ * however it will not detect if the file has grown in size.
+ *
+ * A dfuse_event is hung off this new descriptor and these come from the same pool as regular
+ * reads.  The buffer is kept as long as it's needed but released as soon as possible, either on
+ * error or when EOF is returned to the kernel.  If it's still present on release then it is
+ * freed at that point.
+ */
+struct dfuse_read_ahead {
+	pthread_mutex_t     dra_lock;
+	struct dfuse_event *dra_ev;
+	int                 dra_rc;
+};
+
 /** what is returned as the handle for fuse fuse_file_info on create/open/opendir */
 struct dfuse_obj_hdl {
 	/** pointer to dfs_t */
 	dfs_t                    *doh_dfs;
 	/** the DFS object handle. Not created for directories. */
 	dfs_obj_t                *doh_obj;
+
+	struct dfuse_read_ahead  *doh_readahead;
+
 	/** the inode entry for the file */
 	struct dfuse_inode_entry *doh_ie;
+
+	struct dfuse_inode_entry *doh_parent_dir;
+
 	/** readdir handle.
 	 */
 	struct dfuse_readdir_hdl *doh_rd;
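
The lock hand-off described in the comment above is the heart of the design: open() takes dra_lock before replying to the kernel, the dfs read callback publishes its result and unlocks, and any later read must acquire the lock before it can trust the buffer. A minimal sketch of that protocol, using invented stand-in types and names rather than the real dfuse structures:

```c
#include <pthread.h>
#include <string.h>

/* Invented stand-in for struct dfuse_read_ahead; illustrative only. */
struct ra {
	pthread_mutex_t lock;
	char           *buf; /* filled by the asynchronous read */
	size_t          len; /* bytes actually read */
	int             rc;  /* error from the asynchronous read, if any */
};

/* open(): take the lock *before* replying to the kernel, then start the read. */
static void open_with_preread(struct ra *ra)
{
	pthread_mutex_init(&ra->lock, NULL);
	pthread_mutex_lock(&ra->lock);
	/* ... reply to the kernel here, then submit the async read ... */
}

/* read completion callback: publish the result, then release the lock. */
static void preread_done(struct ra *ra, int rc, size_t len)
{
	ra->rc  = rc;
	ra->len = len;
	pthread_mutex_unlock(&ra->lock);
}

/* read(): block until the pre-read has completed, then copy from the buffer. */
static size_t read_from_preread(struct ra *ra, char *out, size_t len, size_t off)
{
	size_t n = 0;

	pthread_mutex_lock(&ra->lock);
	if (ra->rc == 0 && off < ra->len)
		n = (len < ra->len - off) ? len : ra->len - off;
	if (n > 0)
		memcpy(out, ra->buf + off, n);
	pthread_mutex_unlock(&ra->lock);
	return n;
}

int main(void)
{
	char      data[] = "hello";
	char      out[8];
	struct ra ra = {.buf = data};

	open_with_preread(&ra);  /* kernel gets its open reply while the lock is held */
	preread_done(&ra, 0, 5); /* the async completion fills in the result */
	return read_from_preread(&ra, out, sizeof(out), 0) == 5 ? 0 : 1;
}
```

The same rendezvous is what lets dfuse reply to the open immediately and still guarantee that a later read never sees a half-filled buffer.
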
@@ -335,9 +379,11 @@ struct dfuse_event {
 		struct dfuse_inode_entry *de_ie;
 	};
 	off_t  de_req_position; /**< The file position requested by fuse */
-	size_t de_req_len;
+	union {
+		size_t de_req_len;
+		size_t de_readahead_len;
+	};
 	void (*de_complete_cb)(struct dfuse_event *ev);
-
 	struct stat de_attr;
 };
 
@@ -680,6 +726,8 @@ struct fuse_lowlevel_ops dfuse_ops;
 #define DFUSE_REPLY_ENTRY(inode, req, entry)                                                       \
 	do {                                                                                       \
 		int __rc;                                                                          \
+		DFUSE_TRA_DEBUG(inode, "Returning entry inode %#lx mode %#o size %#zx",            \
+				(entry).attr.st_ino, (entry).attr.st_mode, (entry).attr.st_size); \
 		if ((entry).attr_timeout > 0) {                                                    \
 			(inode)->ie_stat = (entry).attr;                                           \
 			dfuse_mcache_set_time(inode);                                              \
@@ -793,6 +841,13 @@ struct dfuse_inode_entry {
 
 	/** File has been unlinked from daos */
 	bool                     ie_unlinked;
+
+	/** Last file closed in this directory was read linearly. Directories only.
+	 *
+	 * Set on close() of a file in the directory to the value of linear_read from the fh.
+	 * Checked on open of a file to determine if pre-caching is used.
+	 */
+	ATOMIC bool              ie_linear_read;
 };
 
 static inline struct dfuse_inode_entry *
@@ -847,21 +902,21 @@ dfuse_cache_evict_dir(struct dfuse_info *dfuse_info, struct dfuse_inode_entry *i
 void
 dfuse_mcache_set_time(struct dfuse_inode_entry *ie);
 
-/* Set the cache as invalid */
+/* Set the metadata cache as invalid */
 void
 dfuse_mcache_evict(struct dfuse_inode_entry *ie);
 
-/* Check the cache setting against a given timeout, and return time left */
+/* Check the metadata cache setting against a given timeout, and return time left */
 bool
 dfuse_mcache_get_valid(struct dfuse_inode_entry *ie, double max_age, double *timeout);
 
 /* Data caching functions */
 
-/* Mark the cache as up-to-date from now */
+/* Mark the data cache as up-to-date from now */
 void
 dfuse_dcache_set_time(struct dfuse_inode_entry *ie);
 
-/* Set the cache as invalid */
+/* Set the data cache as invalid */
 void
 dfuse_dcache_evict(struct dfuse_inode_entry *ie);
 
@@ -873,6 +928,9 @@ dfuse_cache_evict(struct dfuse_inode_entry *ie);
 bool
 dfuse_dcache_get_valid(struct dfuse_inode_entry *ie, double max_age);
 
+void
+dfuse_pre_read(struct dfuse_info *dfuse_info, struct dfuse_obj_hdl *oh);
+
 int
 check_for_uns_ep(struct dfuse_info *dfuse_info, struct dfuse_inode_entry *ie, char *attr,
 		 daos_size_t len);
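
The ie_linear_read flag added above is a per-directory hint with a small lifecycle: creating a file in the directory clears it, closing a file sets it only if that handle was read linearly through to EOF, and the next open in the same directory consults it before attempting a pre-read. A sketch of that lifecycle with hypothetical handler names (on_create/on_release/consider_preread are illustrative, not dfuse functions):

```c
#include <stdatomic.h>
#include <stdbool.h>

/* Per-directory hint: illustrative stand-in for ie_linear_read. */
struct dir_hint {
	atomic_bool linear_read;
};

/* create(): a brand new file tells us nothing, so reset the hint. */
static void on_create(struct dir_hint *dir)
{
	atomic_store_explicit(&dir->linear_read, false, memory_order_relaxed);
}

/* release(): remember whether this handle was read linearly through to EOF. */
static void on_release(struct dir_hint *dir, bool linear_read, bool hit_eof)
{
	atomic_store_explicit(&dir->linear_read, linear_read && hit_eof,
			      memory_order_relaxed);
}

/* open(): only consider a pre-read if the previous file in this directory
 * was consumed linearly.
 */
static bool consider_preread(struct dir_hint *dir)
{
	return atomic_load_explicit(&dir->linear_read, memory_order_relaxed);
}

int main(void)
{
	struct dir_hint dir = {false};

	on_create(&dir);              /* new file: no history yet */
	on_release(&dir, true, true); /* previous file was streamed to EOF */
	return consider_preread(&dir) ? 0 : 1;
}
```
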
diff --git a/src/client/dfuse/dfuse_core.c b/src/client/dfuse/dfuse_core.c
index c3ebda252e9..cc346df619d 100644
--- a/src/client/dfuse/dfuse_core.c
+++ b/src/client/dfuse/dfuse_core.c
@@ -1171,7 +1171,7 @@ dfuse_read_event_reset(void *arg)
 	int                 rc;
 
 	if (ev->de_iov.iov_buf == NULL) {
-		D_ALLOC(ev->de_iov.iov_buf, DFUSE_MAX_READ);
+		D_ALLOC_NZ(ev->de_iov.iov_buf, DFUSE_MAX_READ);
 		if (ev->de_iov.iov_buf == NULL)
 			return false;
 
@@ -1195,7 +1195,7 @@ dfuse_write_event_reset(void *arg)
 	int                 rc;
 
 	if (ev->de_iov.iov_buf == NULL) {
-		D_ALLOC(ev->de_iov.iov_buf, DFUSE_MAX_READ);
+		D_ALLOC_NZ(ev->de_iov.iov_buf, DFUSE_MAX_READ);
 		if (ev->de_iov.iov_buf == NULL)
 			return false;
 
diff --git a/src/client/dfuse/ops/create.c b/src/client/dfuse/ops/create.c
index 8aa4222053c..5a544279920 100644
--- a/src/client/dfuse/ops/create.c
+++ b/src/client/dfuse/ops/create.c
@@ -111,6 +111,8 @@ dfuse_cb_create(fuse_req_t req, struct dfuse_inode_entry *parent, const char *na
 	DFUSE_TRA_DEBUG(parent, "Parent:%#lx " DF_DE, parent->ie_stat.st_ino, DP_DE(name));
 
+	atomic_store_relaxed(&parent->ie_linear_read, false);
+
 	/* O_LARGEFILE should always be set on 64 bit systems, and in fact is
 	 * defined to 0 so IOF defines LARGEFILE to the value that O_LARGEFILE
 	 * would otherwise be using and check that is set.
diff --git a/src/client/dfuse/ops/open.c b/src/client/dfuse/ops/open.c
index 1928b2b41af..2ac6a9e247c 100644
--- a/src/client/dfuse/ops/open.c
+++ b/src/client/dfuse/ops/open.c
@@ -15,6 +15,7 @@ dfuse_cb_open(fuse_req_t req, fuse_ino_t ino, struct fuse_file_info *fi)
 	struct dfuse_obj_hdl *oh;
 	struct fuse_file_info fi_out = {0};
 	int                   rc;
+	bool                  prefetch = false;
 
 	ie = dfuse_inode_lookup(dfuse_info, ino);
 	if (!ie) {
@@ -29,6 +30,7 @@ dfuse_cb_open(fuse_req_t req, fuse_ino_t ino, struct fuse_file_info *fi)
 	DFUSE_TRA_UP(oh, ie, "open handle");
 
 	dfuse_open_handle_init(dfuse_info, oh, ie);
+	oh->doh_parent_dir = dfuse_inode_lookup(dfuse_info, ie->ie_parent);
 
 	/* Upgrade fd permissions from O_WRONLY to O_RDWR if wb caching is
 	 * enabled so the kernel can do read-modify-write
@@ -52,10 +54,16 @@ dfuse_cb_open(fuse_req_t req, fuse_ino_t ino, struct fuse_file_info *fi)
 		if (fi->flags & O_DIRECT)
 			fi_out.direct_io = 1;
 
-		if (atomic_load_relaxed(&ie->ie_open_count) > 0) {
-			fi_out.keep_cache = 1;
-		} else if (dfuse_dcache_get_valid(ie, ie->ie_dfs->dfc_data_timeout)) {
+		/* If the file is already open or (potentially) in cache then allow any existing
+		 * kernel cache to be used.  If not then use pre-read.
+		 * This should mean that pre-read is only used on the first read, and on files
+		 * which pre-existed in the container.
+		 */
+		if (atomic_load_relaxed(&ie->ie_open_count) > 0 ||
+		    dfuse_dcache_get_valid(ie, ie->ie_dfs->dfc_data_timeout)) {
 			fi_out.keep_cache = 1;
+		} else {
+			prefetch = true;
 		}
 	} else {
 		fi_out.direct_io = 1;
@@ -84,9 +92,23 @@ dfuse_cb_open(fuse_req_t req, fuse_ino_t ino, struct fuse_file_info *fi)
 
 	atomic_fetch_add_relaxed(&ie->ie_open_count, 1);
 
+	/* Enable this for files up to the max read size. */
+	if (prefetch && oh->doh_parent_dir &&
+	    atomic_load_relaxed(&oh->doh_parent_dir->ie_linear_read) && ie->ie_stat.st_size > 0 &&
+	    ie->ie_stat.st_size <= DFUSE_MAX_READ) {
+		D_ALLOC_PTR(oh->doh_readahead);
+		if (oh->doh_readahead) {
+			D_MUTEX_INIT(&oh->doh_readahead->dra_lock, 0);
+			D_MUTEX_LOCK(&oh->doh_readahead->dra_lock);
+		}
+	}
+
 	dfuse_inode_decref(dfuse_info, ie);
 	DFUSE_REPLY_OPEN(oh, req, &fi_out);
 
+	if (oh->doh_readahead)
+		dfuse_pre_read(dfuse_info, oh);
+
 	return;
 err:
 	dfuse_inode_decref(dfuse_info, ie);
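
Taken together, the open() changes above sort each open into one of three paths: reuse whatever the kernel already caches, schedule a pre-read, or fall back to plain round-trip reads. A condensed, hypothetical restatement of that decision (open_path() is not a dfuse function, and MAX_PREREAD_SIZE stands in for DFUSE_MAX_READ):

```c
#include <stdbool.h>
#include <stdio.h>

#define MAX_PREREAD_SIZE (1024 * 1024) /* stands in for DFUSE_MAX_READ */

/* Illustrative only: which path does an open() take? */
enum open_path { KEEP_KERNEL_CACHE, PRE_READ, PLAIN_READ };

static enum open_path
open_path(bool caching, int open_count, bool dcache_valid, bool dir_linear, long size)
{
	if (!caching)
		return PLAIN_READ; /* direct I/O, no caching at all */
	if (open_count > 0 || dcache_valid)
		return KEEP_KERNEL_CACHE; /* data may already be in the kernel */
	if (dir_linear && size > 0 && size <= MAX_PREREAD_SIZE)
		return PRE_READ; /* first open, small file, directory read linearly */
	return PLAIN_READ;
}

int main(void)
{
	/* First open of a small file whose sibling was just read linearly. */
	printf("%d\n", open_path(true, 0, false, true, 4096)); /* PRE_READ */
	/* The same file opened again while it is still open elsewhere. */
	printf("%d\n", open_path(true, 1, false, true, 4096)); /* KEEP_KERNEL_CACHE */
	return 0;
}
```
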
@@ -108,6 +130,26 @@ dfuse_cb_release(fuse_req_t req, fuse_ino_t ino, struct fuse_file_info *fi)
 
 	DFUSE_TRA_DEBUG(oh, "Closing %d", oh->doh_caching);
 
+	if (oh->doh_readahead) {
+		struct dfuse_event *ev;
+
+		/* Grab this lock first to ensure that the read cb has been completed.  The
+		 * callback might register an error and release ev so do not read its value
+		 * until after this has completed.
+		 */
+		D_MUTEX_LOCK(&oh->doh_readahead->dra_lock);
+		D_MUTEX_UNLOCK(&oh->doh_readahead->dra_lock);
+
+		ev = oh->doh_readahead->dra_ev;
+
+		D_MUTEX_DESTROY(&oh->doh_readahead->dra_lock);
+		if (ev) {
+			daos_event_fini(&ev->de_ev);
+			d_slab_release(ev->de_eqt->de_read_slab, ev);
+		}
+		D_FREE(oh->doh_readahead);
+	}
+
 	/* If the file was read from then set the data cache time for future use, however if the
 	 * file was written to then evict the metadata cache.
 	 * The problem here is that if the file was written to then the contents will be in the
@@ -121,8 +163,9 @@ dfuse_cb_release(fuse_req_t req, fuse_ino_t ino, struct fuse_file_info *fi)
 	if (atomic_load_relaxed(&oh->doh_write_count) != 0) {
 		if (oh->doh_caching) {
 			if (il_calls == 0) {
-				DFUSE_TRA_DEBUG(oh, "Evicting metadata cache");
+				DFUSE_TRA_DEBUG(oh, "Evicting metadata cache, setting data cache");
 				dfuse_mcache_evict(oh->doh_ie);
+				dfuse_dcache_set_time(oh->doh_ie);
 			} else {
 				DFUSE_TRA_DEBUG(oh, "Evicting cache");
 				dfuse_cache_evict(oh->doh_ie);
@@ -151,7 +194,18 @@ dfuse_cb_release(fuse_req_t req, fuse_ino_t ino, struct fuse_file_info *fi)
 		DFUSE_REPLY_ZERO(oh, req);
 	else
 		DFUSE_REPLY_ERR_RAW(oh, req, rc);
+	if (oh->doh_parent_dir) {
+		bool use_linear_read = false;
+		if (oh->doh_linear_read && oh->doh_linear_read_eof)
+			use_linear_read = true;
+
+		DFUSE_TRA_DEBUG(oh->doh_parent_dir, "Setting linear_read to %d", use_linear_read);
+
+		atomic_store_relaxed(&oh->doh_parent_dir->ie_linear_read, use_linear_read);
+
+		dfuse_inode_decref(dfuse_info, oh->doh_parent_dir);
+	}
 	if (oh->doh_evict_on_close) {
 		rc = fuse_lowlevel_notify_inval_entry(dfuse_info->di_session, oh->doh_ie->ie_parent,
 						      oh->doh_ie->ie_name,
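
The release path above uses an empty lock/unlock pair purely as a completion barrier: once close() has taken dra_lock it knows the pre-read callback has finished, so dra_ev can be inspected and freed safely. A generic sketch of that idiom, independent of dfuse; note that, as in the dfuse code, the thread that unlocks is not the thread that locked, which is a convention this pattern relies on rather than something POSIX promises for every mutex type:

```c
#include <pthread.h>
#include <stdio.h>

static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
static int             result; /* written by the worker before it unlocks */

static void *worker(void *arg)
{
	(void)arg;
	result = 42;                 /* publish the result ... */
	pthread_mutex_unlock(&lock); /* ... then release the lock taken by the submitter */
	return NULL;
}

int main(void)
{
	pthread_t tid;

	pthread_mutex_lock(&lock); /* taken before the work is handed off */
	pthread_create(&tid, NULL, worker, NULL);

	/* Empty lock/unlock pair: acquiring the lock proves the worker is done,
	 * so 'result' is safe to read even though nothing is accessed while held.
	 */
	pthread_mutex_lock(&lock);
	pthread_mutex_unlock(&lock);

	printf("%d\n", result);
	pthread_join(tid, NULL);
	return 0;
}
```
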
diff --git a/src/client/dfuse/ops/read.c b/src/client/dfuse/ops/read.c
index 2e5ba497bb5..7a5bbedc0d2 100644
--- a/src/client/dfuse/ops/read.c
+++ b/src/client/dfuse/ops/read.c
@@ -30,7 +30,7 @@ dfuse_cb_read_complete(struct dfuse_event *ev)
 
 	if (ev->de_len == 0) {
 		DFUSE_TRA_DEBUG(oh, "%#zx-%#zx requested (EOF)", ev->de_req_position,
-				ev->de_req_position + ev->de_iov.iov_buf_len - 1);
+				ev->de_req_position + ev->de_req_len - 1);
 
 		DFUSE_REPLY_BUFQ(oh, ev->de_req, ev->de_iov.iov_buf, ev->de_len);
 		D_GOTO(release, 0);
@@ -51,6 +51,55 @@ dfuse_cb_read_complete(struct dfuse_event *ev)
 	d_slab_release(ev->de_eqt->de_read_slab, ev);
 }
 
+static bool
+dfuse_readahead_reply(fuse_req_t req, size_t len, off_t position, struct dfuse_obj_hdl *oh)
+{
+	size_t reply_len;
+
+	if (oh->doh_readahead->dra_rc) {
+		DFUSE_REPLY_ERR_RAW(oh, req, oh->doh_readahead->dra_rc);
+		return true;
+	}
+
+	if (!oh->doh_linear_read || oh->doh_readahead->dra_ev == NULL) {
+		DFUSE_TRA_DEBUG(oh, "Readahead disabled");
+		return false;
+	}
+
+	if (oh->doh_linear_read_pos != position) {
+		DFUSE_TRA_DEBUG(oh, "disabling readahead");
+		daos_event_fini(&oh->doh_readahead->dra_ev->de_ev);
+		d_slab_release(oh->doh_readahead->dra_ev->de_eqt->de_read_slab,
+			       oh->doh_readahead->dra_ev);
+		oh->doh_readahead->dra_ev = NULL;
+		return false;
+	}
+
+	oh->doh_linear_read_pos = position + len;
+	if (position + len >= oh->doh_readahead->dra_ev->de_readahead_len) {
+		oh->doh_linear_read_eof = true;
+	}
+
+	/* At this point there is a buffer of known length that contains the data, and a read
+	 * request.
+	 * If the attempted read is bigger than the data then it will be truncated.
+	 * If the attempted read is smaller than the buffer it will be met in full.
+	 */
+
+	if (position + len < oh->doh_readahead->dra_ev->de_readahead_len) {
+		reply_len = len;
+		DFUSE_TRA_DEBUG(oh, "%#zx-%#zx read", position, position + reply_len - 1);
+	} else {
+		/* The read will be truncated */
+		reply_len = oh->doh_readahead->dra_ev->de_readahead_len - position;
+		DFUSE_TRA_DEBUG(oh, "%#zx-%#zx read %#zx-%#zx not read (truncated)", position,
+				position + reply_len - 1, position + reply_len, position + len - 1);
+	}
+
+	DFUSE_REPLY_BUFQ(oh, req, oh->doh_readahead->dra_ev->de_iov.iov_buf + position, reply_len);
+	return true;
+}
+
 void
 dfuse_cb_read(fuse_req_t req, fuse_ino_t ino, size_t len, off_t position, struct fuse_file_info *fi)
 {
@@ -66,12 +115,36 @@
 		DFUSE_TRA_DEBUG(oh, "Returning EOF early without round trip %#zx", position);
 		oh->doh_linear_read_eof = false;
 		oh->doh_linear_read     = false;
+
+		if (oh->doh_readahead) {
+			D_MUTEX_LOCK(&oh->doh_readahead->dra_lock);
+			ev = oh->doh_readahead->dra_ev;
+
+			oh->doh_readahead->dra_ev = NULL;
+			D_MUTEX_UNLOCK(&oh->doh_readahead->dra_lock);
+
+			if (ev) {
+				daos_event_fini(&ev->de_ev);
+				d_slab_release(ev->de_eqt->de_read_slab, ev);
+			}
+		}
 		DFUSE_REPLY_BUFQ(oh, req, NULL, 0);
 		return;
 	}
 
-	eqt_idx = atomic_fetch_add_relaxed(&dfuse_info->di_eqt_idx, 1);
+	if (oh->doh_readahead) {
+		bool replied;
+
+		D_MUTEX_LOCK(&oh->doh_readahead->dra_lock);
+		replied = dfuse_readahead_reply(req, len, position, oh);
+		D_MUTEX_UNLOCK(&oh->doh_readahead->dra_lock);
+		if (replied) {
+			return;
+		}
+	}
+
+	eqt_idx = atomic_fetch_add_relaxed(&dfuse_info->di_eqt_idx, 1);
 	eqt = &dfuse_info->di_eqt[eqt_idx % dfuse_info->di_eq_count];
 
 	ev = d_slab_acquire(eqt->de_read_slab);
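
The reply-length arithmetic in dfuse_readahead_reply() is easiest to see with concrete numbers. A hypothetical standalone version of the calculation, assuming a 100-byte pre-read buffer (the real code works on de_readahead_len and the kernel-supplied position/len):

```c
#include <stdbool.h>
#include <stdio.h>

/* Illustrative only: how many bytes a read of 'len' bytes at 'position' gets
 * back from a pre-read buffer holding 'buf_len' bytes, and whether EOF is hit.
 */
static size_t reply_len(size_t len, size_t position, size_t buf_len, bool *eof)
{
	*eof = (position + len >= buf_len);
	if (position + len < buf_len)
		return len;        /* met in full */
	return buf_len - position; /* truncated at the end of the file */
}

int main(void)
{
	bool eof;

	/* 100-byte file: a 64-byte read at offset 0 is served in full. */
	printf("%zu eof=%d\n", reply_len(64, 0, 100, &eof), eof);  /* 64 eof=0 */
	/* The next linear read at offset 64 is truncated to 36 bytes and marks EOF. */
	printf("%zu eof=%d\n", reply_len(64, 64, 100, &eof), eof); /* 36 eof=1 */
	return 0;
}
```
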
@@ -131,3 +204,79 @@
 		d_slab_release(eqt->de_read_slab, ev);
 	}
 }
+
+static void
+dfuse_cb_pre_read_complete(struct dfuse_event *ev)
+{
+	struct dfuse_obj_hdl *oh = ev->de_oh;
+
+	oh->doh_readahead->dra_rc = ev->de_ev.ev_error;
+
+	if (ev->de_ev.ev_error != 0) {
+		oh->doh_readahead->dra_rc = ev->de_ev.ev_error;
+		daos_event_fini(&ev->de_ev);
+		d_slab_release(ev->de_eqt->de_read_slab, ev);
+		oh->doh_readahead->dra_ev = NULL;
+	}
+
+	/* If the length is not as expected then the file has been modified since the last stat so
+	 * discard this cache and use regular reads.  Note that this will only detect files which
+	 * have shrunk in size, not grown.
+	 */
+	if (ev->de_len != ev->de_readahead_len) {
+		daos_event_fini(&ev->de_ev);
+		d_slab_release(ev->de_eqt->de_read_slab, ev);
+		oh->doh_readahead->dra_ev = NULL;
+	}
+
+	D_MUTEX_UNLOCK(&oh->doh_readahead->dra_lock);
+}
+
+void
+dfuse_pre_read(struct dfuse_info *dfuse_info, struct dfuse_obj_hdl *oh)
+{
+	struct dfuse_eq    *eqt;
+	int                 rc;
+	struct dfuse_event *ev;
+	uint64_t            eqt_idx;
+	size_t              len = oh->doh_ie->ie_stat.st_size;
+
+	eqt_idx = atomic_fetch_add_relaxed(&dfuse_info->di_eqt_idx, 1);
+	eqt     = &dfuse_info->di_eqt[eqt_idx % dfuse_info->di_eq_count];
+
+	ev = d_slab_acquire(eqt->de_read_slab);
+	if (ev == NULL)
+		D_GOTO(err, rc = ENOMEM);
+
+	ev->de_iov.iov_len   = len;
+	ev->de_req           = 0;
+	ev->de_sgl.sg_nr     = 1;
+	ev->de_oh            = oh;
+	ev->de_readahead_len = len;
+	ev->de_req_position  = 0;
+
+	ev->de_complete_cb        = dfuse_cb_pre_read_complete;
+	oh->doh_readahead->dra_ev = ev;
+
+	rc = dfs_read(oh->doh_dfs, oh->doh_obj, &ev->de_sgl, 0, &ev->de_len, &ev->de_ev);
+	if (rc != 0) {
+		D_GOTO(err, rc);
+		return;
+	}
+
+	/* Send a message to the async thread to wake it up and poll for events */
+	sem_post(&eqt->de_sem);
+
+	/* Now ensure there are more descriptors for the next request */
+	d_slab_restock(eqt->de_read_slab);
+
+	return;
+err:
+	oh->doh_readahead->dra_rc = rc;
+	if (ev) {
+		daos_event_fini(&ev->de_ev);
+		d_slab_release(eqt->de_read_slab, ev);
+		oh->doh_readahead->dra_ev = NULL;
+	}
+	D_MUTEX_UNLOCK(&oh->doh_readahead->dra_lock);
+}
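
The completion callback above only keeps the pre-read buffer when the byte count returned matches the size recorded at open time; a short read means the file shrank after the stat and the buffer is dropped. A tiny illustration of that check (the names are placeholders; as the comment notes, a file that has grown cannot be detected this way):

```c
#include <stdbool.h>
#include <stdio.h>

/* Keep the pre-read buffer only if the async read returned exactly the
 * number of bytes the file had when it was opened.
 */
static bool preread_buffer_valid(int rc, size_t bytes_read, size_t expected)
{
	if (rc != 0)
		return false;          /* read failed: fall back to regular reads */
	return bytes_read == expected; /* shorter: file shrank, buffer is stale */
}

int main(void)
{
	printf("%d\n", preread_buffer_valid(0, 4096, 4096)); /* 1: cache usable */
	printf("%d\n", preread_buffer_valid(0, 1024, 4096)); /* 0: file shrank */
	/* A file that grew after the stat still fills the 4096-byte request, so
	 * it cannot be detected here; reads past the old size simply miss.
	 */
	printf("%d\n", preread_buffer_valid(0, 4096, 4096)); /* 1, but stale tail */
	return 0;
}
```
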
diff --git a/utils/node_local_test.py b/utils/node_local_test.py
index ec7c5ce3cf0..51872dca987 100755
--- a/utils/node_local_test.py
+++ b/utils/node_local_test.py
@@ -22,6 +22,7 @@
 import stat
 import errno
 import argparse
+import random
 import threading
 import functools
 import traceback
@@ -2145,6 +2146,69 @@ def test_read(self):
         print(data)
         assert data == 'test'
 
+    def test_pre_read(self):
+        """Test the pre-read code.
+
+        Test reading a file which is previously unknown to fuse with caching on.  This should go
+        into the pre_read code and load the file contents automatically after the open call.
+        """
+        dfuse = DFuse(self.server, self.conf, container=self.container)
+        dfuse.start(v_hint='pre_read_0')
+
+        with open(join(dfuse.dir, 'file0'), 'w') as fd:
+            fd.write('test')
+
+        with open(join(dfuse.dir, 'file1'), 'w') as fd:
+            fd.write('test')
+
+        with open(join(dfuse.dir, 'file2'), 'w') as fd:
+            fd.write('testing')
+
+        raw_data0 = ''.join(random.choices(['d', 'a', 'o', 's'], k=1024 * 1024))  # nosec
+        with open(join(dfuse.dir, 'file3'), 'w') as fd:
+            fd.write(raw_data0)
+
+        raw_data1 = ''.join(random.choices(['d', 'a', 'o', 's'], k=(1024 * 1024) - 1))  # nosec
+        with open(join(dfuse.dir, 'file4'), 'w') as fd:
+            fd.write(raw_data1)
+
+        if dfuse.stop():
+            self.fatal_errors = True
+
+        dfuse = DFuse(self.server, self.conf, caching=True, container=self.container)
+        dfuse.start(v_hint='pre_read_1')
+
+        with open(join(dfuse.dir, 'file0'), 'r') as fd:
+            data0 = fd.read()
+
+        with open(join(dfuse.dir, 'file1'), 'r') as fd:
+            data1 = fd.read(16)
+
+        with open(join(dfuse.dir, 'file2'), 'r') as fd:
+            data2 = fd.read(2)
+
+        with open(join(dfuse.dir, 'file3'), 'r') as fd:
+            data3 = fd.read()
+
+        with open(join(dfuse.dir, 'file4'), 'r') as fd:
+            data4 = fd.read()
+            data5 = fd.read()
+
+        # This should not use the pre-read feature, to be validated via the logs.
+        with open(join(dfuse.dir, 'file4'), 'r') as fd:
+            data6 = fd.read()
+
+        if dfuse.stop():
+            self.fatal_errors = True
+        print(data0)
+        assert data0 == 'test'
+        assert data1 == 'test'
+        assert data2 == 'te'
+        assert raw_data0 == data3
+        assert raw_data1 == data4
+        assert len(data5) == 0
+        assert raw_data1 == data6
+
     def test_two_mounts(self):
         """Create two mounts, and check that a file created in one can be read from the other"""
         dfuse0 = DFuse(self.server,