Skip to content

Commit

Permalink
Replace dict with hashtable in kvstore and db
Browse files Browse the repository at this point in the history
* Add functions for embedding key and expire in robj (object.c)
  * When there's unused space, reserve an expire field to avoid realloting
    it later if expire is added.
  * Always reserve space for expire for large key names to avoid realloc.
* Update db functions (db.c)
  * dbAdd, setKey and setExpire reallocate the object when embedding key
  * setKey does not increment the reference counter, since it would require
    duplicating the object
* Remove code for shared integer objects. Keys are embedded in the objects,
  so all objects in the database need to be unique. Thus, we can't use shared
  objects as values.
  * Delete test cases about shared integers
* Adjust various commands
* Adjust defrag code
  * Improvement: Don't access the expires table before defrag has actually
    reallocated the object.
* Adjust test cases
* Adjust memory prefetch for new hash table implementation in
  IO-threading, using new `hashtableIncrementalFind` API
* Object free to be done in main-thread while keeping obj->ptr offloading in
  IO-thread since the DB object is now allocated by the main-thread and not by
  the io-thread as it used to be.
* Pass optional value to expireIfNeeded, to avoid looking up the expires table.

---------

Signed-off-by: Uri Yagelnik <uriy@amazon.com>
Signed-off-by: uriyage <78144248+uriyage@users.noreply.github.com>
Signed-off-by: Viktor Söderqvist <viktor.soderqvist@est.tech>
Co-authored-by: Uri Yagelnik <uriy@amazon.com>
  • Loading branch information
zuiderkwast and uriyage committed Nov 27, 2024
1 parent ac7f458 commit 1f6ca93
Show file tree
Hide file tree
Showing 42 changed files with 1,698 additions and 1,409 deletions.
12 changes: 6 additions & 6 deletions src/aof.c
Original file line number Diff line number Diff line change
Expand Up @@ -2190,7 +2190,6 @@ static int rewriteFunctions(rio *aof) {
}

