Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor parts of Plane into a plane-common crate (DIS-2967) #849

Merged
merged 10 commits into from
Dec 3, 2024
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
383 changes: 323 additions & 60 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
[workspace]
resolver = "2"
members = [
"common",
"dynamic-proxy",
"plane/plane-tests",
"plane/plane-dynamic",
"plane",
"plane/plane-tests/plane-test-macro",
]
Expand Down
29 changes: 29 additions & 0 deletions common/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
[package]
name = "plane-common"
version = "0.5.1"
edition = "2021"

[dependencies]
axum = { version = "0.7.7", features = ["ws"] }
bollard = "0.17.0"
chrono = { version = "0.4.31", features = ["serde"] }
clap = "4.4.10"
data-encoding = "2.4.0"
futures-util = "0.3.29"
rand = "0.8.5"
reqwest = { version = "0.12.8", features = ["json"] }
serde = "1.0.109"
serde_json = "1.0.107"
serde_with = "3.4.0"
thiserror = "1.0.50"
tokio = { version = "1.33.0", features = ["sync"] }
tokio-tungstenite = "0.24.0"
tracing = "0.1.40"
tungstenite = "0.24.0"
url = "2.4.1"
valuable = { version = "0.1.0", features = ["derive"] }

[dev-dependencies]
anyhow = "1.0.93"
async-stream = "0.3.6"
axum = "0.7.9"
File renamed without changes.
68 changes: 68 additions & 0 deletions common/src/exponential_backoff.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
use std::time::{Duration, SystemTime};

pub struct ExponentialBackoff {
initial_duration_millis: u128,
max_duration: Duration,
defer_duration: Duration,
multiplier: f64,
step: i32,
deferred_reset: Option<SystemTime>,
}

impl ExponentialBackoff {
pub fn new(
initial_duration: Duration,
max_duration: Duration,
multiplier: f64,
defer_duration: Duration,
) -> Self {
let initial_duration_millis = initial_duration.as_millis();

Self {
initial_duration_millis,
max_duration,
multiplier,
step: 0,
defer_duration,
deferred_reset: None,
}
}

/// Reset the backoff, but only if `wait` is not called again for at least `defer_duration`.
pub fn defer_reset(&mut self) {
self.deferred_reset = Some(SystemTime::now() + self.defer_duration);
}

pub async fn wait(&mut self) {
if let Some(deferred_reset) = self.deferred_reset {
self.deferred_reset = None;
if SystemTime::now() > deferred_reset {
self.reset();
return;
}
}

let duration = self.initial_duration_millis as f64 * self.multiplier.powi(self.step);
let duration = Duration::from_millis(duration as u64);
let duration = duration.min(self.max_duration);
tokio::time::sleep(duration).await;

self.step += 1;
}

pub fn reset(&mut self) {
self.deferred_reset = None;
self.step = 0;
}
}

impl Default for ExponentialBackoff {
fn default() -> Self {
Self::new(
Duration::from_secs(1),
Duration::from_secs(60),
1.1,
Duration::from_secs(60),
)
}
}
12 changes: 11 additions & 1 deletion plane/src/client/mod.rs → common/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use self::controller_address::AuthorizedAddress;
use crate::{
controller::{error::ApiError, StatusResponse},
names::{BackendName, DroneName},
protocol::{MessageFromDns, MessageFromDrone, MessageFromProxy},
typed_socket::client::TypedSocketConnector,
Expand All @@ -9,11 +8,22 @@ use crate::{
ConnectResponse, DrainResult, DronePoolName, RevokeRequest,
},
};
use protocol::{ApiError, StatusResponse};
use reqwest::{Response, StatusCode};
use serde::de::DeserializeOwned;
use url::{form_urlencoded, Url};

pub mod controller_address;
pub mod exponential_backoff;
pub mod log_types;
pub mod names;
pub mod protocol;
pub mod serialization;
pub mod sse;
pub mod typed_socket;
pub mod types;
pub mod util;
pub mod version;

