Skip to content

Commit

Permalink
reticulating splines
Browse files Browse the repository at this point in the history
  • Loading branch information
rcgoodfellow committed Jan 10, 2024
1 parent b591daf commit 295107c
Show file tree
Hide file tree
Showing 13 changed files with 176 additions and 35 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions ddm-admin-client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,14 @@ progenitor::generate_api!(
spec = "../openapi/ddm-admin.json",
inner_type = slog::Logger,
pre_hook = (|log: &slog::Logger, request: &reqwest::Request| {
slog::debug!(log, "client request";
slog::trace!(log, "client request";
"method" => %request.method(),
"uri" => %request.url(),
"body" => ?&request.body(),
);
}),
post_hook = (|log: &slog::Logger, result: &Result<_, _>| {
slog::debug!(log, "client response"; "result" => ?result);
slog::trace!(log, "client response"; "result" => ?result);
}),
replace = {
IpPrefix = ddm::db::IpPrefix,
Expand Down
4 changes: 2 additions & 2 deletions mg-admin-client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,14 @@ progenitor::generate_api!(
spec = "../openapi/mg-admin.json",
inner_type = slog::Logger,
pre_hook = (|log: &slog::Logger, request: &reqwest::Request| {
slog::debug!(log, "client request";
slog::trace!(log, "client request";
"method" => %request.method(),
"uri" => %request.url(),
"body" => ?&request.body(),
);
}),
post_hook = (|log: &slog::Logger, result: &Result<_, _>| {
slog::debug!(log, "client response"; "result" => ?result);
slog::trace!(log, "client response"; "result" => ?result);
}),
replace = {
Prefix4 = rdb::Prefix4,
Expand Down
36 changes: 29 additions & 7 deletions mg-lower/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ pub fn run(
let dpd = new_dpd_client(&log);
let ddm = new_ddm_client(&log);
let mut generation =
match initialize(tep, &db, &log, &dpd, &ddm, rt.clone()) {
match full_sync(tep, &db, &log, &dpd, &ddm, rt.clone()) {
Ok(gen) => gen,
Err(e) => {
error!(log, "initializing failed: {e}");
Expand All @@ -68,7 +68,7 @@ pub fn run(

// handle any changes that occur
loop {
match rx.recv() {
match rx.recv_timeout(Duration::from_secs(1)) {
Ok(change) => {
generation = match handle_change(
tep,
Expand All @@ -88,17 +88,32 @@ pub fn run(
}
}
}
Err(e) => {
error!(log, "mg-lower watch rx: {e}");
// if we've not received updates in the timeout interval, to a
// full sync in case something has changed out from under us.
Err(_) => {
generation =
match full_sync(tep, &db, &log, &dpd, &ddm, rt.clone())
{
Ok(gen) => gen,
Err(e) => {
error!(log, "initializing failed: {e}");
info!(
log,
"restarting sync loop in one second"
);
sleep(Duration::from_secs(1));
continue;
}
}
}
}
}
}
}

/// Initialize the underlying platform with a complete set of routes from the
/// Synchronize the underlying platforms with a complete set of routes from the
/// RIB.
fn initialize(
fn full_sync(
tep: Ipv6Addr, // tunnel endpoint address
db: &Db,
log: &Logger,
Expand Down Expand Up @@ -160,8 +175,15 @@ fn handle_change(
generation: u64,
rt: Arc<tokio::runtime::Handle>,
) -> Result<u64, Error> {
info!(
log,
"mg-lower: handling rib change generation {} -> {}",
generation,
change.generation
);

if change.generation > generation + 1 {
return initialize(tep, db, log, dpd, ddm, rt.clone());
return full_sync(tep, db, log, dpd, ddm, rt.clone());
}
let to_add: Vec<rdb::Route4ImportKey> =
change.import.added.clone().into_iter().collect();
Expand Down
1 change: 1 addition & 0 deletions mgadm/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,4 @@ tabwriter.workspace = true
colored.workspace = true
humantime.workspace = true
serde_json.workspace = true
thiserror.workspace = true
9 changes: 9 additions & 0 deletions mgadm/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use slog::Logger;
use std::net::{IpAddr, SocketAddr};

mod bgp;
mod static_routing;

#[derive(Parser, Debug)]
#[command(version, about, long_about = None, styles = oxide_cli_style())]
Expand All @@ -29,8 +30,13 @@ struct Cli {

#[derive(Subcommand, Debug)]
enum Commands {
/// BGP management commands.
#[command(subcommand)]
Bgp(bgp::Commands),

/// Static routing management commands.
#[command(subcommand)]
Static(static_routing::Commands),
}

#[tokio::main]
Expand All @@ -45,6 +51,9 @@ async fn main() -> Result<()> {

match cli.command {
Commands::Bgp(command) => bgp::commands(command, client).await?,
Commands::Static(command) => {
static_routing::commands(command, client).await?
}
}
Ok(())
}
Expand Down
97 changes: 97 additions & 0 deletions mgadm/src/static_routing.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at https://mozilla.org/MPL/2.0/.

use anyhow::Result;
use clap::{Args, Subcommand};
use mg_admin_client::types;
use mg_admin_client::Client;
use rdb::Prefix4;
use std::net::{AddrParseError, Ipv4Addr};
use std::num::ParseIntError;
use thiserror::Error;

#[derive(Subcommand, Debug)]
pub enum Commands {
GetV4Routes,
AddV4Route(StaticRoute4),
RemoveV4Routes(StaticRoute4),
}

#[derive(Debug, Error)]
pub enum Ipv4PrefixParseError {
#[error("expected CIDR representation <addr>/<mask>")]
Cidr,

#[error("address parse error: {0}")]
Addr(#[from] AddrParseError),

#[error("mask parse error: {0}")]
Mask(#[from] ParseIntError),
}

#[derive(Debug, Args)]
pub struct StaticRoute4 {
pub destination: Ipv4Prefix,
pub nexthop: Ipv4Addr,
}

#[derive(Debug, Clone, Copy)]
pub struct Ipv4Prefix {
pub addr: Ipv4Addr,
pub len: u8,
}

impl std::str::FromStr for Ipv4Prefix {
type Err = Ipv4PrefixParseError;

fn from_str(s: &str) -> Result<Self, Self::Err> {
let parts: Vec<&str> = s.split('/').collect();
if parts.len() < 2 {
return Err(Ipv4PrefixParseError::Cidr);
}

Ok(Ipv4Prefix {
addr: Ipv4Addr::from_str(parts[0])?,
len: u8::from_str(parts[1])?,
})
}
}

pub async fn commands(command: Commands, client: Client) -> Result<()> {
match command {
Commands::GetV4Routes => {
let routes = client.static_list_v4_routes().await?;
println!("{:#?}", routes);
}
Commands::AddV4Route(route) => {
let arg = types::AddStaticRoute4Request {
routes: types::StaticRoute4List {
list: vec![types::StaticRoute4 {
prefix: Prefix4 {
value: route.destination.addr,
length: route.destination.len,
},
nexthop: route.nexthop,
}],
},
};
client.static_add_v4_route(&arg).await?;
}
Commands::RemoveV4Routes(route) => {
let arg = types::AddStaticRoute4Request {
routes: types::StaticRoute4List {
list: vec![types::StaticRoute4 {
prefix: Prefix4 {
value: route.destination.addr,
length: route.destination.len,
},
nexthop: route.nexthop,
}],
},
};
client.static_add_v4_route(&arg).await?;
}
}
Ok(())
}
1 change: 0 additions & 1 deletion mgd/src/admin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ pub fn start_server(
context: Arc<HandlerContext>,
) -> Result<JoinHandle<()>, String> {
let sa = SocketAddr::new(addr, port);

let ds_config = ConfigDropshot {
bind_address: sa,
..Default::default()
Expand Down
10 changes: 0 additions & 10 deletions mgd/src/bgp_admin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -307,16 +307,6 @@ pub(crate) fn add_router(

router.run();

#[cfg(feature = "default")]
{
let rt = Arc::new(tokio::runtime::Handle::current());
let log = ctx.log.clone();
let db = db.clone();
std::thread::spawn(move || {
mg_lower::run(ctx.tep, db, log, rt);
});
}

routers.insert(rq.asn, router);
db.add_bgp_router(
rq.asn,
Expand Down
11 changes: 11 additions & 0 deletions mgd/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,17 @@ async fn run(args: RunArgs) {
db: db.clone(),
});

#[cfg(feature = "default")]
{
let rt = Arc::new(tokio::runtime::Handle::current());
let ctx = context.clone();
let log = log.clone();
let db = ctx.db.clone();
std::thread::spawn(move || {
mg_lower::run(ctx.tep, db, log, rt);
});
}

start_bgp_routers(
context.clone(),
db.get_bgp_routers()
Expand Down
2 changes: 1 addition & 1 deletion mgd/src/static_admin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ impl From<StaticRoute4> for Route4ImportKey {
prefix: val.prefix,
nexthop: val.nexthop,
id: 0,
priority: 0,
priority: 100,
}
}
}
Expand Down
29 changes: 20 additions & 9 deletions rdb/src/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
use crate::error::Error;
use crate::types::*;
use mg_common::{lock, read_lock, write_lock};
use slog::{error, Logger};
use slog::{error, info, Logger};
use std::collections::{HashMap, HashSet};
use std::net::IpAddr;
use std::sync::atomic::{AtomicU64, Ordering};
Expand Down Expand Up @@ -271,8 +271,18 @@ impl Db {
lock!(self.imported).replace(r);
let after = self.effective_set_for_prefix4(r.prefix);

if let Some(change_set) = self.import_route_change_set(before, after) {
if let Some(change_set) = self.import_route_change_set(&before, &after)
{
info!(
self.log,
"sending notification for change set {:#?}", change_set,
);
self.notify(change_set);
} else {
info!(
self.log,
"no effective change for {:#?} -> {:#?}", before, after
);
}
}

Expand All @@ -281,7 +291,8 @@ impl Db {
lock!(self.imported).remove(&r);
let after = self.effective_set_for_prefix4(r.prefix);

if let Some(change_set) = self.import_route_change_set(before, after) {
if let Some(change_set) = self.import_route_change_set(&before, &after)
{
self.notify(change_set);
}
}
Expand Down Expand Up @@ -342,21 +353,21 @@ impl Db {
/// bumping the RIB generation number if there are changes.
fn import_route_change_set(
&self,
before: HashSet<Route4ImportKey>,
after: HashSet<Route4ImportKey>,
before: &HashSet<Route4ImportKey>,
after: &HashSet<Route4ImportKey>,
) -> Option<ChangeSet> {
let added: HashSet<Route4ImportKey> =
after.difference(&before).copied().collect();
after.difference(before).copied().collect();

let removed: HashSet<Route4ImportKey> =
before.difference(&after).copied().collect();

let gen = self.generation.fetch_add(1, Ordering::SeqCst);
before.difference(after).copied().collect();

if added.is_empty() && removed.is_empty() {
return None;
}

let gen = self.generation.fetch_add(1, Ordering::SeqCst);

Some(ChangeSet::from_import(
ImportChangeSet { added, removed },
gen,
Expand Down
6 changes: 3 additions & 3 deletions rdb/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,7 @@ pub struct Policy {
pub priority: u16,
}

#[derive(Clone, Default)]
#[derive(Clone, Default, Debug)]
pub struct ImportChangeSet {
pub added: HashSet<Route4ImportKey>,
pub removed: HashSet<Route4ImportKey>,
Expand All @@ -274,7 +274,7 @@ impl ImportChangeSet {
}
}

#[derive(Clone, Default)]
#[derive(Clone, Default, Debug)]
pub struct OriginChangeSet {
pub added: HashSet<Prefix4>,
pub removed: HashSet<Prefix4>,
Expand All @@ -295,7 +295,7 @@ impl OriginChangeSet {
}
}

#[derive(Clone, Default)]
#[derive(Clone, Default, Debug)]
pub struct ChangeSet {
pub generation: u64,
pub import: ImportChangeSet,
Expand Down

0 comments on commit 295107c

Please sign in to comment.