Skip to content

Commit

Permalink
DAOS-15682 dfuse: Perform reads in larger chunks.
Browse files Browse the repository at this point in the history
When dfuse sees I/O as well-aligned 128k reads then read MB at
a time and cache the result allowing for faster read bandwidth
for well behaved applicaions and large files.

Create a new in-memory descriptor for file contents, pull in a
whole descriptor on first read and perform all other reads from
the same result.

This should give much higher bandwidth for well behaved applications
and should be easy to extend to proper readahead in future.

Test-tag: test_dfuse_caching_check

Signed-off-by: Ashley Pittman <ashley.m.pittman@intel.com>
  • Loading branch information
ashleypittman committed Apr 22, 2024
1 parent cb13dab commit 9886fd4
Show file tree
Hide file tree
Showing 4 changed files with 331 additions and 11 deletions.
7 changes: 7 additions & 0 deletions src/client/dfuse/dfuse.h
Original file line number Diff line number Diff line change
Expand Up @@ -396,6 +396,7 @@ struct dfuse_event {
union {
struct dfuse_obj_hdl *de_oh;
struct dfuse_inode_entry *de_ie;
struct read_chunk_data *de_cd;
};
off_t de_req_position; /**< The file position requested by fuse */
union {
Expand Down Expand Up @@ -1001,6 +1002,8 @@ struct dfuse_inode_entry {

/* Entry on the evict list */
d_list_t ie_evict_entry;

struct read_chunk_core *ie_chunk;
};

/* Flush write-back cache writes to a inode. It does this by waiting for and then releasing an
Expand Down Expand Up @@ -1098,6 +1101,10 @@ dfuse_compute_inode(struct dfuse_cont *dfs,
void
dfuse_cache_evict_dir(struct dfuse_info *dfuse_info, struct dfuse_inode_entry *ie);

/* Free any read chunk data for an inode */
void
read_chunk_close(struct dfuse_inode_entry *ie);

/* Metadata caching functions. */

/* Mark the cache as up-to-date from now */
Expand Down
6 changes: 5 additions & 1 deletion src/client/dfuse/ops/open.c
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ dfuse_cb_release(fuse_req_t req, fuse_ino_t ino, struct fuse_file_info *fi)
struct dfuse_obj_hdl *oh = (struct dfuse_obj_hdl *)fi->fh;
struct dfuse_inode_entry *ie = NULL;
int rc;
uint32_t oc;
uint32_t il_calls;

/* Perform the opposite of what the ioctl call does, always change the open handle count
Expand Down Expand Up @@ -204,7 +205,10 @@ dfuse_cb_release(fuse_req_t req, fuse_ino_t ino, struct fuse_file_info *fi)
if (il_calls != 0) {
atomic_fetch_sub_relaxed(&oh->doh_ie->ie_il_count, 1);
}
atomic_fetch_sub_relaxed(&oh->doh_ie->ie_open_count, 1);
oc = atomic_fetch_sub_relaxed(&oh->doh_ie->ie_open_count, 1);
if (oc == 1) {
read_chunk_close(oh->doh_ie);
}

if (oh->doh_evict_on_close) {
ie = oh->doh_ie;
Expand Down
317 changes: 309 additions & 8 deletions src/client/dfuse/ops/read.c
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,309 @@ dfuse_readahead_reply(fuse_req_t req, size_t len, off_t position, struct dfuse_o
return true;
}

static struct dfuse_eq *
pick_eqt(struct dfuse_info *dfuse_info)
{
uint64_t eqt_idx;

eqt_idx = atomic_fetch_add_relaxed(&dfuse_info->di_eqt_idx, 1);
return &dfuse_info->di_eqt[eqt_idx % dfuse_info->di_eq_count];
}

/* Readahead and coalescing
*
* This code attempts to predict application and kernel I/O patterns and preemptively read file
* data ahead of when it's requested.
*
* For some kernels read I/O size is limited to 128k when using the page cache or 1Mb when using
* direct I/O. To get around the preformance impact of them detect when well aligned 128k reads

Check failure on line 118 in src/client/dfuse/ops/read.c

View workflow job for this annotation

GitHub Actions / Codespell

preformance ==> performance
* are received and read an entire buffers worth, then for future requests the data should already
* be in cache.
*
* TODO: Ensure that the linear_read flag remains set correctly by this code.
*/

#define K128 (1024 * 128)
#define CHUNK_SIZE (1024 * 1024)

struct read_chunk_data {
struct dfuse_event *ev;
fuse_req_t reqs[8];
struct dfuse_obj_hdl *ohs[8];
bool done[8];
d_list_t list;
int bucket;
bool complete;
struct dfuse_eq *eqt;
bool exiting;
int rc;
};

struct read_chunk_core {
d_list_t entries;
};

/* Global lock for all readahead operations. Each inode has a struct read_chunk_core * entry
* which is checked for NULL and set whilst holding this lock. Each read_chunk_core then has
* a list of read_chunk_data and again, this lock protects all lists on all inodes. This avoids
* the need for a per-inode lock which for many files would consume considerable memory but does
* mean there is potentially some lock contention. The lock however is only held for list
* manipulation, no dfs or kernel calls are made whilst holding the lock.
*/
static pthread_mutex_t rc_lock = PTHREAD_MUTEX_INITIALIZER;

static void
chunk_free(struct read_chunk_data *cd)
{
d_list_del(&cd->list);
d_slab_release(cd->eqt->de_read_slab, cd->ev);
D_FREE(cd);
}

/* Called when the last open file handle on a inode is closed. This needs to free everything which
* is complete and for anything that isn't flag it for deletion in the callback.
*/
void
read_chunk_close(struct dfuse_inode_entry *ie)
{
struct read_chunk_data *cd, *cdn;

D_MUTEX_LOCK(&rc_lock);
if (!ie->ie_chunk)
goto out;

d_list_for_each_entry_safe(cd, cdn, &ie->ie_chunk->entries, list) {
if (cd->complete) {
chunk_free(cd);
} else {
cd->exiting = true;
}
}
D_FREE(ie->ie_chunk);
out:
D_MUTEX_UNLOCK(&rc_lock);
}

static void
chunk_cb(struct dfuse_event *ev)
{
struct read_chunk_data *cd = ev->de_cd;
fuse_req_t req;

cd->rc = ev->de_ev.ev_error;
daos_event_fini(&ev->de_ev);

do {
int i;
int done_count = 0;

req = 0;

D_MUTEX_LOCK(&rc_lock);

if (cd->exiting) {
chunk_free(cd);
D_MUTEX_UNLOCK(&rc_lock);
return;
}

cd->complete = true;
for (i = 0; i < 8; i++) {
if (cd->reqs[i]) {
req = cd->reqs[i];
cd->reqs[i] = 0;
cd->done[i] = true;
break;
} else if (cd->done[i]) {
done_count++;
}
}

if (done_count == 8) {
D_ASSERT(!req);
d_slab_release(cd->eqt->de_read_slab, cd->ev);
cd->ev = NULL;
}
D_MUTEX_UNLOCK(&rc_lock);

if (req) {
size_t position = (cd->bucket * CHUNK_SIZE) + (i * K128);

if (cd->rc != 0) {
DFUSE_REPLY_ERR_RAW(cd->ohs[i], req, cd->rc);
} else {
DFUSE_TRA_DEBUG(cd->ohs[i], "%#zx-%#zx read", position,
position + K128 - 1);
DFUSE_REPLY_BUFQ(cd->ohs[i], req, ev->de_iov.iov_buf + (i * K128),
K128);
}
}
} while (req);
}

/* Submut a read to dfs.
*
* Returns true on success.
*/
static bool
chunk_fetch(fuse_req_t req, struct dfuse_obj_hdl *oh, struct read_chunk_data *cd, int bucket,
int slot, struct dfuse_inode_entry *ie)
{
struct dfuse_info *dfuse_info = fuse_req_userdata(req);
struct dfuse_event *ev;
struct dfuse_eq *eqt;
int rc;

eqt = pick_eqt(dfuse_info);

ev = d_slab_acquire(eqt->de_read_slab);
if (ev == NULL) {
cd->rc = ENOMEM;
return false;
}

ev->de_iov.iov_len = CHUNK_SIZE;
ev->de_req = req;
ev->de_cd = cd;
ev->de_sgl.sg_nr = 1;
ev->de_req_len = CHUNK_SIZE;
ev->de_req_position = CHUNK_SIZE * bucket;

ev->de_complete_cb = chunk_cb;

cd->ev = ev;
cd->eqt = eqt;
cd->reqs[slot] = req;
cd->ohs[slot] = oh;

rc = dfs_read(ie->ie_dfs->dfs_ns, ie->ie_obj, &ev->de_sgl, ev->de_req_position, &ev->de_len,
&ev->de_ev);
if (rc != 0)
goto err;

/* Send a message to the async thread to wake it up and poll for events */
sem_post(&eqt->de_sem);

/* Now ensure there are more descriptors for the next request */
d_slab_restock(eqt->de_read_slab);

return true;

err:
daos_event_fini(&ev->de_ev);
d_slab_release(eqt->de_read_slab, ev);
cd->rc = rc;
return false;
}

/* Try and do a bulk read.
*
* Retruns true if it was able to handle the read.

Check failure on line 300 in src/client/dfuse/ops/read.c

View workflow job for this annotation

GitHub Actions / Codespell

Retruns ==> Returns
*/
static bool
chunk_read(fuse_req_t req, size_t len, off_t position, struct dfuse_obj_hdl *oh)
{
struct dfuse_inode_entry *ie = oh->doh_ie;
struct read_chunk_core *cc;
struct read_chunk_data *cd;
off_t last;
int bucket;
int slot;
bool submit = false;
bool rcb = false;

last = position = (position + (K128 - 1)) & -K128;

if (len != K128)
return false;

if ((position % K128) != 0)
return false;

last = D_ALIGNUP(position + len - 1, CHUNK_SIZE);

if (last > oh->doh_ie->ie_stat.st_size)
return false;

bucket = D_ALIGNUP(position + len, CHUNK_SIZE);
bucket = (bucket / CHUNK_SIZE) - 1;

slot = (position / K128) % 8;

DFUSE_TRA_DEBUG(oh, "read bucket %#zx-%#zx last %#zx size %#zx bucket %d slot %d", position,
position + len - 1, last, ie->ie_stat.st_size, bucket, slot);

D_MUTEX_LOCK(&rc_lock);
if (ie->ie_chunk == NULL) {
D_ALLOC_PTR(ie->ie_chunk);
if (ie->ie_chunk == NULL)
goto err;
D_INIT_LIST_HEAD(&ie->ie_chunk->entries);
}
cc = ie->ie_chunk;

d_list_for_each_entry(cd, &cc->entries, list)
if (cd->bucket == bucket)
goto found;

D_ALLOC_PTR(cd);
if (cd == NULL)
goto err;

d_list_add_tail(&cd->list, &cc->entries);
cd->bucket = bucket;
submit = true;

found:
D_MUTEX_UNLOCK(&rc_lock);

cd->reqs[slot] = req;

if (submit) {
DFUSE_TRA_DEBUG(oh, "submit for bucket %d[%d]", bucket, slot);
rcb = chunk_fetch(req, oh, cd, bucket, slot, ie);
} else {
struct dfuse_event *ev = NULL;

/* Not check if this read request is complete or not yet, if it isn't then just
* save req in the right slot however if it is then reply here. After the call to
* DFUSE_REPLY_* then no reference is held on either the open file or the inode so
* at that point they could be closed, so decided if ev needs to be released whilst
* holding the lock and keep a copy of all data structures needed.
*
* TODO: Implement this in a safe way. A reference count on ev is probably
* required. For now this is safe but will maintain all previously read events
* in memory.
*/
D_MUTEX_LOCK(&rc_lock);
if (cd->complete) {
cd->done[slot] = true;
ev = ev->de_iov.iov_buf;
} else {
cd->reqs[slot] = req;
cd->ohs[slot] = oh;
}
D_MUTEX_UNLOCK(&rc_lock);

if (ev) {
if (cd->rc != 0) {
DFUSE_REPLY_ERR_RAW(oh, req, cd->rc);
} else {
DFUSE_TRA_DEBUG(oh, "%#zx-%#zx read", position,
position + K128 - 1);
DFUSE_REPLY_BUFQ(oh, req, ev->de_iov.iov_buf + (slot * K128), K128);
}
}
rcb = true;
}

return rcb;

err:
D_MUTEX_UNLOCK(&rc_lock);
return false;
}

void
dfuse_cb_read(fuse_req_t req, fuse_ino_t ino, size_t len, off_t position, struct fuse_file_info *fi)
{
Expand All @@ -109,7 +412,6 @@ dfuse_cb_read(fuse_req_t req, fuse_ino_t ino, size_t len, off_t position, struct
struct dfuse_eq *eqt;
int rc;
struct dfuse_event *ev;
uint64_t eqt_idx;

DFUSE_IE_STAT_ADD(oh->doh_ie, DS_READ);

Expand Down Expand Up @@ -146,8 +448,10 @@ dfuse_cb_read(fuse_req_t req, fuse_ino_t ino, size_t len, off_t position, struct
}
}

eqt_idx = atomic_fetch_add_relaxed(&dfuse_info->di_eqt_idx, 1);
eqt = &dfuse_info->di_eqt[eqt_idx % dfuse_info->di_eq_count];
if (oh->doh_ie->ie_dfs->dfc_data_timeout != 0 && chunk_read(req, len, position, oh))
return;

eqt = pick_eqt(dfuse_info);

ev = d_slab_acquire(eqt->de_read_slab);
if (ev == NULL)
Expand Down Expand Up @@ -242,13 +546,10 @@ dfuse_pre_read(struct dfuse_info *dfuse_info, struct dfuse_obj_hdl *oh)
struct dfuse_eq *eqt;
int rc;
struct dfuse_event *ev;
uint64_t eqt_idx;
size_t len = oh->doh_ie->ie_stat.st_size;

eqt_idx = atomic_fetch_add_relaxed(&dfuse_info->di_eqt_idx, 1);
eqt = &dfuse_info->di_eqt[eqt_idx % dfuse_info->di_eq_count];

ev = d_slab_acquire(eqt->de_read_slab);
eqt = pick_eqt(dfuse_info);
ev = d_slab_acquire(eqt->de_read_slab);
if (ev == NULL)
D_GOTO(err, rc = ENOMEM);

Expand Down
Loading

0 comments on commit 9886fd4

Please sign in to comment.