#[derive(thiserror::Error, Debug)]
pub enum PlaneClientError {
Expand Down
11 changes: 1 addition & 10 deletions plane/src/log_types.rs → common/src/log_types.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use std::{net::SocketAddr, time::SystemTime};
use time::OffsetDateTime;
use std::net::SocketAddr;
use valuable::{Tuplable, TupleDef, Valuable, Value, Visit};

// See: https://github.com/tokio-rs/valuable/issues/86#issuecomment-1760446976
Expand All @@ -27,14 +26,6 @@ impl Tuplable for LoggableTime {
}
}

impl From<OffsetDateTime> for LoggableTime {
fn from(offset: OffsetDateTime) -> Self {
let t: SystemTime = offset.into();
let dt: DateTime<Utc> = t.into();
Self(dt)
}
}

#[derive(Clone, Copy, Serialize, Deserialize, Debug, PartialEq, Eq, PartialOrd)]
pub struct BackendAddr(pub SocketAddr);

Expand Down
24 changes: 13 additions & 11 deletions plane/src/names.rs → common/src/names.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::{drone::runtime::docker::types::ContainerId, types::NodeKind};
use crate::types::NodeKind;
use clap::error::ErrorKind;
use serde::{Deserialize, Serialize};
use std::fmt::{Debug, Display};
Expand All @@ -17,8 +17,9 @@ pub enum NameError {
InvalidCharacter(char, usize),

#[error(
"too long ({0} characters; max is {} including prefix)",
MAX_NAME_LENGTH
"too long ({length} characters; max is {max} including prefix)",
length = "{0}",
max = MAX_NAME_LENGTH
)]
TooLong(usize),
}
Expand Down Expand Up @@ -163,17 +164,18 @@ entity_name!(DroneName, Some("dr"));
entity_name!(AcmeDnsServerName, Some("ns"));
entity_name!(BackendActionName, Some("ak"));

