Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
dr-bonez committed Nov 26, 2024
1 parent e950c5e commit 9735a32
Showing 1 changed file with 124 additions and 10 deletions.
134 changes: 124 additions & 10 deletions core/startos/src/net/network_interface.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,25 @@
use std::collections::{BTreeMap, BTreeSet};
use std::future::Future;
use std::net::{IpAddr, SocketAddr};
use std::sync::{Arc, Weak};
use std::task::Poll;

use clap::Parser;
use futures::TryStreamExt;
use futures::future::BoxFuture;
use futures::{FutureExt, TryStreamExt};
use imbl_value::InternedString;
use rpc_toolkit::{from_fn_async, Context, HandlerExt, ParentHandler};
use serde::{Deserialize, Serialize};
use tokio::net::TcpStream;
use tokio::sync::RwLock;
use tokio::sync::{watch, RwLock};
use tokio_stream::StreamExt;
use ts_rs::TS;
use zbus::zvariant::OwnedObjectPath;
use zbus::fdo::PropertiesChangedStream;
use zbus::proxy::PropertyStream;
use zbus::zvariant::{
DeserializeDict, DynamicDeserialize, OwnedObjectPath, OwnedValue, SerializeDict, Type as ZType,
Value as ZValue,
};
use zbus::{proxy, Connection};

use crate::context::{CliContext, RpcContext};
Expand All @@ -27,27 +35,133 @@ use crate::util::sync::SyncMutex;
default_path = "/org/freedesktop/NetworkManager"
)]
trait NetworkManager {
async fn get_all_devices(&self) -> Result<Vec<OwnedObjectPath>, Error>;
#[zbus(property)]
fn active_connections(&self) -> Result<Vec<OwnedObjectPath>, Error>;
}

#[proxy(
interface = "org.freedesktop.NetworkManager.Connection.Active",
default_service = "org.freedesktop.NetworkManager"
)]
trait ActiveConnection {
#[zbus(property)]
fn ip4_config(&self) -> Result<OwnedObjectPath, Error>;
#[zbus(property)]
fn devices(&self) -> Result<Vec<OwnedObjectPath>, Error>;
}

#[proxy(
interface = "org.freedesktop.NetworkManager.IP4Config",
default_service = "org.freedesktop.NetworkManager"
)]
trait Ip4Config {
#[zbus(property)]
fn address_data(&self) -> Result<Vec<AddressData>, Error>;
}

#[derive(Debug, DeserializeDict, ZValue, ZType)]
#[zvariant(signature = "dict")]
struct AddressData {
address: String,
prefix: u32,
}

#[proxy(
interface = "org.freedesktop.NetworkManager.Device",
default_service = "org.freedesktop.NetworkManager"
)]
trait Device {
#[zbus(property)]
fn ip_interface(&self) -> Result<String, Error>;
}

#[tokio::test]
async fn test() -> Result<(), Error> {
let connection = Connection::system().await?;

let proxy = NetworkManagerProxy::new(&connection).await?;
let reply = proxy.active_connections().await?;
println!("{reply:?}");

let mut stream = proxy.receive_active_connections_changed().await;
while let Some(conn) = stream.next().await {
println!("{:?}", conn.get().await?);
let active = proxy
.receive_active_connections_changed()
.await
.next()
.await
.unwrap()
.get()
.await?;
eprintln!("{active:?}");
for active in active {
let proxy = ActiveConnectionProxy::new(&connection, active).await?;
let ip4 = proxy.ip4_config().await?;
eprintln!("{ip4:?}");
let ip_proxy = Ip4ConfigProxy::new(&connection, ip4).await?;
let addresses = ip_proxy.address_data().await?;
eprintln!("{addresses:?}");
let devices = proxy.devices().await?;
eprintln!("{devices:?}");
for device in devices {
let proxy = DeviceProxy::new(&connection, device).await?;
let ifaces = proxy.ip_interface().await?;
eprintln!("{ifaces:?}");
}
}

Ok(())
}

struct WatchPropertyStream<'a, T> {
stream: PropertyStream<'a, T>,
last: T,
}
impl<'a, T> WatchPropertyStream<'a, T>
where
T: Unpin + TryFrom<OwnedValue>,
T::Error: Into<zbus::Error>,
{
async fn new(mut stream: PropertyStream<'a, T>) -> Result<Self, Error> {
let last = stream
.next()
.await
.ok_or_else(|| Error::new(eyre!("stream is empty"), ErrorKind::DBus))?
.get()
.await?;
Ok(Self { stream, last })
}
async fn unless_changed<U, Fut: Future<Output = Result<U, Error>>>(
&mut self,
fut: Fut,
) -> Result<Option<U>, Error> {
tokio::select! {
changed = self.stream.next() => {
self.last = changed.ok_or_else(|| Error::new(eyre!("stream is empty"), ErrorKind::DBus))?.get().await?;
Ok(None)
},
res = fut => {
res.map(Some)
}
}
}
}

async fn watcher(write_to: watch::Sender<BTreeMap<InternedString, IpInfo>>) {
loop {
if let Err(e) = async {
let connection = Connection::system().await?;
let netman_proxy = NetworkManagerProxy::new(&connection).await?;

let mut active_sub = netman_proxy.receive_active_connections_changed().await;

'conn: while let Some(active) = active_sub.next().await {}

Ok::<_, Error>(())
}
.await
{
tracing::error!("{e}");
tracing::debug!("{e:?}");
}
}
}

pub struct NetworkInterfaceController {
db: TypedPatchDb<Database>,
listeners: SyncMutex<BTreeMap<u16, Weak<()>>>,
Expand Down

0 comments on commit 9735a32

Please sign in to comment.