Skip to content

Commit

Permalink
Extract the scripting engine code from the functions unit
Browse files Browse the repository at this point in the history
This commit creates a new unit for the scripting engine code by
extracting the existing code from the functions unit.

We're doing this refactor to prepare the code for runnning the `EVAL`
command using different scripting engines.

Signed-off-by: Ricardo Dias <ricardo.dias@percona.com>
  • Loading branch information
rjd15372 committed Nov 27, 2024
1 parent c7708aa commit 2db2efb
Show file tree
Hide file tree
Showing 11 changed files with 512 additions and 275 deletions.
3 changes: 2 additions & 1 deletion cmake/Modules/SourceFiles.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,8 @@ set(VALKEY_SERVER_SRCS
${CMAKE_SOURCE_DIR}/src/connection.c
${CMAKE_SOURCE_DIR}/src/unix.c
${CMAKE_SOURCE_DIR}/src/server.c
${CMAKE_SOURCE_DIR}/src/logreqres.c)
${CMAKE_SOURCE_DIR}/src/logreqres.c
${CMAKE_SOURCE_DIR}/src/engine.c)

# valkey-cli
set(VALKEY_CLI_SRCS
Expand Down
4 changes: 2 additions & 2 deletions src/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -369,7 +369,7 @@ else
endef
endif

# Determine install/uninstall Redis symlinks for compatibility when
# Determine install/uninstall Redis symlinks for compatibility when
# installing/uninstalling Valkey binaries (defaulting to `yes`)
USE_REDIS_SYMLINKS?=yes
ifeq ($(USE_REDIS_SYMLINKS),yes)
Expand Down Expand Up @@ -411,7 +411,7 @@ endif
ENGINE_NAME=valkey
SERVER_NAME=$(ENGINE_NAME)-server$(PROG_SUFFIX)
ENGINE_SENTINEL_NAME=$(ENGINE_NAME)-sentinel$(PROG_SUFFIX)
ENGINE_SERVER_OBJ=threads_mngr.o adlist.o quicklist.o ae.o anet.o dict.o kvstore.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o memory_prefetch.o io_threads.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o cluster_legacy.o cluster_slot_stats.o crc16.o endianconv.o slowlog.o eval.o bio.o rio.o rand.o memtest.o syscheck.o crcspeed.o crccombine.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o valkey-check-rdb.o valkey-check-aof.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o allocator_defrag.o defrag.o siphash.o rax.o t_stream.o listpack.o localtime.o lolwut.o lolwut5.o lolwut6.o acl.o tracking.o socket.o tls.o sha256.o timeout.o setcpuaffinity.o monotonic.o mt19937-64.o resp_parser.o call_reply.o script_lua.o script.o functions.o function_lua.o commands.o strl.o connection.o unix.o logreqres.o
ENGINE_SERVER_OBJ=threads_mngr.o adlist.o quicklist.o ae.o anet.o dict.o kvstore.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o memory_prefetch.o io_threads.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o cluster_legacy.o cluster_slot_stats.o crc16.o endianconv.o slowlog.o eval.o bio.o rio.o rand.o memtest.o syscheck.o crcspeed.o crccombine.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o valkey-check-rdb.o valkey-check-aof.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o allocator_defrag.o defrag.o siphash.o rax.o t_stream.o listpack.o localtime.o lolwut.o lolwut5.o lolwut6.o acl.o tracking.o socket.o tls.o sha256.o timeout.o setcpuaffinity.o monotonic.o mt19937-64.o resp_parser.o call_reply.o script_lua.o script.o functions.o function_lua.o commands.o strl.o connection.o unix.o logreqres.o engine.o
ENGINE_CLI_NAME=$(ENGINE_NAME)-cli$(PROG_SUFFIX)
ENGINE_CLI_OBJ=anet.o adlist.o dict.o valkey-cli.o zmalloc.o release.o ae.o serverassert.o crcspeed.o crccombine.o crc64.o siphash.o crc16.o monotonic.o cli_common.o mt19937-64.o strl.o cli_commands.o
ENGINE_BENCHMARK_NAME=$(ENGINE_NAME)-benchmark$(PROG_SUFFIX)
Expand Down
284 changes: 284 additions & 0 deletions src/engine.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,284 @@
#include "engine.h"
#include "dict.h"
#include "functions.h"
#include "module.h"


typedef struct engineImpl {
/* Engine specific context */
ValkeyModuleScriptingEngineCtx *engine_ctx;

/* Callback functions implemented by the scripting engine module */
ValkeyModuleScriptingEngineCreateFunctionsLibraryFunc create_functions_library;
ValkeyModuleScriptingEngineCallFunctionFunc call_function;
ValkeyModuleScriptingEngineGetUsedMemoryFunc get_used_memory;
ValkeyModuleScriptingEngineGetFunctionMemoryOverheadFunc get_function_memory_overhead;
ValkeyModuleScriptingEngineGetEngineMemoryOverheadFunc get_engine_memory_overhead;
ValkeyModuleScriptingEngineFreeFunctionFunc free_function;
} engineImpl;

typedef struct engine {
sds name; /* Name of the engine */
ValkeyModule *module; /* the module that implements the scripting engine */
engineImpl *engine_impl; /* engine callbacks that allows to interact with the engine */
client *c; /* Client that is used to run commands */
ValkeyModuleCtx *module_ctx; /* Cache of the module context object */
} engine;


typedef struct engineManger {
dict *engines; /* engines dictionary */
size_t engine_cache_memory;
} engineManager;


static engineManager engineMgr = {
.engines = NULL,
.engine_cache_memory = 0,
};

static uint64_t dictStrCaseHash(const void *key) {
return dictGenCaseHashFunction((unsigned char *)key, strlen((char *)key));
}

dictType engineDictType = {
dictStrCaseHash, /* hash function */
NULL, /* key dup */
dictSdsKeyCaseCompare, /* key compare */
NULL, /* key destructor */
NULL, /* val destructor */
NULL /* allow to expand */
};

/* Initializes the scripting engine manager.
* The engine manager is responsible for managing the several scripting engines
* that are loaded in the server and implemented by Valkey Modules.
*
* Returns C_ERR if some error occurs during the initialization.
*/
int engineManagerInit(void) {
engineMgr.engines = dictCreate(&engineDictType);
return C_OK;
}

size_t engineManagerGetCacheMemory(void) {
return engineMgr.engine_cache_memory;
}

size_t engineManagerGetNumEngines(void) {
return dictSize(engineMgr.engines);
}

size_t engineManagerGetMemoryUsage(void) {
return dictMemUsage(engineMgr.engines) + sizeof(engineMgr);
}

/* Registers a new scripting engine in the engine manager.
*
* - `engine_name`: the name of the scripting engine. This name will match
* against the engine name specified in the script header
* using a shebang.
*
* - `engine_ctx`: engine specific context pointer.
*
* - `create_functions_library_func`: the function callback that creates the
* functions library.
*
* - `call_function_func`: the function callback that executes a library
* function.
*
* - `eval_script_func`: the function callback that executes a script.
*
* - `get_used_memory_func`: function callback to get current used memory by the
* engine.
*
* - `get_function_memory_overhead_func`: function callback to return memory
* overhead for a given function.
*
* - `get_engine_memory_overhead_func`: function callback to return memory
* overhead of the engine.
*
* - `free_function_func`: function callback to free the memory of a registered
* engine function.
*
* Returns C_ERR in case of an error during registration.
*/
int engineManagerRegisterEngine(const char *engine_name,
ValkeyModule *engine_module,
ValkeyModuleScriptingEngineCtx *engine_ctx,
ValkeyModuleScriptingEngineCreateFunctionsLibraryFunc create_functions_library_func,
ValkeyModuleScriptingEngineCallFunctionFunc call_function_func,
ValkeyModuleScriptingEngineGetUsedMemoryFunc get_used_memory_func,
ValkeyModuleScriptingEngineGetFunctionMemoryOverheadFunc get_function_memory_overhead_func,
ValkeyModuleScriptingEngineGetEngineMemoryOverheadFunc get_engine_memory_overhead_func,
ValkeyModuleScriptingEngineFreeFunctionFunc free_function_func) {
sds engine_name_sds = sdsnew(engine_name);

if (dictFetchValue(engineMgr.engines, engine_name_sds)) {
serverLog(LL_WARNING, "Same engine was registered twice");
sdsfree(engine_name_sds);
return C_ERR;
}

engineImpl *ei = zmalloc(sizeof(engineImpl));
*ei = (engineImpl){
.engine_ctx = engine_ctx,
.create_functions_library = create_functions_library_func,
.call_function = call_function_func,
.get_used_memory = get_used_memory_func,
.get_function_memory_overhead = get_function_memory_overhead_func,
.get_engine_memory_overhead = get_engine_memory_overhead_func,
.free_function = free_function_func,
};

client *c = createClient(NULL);
c->flag.deny_blocking = 1;
c->flag.script = 1;
c->flag.fake = 1;

engine *e = zmalloc(sizeof(*ei));
*e = (engine){
.name = engine_name_sds,
.module = engine_module,
.engine_impl = ei,
.c = c,
.module_ctx = engine_module ? moduleAllocateContext() : NULL,
};

dictAdd(engineMgr.engines, engine_name_sds, e);

engineMgr.engine_cache_memory += zmalloc_size(e) +
sdsAllocSize(e->name) +
zmalloc_size(ei) +
ei->get_engine_memory_overhead(ei->engine_ctx);

return C_OK;
}

/* Removes a scripting engine from the engine manager.
*
* - `engine_name`: name of the engine to remove
*/
int engineManagerUnregisterEngine(const char *engine_name) {
dictEntry *entry = dictUnlink(engineMgr.engines, engine_name);
if (entry == NULL) {
serverLog(LL_WARNING, "There's no engine registered with name %s", engine_name);
return C_ERR;
}

engine *e = dictGetVal(entry);

functionsRemoveLibFromEngine(e);

zfree(e->engine_impl);
sdsfree(e->name);
freeClient(e->c);
if (e->module_ctx) {
serverAssert(e->module != NULL);
zfree(e->module_ctx);
}
zfree(e);

dictFreeUnlinkedEntry(engineMgr.engines, entry);

return C_OK;
}

/*
* Lookups the engine with `engine_name` in the engine manager and returns it if
* it exists. Otherwise returns `NULL`.
*/
engine *engineManagerFind(sds engine_name) {
dictEntry *entry = dictFind(engineMgr.engines, engine_name);
if (entry) {
return dictGetVal(entry);
}
return NULL;
}

sds engineGetName(engine *engine) {
return engine->name;
}

client *engineGetClient(engine *engine) {
return engine->c;
}

ValkeyModule *engineGetModule(engine *engine) {
return engine->module;
}

/*
* Iterates the list of engines registered in the engine manager and calls the
* callback function with each engine.
*
* The `context` pointer is also passed in each callback call.
*/
void engineManagerForEachEngine(engineIterCallback callback, void *context) {
dictIterator *iter = dictGetIterator(engineMgr.engines);
dictEntry *entry = NULL;
while ((entry = dictNext(iter))) {
engine *e = dictGetVal(entry);
if (!callback(e, context)) {
break;
}
}
dictReleaseIterator(iter);
}

ValkeyModuleScriptingEngineCompiledFunction **engineCallCreateFunctionsLibrary(
engine *engine,
const char *code,
size_t timeout,
size_t *out_num_compiled_functions,
char **err) {
return engine->engine_impl->create_functions_library(
engine->engine_impl->engine_ctx,
code,
timeout,
out_num_compiled_functions,
err);
}

void engineCallFunction(engine *engine,
ValkeyModuleScriptingEngineFunctionCtx *func_ctx,
client *caller,
void *compiled_function,
ValkeyModuleString **keys,
size_t nkeys,
ValkeyModuleString **args,
size_t nargs) {
if (engine->module_ctx) {
moduleScriptingEngineInitContext(engine->module_ctx,
engine->module,
caller);
}

engine->engine_impl->call_function(
engine->module_ctx,
engine->engine_impl->engine_ctx,
func_ctx,
compiled_function,
keys,
nkeys,
args,
nargs);

if (engine->module_ctx) {
moduleFreeContext(engine->module_ctx);
}
}

size_t engineGetUsedMemory(engine *engine) {
return engine->engine_impl->get_used_memory(engine->engine_impl->engine_ctx);
}

size_t engineGetFunctionMemoryOverhead(engine *engine, void *compiled_function) {
return engine->engine_impl->get_function_memory_overhead(compiled_function);
}

void engineFreeFunction(engine *engine,
void *compiled_func) {
engine->engine_impl->free_function(engine->engine_impl->engine_ctx,
compiled_func);
}
70 changes: 70 additions & 0 deletions src/engine.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
#ifndef _ENGINE_H_
#define _ENGINE_H_

#include "server.h"

// Forward declaration of the engine structure.
typedef struct engine engine;

/*
* Callback function used to iterate the list of engines registered in the
* engine manager.
*
* - `engine`: the engine in the current iteration.
*
* - `context`: a generic pointer to a context object.
*
* If the callback function returns 0, then the iteration is stopped
* immediately.
*/
typedef int (*engineIterCallback)(engine *engine, void *context);

/*
* Engine manager API functions.
*/
int engineManagerInit(void);
size_t engineManagerGetCacheMemory(void);
size_t engineManagerGetNumEngines(void);
size_t engineManagerGetMemoryUsage(void);
int engineManagerRegisterEngine(const char *engine_name,
ValkeyModule *engine_module,
ValkeyModuleScriptingEngineCtx *engine_ctx,
ValkeyModuleScriptingEngineCreateFunctionsLibraryFunc create_functions_library_func,
ValkeyModuleScriptingEngineCallFunctionFunc call_function_func,
ValkeyModuleScriptingEngineGetUsedMemoryFunc get_used_memory_func,
ValkeyModuleScriptingEngineGetFunctionMemoryOverheadFunc get_function_memory_overhead_func,
ValkeyModuleScriptingEngineGetEngineMemoryOverheadFunc get_engine_memory_overhead_func,
ValkeyModuleScriptingEngineFreeFunctionFunc free_function_func);
int engineManagerUnregisterEngine(const char *engine_name);
engine *engineManagerFind(sds engine_name);
void engineManagerForEachEngine(engineIterCallback callback, void *context);

/*
* Engine API functions.
*/
sds engineGetName(engine *engine);
client *engineGetClient(engine *engine);
ValkeyModule *engineGetModule(engine *engine);

/*
* API to call engine callback functions.
*/
ValkeyModuleScriptingEngineCompiledFunction **engineCallCreateFunctionsLibrary(
engine *engine,
const char *code,
size_t timeout,
size_t *out_num_compiled_functions,
char **err);
void engineCallFunction(engine *engine,
ValkeyModuleScriptingEngineFunctionCtx *func_ctx,
client *caller,
void *compiled_function,
ValkeyModuleString **keys,
size_t nkeys,
ValkeyModuleString **args,
size_t nargs);
size_t engineGetUsedMemory(engine *engine);
size_t engineGetFunctionMemoryOverhead(engine *engine, void *compiled_function);
void engineFreeFunction(engine *engine, void *compiled_func);

#endif /* _ENGINE_H_ */
Loading

0 comments on commit 2db2efb

Please sign in to comment.