diff --git a/core/startos/src/db/model/public.rs b/core/startos/src/db/model/public.rs index 380d88ac5c..214ee25a86 100644 --- a/core/startos/src/db/model/public.rs +++ b/core/startos/src/db/model/public.rs @@ -161,7 +161,7 @@ pub struct NetworkInterfaceInfo { pub ip_info: IpInfo, } -#[derive(Clone, Debug, Default, Deserialize, Serialize, HasModel, TS)] +#[derive(Clone, Debug, Default, PartialEq, Eq, Deserialize, Serialize, HasModel, TS)] #[serde(rename_all = "camelCase")] #[model = "Model"] #[ts(export)] diff --git a/core/startos/src/net/network_interface.rs b/core/startos/src/net/network_interface.rs index 531b1b09f9..3ec681be4f 100644 --- a/core/startos/src/net/network_interface.rs +++ b/core/startos/src/net/network_interface.rs @@ -5,8 +5,8 @@ use std::sync::{Arc, Weak}; use std::task::Poll; use clap::Parser; -use futures::future::BoxFuture; -use futures::{FutureExt, TryStreamExt}; +use futures::future::{pending, BoxFuture}; +use futures::{FutureExt, TryFutureExt, TryStreamExt}; use imbl_value::InternedString; use rpc_toolkit::{from_fn_async, Context, HandlerExt, ParentHandler}; use serde::{Deserialize, Serialize}; @@ -27,6 +27,7 @@ use crate::db::model::public::IpInfo; use crate::db::model::Database; use crate::net::utils::{iface_is_physical, list_interfaces}; use crate::prelude::*; +use crate::util::actor::background::{BackgroundJobQueue, BackgroundJobRunner}; use crate::util::sync::SyncMutex; #[proxy( @@ -59,12 +60,15 @@ trait Ip4Config { fn address_data(&self) -> Result, Error>; } -#[derive(Debug, DeserializeDict, ZValue, ZType)] +#[derive(Clone, Debug, DeserializeDict, ZValue, ZType)] #[zvariant(signature = "dict")] struct AddressData { address: String, prefix: u32, } +impl TryFrom> for IpInfo { + fn try_from(value: Vec) -> Result {} +} #[proxy( interface = "org.freedesktop.NetworkManager.Device", @@ -126,31 +130,113 @@ where .await?; Ok(Self { stream, last }) } - async fn unless_changed>>( + async fn until_changed>>( &mut self, fut: Fut, - ) -> Result, Error> { + ) -> Result<(), Error> { + let mut next = self.stream.next(); tokio::select! { - changed = self.stream.next() => { + changed = next => { self.last = changed.ok_or_else(|| Error::new(eyre!("stream is empty"), ErrorKind::DBus))?.get().await?; - Ok(None) + Ok(()) }, - res = fut => { - res.map(Some) + res = fut.and_then(|_| pending()) => { + res } } } } async fn watcher(write_to: watch::Sender>) { + let (q, run) = BackgroundJobQueue::new(); loop { if let Err(e) = async { + let mut jobs = BackgroundJobQueue::new(); let connection = Connection::system().await?; let netman_proxy = NetworkManagerProxy::new(&connection).await?; - let mut active_sub = netman_proxy.receive_active_connections_changed().await; + let mut active_sub = + WatchPropertyStream::new(netman_proxy.receive_active_connections_changed().await) + .await?; + + loop { + let active = active_sub.last.clone(); + active_sub + .until_changed(async { + let mut ifaces = BTreeSet::new(); + let mut jobs = Vec::new(); + for active in active { + let ac_proxy = ActiveConnectionProxy::new(&connection, active).await?; + let mut devices = ac_proxy.devices().await?; + if devices.len() == 1 { + let dev_proxy = + DeviceProxy::new(&connection, devices.swap_remove(0)).await?; + let iface = InternedString::intern(dev_proxy.ip_interface().await?); + ifaces.insert(iface.clone()); + jobs.push(async { + let ac_proxy = ac_proxy; + let mut ip_config_sub = WatchPropertyStream::new( + ac_proxy.receive_ip4_config_changed().await, + ) + .await?; + + loop { + let ip_config = ip_config_sub.last.clone(); + ip_config_sub + .until_changed(async { + let ip_proxy = + Ip4ConfigProxy::new(&connection, ip_config) + .await?; + let mut address_sub = WatchPropertyStream::new( + ip_proxy.receive_address_data_changed().await, + ) + .await?; + + loop { + let addresses = address_sub.last.clone(); + address_sub + .until_changed(async { + let ip_info: IpInfo = + addresses.try_into()?; + + write_to.send_if_modified(|m| { + m.insert(iface, ip_info.clone()) + .filter(|old| old == &ip_info) + .is_none() + }); - 'conn: while let Some(active) = active_sub.next().await {} + Ok::<_, Error>(()) + }) + .await?; + } + + Ok::<_, Error>(()) + }) + .await?; + } + + Ok::<_, Error>(()) + }); + } + } + write_to.send_if_modified(|m| { + let mut changed = false; + m.retain(|i, _| { + if ifaces.contains(i) { + true + } else { + changed |= true; + false + } + }); + changed + }); + futures::future::try_join_all(jobs).await?; + + Ok::<_, Error>(()) + }) + .await?; + } Ok::<_, Error>(()) } @@ -160,6 +246,7 @@ async fn watcher(write_to: watch::Sender>) { tracing::debug!("{e:?}"); } } + run.await; } pub struct NetworkInterfaceController {