Skip to content

Commit

Permalink
Add START_WAL_PUSH proto_version and allow_timeline_creation options. (
Browse files Browse the repository at this point in the history
…#10406)

## Problem

As part of #8614 we need to
pass options to START_WAL_PUSH.

## Summary of changes

Add two options. `allow_timeline_creation`, default true, disables
implicit timeline creation in the connection from compute. Eventually
such creation will be forbidden completely, but as we migrate to
configurations we need to support both: current mode and configurations
enabled where creation by compute is disabled.

`proto_version` specifies compute <-> sk protocol version. We have it
currently in the first greeting package also, but I plan to change tag
size from u64 to u8, which would make it hard to use. Command is more
appropriate place for it anyway.
  • Loading branch information
arssher authored Jan 16, 2025
1 parent 2eda484 commit 6fe4c67
Show file tree
Hide file tree
Showing 4 changed files with 152 additions and 29 deletions.
107 changes: 100 additions & 7 deletions safekeeper/src/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,16 +52,70 @@ pub struct SafekeeperPostgresHandler {

/// Parsed Postgres command.
enum SafekeeperPostgresCommand {
StartWalPush,
StartReplication { start_lsn: Lsn, term: Option<Term> },
StartWalPush {
proto_version: u32,
// Eventually timelines will be always created explicitly by storcon.
// This option allows legacy behaviour for compute to do that until we
// fully migrate.
allow_timeline_creation: bool,
},
StartReplication {
start_lsn: Lsn,
term: Option<Term>,
},
IdentifySystem,
TimelineStatus,
JSONCtrl { cmd: AppendLogicalMessage },
JSONCtrl {
cmd: AppendLogicalMessage,
},
}

fn parse_cmd(cmd: &str) -> anyhow::Result<SafekeeperPostgresCommand> {
if cmd.starts_with("START_WAL_PUSH") {
Ok(SafekeeperPostgresCommand::StartWalPush)
// Allow additional options in postgres START_REPLICATION style like
// START_WAL_PUSH (proto_version '3', allow_timeline_creation 'false').
// Parsing here is very naive and breaks in case of commas or
// whitespaces in values, but enough for our purposes.
let re = Regex::new(r"START_WAL_PUSH(\s+?\((.*)\))?").unwrap();
let caps = re
.captures(cmd)
.context(format!("failed to parse START_WAL_PUSH command {}", cmd))?;
// capture () content
let options = caps.get(2).map(|m| m.as_str()).unwrap_or("");
// default values
let mut proto_version = 2;
let mut allow_timeline_creation = true;
for kvstr in options.split(",") {
if kvstr.is_empty() {
continue;
}
let mut kvit = kvstr.split_whitespace();
let key = kvit.next().context(format!(
"failed to parse key in kv {} in command {}",
kvstr, cmd
))?;
let value = kvit.next().context(format!(
"failed to parse value in kv {} in command {}",
kvstr, cmd
))?;
let value_trimmed = value.trim_matches('\'');
if key == "proto_version" {
proto_version = value_trimmed.parse::<u32>().context(format!(
"failed to parse proto_version value {} in command {}",
value, cmd
))?;
}
if key == "allow_timeline_creation" {
allow_timeline_creation = value_trimmed.parse::<bool>().context(format!(
"failed to parse allow_timeline_creation value {} in command {}",
value, cmd
))?;
}
}
Ok(SafekeeperPostgresCommand::StartWalPush {
proto_version,
allow_timeline_creation,
})
} else if cmd.starts_with("START_REPLICATION") {
let re = Regex::new(
// We follow postgres START_REPLICATION LOGICAL options to pass term.
Expand Down Expand Up @@ -95,7 +149,7 @@ fn parse_cmd(cmd: &str) -> anyhow::Result<SafekeeperPostgresCommand> {

fn cmd_to_string(cmd: &SafekeeperPostgresCommand) -> &str {
match cmd {
SafekeeperPostgresCommand::StartWalPush => "START_WAL_PUSH",
SafekeeperPostgresCommand::StartWalPush { .. } => "START_WAL_PUSH",
SafekeeperPostgresCommand::StartReplication { .. } => "START_REPLICATION",
SafekeeperPostgresCommand::TimelineStatus => "TIMELINE_STATUS",
SafekeeperPostgresCommand::IdentifySystem => "IDENTIFY_SYSTEM",
Expand Down Expand Up @@ -293,8 +347,11 @@ impl<IO: AsyncRead + AsyncWrite + Unpin + Send> postgres_backend::Handler<IO>
self.ttid = TenantTimelineId::new(tenant_id, timeline_id);

match cmd {
SafekeeperPostgresCommand::StartWalPush => {
self.handle_start_wal_push(pgb)
SafekeeperPostgresCommand::StartWalPush {
proto_version,
allow_timeline_creation,
} => {
self.handle_start_wal_push(pgb, proto_version, allow_timeline_creation)
.instrument(info_span!("WAL receiver"))
.await
}
Expand Down Expand Up @@ -467,3 +524,39 @@ impl SafekeeperPostgresHandler {
}
}
}

#[cfg(test)]
mod tests {
use super::SafekeeperPostgresCommand;

/// Test parsing of START_WAL_PUSH command
#[test]
fn test_start_wal_push_parse() {
let cmd = "START_WAL_PUSH";
let parsed = super::parse_cmd(cmd).expect("failed to parse");
match parsed {
SafekeeperPostgresCommand::StartWalPush {
proto_version,
allow_timeline_creation,
} => {
assert_eq!(proto_version, 2);
assert!(allow_timeline_creation);
}
_ => panic!("unexpected command"),
}

let cmd =
"START_WAL_PUSH (proto_version '3', allow_timeline_creation 'false', unknown 'hoho')";
let parsed = super::parse_cmd(cmd).expect("failed to parse");
match parsed {
SafekeeperPostgresCommand::StartWalPush {
proto_version,
allow_timeline_creation,
} => {
assert_eq!(proto_version, 3);
assert!(!allow_timeline_creation);
}
_ => panic!("unexpected command"),
}
}
}
57 changes: 39 additions & 18 deletions safekeeper/src/receive_wal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -200,9 +200,14 @@ impl SafekeeperPostgresHandler {
pub async fn handle_start_wal_push<IO: AsyncRead + AsyncWrite + Unpin>(
&mut self,
pgb: &mut PostgresBackend<IO>,
proto_version: u32,
allow_timeline_creation: bool,
) -> Result<(), QueryError> {
let mut tli: Option<WalResidentTimeline> = None;
if let Err(end) = self.handle_start_wal_push_guts(pgb, &mut tli).await {
if let Err(end) = self
.handle_start_wal_push_guts(pgb, &mut tli, proto_version, allow_timeline_creation)
.await
{
// Log the result and probably send it to the client, closing the stream.
let handle_end_fut = pgb.handle_copy_stream_end(end);
// If we managed to create the timeline, augment logging with current LSNs etc.
Expand All @@ -222,6 +227,8 @@ impl SafekeeperPostgresHandler {
&mut self,
pgb: &mut PostgresBackend<IO>,
tli: &mut Option<WalResidentTimeline>,
proto_version: u32,
allow_timeline_creation: bool,
) -> Result<(), CopyStreamHandlerEnd> {
// The `tli` parameter is only used for passing _out_ a timeline, one should
// not have been passed in.
Expand Down Expand Up @@ -250,12 +257,17 @@ impl SafekeeperPostgresHandler {
conn_id: self.conn_id,
pgb_reader: &mut pgb_reader,
peer_addr,
proto_version,
acceptor_handle: &mut acceptor_handle,
global_timelines: self.global_timelines.clone(),
};

// Read first message and create timeline if needed.
let res = network_reader.read_first_message().await;
// Read first message and create timeline if needed and allowed. This
// won't be when timelines will be always created by storcon and
// allow_timeline_creation becomes false.
let res = network_reader
.read_first_message(allow_timeline_creation)
.await;

let network_res = if let Ok((timeline, next_msg)) = res {
let pageserver_feedback_rx: tokio::sync::broadcast::Receiver<PageserverFeedback> =
Expand Down Expand Up @@ -313,6 +325,7 @@ struct NetworkReader<'a, IO> {
conn_id: ConnectionId,
pgb_reader: &'a mut PostgresBackendReader<IO>,
peer_addr: SocketAddr,
proto_version: u32,
// WalAcceptor is spawned when we learn server info from walproposer and
// create timeline; handle is put here.
acceptor_handle: &'a mut Option<JoinHandle<anyhow::Result<()>>>,
Expand All @@ -322,9 +335,10 @@ struct NetworkReader<'a, IO> {
impl<IO: AsyncRead + AsyncWrite + Unpin> NetworkReader<'_, IO> {
async fn read_first_message(
&mut self,
allow_timeline_creation: bool,
) -> Result<(WalResidentTimeline, ProposerAcceptorMessage), CopyStreamHandlerEnd> {
// Receive information about server to create timeline, if not yet.
let next_msg = read_message(self.pgb_reader).await?;
let next_msg = read_message(self.pgb_reader, self.proto_version).await?;
let tli = match next_msg {
ProposerAcceptorMessage::Greeting(ref greeting) => {
info!(
Expand All @@ -336,17 +350,22 @@ impl<IO: AsyncRead + AsyncWrite + Unpin> NetworkReader<'_, IO> {
system_id: greeting.system_id,
wal_seg_size: greeting.wal_seg_size,
};
let tli = self
.global_timelines
.create(
self.ttid,
Configuration::empty(),
server_info,
Lsn::INVALID,
Lsn::INVALID,
)
.await
.context("create timeline")?;
let tli = if allow_timeline_creation {
self.global_timelines
.create(
self.ttid,
Configuration::empty(),
server_info,
Lsn::INVALID,
Lsn::INVALID,
)
.await
.context("create timeline")?
} else {
self.global_timelines
.get(self.ttid)
.context("get timeline")?
};
tli.wal_residence_guard().await?
}
_ => {
Expand Down Expand Up @@ -375,24 +394,26 @@ impl<IO: AsyncRead + AsyncWrite + Unpin> NetworkReader<'_, IO> {
));

// Forward all messages to WalAcceptor
read_network_loop(self.pgb_reader, msg_tx, next_msg).await
read_network_loop(self.pgb_reader, msg_tx, next_msg, self.proto_version).await
}
}

/// Read next message from walproposer.
/// TODO: Return Ok(None) on graceful termination.
async fn read_message<IO: AsyncRead + AsyncWrite + Unpin>(
pgb_reader: &mut PostgresBackendReader<IO>,
proto_version: u32,
) -> Result<ProposerAcceptorMessage, CopyStreamHandlerEnd> {
let copy_data = pgb_reader.read_copy_message().await?;
let msg = ProposerAcceptorMessage::parse(copy_data)?;
let msg = ProposerAcceptorMessage::parse(copy_data, proto_version)?;
Ok(msg)
}

async fn read_network_loop<IO: AsyncRead + AsyncWrite + Unpin>(
pgb_reader: &mut PostgresBackendReader<IO>,
msg_tx: Sender<ProposerAcceptorMessage>,
mut next_msg: ProposerAcceptorMessage,
proto_version: u32,
) -> Result<(), CopyStreamHandlerEnd> {
/// Threshold for logging slow WalAcceptor sends.
const SLOW_THRESHOLD: Duration = Duration::from_secs(5);
Expand Down Expand Up @@ -425,7 +446,7 @@ async fn read_network_loop<IO: AsyncRead + AsyncWrite + Unpin>(
WAL_RECEIVER_QUEUE_DEPTH_TOTAL.inc();
WAL_RECEIVER_QUEUE_SIZE_TOTAL.add(size as i64);

next_msg = read_message(pgb_reader).await?;
next_msg = read_message(pgb_reader, proto_version).await?;
}
}

Expand Down
11 changes: 9 additions & 2 deletions safekeeper/src/safekeeper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use utils::{
lsn::Lsn,
};

const SK_PROTOCOL_VERSION: u32 = 2;
pub const SK_PROTOCOL_VERSION: u32 = 2;
pub const UNKNOWN_SERVER_VERSION: u32 = 0;

#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
Expand Down Expand Up @@ -317,7 +317,14 @@ pub enum ProposerAcceptorMessage {

impl ProposerAcceptorMessage {
/// Parse proposer message.
pub fn parse(msg_bytes: Bytes) -> Result<ProposerAcceptorMessage> {
pub fn parse(msg_bytes: Bytes, proto_version: u32) -> Result<ProposerAcceptorMessage> {
if proto_version != SK_PROTOCOL_VERSION {
bail!(
"incompatible protocol version {}, expected {}",
proto_version,
SK_PROTOCOL_VERSION
);
}
// xxx using Reader is inefficient but easy to work with bincode
let mut stream = msg_bytes.reader();
// u64 is here to avoid padding; it will be removed once we stop packing C structs into the wire as is
Expand Down
6 changes: 4 additions & 2 deletions safekeeper/tests/walproposer_sim/safekeeper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@ use desim::{
};
use http::Uri;
use safekeeper::{
safekeeper::{ProposerAcceptorMessage, SafeKeeper, UNKNOWN_SERVER_VERSION},
safekeeper::{
ProposerAcceptorMessage, SafeKeeper, SK_PROTOCOL_VERSION, UNKNOWN_SERVER_VERSION,
},
state::{TimelinePersistentState, TimelineState},
timeline::TimelineError,
wal_storage::Storage,
Expand Down Expand Up @@ -285,7 +287,7 @@ impl ConnState {
bail!("finished processing START_REPLICATION")
}

let msg = ProposerAcceptorMessage::parse(copy_data)?;
let msg = ProposerAcceptorMessage::parse(copy_data, SK_PROTOCOL_VERSION)?;
debug!("got msg: {:?}", msg);
self.process(msg, global)
} else {
Expand Down

1 comment on commit 6fe4c67

@github-actions
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

7477 tests run: 7087 passed, 1 failed, 389 skipped (full report)


Failures on Postgres 16

# Run all failed tests locally:
scripts/pytest -vv -n $(nproc) -k "test_layer_map[release-pg16-github-actions-selfhosted]"
Flaky tests (1)

Postgres 16

  • test_timeline_ancestor_detach_idempotent_success[shards_initial_after2]: release-x86-64

Code coverage* (full report)

  • functions: 33.7% (8426 of 25017 functions)
  • lines: 49.2% (70468 of 143315 lines)

* collected from Rust tests only


The comment gets automatically updated with the latest test results
6fe4c67 at 2025-01-16T10:23:31.969Z :recycle:

Please sign in to comment.