Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow failover for replicas #857

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions CONFIG.md
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,15 @@ If the client doesn't specify, PgCat routes traffic to this role by default.
`replica` round-robin between replicas only without touching the primary,
`primary` all queries go to the primary unless otherwise specified.

### replica_to_primary_failover_enabled
```
path: pools.<pool_name>.replica_to_primary_failover_enabled
default: "false"
```

If set to true, when the specified role is `replica` (either by setting `default_role` or manually)
and all replicas are banned, queries will be sent to the primary (until a replica is back online).

### prepared_statements_cache_size
```
path: general.prepared_statements_cache_size
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ The setting will persist until it's changed again or the client disconnects.
By default, all queries are routed to the first available server; `default_role` setting controls this behavior.

### Failover
All servers are checked with a `;` (very fast) query before being given to a client. Additionally, the server health is monitored with every client query that it processes. If the server is not reachable, it will be banned and cannot serve any more transactions for the duration of the ban. The queries are routed to the remaining servers. If all servers become banned, the ban list is cleared: this is a safety precaution against false positives. The primary can never be banned.
All servers are checked with a `;` (very fast) query before being given to a client. Additionally, the server health is monitored with every client query that it processes. If the server is not reachable, it will be banned and cannot serve any more transactions for the duration of the ban. The queries are routed to the remaining servers. If `replica_to_primary_failover_enabled` is set to true and all replicas become banned, the query will be routed to the primary. If `replica_to_primary_failover_enabled` is false and all servers (replicas) become banned, the ban list is cleared: this is a safety precaution against false positives. The primary can never be banned.

The ban time can be changed with `ban_time`. The default is 60 seconds.

Expand Down
4 changes: 4 additions & 0 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -541,6 +541,9 @@ pub struct Pool {
#[serde(default = "Pool::default_default_role")]
pub default_role: String,

#[serde(default)] // False
pub replica_to_primary_failover_enabled: bool,

#[serde(default)] // False
pub query_parser_enabled: bool,

Expand Down Expand Up @@ -734,6 +737,7 @@ impl Default for Pool {
pool_mode: Self::default_pool_mode(),
load_balancing_mode: Self::default_load_balancing_mode(),
default_role: String::from("any"),
replica_to_primary_failover_enabled: false,
query_parser_enabled: false,
query_parser_max_length: None,
query_parser_read_write_splitting: false,
Expand Down
51 changes: 37 additions & 14 deletions src/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,9 @@ pub struct PoolSettings {
// Default server role to connect to.
pub default_role: Option<Role>,

// Whether or not we should use primary when replicas are unavailable
pub replica_to_primary_failover_enabled: bool,

// Enable/disable query parser.
pub query_parser_enabled: bool,

Expand Down Expand Up @@ -219,6 +222,7 @@ impl Default for PoolSettings {
user: User::default(),
db: String::default(),
default_role: None,
replica_to_primary_failover_enabled: false,
query_parser_enabled: false,
query_parser_max_length: None,
query_parser_read_write_splitting: false,
Expand Down Expand Up @@ -531,6 +535,8 @@ impl ConnectionPool {
"primary" => Some(Role::Primary),
_ => unreachable!(),
},
replica_to_primary_failover_enabled: pool_config
.replica_to_primary_failover_enabled,
query_parser_enabled: pool_config.query_parser_enabled,
query_parser_max_length: pool_config.query_parser_max_length,
query_parser_read_write_splitting: pool_config
Expand Down Expand Up @@ -731,6 +737,19 @@ impl ConnectionPool {
});
}

// If the role is replica and we allow sending traffic to primary when replicas are unavailble,
// we add primary address at the end of the list of candidates, this way it will be tried when
// replicas are all unavailable.
if role == Role::Replica && self.settings.replica_to_primary_failover_enabled {
let mut primaries = self
.addresses
.iter()
.flatten()
.filter(|address| address.role == Role::Primary)
.collect::<Vec<&Address>>();
candidates.insert(0, primaries.pop().unwrap());
}

// Indicate we're waiting on a server connection from a pool.
let now = Instant::now();
client_stats.waiting();
Expand Down Expand Up @@ -935,24 +954,28 @@ impl ConnectionPool {
return true;
}

// Check if all replicas are banned, in that case unban all of them
let replicas_available = self.addresses[address.shard]
.iter()
.filter(|addr| addr.role == Role::Replica)
.count();
// If we have replica to primary failover we should not unban replicas
// as we still have the primary to server traffic.
if !self.settings.replica_to_primary_failover_enabled {
// Check if all replicas are banned, in that case unban all of them
let replicas_available = self.addresses[address.shard]
.iter()
.filter(|addr| addr.role == Role::Replica)
.count();

debug!("Available targets: {}", replicas_available);
debug!("Available targets: {}", replicas_available);

let read_guard = self.banlist.read();
let all_replicas_banned = read_guard[address.shard].len() == replicas_available;
drop(read_guard);
let read_guard = self.banlist.read();
let all_replicas_banned = read_guard[address.shard].len() == replicas_available;
drop(read_guard);

if all_replicas_banned {
let mut write_guard = self.banlist.write();
warn!("Unbanning all replicas.");
write_guard[address.shard].clear();
if all_replicas_banned {
let mut write_guard = self.banlist.write();
warn!("Unbanning all replicas.");
write_guard[address.shard].clear();

return true;
return true;
}
}

// Check if ban time is expired
Expand Down
2 changes: 2 additions & 0 deletions src/query_router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1443,6 +1443,7 @@ mod test {
load_balancing_mode: crate::config::LoadBalancingMode::Random,
shards: 2,
user: crate::config::User::default(),
replica_to_primary_failover_enabled: false,
default_role: Some(Role::Replica),
query_parser_enabled: true,
query_parser_max_length: None,
Expand Down Expand Up @@ -1522,6 +1523,7 @@ mod test {
shards: 5,
user: crate::config::User::default(),
default_role: Some(Role::Replica),
replica_to_primary_failover_enabled: false,
query_parser_enabled: true,
query_parser_max_length: None,
query_parser_read_write_splitting: true,
Expand Down
Loading