diff --git a/Cargo.lock b/Cargo.lock index e43e2756..40095eb6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -110,7 +110,7 @@ checksum = "080e9890a082662b09c1ad45f567faeeb47f22b5fb23895fbe1e651e718e25ca" [[package]] name = "api_identity" version = "0.1.0" -source = "git+https://github.com/oxidecomputer/omicron?branch=main#205382f7ee151f09a5c6c11ed4ae73b14f0d64b3" +source = "git+https://github.com/oxidecomputer/omicron?branch=main#030adce411fe37c9e2d3c70ee5a6cdbdfd49f3f9" dependencies = [ "omicron-workspace-hack", "proc-macro2", @@ -355,9 +355,9 @@ checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" [[package]] name = "chrono" -version = "0.4.31" +version = "0.4.32" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7f2c685bad3eb3d45a01354cedb7d5faa66194d1d58ba6e267a8de788f79db38" +checksum = "41daef31d7a747c5c847246f36de49ced6f7403b4cdabc807a97b5cc184cda7a" dependencies = [ "android-tzdata", "iana-time-zone", @@ -365,7 +365,7 @@ dependencies = [ "num-traits", "serde", "wasm-bindgen", - "windows-targets 0.48.5", + "windows-targets 0.52.0", ] [[package]] @@ -435,6 +435,12 @@ version = "0.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "702fc72eb24e5a1e48ce58027a675bc24edd52096d5397d4aea7c6dd9eca0bd1" +[[package]] +name = "cobs" +version = "0.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "67ba02a97a2bd10f4b59b25c7973101c79642302776489e030cd13cdab09ed15" + [[package]] name = "colorchoice" version = "1.0.0" @@ -454,7 +460,7 @@ dependencies = [ [[package]] name = "common" version = "0.1.0" -source = "git+https://github.com/oxidecomputer/dendrite?branch=main#3aad7391aa81e377c203b6275dfd14e24fb940c4" +source = "git+https://github.com/oxidecomputer/dendrite?branch=main#e71baaa05d41db3eac6a0e8d7aa4a08b04f33ad6" dependencies = [ "anyhow", "chrono", @@ -617,7 +623,9 @@ dependencies = [ "hostname", "hyper", "ispf", - "libnet", + "libnet 0.1.0 
(git+https://github.com/oxidecomputer/netadm-sys?branch=main)", + "opte-ioctl", + "oxide-vpc", "schemars", "serde", "serde_json", @@ -632,6 +640,7 @@ dependencies = [ name = "ddm-admin-client" version = "0.1.0" dependencies = [ + "ddm", "percent-encoding", "progenitor", "reqwest", @@ -668,7 +677,7 @@ dependencies = [ "ddm", "dpd-client", "hostname", - "libnet", + "libnet 0.1.0 (git+https://github.com/oxidecomputer/netadm-sys?branch=main)", "slog", "slog-async", "slog-bunyan", @@ -681,6 +690,38 @@ version = "1.0.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ffe7ed1d93f4553003e20b629abe9085e1e81b1429520f897f8f8860bc6dfc21" +[[package]] +name = "defmt" +version = "0.3.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a8a2d011b2fee29fb7d659b83c43fce9a2cb4df453e16d441a51448e448f3f98" +dependencies = [ + "bitflags 1.3.2", + "defmt-macros", +] + +[[package]] +name = "defmt-macros" +version = "0.3.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "54f0216f6c5acb5ae1a47050a6645024e6edafc2ee32d421955eccfef12ef92e" +dependencies = [ + "defmt-parser", + "proc-macro-error", + "proc-macro2", + "quote", + "syn 2.0.48", +] + +[[package]] +name = "defmt-parser" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "269924c02afd7f94bc4cecbfa5c379f6ffcf9766b3408fe63d22c728654eccd0" +dependencies = [ + "thiserror", +] + [[package]] name = "deranged" version = "0.3.11" @@ -764,7 +805,7 @@ dependencies = [ [[package]] name = "dpd-client" version = "0.1.0" -source = "git+https://github.com/oxidecomputer/dendrite?branch=main#3aad7391aa81e377c203b6275dfd14e24fb940c4" +source = "git+https://github.com/oxidecomputer/dendrite?branch=main#e71baaa05d41db3eac6a0e8d7aa4a08b04f33ad6" dependencies = [ "async-trait", "chrono", @@ -786,7 +827,7 @@ dependencies = [ [[package]] name = "dropshot" version = "0.9.1-dev" -source = 
"git+https://github.com/oxidecomputer/dropshot?branch=main#0f06446abbe0c89df2ec64cbe64d88133da915b1" +source = "git+https://github.com/oxidecomputer/dropshot?branch=main#29ae98d1f909c6832661408a4c03f929e8afa6e9" dependencies = [ "async-stream", "async-trait", @@ -832,7 +873,7 @@ dependencies = [ [[package]] name = "dropshot_endpoint" version = "0.9.1-dev" -source = "git+https://github.com/oxidecomputer/dropshot?branch=main#0f06446abbe0c89df2ec64cbe64d88133da915b1" +source = "git+https://github.com/oxidecomputer/dropshot?branch=main#29ae98d1f909c6832661408a4c03f929e8afa6e9" dependencies = [ "proc-macro2", "quote", @@ -864,6 +905,12 @@ version = "1.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a26ae43d7bcc3b814de94796a5e736d4029efb0ee900c12e2d54c993ad1a1e07" +[[package]] +name = "embedded-io" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ef1a6892d9eef45c8fa6b9e0086428a2cca8491aca8f787c534a3d6d0bcb3ced" + [[package]] name = "encoding_rs" version = "0.8.33" @@ -1177,6 +1224,15 @@ version = "1.8.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "eabb4a44450da02c90444cf74558da904edde8fb4e9035a9a6a4e15445af0bd7" +[[package]] +name = "hash32" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "47d60b12902ba28e2730cd37e95b8c9223af2808df9e902d4df49588d1470606" +dependencies = [ + "byteorder", +] + [[package]] name = "hashbrown" version = "0.12.3" @@ -1198,6 +1254,16 @@ version = "0.14.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "290f1a1d9242c78d09ce40a5e87e7554ee637af1351968159f4952f028f75604" +[[package]] +name = "heapless" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0bfb9eb618601c89945a70e254898da93b13be0388091d42117462b265bb3fad" +dependencies = [ + "hash32", + "stable_deref_trait", +] + [[package]] name = "heck" version = 
"0.3.3" @@ -1398,6 +1464,11 @@ dependencies = [ "unicode-normalization", ] +[[package]] +name = "illumos-sys-hdrs" +version = "0.1.0" +source = "git+https://github.com/oxidecomputer/opte?branch=master#1d29ef60a18179babfb44f0f7a3c2fe71034a2c1" + [[package]] name = "indexmap" version = "1.9.3" @@ -1486,6 +1557,15 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "kstat-macro" +version = "0.1.0" +source = "git+https://github.com/oxidecomputer/opte?branch=master#1d29ef60a18179babfb44f0f7a3c2fe71034a2c1" +dependencies = [ + "quote", + "syn 2.0.48", +] + [[package]] name = "lab" version = "0.1.0" @@ -1524,7 +1604,7 @@ dependencies = [ "colored", "futures", "libc", - "libnet", + "libnet 0.1.0 (git+https://github.com/oxidecomputer/netadm-sys?branch=main)", "portpicker", "propolis-client", "propolis-server-config", @@ -1572,6 +1652,25 @@ dependencies = [ "tracing", ] +[[package]] +name = "libnet" +version = "0.1.0" +source = "git+https://github.com/oxidecomputer/netadm-sys#d44d9e084f39e844f8083d4d9b39a331061ebbcc" +dependencies = [ + "anyhow", + "cfg-if", + "colored", + "dlpi", + "libc", + "num_enum 0.5.11", + "nvpair", + "nvpair-sys", + "rusty-doors", + "socket2 0.4.10", + "thiserror", + "tracing", +] + [[package]] name = "libredox" version = "0.0.1" @@ -1614,6 +1713,12 @@ dependencies = [ "serde", ] +[[package]] +name = "managed" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0ca88d725a0a943b096803bd34e73a4437208b6077654cc4ecb2947a5f91618d" + [[package]] name = "match_cfg" version = "0.1.0" @@ -1675,8 +1780,10 @@ version = "0.1.0" dependencies = [ "anyhow", "common", + "ddm-admin-client", "dpd-client", - "libnet", + "http 0.2.11", + "libnet 0.1.0 (git+https://github.com/oxidecomputer/netadm-sys?branch=main)", "rdb", "slog", "thiserror", @@ -1698,7 +1805,7 @@ version = "0.1.0" dependencies = [ "anyhow", "ddm-admin-client", - "libnet", + "libnet 0.1.0 (git+https://github.com/oxidecomputer/netadm-sys?branch=main)", 
"slog", "slog-async", "slog-envlogger", @@ -1725,6 +1832,7 @@ dependencies = [ "slog-envlogger", "slog-term", "tabwriter", + "thiserror", "tokio", ] @@ -1739,6 +1847,7 @@ dependencies = [ "http 0.2.11", "mg-common", "mg-lower", + "rand", "rdb", "schemars", "serde", @@ -1979,7 +2088,7 @@ dependencies = [ [[package]] name = "omicron-common" version = "0.1.0" -source = "git+https://github.com/oxidecomputer/omicron?branch=main#205382f7ee151f09a5c6c11ed4ae73b14f0d64b3" +source = "git+https://github.com/oxidecomputer/omicron?branch=main#030adce411fe37c9e2d3c70ee5a6cdbdfd49f3f9" dependencies = [ "anyhow", "api_identity", @@ -2119,10 +2228,66 @@ dependencies = [ "vcpkg", ] +[[package]] +name = "opte" +version = "0.1.0" +source = "git+https://github.com/oxidecomputer/opte?branch=master#1d29ef60a18179babfb44f0f7a3c2fe71034a2c1" +dependencies = [ + "cfg-if", + "dyn-clone", + "illumos-sys-hdrs", + "kstat-macro", + "opte-api", + "postcard", + "serde", + "smoltcp", + "version_check", +] + +[[package]] +name = "opte-api" +version = "0.1.0" +source = "git+https://github.com/oxidecomputer/opte?branch=master#1d29ef60a18179babfb44f0f7a3c2fe71034a2c1" +dependencies = [ + "illumos-sys-hdrs", + "ipnetwork", + "postcard", + "serde", + "smoltcp", +] + +[[package]] +name = "opte-ioctl" +version = "0.1.0" +source = "git+https://github.com/oxidecomputer/opte?branch=master#1d29ef60a18179babfb44f0f7a3c2fe71034a2c1" +dependencies = [ + "libc", + "libnet 0.1.0 (git+https://github.com/oxidecomputer/netadm-sys)", + "opte", + "oxide-vpc", + "postcard", + "serde", + "thiserror", +] + +[[package]] +name = "oxide-vpc" +version = "0.1.0" +source = "git+https://github.com/oxidecomputer/opte?branch=master#1d29ef60a18179babfb44f0f7a3c2fe71034a2c1" +dependencies = [ + "cfg-if", + "illumos-sys-hdrs", + "opte", + "poptrie", + "serde", + "smoltcp", + "zerocopy 0.7.32", +] + [[package]] name = "oximeter" version = "0.1.0" -source = 
"git+https://github.com/oxidecomputer/omicron?branch=main#205382f7ee151f09a5c6c11ed4ae73b14f0d64b3" +source = "git+https://github.com/oxidecomputer/omicron?branch=main#030adce411fe37c9e2d3c70ee5a6cdbdfd49f3f9" dependencies = [ "bytes", "chrono", @@ -2142,7 +2307,7 @@ dependencies = [ [[package]] name = "oximeter-macro-impl" version = "0.1.0" -source = "git+https://github.com/oxidecomputer/omicron?branch=main#205382f7ee151f09a5c6c11ed4ae73b14f0d64b3" +source = "git+https://github.com/oxidecomputer/omicron?branch=main#030adce411fe37c9e2d3c70ee5a6cdbdfd49f3f9" dependencies = [ "omicron-workspace-hack", "proc-macro2", @@ -2317,6 +2482,11 @@ version = "0.3.29" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2900ede94e305130c13ddd391e0ab7cbaeb783945ae07a279c268cb05109c6cb" +[[package]] +name = "poptrie" +version = "0.1.0" +source = "git+https://github.com/oxidecomputer/poptrie?branch=multipath#ca52bef3f87ff1a67d81b3c6e601dcb5fdbcc165" + [[package]] name = "portpicker" version = "0.1.1" @@ -2326,6 +2496,17 @@ dependencies = [ "rand", ] +[[package]] +name = "postcard" +version = "1.0.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a55c51ee6c0db07e68448e336cf8ea4131a620edefebf9893e759b2d793420f8" +dependencies = [ + "cobs", + "embedded-io", + "serde", +] + [[package]] name = "postgres-protocol" version = "0.6.6" @@ -3372,6 +3553,20 @@ dependencies = [ "thiserror", ] +[[package]] +name = "smoltcp" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5a1a996951e50b5971a2c8c0fa05a381480d70a933064245c4a223ddc87ccc97" +dependencies = [ + "bitflags 1.3.2", + "byteorder", + "cfg-if", + "defmt", + "heapless", + "managed", +] + [[package]] name = "socket2" version = "0.4.10" @@ -3404,6 +3599,12 @@ version = "0.9.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6980e8d7511241f8acf4aebddbb1ff938df5eebe98691418c4468d0b72a96a67" +[[package]] +name = 
"stable_deref_trait" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a8f112729512f8e442d81f95a8a7ddf2b7c6b8a1a6f509a95864142b30cab2d3" + [[package]] name = "stringprep" version = "0.1.4" @@ -4145,7 +4346,7 @@ name = "util" version = "0.1.0" dependencies = [ "anyhow", - "libnet", + "libnet 0.1.0 (git+https://github.com/oxidecomputer/netadm-sys?branch=main)", "slog", "slog-async", "slog-envlogger", @@ -4561,6 +4762,7 @@ version = "0.7.32" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "74d4d3961e53fa4c9a25a8637fc2bfaf2595b3d3ae34875568a5cf64787716be" dependencies = [ + "byteorder", "zerocopy-derive 0.7.32", ] @@ -4644,7 +4846,7 @@ version = "0.1.0" source = "git+https://github.com/oxidecomputer/falcon?branch=main#a223368cbb036279b3ddc1c803ce5a0219440204" dependencies = [ "anyhow", - "libnet", + "libnet 0.1.0 (git+https://github.com/oxidecomputer/netadm-sys?branch=main)", "tokio", "zone 0.3.0", ] diff --git a/Cargo.toml b/Cargo.toml index 0a80fc7a..b78274da 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -74,8 +74,17 @@ sled = "0.34" ciborium = "0.2" http = "0.2" humantime = "2.1" +rand = "0.8" mg-common = { path = "mg-common" } +[workspace.dependencies.opte-ioctl] +git = "https://github.com/oxidecomputer/opte" +branch = "master" + +[workspace.dependencies.oxide-vpc] +git = "https://github.com/oxidecomputer/opte" +branch = "master" + [workspace.dependencies.dpd-client] git = "https://github.com/oxidecomputer/dendrite" branch = "main" diff --git a/bgp/src/session.rs b/bgp/src/session.rs index 633c5d78..3c1b7009 100644 --- a/bgp/src/session.rs +++ b/bgp/src/session.rs @@ -1128,7 +1128,9 @@ impl SessionRunner { id, priority, }; - self.db.set_nexthop4(k); + if let Err(e) = self.db.set_nexthop4(k, false) { + err!(self; "failed to set nexthop {k:#?}: {e}"); + } } } diff --git a/ddm-admin-client/Cargo.toml b/ddm-admin-client/Cargo.toml index ced8cb2e..143b0f53 100644 --- a/ddm-admin-client/Cargo.toml 
+++ b/ddm-admin-client/Cargo.toml @@ -10,3 +10,4 @@ slog = "2.7" percent-encoding.workspace = true reqwest.workspace = true progenitor.workspace = true +ddm = { path = "../ddm" } diff --git a/ddm-admin-client/src/lib.rs b/ddm-admin-client/src/lib.rs index ee7d75ae..7f9fe78d 100644 --- a/ddm-admin-client/src/lib.rs +++ b/ddm-admin-client/src/lib.rs @@ -2,17 +2,28 @@ // License, v. 2.0. If a copy of the MPL was not distributed with this // file, You can obtain one at https://mozilla.org/MPL/2.0/. +pub use ddm::db::IpPrefix; +pub use ddm::db::Ipv4Prefix; +pub use ddm::db::Ipv6Prefix; +pub use ddm::db::TunnelOrigin; + progenitor::generate_api!( spec = "../openapi/ddm-admin.json", inner_type = slog::Logger, pre_hook = (|log: &slog::Logger, request: &reqwest::Request| { - slog::debug!(log, "client request"; + slog::trace!(log, "client request"; "method" => %request.method(), "uri" => %request.url(), "body" => ?&request.body(), ); }), post_hook = (|log: &slog::Logger, result: &Result<_, _>| { - slog::debug!(log, "client response"; "result" => ?result); + slog::trace!(log, "client response"; "result" => ?result); }), + replace = { + IpPrefix = ddm::db::IpPrefix, + Ipv4Prefix = ddm::db::Ipv4Prefix, + Ipv6Prefix = ddm::db::Ipv6Prefix, + TunnelOrigin = ddm::db::TunnelOrigin, + } ); diff --git a/ddm/Cargo.toml b/ddm/Cargo.toml index b8806d38..6412d1bb 100644 --- a/ddm/Cargo.toml +++ b/ddm/Cargo.toml @@ -20,3 +20,5 @@ serde_json.workspace = true libnet.workspace = true dpd-client.workspace = true dendrite-common.workspace = true +opte-ioctl.workspace = true +oxide-vpc.workspace = true diff --git a/ddm/src/admin.rs b/ddm/src/admin.rs index d9fb1398..de7c30f6 100644 --- a/ddm/src/admin.rs +++ b/ddm/src/admin.rs @@ -2,9 +2,9 @@ // License, v. 2.0. If a copy of the MPL was not distributed with this // file, You can obtain one at https://mozilla.org/MPL/2.0/. 
-use crate::db::{Db, Ipv6Prefix, PeerInfo}; +use crate::db::{Db, Ipv6Prefix, PeerInfo, TunnelOrigin, TunnelRoute}; use crate::exchange::PathVector; -use crate::sm::{AdminEvent, Event}; +use crate::sm::{AdminEvent, Event, PrefixSet}; use dropshot::endpoint; use dropshot::ApiDescription; use dropshot::ConfigDropshot; @@ -99,7 +99,10 @@ async fn expire_peer( let ctx = ctx.context().lock().unwrap(); for e in &ctx.event_channels { - e.send(Event::Admin(AdminEvent::Expire(addr))).unwrap(); //TODO unwrap + e.send(Event::Admin(AdminEvent::Expire(addr))) + .map_err(|e| { + HttpError::for_internal_error(format!("admin event send: {e}")) + })?; } Ok(HttpResponseUpdatedNoContent()) @@ -116,6 +119,15 @@ async fn get_originated( Ok(HttpResponseOk(originated)) } +#[endpoint { method = GET, path = "/originated_tunnel_endpoints" }] +async fn get_originated_tunnel_endpoints( + ctx: RequestContext>>, +) -> Result>, HttpError> { + let ctx = ctx.context().lock().unwrap(); + let originated = ctx.db.originated_tunnel(); + Ok(HttpResponseOk(originated)) +} + #[endpoint { method = GET, path = "/prefixes" }] async fn get_prefixes( ctx: RequestContext>>, @@ -144,6 +156,15 @@ async fn get_prefixes( Ok(HttpResponseOk(result)) } +#[endpoint { method = GET, path = "/tunnel_endpoints" }] +async fn get_tunnel_endpoints( + ctx: RequestContext>>, +) -> Result>, HttpError> { + let ctx = ctx.context().lock().unwrap(); + let imported = ctx.db.imported_tunnel(); + Ok(HttpResponseOk(imported)) +} + #[endpoint { method = PUT, path = "/prefix" }] async fn advertise_prefixes( ctx: RequestContext>>, @@ -154,8 +175,33 @@ async fn advertise_prefixes( ctx.db.originate(&prefixes); for e in &ctx.event_channels { - e.send(Event::Admin(AdminEvent::Announce(prefixes.clone()))) - .unwrap(); //TODO(unwrap) + e.send(Event::Admin(AdminEvent::Announce(PrefixSet::Underlay( + prefixes.clone(), + )))) + .map_err(|e| { + HttpError::for_internal_error(format!("admin event send: {e}")) + })?; + } + + 
Ok(HttpResponseUpdatedNoContent()) +} + +#[endpoint { method = PUT, path = "/tunnel_endpoint" }] +async fn advertise_tunnel_endpoints( + ctx: RequestContext>>, + request: TypedBody>, +) -> Result { + let ctx = ctx.context().lock().unwrap(); + let endpoints = request.into_inner(); + ctx.db.originate_tunnel(&endpoints); + + for e in &ctx.event_channels { + e.send(Event::Admin(AdminEvent::Announce(PrefixSet::Tunnel( + endpoints.clone(), + )))) + .map_err(|e| { + HttpError::for_internal_error(format!("admin event send: {e}")) + })?; } Ok(HttpResponseUpdatedNoContent()) @@ -171,8 +217,33 @@ async fn withdraw_prefixes( ctx.db.withdraw(&prefixes); for e in &ctx.event_channels { - e.send(Event::Admin(AdminEvent::Withdraw(prefixes.clone()))) - .unwrap(); //TODO(unwrap) + e.send(Event::Admin(AdminEvent::Withdraw(PrefixSet::Underlay( + prefixes.clone(), + )))) + .map_err(|e| { + HttpError::for_internal_error(format!("admin event send: {e}")) + })?; + } + + Ok(HttpResponseUpdatedNoContent()) +} + +#[endpoint { method = DELETE, path = "/tunnel_endpoint" }] +async fn withdraw_tunnel_endpoints( + ctx: RequestContext>>, + request: TypedBody>, +) -> Result { + let ctx = ctx.context().lock().unwrap(); + let endpoints = request.into_inner(); + ctx.db.withdraw_tunnel(&endpoints); + + for e in &ctx.event_channels { + e.send(Event::Admin(AdminEvent::Withdraw(PrefixSet::Tunnel( + endpoints.clone(), + )))) + .map_err(|e| { + HttpError::for_internal_error(format!("admin event send: {e}")) + })?; } Ok(HttpResponseUpdatedNoContent()) @@ -185,7 +256,9 @@ async fn sync( let ctx = ctx.context().lock().unwrap(); for e in &ctx.event_channels { - e.send(Event::Admin(AdminEvent::Sync)).unwrap(); //TODO(unwrap) + e.send(Event::Admin(AdminEvent::Sync)).map_err(|e| { + HttpError::for_internal_error(format!("admin event send: {e}")) + })?; } Ok(HttpResponseUpdatedNoContent()) @@ -197,9 +270,13 @@ pub fn api_description( api.register(get_peers)?; api.register(expire_peer)?; 
api.register(advertise_prefixes)?; + api.register(advertise_tunnel_endpoints)?; api.register(withdraw_prefixes)?; + api.register(withdraw_tunnel_endpoints)?; api.register(get_prefixes)?; + api.register(get_tunnel_endpoints)?; api.register(get_originated)?; + api.register(get_originated_tunnel_endpoints)?; api.register(sync)?; Ok(api) } diff --git a/ddm/src/db.rs b/ddm/src/db.rs index 2702a6d2..4b700fa5 100644 --- a/ddm/src/db.rs +++ b/ddm/src/db.rs @@ -2,13 +2,11 @@ // License, v. 2.0. If a copy of the MPL was not distributed with this // file, You can obtain one at https://mozilla.org/MPL/2.0/. -// Copyright 2022 Oxide Computer Company - use schemars::{JsonSchema, JsonSchema_repr}; use serde::{Deserialize, Serialize}; use serde_repr::{Deserialize_repr, Serialize_repr}; use std::collections::{HashMap, HashSet}; -use std::net::{AddrParseError, Ipv6Addr}; +use std::net::{AddrParseError, IpAddr, Ipv4Addr, Ipv6Addr}; use std::num::ParseIntError; use std::sync::{Arc, Mutex}; use thiserror::Error; @@ -23,6 +21,8 @@ pub struct DbData { pub peers: HashMap, pub imported: HashSet, pub originated: HashSet, + pub imported_tunnel: HashSet, + pub originated_tunnel: HashSet, } unsafe impl Sync for Db {} @@ -41,10 +41,18 @@ impl Db { self.data.lock().unwrap().imported.clone() } + pub fn imported_tunnel(&self) -> HashSet { + self.data.lock().unwrap().imported_tunnel.clone() + } + pub fn import(&self, r: &HashSet) { self.data.lock().unwrap().imported.extend(r.clone()); } + pub fn import_tunnel(&self, r: &HashSet) { + self.data.lock().unwrap().imported_tunnel.extend(r.clone()); + } + pub fn delete_import(&self, r: &HashSet) { let imported = &mut self.data.lock().unwrap().imported; for x in r { @@ -52,20 +60,45 @@ impl Db { } } + pub fn delete_import_tunnel(&self, r: &HashSet) { + let imported = &mut self.data.lock().unwrap().imported_tunnel; + for x in r { + imported.remove(x); + } + } + pub fn originate(&self, p: &HashSet) { self.data.lock().unwrap().originated.extend(p); } + pub fn 
originate_tunnel(&self, p: &HashSet) { + self.data + .lock() + .unwrap() + .originated_tunnel + .extend(p.clone()); + } + pub fn originated(&self) -> HashSet { self.data.lock().unwrap().originated.clone() } + pub fn originated_tunnel(&self) -> HashSet { + self.data.lock().unwrap().originated_tunnel.clone() + } + pub fn withdraw(&self, p: &HashSet) { for prefix in p { self.data.lock().unwrap().originated.remove(prefix); } } + pub fn withdraw_tunnel(&self, p: &HashSet) { + for prefix in p { + self.data.lock().unwrap().originated_tunnel.remove(prefix); + } + } + /// Set peer info at the given index. Returns true if peer information was /// changed. pub fn set_peer(&self, index: u32, info: PeerInfo) -> bool { @@ -75,8 +108,13 @@ impl Db { } } - pub fn remove_nexthop_routes(&self, nexthop: Ipv6Addr) -> HashSet { + pub fn remove_nexthop_routes( + &self, + nexthop: Ipv6Addr, + ) -> (HashSet, HashSet) { let mut data = self.data.lock().unwrap(); + // Routes are generally held in sets to prevent duplication and provide + // handy set-algebra operations. 
let mut removed = HashSet::new(); for x in &data.imported { if x.nexthop == nexthop { @@ -86,7 +124,17 @@ for x in &removed { data.imported.remove(x); } - removed + + let mut tnl_removed = HashSet::new(); + for x in &data.imported_tunnel { + if x.nexthop == nexthop { + tnl_removed.insert(x.clone()); + } + } + for x in &tnl_removed { + data.imported_tunnel.remove(x); + } + (removed, tnl_removed) } pub fn remove_peer(&self, index: u32) { @@ -173,6 +221,12 @@ pub struct Ipv6Prefix { pub len: u8, } +impl std::fmt::Display for Ipv6Prefix { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}/{}", self.addr, self.len) + } +} + #[derive(Debug, Error)] pub enum Ipv6PrefixParseError { #[error("expected CIDR representation /")] @@ -201,6 +255,38 @@ impl std::str::FromStr for Ipv6Prefix { } } +#[derive( + Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize, JsonSchema, +)] +pub struct TunnelRoute { + pub origin: TunnelOrigin, + + // The nexthop is only used to associate the route with a peer allowing us + // to remove the route if the peer expires. It does not influence what goes + // into the underlying underlay routing platform. Tunnel routes only + // influence the state of the underlying encapsulation service. 
+ pub nexthop: Ipv6Addr, +} + +#[derive( + Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize, JsonSchema, +)] +pub struct TunnelOrigin { + pub overlay_prefix: IpPrefix, + pub boundary_addr: Ipv6Addr, + pub vni: u32, +} + +impl From for TunnelOrigin { + fn from(x: crate::db::TunnelRoute) -> Self { + Self { + overlay_prefix: x.origin.overlay_prefix, + boundary_addr: x.origin.boundary_addr, + vni: x.origin.vni, + } + } +} + #[derive( Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize, JsonSchema, )] @@ -210,3 +296,98 @@ pub struct Route { pub ifname: String, pub path: Vec, } + +#[derive( + Debug, Copy, Clone, PartialEq, Eq, Hash, Serialize, Deserialize, JsonSchema, +)] +pub struct Ipv4Prefix { + pub addr: Ipv4Addr, + pub len: u8, +} + +impl std::fmt::Display for Ipv4Prefix { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}/{}", self.addr, self.len) + } +} + +#[derive(Debug, Error)] +pub enum Ipv4PrefixParseError { + #[error("expected CIDR representation /")] + Cidr, + + #[error("address parse error: {0}")] + Addr(#[from] AddrParseError), + + #[error("mask parse error: {0}")] + Mask(#[from] ParseIntError), +} + +impl std::str::FromStr for Ipv4Prefix { + type Err = Ipv4PrefixParseError; + + fn from_str(s: &str) -> Result { + let parts: Vec<&str> = s.split('/').collect(); + if parts.len() < 2 { + return Err(Ipv4PrefixParseError::Cidr); + } + + Ok(Ipv4Prefix { + addr: Ipv4Addr::from_str(parts[0])?, + len: u8::from_str(parts[1])?, + }) + } +} + +#[derive( + Debug, Copy, Clone, PartialEq, Eq, Hash, Serialize, Deserialize, JsonSchema, +)] +pub enum IpPrefix { + V4(Ipv4Prefix), + V6(Ipv6Prefix), +} + +impl std::fmt::Display for IpPrefix { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Self::V4(p) => p.fmt(f), + Self::V6(p) => p.fmt(f), + } + } +} + +impl IpPrefix { + pub fn addr(&self) -> IpAddr { + match self { + Self::V4(s) => s.addr.into(), + Self::V6(s) => s.addr.into(), + } + } 
+ + pub fn length(&self) -> u8 { + match self { + Self::V4(s) => s.len, + Self::V6(s) => s.len, + } + } +} + +#[derive(Debug, Error)] +pub enum IpPrefixParseError { + #[error("v4 address parse error: {0}")] + V4(#[from] Ipv4PrefixParseError), + + #[error("v6 address parse error: {0}")] + V6(#[from] Ipv6PrefixParseError), +} + +impl std::str::FromStr for IpPrefix { + type Err = IpPrefixParseError; + + fn from_str(s: &str) -> Result { + if let Ok(result) = Ipv4Prefix::from_str(s) { + return Ok(IpPrefix::V4(result)); + } + Ok(IpPrefix::V6(Ipv6Prefix::from_str(s)?)) + } +} diff --git a/ddm/src/discovery.rs b/ddm/src/discovery.rs index 25aa14d5..eb3df2f0 100644 --- a/ddm/src/discovery.rs +++ b/ddm/src/discovery.rs @@ -2,8 +2,6 @@ // License, v. 2.0. If a copy of the MPL was not distributed with this // file, You can obtain one at https://mozilla.org/MPL/2.0/. -// Copyright 2022 Oxide Computer Company - //! This file implements the ddm router discovery mechanisms. These mechanisims //! are responsible for three primary things //! diff --git a/ddm/src/exchange.rs b/ddm/src/exchange.rs index 94160843..bd6457e0 100644 --- a/ddm/src/exchange.rs +++ b/ddm/src/exchange.rs @@ -2,8 +2,6 @@ // License, v. 2.0. If a copy of the MPL was not distributed with this // file, You can obtain one at https://mozilla.org/MPL/2.0/. -// Copyright 2022 Oxide Computer Company - //! This file implements the ddm router prefix exchange mechanisms. These //! mechanisms are responsible for announcing and withdrawing prefix sets to and //! from peers. @@ -17,7 +15,7 @@ //! of a ddm router is defined in the state machine implementation in sm.rs. //! 
-use crate::db::{Ipv6Prefix, Route, RouterKind}; +use crate::db::{Ipv6Prefix, Route, RouterKind, TunnelOrigin, TunnelRoute}; use crate::sm::{Config, Event, PeerEvent, SmContext}; use crate::{dbg, err, inf, wrn}; use dropshot::endpoint; @@ -49,6 +47,45 @@ pub struct HandlerContext { log: Logger, } +#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema, Default)] +pub struct Update { + pub underlay: Option, + pub tunnel: Option, +} + +impl From for Update { + fn from(u: UnderlayUpdate) -> Self { + Update { + underlay: Some(u), + tunnel: None, + } + } +} + +impl From for Update { + fn from(t: TunnelUpdate) -> Self { + Update { + underlay: None, + tunnel: Some(t), + } + } +} + +impl Update { + fn announce(pr: PullResponse) -> Self { + Self { + underlay: pr.underlay.map(UnderlayUpdate::announce), + tunnel: pr.tunnel.map(TunnelUpdate::announce), + } + } +} + +#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema, Default)] +pub struct PullResponse { + pub underlay: Option>, + pub tunnel: Option>, +} + #[derive( Debug, Clone, PartialEq, Eq, Hash, Deserialize, Serialize, JsonSchema, )] @@ -58,19 +95,62 @@ pub struct PathVector { } #[derive(Debug, Clone, Deserialize, Serialize, JsonSchema, Default)] -struct Update { +pub struct UnderlayUpdate { pub announce: HashSet, pub withdraw: HashSet, } -impl Update { - fn announce(prefixes: HashSet) -> Self { +impl UnderlayUpdate { + pub fn announce(prefixes: HashSet) -> Self { + Self { + announce: prefixes, + ..Default::default() + } + } + pub fn withdraw(prefixes: HashSet) -> Self { + Self { + withdraw: prefixes, + ..Default::default() + } + } + pub fn with_path_element(&self, element: String) -> Self { + Self { + announce: self + .announce + .iter() + .map(|x| { + let mut pv = x.clone(); + pv.path.push(element.clone()); + pv + }) + .collect(), + withdraw: self + .withdraw + .iter() + .map(|x| { + let mut pv = x.clone(); + pv.path.push(element.clone()); + pv + }) + .collect(), + } + } +} + +#[derive(Debug, Clone, 
Deserialize, Serialize, JsonSchema, Default)] +pub struct TunnelUpdate { + pub announce: HashSet, + pub withdraw: HashSet, +} + +impl TunnelUpdate { + pub fn announce(prefixes: HashSet) -> Self { Self { announce: prefixes, ..Default::default() } } - fn withdraw(prefixes: HashSet) -> Self { + pub fn withdraw(prefixes: HashSet) -> Self { Self { withdraw: prefixes, ..Default::default() @@ -90,33 +170,57 @@ pub enum ExchangeError { Timeout(#[from] tokio::time::error::Elapsed), } -pub(crate) fn announce( +pub(crate) fn announce_underlay( config: Config, prefixes: HashSet, addr: Ipv6Addr, rt: Arc, log: Logger, ) -> Result<(), ExchangeError> { - let update = Update::announce(prefixes); - send_update(config, update, addr, rt, log) + let update = UnderlayUpdate::announce(prefixes); + send_update(config, update.into(), addr, rt, log) } -pub(crate) fn withdraw( +pub(crate) fn announce_tunnel( + config: Config, + endpoints: HashSet, + addr: Ipv6Addr, + rt: Arc, + log: Logger, +) -> Result<(), ExchangeError> { + let update = + TunnelUpdate::announce(endpoints.into_iter().map(Into::into).collect()); + send_update(config, update.into(), addr, rt, log) +} + +pub(crate) fn withdraw_underlay( config: Config, prefixes: HashSet, addr: Ipv6Addr, rt: Arc, log: Logger, ) -> Result<(), ExchangeError> { - let update = Update::withdraw(prefixes); - send_update(config, update, addr, rt, log) + let update = UnderlayUpdate::withdraw(prefixes); + send_update(config, update.into(), addr, rt, log) +} + +pub(crate) fn withdraw_tunnel( + config: Config, + endpoints: HashSet, + addr: Ipv6Addr, + rt: Arc, + log: Logger, +) -> Result<(), ExchangeError> { + let update = + TunnelUpdate::withdraw(endpoints.into_iter().map(Into::into).collect()); + send_update(config, update.into(), addr, rt, log) } pub(crate) fn do_pull( ctx: &SmContext, addr: &Ipv6Addr, rt: &Arc, -) -> Result, ExchangeError> { +) -> Result { let uri = format!( "http://[{}%{}]:{}/pull", addr, ctx.config.if_index, 
ctx.config.exchange_port, @@ -154,9 +258,9 @@ pub(crate) fn pull( rt: Arc, log: Logger, ) -> Result<(), ExchangeError> { - let pv: HashSet = do_pull(&ctx, &addr, &rt)?; + let pr: PullResponse = do_pull(&ctx, &addr, &rt)?; - let update = Update::announce(pv); + let update = Update::announce(pr); let hctx = HandlerContext { ctx, @@ -295,7 +399,7 @@ async fn push_handler( #[endpoint { method = GET, path = "/pull" }] async fn pull_handler( ctx: RequestContext>>, -) -> Result>, HttpError> { +) -> Result, HttpError> { let ctx = ctx.context().lock().await.clone(); let db = tokio::task::spawn_blocking(move || ctx.ctx.db.dump()) @@ -304,7 +408,9 @@ async fn pull_handler( HttpError::for_internal_error(format!("spawn db dump thread {}", e)) })?; - let mut prefixes = HashSet::new(); + let mut underlay = HashSet::new(); + let mut tunnel = HashSet::new(); + // Only transit routers redistribute prefixes if ctx.ctx.config.kind == RouterKind::Transit { for route in &db.imported { @@ -317,7 +423,14 @@ async fn pull_handler( path: route.path.clone(), }; pv.path.push(ctx.ctx.hostname.clone()); - prefixes.insert(pv); + underlay.insert(pv); + } + for route in &db.imported_tunnel { + if route.nexthop == ctx.peer { + continue; + } + let tv = route.origin.clone(); + tunnel.insert(tv); } } for prefix in &db.originated { @@ -325,16 +438,125 @@ async fn pull_handler( destination: *prefix, path: vec![ctx.ctx.hostname.clone()], }; - prefixes.insert(pv); + underlay.insert(pv); + } + for prefix in &db.originated_tunnel { + let tv = TunnelOrigin { + overlay_prefix: prefix.overlay_prefix, + boundary_addr: prefix.boundary_addr, + vni: prefix.vni, + }; + tunnel.insert(tv); } - Ok(HttpResponseOk(prefixes)) + Ok(HttpResponseOk(PullResponse { + underlay: if underlay.is_empty() { + None + } else { + Some(underlay) + }, + tunnel: if tunnel.is_empty() { + None + } else { + Some(tunnel) + }, + })) } fn handle_update(update: &Update, ctx: &HandlerContext) { + if let Some(underlay_update) = &update.underlay { 
+ handle_underlay_update(underlay_update, ctx); + } + + if let Some(tunnel_update) = &update.tunnel { + handle_tunnel_update(tunnel_update, ctx); + } + + // distribute updates + + if ctx.ctx.config.kind == RouterKind::Transit { + dbg!( + ctx.log, + ctx.ctx.config.if_name, + "redistributing update to {} peers", + ctx.ctx.event_channels.len() + ); + + let underlay = update + .underlay + .as_ref() + .map(|update| update.with_path_element(ctx.ctx.hostname.clone())); + + let push = Update { + underlay, + tunnel: update.tunnel.clone(), + }; + + for ec in &ctx.ctx.event_channels { + ec.send(Event::Peer(PeerEvent::Push(push.clone()))).unwrap(); + } + } +} + +fn handle_tunnel_update(update: &TunnelUpdate, ctx: &HandlerContext) { + let mut import = HashSet::new(); + let mut remove = HashSet::new(); + let db = &ctx.ctx.db; + + for x in &update.announce { + import.insert(TunnelRoute { + origin: TunnelOrigin { + overlay_prefix: x.overlay_prefix, + boundary_addr: x.boundary_addr, + vni: x.vni, + }, + nexthop: ctx.peer, + }); + } + db.import_tunnel(&import); + if let Err(e) = crate::sys::add_tunnel_routes( + &ctx.log, + &ctx.ctx.config.if_name, + &import, + ) { + err!( + ctx.log, + ctx.ctx.config.if_name, + "add tunnel routes: {e}: {:#?}", + import, + ) + } + + for x in &update.withdraw { + remove.insert(TunnelRoute { + origin: TunnelOrigin { + overlay_prefix: x.overlay_prefix, + boundary_addr: x.boundary_addr, + vni: x.vni, + }, + nexthop: ctx.peer, + }); + } + db.delete_import_tunnel(&remove); + if let Err(e) = crate::sys::remove_tunnel_routes( + &ctx.log, + &ctx.ctx.config.if_name, + &remove, + ) { + err!( + ctx.log, + ctx.ctx.config.if_name, + "remove tunnel routes: {e}: {:#?}", + import, + ) + } +} + +fn handle_underlay_update(update: &UnderlayUpdate, ctx: &HandlerContext) { let mut import = HashSet::new(); let mut add = Vec::new(); let db = &ctx.ctx.db; + for prefix in &update.announce { import.insert(Route { destination: prefix.destination, @@ -351,7 +573,12 @@ fn 
handle_update(update: &Update, ctx: &HandlerContext) { add.push(r); } db.import(&import); - crate::sys::add_routes(&ctx.log, &ctx.ctx.config, add, &ctx.ctx.rt); + crate::sys::add_underlay_routes( + &ctx.log, + &ctx.ctx.config, + add, + &ctx.ctx.rt, + ); let mut withdraw = HashSet::new(); for prefix in &update.withdraw { @@ -384,45 +611,11 @@ fn handle_update(update: &Update, ctx: &HandlerContext) { del.push(r); } } - crate::sys::remove_routes( + crate::sys::remove_underlay_routes( &ctx.log, &ctx.ctx.config.if_name, &ctx.ctx.config.dpd, del, &ctx.ctx.rt, ); - - // distribute updates - - if ctx.ctx.config.kind == RouterKind::Transit { - dbg!( - ctx.log, - ctx.ctx.config.if_name, - "redistributing update to {} peers", - ctx.ctx.event_channels.len() - ); - let push = crate::sm::Push { - announce: update - .announce - .iter() - .map(|x| { - let mut pv = x.clone(); - pv.path.push(ctx.ctx.hostname.clone()); - pv - }) - .collect(), - withdraw: update - .withdraw - .iter() - .map(|x| { - let mut pv = x.clone(); - pv.path.push(ctx.ctx.hostname.clone()); - pv - }) - .collect(), - }; - for ec in &ctx.ctx.event_channels { - ec.send(Event::Peer(PeerEvent::Push(push.clone()))).unwrap(); - } - } } diff --git a/ddm/src/sm.rs b/ddm/src/sm.rs index 4d8e24a4..7ce60dc4 100644 --- a/ddm/src/sm.rs +++ b/ddm/src/sm.rs @@ -2,10 +2,8 @@ // License, v. 2.0. If a copy of the MPL was not distributed with this // file, You can obtain one at https://mozilla.org/MPL/2.0/. 
-// Copyright 2022 Oxide Computer Company - -use crate::db::{Db, Ipv6Prefix, RouterKind}; -use crate::exchange::PathVector; +use crate::db::{Db, Ipv6Prefix, RouterKind, TunnelOrigin}; +use crate::exchange::{PathVector, TunnelUpdate, UnderlayUpdate, Update}; use crate::{dbg, discovery, err, exchange, inf, wrn}; use libnet::get_ipaddr_info; use slog::Logger; @@ -22,10 +20,10 @@ use thiserror::Error; #[derive(Debug)] pub enum AdminEvent { /// Announce a set of IPv6 prefixes - Announce(HashSet), + Announce(PrefixSet), /// Withdraw a set of IPv6 prefixes - Withdraw(HashSet), + Withdraw(PrefixSet), /// Expire the peer at the specified address Expire(Ipv6Addr), @@ -34,15 +32,15 @@ pub enum AdminEvent { Sync, } -#[derive(Debug, Clone)] -pub struct Push { - pub announce: HashSet, - pub withdraw: HashSet, +#[derive(Debug)] +pub enum PrefixSet { + Underlay(HashSet), + Tunnel(HashSet), } #[derive(Debug)] pub enum PeerEvent { - Push(Push), + Push(Update), } #[derive(Debug)] @@ -416,20 +414,33 @@ impl Exchange { ) { exchange_thread.abort(); self.ctx.db.remove_peer(self.ctx.config.if_index); - let to_remove = self.ctx.db.remove_nexthop_routes(self.peer); + let (to_remove, to_remove_tnl) = + self.ctx.db.remove_nexthop_routes(self.peer); let mut routes: Vec = Vec::new(); for x in &to_remove { let mut r: crate::sys::Route = x.clone().into(); r.ifname = self.ctx.config.if_name.clone(); routes.push(r); } - crate::sys::remove_routes( + crate::sys::remove_underlay_routes( &self.log, &self.ctx.config.if_name, &self.ctx.config.dpd, routes, &self.ctx.rt, ); + if let Err(e) = crate::sys::remove_tunnel_routes( + &self.log, + &self.ctx.config.if_name, + &to_remove_tnl, + ) { + err!( + self.log, + self.ctx.config.if_name, + "failed to remove tunnel routes: {:#?} {e}", + to_remove_tnl + ); + } // if we're a transit router propagate withdraws for the // expired peer. 
if self.ctx.config.kind == RouterKind::Transit { @@ -439,21 +450,34 @@ impl Exchange { "redistributing expire to {} peers", self.ctx.event_channels.len() ); - let pv = to_remove - .iter() - .map(|x| PathVector { - destination: x.destination, - path: { - let mut ps = x.path.clone(); - ps.push(self.ctx.hostname.clone()); - ps - }, - }) - .collect(); - let push = Push { - announce: HashSet::new(), - withdraw: pv, + + let underlay = if to_remove.is_empty() { + None + } else { + Some(UnderlayUpdate::withdraw( + to_remove + .iter() + .map(|x| PathVector { + destination: x.destination, + path: { + let mut ps = x.path.clone(); + ps.push(self.ctx.hostname.clone()); + ps + }, + }) + .collect(), + )) + }; + + let tunnel = if to_remove_tnl.is_empty() { + None + } else { + Some(TunnelUpdate::withdraw( + to_remove_tnl.iter().cloned().map(Into::into).collect(), + )) }; + + let push = Update { underlay, tunnel }; for ec in &self.ctx.event_channels { ec.send(Event::Peer(PeerEvent::Push(push.clone()))).unwrap(); } @@ -511,7 +535,9 @@ impl State for Exchange { } }; match e { - Event::Admin(AdminEvent::Announce(prefixes)) => { + Event::Admin(AdminEvent::Announce(PrefixSet::Underlay( + prefixes, + ))) => { let pv: HashSet = prefixes .iter() .map(|x| PathVector { @@ -519,7 +545,7 @@ impl State for Exchange { path: vec![self.ctx.hostname.clone()], }) .collect(); - if let Err(e) = crate::exchange::announce( + if let Err(e) = crate::exchange::announce_underlay( self.ctx.config.clone(), pv, self.peer, @@ -548,7 +574,42 @@ impl State for Exchange { ); } } - Event::Admin(AdminEvent::Withdraw(prefixes)) => { + Event::Admin(AdminEvent::Announce(PrefixSet::Tunnel( + endpoints, + ))) => { + let tv: HashSet = endpoints.clone(); + if let Err(e) = crate::exchange::announce_tunnel( + self.ctx.config.clone(), + tv, + self.peer, + self.ctx.rt.clone(), + self.log.clone(), + ) { + err!( + self.log, + self.ctx.config.if_name, + "announce tunnel: {}", + e, + ); + wrn!( + self.log, + 
self.ctx.config.if_name, + "expiring peer {} due to failed tunnel announce", + self.peer, + ); + self.expire_peer(&exchange_thread, &pull_stop); + return ( + Box::new(Solicit::new( + self.ctx.clone(), + self.log.clone(), + )), + event, + ); + } + } + Event::Admin(AdminEvent::Withdraw(PrefixSet::Underlay( + prefixes, + ))) => { let pv: HashSet = prefixes .iter() .map(|x| PathVector { @@ -556,7 +617,7 @@ impl State for Exchange { path: vec![self.ctx.hostname.clone()], }) .collect(); - if let Err(e) = crate::exchange::withdraw( + if let Err(e) = crate::exchange::withdraw_underlay( self.ctx.config.clone(), pv, self.peer, @@ -585,6 +646,39 @@ impl State for Exchange { ); } } + Event::Admin(AdminEvent::Withdraw(PrefixSet::Tunnel( + endpoints, + ))) => { + let tv: HashSet = endpoints.clone(); + if let Err(e) = crate::exchange::withdraw_tunnel( + self.ctx.config.clone(), + tv, + self.peer, + self.ctx.rt.clone(), + self.log.clone(), + ) { + err!( + self.log, + self.ctx.config.if_name, + "withdraw tunnel: {}", + e, + ); + wrn!( + self.log, + self.ctx.config.if_name, + "expiring peer {} due to failed tunnel withdraw", + self.peer, + ); + self.expire_peer(&exchange_thread, &pull_stop); + return ( + Box::new(Solicit::new( + self.ctx.clone(), + self.log.clone(), + )), + event, + ); + } + } Event::Admin(AdminEvent::Expire(peer)) => { if self.peer == peer { inf!( @@ -618,72 +712,75 @@ impl State for Exchange { ); } } - Event::Peer(PeerEvent::Push(push)) => { + // TODO tunnel + Event::Peer(PeerEvent::Push(update)) => { inf!( self.log, self.ctx.config.if_name, "push from {}: {:#?}", self.peer, - push, + update, ); - if !push.announce.is_empty() { - if let Err(e) = crate::exchange::announce( - self.ctx.config.clone(), - push.announce, - self.peer, - self.ctx.rt.clone(), - self.log.clone(), - ) { - err!( - self.log, - self.ctx.config.if_name, - "announce: {}", - e, - ); - wrn!( - self.log, - self.ctx.config.if_name, - "expiring peer {} due to failed announce", + if let Some(push) = 
update.underlay { + if !push.announce.is_empty() { + if let Err(e) = crate::exchange::announce_underlay( + self.ctx.config.clone(), + push.announce, self.peer, - ); - self.expire_peer(&exchange_thread, &pull_stop); - return ( - Box::new(Solicit::new( - self.ctx.clone(), - self.log.clone(), - )), - event, - ); + self.ctx.rt.clone(), + self.log.clone(), + ) { + err!( + self.log, + self.ctx.config.if_name, + "announce: {}", + e, + ); + wrn!( + self.log, + self.ctx.config.if_name, + "expiring peer {} due to failed announce", + self.peer, + ); + self.expire_peer(&exchange_thread, &pull_stop); + return ( + Box::new(Solicit::new( + self.ctx.clone(), + self.log.clone(), + )), + event, + ); + } } - } - if !push.withdraw.is_empty() { - if let Err(e) = crate::exchange::withdraw( - self.ctx.config.clone(), - push.withdraw, - self.peer, - self.ctx.rt.clone(), - self.log.clone(), - ) { - err!( - self.log, - self.ctx.config.if_name, - "withdraw: {}", - e, - ); - wrn!( - self.log, - self.ctx.config.if_name, - "expiring peer {} due to failed withdraw", + if !push.withdraw.is_empty() { + if let Err(e) = crate::exchange::withdraw_underlay( + self.ctx.config.clone(), + push.withdraw, self.peer, - ); - self.expire_peer(&exchange_thread, &pull_stop); - return ( - Box::new(Solicit::new( - self.ctx.clone(), - self.log.clone(), - )), - event, - ); + self.ctx.rt.clone(), + self.log.clone(), + ) { + err!( + self.log, + self.ctx.config.if_name, + "withdraw: {}", + e, + ); + wrn!( + self.log, + self.ctx.config.if_name, + "expiring peer {} due to failed withdraw", + self.peer, + ); + self.expire_peer(&exchange_thread, &pull_stop); + return ( + Box::new(Solicit::new( + self.ctx.clone(), + self.log.clone(), + )), + event, + ); + } } } } diff --git a/ddm/src/sys.rs b/ddm/src/sys.rs index a1e54c73..7053f08b 100644 --- a/ddm/src/sys.rs +++ b/ddm/src/sys.rs @@ -2,6 +2,7 @@ // License, v. 2.0. 
If a copy of the MPL was not distributed with this // file, You can obtain one at https://mozilla.org/MPL/2.0/. +use crate::db::TunnelRoute; use crate::sm::{Config, DpdConfig}; use crate::{dbg, err, inf, wrn}; use dendrite_common::network::{Cidr, Ipv6Cidr}; @@ -11,9 +12,12 @@ use dpd_client::types; use dpd_client::Client; use dpd_client::ClientState; use libnet::{IpPrefix, Ipv4Prefix, Ipv6Prefix}; +use opte_ioctl::OpteHdl; +use oxide_vpc::api::TunnelEndpoint; use schemars::JsonSchema; use serde::{Deserialize, Serialize}; use slog::Logger; +use std::collections::{HashMap, HashSet}; use std::net::IpAddr; use std::sync::Arc; @@ -101,7 +105,7 @@ impl From for IpPrefix { } } -pub fn add_routes( +pub fn add_underlay_routes( log: &Logger, config: &Config, routes: Vec, @@ -229,7 +233,111 @@ pub fn add_routes_dendrite( } } -pub fn remove_routes( +fn tunnel_route_update_map( + routes: &HashSet, +) -> HashMap> { + let mut m: HashMap> = + HashMap::new(); + for r in routes { + let pfx = r.origin.overlay_prefix; + let tep = TunnelEndpoint { + ip: r.origin.boundary_addr.into(), + vni: oxide_vpc::api::Vni::new(r.origin.vni).unwrap(), + }; + match m.get_mut(&pfx) { + Some(entry) => { + entry.push(tep); + } + None => { + m.insert(pfx, vec![tep]); + } + } + } + m +} + +pub fn add_tunnel_routes( + log: &Logger, + ifname: &str, + routes: &HashSet, +) -> Result<(), opte_ioctl::Error> { + use oxide_vpc::api::{ + IpCidr, Ipv4Cidr, Ipv4PrefixLen, Ipv6Cidr, Ipv6PrefixLen, + SetVirt2BoundaryReq, + }; + let hdl = OpteHdl::open(OpteHdl::XDE_CTL)?; + + for (pfx, tep) in tunnel_route_update_map(routes) { + for t in &tep { + inf!( + log, + ifname, + "adding tunnel route {} -[{}]-> {}", + pfx, + t.vni, + t.ip, + ); + } + let vip = match pfx { + crate::db::IpPrefix::V4(p) => IpCidr::Ip4(Ipv4Cidr::new( + p.addr.into(), + Ipv4PrefixLen::new(p.len).unwrap(), + )), + crate::db::IpPrefix::V6(p) => IpCidr::Ip6(Ipv6Cidr::new( + p.addr.into(), + Ipv6PrefixLen::new(p.len).unwrap(), + )), + }; + let req = 
SetVirt2BoundaryReq { vip, tep }; + if let Err(e) = hdl.set_v2b(&req) { + err!(log, ifname, "failed to set v2p route: {:?}: {}", req, e); + } + } + + Ok(()) +} + +pub fn remove_tunnel_routes( + log: &Logger, + ifname: &str, + routes: &HashSet, +) -> Result<(), opte_ioctl::Error> { + use oxide_vpc::api::{ + ClearVirt2BoundaryReq, IpCidr, Ipv4Cidr, Ipv4PrefixLen, Ipv6Cidr, + Ipv6PrefixLen, + }; + let hdl = OpteHdl::open(OpteHdl::XDE_CTL)?; + for (pfx, tep) in tunnel_route_update_map(routes) { + for t in &tep { + inf!( + log, + ifname, + "removing tunnel route {} -[{}]-> {}", + pfx, + t.vni, + t.ip, + ); + } + let vip = match pfx { + crate::db::IpPrefix::V4(p) => IpCidr::Ip4(Ipv4Cidr::new( + p.addr.into(), + Ipv4PrefixLen::new(p.len).unwrap(), + )), + crate::db::IpPrefix::V6(p) => IpCidr::Ip6(Ipv6Cidr::new( + p.addr.into(), + Ipv6PrefixLen::new(p.len).unwrap(), + )), + }; + let req = ClearVirt2BoundaryReq { vip, tep }; + if let Err(e) = hdl.clear_v2b(&req) { + err!(log, ifname, "failed to clear v2p route: {:?}: {}", req, e); + } + } + + Ok(()) +} + +pub fn remove_underlay_routes( log: &Logger, ifname: &str, dpd: &Option, diff --git a/ddmadm/src/main.rs b/ddmadm/src/main.rs index 65322a65..f5372ac3 100644 --- a/ddmadm/src/main.rs +++ b/ddmadm/src/main.rs @@ -5,8 +5,7 @@ use anyhow::Result; use clap::Parser; use colored::*; -use ddm::db::Ipv6Prefix; -use ddm_admin_client::types; +use ddm::db::{IpPrefix, Ipv6Prefix, TunnelOrigin}; use ddm_admin_client::Client; use mg_common::cli::oxide_cli_style; use slog::{Drain, Logger}; @@ -47,6 +46,18 @@ enum SubCommand { /// Withdraw prefixes from a DDM router. WithdrawPrefixes(Prefixes), + /// Get the tunnel endpoints a DDM router knows about. + TunnelImported, + + /// Get the tunnel endpoints a DDM router has originated. + TunnelOriginated, + + /// Advertise prefixes from a DDM router. + TunnelAdvertise(TunnelEndpoint), + + /// Withdraw prefixes from a DDM router. 
+ TunnelWithdraw(TunnelEndpoint), + /// Sync prefix information from peers. Sync, } @@ -56,6 +67,18 @@ struct Prefixes { prefixes: Vec, } +#[derive(Debug, Parser)] +struct TunnelEndpoint { + #[arg(short, long)] + pub overlay_prefix: IpPrefix, + + #[arg(short, long)] + pub boundary_addr: Ipv6Addr, + + #[arg(short, long)] + pub vni: u32, +} + #[derive(Debug, Parser)] struct Peer { addr: Ipv6Addr, @@ -127,11 +150,8 @@ async fn run() -> Result<()> { let strpath = pv.path.join(" "); writeln!( &mut tw, - "{}/{}\t{}\t{}", - pv.destination.addr, - pv.destination.len, - nexthop, - strpath, + "{}\t{}\t{}", + pv.destination, nexthop, strpath, )?; } } @@ -142,14 +162,14 @@ async fn run() -> Result<()> { let mut tw = TabWriter::new(stdout()); writeln!(&mut tw, "{}", "Prefix".dimmed(),)?; for prefix in msg.into_inner() { - writeln!(&mut tw, "{}/{}", prefix.addr, prefix.len,)?; + writeln!(&mut tw, "{}", prefix)?; } tw.flush()?; } SubCommand::AdvertisePrefixes(ac) => { - let mut prefixes: Vec = Vec::new(); + let mut prefixes: Vec = Vec::new(); for p in ac.prefixes { - prefixes.push(types::Ipv6Prefix { + prefixes.push(Ipv6Prefix { addr: p.addr, len: p.len, }); @@ -157,15 +177,75 @@ async fn run() -> Result<()> { client.advertise_prefixes(&prefixes).await?; } SubCommand::WithdrawPrefixes(ac) => { - let mut prefixes: Vec = Vec::new(); + let mut prefixes: Vec = Vec::new(); for p in ac.prefixes { - prefixes.push(types::Ipv6Prefix { + prefixes.push(Ipv6Prefix { addr: p.addr, len: p.len, }); } client.withdraw_prefixes(&prefixes).await?; } + SubCommand::TunnelImported => { + let msg = client.get_tunnel_endpoints().await?; + let mut tw = TabWriter::new(stdout()); + writeln!( + &mut tw, + "{}\t{}\t{}", + "Overlay Prefix".dimmed(), + "Boundary Address".dimmed(), + "VNI".dimmed(), + )?; + for endpoint in msg.into_inner() { + writeln!( + &mut tw, + "{}\t{}\t{}", + endpoint.origin.overlay_prefix, + endpoint.origin.boundary_addr, + endpoint.origin.vni, + )?; + } + tw.flush()?; + } + 
SubCommand::TunnelOriginated => { + let msg = client.get_originated_tunnel_endpoints().await?; + let mut tw = TabWriter::new(stdout()); + writeln!( + &mut tw, + "{}\t{}\t{}", + "Overlay Prefix".dimmed(), + "Boundary Address".dimmed(), + "VNI".dimmed(), + )?; + for endpoint in msg.into_inner() { + writeln!( + &mut tw, + "{}\t{}\t{}", + endpoint.overlay_prefix, + endpoint.boundary_addr, + endpoint.vni, + )?; + } + tw.flush()?; + } + SubCommand::TunnelAdvertise(ep) => { + client + .advertise_tunnel_endpoints(&vec![TunnelOrigin { + overlay_prefix: ep.overlay_prefix, + boundary_addr: ep.boundary_addr, + vni: ep.vni, + }]) + .await?; + } + SubCommand::TunnelWithdraw(ep) => { + client + .withdraw_tunnel_endpoints(&vec![TunnelOrigin { + overlay_prefix: ep.overlay_prefix, + boundary_addr: ep.boundary_addr, + vni: ep.vni, + }]) + .await?; + } SubCommand::Sync => { client.sync().await?; } diff --git a/ddmd/src/main.rs b/ddmd/src/main.rs index e3f39fb7..faceb97d 100644 --- a/ddmd/src/main.rs +++ b/ddmd/src/main.rs @@ -6,7 +6,7 @@ use clap::Parser; use ddm::db::{Db, RouterKind}; use ddm::sm::{DpdConfig, SmContext, StateMachine}; use ddm::sys::Route; -use slog::{Drain, Logger}; +use slog::{error, Drain, Logger}; use std::net::{IpAddr, Ipv6Addr}; use std::sync::mpsc::channel; use std::sync::Arc; @@ -172,10 +172,25 @@ fn termination_handler( .await .expect("error setting termination handler"); const SIGTERM_EXIT: i32 = 130; + let imported = db.imported(); let routes: Vec = imported.iter().map(|x| (x.clone()).into()).collect(); - ddm::sys::remove_routes(&log, "shutdown-all", &dendrite, routes, &rt); + ddm::sys::remove_underlay_routes( + &log, + "shutdown-all", + &dendrite, + routes, + &rt, + ); + + let imported_tnl = db.imported_tunnel(); + if let Err(e) = + ddm::sys::remove_tunnel_routes(&log, "shutdown-all", &imported_tnl) + { + error!(log, "shutdown tunnel routes: {e}"); + } + std::process::exit(SIGTERM_EXIT); }); } diff --git a/mg-admin-client/src/lib.rs 
b/mg-admin-client/src/lib.rs index 35223349..e8956b30 100644 --- a/mg-admin-client/src/lib.rs +++ b/mg-admin-client/src/lib.rs @@ -8,14 +8,14 @@ progenitor::generate_api!( spec = "../openapi/mg-admin.json", inner_type = slog::Logger, pre_hook = (|log: &slog::Logger, request: &reqwest::Request| { - slog::debug!(log, "client request"; + slog::trace!(log, "client request"; "method" => %request.method(), "uri" => %request.url(), "body" => ?&request.body(), ); }), post_hook = (|log: &slog::Logger, result: &Result<_, _>| { - slog::debug!(log, "client response"; "result" => ?result); + slog::trace!(log, "client response"; "result" => ?result); }), replace = { Prefix4 = rdb::Prefix4, diff --git a/mg-lower/Cargo.toml b/mg-lower/Cargo.toml index e5a3f66e..1246059f 100644 --- a/mg-lower/Cargo.toml +++ b/mg-lower/Cargo.toml @@ -4,6 +4,7 @@ version = "0.1.0" edition = "2021" [dependencies] +ddm-admin-client = { path = "../ddm-admin-client" } rdb = { path = "../rdb" } anyhow.workspace = true dpd-client.workspace = true @@ -12,3 +13,4 @@ slog.workspace = true libnet.workspace = true tokio.workspace = true thiserror.workspace = true +http.workspace = true diff --git a/mg-lower/src/ddm.rs b/mg-lower/src/ddm.rs new file mode 100644 index 00000000..ef557761 --- /dev/null +++ b/mg-lower/src/ddm.rs @@ -0,0 +1,146 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at https://mozilla.org/MPL/2.0/. 
+ +use ddm_admin_client::{Client, Ipv6Prefix, TunnelOrigin}; +use rdb::Route4ImportKey; +use slog::{error, Logger}; +use std::{collections::HashSet, net::Ipv6Addr, sync::Arc}; + +const BOUNDARY_SERVICES_VNI: u32 = 99; + +pub(crate) fn update_tunnel_endpoints( + tep: Ipv6Addr, // tunnel endpoint address + client: &Client, + routes: &[Route4ImportKey], + rt: Arc, + log: &Logger, +) { + let current: HashSet = match rt + .block_on(async { client.get_originated_tunnel_endpoints().await }) + .map(|x| x.into_inner()) + { + Ok(x) => x, + Err(e) => { + error!(log, "get originated tunnel endpoints: {e}"); + return; + } + } + .into_iter() + .collect(); + + let target: HashSet = + routes.iter().map(|x| route_to_tunnel(tep, x)).collect(); + + let to_add = target.difference(¤t); + let to_remove = current.difference(&target); + + add_tunnel_endpoints(tep, client, to_add.into_iter(), &rt, log); + remove_tunnel_endpoints(client, to_remove.into_iter(), &rt, log); +} + +fn ensure_tep_underlay_origin( + client: &Client, + tep: Ipv6Addr, + rt: &Arc, + log: &Logger, +) { + let current: Vec = match rt + .block_on(async { client.get_originated().await }) + .map(|x| x.into_inner()) + { + Ok(x) => x, + Err(e) => { + error!(log, "get originated endpoints: {e}"); + return; + } + } + .into_iter() + .collect(); + + let target = Ipv6Prefix { addr: tep, len: 64 }; + + if current.contains(&target) { + return; + } + + if let Err(e) = + rt.block_on(async { client.advertise_prefixes(&vec![target]).await }) + { + error!(log, "get originated endpoints: {e}"); + }; +} + +fn route_to_tunnel(tep: Ipv6Addr, x: &Route4ImportKey) -> TunnelOrigin { + TunnelOrigin { + overlay_prefix: ddm_admin_client::IpPrefix::V4( + ddm_admin_client::Ipv4Prefix { + addr: x.prefix.value, + len: x.prefix.length, + }, + ), + boundary_addr: tep, + vni: BOUNDARY_SERVICES_VNI, //TODO? 
+ } +} + +pub(crate) fn add_tunnel_routes( + tep: Ipv6Addr, // tunnel endpoint address + client: &Client, + routes: &[Route4ImportKey], + rt: Arc, + log: &Logger, +) { + let teps: Vec = + routes.iter().map(|x| route_to_tunnel(tep, x)).collect(); + add_tunnel_endpoints(tep, client, teps.iter(), &rt, log) +} + +pub(crate) fn add_tunnel_endpoints<'a, I: Iterator>( + tep: Ipv6Addr, // tunnel endpoint address + client: &Client, + routes: I, + rt: &Arc, + log: &Logger, +) { + ensure_tep_underlay_origin(client, tep, rt, log); + let routes = routes.cloned().collect(); + let resp = + rt.block_on(async { client.advertise_tunnel_endpoints(&routes).await }); + if let Err(e) = resp { + error!(log, "advertise tunnel endpoints: {e}"); + } +} + +pub(crate) fn remove_tunnel_routes( + tep: Ipv6Addr, // tunnel endpoint address + client: &Client, + routes: &[Route4ImportKey], + rt: Arc, + log: &Logger, +) { + let teps: Vec = + routes.iter().map(|x| route_to_tunnel(tep, x)).collect(); + remove_tunnel_endpoints(client, teps.iter(), &rt, log) +} + +pub(crate) fn remove_tunnel_endpoints< + 'a, + I: Iterator, +>( + client: &Client, + routes: I, + rt: &Arc, + log: &Logger, +) { + let routes = routes.cloned().collect(); + let resp = + rt.block_on(async { client.withdraw_tunnel_endpoints(&routes).await }); + if let Err(e) = resp { + error!(log, "withdraw tunnel endpoints: {e}"); + } +} + +pub(crate) fn new_ddm_client(log: &Logger) -> Client { + Client::new("http://localhost:8000", log.clone()) +} diff --git a/mg-lower/src/dendrite.rs b/mg-lower/src/dendrite.rs index 64e6d963..2f99e9f9 100644 --- a/mg-lower/src/dendrite.rs +++ b/mg-lower/src/dendrite.rs @@ -9,13 +9,15 @@ use dendrite_common::ports::PortId; use dendrite_common::ports::QsfpPort; use dpd_client::types; use dpd_client::Client as DpdClient; +use http::status::StatusCode; use libnet::{get_route, IpPrefix, Ipv4Prefix}; use rdb::Route4ImportKey; +use slog::warn; use slog::{error, Logger}; - use std::collections::HashSet; use 
std::hash::Hash; use std::net::IpAddr; +use std::net::Ipv6Addr; use std::sync::Arc; const TFPORT_QSFP_DEVICE_PREFIX: &str = "tfportqsfp"; @@ -49,6 +51,25 @@ impl RouteHash { } } +pub(crate) fn ensure_tep_addr( + tep: Ipv6Addr, + dpd: &DpdClient, + rt: Arc, + log: &Logger, +) { + if let Err(e) = rt.block_on(async { + dpd.loopback_ipv6_create(&types::Ipv6Entry { + tag: MG_LOWER_TAG.into(), + addr: tep, + }) + .await + }) { + if e.status() != Some(StatusCode::CONFLICT) { + warn!(log, "failed to ensure TEP address {tep} on ASIC: {e}"); + } + } +} + /// Perform a set of route additions and deletions via the Dendrite API. pub(crate) fn update_dendrite<'a, I>( to_add: I, diff --git a/mg-lower/src/lib.rs b/mg-lower/src/lib.rs index 54e28a86..00704eae 100644 --- a/mg-lower/src/lib.rs +++ b/mg-lower/src/lib.rs @@ -10,15 +10,23 @@ use crate::dendrite::{ db_route_to_dendrite_route, new_dpd_client, update_dendrite, RouteHash, }; use crate::error::Error; +use ddm::{ + add_tunnel_routes, new_ddm_client, remove_tunnel_routes, + update_tunnel_endpoints, +}; +use ddm_admin_client::Client as DdmClient; +use dendrite::ensure_tep_addr; use dpd_client::Client as DpdClient; use rdb::{ChangeSet, Db}; use slog::{error, info, Logger}; use std::collections::HashSet; +use std::net::Ipv6Addr; use std::sync::mpsc::channel; use std::sync::Arc; use std::thread::sleep; use std::time::Duration; +mod ddm; mod dendrite; mod error; @@ -32,7 +40,12 @@ const MG_LOWER_TAG: &str = "mg-lower"; /// moving foward. The loop runs on the calling thread, so callers are /// responsible for running this function in a separate thread if asynchronous /// execution is required. 
-pub fn run(db: Db, log: Logger, rt: Arc) { +pub fn run( + tep: Ipv6Addr, //tunnel endpoint address + db: Db, + log: Logger, + rt: Arc, +) { loop { let (tx, rx) = channel(); @@ -42,25 +55,29 @@ pub fn run(db: Db, log: Logger, rt: Arc) { // initialize the underlying router with the current state let dpd = new_dpd_client(&log); - let mut generation = match initialize(&db, &log, &dpd, rt.clone()) { - Ok(gen) => gen, - Err(e) => { - error!(log, "initializing failed: {e}"); - info!(log, "restarting sync loop in one second"); - sleep(Duration::from_secs(1)); - continue; - } - }; + let ddm = new_ddm_client(&log); + let mut generation = + match full_sync(tep, &db, &log, &dpd, &ddm, rt.clone()) { + Ok(gen) => gen, + Err(e) => { + error!(log, "initializing failed: {e}"); + info!(log, "restarting sync loop in one second"); + sleep(Duration::from_secs(1)); + continue; + } + }; // handle any changes that occur loop { - match rx.recv() { + match rx.recv_timeout(Duration::from_secs(1)) { Ok(change) => { generation = match handle_change( + tep, &db, change, &log, &dpd, + &ddm, generation, rt.clone(), ) { @@ -72,27 +89,51 @@ pub fn run(db: Db, log: Logger, rt: Arc) { } } } - Err(e) => { - error!(log, "mg-lower watch rx: {e}"); + // if we've not received updates in the timeout interval, do a + // full sync in case something has changed out from under us. + Err(_) => { + generation = + match full_sync(tep, &db, &log, &dpd, &ddm, rt.clone()) + { + Ok(gen) => gen, + Err(e) => { + error!(log, "initializing failed: {e}"); + info!( + log, + "restarting sync loop in one second" + ); + sleep(Duration::from_secs(1)); + continue; + } + } } } } } } -/// Initialize the underlying platform with a complete set of routes from the +/// Synchronize the underlying platforms with a complete set of routes from the /// RIB. 
-fn initialize( +fn full_sync( + tep: Ipv6Addr, // tunnel endpoint address db: &Db, log: &Logger, dpd: &DpdClient, + ddm: &DdmClient, rt: Arc, ) -> Result { let generation = db.generation(); + let db_imported = db.get_imported4(); + + ensure_tep_addr(tep, dpd, rt.clone(), log); + + // announce tunnel endpoints via ddm + update_tunnel_endpoints(tep, ddm, &db_imported, rt.clone(), log); + // get all imported routes from db let imported: HashSet = - db_route_to_dendrite_route(db.get_imported4(), log, dpd); + db_route_to_dendrite_route(db_imported, log, dpd); // get all routes created by mg-lower from dendrite let routes = @@ -126,21 +167,35 @@ fn initialize( } /// Synchronize a change set from the RIB to the underlying platform. +#[allow(clippy::too_many_arguments)] fn handle_change( + tep: Ipv6Addr, // tunnel endpoint address db: &Db, change: ChangeSet, log: &Logger, dpd: &DpdClient, + ddm: &DdmClient, generation: u64, rt: Arc, ) -> Result { + info!( + log, + "mg-lower: handling rib change generation {} -> {}", + generation, + change.generation + ); + if change.generation > generation + 1 { - return initialize(db, log, dpd, rt.clone()); + return full_sync(tep, db, log, dpd, ddm, rt.clone()); } - let to_add = change.import.added.clone().into_iter().collect(); + let to_add: Vec = + change.import.added.clone().into_iter().collect(); + add_tunnel_routes(tep, ddm, &to_add, rt.clone(), log); let to_add = db_route_to_dendrite_route(to_add, log, dpd); - let to_del = change.import.removed.clone().into_iter().collect(); + let to_del: Vec = + change.import.removed.clone().into_iter().collect(); + remove_tunnel_routes(tep, ddm, &to_del, rt.clone(), log); let to_del = db_route_to_dendrite_route(to_del, log, dpd); update_dendrite(to_add.iter(), to_del.iter(), dpd, rt.clone(), log)?; diff --git a/mgadm/Cargo.toml b/mgadm/Cargo.toml index 18797021..6f943505 100644 --- a/mgadm/Cargo.toml +++ b/mgadm/Cargo.toml @@ -18,3 +18,4 @@ tabwriter.workspace = true colored.workspace = true 
humantime.workspace = true serde_json.workspace = true +thiserror.workspace = true diff --git a/mgadm/src/main.rs b/mgadm/src/main.rs index 9b2ee978..a7dbdedd 100644 --- a/mgadm/src/main.rs +++ b/mgadm/src/main.rs @@ -11,6 +11,7 @@ use slog::Logger; use std::net::{IpAddr, SocketAddr}; mod bgp; +mod static_routing; #[derive(Parser, Debug)] #[command(version, about, long_about = None, styles = oxide_cli_style())] @@ -29,8 +30,13 @@ struct Cli { #[derive(Subcommand, Debug)] enum Commands { + /// BGP management commands. #[command(subcommand)] Bgp(bgp::Commands), + + /// Static routing management commands. + #[command(subcommand)] + Static(static_routing::Commands), } #[tokio::main] @@ -45,6 +51,9 @@ async fn main() -> Result<()> { match cli.command { Commands::Bgp(command) => bgp::commands(command, client).await?, + Commands::Static(command) => { + static_routing::commands(command, client).await? + } } Ok(()) } diff --git a/mgadm/src/static_routing.rs b/mgadm/src/static_routing.rs new file mode 100644 index 00000000..6f3f7531 --- /dev/null +++ b/mgadm/src/static_routing.rs @@ -0,0 +1,97 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at https://mozilla.org/MPL/2.0/. 
+ +use anyhow::Result; +use clap::{Args, Subcommand}; +use mg_admin_client::types; +use mg_admin_client::Client; +use rdb::Prefix4; +use std::net::{AddrParseError, Ipv4Addr}; +use std::num::ParseIntError; +use thiserror::Error; + +#[derive(Subcommand, Debug)] +pub enum Commands { + GetV4Routes, + AddV4Route(StaticRoute4), + RemoveV4Routes(StaticRoute4), +} + +#[derive(Debug, Error)] +pub enum Ipv4PrefixParseError { + #[error("expected CIDR representation /")] + Cidr, + + #[error("address parse error: {0}")] + Addr(#[from] AddrParseError), + + #[error("mask parse error: {0}")] + Mask(#[from] ParseIntError), +} + +#[derive(Debug, Args)] +pub struct StaticRoute4 { + pub destination: Ipv4Prefix, + pub nexthop: Ipv4Addr, +} + +#[derive(Debug, Clone, Copy)] +pub struct Ipv4Prefix { + pub addr: Ipv4Addr, + pub len: u8, +} + +impl std::str::FromStr for Ipv4Prefix { + type Err = Ipv4PrefixParseError; + + fn from_str(s: &str) -> Result { + let parts: Vec<&str> = s.split('/').collect(); + if parts.len() < 2 { + return Err(Ipv4PrefixParseError::Cidr); + } + + Ok(Ipv4Prefix { + addr: Ipv4Addr::from_str(parts[0])?, + len: u8::from_str(parts[1])?, + }) + } +} + +pub async fn commands(command: Commands, client: Client) -> Result<()> { + match command { + Commands::GetV4Routes => { + let routes = client.static_list_v4_routes().await?; + println!("{:#?}", routes); + } + Commands::AddV4Route(route) => { + let arg = types::AddStaticRoute4Request { + routes: types::StaticRoute4List { + list: vec![types::StaticRoute4 { + prefix: Prefix4 { + value: route.destination.addr, + length: route.destination.len, + }, + nexthop: route.nexthop, + }], + }, + }; + client.static_add_v4_route(&arg).await?; + } + Commands::RemoveV4Routes(route) => { + let arg = types::DeleteStaticRoute4Request { + routes: types::StaticRoute4List { + list: vec![types::StaticRoute4 { + prefix: Prefix4 { + value: route.destination.addr, + length: route.destination.len, + }, + nexthop: route.nexthop, + }], + }, + }; + 
client.static_remove_v4_route(&arg).await?; + } + } + Ok(()) +} diff --git a/mgd/Cargo.toml b/mgd/Cargo.toml index f4516c48..58d66de8 100644 --- a/mgd/Cargo.toml +++ b/mgd/Cargo.toml @@ -19,6 +19,7 @@ slog-term.workspace = true tokio.workspace = true http.workspace = true thiserror.workspace = true +rand.workspace = true [features] default = ["mg-lower"] diff --git a/mgd/src/admin.rs b/mgd/src/admin.rs index d2d80a5a..065f8330 100644 --- a/mgd/src/admin.rs +++ b/mgd/src/admin.rs @@ -2,18 +2,19 @@ // License, v. 2.0. If a copy of the MPL was not distributed with this // file, You can obtain one at https://mozilla.org/MPL/2.0/. -use crate::bgp_admin; +use crate::{bgp_admin, static_admin}; use bgp_admin::BgpContext; use dropshot::{ApiDescription, ConfigDropshot, HttpServerStarter}; use rdb::Db; use slog::o; use slog::{error, info, warn, Logger}; use std::fs::File; -use std::net::{IpAddr, SocketAddr}; +use std::net::{IpAddr, Ipv6Addr, SocketAddr}; use std::sync::Arc; use tokio::task::JoinHandle; pub struct HandlerContext { + pub tep: Ipv6Addr, // tunnel endpoint address pub bgp: BgpContext, pub log: Logger, pub data_dir: String, @@ -27,7 +28,6 @@ pub fn start_server( context: Arc, ) -> Result, String> { let sa = SocketAddr::new(addr, port); - let ds_config = ConfigDropshot { bind_address: sa, ..Default::default() @@ -58,6 +58,8 @@ macro_rules! 
register { pub fn api_description() -> ApiDescription> { let mut api = ApiDescription::new(); + + // bgp register!(api, bgp_admin::get_routers); register!(api, bgp_admin::new_router); register!(api, bgp_admin::ensure_router_handler); @@ -71,6 +73,12 @@ pub fn api_description() -> ApiDescription> { register!(api, bgp_admin::get_imported4); register!(api, bgp_admin::bgp_apply); register!(api, bgp_admin::graceful_shutdown); + + // static + register!(api, static_admin::static_add_v4_route); + register!(api, static_admin::static_remove_v4_route); + register!(api, static_admin::static_list_v4_routes); + api } diff --git a/mgd/src/bgp_admin.rs b/mgd/src/bgp_admin.rs index cfc3d3b4..ae51542a 100644 --- a/mgd/src/bgp_admin.rs +++ b/mgd/src/bgp_admin.rs @@ -307,16 +307,6 @@ pub(crate) fn add_router( router.run(); - #[cfg(feature = "default")] - { - let rt = Arc::new(tokio::runtime::Handle::current()); - let log = ctx.log.clone(); - let db = db.clone(); - std::thread::spawn(move || { - mg_lower::run(db, log, rt); - }); - } - routers.insert(rq.asn, router); db.add_bgp_router( rq.asn, diff --git a/mgd/src/main.rs b/mgd/src/main.rs index 4abfb07f..5ac49750 100644 --- a/mgd/src/main.rs +++ b/mgd/src/main.rs @@ -8,6 +8,7 @@ use bgp::connection_tcp::{BgpConnectionTcp, BgpListenerTcp}; use bgp::log::init_logger; use clap::{Parser, Subcommand}; use mg_common::cli::oxide_cli_style; +use rand::Fill; use rdb::{BgpNeighborInfo, BgpRouterInfo}; use slog::Logger; use std::collections::{BTreeMap, HashMap}; @@ -18,6 +19,7 @@ use std::thread::spawn; mod admin; mod bgp_admin; mod error; +mod static_admin; #[derive(Parser, Debug)] #[command(version, about, long_about = None, styles = oxide_cli_style())] @@ -69,13 +71,27 @@ async fn run(args: RunArgs) { let db = rdb::Db::new(&format!("{}/rdb", args.data_dir), log.clone()) .expect("open datastore file"); + let tep_ula = get_tunnel_endpoint_ula(&db); + let context = Arc::new(HandlerContext { + tep: tep_ula, log: log.clone(), bgp, data_dir: 
args.data_dir.clone(), db: db.clone(), }); + #[cfg(feature = "default")] + { + let rt = Arc::new(tokio::runtime::Handle::current()); + let ctx = context.clone(); + let log = log.clone(); + let db = ctx.db.clone(); + std::thread::spawn(move || { + mg_lower::run(ctx.tep, db, log, rt); + }); + } + start_bgp_routers( context.clone(), db.get_bgp_routers() @@ -84,6 +100,8 @@ async fn run(args: RunArgs) { .expect("get BGP neighbors from data store"), ); + initialize_static_routes(&db); + let j = admin::start_server( log.clone(), args.admin_addr, @@ -150,3 +168,32 @@ fn start_bgp_routers( .unwrap_or_else(|_| panic!("add BGP neighbor {nbr:#?}")); } } + +fn initialize_static_routes(db: &rdb::Db) { + let routes = db + .get_static4() + .expect("failed to get static routes from db"); + for route in &routes { + db.set_nexthop4(*route, false).unwrap_or_else(|e| { + panic!("failed to initialize static route {route:#?}: {e}") + }); + } +} + +fn get_tunnel_endpoint_ula(db: &rdb::Db) -> Ipv6Addr { + if let Some(addr) = db.get_tep_addr().unwrap() { + return addr; + } + + // create the randomized ULA fdxx:xxxx:xxxx:xxxx::1 as a tunnel endpoint + let mut rng = rand::thread_rng(); + let mut r = [0u8; 7]; + r.try_fill(&mut rng).unwrap(); + let tep_ula = Ipv6Addr::from([ + 0xfd, r[0], r[1], r[2], r[3], r[4], r[5], r[6], 0, 0, 0, 0, 0, 0, 0, 1, + ]); + + db.set_tep_addr(tep_ula).unwrap(); + + tep_ula +} diff --git a/mgd/src/static_admin.rs b/mgd/src/static_admin.rs new file mode 100644 index 00000000..2fd2f2c7 --- /dev/null +++ b/mgd/src/static_admin.rs @@ -0,0 +1,113 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at https://mozilla.org/MPL/2.0/. 
+ +use crate::admin::HandlerContext; +use bgp::session::DEFAULT_ROUTE_PRIORITY; +use dropshot::{ + endpoint, HttpError, HttpResponseDeleted, HttpResponseOk, + HttpResponseUpdatedNoContent, RequestContext, TypedBody, +}; +use rdb::{Prefix4, Route4ImportKey}; +use schemars::JsonSchema; +use serde::{Deserialize, Serialize}; +use std::{net::Ipv4Addr, sync::Arc}; + +#[derive(Debug, Deserialize, Serialize, JsonSchema)] +pub struct AddStaticRoute4Request { + routes: StaticRoute4List, +} + +#[derive(Debug, Deserialize, Serialize, JsonSchema)] +pub struct DeleteStaticRoute4Request { + routes: StaticRoute4List, +} + +#[derive(Debug, Deserialize, Serialize, JsonSchema)] +pub struct StaticRoute4List { + list: Vec, +} + +#[derive(Debug, Deserialize, Serialize, JsonSchema)] +pub struct StaticRoute4 { + pub prefix: Prefix4, + pub nexthop: Ipv4Addr, +} + +impl From for Route4ImportKey { + fn from(val: StaticRoute4) -> Self { + Route4ImportKey { + prefix: val.prefix, + nexthop: val.nexthop, + // Having an ID of zero indicates this entry did not come from BGP. + // TODO: this could likely be done in a more rust-y way, or just + // have a cleaner data structure organization. 
+ id: 0, + // + priority: DEFAULT_ROUTE_PRIORITY, + } + } +} + +impl From for StaticRoute4 { + fn from(value: Route4ImportKey) -> Self { + Self { + prefix: value.prefix, + nexthop: value.nexthop, + } + } +} + +#[endpoint { method = PUT, path = "/static/route4" }] +pub async fn static_add_v4_route( + ctx: RequestContext>, + request: TypedBody, +) -> Result { + let routes: Vec = request + .into_inner() + .routes + .list + .into_iter() + .map(Into::into) + .collect(); + for r in routes { + ctx.context() + .db + .set_nexthop4(r, true) + .map_err(|e| HttpError::for_internal_error(e.to_string()))?; + } + Ok(HttpResponseUpdatedNoContent()) +} + +#[endpoint { method = DELETE, path = "/static/route4" }] +pub async fn static_remove_v4_route( + ctx: RequestContext>, + request: TypedBody, +) -> Result { + let routes: Vec = request + .into_inner() + .routes + .list + .into_iter() + .map(Into::into) + .collect(); + for r in routes { + ctx.context().db.remove_nexthop4(r); + } + Ok(HttpResponseDeleted()) +} + +#[endpoint { method = GET, path = "/static/route4" }] +pub async fn static_list_v4_routes( + ctx: RequestContext>, +) -> Result, HttpError> { + let list = ctx + .context() + .db + .get_imported4() + .into_iter() + .filter(|x| x.id == 0) // indicates not from bgp + .map(Into::into) + .collect(); + Ok(HttpResponseOk(StaticRoute4List { list })) +} diff --git a/openapi/ddm-admin.json b/openapi/ddm-admin.json index 49853977..b2d20e9f 100644 --- a/openapi/ddm-admin.json +++ b/openapi/ddm-admin.json @@ -33,6 +33,34 @@ } } }, + "/originated_tunnel_endpoints": { + "get": { + "operationId": "get_originated_tunnel_endpoints", + "responses": { + "200": { + "description": "successful operation", + "content": { + "application/json": { + "schema": { + "title": "Set_of_TunnelOrigin", + "type": "array", + "items": { + "$ref": "#/components/schemas/TunnelOrigin" + }, + "uniqueItems": true + } + } + } + }, + "4XX": { + "$ref": "#/components/responses/Error" + }, + "5XX": { + "$ref": 
"#/components/responses/Error" + } + } + } + }, "/peers": { "get": { "operationId": "get_peers", @@ -193,21 +221,97 @@ } } } - } - }, - "components": { - "responses": { - "Error": { - "description": "Error", - "content": { - "application/json": { - "schema": { - "$ref": "#/components/schemas/Error" + }, + "/tunnel_endpoint": { + "put": { + "operationId": "advertise_tunnel_endpoints", + "requestBody": { + "content": { + "application/json": { + "schema": { + "title": "Set_of_TunnelOrigin", + "type": "array", + "items": { + "$ref": "#/components/schemas/TunnelOrigin" + }, + "uniqueItems": true + } + } + }, + "required": true + }, + "responses": { + "204": { + "description": "resource updated" + }, + "4XX": { + "$ref": "#/components/responses/Error" + }, + "5XX": { + "$ref": "#/components/responses/Error" + } + } + }, + "delete": { + "operationId": "withdraw_tunnel_endpoints", + "requestBody": { + "content": { + "application/json": { + "schema": { + "title": "Set_of_TunnelOrigin", + "type": "array", + "items": { + "$ref": "#/components/schemas/TunnelOrigin" + }, + "uniqueItems": true + } } + }, + "required": true + }, + "responses": { + "204": { + "description": "resource updated" + }, + "4XX": { + "$ref": "#/components/responses/Error" + }, + "5XX": { + "$ref": "#/components/responses/Error" } } } }, + "/tunnel_endpoints": { + "get": { + "operationId": "get_tunnel_endpoints", + "responses": { + "200": { + "description": "successful operation", + "content": { + "application/json": { + "schema": { + "title": "Set_of_TunnelRoute", + "type": "array", + "items": { + "$ref": "#/components/schemas/TunnelRoute" + }, + "uniqueItems": true + } + } + } + }, + "4XX": { + "$ref": "#/components/responses/Error" + }, + "5XX": { + "$ref": "#/components/responses/Error" + } + } + } + } + }, + "components": { "schemas": { "Error": { "description": "Error information from a response.", @@ -228,6 +332,52 @@ "request_id" ] }, + "IpPrefix": { + "oneOf": [ + { + "type": "object", + 
"properties": { + "V4": { + "$ref": "#/components/schemas/Ipv4Prefix" + } + }, + "required": [ + "V4" + ], + "additionalProperties": false + }, + { + "type": "object", + "properties": { + "V6": { + "$ref": "#/components/schemas/Ipv6Prefix" + } + }, + "required": [ + "V6" + ], + "additionalProperties": false + } + ] + }, + "Ipv4Prefix": { + "type": "object", + "properties": { + "addr": { + "type": "string", + "format": "ipv4" + }, + "len": { + "type": "integer", + "format": "uint8", + "minimum": 0 + } + }, + "required": [ + "addr", + "len" + ] + }, "Ipv6Prefix": { "type": "object", "properties": { @@ -302,6 +452,56 @@ 0, 1 ] + }, + "TunnelOrigin": { + "type": "object", + "properties": { + "boundary_addr": { + "type": "string", + "format": "ipv6" + }, + "overlay_prefix": { + "$ref": "#/components/schemas/IpPrefix" + }, + "vni": { + "type": "integer", + "format": "uint32", + "minimum": 0 + } + }, + "required": [ + "boundary_addr", + "overlay_prefix", + "vni" + ] + }, + "TunnelRoute": { + "type": "object", + "properties": { + "nexthop": { + "type": "string", + "format": "ipv6" + }, + "origin": { + "$ref": "#/components/schemas/TunnelOrigin" + } + }, + "required": [ + "nexthop", + "origin" + ] + } + }, + "responses": { + "Error": { + "description": "Error", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/Error" + } + } + } } } } diff --git a/openapi/mg-admin.json b/openapi/mg-admin.json index 6e68ae1f..e67595dd 100644 --- a/openapi/mg-admin.json +++ b/openapi/mg-admin.json @@ -355,6 +355,77 @@ } } } + }, + "/static/route4": { + "get": { + "operationId": "static_list_v4_routes", + "responses": { + "200": { + "description": "successful operation", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/StaticRoute4List" + } + } + } + }, + "4XX": { + "$ref": "#/components/responses/Error" + }, + "5XX": { + "$ref": "#/components/responses/Error" + } + } + }, + "put": { + "operationId": "static_add_v4_route", 
+ "requestBody": { + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/AddStaticRoute4Request" + } + } + }, + "required": true + }, + "responses": { + "204": { + "description": "resource updated" + }, + "4XX": { + "$ref": "#/components/responses/Error" + }, + "5XX": { + "$ref": "#/components/responses/Error" + } + } + }, + "delete": { + "operationId": "static_remove_v4_route", + "requestBody": { + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/DeleteStaticRoute4Request" + } + } + }, + "required": true + }, + "responses": { + "204": { + "description": "successful deletion" + }, + "4XX": { + "$ref": "#/components/responses/Error" + }, + "5XX": { + "$ref": "#/components/responses/Error" + } + } + } } }, "components": { @@ -424,22 +495,36 @@ "resolution" ] }, + "AddStaticRoute4Request": { + "type": "object", + "properties": { + "routes": { + "$ref": "#/components/schemas/StaticRoute4List" + } + }, + "required": [ + "routes" + ] + }, "ApplyRequest": { + "description": "Apply changes to an ASN.", "type": "object", "properties": { "asn": { + "description": "ASN to apply changes to.", "type": "integer", "format": "uint32", "minimum": 0 }, "originate": { + "description": "Complete set of prefixes to originate. Any active prefixes not in this list will be removed. All prefixes in this list are ensured to be in the originating set.", "type": "array", "items": { "$ref": "#/components/schemas/Prefix4" } }, "peers": { - "description": "Lists of peers indexed by peer group.", + "description": "Lists of peers indexed by peer group. Set's within a peer group key are a total set. For example, the value\n\n```text {\"foo\": [a, b, d]} ``` Means that the peer group \"foo\" only contains the peers `a`, `b` and `d`. 
If there is a peer `c` currently in the peer group \"foo\", it will be removed.", "type": "object", "additionalProperties": { "type": "array", @@ -542,6 +627,17 @@ "asn" ] }, + "DeleteStaticRoute4Request": { + "type": "object", + "properties": { + "routes": { + "$ref": "#/components/schemas/StaticRoute4List" + } + }, + "required": [ + "routes" + ] + }, "Error": { "description": "Error information from a response.", "type": "object", @@ -811,6 +907,36 @@ "peers" ] }, + "StaticRoute4": { + "type": "object", + "properties": { + "nexthop": { + "type": "string", + "format": "ipv4" + }, + "prefix": { + "$ref": "#/components/schemas/Prefix4" + } + }, + "required": [ + "nexthop", + "prefix" + ] + }, + "StaticRoute4List": { + "type": "object", + "properties": { + "list": { + "type": "array", + "items": { + "$ref": "#/components/schemas/StaticRoute4" + } + } + }, + "required": [ + "list" + ] + }, "Withdraw4Request": { "type": "object", "properties": { diff --git a/rdb/src/db.rs b/rdb/src/db.rs index 263a0b8e..6a1beb20 100644 --- a/rdb/src/db.rs +++ b/rdb/src/db.rs @@ -12,9 +12,9 @@ use crate::error::Error; use crate::types::*; use mg_common::{lock, read_lock, write_lock}; -use slog::{error, Logger}; +use slog::{error, info, Logger}; use std::collections::{HashMap, HashSet}; -use std::net::IpAddr; +use std::net::{IpAddr, Ipv6Addr}; use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::mpsc::Sender; use std::sync::{Arc, Mutex, RwLock}; @@ -31,6 +31,16 @@ const BGP_ROUTER: &str = "bgp_router"; /// information. const BGP_NEIGHBOR: &str = "bgp_neighbor"; +/// The handle used to open a persistent key-value tree for settings +/// information. +const SETTINGS: &str = "settings"; + +/// The handle used to open a persistent key-value tree for static routes. +const STATIC4_ROUTES: &str = "static4_routes"; + +/// Key used in settings tree for tunnel endpoint setting +const TEP_KEY: &str = "tep"; + /// The central routing information base. 
Both persistent an volatile route /// information is managed through this structure. #[derive(Clone)] @@ -266,14 +276,68 @@ impl Db { lock!(self.imported).clone().into_iter().collect() } - pub fn set_nexthop4(&self, r: Route4ImportKey) { + pub fn set_nexthop4( + &self, + r: Route4ImportKey, + is_static: bool, + ) -> Result<(), Error> { + if is_static { + let tree = self.persistent.open_tree(STATIC4_ROUTES)?; + let key = serde_json::to_string(&r)?; + tree.insert(key.as_str(), "")?; + } + let before = self.effective_set_for_prefix4(r.prefix); lock!(self.imported).replace(r); let after = self.effective_set_for_prefix4(r.prefix); - if let Some(change_set) = self.import_route_change_set(before, after) { + if let Some(change_set) = self.import_route_change_set(&before, &after) + { + info!( + self.log, + "sending notification for change set {:#?}", change_set, + ); self.notify(change_set); + } else { + info!( + self.log, + "no effective change for {:#?} -> {:#?}", before, after + ); } + + Ok(()) + } + + pub fn get_static4(&self) -> Result, Error> { + let tree = self.persistent.open_tree(STATIC4_ROUTES)?; + Ok(tree + .scan_prefix(vec![]) + .filter_map(|item| { + let (key, _) = match item { + Ok(item) => item, + Err(e) => { + error!( + self.log, + "db: error fetching static route entry: {e}" + ); + return None; + } + }; + + let key = String::from_utf8_lossy(&key); + let rkey: Route4ImportKey = match serde_json::from_str(&key) { + Ok(item) => item, + Err(e) => { + error!( + self.log, + "db: error parsing static router entry: {e}" + ); + return None; + } + }; + Some(rkey) + }) + .collect()) } pub fn remove_nexthop4(&self, r: Route4ImportKey) { @@ -281,7 +345,8 @@ impl Db { lock!(self.imported).remove(&r); let after = self.effective_set_for_prefix4(r.prefix); - if let Some(change_set) = self.import_route_change_set(before, after) { + if let Some(change_set) = self.import_route_change_set(&before, &after) + { self.notify(change_set); } } @@ -342,24 +407,48 @@ impl Db { /// 
bumping the RIB generation number if there are changes. fn import_route_change_set( &self, - before: HashSet, - after: HashSet, + before: &HashSet, + after: &HashSet, ) -> Option { let added: HashSet = - after.difference(&before).copied().collect(); + after.difference(before).copied().collect(); let removed: HashSet = - before.difference(&after).copied().collect(); - - let gen = self.generation.fetch_add(1, Ordering::SeqCst); + before.difference(after).copied().collect(); if added.is_empty() && removed.is_empty() { return None; } + let gen = self.generation.fetch_add(1, Ordering::SeqCst); + Some(ChangeSet::from_import( ImportChangeSet { added, removed }, gen, )) } + + pub fn get_tep_addr(&self) -> Result, Error> { + let tree = self.persistent.open_tree(SETTINGS)?; + let result = tree.get(TEP_KEY)?; + let value = match result { + Some(value) => value, + None => return Ok(None), + }; + let octets: [u8; 16] = (*value).try_into().map_err(|_| { + Error::DbValue(format!( + "rdb: tep length error expected 16 bytes found {}", + value.len(), + )) + })?; + + Ok(Some(Ipv6Addr::from(octets))) + } + + pub fn set_tep_addr(&self, addr: Ipv6Addr) -> Result<(), Error> { + let tree = self.persistent.open_tree(SETTINGS)?; + let key = addr.octets(); + tree.insert(TEP_KEY, &key)?; + Ok(()) + } } diff --git a/rdb/src/error.rs b/rdb/src/error.rs index f6f66adb..85568a95 100644 --- a/rdb/src/error.rs +++ b/rdb/src/error.rs @@ -12,4 +12,7 @@ pub enum Error { #[error("db key error{0}")] DbKey(String), + + #[error("db value error{0}")] + DbValue(String), } diff --git a/rdb/src/types.rs b/rdb/src/types.rs index a7c4b6f2..72bae162 100644 --- a/rdb/src/types.rs +++ b/rdb/src/types.rs @@ -253,7 +253,7 @@ pub struct Policy { pub priority: u16, } -#[derive(Clone, Default)] +#[derive(Clone, Default, Debug)] pub struct ImportChangeSet { pub added: HashSet, pub removed: HashSet, @@ -274,7 +274,7 @@ impl ImportChangeSet { } } -#[derive(Clone, Default)] +#[derive(Clone, Default, Debug)] pub struct 
OriginChangeSet { pub added: HashSet, pub removed: HashSet, @@ -295,7 +295,7 @@ impl OriginChangeSet { } } -#[derive(Clone, Default)] +#[derive(Clone, Default, Debug)] pub struct ChangeSet { pub generation: u64, pub import: ImportChangeSet, diff --git a/tests/src/ddm.rs b/tests/src/ddm.rs index 0adea2a8..aa482b2d 100644 --- a/tests/src/ddm.rs +++ b/tests/src/ddm.rs @@ -3,8 +3,7 @@ // file, You can obtain one at https://mozilla.org/MPL/2.0/. use anyhow::{anyhow, Result}; -use ddm_admin_client::types::Ipv6Prefix; -use ddm_admin_client::Client; +use ddm_admin_client::{Client, Ipv6Prefix, TunnelOrigin}; use slog::{Drain, Logger}; use std::env; use std::net::Ipv6Addr; @@ -518,6 +517,63 @@ async fn run_trio_tests( println!("redundant advertise passed"); + wait_for_eq!(tunnel_originated_endpoint_count(&t1).await?, 0); + + t1.advertise_tunnel_endpoints(&vec![TunnelOrigin { + overlay_prefix: "203.0.113.0/24".parse().unwrap(), + boundary_addr: "fd00:1701::1".parse().unwrap(), + vni: 47, + }]) + .await?; + + wait_for_eq!(tunnel_originated_endpoint_count(&t1).await?, 1); + wait_for_eq!(tunnel_endpoint_count(&t1).await?, 0); + wait_for_eq!(tunnel_endpoint_count(&s1).await?, 1); + wait_for_eq!(tunnel_endpoint_count(&s2).await?, 1); + + println!("tunnel endpoint advertise passed"); + + // redundant advertise should not change things + + t1.advertise_tunnel_endpoints(&vec![TunnelOrigin { + overlay_prefix: "203.0.113.0/24".parse().unwrap(), + boundary_addr: "fd00:1701::1".parse().unwrap(), + vni: 47, + }]) + .await?; + + sleep(Duration::from_secs(5)); + + wait_for_eq!(tunnel_originated_endpoint_count(&t1).await?, 1); + wait_for_eq!(tunnel_endpoint_count(&t1).await?, 0); + wait_for_eq!(tunnel_endpoint_count(&s1).await?, 1); + wait_for_eq!(tunnel_endpoint_count(&s2).await?, 1); + + println!("redundant tunnel endpoint advertise passed"); + + zs1.stop_router()?; + sleep(Duration::from_secs(5)); + zs1.start_router()?; + sleep(Duration::from_secs(5)); + let s1 = 
Client::new("http://10.0.0.1:8000", log.clone()); + wait_for_eq!(tunnel_endpoint_count(&s1).await?, 1); + + println!("tunnel router restart passed"); + + t1.withdraw_tunnel_endpoints(&vec![TunnelOrigin { + overlay_prefix: "203.0.113.0/24".parse().unwrap(), + boundary_addr: "fd00:1701::1".parse().unwrap(), + vni: 47, + }]) + .await?; + + wait_for_eq!(tunnel_originated_endpoint_count(&t1).await?, 0); + wait_for_eq!(tunnel_endpoint_count(&t1).await?, 0); + wait_for_eq!(tunnel_endpoint_count(&s1).await?, 0); + wait_for_eq!(tunnel_endpoint_count(&s2).await?, 0); + + println!("tunnel endpoint withdraw passed"); + Ok(()) } @@ -695,6 +751,14 @@ async fn prefix_count(c: &Client) -> Result { .sum::()) } +async fn tunnel_endpoint_count(c: &Client) -> Result { + Ok(c.get_tunnel_endpoints().await?.len()) +} + +async fn tunnel_originated_endpoint_count(c: &Client) -> Result { + Ok(c.get_originated_tunnel_endpoints().await?.len()) +} + fn init_logger() -> Logger { let decorator = slog_term::TermDecorator::new().build(); let drain = slog_term::FullFormat::new(decorator).build().fuse();