From a3d579913575774f9417d838cf193eba947c2729 Mon Sep 17 00:00:00 2001 From: liulanzheng Date: Fri, 19 Jan 2024 12:02:03 +0800 Subject: [PATCH] add some method for lsmt, rename crc32c for zfile Signed-off-by: liulanzheng --- src/overlaybd/lsmt/file.cpp | 81 +++++++++++++++++++++++++++-------- src/overlaybd/lsmt/file.h | 27 ++++++++---- src/overlaybd/lsmt/index.cpp | 23 +++++++--- src/overlaybd/lsmt/index.h | 4 +- src/overlaybd/zfile/zfile.cpp | 16 +++---- 5 files changed, 107 insertions(+), 44 deletions(-) diff --git a/src/overlaybd/lsmt/file.cpp b/src/overlaybd/lsmt/file.cpp index 6371f886..19815b64 100644 --- a/src/overlaybd/lsmt/file.cpp +++ b/src/overlaybd/lsmt/file.cpp @@ -15,7 +15,6 @@ limitations under the License. */ #include "file.h" #include -#include #include #include #include @@ -165,7 +164,7 @@ struct HeaderTrailer { UUID::String uuid; // 37 bytes. UUID::String parent_uuid; // 37 bytes. - uint16_t reserved; // Reserved. + uint16_t reserved; // Reserved. static const uint8_t LSMT_V1 = 1; // v1 (UUID check) static const uint8_t LSMT_SUB_V1 = 1; // .1 deprecated level range. @@ -179,7 +178,8 @@ struct HeaderTrailer { class LSMTReadOnlyFile; static LSMTReadOnlyFile *open_file_ro(IFile *file, bool ownership, bool reserve_tag); -static HeaderTrailer *verify_ht(IFile *file, char *buf, bool is_trailer = false, ssize_t st_size = -1); +static HeaderTrailer *verify_ht(IFile *file, char *buf, bool is_trailer = false, + ssize_t st_size = -1); static const uint32_t ALIGNMENT = 512; // same as trim block size. static const uint32_t ALIGNMENT4K = 4096; @@ -520,6 +520,10 @@ class LSMTReadOnlyFile : public IFileRW { return 0; } + virtual std::vector get_lower_files() const override { + return m_files; + } + template inline void forward(void *&buffer, T1 &offset, T2 &count, T3 step) { (char *&)buffer += step * ALIGNMENT; @@ -618,7 +622,8 @@ class LSMTReadOnlyFile : public IFileRW { if (!pht->is_sealed()) { LOG_ERROR_RETURN(ENOTSUP, -1, "Commit a compacted LSMTReadonlyFile is not allowed."); } - CompactOptions opts(&m_files, (SegmentMapping*)m_index->buffer(), m_index->size(), m_vsize, &args); + CompactOptions opts(&m_files, (SegmentMapping *)m_index->buffer(), m_index->size(), m_vsize, + &args); atomic_uint64_t _no_use_var(0); return compact(opts, _no_use_var); @@ -1053,9 +1058,9 @@ class LSMTSparseFile : public LSMTFile { class LSMTWarpFile : public LSMTFile { public: const static int READ_BUFFER_SIZE = 65536; - IFile* m_target_file = nullptr; + IFile *m_target_file = nullptr; - LSMTWarpFile(){ + LSMTWarpFile() { m_filetype = LSMTFileType::WarpFile; } ~LSMTWarpFile() { @@ -1074,11 +1079,11 @@ class LSMTWarpFile : public LSMTFile { }; m.tag = tag; auto file = m_files[tag]; - LOG_DEBUG("insert segment: `, filePtr: `", m,file); + LOG_DEBUG("insert segment: `, filePtr: `", m, file); auto ret = file->pwrite(buf, count, offset); if (ret != (ssize_t)count) { - LOG_ERRNO_RETURN(0, -1, "write failed, file:`, ret:`, pos:`, count:`", - file, ret, offset, count); + LOG_ERRNO_RETURN(0, -1, "write failed, file:`, ret:`, pos:`, count:`", file, ret, + offset, count); } static_cast(m_index)->insert(m); append_index(m); @@ -1102,8 +1107,8 @@ class LSMTWarpFile : public LSMTFile { while (lba.count > 0) { SegmentMapping m; m.offset = lba.offset / ALIGNMENT; - m.length = (Segment::MAX_LENGTH < lba.count / ALIGNMENT ? - Segment::MAX_LENGTH : lba.count / ALIGNMENT); + m.length = (Segment::MAX_LENGTH < lba.count / ALIGNMENT ? Segment::MAX_LENGTH + : lba.count / ALIGNMENT); m.moffset = lba.roffset / ALIGNMENT; m.tag = m_rw_tag + (uint8_t)SegmentType::remoteData; LOG_DEBUG("insert segment: ` into findex: `", m, m_findex); @@ -1211,8 +1216,8 @@ static HeaderTrailer *verify_ht(IFile *file, char *buf, bool is_trailer, ssize_t LOG_ERRNO_RETURN(0, nullptr, "failed to read file trailer."); if (!pht->verify_magic() || !pht->is_trailer() || !pht->is_data_file() || !pht->is_sealed()) LOG_ERROR_RETURN(0, nullptr, - "trailer magic, trailer type, " - "file type or sealedness doesn't match"); + "trailer magic, trailer type, " + "file type or sealedness doesn't match"); return pht; } @@ -1266,13 +1271,14 @@ static SegmentMapping *do_load_index(IFile *file, HeaderTrailer *pheader_trailer if (ibuf[i].offset != SegmentMapping::INVALID_OFFSET) { ibuf[index_size] = ibuf[i]; ibuf[index_size].tag = (warp_file_tag ? ibuf[i].tag : 0); - if (min_tag > ibuf[index_size].tag) min_tag = ibuf[index_size].tag; + if (min_tag > ibuf[index_size].tag) + min_tag = ibuf[index_size].tag; index_size++; } } if (warp_file_tag) { LOG_INFO("rebuild index tag for LSMTWarpFile."); - for (size_t i = 0; im_files.resize(2); LSMT::HeaderTrailer ht; - auto p = do_load_index(findex, &ht, false, - 3); + auto p = do_load_index(findex, &ht, false, 3); auto pi = create_memory_index0(p, ht.index_size, 0, -1); if (!pi) { delete[] p; @@ -1785,4 +1790,46 @@ IFileRW *stack_files(IFileRW *upper_layer, IFileRO *lower_layers, bool ownership return rst; } +IMemoryIndex *open_file_index(IFile *file) { + HeaderTrailer ht; + auto p = do_load_index(file, &ht, true); + if (!p) { + LOG_ERROR_RETURN(0, nullptr, "failed to load index"); + } + + auto pi = create_memory_index(p, ht.index_size, HeaderTrailer::SPACE / ALIGNMENT, + ht.index_offset / ALIGNMENT, true, ht.virtual_size); + if (!pi) { + delete[] p; + LOG_ERROR_RETURN(0, nullptr, "failed to create memory index"); + } + return pi; +} + +IFileRO *open_files_with_merged_index(IFile **src_files, size_t n, IMemoryIndex *index, + bool ownership) { + vector m_files(src_files, src_files + n); + auto rst = new LSMTReadOnlyFile; + rst->m_index = index; + rst->m_files = move(m_files); + rst->m_vsize = index->vsize(); + rst->m_uuid.resize(rst->m_files.size()); + rst->m_file_ownership = ownership; + return rst; +} + +int is_lsmt(IFile *file) { + char buf[HeaderTrailer::SPACE]; + auto ret = file->pread(buf, HeaderTrailer::SPACE, 0); + if (ret < (ssize_t)HeaderTrailer::SPACE) + LOG_ERRNO_RETURN(0, -1, "failed to read file header."); + auto pht = (HeaderTrailer *)buf; + if (!pht->verify_magic() || !pht->is_header()) { + LOG_DEBUG("file: ` is not lsmt object", file); + return 1; + } + LOG_DEBUG("file: ` is lsmt object", file); + return 0; +} + } // namespace LSMT diff --git a/src/overlaybd/lsmt/file.h b/src/overlaybd/lsmt/file.h index 51bbcf16..ccd13bac 100644 --- a/src/overlaybd/lsmt/file.h +++ b/src/overlaybd/lsmt/file.h @@ -25,6 +25,7 @@ IMemoryIndex -> IMemoryIndex0 -> IComboIndex -> Index0 ( set ) -> Co #pragma once #include #include +#include #include #include #include @@ -45,6 +46,8 @@ class IFileRO : public photon::fs::VirtualReadOnlyFile { // return uuid of m_files[layer_idx]; virtual int get_uuid(UUID &out, size_t layer_idx = 0) const = 0; + + virtual std::vector get_lower_files() const = 0; }; struct CommitArgs { @@ -104,8 +107,9 @@ struct LayerInfo { UUID uuid; char *user_tag = nullptr; // a user provided string of message, 256B at most bool sparse_rw = false; - size_t len = 0; // len of user_tag; if it's 0, it will be detected with strlen() - LayerInfo(photon::fs::IFile *_fdata = nullptr, photon::fs::IFile *_findex = nullptr) : fdata(_fdata), findex(_findex) { + size_t len = 0; // len of user_tag; if it's 0, it will be detected with strlen() + LayerInfo(photon::fs::IFile *_fdata = nullptr, photon::fs::IFile *_findex = nullptr) + : fdata(_fdata), findex(_findex) { parent_uuid.clear(); uuid.generate(); } @@ -113,14 +117,15 @@ struct LayerInfo { struct WarpFileArgs { photon::fs::IFile *findex = nullptr; - photon::fs::IFile *fsmeta = nullptr; // sparse_file - photon::fs::IFile *target_file = nullptr; // eg. remote target, local data file + photon::fs::IFile *fsmeta = nullptr; // sparse_file + photon::fs::IFile *target_file = nullptr; // eg. remote target, local data file uint64_t virtual_size; UUID::String parent_uuid; UUID uuid; char *user_tag = nullptr; // a user provided string of message, 256B at most size_t len = 0; // len of user_tag; if it's 0, it will be detected with strlen() - WarpFileArgs(photon::fs::IFile *findex, photon::fs::IFile *fsmeta, photon::fs::IFile *target_file) + WarpFileArgs(photon::fs::IFile *findex, photon::fs::IFile *fsmeta, + photon::fs::IFile *target_file) : findex(findex), fsmeta(fsmeta), target_file(target_file) { uuid.generate(); } @@ -131,7 +136,8 @@ extern "C" IFileRW *create_file_rw(const LayerInfo &args, bool ownership = false // open a writable LSMT file constitued by a data file and a index file, // optionally obtaining the ownerships of the underlying files, // thus they will be destructed automatically. -extern "C" IFileRW *open_file_rw(photon::fs::IFile *fdata, photon::fs::IFile *findex, bool ownership = false); +extern "C" IFileRW *open_file_rw(photon::fs::IFile *fdata, photon::fs::IFile *findex, + bool ownership = false); // open a read-only LSMT file, which was created by // `close_seal()`ing or `commit()`ing a R/W LSMT file. @@ -148,9 +154,10 @@ extern "C" IFileRO *open_files_ro(photon::fs::IFile **files, size_t n, bool owne extern "C" IFileRW *create_warpfile(WarpFileArgs &args, bool ownership = false); extern "C" IFileRW *open_warpfile_rw(photon::fs::IFile *findex, photon::fs::IFile *fsmeta_file, - photon::fs::IFile *target_file, bool ownership = false); + photon::fs::IFile *target_file, bool ownership = false); -extern "C" IFileRO *open_warpfile_ro(photon::fs::IFile *warpfile, photon::fs::IFile *target_file, bool ownership = false); +extern "C" IFileRO *open_warpfile_ro(photon::fs::IFile *warpfile, photon::fs::IFile *target_file, + bool ownership = false); // merge multiple RO files (layers) into a single RO file (layer) // returning 0 for success, -1 otherwise @@ -164,4 +171,8 @@ extern "C" int merge_files_ro(photon::fs::IFile **src_files, size_t n, const Com extern "C" IFileRW *stack_files(IFileRW *upper_layer, IFileRO *lower_layers, bool ownership = false, bool check_order = true); +IMemoryIndex *open_file_index(photon::fs::IFile *file); +IFileRO *open_files_with_merged_index(photon::fs::IFile **src_files, size_t n, IMemoryIndex *index, + bool ownership = false); +int is_lsmt(photon::fs::IFile *file); } // namespace LSMT diff --git a/src/overlaybd/lsmt/index.cpp b/src/overlaybd/lsmt/index.cpp index 9891a5c5..bb92ce25 100644 --- a/src/overlaybd/lsmt/index.cpp +++ b/src/overlaybd/lsmt/index.cpp @@ -61,6 +61,7 @@ class Index : public IMemoryIndex { const SegmentMapping *pbegin = nullptr; const SegmentMapping *pend = nullptr; uint64_t alloc_blk = 0; + uint64_t virtual_size = 0; inline void get_alloc_blks() { for (auto m : mapping) { @@ -72,8 +73,9 @@ class Index : public IMemoryIndex { delete[] pbegin; } } - Index(const SegmentMapping *pmappings = nullptr, size_t n = 0, bool ownership = true) - : ownership(ownership) { + Index(const SegmentMapping *pmappings = nullptr, size_t n = 0, bool ownership = true, + uint64_t vsize = 0) + : ownership(ownership), virtual_size(vsize) { if (n == 0 || pmappings == nullptr) { pbegin = pend = nullptr; return; @@ -81,7 +83,8 @@ class Index : public IMemoryIndex { pbegin = pmappings; pend = pbegin + n; } - Index(vector &&m) : mapping(std::move(m)) { + Index(vector &&m, uint64_t vsize = 0) + : mapping(std::move(m)), virtual_size(vsize) { if (mapping.size()) { pbegin = &mapping[0]; pend = pbegin + mapping.size(); @@ -146,6 +149,10 @@ class Index : public IMemoryIndex { m.tag += delta; return 0; } + + uint64_t vsize() const override { + return virtual_size; + } }; class LevelIndex : public Index { @@ -385,6 +392,7 @@ class Index0 : public IComboIndex { virtual const IMemoryIndex0 *front_index() const override { return this; } + UNIMPLEMENTED(size_t vsize() const override); }; static void merge_indexes(uint8_t level, vector &mapping, const Index **pindexes, @@ -540,10 +548,10 @@ IMemoryIndex0 *create_memory_index0(const SegmentMapping *pmappings, size_t n, } IMemoryIndex *create_memory_index(const SegmentMapping *pmappings, size_t n, uint64_t moffset_begin, - uint64_t moffset_end, bool ownership) { + uint64_t moffset_end, bool ownership, uint64_t vsize) { auto ok1 = verify_mapping_order(pmappings, n); auto ok2 = verify_mapping_moffset(pmappings, n, moffset_begin, moffset_end); - return (ok1 && ok2) ? new Index(pmappings, n, ownership) : nullptr; + return (ok1 && ok2) ? new Index(pmappings, n, ownership, vsize) : nullptr; } IMemoryIndex *create_level_index(const SegmentMapping *pmappings, size_t n, uint64_t moffset_begin, @@ -605,7 +613,8 @@ static void merge_indexes(uint8_t level, vector &mapping, const } } -IComboIndex *create_combo_index(IMemoryIndex0 *index0, const IMemoryIndex *index, uint8_t ro_index_count, bool ownership) { +IComboIndex *create_combo_index(IMemoryIndex0 *index0, const IMemoryIndex *index, + uint8_t ro_index_count, bool ownership) { if (!index0 || !index) LOG_ERROR_RETURN(EINVAL, nullptr, "invalid argument(s)"); @@ -666,6 +675,6 @@ IMemoryIndex *merge_memory_indexes(const IMemoryIndex **pindexes, size_t n) { auto pi = (const Index **)pindexes; mapping.reserve(pi[0]->size()); merge_indexes(0, mapping, pi, n, 0, UINT64_MAX); - return new Index(std::move(mapping)); + return new Index(std::move(mapping), pindexes[0]->vsize()); } } // namespace LSMT diff --git a/src/overlaybd/lsmt/index.h b/src/overlaybd/lsmt/index.h index 544ee035..2ed88366 100644 --- a/src/overlaybd/lsmt/index.h +++ b/src/overlaybd/lsmt/index.h @@ -126,6 +126,8 @@ class IMemoryIndex { // number of 512B blocks allocated virtual uint64_t block_count() const = 0; + + virtual uint64_t vsize() const = 0; }; // the level 0 memory index, which supports write @@ -169,7 +171,7 @@ inline IMemoryIndex0 *create_memory_index0() { // the mapped offset must be within [moffset_begin, moffset_end) extern "C" IMemoryIndex *create_memory_index(const SegmentMapping *pmappings, std::size_t n, uint64_t moffset_begin, uint64_t moffset_end, - bool ownership = true); + bool ownership = true, uint64_t vsize = 0); // merge multiple indexes into a single one index // the `tag` field of each element in the result is subscript of `pindexes`: diff --git a/src/overlaybd/zfile/zfile.cpp b/src/overlaybd/zfile/zfile.cpp index e0ba88cd..6d623ec5 100644 --- a/src/overlaybd/zfile/zfile.cpp +++ b/src/overlaybd/zfile/zfile.cpp @@ -51,13 +51,7 @@ const static uint8_t FLAG_VALID_FALSE = 0; const static uint8_t FLAG_VALID_TRUE = 1; const static uint8_t FLAG_VALID_CRC_CHECK = 2; -template -static std::unique_ptr new_align_mem(size_t _size, size_t alignment = ALIGNMENT_4K) { - size_t size = (_size + alignment - 1) / alignment * alignment; - return std::unique_ptr(new T[size]); -} - -inline uint32_t crc32c(void *buf, size_t size) { +inline uint32_t crc32c_salt(void *buf, size_t size) { return crc32::crc32c_extend(buf, size, NOI_WELL_KNOWN_PRIME); } /* ZFile Format: @@ -489,7 +483,7 @@ class CompressionFile : public VirtualReadOnlyFile { int retry = 3; again: if (m_ht.opt.verify) { - auto c = crc32c((void *)block.buffer(), block.compressed_size); + auto c = crc32c_salt((void *)block.buffer(), block.compressed_size); if (c != block.crc32_code()) { if ((valid == FLAG_VALID_TRUE) && (retry--)) { int reload_res = block.reload(); @@ -563,7 +557,7 @@ ssize_t compress_data(ICompressor *compressor, const unsigned char *buf, size_t // LOG_DEBUG("compress buffer {offset: `, count: `} into ` bytes.", i, step, ret); compressed_len = ret; if (gen_crc) { - auto crc32_code = crc32c(dest_buf, compressed_len); + auto crc32_code = crc32c_salt(dest_buf, compressed_len); *((uint32_t *)&dest_buf[compressed_len]) = crc32_code; LOG_DEBUG("append ` bytes crc32_code: `", sizeof(uint32_t), crc32_code); compressed_len += sizeof(uint32_t); @@ -1157,7 +1151,7 @@ int zfile_compress(IFile *file, IFile *as, const CompressArgs *args) { LOG_ERRNO_RETURN(0, -1, "failed to write compressed data."); } if (crc32_verify) { - auto crc32_code = crc32c(&compressed_data[j * buf_size], compressed_len[j]); + auto crc32_code = crc32c_salt(&compressed_data[j * buf_size], compressed_len[j]); LOG_DEBUG("append ` bytes crc32_code: {offset: `, count: `, crc32: `}", sizeof(uint32_t), moffset, compressed_len[j], HEX(crc32_code).width(8)); compressed_len[j] += sizeof(uint32_t); @@ -1212,7 +1206,7 @@ int zfile_decompress(IFile *src, IFile *dst) { for (off_t offset = 0; offset < raw_data_size; offset += block_size) { auto len = (ssize_t)std::min(block_size, (size_t)raw_data_size - offset); auto readn = file->pread(raw_buf.get(), len, offset); - LOG_DEBUG("readn: `, crc32: `", readn, HEX(crc32c(raw_buf.get(), len)).width(8)); + LOG_DEBUG("readn: `, crc32: `", readn, HEX(crc32c_salt(raw_buf.get(), len)).width(8)); if (readn != len) return -1; if (dst->write(raw_buf.get(), readn) != readn) {