Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
dr-bonez committed Nov 26, 2024
1 parent 9735a32 commit 1079ca2
Show file tree
Hide file tree
Showing 2 changed files with 99 additions and 12 deletions.
2 changes: 1 addition & 1 deletion core/startos/src/db/model/public.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ pub struct NetworkInterfaceInfo {
pub ip_info: IpInfo,
}

#[derive(Clone, Debug, Default, Deserialize, Serialize, HasModel, TS)]
#[derive(Clone, Debug, Default, PartialEq, Eq, Deserialize, Serialize, HasModel, TS)]
#[serde(rename_all = "camelCase")]
#[model = "Model<Self>"]
#[ts(export)]
Expand Down
109 changes: 98 additions & 11 deletions core/startos/src/net/network_interface.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ use std::sync::{Arc, Weak};
use std::task::Poll;

use clap::Parser;
use futures::future::BoxFuture;
use futures::{FutureExt, TryStreamExt};
use futures::future::{pending, BoxFuture};
use futures::{FutureExt, TryFutureExt, TryStreamExt};
use imbl_value::InternedString;
use rpc_toolkit::{from_fn_async, Context, HandlerExt, ParentHandler};
use serde::{Deserialize, Serialize};
Expand All @@ -27,6 +27,7 @@ use crate::db::model::public::IpInfo;
use crate::db::model::Database;
use crate::net::utils::{iface_is_physical, list_interfaces};
use crate::prelude::*;
use crate::util::actor::background::{BackgroundJobQueue, BackgroundJobRunner};
use crate::util::sync::SyncMutex;

#[proxy(
Expand Down Expand Up @@ -59,12 +60,15 @@ trait Ip4Config {
fn address_data(&self) -> Result<Vec<AddressData>, Error>;
}

#[derive(Debug, DeserializeDict, ZValue, ZType)]
#[derive(Clone, Debug, DeserializeDict, ZValue, ZType)]
#[zvariant(signature = "dict")]
struct AddressData {
address: String,
prefix: u32,
}
impl TryFrom<Vec<AddressData>> for IpInfo {
fn try_from(value: Vec<AddressData>) -> Result<Self, Self::Error> {}
}

#[proxy(
interface = "org.freedesktop.NetworkManager.Device",
Expand Down Expand Up @@ -126,31 +130,113 @@ where
.await?;
Ok(Self { stream, last })
}
async fn unless_changed<U, Fut: Future<Output = Result<U, Error>>>(
async fn until_changed<Fut: Future<Output = Result<(), Error>>>(
&mut self,
fut: Fut,
) -> Result<Option<U>, Error> {
) -> Result<(), Error> {
let mut next = self.stream.next();
tokio::select! {
changed = self.stream.next() => {
changed = next => {
self.last = changed.ok_or_else(|| Error::new(eyre!("stream is empty"), ErrorKind::DBus))?.get().await?;
Ok(None)
Ok(())
},
res = fut => {
res.map(Some)
res = fut.and_then(|_| pending()) => {
res
}
}
}
}

async fn watcher(write_to: watch::Sender<BTreeMap<InternedString, IpInfo>>) {
let (q, run) = BackgroundJobQueue::new();
loop {
if let Err(e) = async {
let mut jobs = BackgroundJobQueue::new();
let connection = Connection::system().await?;
let netman_proxy = NetworkManagerProxy::new(&connection).await?;

let mut active_sub = netman_proxy.receive_active_connections_changed().await;
let mut active_sub =
WatchPropertyStream::new(netman_proxy.receive_active_connections_changed().await)
.await?;

loop {
let active = active_sub.last.clone();
active_sub
.until_changed(async {
let mut ifaces = BTreeSet::new();
let mut jobs = Vec::new();
for active in active {
let ac_proxy = ActiveConnectionProxy::new(&connection, active).await?;
let mut devices = ac_proxy.devices().await?;
if devices.len() == 1 {
let dev_proxy =
DeviceProxy::new(&connection, devices.swap_remove(0)).await?;
let iface = InternedString::intern(dev_proxy.ip_interface().await?);
ifaces.insert(iface.clone());
jobs.push(async {
let ac_proxy = ac_proxy;
let mut ip_config_sub = WatchPropertyStream::new(
ac_proxy.receive_ip4_config_changed().await,
)
.await?;

loop {
let ip_config = ip_config_sub.last.clone();
ip_config_sub
.until_changed(async {
let ip_proxy =
Ip4ConfigProxy::new(&connection, ip_config)
.await?;
let mut address_sub = WatchPropertyStream::new(
ip_proxy.receive_address_data_changed().await,
)
.await?;

loop {
let addresses = address_sub.last.clone();
address_sub
.until_changed(async {
let ip_info: IpInfo =
addresses.try_into()?;

write_to.send_if_modified(|m| {
m.insert(iface, ip_info.clone())
.filter(|old| old == &ip_info)
.is_none()
});

'conn: while let Some(active) = active_sub.next().await {}
Ok::<_, Error>(())
})
.await?;
}

Ok::<_, Error>(())
})
.await?;
}

Ok::<_, Error>(())
});
}
}
write_to.send_if_modified(|m| {
let mut changed = false;
m.retain(|i, _| {
if ifaces.contains(i) {
true
} else {
changed |= true;
false
}
});
changed
});
futures::future::try_join_all(jobs).await?;

Ok::<_, Error>(())
})
.await?;
}

Ok::<_, Error>(())
}
Expand All @@ -160,6 +246,7 @@ async fn watcher(write_to: watch::Sender<BTreeMap<InternedString, IpInfo>>) {
tracing::debug!("{e:?}");
}
}
run.await;
}

pub struct NetworkInterfaceController {
Expand Down

0 comments on commit 1079ca2

Please sign in to comment.