Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Embed key into dict entry #541

Merged
merged 22 commits into from
Jul 2, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion src/db.c
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,6 @@ static void dbAddInternal(serverDb *db, robj *key, robj *val, int update_if_exis
return;
}
serverAssertWithInfo(NULL, key, de != NULL);
kvstoreDictSetKey(db->keys, slot, de, sdsdup(key->ptr));
initObjectLRUOrLFU(val);
kvstoreDictSetVal(db->keys, slot, de, val);
signalKeyAsReady(db, key, val->type);
Expand Down
58 changes: 38 additions & 20 deletions src/defrag.c
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
typedef struct defragCtx {
void *privdata;
int slot;
void *aux;
} defragCtx;

typedef struct defragPubSubCtx {
Expand Down Expand Up @@ -75,6 +76,36 @@ void *activeDefragAlloc(void *ptr) {
return newptr;
}

/* This method captures the expiry db dict entry which refers to data stored in keys db dict entry. */
void defragEntryStartCbForKeys(void *ctx, void *oldptr) {
defragCtx *defragctx = (defragCtx *)ctx;
serverDb *db = defragctx->privdata;
sds oldsds = (sds)dictGetKey((dictEntry *)oldptr);
int slot = defragctx->slot;
if (kvstoreDictSize(db->expires, slot)) {
dictEntry *expire_de = kvstoreDictFind(db->expires, slot, oldsds);
defragctx->aux = expire_de;
}
}

/* This method updates the key of expiry db dict entry. The key might be no longer valid
* as it could have been cleaned up during the defrag-realloc of the main dictionary. */
void defragEntryFinishCbForKeys(void *ctx, void *newptr) {
defragCtx *defragctx = (defragCtx *)ctx;
dictEntry *expire_de = (dictEntry *)defragctx->aux;
/* Item doesn't have TTL associated to it. */
if (!expire_de) return;
/* No reallocation happened. */
if (!newptr) {
expire_de = NULL;
return;
}
serverDb *db = defragctx->privdata;
sds newsds = (sds)dictGetKey((dictEntry *)newptr);
int slot = defragctx->slot;
kvstoreDictSetKey(db->expires, slot, expire_de, newsds);
}

/*Defrag helper for sds strings
*
* returns NULL in case the allocation wasn't moved.
Expand Down Expand Up @@ -649,26 +680,11 @@ void defragModule(serverDb *db, dictEntry *kde) {

/* for each key we scan in the main dict, this function will attempt to defrag
* all the various pointers it has. */
void defragKey(defragCtx *ctx, dictEntry *de) {
sds keysds = dictGetKey(de);
robj *newob, *ob;
unsigned char *newzl;
sds newsds;
void defragItem(defragCtx *ctx, dictEntry *de) {
serverDb *db = ctx->privdata;
int slot = ctx->slot;
/* Try to defrag the key name. */
newsds = activeDefragSds(keysds);
if (newsds) {
kvstoreDictSetKey(db->keys, slot, de, newsds);
if (kvstoreDictSize(db->expires, slot)) {
/* We can't search in db->expires for that key after we've released
* the pointer it holds, since it won't be able to do the string
* compare, but we can find the entry using key hash and pointer. */
uint64_t hash = kvstoreGetHash(db->expires, newsds);
dictEntry *expire_de = kvstoreDictFindEntryByPtrAndHash(db->expires, slot, keysds, hash);
if (expire_de) kvstoreDictSetKey(db->expires, slot, expire_de, newsds);
}
}
robj *newob, *ob;
unsigned char *newzl;

/* Try to defrag robj and / or string value. */
ob = dictGetVal(de);
Expand Down Expand Up @@ -724,7 +740,7 @@ void defragKey(defragCtx *ctx, dictEntry *de) {
/* Defrag scan callback for the main db dictionary. */
void defragScanCallback(void *privdata, const dictEntry *de) {
long long hits_before = server.stat_active_defrag_hits;
defragKey((defragCtx *)privdata, (dictEntry *)de);
defragItem((defragCtx *)privdata, (dictEntry *)de);
hpatro marked this conversation as resolved.
Show resolved Hide resolved
if (server.stat_active_defrag_hits != hits_before)
server.stat_active_defrag_key_hits++;
else
Expand Down Expand Up @@ -984,7 +1000,9 @@ void activeDefragCycle(void) {
endtime = start + timelimit;
latencyStartMonitor(latency);

dictDefragFunctions defragfns = {.defragAlloc = activeDefragAlloc};
dictDefragFunctions defragfns = {.defragAlloc = activeDefragAlloc,
.defragEntryStartCb = defragEntryStartCbForKeys,
.defragEntryFinishCb = defragEntryFinishCbForKeys};
do {
/* if we're not continuing a scan from the last call or loop, start a new one */
if (!defrag_stage && !defrag_cursor && (slot < 0)) {
Expand Down
102 changes: 71 additions & 31 deletions src/dict.c
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,17 @@ struct dictEntry {
struct dictEntry *next; /* Next entry in the same hash bucket. */
};

typedef struct {
union {
void *val;
uint64_t u64;
int64_t s64;
double d;
} v;
struct dictEntry *next; /* Next entry in the same hash bucket. */
unsigned char data[];
} embeddedDictEntry;

hpatro marked this conversation as resolved.
Show resolved Hide resolved
typedef struct {
void *key;
dictEntry *next;
Expand Down Expand Up @@ -124,6 +135,7 @@ uint64_t dictGenCaseHashFunction(const unsigned char *buf, size_t len) {
#define ENTRY_PTR_MASK 7 /* 111 */
#define ENTRY_PTR_NORMAL 0 /* 000 */
#define ENTRY_PTR_NO_VALUE 2 /* 010 */
#define ENTRY_PTR_EMBEDDED 4 /* 100 */
zuiderkwast marked this conversation as resolved.
Show resolved Hide resolved

/* Returns 1 if the entry pointer is a pointer to a key, rather than to an
* allocated entry. Returns 0 otherwise. */
Expand All @@ -143,6 +155,11 @@ static inline int entryIsNoValue(const dictEntry *de) {
return ((uintptr_t)(void *)de & ENTRY_PTR_MASK) == ENTRY_PTR_NO_VALUE;
}


static inline int entryIsEmbedded(const dictEntry *de) {
return ((uintptr_t)(void *)de & ENTRY_PTR_MASK) == ENTRY_PTR_EMBEDDED;
}

/* Creates an entry without a value field. */
static inline dictEntry *createEntryNoValue(void *key, dictEntry *next) {
dictEntryNoValue *entry = zmalloc(sizeof(*entry));
Expand All @@ -151,6 +168,15 @@ static inline dictEntry *createEntryNoValue(void *key, dictEntry *next) {
return (dictEntry *)(void *)((uintptr_t)(void *)entry | ENTRY_PTR_NO_VALUE);
}

static inline dictEntry *createEmbeddedEntry(void *key, dictEntry *next, dictType *dt) {
size_t keylen = dt->keyLen(key);
embeddedDictEntry *entry = zmalloc(sizeof(*entry) + keylen + ENTRY_METADATA_BYTES);
size_t bytes_written = dt->keyToBytes(entry->data + ENTRY_METADATA_BYTES, key, &entry->data[0]);
assert(bytes_written == keylen);
hpatro marked this conversation as resolved.
Show resolved Hide resolved
entry->next = next;
return (dictEntry *)(void *)((uintptr_t)(void *)entry | ENTRY_PTR_EMBEDDED);
madolson marked this conversation as resolved.
Show resolved Hide resolved
}

static inline dictEntry *encodeMaskedPtr(const void *ptr, unsigned int bits) {
assert(((uintptr_t)ptr & ENTRY_PTR_MASK) == 0);
return (dictEntry *)(void *)((uintptr_t)ptr | bits);
Expand All @@ -161,15 +187,24 @@ static inline void *decodeMaskedPtr(const dictEntry *de) {
return (void *)((uintptr_t)(void *)de & ~ENTRY_PTR_MASK);
}

static inline void *getEmbeddedKey(const dictEntry *de) {
embeddedDictEntry *entry = (embeddedDictEntry *)decodeMaskedPtr(de);
return &entry->data[ENTRY_METADATA_BYTES + entry->data[0]];
hpatro marked this conversation as resolved.
Show resolved Hide resolved
}

/* Decodes the pointer to an entry without value, when you know it is an entry
* without value. Hint: Use entryIsNoValue to check. */
static inline dictEntryNoValue *decodeEntryNoValue(const dictEntry *de) {
return decodeMaskedPtr(de);
}

static inline embeddedDictEntry *decodeEmbeddedEntry(const dictEntry *de) {
return decodeMaskedPtr(de);
}

/* Returns 1 if the entry has a value field and 0 otherwise. */
static inline int entryHasValue(const dictEntry *de) {
return entryIsNormal(de);
return entryIsNormal(de) || entryIsEmbedded(de);
hpatro marked this conversation as resolved.
Show resolved Hide resolved
}

/* ----------------------------- API implementation ------------------------- */
Expand Down Expand Up @@ -509,6 +544,8 @@ dictEntry *dictInsertAtPosition(dict *d, void *key, void *position) {
/* Allocate an entry without value. */
entry = createEntryNoValue(key, *bucket);
}
} else if (d->type->embedded_entry) {
entry = createEmbeddedEntry(key, *bucket, d->type);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The way dictEntry is used has no type safety. Admittedly, this is not a new issue but the addition of embeddedDictEntry is making it worse. The following code path looks problematic to me. can you please double check?

setGenericComamd() -> setKey() -> dbAdd() -> dbAddInsternal() -> kvstoreDictSetKey() -> dictSetKey()

I don't seedictSetKey getting patched to handle this new embedded dict entry.

I would suggest making dictEntry an opaque struct next and force every function go through an inline accessor function/macro. I think this is the only certain way to ensure we don't accidentally use the wrong data type.

Copy link
Member

@madolson madolson Jun 25, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isn't the dictEntry already opaque?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not in dict.c.

Copy link
Member

@madolson madolson Jun 25, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess I don't agree it should be opaque in dict.c. It seems like a very small number of actual touch points we actually have to make sure are correct. We could even reduce the number. I'm not sure making it opaque will help all that much.

Discussed offline, Ping has a separate proposal for adding guardrails that he will publish.

hpatro marked this conversation as resolved.
Show resolved Hide resolved
} else {
/* Allocate the memory and store the new entry.
* Insert the element in top, with the assumption that in a database
Expand Down Expand Up @@ -656,6 +693,9 @@ void dictFreeUnlinkedEntry(dict *d, dictEntry *he) {
if (he == NULL) return;
dictFreeKey(d, he);
dictFreeVal(d, he);
/* Clear the embedded data */
if (entryIsEmbedded(he)) zfree(decodeEmbeddedEntry(he)->data);
hpatro marked this conversation as resolved.
Show resolved Hide resolved
/* Clear the dictEntry */
if (!entryIsKey(he)) zfree(decodeMaskedPtr(he));
}

Expand Down Expand Up @@ -801,7 +841,12 @@ void dictSetKey(dict *d, dictEntry *de, void *key) {

void dictSetVal(dict *d, dictEntry *de, void *val) {
assert(entryHasValue(de));
de->v.val = d->type->valDup ? d->type->valDup(d, val) : val;
void *v = d->type->valDup ? d->type->valDup(d, val) : val;
if (entryIsEmbedded(de)) {
decodeEmbeddedEntry(de)->v.val = v;
} else {
de->v.val = v;
}
}

void dictSetSignedIntegerVal(dictEntry *de, int64_t val) {
Expand Down Expand Up @@ -837,11 +882,15 @@ double dictIncrDoubleVal(dictEntry *de, double val) {
void *dictGetKey(const dictEntry *de) {
if (entryIsKey(de)) return (void *)de;
if (entryIsNoValue(de)) return decodeEntryNoValue(de)->key;
if (entryIsEmbedded(de)) return getEmbeddedKey(de);
return de->key;
}

void *dictGetVal(const dictEntry *de) {
assert(entryHasValue(de));
if (entryIsEmbedded(de)) {
return decodeEmbeddedEntry(de)->v.val;
}
return de->v.val;
}

Expand Down Expand Up @@ -871,6 +920,7 @@ double *dictGetDoubleValPtr(dictEntry *de) {
static dictEntry *dictGetNext(const dictEntry *de) {
if (entryIsKey(de)) return NULL; /* there's no next */
if (entryIsNoValue(de)) return decodeEntryNoValue(de)->next;
if (entryIsEmbedded(de)) return decodeEmbeddedEntry(de)->next;
return de->next;
}

Expand All @@ -879,14 +929,16 @@ static dictEntry *dictGetNext(const dictEntry *de) {
static dictEntry **dictGetNextRef(dictEntry *de) {
if (entryIsKey(de)) return NULL;
if (entryIsNoValue(de)) return &decodeEntryNoValue(de)->next;
if (entryIsEmbedded(de)) return &decodeEmbeddedEntry(de)->next;
return &de->next;
}

static void dictSetNext(dictEntry *de, dictEntry *next) {
assert(!entryIsKey(de));
if (entryIsNoValue(de)) {
dictEntryNoValue *entry = decodeEntryNoValue(de);
entry->next = next;
decodeEntryNoValue(de)->next = next;
} else if (entryIsEmbedded(de)) {
decodeEmbeddedEntry(de)->next = next;
} else {
de->next = next;
}
Expand Down Expand Up @@ -1164,7 +1216,7 @@ unsigned int dictGetSomeKeys(dict *d, dictEntry **des, unsigned int count) {

/* Reallocate the dictEntry, key and value allocations in a bucket using the
* provided allocation functions in order to defrag them. */
static void dictDefragBucket(dictEntry **bucketref, dictDefragFunctions *defragfns) {
static void dictDefragBucket(dictEntry **bucketref, dictDefragFunctions *defragfns, void *privdata) {
dictDefragAllocFunction *defragalloc = defragfns->defragAlloc;
dictDefragAllocFunction *defragkey = defragfns->defragKey;
dictDefragAllocFunction *defragval = defragfns->defragVal;
Expand All @@ -1182,6 +1234,17 @@ static void dictDefragBucket(dictEntry **bucketref, dictDefragFunctions *defragf
entry = newentry;
}
if (newkey) entry->key = newkey;
} else if (entryIsEmbedded(de)) {
defragfns->defragEntryStartCb(privdata, de);
embeddedDictEntry *entry = decodeEmbeddedEntry(de), *newentry;
if ((newentry = defragalloc(entry))) {
newde = encodeMaskedPtr(newentry, ENTRY_PTR_EMBEDDED);
entry = newentry;
defragfns->defragEntryFinishCb(privdata, newde);
} else {
defragfns->defragEntryFinishCb(privdata, NULL);
}
if (newval) entry->v.val = newval;
} else {
assert(entryIsNormal(de));
newde = defragalloc(de);
Expand Down Expand Up @@ -1345,7 +1408,7 @@ dictScanDefrag(dict *d, unsigned long v, dictScanFunction *fn, dictDefragFunctio

/* Emit entries at cursor */
if (defragfns) {
dictDefragBucket(&d->ht_table[htidx0][v & m0], defragfns);
dictDefragBucket(&d->ht_table[htidx0][v & m0], defragfns, privdata);
}
de = d->ht_table[htidx0][v & m0];
while (de) {
Expand Down Expand Up @@ -1378,7 +1441,7 @@ dictScanDefrag(dict *d, unsigned long v, dictScanFunction *fn, dictDefragFunctio

/* Emit entries at cursor */
if (defragfns) {
dictDefragBucket(&d->ht_table[htidx0][v & m0], defragfns);
dictDefragBucket(&d->ht_table[htidx0][v & m0], defragfns, privdata);
}
de = d->ht_table[htidx0][v & m0];
while (de) {
Expand All @@ -1392,7 +1455,7 @@ dictScanDefrag(dict *d, unsigned long v, dictScanFunction *fn, dictDefragFunctio
do {
/* Emit entries at cursor */
if (defragfns) {
dictDefragBucket(&d->ht_table[htidx1][v & m1], defragfns);
dictDefragBucket(&d->ht_table[htidx1][v & m1], defragfns, privdata);
}
de = d->ht_table[htidx1][v & m1];
while (de) {
Expand Down Expand Up @@ -1565,29 +1628,6 @@ uint64_t dictGetHash(dict *d, const void *key) {
return dictHashKey(d, key);
}

/* Finds the dictEntry using pointer and pre-calculated hash.
* oldkey is a dead pointer and should not be accessed.
* the hash value should be provided using dictGetHash.
* no string / key comparison is performed.
* return value is a pointer to the dictEntry if found, or NULL if not found. */
dictEntry *dictFindEntryByPtrAndHash(dict *d, const void *oldptr, uint64_t hash) {
dictEntry *he;
unsigned long idx, table;

if (dictSize(d) == 0) return NULL; /* dict is empty */
for (table = 0; table <= 1; table++) {
idx = hash & DICTHT_SIZE_MASK(d->ht_size_exp[table]);
if (table == 0 && (long)idx < d->rehashidx) continue;
he = d->ht_table[table][idx];
while (he) {
if (oldptr == dictGetKey(he)) return he;
he = dictGetNext(he);
}
if (!dictIsRehashing(d)) return NULL;
}
return NULL;
}

/* Provides the old and new ht size for a given dictionary during rehashing. This method
* should only be invoked during initialization/rehashing. */
void dictRehashingInfo(dict *d, unsigned long long *from_size, unsigned long long *to_size) {
Expand Down
16 changes: 12 additions & 4 deletions src/dict.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,9 @@ typedef struct dictType {
/* Allow a dict to carry extra caller-defined metadata. The
* extra memory is initialized to 0 when a dict is allocated. */
size_t (*dictMetadataBytes)(dict *d);
size_t (*keyLen)(const void *key);
size_t (*keyToBytes)(unsigned char *buf, const void *key, unsigned char *header_size);


/* Data */
void *userdata;
Expand All @@ -83,6 +86,7 @@ typedef struct dictType {
unsigned int keys_are_odd : 1;
/* TODO: Add a 'keys_are_even' flag and use a similar optimization if that
hpatro marked this conversation as resolved.
Show resolved Hide resolved
* flag is set. */
unsigned int embedded_entry : 1;
zuiderkwast marked this conversation as resolved.
Show resolved Hide resolved
} dictType;

#define DICTHT_SIZE(exp) ((exp) == -1 ? 0 : (unsigned long)1 << (exp))
Expand Down Expand Up @@ -128,12 +132,17 @@ typedef struct dictStats {

typedef void(dictScanFunction)(void *privdata, const dictEntry *de);
typedef void *(dictDefragAllocFunction)(void *ptr);
typedef void(dictDefragEntryCb)(void *privdata, void *ptr);
typedef struct {
dictDefragAllocFunction *defragAlloc; /* Used for entries etc. */
dictDefragAllocFunction *defragKey; /* Defrag-realloc keys (optional) */
dictDefragAllocFunction *defragVal; /* Defrag-realloc values (optional) */
dictDefragAllocFunction *defragAlloc; /* Used for entries etc. */
dictDefragAllocFunction *defragKey; /* Defrag-realloc keys (optional) */
dictDefragAllocFunction *defragVal; /* Defrag-realloc values (optional) */
dictDefragEntryCb *defragEntryStartCb; /* Callback invoked prior to the start of defrag of dictEntry. */
dictDefragEntryCb *defragEntryFinishCb; /* Callback invoked after the defrag of dictEntry is tried. */
} dictDefragFunctions;

static const int ENTRY_METADATA_BYTES = 1;

/* This is the initial size of every hash table */
#define DICT_HT_INITIAL_EXP 2
#define DICT_HT_INITIAL_SIZE (1 << (DICT_HT_INITIAL_EXP))
Expand Down Expand Up @@ -237,7 +246,6 @@ unsigned long dictScan(dict *d, unsigned long v, dictScanFunction *fn, void *pri
unsigned long
dictScanDefrag(dict *d, unsigned long v, dictScanFunction *fn, dictDefragFunctions *defragfns, void *privdata);
uint64_t dictGetHash(dict *d, const void *key);
dictEntry *dictFindEntryByPtrAndHash(dict *d, const void *oldptr, uint64_t hash);
void dictRehashingInfo(dict *d, unsigned long long *from_size, unsigned long long *to_size);

size_t dictGetStatsMsg(char *buf, size_t bufsize, dictStats *stats, int full);
Expand Down
Loading
Loading