Skip to content

Commit

Permalink
add some method for lsmt, rename crc32c for zfile
Browse files Browse the repository at this point in the history
Signed-off-by: liulanzheng <lanzheng.liulz@alibaba-inc.com>
  • Loading branch information
liulanzheng committed Jan 19, 2024
1 parent 1f9fd97 commit a3d5799
Show file tree
Hide file tree
Showing 5 changed files with 107 additions and 44 deletions.
81 changes: 64 additions & 17 deletions src/overlaybd/lsmt/file.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ limitations under the License.
*/
#include "file.h"
#include <cstdint>
#include <openssl/bio.h>
#include <string.h>
#include <stdarg.h>
#include <memory>
Expand Down Expand Up @@ -165,7 +164,7 @@ struct HeaderTrailer {

UUID::String uuid; // 37 bytes.
UUID::String parent_uuid; // 37 bytes.
uint16_t reserved; // Reserved.
uint16_t reserved; // Reserved.

static const uint8_t LSMT_V1 = 1; // v1 (UUID check)
static const uint8_t LSMT_SUB_V1 = 1; // .1 deprecated level range.
Expand All @@ -179,7 +178,8 @@ struct HeaderTrailer {

class LSMTReadOnlyFile;
static LSMTReadOnlyFile *open_file_ro(IFile *file, bool ownership, bool reserve_tag);
static HeaderTrailer *verify_ht(IFile *file, char *buf, bool is_trailer = false, ssize_t st_size = -1);
static HeaderTrailer *verify_ht(IFile *file, char *buf, bool is_trailer = false,
ssize_t st_size = -1);

static const uint32_t ALIGNMENT = 512; // same as trim block size.
static const uint32_t ALIGNMENT4K = 4096;
Expand Down Expand Up @@ -520,6 +520,10 @@ class LSMTReadOnlyFile : public IFileRW {
return 0;
}

virtual std::vector<IFile *> get_lower_files() const override {
return m_files;
}

template <typename T1, typename T2, typename T3>
inline void forward(void *&buffer, T1 &offset, T2 &count, T3 step) {
(char *&)buffer += step * ALIGNMENT;
Expand Down Expand Up @@ -618,7 +622,8 @@ class LSMTReadOnlyFile : public IFileRW {
if (!pht->is_sealed()) {
LOG_ERROR_RETURN(ENOTSUP, -1, "Commit a compacted LSMTReadonlyFile is not allowed.");
}
CompactOptions opts(&m_files, (SegmentMapping*)m_index->buffer(), m_index->size(), m_vsize, &args);
CompactOptions opts(&m_files, (SegmentMapping *)m_index->buffer(), m_index->size(), m_vsize,
&args);

atomic_uint64_t _no_use_var(0);
return compact(opts, _no_use_var);
Expand Down Expand Up @@ -1053,9 +1058,9 @@ class LSMTSparseFile : public LSMTFile {
class LSMTWarpFile : public LSMTFile {
public:
const static int READ_BUFFER_SIZE = 65536;
IFile* m_target_file = nullptr;
IFile *m_target_file = nullptr;

LSMTWarpFile(){
LSMTWarpFile() {
m_filetype = LSMTFileType::WarpFile;
}
~LSMTWarpFile() {
Expand All @@ -1074,11 +1079,11 @@ class LSMTWarpFile : public LSMTFile {
};
m.tag = tag;
auto file = m_files[tag];
LOG_DEBUG("insert segment: `, filePtr: `", m,file);
LOG_DEBUG("insert segment: `, filePtr: `", m, file);
auto ret = file->pwrite(buf, count, offset);
if (ret != (ssize_t)count) {
LOG_ERRNO_RETURN(0, -1, "write failed, file:`, ret:`, pos:`, count:`",
file, ret, offset, count);
LOG_ERRNO_RETURN(0, -1, "write failed, file:`, ret:`, pos:`, count:`", file, ret,
offset, count);
}
static_cast<IMemoryIndex0 *>(m_index)->insert(m);
append_index(m);
Expand All @@ -1102,8 +1107,8 @@ class LSMTWarpFile : public LSMTFile {
while (lba.count > 0) {
SegmentMapping m;
m.offset = lba.offset / ALIGNMENT;
m.length = (Segment::MAX_LENGTH < lba.count / ALIGNMENT ?
Segment::MAX_LENGTH : lba.count / ALIGNMENT);
m.length = (Segment::MAX_LENGTH < lba.count / ALIGNMENT ? Segment::MAX_LENGTH
: lba.count / ALIGNMENT);
m.moffset = lba.roffset / ALIGNMENT;
m.tag = m_rw_tag + (uint8_t)SegmentType::remoteData;
LOG_DEBUG("insert segment: ` into findex: `", m, m_findex);
Expand Down Expand Up @@ -1211,8 +1216,8 @@ static HeaderTrailer *verify_ht(IFile *file, char *buf, bool is_trailer, ssize_t
LOG_ERRNO_RETURN(0, nullptr, "failed to read file trailer.");
if (!pht->verify_magic() || !pht->is_trailer() || !pht->is_data_file() || !pht->is_sealed())
LOG_ERROR_RETURN(0, nullptr,
"trailer magic, trailer type, "
"file type or sealedness doesn't match");
"trailer magic, trailer type, "
"file type or sealedness doesn't match");
return pht;
}

Expand Down Expand Up @@ -1266,13 +1271,14 @@ static SegmentMapping *do_load_index(IFile *file, HeaderTrailer *pheader_trailer
if (ibuf[i].offset != SegmentMapping::INVALID_OFFSET) {
ibuf[index_size] = ibuf[i];
ibuf[index_size].tag = (warp_file_tag ? ibuf[i].tag : 0);
if (min_tag > ibuf[index_size].tag) min_tag = ibuf[index_size].tag;
if (min_tag > ibuf[index_size].tag)
min_tag = ibuf[index_size].tag;
index_size++;
}
}
if (warp_file_tag) {
LOG_INFO("rebuild index tag for LSMTWarpFile.");
for (size_t i = 0; i<index_size; i++) {
for (size_t i = 0; i < index_size; i++) {
if (warp_file_tag == 1) /* only fsmeta */
ibuf[i].tag = (uint8_t)SegmentType::fsMeta;
if (warp_file_tag == 2) /* only remote data */
Expand Down Expand Up @@ -1451,8 +1457,7 @@ IFileRW *open_warpfile_rw(IFile *findex, IFile *fsmeta_file, IFile *target_file,
auto rst = new LSMTWarpFile;
rst->m_files.resize(2);
LSMT::HeaderTrailer ht;
auto p = do_load_index(findex, &ht, false,
3);
auto p = do_load_index(findex, &ht, false, 3);
auto pi = create_memory_index0(p, ht.index_size, 0, -1);
if (!pi) {
delete[] p;
Expand Down Expand Up @@ -1785,4 +1790,46 @@ IFileRW *stack_files(IFileRW *upper_layer, IFileRO *lower_layers, bool ownership
return rst;
}

IMemoryIndex *open_file_index(IFile *file) {
HeaderTrailer ht;
auto p = do_load_index(file, &ht, true);
if (!p) {
LOG_ERROR_RETURN(0, nullptr, "failed to load index");
}

auto pi = create_memory_index(p, ht.index_size, HeaderTrailer::SPACE / ALIGNMENT,
ht.index_offset / ALIGNMENT, true, ht.virtual_size);
if (!pi) {
delete[] p;
LOG_ERROR_RETURN(0, nullptr, "failed to create memory index");
}
return pi;
}

IFileRO *open_files_with_merged_index(IFile **src_files, size_t n, IMemoryIndex *index,
bool ownership) {
vector<IFile *> m_files(src_files, src_files + n);
auto rst = new LSMTReadOnlyFile;
rst->m_index = index;
rst->m_files = move(m_files);
rst->m_vsize = index->vsize();
rst->m_uuid.resize(rst->m_files.size());
rst->m_file_ownership = ownership;
return rst;
}

int is_lsmt(IFile *file) {
char buf[HeaderTrailer::SPACE];
auto ret = file->pread(buf, HeaderTrailer::SPACE, 0);
if (ret < (ssize_t)HeaderTrailer::SPACE)
LOG_ERRNO_RETURN(0, -1, "failed to read file header.");
auto pht = (HeaderTrailer *)buf;
if (!pht->verify_magic() || !pht->is_header()) {
LOG_DEBUG("file: ` is not lsmt object", file);
return 1;
}
LOG_DEBUG("file: ` is lsmt object", file);
return 0;
}

} // namespace LSMT
27 changes: 19 additions & 8 deletions src/overlaybd/lsmt/file.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ IMemoryIndex -> IMemoryIndex0 -> IComboIndex -> Index0 ( set<SegmentMap> ) -> Co
#pragma once
#include <inttypes.h>
#include <cstddef>
#include <vector>
#include <photon/fs/filesystem.h>
#include <photon/fs/virtual-file.h>
#include <photon/common/uuid.h>
Expand All @@ -45,6 +46,8 @@ class IFileRO : public photon::fs::VirtualReadOnlyFile {

// return uuid of m_files[layer_idx];
virtual int get_uuid(UUID &out, size_t layer_idx = 0) const = 0;

virtual std::vector<IFile *> get_lower_files() const = 0;
};

struct CommitArgs {
Expand Down Expand Up @@ -104,23 +107,25 @@ struct LayerInfo {
UUID uuid;
char *user_tag = nullptr; // a user provided string of message, 256B at most
bool sparse_rw = false;
size_t len = 0; // len of user_tag; if it's 0, it will be detected with strlen()
LayerInfo(photon::fs::IFile *_fdata = nullptr, photon::fs::IFile *_findex = nullptr) : fdata(_fdata), findex(_findex) {
size_t len = 0; // len of user_tag; if it's 0, it will be detected with strlen()
LayerInfo(photon::fs::IFile *_fdata = nullptr, photon::fs::IFile *_findex = nullptr)
: fdata(_fdata), findex(_findex) {
parent_uuid.clear();
uuid.generate();
}
};

struct WarpFileArgs {
photon::fs::IFile *findex = nullptr;
photon::fs::IFile *fsmeta = nullptr; // sparse_file
photon::fs::IFile *target_file = nullptr; // eg. remote target, local data file
photon::fs::IFile *fsmeta = nullptr; // sparse_file
photon::fs::IFile *target_file = nullptr; // eg. remote target, local data file
uint64_t virtual_size;
UUID::String parent_uuid;
UUID uuid;
char *user_tag = nullptr; // a user provided string of message, 256B at most
size_t len = 0; // len of user_tag; if it's 0, it will be detected with strlen()
WarpFileArgs(photon::fs::IFile *findex, photon::fs::IFile *fsmeta, photon::fs::IFile *target_file)
WarpFileArgs(photon::fs::IFile *findex, photon::fs::IFile *fsmeta,
photon::fs::IFile *target_file)
: findex(findex), fsmeta(fsmeta), target_file(target_file) {
uuid.generate();
}
Expand All @@ -131,7 +136,8 @@ extern "C" IFileRW *create_file_rw(const LayerInfo &args, bool ownership = false
// open a writable LSMT file constitued by a data file and a index file,
// optionally obtaining the ownerships of the underlying files,
// thus they will be destructed automatically.
extern "C" IFileRW *open_file_rw(photon::fs::IFile *fdata, photon::fs::IFile *findex, bool ownership = false);
extern "C" IFileRW *open_file_rw(photon::fs::IFile *fdata, photon::fs::IFile *findex,
bool ownership = false);

// open a read-only LSMT file, which was created by
// `close_seal()`ing or `commit()`ing a R/W LSMT file.
Expand All @@ -148,9 +154,10 @@ extern "C" IFileRO *open_files_ro(photon::fs::IFile **files, size_t n, bool owne
extern "C" IFileRW *create_warpfile(WarpFileArgs &args, bool ownership = false);

extern "C" IFileRW *open_warpfile_rw(photon::fs::IFile *findex, photon::fs::IFile *fsmeta_file,
photon::fs::IFile *target_file, bool ownership = false);
photon::fs::IFile *target_file, bool ownership = false);

extern "C" IFileRO *open_warpfile_ro(photon::fs::IFile *warpfile, photon::fs::IFile *target_file, bool ownership = false);
extern "C" IFileRO *open_warpfile_ro(photon::fs::IFile *warpfile, photon::fs::IFile *target_file,
bool ownership = false);

// merge multiple RO files (layers) into a single RO file (layer)
// returning 0 for success, -1 otherwise
Expand All @@ -164,4 +171,8 @@ extern "C" int merge_files_ro(photon::fs::IFile **src_files, size_t n, const Com
extern "C" IFileRW *stack_files(IFileRW *upper_layer, IFileRO *lower_layers, bool ownership = false,
bool check_order = true);

IMemoryIndex *open_file_index(photon::fs::IFile *file);
IFileRO *open_files_with_merged_index(photon::fs::IFile **src_files, size_t n, IMemoryIndex *index,
bool ownership = false);
int is_lsmt(photon::fs::IFile *file);
} // namespace LSMT
23 changes: 16 additions & 7 deletions src/overlaybd/lsmt/index.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ class Index : public IMemoryIndex {
const SegmentMapping *pbegin = nullptr;
const SegmentMapping *pend = nullptr;
uint64_t alloc_blk = 0;
uint64_t virtual_size = 0;

inline void get_alloc_blks() {
for (auto m : mapping) {
Expand All @@ -72,16 +73,18 @@ class Index : public IMemoryIndex {
delete[] pbegin;
}
}
Index(const SegmentMapping *pmappings = nullptr, size_t n = 0, bool ownership = true)
: ownership(ownership) {
Index(const SegmentMapping *pmappings = nullptr, size_t n = 0, bool ownership = true,
uint64_t vsize = 0)
: ownership(ownership), virtual_size(vsize) {
if (n == 0 || pmappings == nullptr) {
pbegin = pend = nullptr;
return;
}
pbegin = pmappings;
pend = pbegin + n;
}
Index(vector<SegmentMapping> &&m) : mapping(std::move(m)) {
Index(vector<SegmentMapping> &&m, uint64_t vsize = 0)
: mapping(std::move(m)), virtual_size(vsize) {
if (mapping.size()) {
pbegin = &mapping[0];
pend = pbegin + mapping.size();
Expand Down Expand Up @@ -146,6 +149,10 @@ class Index : public IMemoryIndex {
m.tag += delta;
return 0;
}

uint64_t vsize() const override {
return virtual_size;
}
};

class LevelIndex : public Index {
Expand Down Expand Up @@ -385,6 +392,7 @@ class Index0 : public IComboIndex {
virtual const IMemoryIndex0 *front_index() const override {
return this;
}
UNIMPLEMENTED(size_t vsize() const override);
};

static void merge_indexes(uint8_t level, vector<SegmentMapping> &mapping, const Index **pindexes,
Expand Down Expand Up @@ -540,10 +548,10 @@ IMemoryIndex0 *create_memory_index0(const SegmentMapping *pmappings, size_t n,
}

IMemoryIndex *create_memory_index(const SegmentMapping *pmappings, size_t n, uint64_t moffset_begin,
uint64_t moffset_end, bool ownership) {
uint64_t moffset_end, bool ownership, uint64_t vsize) {
auto ok1 = verify_mapping_order(pmappings, n);
auto ok2 = verify_mapping_moffset(pmappings, n, moffset_begin, moffset_end);
return (ok1 && ok2) ? new Index(pmappings, n, ownership) : nullptr;
return (ok1 && ok2) ? new Index(pmappings, n, ownership, vsize) : nullptr;
}

IMemoryIndex *create_level_index(const SegmentMapping *pmappings, size_t n, uint64_t moffset_begin,
Expand Down Expand Up @@ -605,7 +613,8 @@ static void merge_indexes(uint8_t level, vector<SegmentMapping> &mapping, const
}
}

IComboIndex *create_combo_index(IMemoryIndex0 *index0, const IMemoryIndex *index, uint8_t ro_index_count, bool ownership) {
IComboIndex *create_combo_index(IMemoryIndex0 *index0, const IMemoryIndex *index,
uint8_t ro_index_count, bool ownership) {
if (!index0 || !index)
LOG_ERROR_RETURN(EINVAL, nullptr, "invalid argument(s)");

Expand Down Expand Up @@ -666,6 +675,6 @@ IMemoryIndex *merge_memory_indexes(const IMemoryIndex **pindexes, size_t n) {
auto pi = (const Index **)pindexes;
mapping.reserve(pi[0]->size());
merge_indexes(0, mapping, pi, n, 0, UINT64_MAX);
return new Index(std::move(mapping));
return new Index(std::move(mapping), pindexes[0]->vsize());
}
} // namespace LSMT
4 changes: 3 additions & 1 deletion src/overlaybd/lsmt/index.h
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,8 @@ class IMemoryIndex {

// number of 512B blocks allocated
virtual uint64_t block_count() const = 0;

virtual uint64_t vsize() const = 0;
};

// the level 0 memory index, which supports write
Expand Down Expand Up @@ -169,7 +171,7 @@ inline IMemoryIndex0 *create_memory_index0() {
// the mapped offset must be within [moffset_begin, moffset_end)
extern "C" IMemoryIndex *create_memory_index(const SegmentMapping *pmappings, std::size_t n,
uint64_t moffset_begin, uint64_t moffset_end,
bool ownership = true);
bool ownership = true, uint64_t vsize = 0);

// merge multiple indexes into a single one index
// the `tag` field of each element in the result is subscript of `pindexes`:
Expand Down
16 changes: 5 additions & 11 deletions src/overlaybd/zfile/zfile.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,13 +51,7 @@ const static uint8_t FLAG_VALID_FALSE = 0;
const static uint8_t FLAG_VALID_TRUE = 1;
const static uint8_t FLAG_VALID_CRC_CHECK = 2;

template <typename T>
static std::unique_ptr<T[]> new_align_mem(size_t _size, size_t alignment = ALIGNMENT_4K) {
size_t size = (_size + alignment - 1) / alignment * alignment;
return std::unique_ptr<T[]>(new T[size]);
}

inline uint32_t crc32c(void *buf, size_t size) {
inline uint32_t crc32c_salt(void *buf, size_t size) {
return crc32::crc32c_extend(buf, size, NOI_WELL_KNOWN_PRIME);
}
/* ZFile Format:
Expand Down Expand Up @@ -489,7 +483,7 @@ class CompressionFile : public VirtualReadOnlyFile {
int retry = 3;
again:
if (m_ht.opt.verify) {
auto c = crc32c((void *)block.buffer(), block.compressed_size);
auto c = crc32c_salt((void *)block.buffer(), block.compressed_size);
if (c != block.crc32_code()) {
if ((valid == FLAG_VALID_TRUE) && (retry--)) {
int reload_res = block.reload();
Expand Down Expand Up @@ -563,7 +557,7 @@ ssize_t compress_data(ICompressor *compressor, const unsigned char *buf, size_t
// LOG_DEBUG("compress buffer {offset: `, count: `} into ` bytes.", i, step, ret);
compressed_len = ret;
if (gen_crc) {
auto crc32_code = crc32c(dest_buf, compressed_len);
auto crc32_code = crc32c_salt(dest_buf, compressed_len);
*((uint32_t *)&dest_buf[compressed_len]) = crc32_code;
LOG_DEBUG("append ` bytes crc32_code: `", sizeof(uint32_t), crc32_code);
compressed_len += sizeof(uint32_t);
Expand Down Expand Up @@ -1157,7 +1151,7 @@ int zfile_compress(IFile *file, IFile *as, const CompressArgs *args) {
LOG_ERRNO_RETURN(0, -1, "failed to write compressed data.");
}
if (crc32_verify) {
auto crc32_code = crc32c(&compressed_data[j * buf_size], compressed_len[j]);
auto crc32_code = crc32c_salt(&compressed_data[j * buf_size], compressed_len[j]);
LOG_DEBUG("append ` bytes crc32_code: {offset: `, count: `, crc32: `}",
sizeof(uint32_t), moffset, compressed_len[j], HEX(crc32_code).width(8));
compressed_len[j] += sizeof(uint32_t);
Expand Down Expand Up @@ -1212,7 +1206,7 @@ int zfile_decompress(IFile *src, IFile *dst) {
for (off_t offset = 0; offset < raw_data_size; offset += block_size) {
auto len = (ssize_t)std::min(block_size, (size_t)raw_data_size - offset);
auto readn = file->pread(raw_buf.get(), len, offset);
LOG_DEBUG("readn: `, crc32: `", readn, HEX(crc32c(raw_buf.get(), len)).width(8));
LOG_DEBUG("readn: `, crc32: `", readn, HEX(crc32c_salt(raw_buf.get(), len)).width(8));
if (readn != len)
return -1;
if (dst->write(raw_buf.get(), readn) != readn) {
Expand Down

0 comments on commit a3d5799

Please sign in to comment.