Skip to content

Commit

Permalink
safekeeper: add membership configuration switch endpoint.
Browse files Browse the repository at this point in the history
Also add it to python client for tests, and add simple test itself.
  • Loading branch information
arssher committed Dec 25, 2024
1 parent db87b9e commit 060a731
Show file tree
Hide file tree
Showing 7 changed files with 197 additions and 4 deletions.
2 changes: 2 additions & 0 deletions libs/safekeeper_api/src/membership.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ pub const INVALID_GENERATION: Generation = 0;
pub struct SafekeeperId {
pub id: NodeId,
pub host: String,
/// We include here only port for computes -- that is, pg protocol tenant
/// only port, or wide pg protocol port if the former is not configured.
pub pg_port: u16,
}

Expand Down
15 changes: 15 additions & 0 deletions libs/safekeeper_api/src/models.rs
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,7 @@ pub enum WalReceiverStatus {
pub struct TimelineStatus {
pub tenant_id: TenantId,
pub timeline_id: TimelineId,
pub mconf: Configuration,
pub acceptor_state: AcceptorStateStatus,
pub pg_info: ServerInfo,
pub flush_lsn: Lsn,
Expand All @@ -189,6 +190,20 @@ pub struct TimelineStatus {
pub walreceivers: Vec<WalReceiverState>,
}

/// Request to switch membership configuration.
#[derive(Serialize, Deserialize)]
#[serde(transparent)]
pub struct TimelineMembershipSwitchRequest {
pub mconf: Configuration,
}

/// In response both previous and current configuration is sent.
#[derive(Serialize, Deserialize)]
pub struct TimelineMembershipSwitchResponse {
pub previous_conf: Configuration,
pub current_conf: Configuration,
}

fn lsn_invalid() -> Lsn {
Lsn::INVALID
}
Expand Down
28 changes: 28 additions & 0 deletions safekeeper/src/http/routes.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use hyper::{Body, Request, Response, StatusCode};
use safekeeper_api::models;
use safekeeper_api::models::AcceptorStateStatus;
use safekeeper_api::models::SafekeeperStatus;
use safekeeper_api::models::TermSwitchApiEntry;
Expand Down Expand Up @@ -183,6 +184,7 @@ async fn timeline_status_handler(request: Request<Body>) -> Result<Response<Body
let status = TimelineStatus {
tenant_id: ttid.tenant_id,
timeline_id: ttid.timeline_id,
mconf: state.mconf,
acceptor_state: acc_state,
pg_info: state.server,
flush_lsn,
Expand Down Expand Up @@ -268,6 +270,28 @@ async fn timeline_snapshot_handler(request: Request<Body>) -> Result<Response<Bo
Ok(response)
}

/// Consider switching membership configuration of the timeline.
async fn timeline_membership_handler(
mut request: Request<Body>,
) -> Result<Response<Body>, ApiError> {
let ttid = TenantTimelineId::new(
parse_request_param(&request, "tenant_id")?,
parse_request_param(&request, "timeline_id")?,
);
check_permission(&request, Some(ttid.tenant_id))?;

let global_timelines = get_global_timelines(&request);
let tli = global_timelines.get(ttid).map_err(ApiError::from)?;

let data: models::TimelineMembershipSwitchRequest = json_request(&mut request).await?;
let response = tli
.membership_switch(data.mconf)
.await
.map_err(ApiError::InternalServerError)?;

json_response(StatusCode::OK, response)
}

async fn timeline_copy_handler(mut request: Request<Body>) -> Result<Response<Body>, ApiError> {
check_permission(&request, None)?;

Expand Down Expand Up @@ -619,6 +643,10 @@ pub fn make_router(
"/v1/tenant/:tenant_id/timeline/:timeline_id/snapshot/:destination_id",
|r| request_span(r, timeline_snapshot_handler),
)
.post(
"/v1/tenant/:tenant_id/timeline/:timeline_id/membership",
|r| request_span(r, timeline_membership_handler),
)
.post(
"/v1/tenant/:tenant_id/timeline/:source_timeline_id/copy",
|r| request_span(r, timeline_copy_handler),
Expand Down
30 changes: 29 additions & 1 deletion safekeeper/src/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,12 @@ use std::{cmp::max, ops::Deref, time::SystemTime};
use anyhow::{bail, Result};
use postgres_ffi::WAL_SEGMENT_SIZE;
use safekeeper_api::{
membership::Configuration, models::TimelineTermBumpResponse, ServerInfo, Term, INITIAL_TERM,
membership::Configuration,
models::{TimelineMembershipSwitchResponse, TimelineTermBumpResponse},
ServerInfo, Term, INITIAL_TERM,
};
use serde::{Deserialize, Serialize};
use tracing::info;
use utils::{
id::{TenantId, TenantTimelineId, TimelineId},
lsn::Lsn,
Expand Down Expand Up @@ -258,6 +261,31 @@ where
current_term: after,
})
}

/// Switch into membership configuration `to` if it is higher than the
/// current one.
pub async fn membership_switch(
&mut self,
to: Configuration,
) -> Result<TimelineMembershipSwitchResponse> {
let before = self.mconf.clone();
// Is switch allowed?
if to.generation <= self.mconf.generation {
info!(
"ignoring request to switch conf to lower {}, current conf {}",
to, self.mconf
);
} else {
let mut state = self.start_change();
state.mconf = to.clone();
self.finish_change(&state).await?;
info!("switched conf to {} from {}", to, before);
}
Ok(TimelineMembershipSwitchResponse {
previous_conf: before,
current_conf: self.mconf.clone(),
})
}
}

impl<CTRL> Deref for TimelineState<CTRL>
Expand Down
20 changes: 19 additions & 1 deletion safekeeper/src/timeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,10 @@
use anyhow::{anyhow, bail, Result};
use camino::{Utf8Path, Utf8PathBuf};
use remote_storage::RemotePath;
use safekeeper_api::models::{PeerInfo, TimelineTermBumpResponse};
use safekeeper_api::membership::Configuration;
use safekeeper_api::models::{
PeerInfo, TimelineMembershipSwitchResponse, TimelineTermBumpResponse,
};
use safekeeper_api::Term;
use tokio::fs::{self};
use tokio_util::sync::CancellationToken;
Expand Down Expand Up @@ -188,6 +191,13 @@ impl StateSK {
self.state_mut().term_bump(to).await
}

pub async fn membership_switch(
&mut self,
to: Configuration,
) -> Result<TimelineMembershipSwitchResponse> {
self.state_mut().membership_switch(to).await
}

/// Close open WAL files to release FDs.
fn close_wal_store(&mut self) {
if let StateSK::Loaded(sk) = self {
Expand Down Expand Up @@ -768,6 +778,14 @@ impl Timeline {
state.sk.term_bump(to).await
}

pub async fn membership_switch(
self: &Arc<Self>,
to: Configuration,
) -> Result<TimelineMembershipSwitchResponse> {
let mut state = self.write_shared_state().await;
state.sk.membership_switch(to).await
}

/// Guts of [`Self::wal_residence_guard`] and [`Self::try_wal_residence_guard`]
async fn do_wal_residence_guard(
self: &Arc<Self>,
Expand Down
40 changes: 39 additions & 1 deletion test_runner/fixtures/safekeeper/http.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ class Walreceiver:

@dataclass
class SafekeeperTimelineStatus:
mconf: Configuration
term: int
last_log_term: int
pg_version: int # Not exactly a PgVersion, safekeeper returns version as int, for example 150002 for 15.2
Expand Down Expand Up @@ -73,7 +74,7 @@ def from_json(cls, d: dict[str, Any]) -> TermBumpResponse:
class SafekeeperId:
id: int
host: str
pg_port: str
pg_port: int


@dataclass
Expand All @@ -82,6 +83,16 @@ class Configuration:
members: list[SafekeeperId]
new_members: list[SafekeeperId] | None

@classmethod
def from_json(cls, d: dict[str, Any]) -> Configuration:
generation = d["generation"]
members = d["members"]
new_members = d.get("new_members")
return Configuration(generation, members, new_members)

def to_json(self) -> str:
return json.dumps(self, cls=EnhancedJSONEncoder)


@dataclass
class TimelineCreateRequest:
Expand All @@ -97,6 +108,18 @@ def to_json(self) -> str:
return json.dumps(self, cls=EnhancedJSONEncoder)


@dataclass
class TimelineMembershipSwitchResponse:
previous_conf: Configuration
current_conf: Configuration

@classmethod
def from_json(cls, d: dict[str, Any]) -> TimelineMembershipSwitchResponse:
previous_conf = Configuration.from_json(d["previous_conf"])
current_conf = Configuration.from_json(d["current_conf"])
return TimelineMembershipSwitchResponse(previous_conf, current_conf)


class SafekeeperHttpClient(requests.Session, MetricsGetter):
HTTPError = requests.HTTPError

Expand Down Expand Up @@ -171,6 +194,7 @@ def timeline_status(
resj = res.json()
walreceivers = [Walreceiver(wr["conn_id"], wr["status"]) for wr in resj["walreceivers"]]
return SafekeeperTimelineStatus(
mconf=Configuration.from_json(resj["mconf"]),
term=resj["acceptor_state"]["term"],
last_log_term=resj["acceptor_state"]["epoch"],
pg_version=resj["pg_info"]["pg_version"],
Expand All @@ -196,6 +220,10 @@ def timeline_start_lsn_non_zero() -> Lsn:
def get_commit_lsn(self, tenant_id: TenantId, timeline_id: TimelineId) -> Lsn:
return self.timeline_status(tenant_id, timeline_id).commit_lsn

# Get timeline membership configuration.
def get_membership(self, tenant_id: TenantId, timeline_id: TimelineId) -> Configuration:
return self.timeline_status(tenant_id, timeline_id).mconf

# only_local doesn't remove segments in the remote storage.
def timeline_delete(
self, tenant_id: TenantId, timeline_id: TimelineId, only_local: bool = False
Expand Down Expand Up @@ -242,6 +270,16 @@ def pull_timeline(self, body: dict[str, Any]) -> dict[str, Any]:
assert isinstance(res_json, dict)
return res_json

def membership_switch(
self, tenant_id: TenantId, timeline_id: TimelineId, to: Configuration
) -> TimelineMembershipSwitchResponse:
res = self.post(
f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}/membership",
data=to.to_json(),
)
res.raise_for_status()
return TimelineMembershipSwitchResponse.from_json(res.json())

def copy_timeline(self, tenant_id: TenantId, timeline_id: TimelineId, body: dict[str, Any]):
res = self.post(
f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}/copy",
Expand Down
66 changes: 65 additions & 1 deletion test_runner/regress/test_wal_acceptor.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,12 @@
default_remote_storage,
s3_storage,
)
from fixtures.safekeeper.http import Configuration, SafekeeperHttpClient, TimelineCreateRequest
from fixtures.safekeeper.http import (
Configuration,
SafekeeperHttpClient,
SafekeeperId,
TimelineCreateRequest,
)
from fixtures.safekeeper.utils import wait_walreceivers_absent
from fixtures.utils import (
PropagatingThread,
Expand Down Expand Up @@ -2243,6 +2248,65 @@ def unevicted_on_dest():
wait_until(unevicted_on_dest, interval=0.1, timeout=1.0)


# Basic test for http API membership related calls: create timeline and switch
# configuration. Normally these are called by storage controller, but this
# allows to test them separately.
@run_only_on_default_postgres("tests only safekeeper API")
def test_membership_api(neon_env_builder: NeonEnvBuilder):
neon_env_builder.num_safekeepers = 1
env = neon_env_builder.init_start()

tenant_id = env.initial_tenant
timeline_id = env.initial_timeline

sk = env.safekeepers[0]
http_cli = sk.http_client()

sk_id_1 = SafekeeperId(env.safekeepers[0].id, "localhost", sk.port.pg_tenant_only)
sk_id_2 = SafekeeperId(11, "localhost", 5434) # just a mock

# Request to switch before timeline creation should fail.
init_conf = Configuration(generation=1, members=[sk_id_1], new_members=None)
with pytest.raises(requests.exceptions.HTTPError):
http_cli.membership_switch(tenant_id, timeline_id, init_conf)

# Create timeline.
create_r = TimelineCreateRequest(
tenant_id, timeline_id, init_conf, 150002, Lsn("0/1000000"), commit_lsn=None
)
log.info(f"sending {create_r.to_json()}")
http_cli.timeline_create(create_r)

# Switch into some conf.
joint_conf = Configuration(generation=4, members=[sk_id_1], new_members=[sk_id_2])
resp = http_cli.membership_switch(tenant_id, timeline_id, joint_conf)
log.info(f"joint switch resp: {resp}")
assert resp.previous_conf.generation == 1
assert resp.current_conf.generation == 4

# Restart sk, conf should be preserved.
sk.stop().start()
after_restart = http_cli.get_membership(tenant_id, timeline_id)
log.info(f"conf after restart: {after_restart}")
assert after_restart.generation == 4

# Switch into non join conf.
non_joint = Configuration(generation=5, members=[sk_id_2], new_members=None)
resp = http_cli.membership_switch(tenant_id, timeline_id, non_joint)
log.info(f"non joint switch resp: {resp}")
assert resp.previous_conf.generation == 4
assert resp.current_conf.generation == 5

# Switch request to lower conf should be ignored.
lower_conf = Configuration(generation=3, members=[], new_members=None)
resp = http_cli.membership_switch(tenant_id, timeline_id, lower_conf)
log.info(f"lower switch resp: {resp}")
assert resp.previous_conf.generation == 5
assert resp.current_conf.generation == 5

# endpoint = env.endpoints.create_start("main")


# In this test we check for excessive START_REPLICATION and START_WAL_PUSH queries
# when compute is active, but there are no writes to the timeline. In that case
# pageserver should maintain a single connection to safekeeper and don't attempt
Expand Down

0 comments on commit 060a731

Please sign in to comment.