Skip to content

Commit

Permalink
Added support for resuming download to existing binlog storage (#31)
Browse files Browse the repository at this point in the history
'binsrv::filesystem_storage' now supports reading the content of the data
directory upon initialization, analyzing its entries, compare them with the
content of the binlog index file and setting last used binlog name / position
as the result.

'binsrv::events::reader_context' class extended with ability to correctly
process the stream of binlog events that starts not from the very beginning but
from the specified position (binlog name / offset). In this case we expect an
artificial RORATE event followed by a pseudo FDE (the one that has next event
position equal to 0 and should not be written to the filesystem storage).

'binlog_streaming.binsrv' MTR test case now also resumes streaming binlog
events to a file storage directory created at a previous run.
  • Loading branch information
percona-ysorokin authored Feb 1, 2024
1 parent 8b14b32 commit 84f3db9
Show file tree
Hide file tree
Showing 9 changed files with 282 additions and 134 deletions.
10 changes: 6 additions & 4 deletions mtr/binlog_streaming/r/binsrv.result
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,8 @@ FLUSH BINARY LOGS;

*** Determining the second binary log name.

*** Filling the table with some more data and dropping the table.
*** Filling the table with some more data.
INSERT INTO t1 VALUES(DEFAULT);
DROP TABLE t1;

*** Generating a configuration file in JSON format for the Binlog
*** Server utility.
Expand Down Expand Up @@ -59,14 +58,17 @@ SET @binsrv_config_json = JSON_OBJECT(

*** Comparing server and downloaded versions of the second binlog file.

*** Cleaning up the Binlog Server utility storage directory.
*** Filling the table with some more data and dropping the table.
INSERT INTO t1 VALUES(DEFAULT);
DROP TABLE t1;

*** FLUSHING the binlog one more time to make sure that the second one
*** is no longer open.
FLUSH BINARY LOGS;

*** Executing the Binlog Server utility one more time (the second
*** binlog is no longer open / in use).
*** binlog is no longer open / in use). Here we should also continue
*** streaming binlog events from the last saved position.

*** Comparing server and downloaded versions of the first binlog file
*** one more time.
Expand Down
12 changes: 6 additions & 6 deletions mtr/binlog_streaming/t/binsrv.test
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,8 @@ FLUSH BINARY LOGS;
--let $second_binlog = query_get_value(SHOW MASTER STATUS, File, 1)

--echo
--echo *** Filling the table with some more data and dropping the table.
--echo *** Filling the table with some more data.
INSERT INTO t1 VALUES(DEFAULT);
DROP TABLE t1;

--echo
--echo *** Generating a configuration file in JSON format for the Binlog
Expand Down Expand Up @@ -145,9 +144,9 @@ EOF
--remove_file $PATCHED_BINLOG_FILE

--echo
--echo *** Cleaning up the Binlog Server utility storage directory.
--force-rmdir $binsrv_storage_path
--mkdir $binsrv_storage_path
--echo *** Filling the table with some more data and dropping the table.
INSERT INTO t1 VALUES(DEFAULT);
DROP TABLE t1;

--echo
--echo *** FLUSHING the binlog one more time to make sure that the second one
Expand All @@ -156,7 +155,8 @@ FLUSH BINARY LOGS;

--echo
--echo *** Executing the Binlog Server utility one more time (the second
--echo *** binlog is no longer open / in use).
--echo *** binlog is no longer open / in use). Here we should also continue
--echo *** streaming binlog events from the last saved position.
--exec $BINSRV $binsrv_config_file_path > /dev/null 2>&1

--echo
Expand Down
154 changes: 87 additions & 67 deletions src/app.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@
#include "util/exception_location_helpers.hpp"
#include "util/nv_tuple.hpp"