impl TryFrom<ContainerId> for BackendName {
type Error = NameError;

fn try_from(value: ContainerId) -> Result<Self, Self::Error> {
value
.as_str()
impl BackendName {
pub fn from_container_id(container_id: String) -> Result<Self, NameError> {
container_id
.strip_prefix("plane-")
.ok_or_else(|| NameError::InvalidPrefix(value.to_string(), "plane-".to_string()))?
.ok_or_else(|| NameError::InvalidPrefix(container_id.clone(), "plane-".to_string()))?
.to_string()
.try_into()
}

pub fn to_container_id(&self) -> String {
format!("plane-{}", self)
}
}

pub trait NodeName: Name {
Expand Down Expand Up @@ -294,7 +296,7 @@ mod tests {

#[test]
fn test_backend_name_from_invalid_container_id() {
let container_id = ContainerId::from("invalid-123".to_string());
let container_id = "invalid-123".to_string();
assert_eq!(
Err(NameError::InvalidPrefix(
"invalid-123".to_string(),
Expand Down
71 changes: 69 additions & 2 deletions plane/src/protocol.rs → common/src/protocol.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,45 @@
use std::fmt::Display;

use crate::{
database::backend::{BackendActionMessage, BackendMetricsMessage},
log_types::{BackendAddr, LoggableTime},
names::{BackendActionName, BackendName},
typed_socket::ChannelMessage,
types::{
backend_state::TerminationReason, BackendState, BearerToken, ClusterName, KeyConfig,
SecretToken, Subdomain, TerminationKind,
NodeId, SecretToken, Subdomain, TerminationKind,
},
};
use serde::{Deserialize, Serialize};
use serde_json::Value;

#[derive(Serialize, Deserialize, Debug, Clone, Copy)]
pub enum ApiErrorKind {
FailedToAcquireKey,
KeyUnheldNoSpawnConfig,
KeyHeldUnhealthy,
KeyHeld,
NoDroneAvailable,
FailedToRemoveKey,
DatabaseError,
NoClusterProvided,
NotFound,
InvalidClusterName,
Other,
}

#[derive(thiserror::Error, Debug, Serialize, Deserialize)]
pub struct ApiError {
pub id: String,
pub kind: ApiErrorKind,
pub message: String,
}

impl Display for ApiError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{:?}", self)
}
}

#[derive(Serialize, Deserialize, Debug, Clone, valuable::Valuable, PartialEq)]
pub struct KeyDeadlines {
/// When the key should be renewed.
Expand Down Expand Up @@ -132,6 +161,29 @@ pub enum MessageFromDrone {
RenewKey(RenewKeyRequest),
}

#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct BackendMetricsMessage {
pub backend_id: BackendName,
/// Memory used by backend excluding inactive file cache, same as use shown by docker stats
/// ref: https://github.com/docker/cli/blob/master/cli/command/container/stats_helpers.go#L227C45-L227C45
pub mem_used: u64,
/// Memory used by backend in bytes
/// (calculated using kernel memory used by cgroup + page cache memory used by cgroup)
pub mem_total: u64,
/// Active memory (non reclaimable)
pub mem_active: u64,
/// Inactive memory (reclaimable)
pub mem_inactive: u64,
/// Unevictable memory (mlock etc)
pub mem_unevictable: u64,
/// The backend's memory limit
pub mem_limit: u64,
/// Nanoseconds of CPU used by backend since last message
pub cpu_used: u64,
/// Total CPU nanoseconds for system since last message
pub sys_cpu: u64,
}

impl ChannelMessage for MessageFromDrone {
type Reply = MessageToDrone;
}
Expand All @@ -146,6 +198,14 @@ pub struct RenewKeyResponse {
pub deadlines: Option<KeyDeadlines>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BackendActionMessage {
pub action_id: BackendActionName,
pub backend_id: BackendName,
pub drone_id: NodeId,
pub action: BackendAction,
}

#[derive(Serialize, Deserialize, Debug, Clone)]
pub enum MessageToDrone {
Action(BackendActionMessage),
Expand Down Expand Up @@ -263,3 +323,10 @@ pub enum MessageToDns {
impl ChannelMessage for MessageToDns {
type Reply = MessageFromDns;
}

#[derive(Serialize, Deserialize, Debug)]
pub struct StatusResponse {
pub status: String,
pub version: String,
pub hash: String,
}
File renamed without changes.
2 changes: 1 addition & 1 deletion plane/src/client/sse.rs → common/src/sse.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use super::PlaneClientError;
use crate::util::ExponentialBackoff;
use crate::exponential_backoff::ExponentialBackoff;
use reqwest::{
header::{HeaderValue, ACCEPT, CONNECTION},
Client, Response,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
use super::{ChannelMessage, Handshake, SocketAction, TypedSocket};
use crate::client::controller_address::AuthorizedAddress;
use crate::client::PlaneClientError;
use crate::controller_address::AuthorizedAddress;
use crate::exponential_backoff::ExponentialBackoff;
use crate::names::NodeName;
use crate::{plane_version_info, util::ExponentialBackoff};
use crate::version::plane_version_info;
use crate::PlaneClientError;
use futures_util::{SinkExt, StreamExt};
use std::marker::PhantomData;
use tokio::net::TcpStream;
Expand Down Expand Up @@ -191,7 +192,7 @@ async fn new_client<T: ChannelMessage>(

#[cfg(test)]
mod test {
use crate::client::controller_address::AuthorizedAddress;
use crate::controller_address::AuthorizedAddress;

#[test]
fn test_url_no_token() {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::client::PlaneClientError;
use crate::PlaneVersionInfo;
use crate::version::PlaneVersionInfo;
use crate::PlaneClientError;
use serde::de::DeserializeOwned;
use serde::{Deserialize, Serialize};
use std::fmt::Debug;
Expand Down
Loading
Loading