diff --git a/rdsn b/rdsn
index c9d5c2239a..33318f3ec2 160000
--- a/rdsn
+++ b/rdsn
@@ -1 +1 @@
-Subproject commit c9d5c2239ad94af981e8b3f81f41c39f2083a07e
+Subproject commit 33318f3ec2a12324c17d3ff418985f25cc73c943
diff --git a/rocksdb b/rocksdb
index f0e22c376e..52492c3131 160000
--- a/rocksdb
+++ b/rocksdb
@@ -1 +1 @@
-Subproject commit f0e22c376e8de3ca576f4a37c857172d54cc4fa7
+Subproject commit 52492c31313921d0476751fffc77b84ead156363
diff --git a/src/server/pegasus_server_impl.cpp b/src/server/pegasus_server_impl.cpp
index e536a00b86..651bfb7789 100644
--- a/src/server/pegasus_server_impl.cpp
+++ b/src/server/pegasus_server_impl.cpp
@@ -48,6 +48,8 @@ std::shared_ptr pegasus_server_impl::_s_block_cache;
 ::dsn::task_ptr pegasus_server_impl::_update_server_rdb_stat;
 ::dsn::perf_counter_wrapper pegasus_server_impl::_pfc_rdb_block_cache_mem_usage;
 const std::string pegasus_server_impl::COMPRESSION_HEADER = "per_level:";
+const std::string pegasus_server_impl::DATA_COLUMN_FAMILY_NAME = "default";
+const std::string pegasus_server_impl::META_COLUMN_FAMILY_NAME = "pegasus_meta_cf";
 
 pegasus_server_impl::pegasus_server_impl(dsn::replication::replica *r)
     : dsn::apps::rrdb_service(r),
@@ -452,8 +454,7 @@ pegasus_server_impl::~pegasus_server_impl()
 {
     if (_is_open) {
         dassert(_db != nullptr, "");
-        delete _db;
-        _db = nullptr;
+        release_db();
     }
 }
 
@@ -1647,6 +1648,7 @@ ::dsn::error_code pegasus_server_impl::start(int argc, char **argv)
     // 2, we can parse restore info from app env, which is stored in argv
     // 3, restore_dir is exist
     //
