From 270a9dd91f381518bb73d83c36809d16168a7365 Mon Sep 17 00:00:00 2001 From: Erik Grinaker Date: Sun, 3 Nov 2024 15:26:21 +0100 Subject: [PATCH] postgres_ffi: make `WalGenerator` generic over record generator --- libs/postgres_ffi/src/wal_generator.rs | 230 +++++++++++------- libs/postgres_ffi/src/xlog_utils.rs | 14 +- .../tests/walproposer_sim/walproposer_disk.rs | 8 +- 3 files changed, 153 insertions(+), 99 deletions(-) diff --git a/libs/postgres_ffi/src/wal_generator.rs b/libs/postgres_ffi/src/wal_generator.rs index 97968c269b49..dc679eea3302 100644 --- a/libs/postgres_ffi/src/wal_generator.rs +++ b/libs/postgres_ffi/src/wal_generator.rs @@ -1,10 +1,10 @@ -use std::ffi::CStr; +use std::ffi::{CStr, CString}; use bytes::{Bytes, BytesMut}; use crc32c::crc32c_append; use utils::lsn::Lsn; -use super::bindings::{XLogLongPageHeaderData, XLogPageHeaderData, XLOG_PAGE_MAGIC}; +use super::bindings::{RmgrId, XLogLongPageHeaderData, XLogPageHeaderData, XLOG_PAGE_MAGIC}; use super::xlog_utils::{ XlLogicalMessage, XLOG_RECORD_CRC_OFFS, XLOG_SIZE_OF_XLOG_RECORD, XLP_BKP_REMOVABLE, XLP_FIRST_IS_CONTRECORD, @@ -16,11 +16,65 @@ use crate::pg_constants::{ }; use crate::{WAL_SEGMENT_SIZE, XLOG_BLCKSZ}; -/// Generates binary WAL records for use in tests and benchmarks. Currently only generates logical -/// messages (effectively noops) with a fixed payload. It is used as an iterator which yields -/// encoded bytes for a single WAL record, including internal page headers if it spans pages. -/// Concatenating the bytes will yield a complete, well-formed WAL, which can be chunked at segment -/// boundaries if desired. Not optimized for performance. +/// A WAL record payload. Will be prefixed by an XLogRecord header when encoded. +pub struct Record { + pub rmid: RmgrId, + pub info: u8, + pub data: Bytes, +} + +impl Record { + /// Encodes the WAL record including an XLogRecord header. prev_lsn is the start position of + /// the previous record in the WAL -- this is ignored by the Safekeeper, but not Postgres. + pub fn encode(&self, prev_lsn: Lsn) -> Bytes { + // Prefix data with block ID and length. + let data_header = Bytes::from(match self.data.len() { + 0 => vec![], + 1..=255 => vec![XLR_BLOCK_ID_DATA_SHORT, self.data.len() as u8], + 256.. => { + let len_bytes = (self.data.len() as u32).to_le_bytes(); + [&[XLR_BLOCK_ID_DATA_LONG], len_bytes.as_slice()].concat() + } + }); + + // Construct the WAL record header. + let mut header = XLogRecord { + xl_tot_len: (XLOG_SIZE_OF_XLOG_RECORD + data_header.len() + self.data.len()) as u32, + xl_xid: 0, + xl_prev: prev_lsn.into(), + xl_info: self.info, + xl_rmid: self.rmid, + __bindgen_padding_0: [0; 2], + xl_crc: 0, // see below + }; + + // Compute the CRC checksum for the data, and the header up to the CRC field. + let mut crc = 0; + crc = crc32c_append(crc, &data_header); + crc = crc32c_append(crc, &self.data); + crc = crc32c_append(crc, &header.encode().unwrap()[0..XLOG_RECORD_CRC_OFFS]); + header.xl_crc = crc; + + // Encode the final header and record. + let header = header.encode().unwrap(); + + [header, data_header, self.data.clone()].concat().into() + } +} + +/// Generates WAL record payloads. +/// +/// TODO: currently only provides LogicalMessageGenerator for trivial noop messages. Add a generator +/// that creates a table and inserts rows. +pub trait RecordGenerator: Iterator {} + +impl> RecordGenerator for I {} + +/// Generates binary WAL for use in tests and benchmarks. The provided record generator constructs +/// the WAL records. It is used as an iterator which yields encoded bytes for a single WAL record, +/// including internal page headers if it spans pages. Concatenating the bytes will yield a +/// complete, well-formed WAL, which can be chunked at segment boundaries if desired. Not optimized +/// for performance. /// /// The WAL format is version-dependant (see e.g. `XLOG_PAGE_MAGIC`), so make sure to import this /// for the appropriate Postgres version (e.g. `postgres_ffi::v17::wal_generator::WalGenerator`). @@ -31,10 +85,10 @@ use crate::{WAL_SEGMENT_SIZE, XLOG_BLCKSZ}; /// | Segment 1 | Segment 2 | Segment 3 | /// | Page 1 | Page 2 | Page 3 | Page 4 | Page 5 | Page 6 | Page 7 | Page 8 | Page 9 | /// | R1 | R2 |R3| R4 | R5 | R6 | R7 | R8 | -/// -/// TODO: support generating actual tables and rows. #[derive(Default)] -pub struct WalGenerator { +pub struct WalGenerator { + /// Generates record payloads for the WAL. + pub record_generator: R, /// Current LSN to append the next record at. /// /// Callers can modify this (and prev_lsn) to restart generation at a different LSN, but should @@ -46,73 +100,35 @@ pub struct WalGenerator { pub prev_lsn: Lsn, } -impl WalGenerator { - // For now, hardcode the message payload. - // TODO: support specifying the payload size. - const PREFIX: &CStr = c"prefix"; - const MESSAGE: &[u8] = b"message"; - - // Hardcode the sys, timeline, and DB IDs. We can make them configurable if we care about them. +impl WalGenerator { + // Hardcode the sys and timeline ID. We can make them configurable if we care about them. const SYS_ID: u64 = 0; const TIMELINE_ID: u32 = 1; - const DB_ID: u32 = 0; - - /// Creates a new WAL generator, which emits logical message records (noops). - pub fn new() -> Self { - Self::default() - } - /// Encodes a logical message (basically a noop), with the given prefix and message. - pub(crate) fn encode_logical_message(prefix: &CStr, message: &[u8]) -> Bytes { - let prefix = prefix.to_bytes_with_nul(); - let header = XlLogicalMessage { - db_id: Self::DB_ID, - transactional: 0, - prefix_size: prefix.len() as u64, - message_size: message.len() as u64, - }; - [&header.encode(), prefix, message].concat().into() + /// Creates a new WAL generator with the given record generator. + pub fn new(record_generator: R) -> WalGenerator { + Self { + record_generator, + lsn: Lsn(0), + prev_lsn: Lsn(0), + } } - /// Encode a WAL record with the given payload data (e.g. a logical message). - pub(crate) fn encode_record(data: Bytes, rmid: u8, info: u8, prev_lsn: Lsn) -> Bytes { - // Prefix data with block ID and length. - let data_header = Bytes::from(match data.len() { - 0 => vec![], - 1..=255 => vec![XLR_BLOCK_ID_DATA_SHORT, data.len() as u8], - 256.. => { - let len_bytes = (data.len() as u32).to_le_bytes(); - [&[XLR_BLOCK_ID_DATA_LONG], len_bytes.as_slice()].concat() - } - }); - - // Construct the WAL record header. - let mut header = XLogRecord { - xl_tot_len: (XLOG_SIZE_OF_XLOG_RECORD + data_header.len() + data.len()) as u32, - xl_xid: 0, - xl_prev: prev_lsn.into(), - xl_info: info, - xl_rmid: rmid, - __bindgen_padding_0: [0; 2], - xl_crc: 0, // see below - }; - - // Compute the CRC checksum for the data, and the header up to the CRC field. - let mut crc = 0; - crc = crc32c_append(crc, &data_header); - crc = crc32c_append(crc, &data); - crc = crc32c_append(crc, &header.encode().unwrap()[0..XLOG_RECORD_CRC_OFFS]); - header.xl_crc = crc; - - // Encode the final header and record. - let header = header.encode().unwrap(); - - [header, data_header, data].concat().into() + /// Appends a record with an arbitrary payload at the current LSN, then increments the LSN. + /// Returns the WAL bytes for the record, including page headers and padding, and the start LSN. + fn append_record(&mut self, record: Record) -> (Lsn, Bytes) { + let record = record.encode(self.prev_lsn); + let record = Self::insert_pages(record, self.lsn); + let record = Self::pad_record(record, self.lsn); + let lsn = self.lsn; + self.prev_lsn = self.lsn; + self.lsn += record.len() as u64; + (lsn, record) } - /// Injects page headers on 8KB page boundaries. Takes the current LSN position where the record + /// Inserts page headers on 8KB page boundaries. Takes the current LSN position where the record /// is to be appended. - fn encode_pages(record: Bytes, mut lsn: Lsn) -> Bytes { + fn insert_pages(record: Bytes, mut lsn: Lsn) -> Bytes { // Fast path: record fits in current page, and the page already has a header. if lsn.remaining_in_block() as usize >= record.len() && lsn.block_offset() > 0 { return record; @@ -173,31 +189,71 @@ impl WalGenerator { } [record, Bytes::from(vec![0; padding])].concat().into() } +} - /// Generates a record with an arbitrary payload at the current LSN, then increments the LSN. - pub fn generate_record(&mut self, data: Bytes, rmid: u8, info: u8) -> Bytes { - let record = Self::encode_record(data, rmid, info, self.prev_lsn); - let record = Self::encode_pages(record, self.lsn); - let record = Self::pad_record(record, self.lsn); - self.prev_lsn = self.lsn; - self.lsn += record.len() as u64; - record +/// Generates WAL records as an iterator. +impl Iterator for WalGenerator { + type Item = (Lsn, Bytes); + + fn next(&mut self) -> Option { + let record = self.record_generator.next()?; + Some(self.append_record(record)) } +} + +/// Generates logical message records (effectively noops) with a fixed message. +pub struct LogicalMessageGenerator { + prefix: CString, + message: Vec, +} - /// Generates a logical message at the current LSN. Can be used to construct arbitrary messages. - pub fn generate_logical_message(&mut self, prefix: &CStr, message: &[u8]) -> Bytes { - let data = Self::encode_logical_message(prefix, message); - self.generate_record(data, RM_LOGICALMSG_ID, XLOG_LOGICAL_MESSAGE) +impl LogicalMessageGenerator { + const DB_ID: u32 = 0; // hardcoded for now + const RM_ID: RmgrId = RM_LOGICALMSG_ID; + const INFO: u8 = XLOG_LOGICAL_MESSAGE; + + /// Creates a new LogicalMessageGenerator. + pub fn new(prefix: &CStr, message: &[u8]) -> Self { + Self { + prefix: prefix.to_owned(), + message: message.to_owned(), + } + } + + /// Encodes a logical message. + fn encode(prefix: &CStr, message: &[u8]) -> Bytes { + let prefix = prefix.to_bytes_with_nul(); + let header = XlLogicalMessage { + db_id: Self::DB_ID, + transactional: 0, + prefix_size: prefix.len() as u64, + message_size: message.len() as u64, + }; + [&header.encode(), prefix, message].concat().into() } } -/// Generate WAL records as an iterator. -impl Iterator for WalGenerator { - type Item = (Lsn, Bytes); +impl Iterator for LogicalMessageGenerator { + type Item = Record; fn next(&mut self) -> Option { - let lsn = self.lsn; - let record = self.generate_logical_message(Self::PREFIX, Self::MESSAGE); - Some((lsn, record)) + Some(Record { + rmid: Self::RM_ID, + info: Self::INFO, + data: Self::encode(&self.prefix, &self.message), + }) + } +} + +impl WalGenerator { + /// Convenience method for appending a WAL record with an arbitrary logical message at the + /// current WAL LSN position. Returns the start LSN and resulting WAL bytes. + pub fn append_logical_message(&mut self, prefix: &CStr, message: &[u8]) -> (Lsn, Bytes) { + let record = Record { + rmid: LogicalMessageGenerator::RM_ID, + info: LogicalMessageGenerator::INFO, + data: LogicalMessageGenerator::encode(prefix, message), + }; + self.append_record(record) } } diff --git a/libs/postgres_ffi/src/xlog_utils.rs b/libs/postgres_ffi/src/xlog_utils.rs index 78a965174f86..852b20eacec6 100644 --- a/libs/postgres_ffi/src/xlog_utils.rs +++ b/libs/postgres_ffi/src/xlog_utils.rs @@ -12,9 +12,9 @@ use super::bindings::{ CheckPoint, ControlFileData, DBState_DB_SHUTDOWNED, FullTransactionId, TimeLineID, TimestampTz, XLogLongPageHeaderData, XLogPageHeaderData, XLogRecPtr, XLogRecord, XLogSegNo, XLOG_PAGE_MAGIC, }; -use super::wal_generator::WalGenerator; +use super::wal_generator::LogicalMessageGenerator; use super::PG_MAJORVERSION; -use crate::pg_constants::{self, RM_LOGICALMSG_ID, XLOG_LOGICAL_MESSAGE}; +use crate::pg_constants; use crate::PG_TLI; use crate::{uint32, uint64, Oid}; use crate::{WAL_SEGMENT_SIZE, XLOG_BLCKSZ}; @@ -493,12 +493,10 @@ pub fn encode_logical_message(prefix: &str, message: &str) -> Bytes { // This function can take untrusted input, so discard any NUL bytes in the prefix string. let prefix = CString::new(prefix.replace('\0', "")).expect("no NULs"); let message = message.as_bytes(); - WalGenerator::encode_record( - WalGenerator::encode_logical_message(&prefix, message), - RM_LOGICALMSG_ID, - XLOG_LOGICAL_MESSAGE, - Lsn(0), - ) + LogicalMessageGenerator::new(&prefix, message) + .next() + .unwrap() + .encode(Lsn(0)) } #[cfg(test)] diff --git a/safekeeper/tests/walproposer_sim/walproposer_disk.rs b/safekeeper/tests/walproposer_sim/walproposer_disk.rs index f70cd65dfc77..aefb3919a1b3 100644 --- a/safekeeper/tests/walproposer_sim/walproposer_disk.rs +++ b/safekeeper/tests/walproposer_sim/walproposer_disk.rs @@ -1,7 +1,7 @@ use std::{ffi::CStr, sync::Arc}; use parking_lot::{Mutex, MutexGuard}; -use postgres_ffi::v16::wal_generator::WalGenerator; +use postgres_ffi::v16::wal_generator::{LogicalMessageGenerator, WalGenerator}; use utils::lsn::Lsn; use super::block_storage::BlockStorage; @@ -18,7 +18,7 @@ impl DiskWalProposer { internal_available_lsn: Lsn(0), prev_lsn: Lsn(0), disk: BlockStorage::new(), - wal_generator: WalGenerator::new(), + wal_generator: WalGenerator::new(LogicalMessageGenerator::new(c"", &[])), }), }) } @@ -36,7 +36,7 @@ pub struct State { // actual WAL storage disk: BlockStorage, // WAL record generator - wal_generator: WalGenerator, + wal_generator: WalGenerator, } impl State { @@ -64,7 +64,7 @@ impl State { /// Inserts a logical record in the WAL at the current LSN. pub fn insert_logical_message(&mut self, prefix: &CStr, msg: &[u8]) { - let record = self.wal_generator.generate_logical_message(prefix, msg); + let (_, record) = self.wal_generator.append_logical_message(prefix, msg); self.disk.write(self.internal_available_lsn.into(), &record); self.prev_lsn = self.internal_available_lsn; self.internal_available_lsn += record.len() as u64;