Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DAOS-13216 dfuse: Add a pre-read feature for non-cached files. #12015

Merged
merged 45 commits into from
Oct 7, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
Show all changes
45 commits
Select commit Hold shift + click to select a range
709d5c9
DAOS-0000 dfuse: Add a pre-read feature for non-cached files.
ashleypittman Apr 21, 2023
b3af6ba
Skip some logging.
ashleypittman Apr 24, 2023
5134632
Fix a potential crash on free.
ashleypittman Apr 24, 2023
6e8f258
Add some comments on testing.
ashleypittman Apr 24, 2023
332628c
Fix another crash.
ashleypittman Apr 24, 2023
ade793e
Fix another crash.
ashleypittman Apr 24, 2023
2871a77
Fix compile.
ashleypittman Apr 24, 2023
f6110ae
Merge branch 'master' into amd/pre-read
ashleypittman Apr 27, 2023
3b9de4a
Test properly with seperate data cache times.
ashleypittman Apr 27, 2023
0da5cfd
Fix a formatting issue.
ashleypittman Apr 27, 2023
2b042fa
Merge branch 'master' into amd/pre-read
ashleypittman Apr 28, 2023
ce17d7a
Merge branch 'master' into amd/pre-read
ashleypittman May 2, 2023
ea1784a
Update to read up to max buffer size.
ashleypittman May 2, 2023
4d90774
Merge branch 'master' into amd/pre-read
ashleypittman May 5, 2023
3e0e986
Merge branch 'master' into amd/pre-read
ashleypittman May 9, 2023
db037f3
Merge branch 'master' into amd/pre-read
ashleypittman May 11, 2023
049e4cf
Merge branch 'master' into amd/pre-read
ashleypittman Jun 5, 2023
23cd4b0
Merge branch 'master' into amd/pre-read
ashleypittman Jun 8, 2023
0b6c159
fix formatting
ashleypittman Jun 8, 2023
899dc22
Merge branch 'master' into amd/pre-read
ashleypittman Jun 9, 2023
26e0c8d
Merge branch 'master' into amd/pre-read
ashleypittman Jun 19, 2023
9bb0a49
Merge branch 'master' into amd/pre-read
ashleypittman Jun 29, 2023
5bdfa4f
Merge branch 'master' into amd/pre-read
ashleypittman Jul 3, 2023
27c4e7d
Improve the decision on when to readahead.
ashleypittman Jul 3, 2023
59a4bb2
Merge branch 'master' into amd/pre-read
ashleypittman Jul 3, 2023
238a793
Update comment.
ashleypittman Jul 3, 2023
7b8aafd
DAOS-13493 rebuild: upgrate the whole group (#12550)
Jul 3, 2023
fd554cd
DAOS-13471 control: Add JSON-format version to utilities (#12508)
mjmac Jul 3, 2023
e636b7a
DAOS-13471 control: Fix conflict with go build. (#12574)
ashleypittman Jul 4, 2023
8b0a3e3
Add metrics for testing.
ashleypittman Jul 4, 2023
a8a3d8f
Fix error path.
ashleypittman Jul 4, 2023
beae16e
Back out metrics.
ashleypittman Jul 6, 2023
d99b473
Merge branch 'master' into amd/pre-read
ashleypittman Jul 6, 2023
554c3cf
Merge branch 'master' into amd/pre-read
ashleypittman Jul 13, 2023
0f1fc21
Fix merge.
ashleypittman Jul 13, 2023
7a31d3f
Merge branch 'master' into amd/pre-read
ashleypittman Jul 17, 2023
7a362a1
Merge branch 'master' into amd/pre-read
ashleypittman Jul 24, 2023
6005826
Merge branch 'master' into amd/pre-read
ashleypittman Jul 25, 2023
0d17e1c
Merge branch 'master' into amd/pre-read
ashleypittman Aug 7, 2023
3756450
Merge branch 'master' into amd/pre-read
ashleypittman Aug 10, 2023
87ef42b
Merge branch 'master' into amd/pre-read
ashleypittman Aug 22, 2023
036d047
Merge branch 'master' into amd/pre-read
ashleypittman Sep 7, 2023
931b578
Merge branch 'master' into amd/pre-read
ashleypittman Sep 25, 2023
661c033
Merge branch 'master' into amd/pre-read
ashleypittman Sep 28, 2023
4d3f28e
Merge branch 'master' into amd/pre-read
ashleypittman Oct 5, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 45 additions & 2 deletions src/client/dfuse/dfuse.h
Original file line number Diff line number Diff line change
Expand Up @@ -92,12 +92,49 @@ struct dfuse_readdir_entry {
off_t dre_next_offset;
};

/* Preread.
*
* If a file is opened when caching is on but the file is not cached and the size small enough
* to fit in a single buffer then start a pre-read of the file on open, then when reads do occur
* they can happen directly from the buffer. For 'linear reads' of a file this means that the
* read can be triggered sooner and performed as one dfs request. Make use of the pre-read
* code to only use this for trivial reads, if a file is not read linearly or it's written to
* then back off to the regular behavior, which will likely use the kernel cache.
*
* This works by creating a new descriptor which is pointed to by the open handle, on open dfuse
* decides if it will use pre-read and if so allocate a new descriptor, add it to the open handle
* and then once it's replied to the open immediately issue a read. The new descriptor includes
* a lock which is locked by open before it replies to the kernel request and unlocked by the dfs
* read callback. Read requests then take the lock to ensure the dfs read is complete and reply
* directly with the data in the buffer.
*
* This works up to the buffer size minus one, the pre-read tries to read the expected file size
* plus one so if the file size has changed then dfuse will detect this and back off to regular
* read.
*
* A dfuse_event is hung off this new descriptor and these come from the same pool as regular reads,
* this buffer is kept as long as it's needed but released as soon as possible, either on error or
* when EOF is returned to the kernel. If it's still present on release then it's freed then.
*
* TODO: The current issue with pread is that DFuse keeps a single cache time per inode and this
* is used for both data and metadata. For this feature to work the inode cache needs to be valid
* and the data cache needs to be empty.
*/
struct dfuse_read_ahead {
pthread_mutex_t dra_lock;
struct dfuse_event *dra_ev;
int dra_rc;
};

/** what is returned as the handle for fuse fuse_file_info on create/open/opendir */
struct dfuse_obj_hdl {
/** pointer to dfs_t */
dfs_t *doh_dfs;
/** the DFS object handle. Not created for directories. */
dfs_obj_t *doh_obj;

struct dfuse_read_ahead *doh_readahead;

/** the inode entry for the file */
struct dfuse_inode_entry *doh_ie;

Expand Down Expand Up @@ -213,7 +250,10 @@ struct dfuse_event {
struct dfuse_eq *de_eqt;
struct dfuse_obj_hdl *de_oh;
off_t de_req_position; /**< The file position requested by fuse */
size_t de_req_len;
union {
size_t de_req_len;
size_t de_readahead_len;
};
void (*de_complete_cb)(struct dfuse_event *ev);
};

Expand Down Expand Up @@ -543,7 +583,7 @@ struct fuse_lowlevel_ops dfuse_ops;
#define DFUSE_REPLY_ENTRY(inode, req, entry) \
do { \
int __rc; \
DFUSE_TRA_DEBUG(inode, "Returning entry inode %#lx mode %#o size %zi", \
DFUSE_TRA_DEBUG(inode, "Returning entry inode %#lx mode %#o size %#zx", \
(entry).attr.st_ino, (entry).attr.st_mode, (entry).attr.st_size); \
if (entry.attr_timeout > 0) { \
(inode)->ie_stat = entry.attr; \
Expand Down Expand Up @@ -695,6 +735,9 @@ dfuse_cache_evict(struct dfuse_inode_entry *ie);
bool
dfuse_cache_get_valid(struct dfuse_inode_entry *ie, double max_age, double *timeout);

void
dfuse_pre_read(struct dfuse_projection_info *fs_handle, struct dfuse_obj_hdl *oh);

int
check_for_uns_ep(struct dfuse_projection_info *fs_handle,
struct dfuse_inode_entry *ie, char *attr, daos_size_t len);
Expand Down
29 changes: 29 additions & 0 deletions src/client/dfuse/ops/open.c
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ dfuse_cb_open(fuse_req_t req, fuse_ino_t ino, struct fuse_file_info *fi)
struct dfuse_obj_hdl *oh = NULL;
struct fuse_file_info fi_out = {0};
int rc;
bool prefetch = false;

rlink = d_hash_rec_find(&fs_handle->dpi_iet, &ino, sizeof(ino));
if (!rlink) {
Expand Down Expand Up @@ -58,6 +59,7 @@ dfuse_cb_open(fuse_req_t req, fuse_ino_t ino, struct fuse_file_info *fi)
fi_out.keep_cache = 1;
} else if (dfuse_cache_get_valid(ie, ie->ie_dfs->dfc_data_timeout, NULL)) {
fi_out.keep_cache = 1;
prefetch = true;
}

if (fi_out.keep_cache)
Expand Down Expand Up @@ -89,9 +91,21 @@ dfuse_cb_open(fuse_req_t req, fuse_ino_t ino, struct fuse_file_info *fi)

atomic_fetch_add_relaxed(&ie->ie_open_count, 1);

if (prefetch && ie->ie_stat.st_size < (1024 * 1024)) {
D_ALLOC_PTR(oh->doh_readahead);
if (oh->doh_readahead) {
D_MUTEX_INIT(&oh->doh_readahead->dra_lock, 0);
D_MUTEX_LOCK(&oh->doh_readahead->dra_lock);
}
}

d_hash_rec_decref(&fs_handle->dpi_iet, rlink);
DFUSE_REPLY_OPEN(oh, req, &fi_out);

if (oh->doh_readahead) {
dfuse_pre_read(fs_handle, oh);
}

return;
err:
d_hash_rec_decref(&fs_handle->dpi_iet, rlink);
Expand All @@ -112,6 +126,21 @@ dfuse_cb_release(fuse_req_t req, fuse_ino_t ino, struct fuse_file_info *fi)

DFUSE_TRA_DEBUG(oh, "Closing %d %d", oh->doh_caching, oh->doh_keep_cache);

if (oh->doh_readahead) {
struct dfuse_event *ev = oh->doh_readahead->dra_ev;

/* Grab this lock first to ensure that the read cb has been completed */
D_MUTEX_LOCK(&oh->doh_readahead->dra_lock);
D_MUTEX_UNLOCK(&oh->doh_readahead->dra_lock);

D_MUTEX_DESTROY(&oh->doh_readahead->dra_lock);
if (ev) {
daos_event_fini(&ev->de_ev);
d_slab_release(ev->de_eqt->de_read_slab, ev);
}
D_FREE(oh->doh_readahead);
}

/* If the file was read from then set the cache time for future use, however if the
* file was written to then evict the cache.
* The problem here is that if the file was written to then the contents will be in the
Expand Down
152 changes: 151 additions & 1 deletion src/client/dfuse/ops/read.c
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ dfuse_cb_read_complete(struct dfuse_event *ev)

if (ev->de_len == 0) {
DFUSE_TRA_DEBUG(oh, "%#zx-%#zx requested (EOF)", ev->de_req_position,
ev->de_req_position + ev->de_iov.iov_buf_len - 1);
ev->de_req_position + ev->de_req_len - 1);

DFUSE_REPLY_BUFQ(oh, ev->de_req, ev->de_iov.iov_buf, ev->de_len);
D_GOTO(release, 0);
Expand All @@ -51,6 +51,51 @@ dfuse_cb_read_complete(struct dfuse_event *ev)
d_slab_release(ev->de_eqt->de_read_slab, ev);
}

static bool
dfuse_readahead_reply(fuse_req_t req, size_t len, off_t position, struct dfuse_obj_hdl *oh)
{
size_t reply_len;

if (oh->doh_readahead->dra_rc) {
DFUSE_REPLY_ERR_RAW(oh, req, oh->doh_readahead->dra_rc);
return true;
}

if (!oh->doh_linear_read || oh->doh_readahead->dra_ev == NULL) {
DFUSE_TRA_ERROR(oh, "Readahead disabled");
return false;
}

if (oh->doh_linear_read_pos != position) {
DFUSE_TRA_ERROR(oh, "disabling readahead");
return false;
}

oh->doh_linear_read_pos = position + len;
if (position + len >= oh->doh_readahead->dra_ev->de_readahead_len) {
oh->doh_linear_read_eof = true;
}

/* At this point there is a buffer of known length that contains the data, and a read
* request.
* If the attempted read is bigger than the data then it will be truncated.
* It the atttempted read is smaller than the buffer it will be met in full.
*/

if (position + len < oh->doh_readahead->dra_ev->de_readahead_len) {
reply_len = len;
DFUSE_TRA_DEBUG(oh, "%#zx-%#zx read", position, position + reply_len - 1);
} else {
/* The read will be truncated */
reply_len = oh->doh_readahead->dra_ev->de_readahead_len - position;
DFUSE_TRA_DEBUG(oh, "%#zx-%#zx read %#zx-%#zx not read (truncated)", position,
position + reply_len - 1, position + reply_len, position + len - 1);
}

DFUSE_REPLY_BUFQ(oh, req, oh->doh_readahead->dra_ev->de_iov.iov_buf + position, reply_len);
return true;
}

void
dfuse_cb_read(fuse_req_t req, fuse_ino_t ino, size_t len, off_t position, struct fuse_file_info *fi)
{
Expand All @@ -68,10 +113,36 @@ dfuse_cb_read(fuse_req_t req, fuse_ino_t ino, size_t len, off_t position, struct
DFUSE_TRA_DEBUG(oh, "Returning EOF early without round trip %#zx", position);
oh->doh_linear_read_eof = false;
oh->doh_linear_read = false;

if (oh->doh_readahead) {
ev = oh->doh_readahead->dra_ev;

D_MUTEX_LOCK(&oh->doh_readahead->dra_lock);
ev = oh->doh_readahead->dra_ev;
oh->doh_readahead->dra_ev = NULL;
D_MUTEX_UNLOCK(&oh->doh_readahead->dra_lock);

if (ev) {
daos_event_fini(&ev->de_ev);
d_slab_release(ev->de_eqt->de_read_slab, ev);
}
}
DFUSE_REPLY_BUFQ(oh, req, NULL, 0);
return;
}

if (oh->doh_readahead) {
bool replied;

D_MUTEX_LOCK(&oh->doh_readahead->dra_lock);
replied = dfuse_readahead_reply(req, len, position, oh);
D_MUTEX_UNLOCK(&oh->doh_readahead->dra_lock);

if (replied) {
return;
}
}

eqt = &fs_handle->dpi_eqt[eqt_idx % fs_handle->dpi_eqt_count];

ev = d_slab_acquire(eqt->de_read_slab);
Expand Down Expand Up @@ -131,3 +202,82 @@ dfuse_cb_read(fuse_req_t req, fuse_ino_t ino, size_t len, off_t position, struct
d_slab_release(eqt->de_read_slab, ev);
}
}

static void
dfuse_cb_pre_read_complete(struct dfuse_event *ev)
{
struct dfuse_obj_hdl *oh = ev->de_oh;

oh->doh_readahead->dra_rc = ev->de_ev.ev_error;

if (ev->de_ev.ev_error != 0) {
oh->doh_readahead->dra_rc = ev->de_ev.ev_error;
daos_event_fini(&ev->de_ev);
d_slab_release(ev->de_eqt->de_read_slab, ev);
oh->doh_readahead->dra_ev = NULL;
}

/* If the length is not as expected then the stat size was incorrect so do not use this
* read and fallback into the normal read case.
*/
if (ev->de_len != ev->de_readahead_len) {
daos_event_fini(&ev->de_ev);
d_slab_release(ev->de_eqt->de_read_slab, ev);
oh->doh_readahead->dra_ev = NULL;
}

D_MUTEX_UNLOCK(&oh->doh_readahead->dra_lock);
}

void
dfuse_pre_read(struct dfuse_projection_info *fs_handle, struct dfuse_obj_hdl *oh)
{
struct dfuse_eq *eqt;
int rc;
struct dfuse_event *ev;
uint64_t eqt_idx;
size_t len = oh->doh_ie->ie_stat.st_size;

eqt_idx = atomic_fetch_add_relaxed(&fs_handle->dpi_eqt_idx, 1);

eqt = &fs_handle->dpi_eqt[eqt_idx % fs_handle->dpi_eqt_count];

ev = d_slab_acquire(eqt->de_read_slab);
if (ev == NULL)
D_GOTO(err, rc = ENOMEM);

/* Request a read one byte bigger than the expected file size, this way we should see
* a truncated read and through that will be able to detect if the file has been
* modified
*/
ev->de_iov.iov_len = len + 1;
ev->de_req = 0;
ev->de_sgl.sg_nr = 1;
ev->de_oh = oh;
ev->de_readahead_len = len;
ev->de_req_position = 0;

ev->de_complete_cb = dfuse_cb_pre_read_complete;
oh->doh_readahead->dra_ev = ev;

rc = dfs_read(oh->doh_dfs, oh->doh_obj, &ev->de_sgl, 0, &ev->de_len, &ev->de_ev);
if (rc != 0) {
D_GOTO(err, rc);
return;
}

/* Send a message to the async thread to wake it up and poll for events */
sem_post(&eqt->de_sem);

/* Now ensure there are more descriptors for the next request */
d_slab_restock(eqt->de_read_slab);

return;
err:
oh->doh_readahead->dra_rc = rc;
if (ev) {
daos_event_fini(&ev->de_ev);
d_slab_release(eqt->de_read_slab, ev);
}
D_MUTEX_UNLOCK(&oh->doh_readahead->dra_lock);
}
59 changes: 59 additions & 0 deletions utils/node_local_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import stat
import errno
import argparse
import random
import threading
import functools
import traceback
Expand Down Expand Up @@ -1853,6 +1854,64 @@ def test_read(self):
print(data)
assert data == 'test'

def test_pre_read(self):
"""Test the pre-read code.

Test reading a file which is previously unknown to fuse with caching on. This should go
into the pre_read code and load the file contents automatically after the open call.
"""
dfuse = DFuse(self.server, self.conf, container=self.container)
dfuse.start(v_hint='pre_read_0')

with open(join(dfuse.dir, 'file0'), 'w') as fd:
fd.write('test')

with open(join(dfuse.dir, 'file1'), 'w') as fd:
fd.write('test')

with open(join(dfuse.dir, 'file2'), 'w') as fd:
fd.write('testing')

raw_data0 = ''.join(random.choices(['d', 'a', 'o', 's'], k=1024 * 1024)) # nosec
with open(join(dfuse.dir, 'file3'), 'w') as fd:
fd.write(raw_data0)

raw_data1 = ''.join(random.choices(['d', 'a', 'o', 's'], k=(1024 * 1024) - 1)) # nosec
with open(join(dfuse.dir, 'file4'), 'w') as fd:
fd.write(raw_data1)

if dfuse.stop():
self.fatal_errors = True

dfuse = DFuse(self.server, self.conf, caching=True, container=self.container)
dfuse.start(v_hint='pre_read_1')

with open(join(dfuse.dir, 'file0'), 'r') as fd:
data0 = fd.read()

with open(join(dfuse.dir, 'file1'), 'r') as fd:
data1 = fd.read(16)

with open(join(dfuse.dir, 'file2'), 'r') as fd:
data2 = fd.read(2)

with open(join(dfuse.dir, 'file3'), 'r') as fd:
data3 = fd.read()

with open(join(dfuse.dir, 'file4'), 'r') as fd:
data4 = fd.read()
data5 = fd.read()

if dfuse.stop():
self.fatal_errors = True
print(data0)
assert data0 == 'test'
assert data1 == 'test'
assert data2 == 'te'
assert raw_data0 == data3
assert raw_data1 == data4
assert len(data5) == 0

def test_two_mounts(self):
"""Create two mounts, and check that a file created in one can be read from the other"""
dfuse0 = DFuse(self.server,
Expand Down