diff --git a/.gitmodules b/.gitmodules index 502d027a..fbd06ed1 100644 --- a/.gitmodules +++ b/.gitmodules @@ -7,7 +7,7 @@ url = https://github.com/aerospike/aerospike-mod-lua.git [submodule "modules/jansson"] path = modules/jansson - url = https://github.com/aerospike/jansson.git + url = https://github.com/akheron/jansson ignore = dirty [submodule "modules/luajit"] path = modules/luajit @@ -22,3 +22,9 @@ path = modules/jemalloc url = https://github.com/aerospike/jemalloc.git ignore = dirty +[submodule "modules/spdk"] + path = modules/spdk + url = https://github.com/spdk/spdk +[submodule "modules/lthread"] + path = modules/lthread + url = https://github.com/mita/lthread diff --git a/README-spdk.md b/README-spdk.md new file mode 100755 index 00000000..3421bafc --- /dev/null +++ b/README-spdk.md @@ -0,0 +1,59 @@ +# How to build Aerospike with SPDK patch + +The build process has been tested on Ubuntu 20.04 + +1. Install dependencies to build SPDK + +``` +git submodule update --init --recursive +sudo ./modules/spdk/scripts/pkgdep.sh +``` + +2. Apply Aerospike SPDK patch + +``` +patch -p1 < aerospike-spdk.patch +``` + +3. Build Aerospike with SPDK + +``` +make USE_SPDK=1 USE_LTHREAD=1 +``` + +# How to setup Aerospike with SPDK + +1. Create a SPDK config file for your NVMe device + +``` +sudo env HUGEMEM=20480 modules/spdk/scripts/setup.sh +sudo mkdir -p /usr/local/etc/spdk/ +modules/spdk/scripts/gen_nvme.sh --json-with-subsystems | sudo tee /usr/local/etc/spdk/aerospike.conf +``` + +2. Update a Aerospike config file + +The developer configuration file, `aerospike_dev.conf`, contains basic settings. +You may want to change some parameters in the file. + +For example, the `service-lcores` parameter specifies a list of CPU which the lightweight threads run on. +The `device` parameter specifies a storage device such as the SPDK block device (bdev). + +# How to start Aerospike with SPDK + +1. setup SPDK + +This script only needs to be run once the system is up. + +``` +sudo env HUGEMEM=20480 modules/spdk/scripts/setup.sh +``` + +2. start Aerospike server + +``` +sudo su +make init +ulimit -n 200000 +make start +``` diff --git a/aerospike-spdk.patch b/aerospike-spdk.patch new file mode 100755 index 00000000..bb511015 --- /dev/null +++ b/aerospike-spdk.patch @@ -0,0 +1,2358 @@ +diff --git a/Makefile b/Makefile +index 1493584..91f3833 100644 +--- a/Makefile ++++ b/Makefile +@@ -49,9 +49,15 @@ lib: aslibs + $(MAKE) -C as $@ STATIC_LIB=1 + + .PHONY: aslibs +-aslibs: targetdirs version $(JANSSON)/Makefile $(JEMALLOC)/Makefile $(LUAJIT)/src/luaconf.h ++aslibs: targetdirs version $(SPDK)/include/spdk/config.h $(JANSSON)/Makefile $(JEMALLOC)/Makefile $(LUAJIT)/src/luaconf.h + ifeq ($(USE_LUAJIT),1) + $(MAKE) -C $(LUAJIT) Q= TARGET_SONAME=libluajit.so CCDEBUG=-g ++endif ++ifeq ($(USE_SPDK),1) ++ $(MAKE) -C $(SPDK) CONFIG_RTE_LIBRTE_TIMER=y ++endif ++ifeq ($(USE_LTHREAD),1) ++ $(MAKE) -C $(LTHREAD) DPDK=$(SPDK)/dpdk/build EXTRA_CFLAGS=-DLTHREAD_MAX_STACK_SIZE=1048576 + endif + $(MAKE) -C $(JEMALLOC) + $(MAKE) -C $(JANSSON) +@@ -96,6 +102,10 @@ cleanbasic: + .PHONY: cleanmodules + cleanmodules: + $(MAKE) -C $(COMMON) clean ++ if [ -e "$(SPDK)/include/spdk/config.h" ]; then \ ++ $(MAKE) -C $(SPDK) clean; \ ++ fi ++ $(MAKE) -C $(LTHREAD) DPDK=$(SPDK)/dpdk/build clean + if [ -e "$(JANSSON)/Makefile" ]; then \ + $(MAKE) -C $(JANSSON) clean; \ + $(MAKE) -C $(JANSSON) distclean; \ +@@ -126,6 +136,8 @@ GIT_CLEAN = git clean -fdx + .PHONY: cleangit + cleangit: + cd $(COMMON); $(GIT_CLEAN) ++ cd $(SPDK); $(GIT_CLEAN) ++ cd $(LTHREAD); $(GIT_CLEAN) + cd $(JANSSON); $(GIT_CLEAN) + cd $(JEMALLOC); $(GIT_CLEAN) + cd $(LUAJIT); $(GIT_CLEAN) +@@ -146,6 +158,9 @@ $(VERSION_OBJ): $(VERSION_SRC) + .PHONY: version + version: $(VERSION_OBJ) + ++$(SPDK)/include/spdk/config.h: ++ cd $(SPDK) && ./configure --without-isal ++ + $(JANSSON)/configure: + cd $(JANSSON) && autoreconf -i + +diff --git a/as/etc/aerospike_dev.conf b/as/etc/aerospike_dev.conf +index a1a6608..b5aacaa 100644 +--- a/as/etc/aerospike_dev.conf ++++ b/as/etc/aerospike_dev.conf +@@ -7,10 +7,19 @@ service { + # The number of concurrent connections to the database is limited by + # proto-fd-max, and by the system's maximum number of open file descriptors. + # See "man limits.conf" for how to set the system's "nofile" limit. +- proto-fd-max 1024 ++ proto-fd-max 200000 + + work-directory run/work + pidfile run/asd.pid ++ ++ spdk-json-conf /usr/local/etc/spdk/aerospike.conf ++ ++ service-threads 360 ++ service-lcores [0-17] ++# service-threads 160 ++# service-lcores 0-7 ++ ++ microsecond-histograms true + } + + mod-lua { +@@ -57,22 +66,44 @@ network { + + namespace test { + replication-factor 2 +- memory-size 4G ++ memory-size 100G + + storage-engine memory + } + + namespace bar { + replication-factor 2 +- memory-size 4G ++ memory-size 100G + +- storage-engine memory ++# storage-engine memory + + # To use file storage backing, comment out the line above and use the + # following lines instead. +-# storage-engine device { ++ storage-engine device { + # file /opt/aerospike/data/bar.dat + # filesize 16G + # data-in-memory true # Store data in memory in addition to file. +-# } ++ cold-start-empty true ++ ++ # RAM disk ++# device /dev/ram0 ++ ++ # NVMe ++# device /dev/nvme0n1 ++ ++ # SPDK bdev ++ device-backend spdk-bdev ++# device Raid0 ++ device Nvme0n1 ++ device Nvme1n1 ++ filesize 447G ++ enable-benchmarks-storage true ++ write-block-size 1M ++# post-write-queue 8192 ++ allow-batch-inline true ++ max-write-cache 4G ++ flush-max-ms 0 ++ defrag-lwm-pct 50 ++ defrag-sleep 0 ++ } + } +diff --git a/as/include/base/cfg.h b/as/include/base/cfg.h +index c11190f..476d89a 100644 +--- a/as/include/base/cfg.h ++++ b/as/include/base/cfg.h +@@ -60,7 +60,7 @@ struct as_namespace_s; + // + + #ifndef AS_NAMESPACE_SZ +-#define AS_NAMESPACE_SZ 2 ++#define AS_NAMESPACE_SZ 32 + #endif + + #define NO_NS_IX AS_NAMESPACE_SZ +@@ -138,6 +138,7 @@ typedef struct as_config_s { + uint32_t scan_max_done; // maximum number of finished scans kept for monitoring + uint32_t n_scan_threads_limit; + uint32_t n_service_threads; ++ char* service_lcores; + uint32_t sindex_builder_threads; // secondary index builder thread pool size + uint32_t sindex_gc_max_rate; // Max sindex entries processed per second for gc + uint32_t sindex_gc_period; // same as nsup_period for sindex gc +@@ -212,6 +213,8 @@ typedef struct as_config_s { + as_sec_config sec_cfg; + as_xdr_config xdr_cfg; // TODO - Forcing cfg.h to include xdr.h. Consider *. + ++ char* spdk_json_conf; ++ + uint32_t n_tls_specs; + cf_tls_spec tls_specs[MAX_TLS_SPECS]; + +diff --git a/as/include/base/datamodel.h b/as/include/base/datamodel.h +index 305568a..69811dc 100644 +--- a/as/include/base/datamodel.h ++++ b/as/include/base/datamodel.h +@@ -796,6 +796,9 @@ struct as_namespace_s { + uint32_t storage_write_block_size; + bool storage_data_in_memory; + ++ char* storage_device_backend; ++ bool storage_recycle_fds; ++ bool storage_allow_batch_inline; + bool storage_cache_replica_writes; + bool storage_cold_start_empty; + bool storage_commit_to_device; // relevant only for enterprise edition +diff --git a/as/include/base/service.h b/as/include/base/service.h +index 3bc086c..08caa9c 100644 +--- a/as/include/base/service.h ++++ b/as/include/base/service.h +@@ -29,6 +29,7 @@ + #include + #include + ++#include "aerospike/as_monitor.h" + #include "citrusleaf/cf_digest.h" + + #include "socket.h" +@@ -89,3 +90,8 @@ as_service_enqueue_internal(struct as_transaction_s* tr) + { + as_service_enqueue_internal_raw(tr, NULL, 0, false); + } ++ ++#ifdef USE_LTHREAD ++void as_service_run_threads(void); ++extern as_monitor as_service_run_monitor; ++#endif +diff --git a/as/include/storage/drv_ssd.h b/as/include/storage/drv_ssd.h +index fa66978..09e32c7 100644 +--- a/as/include/storage/drv_ssd.h ++++ b/as/include/storage/drv_ssd.h +@@ -56,6 +56,7 @@ struct as_index_s; + struct as_namespace_s; + struct as_storage_rd_s; + struct drv_ssd_s; ++struct ssd_ops; + + + //========================================================== +@@ -190,8 +191,30 @@ typedef struct drv_ssd_s { + histogram *hist_large_block_read; + histogram *hist_write; + histogram *hist_shadow_write; ++ ++ const struct ssd_ops *ops; ++ void *priv; + } drv_ssd; + ++typedef union { ++ int fd; ++ void *handle; ++} ssd_fd_t; ++ ++struct ssd_ops { ++ const char *name; ++ void (*init)(void); ++ void (*shutdown)(void); ++ bool (*init_device)(as_namespace *ns, drv_ssd *ssd, bool is_shadow); ++ void (*finish_device)(drv_ssd *ssd); ++ void *(*dma_alloc)(size_t sz); ++ void (*dma_free)(void *ptr); ++ ssd_fd_t (*open)(drv_ssd *ssd, bool is_shadow, int flags); ++ void (*close)(ssd_fd_t ssd_fd); ++ bool (*pread)(ssd_fd_t ssd_fd, void *buf, size_t size, off_t offset, bool bounce); ++ bool (*pwrite)(ssd_fd_t ssd_fd, void *buf, size_t size, off_t offset, bool bounce); ++}; ++ + + //------------------------------------------------ + // Per-namespace storage information. +@@ -262,9 +285,6 @@ void ssd_cold_start_init_repl_state(struct as_namespace_s *ns, struct as_index_s + void ssd_cold_start_init_xdr_state(const struct as_flat_record_s* flat, struct as_index_s* r); + + // Miscellaneous. +-int ssd_fd_get(drv_ssd *ssd); +-int ssd_shadow_fd_get(drv_ssd *ssd); +-void ssd_fd_put(drv_ssd *ssd, int fd); + void ssd_header_init_cfg(const struct as_namespace_s *ns, drv_ssd* ssd, drv_header *header); + void ssd_header_validate_cfg(const struct as_namespace_s *ns, drv_ssd* ssd, const drv_header *header); + void ssd_flush_final_cfg(struct as_namespace_s *ns); +diff --git a/as/include/storage/storage.h b/as/include/storage/storage.h +index af1e0c7..51e3f66 100644 +--- a/as/include/storage/storage.h ++++ b/as/include/storage/storage.h +@@ -68,8 +68,8 @@ typedef enum { + + // Artificial limit on write-block-size, in case we ever move to an + // SSD_HEADER_SIZE that's too big to be a write-block size limit. +-// MAX_WRITE_BLOCK_SIZE must be power of 2 and <= SSD_HEADER_SIZE. +-#define MAX_WRITE_BLOCK_SIZE (8 * 1024 * 1024) ++// MAX_WRITE_BLOCK_SIZE must be power of 2 ++#define MAX_WRITE_BLOCK_SIZE (128 * 1024 * 1024) + + // Artificial limit on write-block-size, must be power of 2 and >= RBLOCK_SIZE. + #define MIN_WRITE_BLOCK_SIZE (1024 * 1) +diff --git a/as/src/Makefile b/as/src/Makefile +index 751c622..cdbf688 100644 +--- a/as/src/Makefile ++++ b/as/src/Makefile +@@ -17,6 +17,18 @@ SYSTEMTAP_PROBES_H = $(GEN_DIR)/probes.h + SYSTEMTAP_PROBES_O = $(OBJECT_DIR)/probes.o + endif + ++USE_SPDK = 0 ++ ++ifeq ($(USE_SPDK),1) ++CFLAGS += -DUSE_SPDK ++endif ++ ++USE_LTHREAD = 0 ++ ++ifeq ($(USE_LTHREAD),1) ++CFLAGS += -DUSE_LTHREAD ++endif ++ + ifeq ($(USE_EE),1) + include $(EEREPO)/as/make_in/Makefile.vars + endif +@@ -133,6 +145,16 @@ else + endif + endif + ++ifeq ($(USE_SPDK),1) ++ INCLUDES += -I$(SPDK)/include ++endif ++ ++ifeq ($(USE_LTHREAD),1) ++ INCLUDES += -I$(SPDK)/dpdk/build/include ++ INCLUDES += -I$(LTHREAD)/ ++ INCLUDES += -I$(LTHREAD)/arch/x86 ++endif ++ + AS_LIBRARIES += $(LIBRARY_DIR)/libcf.a + AS_LIBRARIES += $(LIBRARY_DIR)/libai.a + AS_LIBRARIES += $(COMMON)/target/$(PLATFORM)/lib/libaerospike-common.a +@@ -171,6 +193,28 @@ else + LIBRARIES += -L$(JANSSON)/src/.libs -ljansson + endif + ++ifeq ($(USE_SPDK),1) ++ SPDK_ROOT_DIR := $(SPDK) ++ OS = Linux ++ include $(SPDK_ROOT_DIR)/mk/config.mk ++ include $(CONFIG_ENV)/env.mk ++ include $(SPDK_ROOT_DIR)/mk/spdk.modules.mk ++ SPDK_LIB_LIST = $(filter-out sock_vpp,$(ALL_MODULES_LIST)) ++ SPDK_LIB_LIST += bdev_ftl ftl bdev_aio bdev_virtio virtio ++ SPDK_LIB_LIST += thread util bdev conf accel rpc jsonrpc json log sock trace notify ++ SPDK_LIB_LIST += event event_bdev event_accel event_vmd ++ define spdk_lib_list_to_static_libs ++ $(1:%=$(SPDK_ROOT_DIR)/build/lib/libspdk_%.a) ++ endef ++ include $(SPDK_ROOT_DIR)/mk/spdk.app_vars.mk ++ LIBRARIES += -Wl,--whole-archive $(ENV_LIBS) $(SPDK)/dpdk/build/lib/librte_timer.a -Wl,--no-whole-archive ++ LIBRARIES += $(SPDK_LIB_LINKER_ARGS) $(ENV_LINKER_ARGS) -ldl -lrt -luuid -lcrypto -laio ++endif ++ ++ifeq ($(USE_LTHREAD),1) ++ LIBRARIES += -L$(LTHREAD) -llthread ++endif ++ + LIBRARIES += -L$(S2) -ls2 -ls2cellid -lgoogle-strings -lgoogle-base \ + -lgoogle-util-coding -lgoogle-util-math \ + $(shell curl-config --libs) -lstdc++ +diff --git a/as/src/base/batch.c b/as/src/base/batch.c +index ff749fc..f6b0113 100644 +--- a/as/src/base/batch.c ++++ b/as/src/base/batch.c +@@ -1059,7 +1059,7 @@ as_batch_queue_task(as_transaction* btr) + } + + // Submit transaction. +- if (info != 0 && ns->storage_data_in_memory) { ++ if (info != 0 && (ns->storage_data_in_memory || ns->storage_allow_batch_inline)) { + as_tsvc_process_transaction(&tr); + } + else { +diff --git a/as/src/base/cfg.c b/as/src/base/cfg.c +index d4e579f..3cda0a5 100644 +--- a/as/src/base/cfg.c ++++ b/as/src/base/cfg.c +@@ -293,9 +293,11 @@ typedef enum { + CASE_SERVICE_SCAN_MAX_DONE, + CASE_SERVICE_SCAN_THREADS_LIMIT, + CASE_SERVICE_SERVICE_THREADS, ++ CASE_SERVICE_SERVICE_LCORES, + CASE_SERVICE_SINDEX_BUILDER_THREADS, + CASE_SERVICE_SINDEX_GC_MAX_RATE, + CASE_SERVICE_SINDEX_GC_PERIOD, ++ CASE_SERVICE_SPDK_JSON_CONF, + CASE_SERVICE_STAY_QUIESCED, + CASE_SERVICE_TICKER_INTERVAL, + CASE_SERVICE_TRANSACTION_MAX_MS, +@@ -569,6 +571,9 @@ typedef enum { + CASE_NAMESPACE_STORAGE_DEVICE_WRITE_BLOCK_SIZE, + CASE_NAMESPACE_STORAGE_DEVICE_DATA_IN_MEMORY, + // Normally hidden: ++ CASE_NAMESPACE_STORAGE_DEVICE_BACKEND, ++ CASE_NAMESPACE_STORAGE_DEVICE_RECYCLE_FDS, ++ CASE_NAMESPACE_STORAGE_DEVICE_ALLOW_BATCH_INLINE, + CASE_NAMESPACE_STORAGE_DEVICE_CACHE_REPLICA_WRITES, + CASE_NAMESPACE_STORAGE_DEVICE_COLD_START_EMPTY, + CASE_NAMESPACE_STORAGE_DEVICE_COMMIT_TO_DEVICE, +@@ -801,9 +806,11 @@ const cfg_opt SERVICE_OPTS[] = { + { "scan-max-done", CASE_SERVICE_SCAN_MAX_DONE }, + { "scan-threads-limit", CASE_SERVICE_SCAN_THREADS_LIMIT }, + { "service-threads", CASE_SERVICE_SERVICE_THREADS }, ++ { "service-lcores", CASE_SERVICE_SERVICE_LCORES }, + { "sindex-builder-threads", CASE_SERVICE_SINDEX_BUILDER_THREADS }, + { "sindex-gc-max-rate", CASE_SERVICE_SINDEX_GC_MAX_RATE }, + { "sindex-gc-period", CASE_SERVICE_SINDEX_GC_PERIOD }, ++ { "spdk-json-conf", CASE_SERVICE_SPDK_JSON_CONF }, + { "stay-quiesced", CASE_SERVICE_STAY_QUIESCED }, + { "ticker-interval", CASE_SERVICE_TICKER_INTERVAL }, + { "transaction-max-ms", CASE_SERVICE_TRANSACTION_MAX_MS }, +@@ -1089,6 +1096,9 @@ const cfg_opt NAMESPACE_STORAGE_DEVICE_OPTS[] = { + { "scheduler-mode", CASE_NAMESPACE_STORAGE_DEVICE_SCHEDULER_MODE }, + { "write-block-size", CASE_NAMESPACE_STORAGE_DEVICE_WRITE_BLOCK_SIZE }, + { "data-in-memory", CASE_NAMESPACE_STORAGE_DEVICE_DATA_IN_MEMORY }, ++ { "device-backend", CASE_NAMESPACE_STORAGE_DEVICE_BACKEND }, ++ { "recycle-fds", CASE_NAMESPACE_STORAGE_DEVICE_RECYCLE_FDS }, ++ { "allow-batch-inline", CASE_NAMESPACE_STORAGE_DEVICE_ALLOW_BATCH_INLINE }, + { "cache-replica-writes", CASE_NAMESPACE_STORAGE_DEVICE_CACHE_REPLICA_WRITES }, + { "cold-start-empty", CASE_NAMESPACE_STORAGE_DEVICE_COLD_START_EMPTY }, + { "commit-to-device", CASE_NAMESPACE_STORAGE_DEVICE_COMMIT_TO_DEVICE }, +@@ -2322,6 +2332,9 @@ as_config_init(const char* config_file) + case CASE_SERVICE_SERVICE_THREADS: + c->n_service_threads = cfg_u32(&line, 1, MAX_SERVICE_THREADS); + break; ++ case CASE_SERVICE_SERVICE_LCORES: ++ c->service_lcores = cfg_strdup_no_checks(&line); ++ break; + case CASE_SERVICE_SINDEX_BUILDER_THREADS: + c->sindex_builder_threads = cfg_u32(&line, 1, MAX_SINDEX_BUILDER_THREADS); + break; +@@ -2335,6 +2348,9 @@ as_config_init(const char* config_file) + cfg_enterprise_only(&line); + c->stay_quiesced = cfg_bool(&line); + break; ++ case CASE_SERVICE_SPDK_JSON_CONF: ++ c->spdk_json_conf = cfg_strdup_no_checks(&line); ++ break; + case CASE_SERVICE_TICKER_INTERVAL: + c->ticker_interval = cfg_u32_no_checks(&line); + break; +@@ -3301,6 +3317,15 @@ as_config_init(const char* config_file) + case CASE_NAMESPACE_STORAGE_DEVICE_DATA_IN_MEMORY: + ns->storage_data_in_memory = cfg_bool(&line); + break; ++ case CASE_NAMESPACE_STORAGE_DEVICE_BACKEND: ++ ns->storage_device_backend = cfg_strdup_no_checks(&line); ++ break; ++ case CASE_NAMESPACE_STORAGE_DEVICE_RECYCLE_FDS: ++ ns->storage_recycle_fds = cfg_bool(&line); ++ break; ++ case CASE_NAMESPACE_STORAGE_DEVICE_ALLOW_BATCH_INLINE: ++ ns->storage_allow_batch_inline = cfg_bool(&line); ++ break; + case CASE_NAMESPACE_STORAGE_DEVICE_CACHE_REPLICA_WRITES: + ns->storage_cache_replica_writes = cfg_bool(&line); + break; +diff --git a/as/src/base/namespace.c b/as/src/base/namespace.c +index 804242f..e663dda 100644 +--- a/as/src/base/namespace.c ++++ b/as/src/base/namespace.c +@@ -142,6 +142,8 @@ as_namespace_create(char *name) + // Note - default true is consistent with AS_STORAGE_ENGINE_MEMORY, but + // cfg.c will set default false for AS_STORAGE_ENGINE_SSD. + ++ ns->storage_device_backend = "posix"; ++ ns->storage_recycle_fds = true; + ns->storage_scheduler_mode = NULL; // null indicates default is to not change scheduler mode + ns->storage_write_block_size = 1024 * 1024; + ns->storage_defrag_lwm_pct = 50; // defrag if occupancy of block is < 50% +diff --git a/as/src/base/service.c b/as/src/base/service.c +index e195058..0769db6 100644 +--- a/as/src/base/service.c ++++ b/as/src/base/service.c +@@ -36,6 +36,16 @@ + #include + #include + #include ++#ifdef USE_LTHREAD ++#include ++#include ++#include ++#include ++#include ++ ++#include "lthread_api.h" ++#include "lthread_diag_api.h" ++#endif + + #include "aerospike/as_atomic.h" + #include "citrusleaf/alloc.h" +@@ -154,6 +164,162 @@ rearm(as_file_handle* fd_h, uint32_t events) + events | EPOLLONESHOT | EPOLLRDHUP, fd_h); + } + ++#ifdef USE_LTHREAD ++ ++static void ++create_service_lthread(struct lthread **lt, uint32_t sid) ++{ ++ thread_ctx* ctx = cf_malloc(sizeof(thread_ctx)); ++ ++ cf_detail(AS_SERVICE, "starting sid %u ctx %p", sid, ctx); ++ ++ if (as_config_is_cpu_pinned()) { ++ ctx->i_cpu = (cf_topo_cpu_index)(sid % cf_topo_count_cpus()); ++ } ++ ++ ctx->lock = &g_thread_locks[sid]; ++ cf_poll_create(&ctx->poll); ++ cf_epoll_queue_init(&ctx->trans_q, AS_TRANSACTION_HEAD_SIZE, 64); ++ ++ lthread_create(lt, -1, run_service, ctx); ++ ++ cf_mutex_lock(&g_thread_locks[sid]); ++ ++ g_thread_ctxs[sid] = ctx; ++ ++ cf_mutex_unlock(&g_thread_locks[sid]); ++} ++ ++struct init_data { ++ uint32_t sid; ++ uint32_t lcores; ++}; ++ ++static void *initial_lthread(void *args) ++{ ++ struct init_data *data = args; ++ uint32_t n_threads = g_config.n_service_threads / data->lcores; ++ struct lthread *lt[n_threads]; ++ ++ for (uint32_t i = 0; i < n_threads; i++) { ++ create_service_lthread(<[i], data->sid + data->lcores * i); ++ } ++ ++ for (uint32_t i = 0; i < n_threads; i++) { ++ lthread_join(lt[i], NULL); ++ } ++ ++ lthread_scheduler_shutdown(rte_lcore_id()); ++ lthread_detach(); ++ ++ return NULL; ++} ++ ++struct sched_data { ++ cf_atomic32 id; ++ uint32_t lcores; ++}; ++ ++static int ++lthread_scheduler(void *args) ++{ ++ struct sched_data *sched_data = args; ++ struct init_data init_data; ++ struct lthread *lt; ++ ++ init_data.lcores = sched_data->lcores; ++ init_data.sid = (uint32_t)cf_atomic32_incr(&sched_data->id); ++ ++ lthread_create(<, -1, initial_lthread, &init_data); ++ lthread_run(); ++ ++ return 0; ++} ++ ++static void *run_lthreads(void *arg __attribute__((unused))) ++{ ++ char *args[3]; ++ int argc = 0; ++ ++ args[argc++] = "asd_lthread"; ++ if (g_config.service_lcores) { ++ args[argc++] = "--lcores"; ++ args[argc++] = g_config.service_lcores; ++ } ++ ++ int ret = rte_eal_init(argc, (char **)&args); ++ ++ if (ret < 0) { ++ if (rte_errno == EALREADY) { ++ as_monitor_notify(&as_service_run_monitor); ++ } else { ++ cf_crash(AS_SERVICE, "Invalid EAL parameters"); ++ } ++ } else { ++ as_service_run_threads(); ++ } ++ ++ return NULL; ++} ++ ++as_monitor as_service_run_monitor; ++ ++void ++as_service_run_threads(void) ++{ ++ struct sched_data sched_data; ++ ++ int ret = rte_timer_subsystem_init(); ++ if (ret < 0) ++ cf_crash(AS_SERVICE, "Failed to initialize timer subsystem"); ++ ++ unsigned lcore_id; ++ ++ cf_atomic32_set(&sched_data.id, UINT32_MAX); ++ sched_data.lcores = 0; ++ ++ for (lcore_id = 0; lcore_id < RTE_MAX_LCORE; lcore_id++) { ++ if (rte_lcore_is_enabled(lcore_id)) ++ sched_data.lcores++; ++ } ++ ++ if (g_config.n_service_threads % sched_data.lcores) { ++ cf_crash_nostack(AS_SERVICE, "'service-threads' must be a multiple of the number of lcores (%u)", ++ sched_data.lcores); ++ } ++ ++ lthread_num_schedulers_set((int)sched_data.lcores); ++ rte_eal_mp_remote_launch(lthread_scheduler, &sched_data, CALL_MAIN); ++ ++ RTE_LCORE_FOREACH_WORKER(lcore_id) { ++ rte_eal_wait_lcore(lcore_id); ++ } ++} ++ ++static void create_service_threads(void) ++{ ++ cf_thread_create_detached(run_lthreads, NULL); ++} ++ ++static void service_yield(void) ++{ ++ lthread_yield(); ++} ++ ++#else ++ ++static void create_service_threads(void) ++{ ++ for (uint32_t i = 0; i < g_config.n_service_threads; i++) { ++ create_service_thread(i); ++ } ++} ++ ++static void service_yield(void) ++{ ++} ++ ++#endif + + //========================================================== + // Public API. +@@ -171,9 +337,7 @@ as_service_init(void) + cf_mutex_init(&g_thread_locks[i]); + } + +- for (uint32_t i = 0; i < g_config.n_service_threads; i++) { +- create_service_thread(i); +- } ++ create_service_threads(); + } + + void +@@ -613,11 +777,21 @@ run_service(void* udata) + as_xdr_init_poll(poll); + + while (true) { ++#ifdef USE_LTHREAD ++ int timeout = 0; ++#else ++ int timeout = -1; ++#endif + cf_poll_event events[N_EVENTS]; +- int32_t n_events = cf_poll_wait(poll, events, N_EVENTS, -1); ++ int32_t n_events = cf_poll_wait(poll, events, N_EVENTS, timeout); + + cf_assert(n_events >= 0, AS_SERVICE, "unexpected EINTR"); + ++ if (n_events == 0) { ++ service_yield(); ++ continue; ++ } ++ + for (uint32_t i = 0; i < (uint32_t)n_events; i++) { + uint32_t mask = events[i].events; + void* data = events[i].data; +@@ -693,6 +867,8 @@ run_service(void* udata) + // the transaction. We'll rearm at the end of the transaction. + start_transaction(fd_h); + } ++ ++ service_yield(); + } + + return NULL; +diff --git a/as/src/base/thr_info.c b/as/src/base/thr_info.c +index 5b03580..053bd27 100644 +--- a/as/src/base/thr_info.c ++++ b/as/src/base/thr_info.c +@@ -5360,8 +5360,13 @@ info_get_sindexes(char *name, cf_dyn_buf *db) + } + + static int32_t +-oldest_nvme_age(const char *path) ++oldest_nvme_age(as_namespace *ns, const char *path) + { ++ if (strcmp(ns->storage_device_backend, "posix")) { ++ cf_detail(AS_INFO, "device info is not supported by %s: %s", ns->storage_device_backend, path); ++ return -1; ++ } ++ + cf_storage_device_info *info = cf_storage_get_device_info(path); + + if (info == NULL) { +@@ -5384,7 +5389,7 @@ add_index_device_stats(as_namespace *ns, cf_dyn_buf *db) + { + for (uint32_t i = 0; i < ns->n_xmem_mounts; i++) { + info_append_indexed_int(db, "index-type.mount", i, "age", +- oldest_nvme_age(ns->xmem_mounts[i])); ++ oldest_nvme_age(ns, ns->xmem_mounts[i])); + } + } + +@@ -5412,7 +5417,7 @@ add_data_device_stats(as_namespace *ns, cf_dyn_buf *db) + info_append_indexed_uint32(db, tag, i, "shadow_write_q", stats.shadow_write_q_sz); + + info_append_indexed_int(db, tag, i, "age", +- oldest_nvme_age(ns->storage_devices[i])); ++ oldest_nvme_age(ns, ns->storage_devices[i])); + } + } + +diff --git a/as/src/storage/drv_ssd.c b/as/src/storage/drv_ssd.c +index 6da5830..e03f835 100644 +--- a/as/src/storage/drv_ssd.c ++++ b/as/src/storage/drv_ssd.c +@@ -29,6 +29,7 @@ + + #include + #include ++#include + #include + #include + #include +@@ -69,6 +70,915 @@ + #include "storage/storage.h" + #include "transaction/rw_utils.h" + ++#ifdef USE_SPDK ++ ++#include "spdk/bdev.h" ++#include "spdk/conf.h" ++#include "spdk/env.h" ++#include "spdk/thread.h" ++ ++#ifdef USE_LTHREAD ++ ++#include "spdk_internal/event.h" ++#include "base/service.h" ++#include "lthread_api.h" ++ ++#define EXTRA_SPDK_THREADS 4 /* for write and defrag threads */ ++#else ++ ++#include "spdk/event.h" ++ ++#endif ++ ++struct ssd_spdk_thread { ++ struct spdk_thread *thread; ++#ifdef USE_LTHREAD ++ struct spdk_io_channel *ch; ++ struct spdk_io_channel *shadow_ch; ++ struct ssd_spdk_priv *priv; ++ bool used; ++#else ++ pthread_mutex_t mutex; ++#endif ++}; ++ ++struct ssd_spdk_priv { ++ struct spdk_bdev_desc *desc; ++ struct spdk_bdev_desc *shadow_desc; ++#ifdef USE_LTHREAD ++ bool done; ++#else ++ int rr; ++#endif ++ pthread_cond_t cond; ++ pthread_mutex_t mutex; ++ TAILQ_ENTRY(ssd_spdk_priv) link; ++ struct ssd_spdk_thread *threads; ++}; ++ ++struct ssd_spdk_data { ++ cf_tid tid; ++ TAILQ_HEAD(, ssd_spdk_priv) devices; ++ pthread_cond_t cond; ++ pthread_mutex_t mutex; ++#ifdef USE_LTHREAD ++ bool done; ++#endif ++}; ++ ++static struct ssd_spdk_data *ssd_spdk_data; ++ ++static void * ++ssd_spdk_dma_alloc(size_t sz) ++{ ++ void *ptr = spdk_dma_malloc(sz, HI_IO_MIN_SIZE, NULL); ++ ++ if (!ptr) ++ cf_crash(AS_DRV_SSD, "failed to allocate dma buffer"); ++ ++ return ptr; ++} ++ ++static void ++ssd_spdk_dma_free(void *ptr) ++{ ++ spdk_dma_free(ptr); ++} ++ ++extern uint64_t ++check_file_size(as_namespace *ns, uint64_t file_size, const char *tag); ++ ++static void ++ssd_spdk_bdev_event_cb(enum spdk_bdev_event_type type, struct spdk_bdev *bdev, ++ void *event_ctx) ++{ ++ cf_warning(AS_DRV_SSD, "Unsupported bdev event: type %d", type); ++} ++ ++static void ++ssd_spdk_init_device_start(void *ctx) ++{ ++ drv_ssd *ssd = ctx; ++ struct ssd_spdk_priv *priv = ssd->priv; ++ int rc; ++ ++ if (ssd->name && !priv->desc) { ++ const char *name = ssd->name; ++ ++ rc = spdk_bdev_open_ext(name, true, ssd_spdk_bdev_event_cb, NULL, &priv->desc); ++ if (rc) ++ cf_crash(AS_DRV_SSD, "unable to open bdev %s", name); ++ } ++ ++ if (ssd->shadow_name && !priv->shadow_desc) { ++ const char *name = ssd->shadow_name; ++ ++ rc = spdk_bdev_open_ext(name, true, ssd_spdk_bdev_event_cb, NULL, &priv->shadow_desc); ++ if (rc) ++ cf_crash(AS_DRV_SSD, "unable to open bdev %s", name); ++ } ++ ++#ifdef USE_LTHREAD ++ priv->done = true; ++#else ++ pthread_mutex_lock(&priv->mutex); ++ pthread_cond_signal(&priv->cond); ++ pthread_mutex_unlock(&priv->mutex); ++#endif ++} ++ ++static void ++ssd_empty_header(drv_ssd *ssd, bool is_shadow); ++ ++static bool ++ssd_spdk_init_device(as_namespace *ns, drv_ssd *ssd, bool is_shadow) ++{ ++ const char *name = is_shadow ? ssd->shadow_name : ssd->name; ++ struct spdk_bdev *bdev; ++ uint64_t io_min_size; ++ uint64_t size; ++ struct ssd_spdk_priv *priv; ++ struct ssd_spdk_thread *thread; ++ ++ bdev = spdk_bdev_get_by_name(name); ++ if (!bdev) ++ cf_crash(AS_DRV_SSD, "unable to find bdev with name %s", name); ++ ++ priv = ssd->priv; ++ if (!priv) { ++ int n; ++ ++ priv = cf_malloc(sizeof(*priv)); ++ ++#ifdef USE_LTHREAD ++ n = RTE_MAX_LCORE + EXTRA_SPDK_THREADS; ++ priv->done = false; ++#else ++ n = spdk_env_get_core_count(); ++#endif ++ pthread_mutex_init(&priv->mutex, NULL); ++ pthread_cond_init(&priv->cond, NULL); ++ priv->desc = NULL; ++ priv->shadow_desc = NULL; ++ ++ TAILQ_INSERT_TAIL(&ssd_spdk_data->devices, priv, link); ++ ++ priv->threads = cf_calloc(n, sizeof(priv->threads[0])); ++ for (int i = 0; i < n; i++) { ++ thread = &priv->threads[i]; ++ ++ if ((i >= RTE_MAX_LCORE) || (i < spdk_env_get_core_count())) { ++ thread->thread = spdk_thread_create("aerospike_spdk_rw", NULL); ++ if (!thread->thread) ++ cf_crash(AS_DRV_SSD, "failed to allocate spdk thread"); ++ } ++#ifdef USE_LTHREAD ++ thread->priv = priv; ++#else ++ pthread_mutex_init(&thread->mutex, NULL); ++#endif ++ } ++ ++ ssd->priv = priv; ++ } ++ ++#ifdef USE_LTHREAD ++ thread = &priv->threads[RTE_MAX_LCORE]; ++ ++ spdk_set_thread(thread->thread); ++ spdk_thread_send_msg(thread->thread, ssd_spdk_init_device_start, ssd); ++ do { ++ spdk_thread_poll(thread->thread, 0, 0); ++ } while (!priv->done); ++#else ++ thread = &priv->threads[0]; ++ ++ pthread_mutex_lock(&priv->mutex); ++ spdk_thread_send_msg(thread->thread, ssd_spdk_init_device_start, ssd); ++ pthread_cond_wait(&priv->cond, &priv->mutex); ++ pthread_mutex_unlock(&priv->mutex); ++#endif ++ ++ io_min_size = spdk_bdev_get_block_size(bdev); ++ size = spdk_bdev_get_num_blocks(bdev) * io_min_size; ++ ++ if (ns->storage_filesize) ++ size = MIN(ns->storage_filesize, size); ++ size = check_file_size(ns, size, "usable SPDK device"); ++ ++ if (!is_shadow) { ++ ssd->file_size = size; ++ ssd->io_min_size = io_min_size; ++ if (ns->cold_start && ns->storage_cold_start_empty) { ++ ssd_empty_header(ssd, false); ++ ++ cf_info(AS_DRV_SSD, "cold-start-empty - erased header of %s", name); ++ } ++ ++ ns->drive_size += ssd->file_size; // increment total storage size ++ ++ cf_info(AS_DRV_SSD, "opened device %s: usable size %lu, io-min-size %lu", ++ name, ssd->file_size, ssd->io_min_size); ++ ++ } else { ++ if (size < ssd->file_size) { ++ cf_crash(AS_DRV_SSD, "shadow device %s is smaller than main device - %lu < %lu", ++ ssd->shadow_name, size, ssd->file_size); ++ } ++ ++ ssd->shadow_io_min_size = io_min_size; ++ ++ if (ns->cold_start && ns->storage_cold_start_empty) { ++ ssd_empty_header(ssd, true); ++ ++ cf_info(AS_DRV_SSD, "cold-start-empty - erased header of %s", name); ++ } ++ ++ cf_info(AS_DRV_SSD, "shadow device %s is compatible with main device, shadow-io-min-size %lu", ++ name, ssd->shadow_io_min_size); ++ } ++ ++ return true; ++} ++ ++static void ssd_spdk_close(ssd_fd_t ssd_fd); ++ ++static void ++ssd_spdk_finish_device(drv_ssd *ssd) ++{ ++ struct ssd_spdk_priv *priv = ssd->priv; ++ ++ if (ssd->priv) { ++ ssd_fd_t fd; ++ ++ TAILQ_REMOVE(&ssd_spdk_data->devices, priv, link); ++ ++ if (priv->desc) { ++ while (cf_queue_pop(ssd->fd_q, &fd, CF_QUEUE_NOWAIT) == CF_QUEUE_OK) { ++ ssd_spdk_close(fd); ++ } ++ spdk_bdev_close(priv->desc); ++ } ++ ++ if (priv->shadow_desc) { ++ while (cf_queue_pop(ssd->shadow_fd_q, &fd, CF_QUEUE_NOWAIT) == CF_QUEUE_OK) { ++ ssd_spdk_close(fd); ++ } ++ spdk_bdev_close(priv->shadow_desc); ++ } ++ ++#ifdef USE_LTHREAD ++ for (int i = 0; i < RTE_MAX_LCORE + EXTRA_SPDK_THREADS; i++) { ++#else ++ for (int i = 0; i < spdk_env_get_core_count(); i++) { ++#endif ++ struct ssd_spdk_thread *thread = &priv->threads[i]; ++ ++ if (thread->thread) ++ spdk_thread_exit(thread->thread); ++ } ++ cf_free(priv->threads); ++ ++ cf_free(priv); ++ ssd->priv = NULL; ++ } ++} ++ ++enum ssd_spdk_rw { ++ SSD_SPDK_READ, ++ SSD_SPDK_READ_BOUNCE, ++ SSD_SPDK_WRITE, ++ SSD_SPDK_WRITE_BOUNCE, ++}; ++ ++struct ssd_spdk_fd { ++ struct spdk_bdev_desc *desc; ++ struct ssd_spdk_thread *thread; ++#ifdef USE_LTHREAD ++#else ++ struct spdk_io_channel *ch; ++#endif ++ ++ void *bounce; ++ size_t bounce_size; ++ ++ enum ssd_spdk_rw rw; ++ void *buf; ++ size_t size; ++ off_t offset; ++ ++ bool success; ++#ifdef USE_LTHREAD ++ bool done; ++ struct ssd_spdk_priv *priv; ++#else ++ pthread_cond_t cond; ++ pthread_mutex_t mutex; ++#endif ++}; ++ ++#ifdef USE_LTHREAD ++ ++static struct ssd_spdk_thread * ++ssd_spdk_thread_get(struct ssd_spdk_priv *priv) ++{ ++ struct ssd_spdk_thread *thread; ++ ++ if (lthread_current()) { ++ thread = &priv->threads[spdk_env_get_current_core()]; ++ } else { ++ pthread_mutex_lock(&priv->mutex); ++ ++ thread = NULL; ++ do { ++ for (int i = RTE_MAX_LCORE; i < RTE_MAX_LCORE + EXTRA_SPDK_THREADS; i++) { ++ if (!priv->threads[i].used) { ++ thread = &priv->threads[i]; ++ break; ++ } ++ } ++ if (!thread) ++ pthread_cond_wait(&priv->cond, &priv->mutex); ++ } while (!thread); ++ ++ thread->used = true; ++ pthread_mutex_unlock(&priv->mutex); ++ } ++ ++ if (!thread->thread) ++ cf_crash(AS_DRV_SSD, "Invalid spdk thread context"); ++ ++ spdk_set_thread(thread->thread); ++ ++ return thread; ++} ++ ++static struct spdk_io_channel * ++ssd_spdk_get_io_channel(struct ssd_spdk_thread *thread, struct spdk_bdev_desc *desc) ++{ ++ struct ssd_spdk_priv *priv = thread->priv; ++ struct spdk_io_channel *ch; ++ ++ if (priv->desc == desc) { ++ if (thread->ch) ++ return thread->ch; ++ } else if (priv->shadow_desc == desc) { ++ if (thread->shadow_ch) ++ return thread->shadow_ch; ++ } else { ++ cf_crash(AS_DRV_SSD, "Invalid spdk_bdev_desc specified"); ++ } ++ ++ ch = spdk_bdev_get_io_channel(desc); ++ ++ if (priv->desc == desc) { ++ thread->ch = ch; ++ } else if (priv->shadow_desc == desc) { ++ thread->shadow_ch = ch; ++ } ++ ++ return ch; ++} ++ ++static void ++ssd_spdk_thread_put(struct ssd_spdk_priv *priv, struct ssd_spdk_thread *thread) ++{ ++ if (!lthread_current()) { ++ pthread_mutex_lock(&priv->mutex); ++ thread->used = false; ++ pthread_cond_signal(&priv->cond); ++ pthread_mutex_unlock(&priv->mutex); ++ } ++ spdk_set_thread(NULL); ++} ++ ++#endif ++ ++static ssd_fd_t ++ssd_spdk_open(drv_ssd *ssd, bool is_shadow, int flags) ++{ ++ struct ssd_spdk_priv *priv = ssd->priv; ++ struct ssd_spdk_fd *handle; ++ ssd_fd_t ssd_fd = { .fd = -1 }; ++ ++ handle = cf_malloc(sizeof(*handle)); ++ ++ handle->desc = is_shadow ? priv->shadow_desc : priv->desc; ++ ++#ifdef USE_LTHREAD ++ handle->priv = priv; ++#else ++ pthread_mutex_init(&handle->mutex, NULL); ++ pthread_cond_init(&handle->cond, NULL); ++ ++ handle->thread = &priv->threads[(priv->rr++) % spdk_env_get_core_count()]; ++ ++ handle->ch = NULL; ++#endif ++ handle->bounce_size = 0; ++ ++ ssd_fd.handle = handle; ++ ++ return ssd_fd; ++} ++ ++static void ++ssd_spdk_close_start(void *ctx) ++{ ++ struct ssd_spdk_fd *handle = ctx; ++ ++ if (handle->bounce_size) ++ spdk_dma_free(handle->bounce); ++#ifdef USE_LTHREAD ++ handle->done = true; ++#else ++ if (handle->ch) ++ spdk_put_io_channel(handle->ch); ++ ++ pthread_mutex_lock(&handle->mutex); ++ pthread_cond_signal(&handle->cond); ++ pthread_mutex_unlock(&handle->mutex); ++#endif ++} ++ ++static void ++ssd_spdk_close(ssd_fd_t ssd_fd) ++{ ++ struct ssd_spdk_fd *handle = ssd_fd.handle; ++ ++#ifdef USE_LTHREAD ++ struct ssd_spdk_thread *thread; ++ ++ handle->done = false; ++ thread = ssd_spdk_thread_get(handle->priv); ++ spdk_thread_send_msg(thread->thread, ssd_spdk_close_start, handle); ++ ++ do { ++ spdk_thread_poll(thread->thread, 0, 0); ++ if (lthread_current()) ++ lthread_yield(); ++ else ++ sched_yield(); ++ } while (!handle->done); ++ ++ ssd_spdk_thread_put(handle->priv, thread); ++#else ++ pthread_mutex_lock(&handle->mutex); ++ ++ pthread_mutex_lock(&handle->thread->mutex); ++ spdk_thread_send_msg(handle->thread->thread, ssd_spdk_close_start, handle); ++ pthread_mutex_unlock(&handle->thread->mutex); ++ ++ pthread_cond_wait(&handle->cond, &handle->mutex); ++ pthread_mutex_unlock(&handle->mutex); ++#endif ++ cf_free(handle); ++} ++ ++static void ++ssd_spdk_bdev_io_complete(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg) ++{ ++ struct ssd_spdk_fd *handle = cb_arg; ++ ++ if (handle->rw == SSD_SPDK_READ_BOUNCE) ++ memcpy(handle->buf, handle->bounce, handle->size); ++ ++#ifdef USE_LTHREAD ++ handle->success = success; ++ handle->done = true; ++#else ++ pthread_mutex_lock(&handle->mutex); ++ handle->success = success; ++ pthread_cond_signal(&handle->cond); ++ pthread_mutex_unlock(&handle->mutex); ++#endif ++ ++ spdk_bdev_free_io(bdev_io); ++} ++ ++static void ++ssd_spdk_prw_start(void *ctx) ++{ ++ struct ssd_spdk_fd *handle = ctx; ++ void *buf = handle->buf; ++ struct spdk_io_channel *ch; ++ ++#ifdef USE_LTHREAD ++ ch = ssd_spdk_get_io_channel(handle->thread, handle->desc); ++ if (!ch) { ++ handle->success = false; ++ handle->done = true; ++ return; ++ } ++#else ++ if (!handle->ch) ++ handle->ch = spdk_bdev_get_io_channel(handle->desc); ++ ++ if (!handle->ch) { ++ pthread_mutex_lock(&handle->mutex); ++ handle->success = false; ++ pthread_cond_signal(&handle->cond); ++ pthread_mutex_unlock(&handle->mutex); ++ ++ return; ++ } ++ ch = handle->ch; ++#endif ++ ++ if (handle->rw == SSD_SPDK_READ_BOUNCE || handle->rw == SSD_SPDK_WRITE_BOUNCE) { ++ if (handle->bounce_size < handle->size) { ++ if (handle->bounce_size) { ++ spdk_dma_free(handle->bounce); ++ } ++ handle->bounce = ssd_spdk_dma_alloc(handle->size); ++ handle->bounce_size = handle->size; ++ } ++ if (handle->rw == SSD_SPDK_WRITE_BOUNCE) ++ memcpy(handle->bounce, handle->buf, handle->size); ++ ++ buf = handle->bounce; ++ } ++ ++ switch (handle->rw) { ++ case SSD_SPDK_WRITE: ++ case SSD_SPDK_WRITE_BOUNCE: ++ spdk_bdev_write(handle->desc, ch, buf, handle->offset, handle->size, ++ ssd_spdk_bdev_io_complete, handle); ++ break; ++ case SSD_SPDK_READ: ++ case SSD_SPDK_READ_BOUNCE: ++ spdk_bdev_read(handle->desc, ch, buf, handle->offset, handle->size, ++ ssd_spdk_bdev_io_complete, handle); ++ break; ++ } ++} ++ ++static bool ++ssd_spdk_prw(ssd_fd_t ssd_fd, void *buf, size_t size, off_t offset, enum ssd_spdk_rw rw) ++{ ++ struct ssd_spdk_fd *handle = ssd_fd.handle; ++ ++#ifdef USE_LTHREAD ++ handle->done = false; ++#else ++ pthread_mutex_lock(&handle->mutex); ++#endif ++ ++ handle->rw = rw; ++ handle->buf = buf; ++ handle->size = size; ++ handle->offset = offset; ++ handle->success = false; ++ ++#ifdef USE_LTHREAD ++ handle->thread = ssd_spdk_thread_get(handle->priv); ++ ++ spdk_thread_send_msg(handle->thread->thread, ssd_spdk_prw_start, handle); ++ ++ do { ++ spdk_thread_poll(handle->thread->thread, 0, 0); ++ if (lthread_current()) ++ lthread_yield(); ++ else ++ sched_yield(); ++ } while (!handle->done); ++ ++ ssd_spdk_thread_put(handle->priv, handle->thread); ++ handle->thread = NULL; ++#else ++ pthread_mutex_lock(&handle->thread->mutex); ++ spdk_thread_send_msg(handle->thread->thread, ssd_spdk_prw_start, handle); ++ pthread_mutex_unlock(&handle->thread->mutex); ++ ++ pthread_cond_wait(&handle->cond, &handle->mutex); ++ pthread_mutex_unlock(&handle->mutex); ++#endif ++ ++ return handle->success; ++} ++ ++static bool ++ssd_spdk_pread(ssd_fd_t ssd_fd, void *buf, size_t size, off_t offset, bool bounce) ++{ ++ return ssd_spdk_prw(ssd_fd, buf, size, offset, bounce ? SSD_SPDK_READ_BOUNCE : SSD_SPDK_READ); ++} ++ ++static bool ++ssd_spdk_pwrite(ssd_fd_t ssd_fd, void *buf, size_t size, off_t offset, bool bounce) ++{ ++ return ssd_spdk_prw(ssd_fd, buf, size, offset, bounce ? SSD_SPDK_WRITE_BOUNCE : SSD_SPDK_WRITE); ++} ++ ++#ifndef USE_LTHREAD ++ ++static void ++ssd_spdk_start(void *arg) ++{ ++ struct ssd_spdk_data *data = arg; ++ ++ pthread_mutex_lock(&data->mutex); ++ pthread_cond_signal(&data->cond); ++ pthread_mutex_unlock(&data->mutex); ++} ++ ++extern void as_sig_handle_term(int sig_num, siginfo_t *info, void *ctx); ++ ++static void ++ssd_spdk_shutdown_cb(void) ++{ ++ as_sig_handle_term(SIGTERM, NULL, NULL); ++} ++ ++static void * ++run_ssd_spdk_start(void *arg) ++{ ++ struct ssd_spdk_data *data = arg; ++ struct spdk_app_opts opts = {}; ++ int rc; ++ ++ cf_info(AS_DRV_SSD, "spdk conf is %s", g_config.spdk_json_conf); ++ ++ spdk_app_opts_init(&opts, sizeof(opts)); ++ opts.name = "aerospike"; ++ if (g_config.service_lcores) ++ opts.reactor_mask = g_config.service_lcores; ++ opts.shutdown_cb = ssd_spdk_shutdown_cb; ++ opts.json_config_file = g_config.spdk_json_conf; ++ ++ rc = spdk_app_start(&opts, ssd_spdk_start, data); ++ if (rc) { ++ cf_crash(AS_DRV_SSD, "Error starting spdk application"); ++ } else { ++ spdk_app_fini(); ++ } ++ ++ return NULL; ++} ++ ++static void ++ssd_spdk_shutdown(void) ++{ ++ if (ssd_spdk_data && TAILQ_EMPTY(&ssd_spdk_data->devices)) { ++ spdk_app_stop(0); ++ cf_thread_join(ssd_spdk_data->tid); ++ } ++} ++ ++#else ++ ++static void ++ssd_spdk_bdev_init_done(int rc, void *cb_arg) ++{ ++ struct ssd_spdk_data *data = cb_arg; ++ ++ data->done = true; ++} ++ ++static void ++ssd_spdk_bdev_init_start(void *arg) ++{ ++ struct ssd_spdk_data *data = arg; ++ ++ spdk_app_json_config_load(g_config.spdk_json_conf, SPDK_DEFAULT_RPC_ADDR, ++ ssd_spdk_bdev_init_done, data, true); ++} ++ ++static void * ++run_ssd_spdk_start(void *arg) ++{ ++ struct spdk_env_opts opts; ++ struct spdk_thread *thread; ++ ++ cf_info(AS_DRV_SSD, "spdk conf is %s", g_config.spdk_json_conf); ++ ++ spdk_env_opts_init(&opts); ++ opts.name = "aerospike"; ++ if (g_config.service_lcores) { ++ opts.core_mask = g_config.service_lcores; ++ opts.env_context = cf_strdup("--log-level=lib.eal:7"); ++ } ++ ++ if (spdk_env_init(&opts) < 0) { ++ cf_crash(AS_DRV_SSD, "unable to initialize SPDK env"); ++ } ++ ++ spdk_thread_lib_init(NULL, 0); ++ ++ thread = spdk_thread_create("spdk_aerospike", NULL); ++ if (!thread) { ++ cf_crash(AS_DRV_SSD, "failed to allocate spdk thread"); ++ } ++ ++ ssd_spdk_data->done = false; ++ spdk_thread_send_msg(thread, ssd_spdk_bdev_init_start, ssd_spdk_data); ++ ++ do { ++ spdk_thread_poll(thread, 0, 0); ++ } while (!ssd_spdk_data->done); ++ ++ pthread_mutex_lock(&ssd_spdk_data->mutex); ++ pthread_cond_signal(&ssd_spdk_data->cond); ++ pthread_mutex_unlock(&ssd_spdk_data->mutex); ++ ++ as_monitor_init(&as_service_run_monitor); ++ as_monitor_begin(&as_service_run_monitor); ++ as_monitor_wait(&as_service_run_monitor); ++ ++ as_service_run_threads(); ++ ++ return NULL; ++} ++ ++static void ++ssd_spdk_shutdown(void) ++{ ++} ++ ++#endif ++ ++static void ++ssd_spdk_init(void) ++{ ++ if (!g_config.spdk_json_conf || ssd_spdk_data) { ++ return; ++ } ++ ++ ssd_spdk_data = cf_malloc(sizeof(*ssd_spdk_data)); ++ ++ TAILQ_INIT(&ssd_spdk_data->devices); ++ pthread_mutex_init(&ssd_spdk_data->mutex, NULL); ++ pthread_cond_init(&ssd_spdk_data->cond, NULL); ++ ++ pthread_mutex_lock(&ssd_spdk_data->mutex); ++ ssd_spdk_data->tid = cf_thread_create_joinable(run_ssd_spdk_start, ssd_spdk_data); ++ pthread_cond_wait(&ssd_spdk_data->cond, &ssd_spdk_data->mutex); ++ pthread_mutex_unlock(&ssd_spdk_data->mutex); ++} ++ ++static const struct ssd_ops ssd_spdk_ops = { ++ .name = "spdk-bdev", ++ .init = ssd_spdk_init, ++ .shutdown = ssd_spdk_shutdown, ++ .init_device = ssd_spdk_init_device, ++ .finish_device = ssd_spdk_finish_device, ++ .dma_alloc = ssd_spdk_dma_alloc, ++ .dma_free = ssd_spdk_dma_free, ++ .open = ssd_spdk_open, ++ .close = ssd_spdk_close, ++ .pread = ssd_spdk_pread, ++ .pwrite = ssd_spdk_pwrite, ++}; ++ ++#else /* USE_SPDK */ ++ ++static bool ++ssd_spdk_init_device(as_namespace *ns, drv_ssd *ssd, bool is_shadow) ++{ ++ cf_crash(AS_DRV_SSD, "SPDK support is disabled"); ++} ++ ++static const struct ssd_ops ssd_spdk_ops = { ++ .name = "spdk-bdev", ++ .init_device = ssd_spdk_init_device, ++}; ++ ++#endif /* USE_SPDK */ ++ ++static bool ++ssd_init_device(as_namespace *ns, drv_ssd *ssd, bool is_shadow) ++{ ++ if (ssd->ops->init_device) ++ return ssd->ops->init_device(ns, ssd, is_shadow); ++ ++ return true; ++} ++ ++static void ++ssd_finish_device(drv_ssd *ssd) ++{ ++ if (ssd->ops->finish_device) ++ ssd->ops->finish_device(ssd); ++} ++ ++static inline void * ++ssd_dma_alloc(drv_ssd *ssd, size_t sz) ++{ ++ if (ssd->ops->dma_alloc) { ++ return ssd->ops->dma_alloc(sz); ++ } ++ return cf_valloc(sz); ++} ++ ++static inline void ++ssd_dma_free(drv_ssd *ssd, void *ptr) ++{ ++ if (ssd->ops->dma_free) { ++ return ssd->ops->dma_free(ptr); ++ } ++ cf_free(ptr); ++} ++ ++static ssd_fd_t ++ssd_open(drv_ssd *ssd, bool is_shadow, int flags) ++{ ++ return ssd->ops->open(ssd, is_shadow, flags); ++} ++ ++static void ++ssd_close(drv_ssd *ssd, ssd_fd_t ssd_fd) ++{ ++ ssd->ops->close(ssd_fd); ++} ++ ++static bool ++ssd_pread_all(drv_ssd *ssd, ssd_fd_t ssd_fd, void *buf, size_t size, off_t offset) ++{ ++ return ssd->ops->pread(ssd_fd, buf, size, offset, false); ++} ++ ++static bool ++ssd_pread_all_bounce(drv_ssd *ssd, ssd_fd_t ssd_fd, void *buf, size_t size, off_t offset) ++{ ++ return ssd->ops->pread(ssd_fd, buf, size, offset, true); ++} ++ ++static bool ++ssd_pwrite_all(drv_ssd *ssd, ssd_fd_t ssd_fd, void *buf, size_t size, off_t offset) ++{ ++ return ssd->ops->pwrite(ssd_fd, buf, size, offset, false); ++} ++ ++static bool ++ssd_pwrite_all_bounce(drv_ssd *ssd, ssd_fd_t ssd_fd, void *buf, size_t size, off_t offset) ++{ ++ return ssd->ops->pwrite(ssd_fd, buf, size, offset, true); ++} ++ ++static bool ++ssd_fd_is_error(ssd_fd_t ssd_fd) ++{ ++ return ssd_fd.fd == -1; ++} ++ ++static ssd_fd_t ++ssd_posix_open(drv_ssd *ssd, bool is_shadow, int flags) ++{ ++ ssd_fd_t ssd_fd; ++ ++ ssd_fd.fd = open(is_shadow ? ssd->shadow_name : ssd->name, flags, S_IRUSR | S_IWUSR); ++ ++ return ssd_fd; ++} ++ ++static void ++ssd_posix_close(ssd_fd_t ssd_fd) ++{ ++ close(ssd_fd.fd); ++} ++ ++static bool ++ssd_posix_pread(ssd_fd_t ssd_fd, void *buf, size_t size, off_t offset, bool bounce) ++{ ++ return pread_all(ssd_fd.fd, buf, size, offset); ++} ++ ++static bool ++ssd_posix_pwrite(ssd_fd_t ssd_fd, void *buf, size_t size, off_t offset, bool bounce) ++{ ++ return pwrite_all(ssd_fd.fd, buf, size, offset); ++} ++ ++static const struct ssd_ops ssd_posix_ops = { ++ .name = "posix", ++ .open = ssd_posix_open, ++ .close = ssd_posix_close, ++ .pread = ssd_posix_pread, ++ .pwrite = ssd_posix_pwrite, ++}; ++ ++static const struct ssd_ops *ssd_backends[] = { ++ &ssd_spdk_ops, ++ &ssd_posix_ops, ++ NULL, ++}; ++ ++static void ssd_backends_init(void) ++{ ++ for (const struct ssd_ops **ops = &ssd_backends[0]; *ops; ops++) { ++ if ((*ops)->init) { ++ (*ops)->init(); ++ } ++ } ++} ++ ++static void ssd_backends_shutdown(void) ++{ ++ for (const struct ssd_ops **ops = &ssd_backends[0]; *ops; ops++) { ++ if ((*ops)->shutdown) { ++ (*ops)->shutdown(); ++ } ++ } ++} + + //========================================================== + // Constants. +@@ -82,16 +992,20 @@ + // + + // Get an open file descriptor from the pool, or a fresh one if necessary. +-int ++ssd_fd_t + ssd_fd_get(drv_ssd *ssd) + { +- int fd = -1; +- int rv = cf_queue_pop(ssd->fd_q, (void*)&fd, CF_QUEUE_NOWAIT); ++ ssd_fd_t fd = { .fd = -1 }; ++ int rv = CF_QUEUE_EMPTY; ++ ++ if (ssd->ns->storage_recycle_fds) { ++ rv = cf_queue_pop(ssd->fd_q, (void*)&fd, CF_QUEUE_NOWAIT); ++ } + + if (rv != CF_QUEUE_OK) { +- fd = open(ssd->name, ssd->open_flag, S_IRUSR | S_IWUSR); ++ fd = ssd_open(ssd, false, ssd->open_flag); + +- if (-1 == fd) { ++ if (ssd_fd_is_error(fd)) { + cf_crash(AS_DRV_SSD, "%s: DEVICE FAILED open: errno %d (%s)", + ssd->name, errno, cf_strerror(errno)); + } +@@ -101,17 +1015,20 @@ ssd_fd_get(drv_ssd *ssd) + } + + +-int ++ssd_fd_t + ssd_fd_cache_get(drv_ssd *ssd) + { +- int fd = -1; +- int rv = cf_queue_pop(ssd->fd_cache_q, (void*)&fd, CF_QUEUE_NOWAIT); ++ ssd_fd_t fd = { .fd = -1 }; ++ int rv = CF_QUEUE_EMPTY; ++ ++ if (ssd->ns->storage_recycle_fds) { ++ rv = cf_queue_pop(ssd->fd_cache_q, (void*)&fd, CF_QUEUE_NOWAIT); ++ } + + if (rv != CF_QUEUE_OK) { +- fd = open(ssd->name, ssd->open_flag & ~(O_DIRECT | O_DSYNC), +- S_IRUSR | S_IWUSR); ++ fd = ssd_open(ssd, false, ssd->open_flag & ~(O_DIRECT | O_DSYNC)); + +- if (-1 == fd) { ++ if (ssd_fd_is_error(fd)) { + cf_crash(AS_DRV_SSD, "%s: DEVICE FAILED open: errno %d (%s)", + ssd->name, errno, cf_strerror(errno)); + } +@@ -121,16 +1038,20 @@ ssd_fd_cache_get(drv_ssd *ssd) + } + + +-int ++ssd_fd_t + ssd_shadow_fd_get(drv_ssd *ssd) + { +- int fd = -1; +- int rv = cf_queue_pop(ssd->shadow_fd_q, (void*)&fd, CF_QUEUE_NOWAIT); ++ ssd_fd_t fd = { .fd = -1 }; ++ int rv = CF_QUEUE_EMPTY; ++ ++ if (ssd->ns->storage_recycle_fds) { ++ rv = cf_queue_pop(ssd->shadow_fd_q, (void*)&fd, CF_QUEUE_NOWAIT); ++ } + + if (rv != CF_QUEUE_OK) { +- fd = open(ssd->shadow_name, ssd->open_flag, S_IRUSR | S_IWUSR); ++ fd = ssd_open(ssd, true, ssd->open_flag); + +- if (-1 == fd) { ++ if (ssd_fd_is_error(fd)) { + cf_crash(AS_DRV_SSD, "%s: DEVICE FAILED open: errno %d (%s)", + ssd->shadow_name, errno, cf_strerror(errno)); + } +@@ -142,23 +1063,35 @@ ssd_shadow_fd_get(drv_ssd *ssd) + + // Save an open file descriptor in the pool + void +-ssd_fd_put(drv_ssd *ssd, int fd) ++ssd_fd_put(drv_ssd *ssd, ssd_fd_t fd) + { +- cf_queue_push(ssd->fd_q, (void*)&fd); ++ if (ssd->ns->storage_recycle_fds) { ++ cf_queue_push(ssd->fd_q, (void*)&fd); ++ } else { ++ ssd_close(ssd, fd); ++ } + } + + + static inline void +-ssd_fd_cache_put(drv_ssd *ssd, int fd) ++ssd_fd_cache_put(drv_ssd *ssd, ssd_fd_t fd) + { +- cf_queue_push(ssd->fd_cache_q, (void*)&fd); ++ if (ssd->ns->storage_recycle_fds) { ++ cf_queue_push(ssd->fd_cache_q, (void*)&fd); ++ } else { ++ ssd_close(ssd, fd); ++ } + } + + + static inline void +-ssd_shadow_fd_put(drv_ssd *ssd, int fd) ++ssd_shadow_fd_put(drv_ssd *ssd, ssd_fd_t fd) + { +- cf_queue_push(ssd->shadow_fd_q, (void*)&fd); ++ if (ssd->ns->storage_recycle_fds) { ++ cf_queue_push(ssd->shadow_fd_q, (void*)&fd); ++ } else { ++ ssd_close(ssd, fd); ++ } + } + + +@@ -299,7 +1232,7 @@ swb_create(drv_ssd *ssd) + { + ssd_write_buf *swb = (ssd_write_buf*)cf_malloc(sizeof(ssd_write_buf)); + +- swb->buf = cf_valloc(ssd->write_block_size); ++ swb->buf = ssd_dma_alloc(ssd, ssd->write_block_size); + + swb->n_vacated = 0; + swb->vacated_capacity = VACATED_CAPACITY_STEP; +@@ -313,7 +1246,7 @@ static inline void + swb_destroy(ssd_write_buf *swb) + { + cf_free(swb->vacated_wblocks); +- cf_free(swb->buf); ++ ssd_dma_free(swb->ssd, swb->buf); + cf_free(swb); + } + +@@ -507,6 +1440,7 @@ ssd_block_free(drv_ssd *ssd, uint64_t rblock_id, uint32_t n_rblocks, char *msg) + rblock_id); + + cf_assert(start_offset >= DRV_HEADER_SIZE && ++ start_offset >= ssd->write_block_size && + wblock_id < ssd->n_wblocks && wblock_id == end_wblock_id, + AS_DRV_SSD, "%s: %s: freeing bad range rblock_id %lu n_rblocks %u", + ssd->name, msg, rblock_id, n_rblocks); +@@ -734,16 +1668,15 @@ ssd_defrag_wblock(drv_ssd *ssd, uint32_t wblock_id, uint8_t *read_buf) + goto Finished; + } + +- int fd = ssd_fd_get(ssd); ++ ssd_fd_t fd = ssd_fd_get(ssd); + uint64_t file_offset = WBLOCK_ID_TO_OFFSET(ssd, wblock_id); + + uint64_t start_ns = ssd->ns->storage_benchmarks_enabled ? cf_getns() : 0; + +- if (! pread_all(fd, read_buf, ssd->write_block_size, (off_t)file_offset)) { ++ if (! ssd_pread_all(ssd, fd, read_buf, ssd->write_block_size, (off_t)file_offset)) { + cf_warning(AS_DRV_SSD, "%s: read failed: errno %d (%s)", ssd->name, + errno, cf_strerror(errno)); +- close(fd); +- fd = -1; ++ ssd_close(ssd, fd); + goto Finished; + } + +@@ -830,7 +1763,7 @@ run_defrag(void *pv_data) + drv_ssd *ssd = (drv_ssd*)pv_data; + as_namespace *ns = ssd->ns; + uint32_t wblock_id; +- uint8_t *read_buf = cf_valloc(ssd->write_block_size); ++ uint8_t *read_buf = ssd_dma_alloc(ssd, ssd->write_block_size); + + while (true) { + uint32_t q_min = as_load_uint32(&ns->storage_defrag_queue_min); +@@ -859,6 +1792,7 @@ run_defrag(void *pv_data) + usleep(1000); + } + } ++ ssd_dma_free(ssd, read_buf); + + return NULL; + } +@@ -1133,16 +2067,16 @@ ssd_read_record(as_storage_rd *rd, bool pickle_only) + + read_buf = cf_valloc(read_size); + +- int fd = rd->read_page_cache ? ssd_fd_cache_get(ssd) : ssd_fd_get(ssd); ++ ssd_fd_t fd = rd->read_page_cache ? ssd_fd_cache_get(ssd) : ssd_fd_get(ssd); + + uint64_t start_ns = ns->storage_benchmarks_enabled ? cf_getns() : 0; + uint64_t start_us = as_health_sample_device_read() ? cf_getus() : 0; + +- if (! pread_all(fd, read_buf, read_size, (off_t)read_offset)) { ++ if (! ssd_pread_all_bounce(ssd, fd, read_buf, read_size, (off_t)read_offset)) { + cf_warning(AS_DRV_SSD, "%s: read failed: size %lu: errno %d (%s)", + ssd->name, read_size, errno, cf_strerror(errno)); + cf_free(read_buf); +- close(fd); ++ ssd_close(ssd, fd); + return -1; + } + +@@ -1305,12 +2239,12 @@ ssd_flush_swb(drv_ssd *ssd, ssd_write_buf *swb) + ; + } + +- int fd = ssd_fd_get(ssd); ++ ssd_fd_t fd = ssd_fd_get(ssd); + off_t write_offset = (off_t)WBLOCK_ID_TO_OFFSET(ssd, swb->wblock_id); + + uint64_t start_ns = ssd->ns->storage_benchmarks_enabled ? cf_getns() : 0; + +- if (! pwrite_all(fd, swb->buf, ssd->write_block_size, write_offset)) { ++ if (! ssd_pwrite_all(ssd, fd, swb->buf, ssd->write_block_size, write_offset)) { + cf_crash(AS_DRV_SSD, "%s: DEVICE FAILED write: errno %d (%s)", + ssd->name, errno, cf_strerror(errno)); + } +@@ -1326,12 +2260,12 @@ ssd_flush_swb(drv_ssd *ssd, ssd_write_buf *swb) + void + ssd_shadow_flush_swb(drv_ssd *ssd, ssd_write_buf *swb) + { +- int fd = ssd_shadow_fd_get(ssd); ++ ssd_fd_t fd = ssd_shadow_fd_get(ssd); + off_t write_offset = (off_t)WBLOCK_ID_TO_OFFSET(ssd, swb->wblock_id); + + uint64_t start_ns = ssd->ns->storage_benchmarks_enabled ? cf_getns() : 0; + +- if (! pwrite_all(fd, swb->buf, ssd->write_block_size, write_offset)) { ++ if (! ssd_pwrite_all(ssd, fd, swb->buf, ssd->write_block_size, write_offset)) { + cf_crash(AS_DRV_SSD, "%s: DEVICE FAILED write: errno %d (%s)", + ssd->shadow_name, errno, cf_strerror(errno)); + } +@@ -1681,7 +2615,7 @@ as_storage_dump_wb_summary_ssd(const as_namespace *ns) + // Note: This is a sparse array that could be more efficiently stored. + // (In addition, ranges of block sizes could be binned together to + // compress the histogram, rather than using one bin per block size.) +- uint32_t *wb_hist = cf_calloc(1, sizeof(uint32_t) * MAX_WRITE_BLOCK_SIZE); ++ uint32_t *wb_hist = cf_calloc(1, sizeof(uint32_t) * ns->storage_write_block_size); + + for (uint32_t d = 0; d < ssds->n_ssds; d++) { + drv_ssd *ssd = &ssds->ssds[d]; +@@ -1741,7 +2675,7 @@ as_storage_dump_wb_summary_ssd(const as_namespace *ns) + (defraggable_sz + non_defraggable_sz) / + MAX(1, (total_num_defraggable + total_num_above_wm))); + +- for (uint32_t i = 0; i < MAX_WRITE_BLOCK_SIZE; i++) { ++ for (uint32_t i = 0; i < ns->storage_write_block_size; i++) { + if (wb_hist[i] > 0) { + cf_info(AS_DRV_SSD, "WBH: %u block%s of size %u bytes", + wb_hist[i], (wb_hist[i] != 1 ? "s" : ""), i); +@@ -2115,7 +3049,7 @@ ssd_read_header(drv_ssd *ssd) + bool use_shadow = ns->cold_start && ssd->shadow_name; + + const char *ssd_name; +- int fd; ++ ssd_fd_t fd; + size_t read_size; + + if (use_shadow) { +@@ -2131,7 +3065,7 @@ ssd_read_header(drv_ssd *ssd) + + drv_header *header = cf_valloc(read_size); + +- if (! pread_all(fd, (void*)header, read_size, 0)) { ++ if (! ssd_pread_all_bounce(ssd, fd, (void *)header, read_size, 0)) { + cf_crash(AS_DRV_SSD, "%s: read failed: errno %d (%s)", ssd_name, errno, + cf_strerror(errno)); + } +@@ -2186,6 +3120,7 @@ ssd_read_header(drv_ssd *ssd) + + if (header->unique.pristine_offset != 0 && // always 0 before 4.6 + (header->unique.pristine_offset < DRV_HEADER_SIZE || ++ header->unique.pristine_offset < ssd->write_block_size || + header->unique.pristine_offset > ssd->file_size)) { + cf_crash(AS_DRV_SSD, "%s: bad pristine offset %lu", ssd_name, + header->unique.pristine_offset); +@@ -2222,7 +3157,7 @@ ssd_init_header(as_namespace *ns, drv_ssd *ssd) + + + void +-ssd_empty_header(int fd, const char* device_name) ++ssd_posix_empty_header(int fd, const char* device_name) + { + void *h = cf_valloc(DRV_HEADER_SIZE); + +@@ -2236,6 +3171,28 @@ ssd_empty_header(int fd, const char* device_name) + cf_free(h); + } + ++#ifdef USE_SPDK ++ ++static void ++ssd_empty_header(drv_ssd *ssd, bool is_shadow) ++{ ++ ssd_fd_t fd; ++ void *h = cf_valloc(DRV_HEADER_SIZE); ++ ++ memset(h, 0, DRV_HEADER_SIZE); ++ ++ fd = ssd_open(ssd, is_shadow, ssd->open_flag); ++ if (ssd_fd_is_error(fd)) ++ cf_crash(AS_DRV_SSD, "DEVICE FAILED open"); ++ ++ if (!ssd_pwrite_all_bounce(ssd, fd, h, DRV_HEADER_SIZE, 0)) ++ cf_crash(AS_DRV_SSD, "DEVICE FAILED write"); ++ ++ ssd_close(ssd, fd); ++ cf_free(h); ++} ++ ++#endif /* USE_SPDK */ + + void + ssd_write_header(drv_ssd *ssd, uint8_t *header, uint8_t *from, size_t size) +@@ -2248,9 +3205,9 @@ ssd_write_header(drv_ssd *ssd, uint8_t *header, uint8_t *from, size_t size) + uint8_t *flush = header + flush_offset; + size_t flush_sz = flush_end_offset - flush_offset; + +- int fd = ssd_fd_get(ssd); ++ ssd_fd_t fd = ssd_fd_get(ssd); + +- if (! pwrite_all(fd, (void*)flush, flush_sz, flush_offset)) { ++ if (! ssd_pwrite_all_bounce(ssd, fd, (void*)flush, flush_sz, flush_offset)) { + cf_crash(AS_DRV_SSD, "%s: DEVICE FAILED write: errno %d (%s)", + ssd->name, errno, cf_strerror(errno)); + } +@@ -2269,7 +3226,7 @@ ssd_write_header(drv_ssd *ssd, uint8_t *header, uint8_t *from, size_t size) + + fd = ssd_shadow_fd_get(ssd); + +- if (! pwrite_all(fd, (void*)flush, flush_sz, flush_offset)) { ++ if (! ssd_pwrite_all_bounce(ssd, fd, (void*)flush, flush_sz, flush_offset)) { + cf_crash(AS_DRV_SSD, "%s: DEVICE FAILED write: errno %d (%s)", + ssd->shadow_name, errno, cf_strerror(errno)); + } +@@ -2517,29 +3474,30 @@ ssd_cold_start_sweep(drv_ssds *ssds, drv_ssd *ssd) + { + size_t wblock_size = ssd->write_block_size; + +- uint8_t *buf = cf_valloc(wblock_size); ++ uint8_t *buf = ssd_dma_alloc(ssd, wblock_size); + + bool read_shadow = ssd->shadow_name; + const char *read_ssd_name = read_shadow ? ssd->shadow_name : ssd->name; +- int fd = read_shadow ? ssd_shadow_fd_get(ssd) : ssd_fd_get(ssd); +- int write_fd = read_shadow ? ssd_fd_get(ssd) : -1; ++ ssd_fd_t fd = read_shadow ? ssd_shadow_fd_get(ssd) : ssd_fd_get(ssd); ++ ssd_fd_t write_fd = read_shadow ? ssd_fd_get(ssd) : (ssd_fd_t) { .fd = -1 }; + + // Loop over all wblocks, unless we encounter 10 contiguous unused wblocks. + + ssd->sweep_wblock_id = ssd->first_wblock_id; + +- uint64_t file_offset = DRV_HEADER_SIZE; ++ uint64_t file_offset = ssd->write_block_size <= DRV_HEADER_SIZE ? ++ DRV_HEADER_SIZE : ssd->write_block_size; + uint32_t n_unused_wblocks = 0; + + bool prefetch = cf_arenax_want_prefetch(ssd->ns->arena); + + while (file_offset < ssd->file_size && n_unused_wblocks < 10) { +- if (! pread_all(fd, buf, wblock_size, (off_t)file_offset)) { ++ if (! ssd_pread_all(ssd, fd, buf, wblock_size, (off_t)file_offset)) { + cf_crash(AS_DRV_SSD, "%s: read failed: errno %d (%s)", + read_ssd_name, errno, cf_strerror(errno)); + } + +- if (read_shadow && ! pwrite_all(write_fd, (void*)buf, wblock_size, ++ if (read_shadow && ! ssd_pwrite_all(ssd, write_fd, (void*)buf, wblock_size, + (off_t)file_offset)) { + cf_crash(AS_DRV_SSD, "%s: write failed: errno %d (%s)", ssd->name, + errno, cf_strerror(errno)); +@@ -2614,15 +3572,15 @@ ssd_cold_start_sweep(drv_ssds *ssds, drv_ssd *ssd) + + ssd->sweep_wblock_id = (uint32_t)(ssd->file_size / wblock_size); + +- if (fd != -1) { ++ if (!ssd_fd_is_error(fd)) { + read_shadow ? ssd_shadow_fd_put(ssd, fd) : ssd_fd_put(ssd, fd); + } + +- if (write_fd != -1) { ++ if (!ssd_fd_is_error(write_fd)) { + ssd_fd_put(ssd, write_fd); + } + +- cf_free(buf); ++ ssd_dma_free(ssd, buf); + } + + +@@ -2740,7 +3698,7 @@ si_startup_sweep(drv_ssds* ssds, drv_ssd* ssd) + size_t wblock_size = ssd->write_block_size; + + uint8_t* buf = cf_valloc(wblock_size); +- int fd = ssd_fd_get(ssd); ++ ssd_fd_t fd = ssd_fd_get(ssd); + uint64_t file_offset = DRV_HEADER_SIZE; + + bool prefetch = cf_arenax_want_prefetch(ssd->ns->arena); +@@ -2755,7 +3713,7 @@ si_startup_sweep(drv_ssds* ssds, drv_ssd* ssd) + continue; + } + +- if (! pread_all(fd, buf, wblock_size, (off_t)file_offset)) { ++ if (! ssd_pread_all(ssd, fd, buf, wblock_size, (off_t)file_offset)) { + cf_crash(AS_DRV_SSD, "%s: read failed: errno %d (%s)", ssd->name, + errno, cf_strerror(errno)); + } +@@ -3148,18 +4106,20 @@ ssd_init_synchronous(drv_ssds *ssds) + } + + +-static uint64_t ++uint64_t + check_file_size(as_namespace *ns, uint64_t file_size, const char *tag) + { ++ uint64_t first_wblock_offset = ns->storage_write_block_size <= DRV_HEADER_SIZE ? ++ DRV_HEADER_SIZE : ns->storage_write_block_size; + cf_assert(sizeof(off_t) > 4, AS_DRV_SSD, "this OS supports only 32-bit (4g) files - compile with 64 bit offsets"); + +- if (file_size > DRV_HEADER_SIZE) { ++ if (file_size > first_wblock_offset) { + off_t unusable_size = +- (file_size - DRV_HEADER_SIZE) % ns->storage_write_block_size; ++ (file_size - first_wblock_offset) % ns->storage_write_block_size; + + if (unusable_size != 0) { +- cf_info(AS_DRV_SSD, "%s size must be header size %u + multiple of %u, rounding down", +- tag, DRV_HEADER_SIZE, ns->storage_write_block_size); ++ cf_info(AS_DRV_SSD, "%s size must be header size %lu + multiple of %u, rounding down", ++ tag, first_wblock_offset, ns->storage_write_block_size); + file_size -= unusable_size; + } + +@@ -3170,9 +4130,9 @@ check_file_size(as_namespace *ns, uint64_t file_size, const char *tag) + } + } + +- if (file_size <= DRV_HEADER_SIZE) { +- cf_crash(AS_DRV_SSD, "%s size %ld must be greater than header size %d", +- tag, file_size, DRV_HEADER_SIZE); ++ if (file_size <= first_wblock_offset) { ++ cf_crash(AS_DRV_SSD, "%s size %ld must be greater than header size %ld", ++ tag, file_size, first_wblock_offset); + } + + return file_size; +@@ -3218,6 +4178,20 @@ ssd_init_devices(as_namespace *ns, drv_ssds **ssds_p) + + ssd->name = ns->storage_devices[i]; + ++ for (const struct ssd_ops **ops = &ssd_backends[0]; *ops; ops++) { ++ if (!strcmp(ns->storage_device_backend, (*ops)->name)) { ++ ssd->ops = *ops; ++ break; ++ } ++ } ++ ++ if (!ssd->ops) { ++ cf_crash(AS_DRV_SSD, "Unknown storage device backend: %s", ns->storage_device_backend); ++ } else if (ssd->ops != &ssd_posix_ops) { ++ ssd_init_device(ns, ssd, false); ++ continue; ++ } ++ + // Note - can't configure commit-to-device and disable-odsync. + ssd->open_flag = O_RDWR | O_DIRECT | + (ns->storage_disable_odsync ? 0 : O_DSYNC); +@@ -3233,11 +4207,13 @@ ssd_init_devices(as_namespace *ns, drv_ssds **ssds_p) + + ioctl(fd, BLKGETSIZE64, &size); // gets the number of bytes + ++ if (ns->storage_filesize) ++ size = MIN(ns->storage_filesize, size); + ssd->file_size = check_file_size(ns, size, "usable device"); + ssd->io_min_size = find_io_min_size(fd, ssd->name); + + if (ns->cold_start && ns->storage_cold_start_empty) { +- ssd_empty_header(fd, ssd->name); ++ ssd_posix_empty_header(fd, ssd->name); + + cf_info(AS_DRV_SSD, "cold-start-empty - erased header of %s", + ssd->name); +@@ -3274,6 +4250,11 @@ ssd_init_shadow_devices(as_namespace *ns, drv_ssds *ssds) + + ssd->shadow_name = ns->storage_shadows[i]; + ++ if (ssd->ops != &ssd_posix_ops) { ++ ssd_init_device(ns, ssd, true); ++ continue; ++ } ++ + int fd = open(ssd->shadow_name, ssd->open_flag, S_IRUSR | S_IWUSR); + + if (fd == -1) { +@@ -3293,7 +4274,7 @@ ssd_init_shadow_devices(as_namespace *ns, drv_ssds *ssds) + ssd->shadow_io_min_size = find_io_min_size(fd, ssd->shadow_name); + + if (ns->cold_start && ns->storage_cold_start_empty) { +- ssd_empty_header(fd, ssd->shadow_name); ++ ssd_posix_empty_header(fd, ssd->shadow_name); + + cf_info(AS_DRV_SSD, "cold-start-empty - erased header of %s", + ssd->shadow_name); +@@ -3329,6 +4310,7 @@ ssd_init_files(as_namespace *ns, drv_ssds **ssds_p) + drv_ssd *ssd = &ssds->ssds[i]; + + ssd->name = ns->storage_devices[i]; ++ ssd->ops = &ssd_posix_ops; + + if (ns->cold_start && ns->storage_cold_start_empty) { + if (unlink(ssd->name) == 0) { +@@ -3449,9 +4431,9 @@ ssd_set_pristine_offset(drv_ssds *ssds) + for (int i = 0; i < ssds->n_ssds; i++) { + drv_ssd *ssd = &ssds->ssds[i]; + +- int fd = ssd_fd_get(ssd); ++ ssd_fd_t fd = ssd_fd_get(ssd); + +- if (! pread_all(fd, (void *)header_unique, HI_IO_MIN_SIZE, offset)) { ++ if (! ssd_pread_all_bounce(ssd, fd, (void *)header_unique, HI_IO_MIN_SIZE, offset)) { + cf_crash(AS_DRV_SSD, "%s: read failed: errno %d (%s)", + ssd->name, errno, cf_strerror(errno)); + } +@@ -3459,7 +4441,7 @@ ssd_set_pristine_offset(drv_ssds *ssds) + header_unique->pristine_offset = + (uint64_t)ssd->pristine_wblock_id * ssd->write_block_size; + +- if (! pwrite_all(fd, (void *)header_unique, HI_IO_MIN_SIZE, offset)) { ++ if (! ssd_pwrite_all_bounce(ssd, fd, (void *)header_unique, HI_IO_MIN_SIZE, offset)) { + cf_crash(AS_DRV_SSD, "%s: DEVICE FAILED write: errno %d (%s)", + ssd->name, errno, cf_strerror(errno)); + } +@@ -3503,6 +4485,8 @@ as_storage_init_ssd(as_namespace *ns) + { + drv_ssds *ssds; + ++ ssd_backends_init(); ++ + if (ns->n_storage_devices != 0) { + ssd_init_devices(ns, &ssds); + ssd_init_shadow_devices(ns, ssds); +@@ -3540,7 +4524,9 @@ as_storage_init_ssd(as_namespace *ns) + snprintf(histname, sizeof(histname), "{%s}-device-write-size", ns->name); + ns->device_write_size_hist = histogram_create(histname, HIST_SIZE); + +- uint32_t first_wblock_id = DRV_HEADER_SIZE / ns->storage_write_block_size; ++ uint64_t first_wblock_offset = ns->storage_write_block_size <= DRV_HEADER_SIZE ? ++ DRV_HEADER_SIZE : ns->storage_write_block_size; ++ uint32_t first_wblock_id = first_wblock_offset / ns->storage_write_block_size; + + // Finish initializing drv_ssd structures (non-zero-value members). + for (int i = 0; i < ssds->n_ssds; i++) { +@@ -3568,11 +4554,11 @@ as_storage_init_ssd(as_namespace *ns) + + // Note: free_wblock_q, defrag_wblock_q created after loading devices. + +- ssd->fd_q = cf_queue_create(sizeof(int), true); +- ssd->fd_cache_q = cf_queue_create(sizeof(int), true); ++ ssd->fd_q = cf_queue_create(sizeof(ssd_fd_t), true); ++ ssd->fd_cache_q = cf_queue_create(sizeof(ssd_fd_t), true); + + if (ssd->shadow_name) { +- ssd->shadow_fd_q = cf_queue_create(sizeof(int), true); ++ ssd->shadow_fd_q = cf_queue_create(sizeof(ssd_fd_t), true); + } + + ssd->swb_write_q = cf_queue_create(sizeof(void*), true); +@@ -3590,17 +4576,17 @@ as_storage_init_ssd(as_namespace *ns) + } + + snprintf(histname, sizeof(histname), "{%s}-%s-read", ns->name, ssd->name); +- ssd->hist_read = histogram_create(histname, HIST_MILLISECONDS); ++ ssd->hist_read = histogram_create(histname, HIST_MICROSECONDS); + + snprintf(histname, sizeof(histname), "{%s}-%s-large-block-read", ns->name, ssd->name); +- ssd->hist_large_block_read = histogram_create(histname, HIST_MILLISECONDS); ++ ssd->hist_large_block_read = histogram_create(histname, HIST_MICROSECONDS); + + snprintf(histname, sizeof(histname), "{%s}-%s-write", ns->name, ssd->name); +- ssd->hist_write = histogram_create(histname, HIST_MILLISECONDS); ++ ssd->hist_write = histogram_create(histname, HIST_MICROSECONDS); + + if (ssd->shadow_name) { + snprintf(histname, sizeof(histname), "{%s}-%s-shadow-write", ns->name, ssd->name); +- ssd->hist_shadow_write = histogram_create(histname, HIST_MILLISECONDS); ++ ssd->hist_shadow_write = histogram_create(histname, HIST_MICROSECONDS); + } + + ssd_init_commit(ssd); +@@ -4133,4 +5119,10 @@ as_storage_shutdown_ssd(as_namespace *ns) + + ssd_set_pristine_offset(ssds); + ssd_set_trusted(ssds); ++ ++ for (int i = 0; i < ssds->n_ssds; i++) { ++ ssd_finish_device(&ssds->ssds[i]); ++ } ++ ++ ssd_backends_shutdown(); + } +diff --git a/cf/src/Makefile b/cf/src/Makefile +index c7e0276..5d3fe61 100644 +--- a/cf/src/Makefile ++++ b/cf/src/Makefile +@@ -28,6 +28,13 @@ INCLUDES += -I$(COMMON)/src/include + INCLUDES += -I$(JANSSON)/src + INCLUDES += -I$(JEMALLOC)/include + ++ifeq ($(USE_LTHREAD),1) ++ CFLAGS += -DUSE_LTHREAD ++ INCLUDES += -I$(SPDK)/dpdk/build/include ++ INCLUDES += -I$(LTHREAD)/ ++ INCLUDES += -I$(LTHREAD)/arch/x86 ++endif ++ + OBJECTS = $(SOURCES:%.c=$(OBJECT_DIR)/%.o) + DEPENDENCIES = $(OBJECTS:%.o=%.d) + +diff --git a/cf/src/cf_mutex.c b/cf/src/cf_mutex.c +index d249cfd..537c581 100644 +--- a/cf/src/cf_mutex.c ++++ b/cf/src/cf_mutex.c +@@ -36,6 +36,9 @@ + + #include "log.h" + ++#ifdef USE_LTHREAD ++#include "lthread_api.h" ++#endif + + //========================================================== + // Typedefs & constants. +@@ -57,8 +60,10 @@ sys_futex(void *uaddr, int op, int val) + #define xchg(__ptr, __val) __sync_lock_test_and_set(__ptr, __val) + #define cmpxchg(__ptr, __cmp, __set) __sync_val_compare_and_swap(__ptr, __cmp, __set) + #define cpu_relax() asm volatile("pause\n": : :"memory") ++#ifndef USE_LTHREAD + #define unlikely(__expr) __builtin_expect(!! (__expr), 0) + #define likely(__expr) __builtin_expect(!! (__expr), 1) ++#endif + + + //========================================================== +@@ -72,6 +77,15 @@ cf_mutex_lock(cf_mutex *m) + return; // was not locked + } + ++#ifdef USE_LTHREAD ++ if (lthread_current()) { ++ while (!cf_mutex_trylock(m)) { ++ lthread_yield(); ++ } ++ return; ++ } ++#endif ++ + if (m->u32 == 2) { + sys_futex(m, FUTEX_WAIT_PRIVATE, 2); + } +diff --git a/make_in/Makefile.in b/make_in/Makefile.in +index 3a946bd..567c88d 100644 +--- a/make_in/Makefile.in ++++ b/make_in/Makefile.in +@@ -31,7 +31,7 @@ SRCDIR = + MARCH_NATIVE = $(shell uname -m) + + # If GCC v4.4.7 or later, use DWARF version 4, othewise use version 2: +-ifeq ($(shell $(DEPTH)/build/VersionCheck.py 'gcc -dumpversion' 4.4.7), 1) ++ifeq ($(shell $(DEPTH)/build/VersionCheck.py '$(CC) -dumpversion' 4.4.7), 1) + DWARF_VERSION=4 + else + DWARF_VERSION=2 +@@ -67,6 +67,10 @@ endif + # O3 also enables -finline-functions, among other things. + COMMON_CFLAGS = -gdwarf-$(DWARF_VERSION) -g3 $(OPTFLAGS) -fno-common -fno-strict-aliasing -Wall $(AS_CFLAGS) $(AS_EE_CFLAGS) + ++ifeq ($(shell $(DEPTH)/build/VersionCheck.py '$(CC) -dumpversion' 8), 1) ++ COMMON_CFLAGS += -fno-stack-clash-protection ++endif ++ + # Code generated for the "nocona" architecture has been determined to run well on a wide variety of current machines. + ifneq ($(ARCH),$(filter $(ARCH),ppc64 ppc64le)) + COMMON_CFLAGS += -march=nocona +@@ -79,7 +83,7 @@ COMMON_CFLAGS += -MMD + COMMON_CFLAGS += -Werror + + # Override certain warnings under GCC v9+. +-ifeq ($(shell $(DEPTH)/build/VersionCheck.py 'gcc -dumpversion' 9), 1) ++ifeq ($(shell $(DEPTH)/build/VersionCheck.py '$(CC) -dumpversion' 9), 1) + # Disable compilation failure due to warnings about possibly unaligned pointers into packed structs. + COMMON_CFLAGS += -Wno-address-of-packed-member + endif +diff --git a/make_in/Makefile.vars b/make_in/Makefile.vars +index 8a0ce0b..09b2066 100644 +--- a/make_in/Makefile.vars ++++ b/make_in/Makefile.vars +@@ -42,6 +42,9 @@ LD_LUAJIT = static + # Default mode used for linking the Lua library: + LD_LUA = static + ++# Default mode used for linking the SPDK library: ++LD_SPDK = static ++ + # Options to pass to Jansson's "configure" script. + JANSSON_CONFIG_OPT = + +@@ -80,6 +83,8 @@ MOD_LUA_PATH := $(realpath $(DEPTH)/modules/mod-lua) + JEMALLOC_PATH := $(realpath $(DEPTH)/modules/jemalloc) + LUAJIT_PATH := $(realpath $(DEPTH)/modules/luajit) + S2_PATH := $(realpath $(DEPTH)/modules/s2-geometry-library/geometry) ++SPDK_PATH := $(realpath $(DEPTH)/modules/spdk) ++LTHREAD_PATH := $(realpath $(DEPTH)/modules/lthread) + + # Overridable values used by sub-makefiles: + AI = $(AI_PATH) +@@ -91,6 +96,8 @@ MOD_LUA = $(MOD_LUA_PATH) + JEMALLOC = $(JEMALLOC_PATH) + LUAJIT = $(LUAJIT_PATH) + S2 = $(S2_PATH) ++SPDK = $(SPDK_PATH) ++LTHREAD = $(LTHREAD_PATH) + + # Programs, for which GNU Make doesn't define implicit variables: + OBJCOPY := objcopy diff --git a/as/src/base/batch.c b/as/src/base/batch.c index 209cb60d..ff749fc3 100644 --- a/as/src/base/batch.c +++ b/as/src/base/batch.c @@ -1024,7 +1024,14 @@ as_batch_queue_task(as_transaction* btr) if (data + sizeof(as_msg_op) > limit) { goto TranEnd; } + op = (as_msg_op*)data; + + // Swap can touch metadata bytes beyond as_msg_op struct. + if (as_msg_op_get_value_p(op) > limit) { + goto TranEnd; + } + as_msg_swap_op(op); op = as_msg_op_get_next(op); data = (uint8_t*)op; diff --git a/as/src/base/particle_map.c b/as/src/base/particle_map.c index 450258f0..9afe84ad 100644 --- a/as/src/base/particle_map.c +++ b/as/src/base/particle_map.c @@ -685,6 +685,12 @@ map_wire_size(const as_particle *p) packed_map map; if (! packed_map_init_from_particle(&map, p, false)) { + as_bin b = { + .particle = (as_particle *)p + }; + + as_bin_state_set_from_type(&b, AS_PARTICLE_TYPE_MAP); + cdt_bin_print(&b, "map"); cf_crash(AS_PARTICLE, "map_wire_size() invalid packed map"); } @@ -958,10 +964,11 @@ map_flat_size(const as_particle *p) packed_map map; if (! packed_map_init_from_particle(&map, p, false)) { - const as_bin b = { + as_bin b = { .particle = (as_particle *)p }; + as_bin_state_set_from_type(&b, AS_PARTICLE_TYPE_MAP); cdt_bin_print(&b, "map"); cf_crash(AS_PARTICLE, "map_flat_size() invalid packed map"); } diff --git a/as/src/base/record.c b/as/src/base/record.c index 84bb1984..31d39d00 100644 --- a/as/src/base/record.c +++ b/as/src/base/record.c @@ -520,7 +520,12 @@ record_apply_dim_single_bin(as_remote_record *rr, as_storage_rd *rd) as_bin_destroy_all(rd->bins, rd->n_bins); // Move the new bin into the index. - as_single_bin_copy(rd->bins, &new_bin); + if (n_new_bins == 1) { + as_single_bin_copy(rd->bins, &new_bin); + } + else { + as_bin_set_empty(rd->bins); + } rd->n_bins = n_new_bins; rd->bins = &new_bin; diff --git a/as/src/base/secondary_index.c b/as/src/base/secondary_index.c index 0bd6c6f2..aa8026f1 100644 --- a/as/src/base/secondary_index.c +++ b/as/src/base/secondary_index.c @@ -3619,7 +3619,6 @@ as_sindex_sbins_list_diff_populate(as_sindex_bin *sbins, as_namespace *ns, const as_sindex *si = &ns->sindex[simatch]; if (! as_sindex_isactive(si)) { - ele = ele->next; continue; } diff --git a/as/src/base/transaction.c b/as/src/base/transaction.c index 590df6f4..e3561d61 100644 --- a/as/src/base/transaction.c +++ b/as/src/base/transaction.c @@ -276,6 +276,12 @@ as_transaction_prepare(as_transaction *tr, bool swap) as_msg_op* op = (as_msg_op*)p_read; if (swap) { + // Swap can touch metadata bytes beyond as_msg_op struct. + if (as_msg_op_get_value_p(op) > p_end) { + cf_warning(AS_PROTO, "bad as_msg_op"); + return false; + } + as_msg_swap_op(op); } diff --git a/as/src/fabric/partition_balance.c b/as/src/fabric/partition_balance.c index 683a8ed2..a6266a53 100644 --- a/as/src/fabric/partition_balance.c +++ b/as/src/fabric/partition_balance.c @@ -1350,8 +1350,11 @@ find_family(const as_partition_version* self_version, uint32_t n_families, const as_partition_version family_versions[]) { for (uint32_t n = 0; n < n_families; n++) { - if (is_family_same(self_version, &family_versions[n])) { - return n; + const as_partition_version* version_n = &family_versions[n]; + + if (is_family_same(self_version, version_n)) { + // Identical subsets with no full parent can't share family. + return version_n->subset == 0 ? n : VERSION_FAMILY_UNIQUE; } } diff --git a/as/src/storage/drv_ssd.c b/as/src/storage/drv_ssd.c index 8ac05b2b..6da5830b 100644 --- a/as/src/storage/drv_ssd.c +++ b/as/src/storage/drv_ssd.c @@ -2452,7 +2452,13 @@ ssd_cold_start_add_record(drv_ssds* ssds, drv_ssd* ssd, if (ns->single_bin) { as_bin_destroy_all(old_bins, n_old_bins); - as_single_bin_copy(as_index_get_single_bin(r), rd.bins); + + if (rd.n_bins == 1) { + as_single_bin_copy(as_index_get_single_bin(r), rd.bins); + } + else { + as_bin_set_empty(as_index_get_single_bin(r)); + } } else { // Success - adjust sindex, looking at old and new bins. diff --git a/as/src/transaction/duplicate_resolve.c b/as/src/transaction/duplicate_resolve.c index 3e28b906..6a882a3d 100644 --- a/as/src/transaction/duplicate_resolve.c +++ b/as/src/transaction/duplicate_resolve.c @@ -393,14 +393,14 @@ dup_res_handle_ack(cf_node node, msg* m) dup_res_translate_result_code(rw); + cf_atomic64_add(&rw->rsv.ns->n_dup_res_ask, rw->n_dest_nodes); + bool delete_from_hash = rw->dup_res_cb(rw); rw->dup_res_complete = true; cf_mutex_unlock(&rw->lock); - cf_atomic64_add(&rw->rsv.ns->n_dup_res_ask, rw->n_dest_nodes); - if (delete_from_hash) { rw_request_hash_delete(&hkey, rw); } diff --git a/cf/include/log.h b/cf/include/log.h index d868e7c9..e2522974 100644 --- a/cf/include/log.h +++ b/cf/include/log.h @@ -1,7 +1,7 @@ /* * log.h * - * Copyright (C) 2019 Aerospike, Inc. + * Copyright (C) 2019-2021 Aerospike, Inc. * * Portions may be licensed to Aerospike, Inc. under one or more contributor * license agreements. diff --git a/modules/jansson b/modules/jansson index e9ebfa7e..684e18c9 160000 --- a/modules/jansson +++ b/modules/jansson @@ -1 +1 @@ -Subproject commit e9ebfa7e77a6bee77df44e096b100e7131044059 +Subproject commit 684e18c927e89615c2d501737e90018f4930d6c5 diff --git a/modules/lthread b/modules/lthread new file mode 160000 index 00000000..0386ae3a --- /dev/null +++ b/modules/lthread @@ -0,0 +1 @@ +Subproject commit 0386ae3a8cdd8543ce16eecf42214b8f2f17b191 diff --git a/modules/spdk b/modules/spdk new file mode 160000 index 00000000..1f0dd58a --- /dev/null +++ b/modules/spdk @@ -0,0 +1 @@ +Subproject commit 1f0dd58a43b5bc8118b123eca1b07781b052293d