int rewriteAppendOnlyFileRio(rio *aof) {
dictEntry *de;
int j;
long key_count = 0;
long long updated_time = 0;
Expand Down Expand Up @@ -2219,17 +2218,18 @@ int rewriteAppendOnlyFileRio(rio *aof) {

kvs_it = kvstoreIteratorInit(db->keys);
/* Iterate this DB writing every entry */
while ((de = kvstoreIteratorNext(kvs_it)) != NULL) {
void *next;
while (kvstoreIteratorNext(kvs_it, &next)) {
robj *o = next;
sds keystr;
robj key, *o;
robj key;
long long expiretime;
size_t aof_bytes_before_key = aof->processed_bytes;

keystr = dictGetKey(de);
o = dictGetVal(de);
keystr = objectGetKey(o);
initStaticStringObject(key, keystr);

expiretime = getExpire(db, &key);
expiretime = objectGetExpire(o);

/* Save the key and associated value */
if (o->type == OBJ_STRING) {
Expand Down
5 changes: 2 additions & 3 deletions src/bitops.c
Original file line number Diff line number Diff line change
Expand Up @@ -486,7 +486,7 @@ robj *lookupStringForBitCommand(client *c, uint64_t maxbit, int *dirty) {

if (o == NULL) {
o = createObject(OBJ_STRING, sdsnewlen(NULL, byte + 1));
dbAdd(c->db, c->argv[1], o);
dbAdd(c->db, c->argv[1], &o);
if (dirty) *dirty = 1;
} else {
o = dbUnshareStringValue(c->db, c->argv[1], o);
Expand Down Expand Up @@ -772,9 +772,8 @@ void bitopCommand(client *c) {
/* Store the computed value into the target key */
if (maxlen) {
o = createObject(OBJ_STRING, res);
setKey(c, c->db, targetkey, o, 0);
setKey(c, c->db, targetkey, &o, 0);
notifyKeyspaceEvent(NOTIFY_STRING, "set", targetkey, c->db->id);
decrRefCount(o);
server.dirty++;
} else if (dbDelete(c->db, targetkey)) {
signalModifiedKey(c, c->db, targetkey);
Expand Down
20 changes: 10 additions & 10 deletions src/cluster.c
Original file line number Diff line number Diff line change
Expand Up @@ -276,9 +276,9 @@ void restoreCommand(client *c) {
}

/* Create the key and set the TTL if any */
dbAdd(c->db, key, obj);
dbAdd(c->db, key, &obj);
if (ttl) {
setExpire(c, c->db, key, ttl);
obj = setExpire(c, c->db, key, ttl);
if (!absttl) {
/* Propagate TTL as absolute timestamp */
robj *ttl_obj = createStringObjectFromLongLong(ttl);
Expand Down Expand Up @@ -811,7 +811,7 @@ static int shouldReturnTlsInfo(void) {
}

unsigned int countKeysInSlot(unsigned int slot) {
return kvstoreDictSize(server.db->keys, slot);
return kvstoreHashtableSize(server.db->keys, slot);
}

void clusterCommandHelp(client *c) {
Expand Down Expand Up @@ -908,16 +908,16 @@ void clusterCommand(client *c) {
unsigned int keys_in_slot = countKeysInSlot(slot);
unsigned int numkeys = maxkeys > keys_in_slot ? keys_in_slot : maxkeys;
addReplyArrayLen(c, numkeys);
kvstoreDictIterator *kvs_di = NULL;
dictEntry *de = NULL;
kvs_di = kvstoreGetDictIterator(server.db->keys, slot);
kvstoreHashtableIterator *kvs_di = NULL;
kvs_di = kvstoreGetHashtableIterator(server.db->keys, slot);
for (unsigned int i = 0; i < numkeys; i++) {
de = kvstoreDictIteratorNext(kvs_di);
serverAssert(de != NULL);
sds sdskey = dictGetKey(de);
void *next;
serverAssert(kvstoreHashtableIteratorNext(kvs_di, &next));
robj *valkey = next;
sds sdskey = objectGetKey(valkey);
addReplyBulkCBuffer(c, sdskey, sdslen(sdskey));
}
kvstoreReleaseDictIterator(kvs_di);
kvstoreReleaseHashtableIterator(kvs_di);
} else if ((!strcasecmp(c->argv[1]->ptr, "slaves") || !strcasecmp(c->argv[1]->ptr, "replicas")) && c->argc == 3) {
/* CLUSTER REPLICAS <NODE ID> */
clusterNode *n = clusterLookupNode(c->argv[2]->ptr, sdslen(c->argv[2]->ptr));
Expand Down
15 changes: 8 additions & 7 deletions src/cluster_legacy.c
Original file line number Diff line number Diff line change
Expand Up @@ -6123,12 +6123,13 @@ unsigned int delKeysInSlot(unsigned int hashslot) {
server.server_del_keys_in_slot = 1;
unsigned int j = 0;

kvstoreDictIterator *kvs_di = NULL;
dictEntry *de = NULL;
kvs_di = kvstoreGetDictSafeIterator(server.db->keys, hashslot);
while ((de = kvstoreDictIteratorNext(kvs_di)) != NULL) {
kvstoreHashtableIterator *kvs_di = NULL;
void *next;
kvs_di = kvstoreGetHashtableSafeIterator(server.db->keys, hashslot);
while (kvstoreHashtableIteratorNext(kvs_di, &next)) {
robj *valkey = next;
enterExecutionUnit(1, 0);
sds sdskey = dictGetKey(de);
sds sdskey = objectGetKey(valkey);
robj *key = createStringObject(sdskey, sdslen(sdskey));
dbDelete(&server.db[0], key);
propagateDeletion(&server.db[0], key, server.lazyfree_lazy_server_del);
Expand All @@ -6143,7 +6144,7 @@ unsigned int delKeysInSlot(unsigned int hashslot) {
j++;
server.dirty++;
}
kvstoreReleaseDictIterator(kvs_di);
kvstoreReleaseHashtableIterator(kvs_di);

server.server_del_keys_in_slot = 0;
serverAssert(server.execution_nesting == 0);
Expand All @@ -6152,7 +6153,7 @@ unsigned int delKeysInSlot(unsigned int hashslot) {

/* Get the count of the channels for a given slot. */
unsigned int countChannelsInSlot(unsigned int hashslot) {
return kvstoreDictSize(server.pubsubshard_channels, hashslot);
return kvstoreHashtableSize(server.pubsubshard_channels, hashslot);
}

clusterNode *getMyClusterNode(void) {
Expand Down
Loading

0 comments on commit 1f6ca93

Please sign in to comment.