Skip to content

Commit

Permalink
fix: pika-migrate don't support Stream type
Browse files Browse the repository at this point in the history
  • Loading branch information
pro-spild committed Jan 9, 2025
1 parent 13ecac6 commit 63a3746
Show file tree
Hide file tree
Showing 3 changed files with 96 additions and 5 deletions.
1 change: 1 addition & 0 deletions tools/pika_migrate/include/migrator_thread.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ class MigratorThread : public net::Thread {
void MigrateHashesDB();
void MigrateSetsDB();
void MigrateZsetsDB();
void MigrateStreamsDB();

virtual void *ThreadMain();

Expand Down
99 changes: 94 additions & 5 deletions tools/pika_migrate/src/migrator_thread.cc
Original file line number Diff line number Diff line change
Expand Up @@ -372,6 +372,87 @@ void MigratorThread::MigrateZsetsDB() {
}
}

void MigratorThread::MigrateStreamsDB() {
int64_t scan_batch_num = g_pika_conf->sync_batch_num() * 10;
if (MAX_BATCH_NUM < scan_batch_num) {
if (g_pika_conf->sync_batch_num() < MAX_BATCH_NUM) {
scan_batch_num = MAX_BATCH_NUM;
} else {
scan_batch_num = g_pika_conf->sync_batch_num() * 2;
}
}

int64_t ttl = -1;
int64_t cursor = 0;
storage::Status s;
std::vector<std::string> keys;
int64_t timestamp;

while (true) {
cursor = storage_->Scan(storage::DataType::kStreams, cursor, "*", scan_batch_num, &keys);

for (const auto& key : keys) {
std::vector<storage::IdMessage> id_message;
storage::StreamScanArgs arg;
storage::StreamUtils::StreamParseIntervalId("-", arg.start_sid, &arg.start_ex, 0);
storage::StreamUtils::StreamParseIntervalId("+", arg.end_sid, &arg.end_ex, UINT64_MAX);

storage::Status s = storage_->XRange(key, arg, id_message);
if (!s.ok()) {
LOG(WARNING) << "db->XRange(key:" << key << ") = " << s.ToString();
continue;
}
auto it = id_message.begin();
while (!should_exit_ && it != id_message.end()) {
net::RedisCmdArgsType argv;
std::string cmd;

argv.push_back("XADD");
argv.push_back(key);
for (int idx = 0;
idx < g_pika_conf->sync_batch_num() && !should_exit_ && it != id_message.end();
idx++, it++) {
std::vector<std::string> message;
storage::StreamUtils::DeserializeMessage(it->value, message);
storage::streamID sid;
sid.DeserializeFrom(it->field);
argv.push_back(sid.ToString());
for (const auto& m : message) {
argv.push_back(m);
}
}

net::SerializeRedisCommand(argv, &cmd);
PlusNum();
DispatchKey(cmd, key);
}

ttl = -1;
timestamp = storage_->TTL(key);
if (timestamp != -2) {
ttl = timestamp;
}

if (s.ok() && ttl > 0) {
net::RedisCmdArgsType argv;
std::string cmd;

argv.push_back("EXPIRE");
argv.push_back(key);
argv.push_back(std::to_string(ttl));

net::SerializeRedisCommand(argv, &cmd);
PlusNum();
DispatchKey(cmd, key);
}
}

if (!cursor) {
break;
}
}
}

void MigratorThread::MigrateDB() {
switch (int(type_)) {
case int(storage::DataType::kStrings) : {
Expand Down Expand Up @@ -399,6 +480,10 @@ void MigratorThread::MigrateDB() {
break;
}

case int(storage::DataType::kStreams) : {
MigrateStreamsDB();
break;
}
default: {
LOG(WARNING) << "illegal db type " << type_;
break;
Expand All @@ -418,23 +503,27 @@ void MigratorThread::DispatchKey(const std::string &command, const std::string&
const char* GetDBTypeString(int type) {
switch (type) {
case int(storage::DataType::kStrings) : {
return "storage::DataType::kStrings";
return "storage::kStrings";
}

case int(storage::DataType::kLists) : {
return "storage::DataType::kLists";
return "storage::kLists";
}

case int(storage::DataType::kHashes) : {
return "storage::DataType::kHashes";
return "storage::kHashes";
}

case int(storage::DataType::kSets) : {
return "storage::DataType::kSets";
return "storage::kSets";
}

case int(storage::DataType::kZSets) : {
return "storage::DataType::kZSets";
return "storage::kZSets";
}

case int(storage::DataType::kStreams) : {
return "storage::kStreams";
}

default: {
Expand Down
1 change: 1 addition & 0 deletions tools/pika_migrate/src/pika_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1630,6 +1630,7 @@ void PikaServer::RetransmitData(const std::string& path) {
migrators.emplace_back(std::make_shared<MigratorThread>(storage_, &pika_senders, int(storage::DataType::kHashes), thread_num));
migrators.emplace_back(std::make_shared<MigratorThread>(storage_, &pika_senders, int(storage::DataType::kSets), thread_num));
migrators.emplace_back(std::make_shared<MigratorThread>(storage_, &pika_senders, int(storage::DataType::kZSets), thread_num));
migrators.emplace_back(std::make_shared<MigratorThread>(storage_, &pika_senders, int(storage::DataType::kStreams), thread_num));

for (size_t i = 0; i < pika_senders.size(); i++) {
pika_senders[i]->StartThread();
Expand Down

0 comments on commit 63a3746

Please sign in to comment.