+    bool db_exist = true;
     auto path = ::dsn::utils::filesystem::path_combine(data_dir(), "rdb");
     if (::dsn::utils::filesystem::path_exists(path)) {
         // only case 1
@@ -1662,6 +1664,7 @@ ::dsn::error_code pegasus_server_impl::start(int argc, char **argv)
                    replica_name());
             return ::dsn::ERR_FILE_OPERATION_FAILED;
         } else {
+            db_exist = false;
             dinfo("%s: open a new db, path = %s", replica_name(), path.c_str());
         }
     } else {
@@ -1688,6 +1691,7 @@ ::dsn::error_code pegasus_server_impl::start(int argc, char **argv)
                        restore_dir.c_str());
                 return ::dsn::ERR_FILE_OPERATION_FAILED;
             } else {
+                db_exist = false;
                 dwarn(
                     "%s: try to restore and restore_dir(%s) isn't exist, but we don't force "
                     "it, the role of this replica must not primary, so we open a new db on the "
@@ -1702,16 +1706,36 @@ ::dsn::error_code pegasus_server_impl::start(int argc, char **argv)
 
     ddebug("%s: start to open rocksDB's rdb(%s)", replica_name(), path.c_str());
 
-    auto status = rocksdb::DB::Open(rocksdb::Options(_db_opts, _data_cf_opts), path, &_db);
+    bool need_open_with_meta_cf = false;
+    // Check meta CF only when db exist.
+    if (db_exist && check_meta_cf(path, &need_open_with_meta_cf) != ::dsn::ERR_OK) {
+        derror_replica("check meta column family failed");
+        return ::dsn::ERR_LOCAL_APP_FAILURE;
+    }
+    std::vector<rocksdb::ColumnFamilyDescriptor> column_families(
+        {{DATA_COLUMN_FAMILY_NAME, _data_cf_opts}});
+    if (need_open_with_meta_cf) {
+        column_families.emplace_back(rocksdb::ColumnFamilyDescriptor(
+            META_COLUMN_FAMILY_NAME, rocksdb::ColumnFamilyOptions()));
+    }
+    std::vector<rocksdb::ColumnFamilyHandle *> handles_opened;
+    auto status = rocksdb::DB::Open(_db_opts, path, column_families, &handles_opened, &_db);
     if (status.ok()) {
+        dcheck_eq_replica(column_families.size(), handles_opened.size());
+        dcheck_eq_replica(handles_opened[0]->GetName(), DATA_COLUMN_FAMILY_NAME);
+        _data_cf = handles_opened[0];
+        if (handles_opened.size() == 2) {
+            dcheck_eq_replica(handles_opened[1]->GetName(), META_COLUMN_FAMILY_NAME);
+            _meta_cf = handles_opened[1];
+        }
+
         _last_committed_decree = _db->GetLastFlushedDecree();
         _pegasus_data_version = _db->GetPegasusDataVersion();
         if (_pegasus_data_version > PEGASUS_DATA_VERSION_MAX) {
             derror("%s: open app failed, unsupported data version %" PRIu32,
                    replica_name(),
                    _pegasus_data_version);
-            delete _db;
-            _db = nullptr;
+            release_db();
             return ::dsn::ERR_LOCAL_APP_FAILURE;
         }
 
@@ -1736,8 +1760,7 @@ ::dsn::error_code pegasus_server_impl::start(int argc, char **argv)
         auto err = async_checkpoint(false);
         if (err != ::dsn::ERR_OK) {
             derror("%s: create checkpoint failed, error = %s", replica_name(), err.to_string());
-            delete _db;
-            _db = nullptr;
+            release_db();
             return err;
         }
         dassert(last_flushed == last_durable_decree(),
@@ -1822,8 +1845,7 @@ ::dsn::error_code pegasus_server_impl::stop(bool clear_state)
     _context_cache.clear();
 
     _is_open = false;
-    delete _db;
-    _db = nullptr;
+    release_db();
 
     std::deque reserved_checkpoints;
     {
@@ -2894,5 +2916,56 @@ void pegasus_server_impl::set_partition_version(int32_t partition_version)
     // TODO(heyuchen): set filter _partition_version in further pr
 }
 
+// Lists the column families stored in the rocksdb instance at `path` and reports
+// through `need_open_with_meta_cf` whether the meta column family is among them.
+// Returns ERR_LOCAL_APP_FAILURE when the column families cannot be listed and
+// ERR_OK otherwise. Any column family other than the data or meta one trips an
+// assertion.
+::dsn::error_code pegasus_server_impl::check_meta_cf(const std::string &path,
+                                                     bool *need_open_with_meta_cf)
+{
+    *need_open_with_meta_cf = false;
+    std::vector<std::string> column_families;
+    auto s = rocksdb::DB::ListColumnFamilies(rocksdb::DBOptions(), path, &column_families);
+    if (!s.ok()) {
+        derror_replica("rocksdb::DB::ListColumnFamilies failed, error = {}", s.ToString());
+        return ::dsn::ERR_LOCAL_APP_FAILURE;
+    }
+
+    for (const auto &column_family : column_families) {
+        if (column_family == DATA_COLUMN_FAMILY_NAME) {
+            continue;
+        }
+        if (column_family == META_COLUMN_FAMILY_NAME) {
+            *need_open_with_meta_cf = true;
+            continue;
+        }
+        // Pass the offending name so the '{}' placeholder has an argument to format.
+        dassert_replica(false, "Column family '{}' should not present", column_family);
+    }
+    return ::dsn::ERR_OK;
+}
+
+// Destroys the column family handles obtained when the db was opened, then deletes
+// the db itself and resets all three pointers. Does nothing when no db is open,
+// which keeps the null-tolerance of the `delete _db` statements it replaces.
+void pegasus_server_impl::release_db()
+{
+    if (_db == nullptr) {
+        return;
+    }
+    if (_data_cf != nullptr) {
+        _db->DestroyColumnFamilyHandle(_data_cf);
+        _data_cf = nullptr;
+    }
+    if (_meta_cf != nullptr) {
+        _db->DestroyColumnFamilyHandle(_meta_cf);
+        _meta_cf = nullptr;
+    }
+    // Handles must be destroyed before the db they belong to is deleted.
+    delete _db;
+    _db = nullptr;
+}
+
 } // namespace server
 } // namespace pegasus
diff --git a/src/server/pegasus_server_impl.h b/src/server/pegasus_server_impl.h
index 14409cfe7f..1f001bfb48 100644
--- a/src/server/pegasus_server_impl.h
+++ b/src/server/pegasus_server_impl.h
@@ -300,8 +300,14 @@ class pegasus_server_impl : public ::dsn::apps::rrdb_service
         return false;
     }
 
+    ::dsn::error_code check_meta_cf(const std::string &path, bool *need_open_with_meta_cf);
+    void release_db();
+
 private:
     static const std::string COMPRESSION_HEADER;
+    // Column family names.
+    static const std::string DATA_COLUMN_FAMILY_NAME;
+    static const std::string META_COLUMN_FAMILY_NAME;
 
     dsn::gpid _gpid;
     std::string _primary_address;
@@ -322,7 +328,9 @@ class pegasus_server_impl : public ::dsn::apps::rrdb_service
     rocksdb::ReadOptions _data_cf_rd_opts;
     std::string _usage_scenario;
 
-    rocksdb::DB *_db;
+    rocksdb::DB *_db = nullptr;
+    rocksdb::ColumnFamilyHandle *_data_cf = nullptr;
+    rocksdb::ColumnFamilyHandle *_meta_cf = nullptr;
     static std::shared_ptr _s_block_cache;
     volatile bool _is_open;
     uint32_t _pegasus_data_version;