Skip to content

Commit

Permalink
Embed key into dict entry
Browse files Browse the repository at this point in the history
Signed-off-by: Harkrishn Patro <harkrisp@amazon.com>
  • Loading branch information
hpatro committed May 23, 2024
1 parent a0aebb6 commit 2610832
Show file tree
Hide file tree
Showing 12 changed files with 154 additions and 76 deletions.
1 change: 0 additions & 1 deletion src/db.c
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,6 @@ static void dbAddInternal(serverDb *db, robj *key, robj *val, int update_if_exis
return;
}
serverAssertWithInfo(NULL, key, de != NULL);
kvstoreDictSetKey(db->keys, slot, de, sdsdup(key->ptr));
initObjectLRUOrLFU(val);
kvstoreDictSetVal(db->keys, slot, de, val);
signalKeyAsReady(db, key, val->type);
Expand Down
58 changes: 38 additions & 20 deletions src/defrag.c
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
typedef struct defragCtx {
void *privdata;
int slot;
void *aux;
} defragCtx;

typedef struct defragPubSubCtx {
Expand Down Expand Up @@ -75,6 +76,36 @@ void *activeDefragAlloc(void *ptr) {
return newptr;
}

/* This method captures the expiry db dict entry which refers to data stored in keys db dict entry. */
void defragEntryStartCbForKeys(void *ctx, void *oldptr) {
defragCtx *defragctx = (defragCtx *)ctx;
serverDb *db = defragctx->privdata;
sds oldsds = (sds)dictGetKey((dictEntry *)oldptr);
int slot = defragctx->slot;
if (kvstoreDictSize(db->expires, slot)) {
dictEntry *expire_de = kvstoreDictFind(db->expires, slot, oldsds);
defragctx->aux = expire_de;
}
}

/* This method updates the key of expiry db dict entry. The key might be no longer valid
* as it could have been cleaned up during the defrag-realloc of the main dictionary. */
void defragEntryFinishCbForKeys(void *ctx, void *newptr) {
defragCtx *defragctx = (defragCtx *)ctx;
dictEntry *expire_de = (dictEntry *)defragctx->aux;
/* Item doesn't have TTL associated to it. */
if (!expire_de) return;
/* No reallocation happened. */
if (!newptr) {
expire_de = NULL;
return;
}
serverDb *db = defragctx->privdata;
sds newsds = (sds)dictGetKey((dictEntry *)newptr);
int slot = defragctx->slot;
kvstoreDictSetKey(db->expires, slot, expire_de, newsds);
}

/*Defrag helper for sds strings
*
* returns NULL in case the allocation wasn't moved.
Expand Down Expand Up @@ -649,26 +680,11 @@ void defragModule(serverDb *db, dictEntry *kde) {

/* for each key we scan in the main dict, this function will attempt to defrag
* all the various pointers it has. */
void defragKey(defragCtx *ctx, dictEntry *de) {
sds keysds = dictGetKey(de);
robj *newob, *ob;
unsigned char *newzl;
sds newsds;
void defragItem(defragCtx *ctx, dictEntry *de) {
serverDb *db = ctx->privdata;
int slot = ctx->slot;
/* Try to defrag the key name. */
newsds = activeDefragSds(keysds);
if (newsds) {
kvstoreDictSetKey(db->keys, slot, de, newsds);
if (kvstoreDictSize(db->expires, slot)) {
/* We can't search in db->expires for that key after we've released
* the pointer it holds, since it won't be able to do the string
* compare, but we can find the entry using key hash and pointer. */
uint64_t hash = kvstoreGetHash(db->expires, newsds);
dictEntry *expire_de = kvstoreDictFindEntryByPtrAndHash(db->expires, slot, keysds, hash);
if (expire_de) kvstoreDictSetKey(db->expires, slot, expire_de, newsds);
}
}
robj *newob, *ob;
unsigned char *newzl;

/* Try to defrag robj and / or string value. */
ob = dictGetVal(de);
Expand Down Expand Up @@ -724,7 +740,7 @@ void defragKey(defragCtx *ctx, dictEntry *de) {
/* Defrag scan callback for the main db dictionary. */
void defragScanCallback(void *privdata, const dictEntry *de) {
long long hits_before = server.stat_active_defrag_hits;
defragKey((defragCtx *)privdata, (dictEntry *)de);
defragItem((defragCtx *)privdata, (dictEntry *)de);
if (server.stat_active_defrag_hits != hits_before)
server.stat_active_defrag_key_hits++;
else
Expand Down Expand Up @@ -984,7 +1000,9 @@ void activeDefragCycle(void) {
endtime = start + timelimit;
latencyStartMonitor(latency);

dictDefragFunctions defragfns = {.defragAlloc = activeDefragAlloc};
dictDefragFunctions defragfns = {.defragAlloc = activeDefragAlloc,
.defragEntryStartCb = defragEntryStartCbForKeys,
.defragEntryFinishCb = defragEntryFinishCbForKeys};
do {
/* if we're not continuing a scan from the last call or loop, start a new one */
if (!defrag_stage && !defrag_cursor && (slot < 0)) {
Expand Down
102 changes: 71 additions & 31 deletions src/dict.c
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,17 @@ struct dictEntry {
struct dictEntry *next; /* Next entry in the same hash bucket. */
};

typedef struct {
union {
void *val;
uint64_t u64;
int64_t s64;
double d;
} v;
struct dictEntry *next; /* Next entry in the same hash bucket. */
unsigned char data[];
} embeddedDictEntry;

typedef struct {
void *key;
dictEntry *next;
Expand Down Expand Up @@ -124,6 +135,7 @@ uint64_t dictGenCaseHashFunction(const unsigned char *buf, size_t len) {
#define ENTRY_PTR_MASK 7 /* 111 */
#define ENTRY_PTR_NORMAL 0 /* 000 */
#define ENTRY_PTR_NO_VALUE 2 /* 010 */
#define ENTRY_PTR_EMBEDDED 4 /* 100 */

/* Returns 1 if the entry pointer is a pointer to a key, rather than to an
* allocated entry. Returns 0 otherwise. */
Expand All @@ -143,6 +155,11 @@ static inline int entryIsNoValue(const dictEntry *de) {
return ((uintptr_t)(void *)de & ENTRY_PTR_MASK) == ENTRY_PTR_NO_VALUE;
}


static inline int entryIsEmbedded(const dictEntry *de) {
return ((uintptr_t)(void *)de & ENTRY_PTR_MASK) == ENTRY_PTR_EMBEDDED;
}

/* Creates an entry without a value field. */
static inline dictEntry *createEntryNoValue(void *key, dictEntry *next) {
dictEntryNoValue *entry = zmalloc(sizeof(*entry));
Expand All @@ -151,6 +168,15 @@ static inline dictEntry *createEntryNoValue(void *key, dictEntry *next) {
return (dictEntry *)(void *)((uintptr_t)(void *)entry | ENTRY_PTR_NO_VALUE);
}

static inline dictEntry *createEmbeddedEntry(void *key, dictEntry *next, dictType *dt) {
size_t keylen = dt->keyLen(key);
embeddedDictEntry *entry = zmalloc(sizeof(*entry) + keylen + ENTRY_METADATA_BYTES);
size_t bytes_written = dt->keyToBytes(entry->data + ENTRY_METADATA_BYTES, key, &entry->data[0]);
assert(bytes_written == keylen);
entry->next = next;
return (dictEntry *)(void *)((uintptr_t)(void *)entry | ENTRY_PTR_EMBEDDED);
}

static inline dictEntry *encodeMaskedPtr(const void *ptr, unsigned int bits) {
assert(((uintptr_t)ptr & ENTRY_PTR_MASK) == 0);
return (dictEntry *)(void *)((uintptr_t)ptr | bits);
Expand All @@ -161,15 +187,24 @@ static inline void *decodeMaskedPtr(const dictEntry *de) {
return (void *)((uintptr_t)(void *)de & ~ENTRY_PTR_MASK);
}

static inline void *getEmbeddedKey(const dictEntry *de) {
embeddedDictEntry *entry = (embeddedDictEntry *)decodeMaskedPtr(de);
return &entry->data[ENTRY_METADATA_BYTES + entry->data[0]];
}

/* Decodes the pointer to an entry without value, when you know it is an entry
* without value. Hint: Use entryIsNoValue to check. */
static inline dictEntryNoValue *decodeEntryNoValue(const dictEntry *de) {
return decodeMaskedPtr(de);
}

static inline embeddedDictEntry *decodeEmbeddedEntry(const dictEntry *de) {
return decodeMaskedPtr(de);
}

/* Returns 1 if the entry has a value field and 0 otherwise. */
static inline int entryHasValue(const dictEntry *de) {
return entryIsNormal(de);
return entryIsNormal(de) || entryIsEmbedded(de);
}

/* ----------------------------- API implementation ------------------------- */
Expand Down Expand Up @@ -509,6 +544,8 @@ dictEntry *dictInsertAtPosition(dict *d, void *key, void *position) {
/* Allocate an entry without value. */
entry = createEntryNoValue(key, *bucket);
}
} else if (d->type->embedded_entry) {
entry = createEmbeddedEntry(key, *bucket, d->type);
} else {
/* Allocate the memory and store the new entry.
* Insert the element in top, with the assumption that in a database
Expand Down Expand Up @@ -656,6 +693,9 @@ void dictFreeUnlinkedEntry(dict *d, dictEntry *he) {
if (he == NULL) return;
dictFreeKey(d, he);
dictFreeVal(d, he);
/* Clear the embedded data */
if (entryIsEmbedded(he)) zfree(decodeEmbeddedEntry(he)->data);
/* Clear the dictEntry */
if (!entryIsKey(he)) zfree(decodeMaskedPtr(he));
}

Expand Down Expand Up @@ -801,7 +841,12 @@ void dictSetKey(dict *d, dictEntry *de, void *key) {

void dictSetVal(dict *d, dictEntry *de, void *val) {
assert(entryHasValue(de));
de->v.val = d->type->valDup ? d->type->valDup(d, val) : val;
void *v = d->type->valDup ? d->type->valDup(d, val) : val;
if (entryIsEmbedded(de)) {
decodeEmbeddedEntry(de)->v.val = v;
} else {
de->v.val = v;
}
}

void dictSetSignedIntegerVal(dictEntry *de, int64_t val) {
Expand Down Expand Up @@ -837,11 +882,15 @@ double dictIncrDoubleVal(dictEntry *de, double val) {
void *dictGetKey(const dictEntry *de) {
if (entryIsKey(de)) return (void *)de;
if (entryIsNoValue(de)) return decodeEntryNoValue(de)->key;
if (entryIsEmbedded(de)) return getEmbeddedKey(de);
return de->key;
}

void *dictGetVal(const dictEntry *de) {
assert(entryHasValue(de));
if (entryIsEmbedded(de)) {
return decodeEmbeddedEntry(de)->v.val;
}
return de->v.val;
}

Expand Down Expand Up @@ -871,6 +920,7 @@ double *dictGetDoubleValPtr(dictEntry *de) {
static dictEntry *dictGetNext(const dictEntry *de) {
if (entryIsKey(de)) return NULL; /* there's no next */
if (entryIsNoValue(de)) return decodeEntryNoValue(de)->next;
if (entryIsEmbedded(de)) return decodeEmbeddedEntry(de)->next;
return de->next;
}

Expand All @@ -879,14 +929,16 @@ static dictEntry *dictGetNext(const dictEntry *de) {
static dictEntry **dictGetNextRef(dictEntry *de) {
if (entryIsKey(de)) return NULL;
if (entryIsNoValue(de)) return &decodeEntryNoValue(de)->next;
if (entryIsEmbedded(de)) return &decodeEmbeddedEntry(de)->next;
return &de->next;
}

static void dictSetNext(dictEntry *de, dictEntry *next) {
assert(!entryIsKey(de));
if (entryIsNoValue(de)) {
dictEntryNoValue *entry = decodeEntryNoValue(de);
entry->next = next;
decodeEntryNoValue(de)->next = next;
} else if (entryIsEmbedded(de)) {
decodeEmbeddedEntry(de)->next = next;
} else {
de->next = next;
}
Expand Down Expand Up @@ -1164,7 +1216,7 @@ unsigned int dictGetSomeKeys(dict *d, dictEntry **des, unsigned int count) {

/* Reallocate the dictEntry, key and value allocations in a bucket using the
* provided allocation functions in order to defrag them. */
static void dictDefragBucket(dictEntry **bucketref, dictDefragFunctions *defragfns) {
static void dictDefragBucket(dictEntry **bucketref, dictDefragFunctions *defragfns, void *privdata) {
dictDefragAllocFunction *defragalloc = defragfns->defragAlloc;
dictDefragAllocFunction *defragkey = defragfns->defragKey;
dictDefragAllocFunction *defragval = defragfns->defragVal;
Expand All @@ -1182,6 +1234,17 @@ static void dictDefragBucket(dictEntry **bucketref, dictDefragFunctions *defragf
entry = newentry;
}
if (newkey) entry->key = newkey;
} else if (entryIsEmbedded(de)) {
defragfns->defragEntryStartCb(privdata, de);
embeddedDictEntry *entry = decodeEmbeddedEntry(de), *newentry;
if ((newentry = defragalloc(entry))) {
newde = encodeMaskedPtr(newentry, ENTRY_PTR_EMBEDDED);
entry = newentry;
defragfns->defragEntryFinishCb(privdata, newde);
} else {
defragfns->defragEntryFinishCb(privdata, NULL);
}
if (newval) entry->v.val = newval;
} else {
assert(entryIsNormal(de));
newde = defragalloc(de);
Expand Down Expand Up @@ -1345,7 +1408,7 @@ dictScanDefrag(dict *d, unsigned long v, dictScanFunction *fn, dictDefragFunctio

/* Emit entries at cursor */
if (defragfns) {
dictDefragBucket(&d->ht_table[htidx0][v & m0], defragfns);
dictDefragBucket(&d->ht_table[htidx0][v & m0], defragfns, privdata);
}
de = d->ht_table[htidx0][v & m0];
while (de) {
Expand Down Expand Up @@ -1378,7 +1441,7 @@ dictScanDefrag(dict *d, unsigned long v, dictScanFunction *fn, dictDefragFunctio

/* Emit entries at cursor */
if (defragfns) {
dictDefragBucket(&d->ht_table[htidx0][v & m0], defragfns);
dictDefragBucket(&d->ht_table[htidx0][v & m0], defragfns, privdata);
}
de = d->ht_table[htidx0][v & m0];
while (de) {
Expand All @@ -1392,7 +1455,7 @@ dictScanDefrag(dict *d, unsigned long v, dictScanFunction *fn, dictDefragFunctio
do {
/* Emit entries at cursor */
if (defragfns) {
dictDefragBucket(&d->ht_table[htidx1][v & m1], defragfns);
dictDefragBucket(&d->ht_table[htidx1][v & m1], defragfns, privdata);
}
de = d->ht_table[htidx1][v & m1];
while (de) {
Expand Down Expand Up @@ -1565,29 +1628,6 @@ uint64_t dictGetHash(dict *d, const void *key) {
return dictHashKey(d, key);
}

/* Finds the dictEntry using pointer and pre-calculated hash.
* oldkey is a dead pointer and should not be accessed.
* the hash value should be provided using dictGetHash.
* no string / key comparison is performed.
* return value is a pointer to the dictEntry if found, or NULL if not found. */
dictEntry *dictFindEntryByPtrAndHash(dict *d, const void *oldptr, uint64_t hash) {
dictEntry *he;
unsigned long idx, table;

if (dictSize(d) == 0) return NULL; /* dict is empty */
for (table = 0; table <= 1; table++) {
idx = hash & DICTHT_SIZE_MASK(d->ht_size_exp[table]);
if (table == 0 && (long)idx < d->rehashidx) continue;
he = d->ht_table[table][idx];
while (he) {
if (oldptr == dictGetKey(he)) return he;
he = dictGetNext(he);
}
if (!dictIsRehashing(d)) return NULL;
}
return NULL;
}

/* Provides the old and new ht size for a given dictionary during rehashing. This method
* should only be invoked during initialization/rehashing. */
void dictRehashingInfo(dict *d, unsigned long long *from_size, unsigned long long *to_size) {
Expand Down
16 changes: 12 additions & 4 deletions src/dict.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,9 @@ typedef struct dictType {
/* Allow a dict to carry extra caller-defined metadata. The
* extra memory is initialized to 0 when a dict is allocated. */
size_t (*dictMetadataBytes)(dict *d);
size_t (*keyLen)(const void *key);
size_t (*keyToBytes)(unsigned char *buf, const void *key, unsigned char *header_size);


/* Data */
void *userdata;
Expand All @@ -83,6 +86,7 @@ typedef struct dictType {
unsigned int keys_are_odd : 1;
/* TODO: Add a 'keys_are_even' flag and use a similar optimization if that
* flag is set. */
unsigned int embedded_entry : 1;
} dictType;

#define DICTHT_SIZE(exp) ((exp) == -1 ? 0 : (unsigned long)1 << (exp))
Expand Down Expand Up @@ -128,12 +132,17 @@ typedef struct dictStats {

typedef void(dictScanFunction)(void *privdata, const dictEntry *de);
typedef void *(dictDefragAllocFunction)(void *ptr);
typedef void(dictDefragEntryCb)(void *privdata, void *ptr);
typedef struct {
dictDefragAllocFunction *defragAlloc; /* Used for entries etc. */
dictDefragAllocFunction *defragKey; /* Defrag-realloc keys (optional) */
dictDefragAllocFunction *defragVal; /* Defrag-realloc values (optional) */
dictDefragAllocFunction *defragAlloc; /* Used for entries etc. */
dictDefragAllocFunction *defragKey; /* Defrag-realloc keys (optional) */
dictDefragAllocFunction *defragVal; /* Defrag-realloc values (optional) */
dictDefragEntryCb *defragEntryStartCb; /* Callback invoked prior to the start of defrag of dictEntry. */
dictDefragEntryCb *defragEntryFinishCb; /* Callback invoked after the defrag of dictEntry is tried. */
} dictDefragFunctions;

static const int ENTRY_METADATA_BYTES = 1;

/* This is the initial size of every hash table */
#define DICT_HT_INITIAL_EXP 2
#define DICT_HT_INITIAL_SIZE (1 << (DICT_HT_INITIAL_EXP))
Expand Down Expand Up @@ -237,7 +246,6 @@ unsigned long dictScan(dict *d, unsigned long v, dictScanFunction *fn, void *pri
unsigned long
dictScanDefrag(dict *d, unsigned long v, dictScanFunction *fn, dictDefragFunctions *defragfns, void *privdata);
uint64_t dictGetHash(dict *d, const void *key);
dictEntry *dictFindEntryByPtrAndHash(dict *d, const void *oldptr, uint64_t hash);
void dictRehashingInfo(dict *d, unsigned long long *from_size, unsigned long long *to_size);

size_t dictGetStatsMsg(char *buf, size_t bufsize, dictStats *stats, int full);
Expand Down
Loading

0 comments on commit 2610832

Please sign in to comment.