namespace {

void log_span_dump(binsrv::basic_logger &logger,
util::const_byte_span portion) {
static constexpr std::size_t bytes_per_dump_line{16U};
Expand Down Expand Up @@ -78,7 +80,7 @@ void log_event_common_header(
<< ", server id:" << common_header.get_server_id_raw()
<< ", event size:" << common_header.get_event_size_raw()
<< ", next event position:" << common_header.get_next_event_position_raw()
<< ", flags: " << common_header.get_flags_raw() << " ("
<< ", flags:" << common_header.get_flags_raw() << " ("
<< common_header.get_readable_flags() << ')';

logger.log(binsrv::log_severity::debug, oss.str());
Expand Down Expand Up @@ -111,6 +113,72 @@ void log_format_description_event(
logger.log(binsrv::log_severity::debug, oss.str());
}

void receive_binlog_events(binsrv::basic_logger &logger,
easymysql::binlog &binlog,
binsrv::filesystem_storage &storage) {
// Network streams are requested with COM_BINLOG_DUMP and
// each Binlog Event response is prepended with 00 OK-byte.
static constexpr std::byte expected_event_packet_prefix{'\0'};

util::const_byte_span portion;

binsrv::event::reader_context context{};

while (!(portion = binlog.fetch()).empty()) {
if (portion[0] != expected_event_packet_prefix) {
util::exception_location().raise<std::invalid_argument>(
"unexpected event prefix");
}
portion = portion.subspan(1U);
logger.log(binsrv::log_severity::info,
"fetched " + std::to_string(std::size(portion)) +
"-byte(s) event from binlog");

// TODO: just for redirection to another byte stream we need to parse
// the ROTATE and FORMAT_DESCRIPTION events only, every other one
// can be just considered as a data portion (unless we want to do
// basic integrity checks like event sizes / position and CRC)
const binsrv::event::event current_event{context, portion};

const auto &current_common_header = current_event.get_common_header();
log_event_common_header(logger, current_common_header);

const auto code = current_common_header.get_type_code();
if (code == binsrv::event::code_type::format_description) {
const auto &current_fde_post_header =
current_event
.get_post_header<binsrv::event::code_type::format_description>();
const auto &current_fde_body =
current_event
.get_body<binsrv::event::code_type::format_description>();

log_format_description_event(logger, current_fde_post_header,
current_fde_body);
}
log_span_dump(logger, portion);

const auto is_artificial{current_common_header.get_flags().has_element(
binsrv::event::flag_type::artificial)};
const auto is_pseudo{current_common_header.get_next_event_position_raw() ==
0U};

if (code == binsrv::event::code_type::rotate && is_artificial) {
const auto &current_rotate_body =
current_event.get_body<binsrv::event::code_type::rotate>();

storage.open_binlog(current_rotate_body.get_binlog());
}
if (!is_artificial && !is_pseudo) {
storage.write_event(portion);
}
if (code == binsrv::event::code_type::rotate && !is_artificial) {
storage.close_binlog();
}
}
}

} // anonymous namespace

int main(int argc, char *argv[]) {
using namespace std::string_literals;

Expand Down Expand Up @@ -186,6 +254,21 @@ int main(int argc, char *argv[]) {
msg += storage.get_root_path();
logger->log(binsrv::log_severity::info, msg);

const auto last_binlog_name{storage.get_binlog_name()};
// if storage position is detected to be 0 (empty data directory), we
// start streaming from the position magic_binlog_offset (4)
const auto last_binlog_position{
std::max(binsrv::event::magic_binlog_offset, storage.get_position())};
if (last_binlog_name.empty()) {
msg = "filesystem binlog storage initialized on an empty directory";
} else {
msg = "filesystem binlog storage initialized at \"";
msg += last_binlog_name;
msg += "\":";
msg += std::to_string(last_binlog_position);
}
logger->log(binsrv::log_severity::info, msg);

const auto &connection_config = config->root().get<"connection">();
logger->log(binsrv::log_severity::info,
"mysql connection string: " +
Expand Down Expand Up @@ -218,74 +301,11 @@ int main(int argc, char *argv[]) {
logger->log(binsrv::log_severity::info, msg);

static constexpr std::uint32_t default_server_id{0U};
auto binlog =
connection.create_binlog(default_server_id, std::string_view{},
binsrv::event::magic_binlog_offset);
auto binlog = connection.create_binlog(default_server_id, last_binlog_name,
last_binlog_position);
logger->log(binsrv::log_severity::info, "opened binary log connection");

// TODO: The first event is either a START_EVENT_V3 or a
// FORMAT_DESCRIPTION_EVENT while the last event is either a
// STOP_EVENT or ROTATE_EVENT. For Binlog Version 4 (current one)
// only FORMAT_DESCRIPTION_EVENT / ROTATE_EVENT pair should be
// acceptable.

// Network streams are requested with COM_BINLOG_DUMP and
// each Binlog Event response is prepended with 00 OK-byte.
static constexpr std::byte expected_event_packet_prefix{'\0'};

util::const_byte_span portion;

binsrv::event::reader_context context{};

while (!(portion = binlog.fetch()).empty()) {
if (portion[0] != expected_event_packet_prefix) {
util::exception_location().raise<std::invalid_argument>(
"unexpected event prefix");
}
portion = portion.subspan(1U);
logger->log(binsrv::log_severity::info,
"fetched " + std::to_string(std::size(portion)) +
"-byte(s) event from binlog");

// TODO: just for redirection to another byte stream we need to parse
// the ROTATE and FORMAT_DESCRIPTION events only, every other one
// can be just considered as a data portion (unless we want to do
// basic integrity checks like event sizes / position and CRC)
const binsrv::event::event current_event{context, portion};

const auto &current_common_header = current_event.get_common_header();
log_event_common_header(*logger, current_common_header);

const auto code = current_common_header.get_type_code();
if (code == binsrv::event::code_type::format_description) {
const auto &current_fde_post_header = current_event.get_post_header<
binsrv::event::code_type::format_description>();
const auto &current_fde_body =
current_event
.get_body<binsrv::event::code_type::format_description>();

log_format_description_event(*logger, current_fde_post_header,
current_fde_body);
}
log_span_dump(*logger, portion);

switch (code) {
case binsrv::event::code_type::rotate:
if (current_common_header.get_flags().has_element(
binsrv::event::flag_type::artificial)) {
const auto &current_rotate_body =
current_event.get_body<binsrv::event::code_type::rotate>();

storage.open_binlog(current_rotate_body.get_binlog());
} else {
storage.write_event(portion);
storage.close_binlog();
}
break;
default:
storage.write_event(portion);
}
}
receive_binlog_events(*logger, binlog, storage);

exit_code = EXIT_SUCCESS;
} catch (...) {
Expand Down
76 changes: 44 additions & 32 deletions src/binsrv/event/reader_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,22 @@ reader_context::reader_context()

void reader_context::process_event(const event &current_event) {
const auto &common_header{current_event.get_common_header()};
const auto code = common_header.get_type_code();
if (code == code_type::rotate &&
common_header.get_flags().has_element(flag_type::artificial)) {
const auto code{common_header.get_type_code()};
const auto is_artificial{
common_header.get_flags().has_element(flag_type::artificial)};
const auto is_pseudo{common_header.get_next_event_position_raw() == 0U};
// artificial rotate events must always have enext event position and
// timestamp setto 0
if (code == code_type::rotate && is_artificial) {
if (common_header.get_timestamp_raw() != 0U) {
util::exception_location().raise<std::logic_error>(
"non-zero timestamp found in an artificial rotate event");
}
if (!is_pseudo) {
util::exception_location().raise<std::logic_error>(
"non-zero next event position found in an artificial rotate event");
}

// we expect an artificial rotate event to be the very first event in the
// newly-created binlog file
if (position_ != 0U) {
Expand All @@ -31,45 +44,44 @@ void reader_context::process_event(const event &current_event) {
"file");
}

// check if next event position and timestamp are set 0
if (common_header.get_timestamp_raw() != 0U) {
util::exception_location().raise<std::logic_error>(
"non-zero timestamp found in an artificial rotate event");
}
if (common_header.get_next_event_position_raw() != 0U) {
position_ = static_cast<std::uint32_t>(
current_event.get_post_header<code_type::rotate>()
.get_position_id_raw());
}
if (!is_artificial && !is_pseudo) {
// every non-artificial event must be preceded by the FDE
// (the exception is FDE itself)
if (code != code_type::format_description && !fde_processed_) {
// TODO: this check should be performed just after the common header is
// parsed to make sure we rely on proper post_header lengths
util::exception_location().raise<std::logic_error>(
"non-zero next event position found in an artificial rotate event");
"a non-artificial event encountered before format description event");
}
position_ = static_cast<std::uint32_t>(magic_binlog_offset);

} else {
// every other event (including non-artificial rotate event) are
// processed the usual way

// check if common_header.next_event_position matches current position
// plus common_header.event_size
if (position_ + common_header.get_event_size_raw() !=
common_header.get_next_event_position_raw()) {
util::exception_location().raise<std::logic_error>(
"unexpected next event position on the event common header");
"unexpected next event position in the event common header");
}
if (code == code_type::rotate) {
// position in non-artificial rotate event post header must be equal to
// magic_binlog_offset (4)
if (current_event.get_post_header<code_type::rotate>()
.get_position_id_raw() != magic_binlog_offset) {
util::exception_location().raise<std::logic_error>(
"unexpected position in an non-artificial rotate event post "
"header");
}

// normal (non-artificial) event is expected to be the last event in
// binlog - so we reset the current position here
position_ = 0U;
} else {
// for every other event we simply advance current position
position_ = common_header.get_next_event_position_raw();
// simply advance current position
position_ = common_header.get_next_event_position_raw();
}

if (code == code_type::rotate && !is_artificial) {
// position in non-artificial rotate event post header must be equal to
// magic_binlog_offset (4)
if (current_event.get_post_header<code_type::rotate>()
.get_position_id_raw() != magic_binlog_offset) {
util::exception_location().raise<std::logic_error>(
"unexpected position in an non-artificial rotate event post "
"header");
}

// normal (non-artificial) event is expected to be the last event in
// binlog - so we reset the current position here
position_ = 0U;
}

// TODO: add some kind of state machine where the expected sequence of events
Expand Down
2 changes: 1 addition & 1 deletion src/binsrv/event/reader_context.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ class [[nodiscard]] reader_context {
// NOLINTNEXTLINE(cppcoreguidelines-use-default-member-init,modernize-use-default-member-init)
checksum_algorithm_type checksum_algorithm_;
post_header_length_container post_header_lengths_{};
std::uint32_t position_{0ULL};
std::uint32_t position_{0U};

void process_event(const event &current_event);
};
Expand Down
Loading

0 comments on commit 84f3db9

Please sign in to comment.