From acd0d5b971a874f8c4742518bbfc465300dbeb83 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Imobach=20Gonz=C3=A1lez=20Sosa?= Date: Wed, 7 Aug 2024 15:09:23 +0100 Subject: [PATCH 01/97] feat(rust): expose part of the DASD interface via HTTP --- rust/agama-lib/src/storage/client.rs | 1 + rust/agama-lib/src/storage/client/dasd.rs | 100 ++++++++++ rust/agama-lib/src/storage/model.rs | 2 + rust/agama-lib/src/storage/model/dasd.rs | 39 ++++ rust/agama-lib/src/storage/proxies.rs | 72 +++++++ rust/agama-server/src/storage/web.rs | 10 +- rust/agama-server/src/storage/web/dasd.rs | 176 ++++++++++++++++++ .../src/storage/web/dasd/stream.rs | 160 ++++++++++++++++ rust/agama-server/src/web/event.rs | 13 +- 9 files changed, 570 insertions(+), 3 deletions(-) create mode 100644 rust/agama-lib/src/storage/client/dasd.rs create mode 100644 rust/agama-lib/src/storage/model/dasd.rs create mode 100644 rust/agama-server/src/storage/web/dasd.rs create mode 100644 rust/agama-server/src/storage/web/dasd/stream.rs diff --git a/rust/agama-lib/src/storage/client.rs b/rust/agama-lib/src/storage/client.rs index 9998cc61aa..37b53a6936 100644 --- a/rust/agama-lib/src/storage/client.rs +++ b/rust/agama-lib/src/storage/client.rs @@ -14,6 +14,7 @@ use zbus::fdo::ObjectManagerProxy; use zbus::names::{InterfaceName, OwnedInterfaceName}; use zbus::zvariant::{OwnedObjectPath, OwnedValue}; use zbus::Connection; +pub mod dasd; pub mod iscsi; type DBusObject = ( diff --git a/rust/agama-lib/src/storage/client/dasd.rs b/rust/agama-lib/src/storage/client/dasd.rs new file mode 100644 index 0000000000..17674d64ad --- /dev/null +++ b/rust/agama-lib/src/storage/client/dasd.rs @@ -0,0 +1,100 @@ +//! Implements a client to access Agama's D-Bus API related to DASD management. + +use zbus::{ + fdo::ObjectManagerProxy, + zvariant::{ObjectPath, OwnedObjectPath}, + Connection, +}; + +use crate::{ + error::ServiceError, + storage::{model::dasd::DASDDevice, proxies::DASDManagerProxy}, +}; + +/// Client to connect to Agama's D-Bus API for DASD management. +#[derive(Clone)] +pub struct DASDClient<'a> { + manager_proxy: DASDManagerProxy<'a>, + object_manager_proxy: ObjectManagerProxy<'a>, +} + +impl<'a> DASDClient<'a> { + pub async fn new(connection: Connection) -> Result, ServiceError> { + let manager_proxy = DASDManagerProxy::new(&connection).await?; + let object_manager_proxy = ObjectManagerProxy::builder(&connection) + .destination("org.opensuse.Agama.Storage1")? + .path("/org/opensuse/Agama/Storage1")? + .build() + .await?; + Ok(Self { + manager_proxy, + object_manager_proxy, + }) + } + + pub async fn probe(&self) -> Result<(), ServiceError> { + Ok(self.manager_proxy.probe().await?) + } + + pub async fn devices(&self) -> Result, ServiceError> { + let managed_objects = self.object_manager_proxy.get_managed_objects().await?; + + let mut devices: Vec<(OwnedObjectPath, DASDDevice)> = vec![]; + for (path, ifaces) in managed_objects { + if let Some(properties) = ifaces.get("org.opensuse.Agama.Storage1.DASD.Device") { + match DASDDevice::try_from(properties) { + Ok(device) => { + devices.push((path, device)); + } + Err(error) => { + log::warn!("Not a valid DASD device: {}", error); + } + } + } + } + Ok(devices) + } + + pub async fn format(&self, ids: &[&str]) -> Result<(), ServiceError> { + let selected = self.find_devices(ids).await?; + let references = selected.iter().collect::>>(); + self.manager_proxy.format(&references).await?; + Ok(()) + } + + pub async fn enable(&self, ids: &[&str]) -> Result<(), ServiceError> { + let selected = self.find_devices(ids).await?; + let references = selected.iter().collect::>>(); + self.manager_proxy.enable(&references).await?; + Ok(()) + } + + pub async fn disable(&self, ids: &[&str]) -> Result<(), ServiceError> { + let selected = self.find_devices(ids).await?; + let references = selected.iter().collect::>>(); + self.manager_proxy.disable(&references).await?; + Ok(()) + } + + pub async fn set_diag(&self, ids: &[&str], diag: bool) -> Result<(), ServiceError> { + let selected = self.find_devices(ids).await?; + let references = selected.iter().collect::>>(); + self.manager_proxy.set_diag(&references, diag).await?; + Ok(()) + } + + async fn find_devices(&self, ids: &[&str]) -> Result>, ServiceError> { + let devices = self.devices().await?; + let selected: Vec = devices + .into_iter() + .filter_map(|(path, device)| { + if ids.contains(&device.id.as_str()) { + Some(path.into_inner()) + } else { + None + } + }) + .collect(); + Ok(selected) + } +} diff --git a/rust/agama-lib/src/storage/model.rs b/rust/agama-lib/src/storage/model.rs index da49d35cc6..81c3b0f65e 100644 --- a/rust/agama-lib/src/storage/model.rs +++ b/rust/agama-lib/src/storage/model.rs @@ -5,6 +5,8 @@ use zbus::zvariant::{OwnedValue, Value}; use crate::dbus::{get_optional_property, get_property}; +pub mod dasd; + #[derive(Debug, Clone, Serialize, Deserialize, utoipa::ToSchema)] pub struct DeviceSid(u32); diff --git a/rust/agama-lib/src/storage/model/dasd.rs b/rust/agama-lib/src/storage/model/dasd.rs new file mode 100644 index 0000000000..5b370d4841 --- /dev/null +++ b/rust/agama-lib/src/storage/model/dasd.rs @@ -0,0 +1,39 @@ +//! Implements a data model for DASD devices management. +use std::collections::HashMap; + +use serde::Serialize; +use zbus::zvariant::OwnedValue; + +use crate::{dbus::get_property, error::ServiceError}; + +/// Represents a DASD device (specific to s390x systems). +#[derive(Clone, Debug, Serialize, Default, utoipa::ToSchema)] +pub struct DASDDevice { + pub id: String, + pub enabled: bool, + pub device_name: String, + pub formatted: bool, + pub diag: bool, + pub status: String, + pub device_type: String, + pub access_type: String, + pub partition_info: String, +} + +impl TryFrom<&HashMap> for DASDDevice { + type Error = ServiceError; + + fn try_from(value: &HashMap) -> Result { + Ok(DASDDevice { + id: get_property(value, "Id")?, + enabled: get_property(value, "Enabled")?, + device_name: get_property(value, "DeviceName")?, + formatted: get_property(value, "Formatted")?, + diag: get_property(value, "Diag")?, + status: get_property(value, "Status")?, + device_type: get_property(value, "Type")?, + access_type: get_property(value, "AccessType")?, + partition_info: get_property(value, "PartitionInfo")?, + }) + } +} diff --git a/rust/agama-lib/src/storage/proxies.rs b/rust/agama-lib/src/storage/proxies.rs index ebb9396ac7..c5d2de158b 100644 --- a/rust/agama-lib/src/storage/proxies.rs +++ b/rust/agama-lib/src/storage/proxies.rs @@ -156,3 +156,75 @@ trait Node { #[dbus_proxy(property)] fn target(&self) -> zbus::Result; } + +#[dbus_proxy( + interface = "org.opensuse.Agama.Storage1.DASD.Manager", + default_service = "org.opensuse.Agama.Storage1", + default_path = "/org/opensuse/Agama/Storage1" +)] +trait DASDManager { + /// Disable method + fn disable(&self, devices: &[&zbus::zvariant::ObjectPath<'_>]) -> zbus::Result; + + /// Enable method + fn enable(&self, devices: &[&zbus::zvariant::ObjectPath<'_>]) -> zbus::Result; + + /// Format method + fn format( + &self, + devices: &[&zbus::zvariant::ObjectPath<'_>], + ) -> zbus::Result<(u32, zbus::zvariant::OwnedObjectPath)>; + + /// Probe method + fn probe(&self) -> zbus::Result<()>; + + /// SetDiag method + fn set_diag( + &self, + devices: &[&zbus::zvariant::ObjectPath<'_>], + diag: bool, + ) -> zbus::Result; +} + +#[dbus_proxy( + interface = "org.opensuse.Agama.Storage1.DASD.Device", + default_service = "org.opensuse.Agama.Storage1", + assume_defaults = true +)] +trait DASDDevice { + /// AccessType property + #[dbus_proxy(property)] + fn access_type(&self) -> zbus::Result; + + /// DeviceName property + #[dbus_proxy(property)] + fn device_name(&self) -> zbus::Result; + + /// Diag property + #[dbus_proxy(property)] + fn diag(&self) -> zbus::Result; + + /// Enabled property + #[dbus_proxy(property)] + fn enabled(&self) -> zbus::Result; + + /// Formatted property + #[dbus_proxy(property)] + fn formatted(&self) -> zbus::Result; + + /// Id property + #[dbus_proxy(property)] + fn id(&self) -> zbus::Result; + + /// PartitionInfo property + #[dbus_proxy(property)] + fn partition_info(&self) -> zbus::Result; + + /// Status property + #[dbus_proxy(property)] + fn status(&self) -> zbus::Result; + + /// Type property + #[dbus_proxy(property)] + fn type_(&self) -> zbus::Result; +} diff --git a/rust/agama-server/src/storage/web.rs b/rust/agama-server/src/storage/web.rs index 4f0de7fab4..941bfd51ff 100644 --- a/rust/agama-server/src/storage/web.rs +++ b/rust/agama-server/src/storage/web.rs @@ -21,11 +21,15 @@ use axum::{ use serde::{Deserialize, Serialize}; use tokio_stream::{Stream, StreamExt}; +pub mod dasd; pub mod iscsi; use crate::{ error::Error, - storage::web::iscsi::{iscsi_service, iscsi_stream}, + storage::web::{ + dasd::{dasd_service, dasd_stream}, + iscsi::{iscsi_service, iscsi_stream}, + }, web::{ common::{issues_router, progress_router, service_status_router, EventStreams}, Event, @@ -38,8 +42,10 @@ pub async fn storage_streams(dbus: zbus::Connection) -> Result Result Result Result { + let stream: EventStreams = vec![("dasd_devices", Box::pin(DASDDeviceStream::new(dbus).await?))]; + Ok(stream) +} + +#[derive(Clone)] +struct DASDState<'a> { + client: DASDClient<'a>, +} + +pub async fn dasd_service(dbus: &zbus::Connection) -> Result, ServiceError> { + let client = DASDClient::new(dbus.clone()).await?; + let state = DASDState { client }; + let router = Router::new() + .route("/devices", get(devices)) + .route("/probe", post(probe)) + .route("/format", post(format)) + .route("/enable", post(enable)) + .route("/disable", post(disable)) + .route("/diag", put(set_diag)) + .with_state(state); + Ok(router) +} + +/// Returns the list of known DASD devices. +#[utoipa::path( + get, + path="/devices", + context_path="/api/storage/dasd", + responses( + (status = OK, description = "List of DASD devices", body = Vec) + ) +)] +async fn devices(State(state): State>) -> Result>, Error> { + let devices = state + .client + .devices() + .await? + .into_iter() + .map(|(_path, device)| device) + .collect(); + Ok(Json(devices)) +} + +/// Find DASD devices in the system. +#[utoipa::path( + post, + path="/probe", + context_path="/api/storage/dasd", + responses( + (status = OK, description = "The probing process ran successfully") + ) +)] +async fn probe(State(state): State>) -> Result, Error> { + Ok(Json(state.client.probe().await?)) +} + +/// Formats a set of devices. +#[utoipa::path( + post, + path="/format", + context_path="/api/storage/dasd", + responses( + (status = OK, description = "The formatting process started.") + ) +)] +async fn format( + State(state): State>, + Json(devices): Json, +) -> Result, Error> { + state.client.format(&devices.as_references()).await?; + Ok(Json(())) +} + +/// Enables a set of devices. +#[utoipa::path( + post, + path="/enable", + context_path="/api/storage/dasd", + responses( + (status = OK, description = "The formatting process started.") + ) +)] +async fn enable( + State(state): State>, + Json(devices): Json, +) -> Result, Error> { + state.client.enable(&devices.as_references()).await?; + Ok(Json(())) +} + +/// Disables a set of devices. +#[utoipa::path( + post, + path="/disable", + context_path="/api/storage/dasd", + responses( + (status = OK, description = "The formatting process started.") + ) +)] +async fn disable( + State(state): State>, + Json(devices): Json, +) -> Result, Error> { + state.client.disable(&devices.as_references()).await?; + Ok(Json(())) +} + +/// Sets the diag property for a set of devices. +#[utoipa::path( + put, + path="/diag", + context_path="/api/storage/dasd", + responses( + (status = OK, description = "The formatting process started.") + ) + )] +async fn set_diag( + State(state): State>, + Json(params): Json, +) -> Result, Error> { + state + .client + .set_diag(¶ms.devices.as_references(), params.diag) + .await?; + Ok(Json(())) +} + +#[derive(Deserialize)] +struct SetDiagParams { + #[serde(flatten)] + pub devices: DevicesList, + pub diag: bool, +} + +#[derive(Deserialize)] +struct DevicesList { + devices: Vec, +} + +impl DevicesList { + pub fn as_references(&self) -> Vec<&str> { + self.devices.iter().map(AsRef::as_ref).collect() + } +} diff --git a/rust/agama-server/src/storage/web/dasd/stream.rs b/rust/agama-server/src/storage/web/dasd/stream.rs new file mode 100644 index 0000000000..c1e39f7cc4 --- /dev/null +++ b/rust/agama-server/src/storage/web/dasd/stream.rs @@ -0,0 +1,160 @@ +// FIXME: the code is pretty similar to iscsi::stream. Refactor the stream to reduce the repetition. + +use std::{collections::HashMap, task::Poll}; + +use agama_lib::{ + dbus::get_optional_property, + error::ServiceError, + property_from_dbus, + storage::{client::dasd::DASDClient, model::dasd::DASDDevice}, +}; +use futures_util::{ready, Stream}; +use pin_project::pin_project; +use thiserror::Error; +use tokio::sync::mpsc::unbounded_channel; +use tokio_stream::{wrappers::UnboundedReceiverStream, StreamExt}; +use zbus::zvariant::{ObjectPath, OwnedObjectPath, OwnedValue}; + +use crate::{ + dbus::{DBusObjectChange, DBusObjectChangesStream, ObjectsCache}, + web::Event, +}; + +#[derive(Debug, Error)] +enum DASDDeviceStreamError { + #[error("Service error: {0}")] + Service(#[from] ServiceError), + #[error("Unknown DASD device: {0}")] + UnknownDevice(OwnedObjectPath), +} + +/// This stream listens for changes in the collection of DASD devices and emits +/// the updated objects. +/// +/// It relies on the [DBusObjectChangesStream] stream and uses a cache to avoid holding a bunch of +/// proxy objects. +#[pin_project] +pub struct DASDDeviceStream { + dbus: zbus::Connection, + cache: ObjectsCache, + #[pin] + inner: UnboundedReceiverStream, +} + +impl DASDDeviceStream { + /// Creates a new stream + /// + /// * `dbus`: D-Bus connection to listen on. + pub async fn new(dbus: &zbus::Connection) -> Result { + const MANAGER_PATH: &str = "/org/opensuse/Agama/Storage1"; + const NAMESPACE: &str = "/org/opensuse/Agama/Storage1/dasds"; + + let (tx, rx) = unbounded_channel(); + let mut stream = DBusObjectChangesStream::new( + dbus, + &ObjectPath::from_str_unchecked(MANAGER_PATH), + &ObjectPath::from_str_unchecked(NAMESPACE), + "org.opensuse.Agama.Storage1.DASD.Device", + ) + .await?; + + tokio::spawn(async move { + while let Some(change) = stream.next().await { + let _ = tx.send(change); + } + }); + let rx = UnboundedReceiverStream::new(rx); + + let mut cache: ObjectsCache = Default::default(); + let client = DASDClient::new(dbus.clone()).await?; + for (path, device) in client.devices().await? { + cache.add(path.into(), device); + } + + Ok(Self { + dbus: dbus.clone(), + cache, + inner: rx, + }) + } + + fn update_device<'a>( + cache: &'a mut ObjectsCache, + path: &OwnedObjectPath, + values: &HashMap, + ) -> Result<&'a DASDDevice, ServiceError> { + let device = cache.find_or_create(path); + property_from_dbus!(device, id, "Id", values, str); + property_from_dbus!(device, enabled, "Enabled", values, bool); + property_from_dbus!(device, device_name, "DeviceName", values, str); + property_from_dbus!(device, formatted, "Formatted", values, bool); + property_from_dbus!(device, diag, "Diag", values, bool); + property_from_dbus!(device, status, "Status", values, str); + property_from_dbus!(device, device_type, "Type", values, str); + property_from_dbus!(device, access_type, "AccessType", values, str); + property_from_dbus!(device, partition_info, "PartitionInfo", values, str); + Ok(device) + } + + fn remove_device( + cache: &mut ObjectsCache, + path: &OwnedObjectPath, + ) -> Result { + cache + .remove(path) + .ok_or_else(|| DASDDeviceStreamError::UnknownDevice(path.clone())) + } + + fn handle_change( + cache: &mut ObjectsCache, + change: &DBusObjectChange, + ) -> Result { + match change { + DBusObjectChange::Added(path, values) => { + let device = Self::update_device(cache, path, values)?; + Ok(Event::DASDDeviceAdded { + device: device.clone(), + }) + } + DBusObjectChange::Changed(path, updated) => { + let device = Self::update_device(cache, path, updated)?; + Ok(Event::DASDDeviceChanged { + device: device.clone(), + }) + } + DBusObjectChange::Removed(path) => { + let device = Self::remove_device(cache, path)?; + Ok(Event::DASDDeviceRemoved { device }) + } + } + } +} + +impl Stream for DASDDeviceStream { + type Item = Event; + + fn poll_next( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + let mut pinned = self.project(); + + Poll::Ready(loop { + let change = ready!(pinned.inner.as_mut().poll_next(cx)); + let next_value = match change { + Some(change) => { + if let Ok(event) = Self::handle_change(pinned.cache, &change) { + Some(event) + } else { + log::warn!("Could not process change {:?}", &change); + None + } + } + None => break None, + }; + if next_value.is_some() { + break next_value; + } + }) + } +} diff --git a/rust/agama-server/src/web/event.rs b/rust/agama-server/src/web/event.rs index 20075ce00a..765bdea790 100644 --- a/rust/agama-server/src/web/event.rs +++ b/rust/agama-server/src/web/event.rs @@ -1,8 +1,8 @@ use crate::network::model::NetworkChange; use agama_lib::{ localization::model::LocaleConfig, manager::InstallationPhase, - product::RegistrationRequirement, progress::Progress, software::SelectedBy, storage::ISCSINode, - users::FirstUser, + product::RegistrationRequirement, progress::Progress, software::SelectedBy, + storage::model::dasd::DASDDevice, storage::ISCSINode, users::FirstUser, }; use serde::Serialize; use std::collections::HashMap; @@ -77,6 +77,15 @@ pub enum Event { name: Option, ibft: Option, }, + DASDDeviceAdded { + device: DASDDevice, + }, + DASDDeviceChanged { + device: DASDDevice, + }, + DASDDeviceRemoved { + device: DASDDevice, + }, } pub type EventsSender = Sender; From ca965f10ce9c0c3a5451161b5a4e11d2cf0c8f61 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Imobach=20Gonz=C3=A1lez=20Sosa?= Date: Fri, 9 Aug 2024 12:59:20 +0100 Subject: [PATCH 02/97] feat(rust): expose the jobs interface via HTTP --- rust/agama-lib/src/jobs.rs | 35 +++++ rust/agama-lib/src/jobs/client.rs | 53 +++++++ rust/agama-lib/src/lib.rs | 1 + rust/agama-lib/src/proxies.rs | 26 ++++ rust/agama-server/src/web/common.rs | 3 + rust/agama-server/src/web/common/jobs.rs | 187 +++++++++++++++++++++++ rust/agama-server/src/web/event.rs | 11 +- 7 files changed, 315 insertions(+), 1 deletion(-) create mode 100644 rust/agama-lib/src/jobs.rs create mode 100644 rust/agama-lib/src/jobs/client.rs create mode 100644 rust/agama-server/src/web/common/jobs.rs diff --git a/rust/agama-lib/src/jobs.rs b/rust/agama-lib/src/jobs.rs new file mode 100644 index 0000000000..d6ce8bf029 --- /dev/null +++ b/rust/agama-lib/src/jobs.rs @@ -0,0 +1,35 @@ +//! This module implements support for the so-called Jobs. It is a concept hat represents running +//! an external command that may take some time, like formatting a DASD device. It is exposed via +//! D-Bus and, at this time, only the storage service makes use of it. + +use std::collections::HashMap; + +use serde::Serialize; +use zbus::zvariant::OwnedValue; + +use crate::{dbus::get_property, error::ServiceError}; + +pub mod client; + +/// Represents a job. +#[derive(Clone, Debug, Default, Serialize)] +pub struct Job { + /// Artificial job identifier. + pub id: String, + /// Whether the job is running. + pub running: bool, + /// Job exit code. + pub exit_code: u32, +} + +impl TryFrom<&HashMap> for Job { + type Error = ServiceError; + + fn try_from(value: &HashMap) -> Result { + Ok(Job { + running: get_property(value, "Running")?, + exit_code: get_property(value, "ExitCode")?, + ..Default::default() + }) + } +} diff --git a/rust/agama-lib/src/jobs/client.rs b/rust/agama-lib/src/jobs/client.rs new file mode 100644 index 0000000000..c7f02d5ad9 --- /dev/null +++ b/rust/agama-lib/src/jobs/client.rs @@ -0,0 +1,53 @@ +//! Implements a client to access Agama's Jobs API. + +use zbus::{fdo::ObjectManagerProxy, zvariant::OwnedObjectPath, Connection}; + +use crate::error::ServiceError; + +use super::Job; + +#[derive(Clone)] +pub struct JobsClient<'a> { + object_manager_proxy: ObjectManagerProxy<'a>, +} + +impl<'a> JobsClient<'a> { + pub async fn new( + connection: Connection, + destination: &'static str, + path: &'static str, + ) -> Result { + let object_manager_proxy = ObjectManagerProxy::builder(&connection) + .destination(destination)? + .path(path)? + .build() + .await?; + + Ok(Self { + object_manager_proxy, + }) + } + + pub async fn jobs(&self) -> Result, ServiceError> { + let managed_objects = self.object_manager_proxy.get_managed_objects().await?; + + let mut jobs = vec![]; + for (path, ifaces) in managed_objects { + let Some(properties) = ifaces.get("org.opensuse.Agama.Storage1.Job") else { + continue; + }; + + match Job::try_from(properties) { + Ok(mut job) => { + job.id = path.to_string(); + jobs.push((path, job)); + } + Err(error) => { + log::warn!("Not a valid job: {}", error); + } + } + } + + Ok(jobs) + } +} diff --git a/rust/agama-lib/src/lib.rs b/rust/agama-lib/src/lib.rs index 1bbc4c48c8..2a02f0cc63 100644 --- a/rust/agama-lib/src/lib.rs +++ b/rust/agama-lib/src/lib.rs @@ -27,6 +27,7 @@ pub mod auth; pub mod base_http_client; pub mod error; pub mod install_settings; +pub mod jobs; pub mod localization; pub mod manager; pub mod network; diff --git a/rust/agama-lib/src/proxies.rs b/rust/agama-lib/src/proxies.rs index 240327ab59..19157b2f10 100644 --- a/rust/agama-lib/src/proxies.rs +++ b/rust/agama-lib/src/proxies.rs @@ -192,3 +192,29 @@ trait Locale { /// SetLocale method fn set_locale(&self, locale: &str) -> zbus::Result<()>; } + +#[dbus_proxy( + interface = "org.opensuse.Agama.Storage1.Job", + default_service = "org.opensuse.Agama.Storage1", + default_path = "/org/opensuse/Agama/Storage1/jobs" +)] +trait Job { + #[dbus_proxy(property)] + fn running(&self) -> zbus::Result; + + #[dbus_proxy(property)] + fn exit_code(&self) -> zbus::Result; + + #[dbus_proxy(signal)] + fn finished(&self, exit_code: u32) -> zbus::Result<()>; +} + +#[dbus_proxy( + interface = "org.opensuse.Agama.Storage1.DASD.Format", + default_service = "org.opensuse.Agama.Storage1", + default_path = "/org/opensuse/Agama/Storage1/jobs/1" +)] +trait FormatJob { + #[dbus_proxy(property)] + fn summary(&self) -> zbus::Result>; +} diff --git a/rust/agama-server/src/web/common.rs b/rust/agama-server/src/web/common.rs index 814a811811..d54cd4b6b8 100644 --- a/rust/agama-server/src/web/common.rs +++ b/rust/agama-server/src/web/common.rs @@ -15,6 +15,9 @@ use zbus::PropertyStream; use crate::error::Error; +mod jobs; +pub use jobs::{jobs_router, jobs_stream}; + use super::Event; pub type EventStreams = Vec<(&'static str, Pin + Send>>)>; diff --git a/rust/agama-server/src/web/common/jobs.rs b/rust/agama-server/src/web/common/jobs.rs new file mode 100644 index 0000000000..cd9b1a8811 --- /dev/null +++ b/rust/agama-server/src/web/common/jobs.rs @@ -0,0 +1,187 @@ +use std::{collections::HashMap, task::Poll}; + +use agama_lib::{ + dbus::get_optional_property, + error::ServiceError, + jobs::{client::JobsClient, Job}, + property_from_dbus, +}; +use axum::{extract::State, routing::get, Json, Router}; +use futures_util::{ready, Stream}; +use pin_project::pin_project; +use tokio::sync::mpsc::unbounded_channel; +use tokio_stream::{wrappers::UnboundedReceiverStream, StreamExt}; +use zbus::zvariant::{ObjectPath, OwnedObjectPath, OwnedValue}; + +use crate::{ + dbus::{DBusObjectChange, DBusObjectChangesStream, ObjectsCache}, + error::Error, + web::Event, +}; + +use super::EventStreams; + +/// Builds a router for the jobs objects. +pub async fn jobs_router( + dbus: &zbus::Connection, + destination: &'static str, + path: &'static str, +) -> Result, ServiceError> { + let client = JobsClient::new(dbus.clone(), destination, path).await?; + let state = JobsState { client }; + Ok(Router::new().route("/jobs", get(jobs)).with_state(state)) +} + +#[derive(Clone)] +struct JobsState<'a> { + client: JobsClient<'a>, +} + +async fn jobs(State(state): State>) -> Result>, Error> { + let jobs = state + .client + .jobs() + .await? + .into_iter() + .map(|(_path, job)| job) + .collect(); + Ok(Json(jobs)) +} + +/// Returns the stream of jobs-related events. +/// +/// The stream combines the following events: +/// +/// * Changes on the DASD devices collection. +/// +/// * `dbus`: D-Bus connection to use. +pub async fn jobs_stream( + dbus: &zbus::Connection, + manager: &'static str, + namespace: &'static str, +) -> Result { + let jobs_stream = JobsStream::new(dbus, manager, namespace).await?; + let stream: EventStreams = vec![("jobs", Box::pin(jobs_stream))]; + Ok(stream) +} + +#[pin_project] +pub struct JobsStream { + dbus: zbus::Connection, + cache: ObjectsCache, + #[pin] + inner: UnboundedReceiverStream, +} + +#[derive(Debug, thiserror::Error)] +enum JobsStreamError { + #[error("Service error: {0}")] + Service(#[from] ServiceError), + #[error("Unknown job: {0}")] + UnknownJob(OwnedObjectPath), +} + +impl JobsStream { + pub async fn new( + dbus: &zbus::Connection, + manager: &'static str, + namespace: &'static str, + ) -> Result { + let (tx, rx) = unbounded_channel(); + let mut stream = DBusObjectChangesStream::new( + dbus, + &ObjectPath::from_static_str(manager)?, + &ObjectPath::from_static_str(namespace)?, + "org.opensuse.Agama.Storage1.Job", + ) + .await?; + + tokio::spawn(async move { + while let Some(change) = stream.next().await { + let _ = tx.send(change); + } + }); + let rx = UnboundedReceiverStream::new(rx); + + let mut cache: ObjectsCache = Default::default(); + let client = JobsClient::new(dbus.clone(), manager, namespace).await?; + for (path, job) in client.jobs().await? { + cache.add(path.into(), job); + } + + Ok(Self { + dbus: dbus.clone(), + cache, + inner: rx, + }) + } + + fn update_job<'a>( + cache: &'a mut ObjectsCache, + path: &OwnedObjectPath, + values: &HashMap, + ) -> Result<&'a Job, ServiceError> { + let job = cache.find_or_create(path); + property_from_dbus!(job, running, "Running", values, bool); + property_from_dbus!(job, exit_code, "ExitCode", values, u32); + Ok(job) + } + + fn remove_job( + cache: &mut ObjectsCache, + path: &OwnedObjectPath, + ) -> Result { + cache + .remove(path) + .ok_or_else(|| JobsStreamError::UnknownJob(path.clone())) + } + + fn handle_change( + cache: &mut ObjectsCache, + change: &DBusObjectChange, + ) -> Result { + match change { + DBusObjectChange::Added(path, values) => { + let job = Self::update_job(cache, path, values)?; + Ok(Event::JobAdded { job: job.clone() }) + } + DBusObjectChange::Changed(path, updated) => { + let job = Self::update_job(cache, path, updated)?; + Ok(Event::JobChanged { job: job.clone() }) + } + DBusObjectChange::Removed(path) => { + let job = Self::remove_job(cache, path)?; + Ok(Event::JobRemoved { job }) + } + } + } +} + +impl Stream for JobsStream { + type Item = Event; + + fn poll_next( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + let mut pinned = self.project(); + + Poll::Ready(loop { + let change = ready!(pinned.inner.as_mut().poll_next(cx)); + let next_value = match change { + Some(change) => { + if let Ok(event) = Self::handle_change(pinned.cache, &change) { + Some(event) + } else { + log::warn!("Could not process change {:?}", &change); + None + } + } + None => break None, + }; + if next_value.is_some() { + break next_value; + } + }) + } +} diff --git a/rust/agama-server/src/web/event.rs b/rust/agama-server/src/web/event.rs index 765bdea790..990eb987eb 100644 --- a/rust/agama-server/src/web/event.rs +++ b/rust/agama-server/src/web/event.rs @@ -1,6 +1,6 @@ use crate::network::model::NetworkChange; use agama_lib::{ - localization::model::LocaleConfig, manager::InstallationPhase, + jobs::Job, localization::model::LocaleConfig, manager::InstallationPhase, product::RegistrationRequirement, progress::Progress, software::SelectedBy, storage::model::dasd::DASDDevice, storage::ISCSINode, users::FirstUser, }; @@ -86,6 +86,15 @@ pub enum Event { DASDDeviceRemoved { device: DASDDevice, }, + JobAdded { + job: Job, + }, + JobChanged { + job: Job, + }, + JobRemoved { + job: Job, + }, } pub type EventsSender = Sender; From 92c8770eb658bcf57dc03e5677b80c395c0b4449 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Imobach=20Gonz=C3=A1lez=20Sosa?= Date: Sat, 10 Aug 2024 21:39:02 +0100 Subject: [PATCH 03/97] feat(rust): expose storage jobs via HTTP --- rust/agama-server/src/storage/web.rs | 7 ++++++- rust/agama-server/src/web/common.rs | 2 +- rust/agama-server/src/web/common/jobs.rs | 19 +++++++++---------- 3 files changed, 16 insertions(+), 12 deletions(-) diff --git a/rust/agama-server/src/storage/web.rs b/rust/agama-server/src/storage/web.rs index 941bfd51ff..0fc935f346 100644 --- a/rust/agama-server/src/storage/web.rs +++ b/rust/agama-server/src/storage/web.rs @@ -31,7 +31,9 @@ use crate::{ iscsi::{iscsi_service, iscsi_stream}, }, web::{ - common::{issues_router, progress_router, service_status_router, EventStreams}, + common::{ + issues_router, jobs_service, progress_router, service_status_router, EventStreams, + }, Event, }, }; @@ -73,12 +75,14 @@ struct StorageState<'a> { pub async fn storage_service(dbus: zbus::Connection) -> Result { const DBUS_SERVICE: &str = "org.opensuse.Agama.Storage1"; const DBUS_PATH: &str = "/org/opensuse/Agama/Storage1"; + const DBUS_DESTINATION: &str = "org.opensuse.Agama.Storage1"; let status_router = service_status_router(&dbus, DBUS_SERVICE, DBUS_PATH).await?; let progress_router = progress_router(&dbus, DBUS_SERVICE, DBUS_PATH).await?; let issues_router = issues_router(&dbus, DBUS_SERVICE, DBUS_PATH).await?; let iscsi_router = iscsi_service(&dbus).await?; let dasd_router = dasd_service(&dbus).await?; + let jobs_router = jobs_service(&dbus, DBUS_DESTINATION, DBUS_PATH).await?; let client = StorageClient::new(dbus.clone()).await?; let state = StorageState { client }; @@ -97,6 +101,7 @@ pub async fn storage_service(dbus: zbus::Connection) -> Result( +pub async fn jobs_service( dbus: &zbus::Connection, destination: &'static str, path: &'static str, @@ -56,13 +54,13 @@ async fn jobs(State(state): State>) -> Result>, Erro /// /// * `dbus`: D-Bus connection to use. pub async fn jobs_stream( - dbus: &zbus::Connection, + dbus: zbus::Connection, + destination: &'static str, manager: &'static str, namespace: &'static str, -) -> Result { - let jobs_stream = JobsStream::new(dbus, manager, namespace).await?; - let stream: EventStreams = vec![("jobs", Box::pin(jobs_stream))]; - Ok(stream) +) -> Result + Send>>, Error> { + let stream = JobsStream::new(&dbus, destination, manager, namespace).await?; + Ok(Box::pin(stream)) } #[pin_project] @@ -84,6 +82,7 @@ enum JobsStreamError { impl JobsStream { pub async fn new( dbus: &zbus::Connection, + destination: &'static str, manager: &'static str, namespace: &'static str, ) -> Result { @@ -104,7 +103,7 @@ impl JobsStream { let rx = UnboundedReceiverStream::new(rx); let mut cache: ObjectsCache = Default::default(); - let client = JobsClient::new(dbus.clone(), manager, namespace).await?; + let client = JobsClient::new(dbus.clone(), destination, manager).await?; for (path, job) in client.jobs().await? { cache.add(path.into(), job); } From e7a6ceaa7cc5cfea6cc459959bb1bc458ccfb01e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Imobach=20Gonz=C3=A1lez=20Sosa?= Date: Fri, 9 Aug 2024 14:55:09 +0100 Subject: [PATCH 04/97] feat(rust): emit DASD format progress --- rust/agama-server/src/storage/web/dasd.rs | 10 +- .../src/storage/web/dasd/stream.rs | 99 ++++++++++++++++++- rust/agama-server/src/web/event.rs | 6 ++ 3 files changed, 111 insertions(+), 4 deletions(-) diff --git a/rust/agama-server/src/storage/web/dasd.rs b/rust/agama-server/src/storage/web/dasd.rs index 72a85075d6..f8c3848c69 100644 --- a/rust/agama-server/src/storage/web/dasd.rs +++ b/rust/agama-server/src/storage/web/dasd.rs @@ -18,7 +18,7 @@ use serde::Deserialize; use crate::{error::Error, web::common::EventStreams}; -use self::stream::DASDDeviceStream; +use self::stream::{DASDDeviceStream, DASDFormatJobStream}; mod stream; @@ -30,7 +30,13 @@ mod stream; /// /// * `dbus`: D-Bus connection to use. pub async fn dasd_stream(dbus: &zbus::Connection) -> Result { - let stream: EventStreams = vec![("dasd_devices", Box::pin(DASDDeviceStream::new(dbus).await?))]; + let stream: EventStreams = vec![ + ("dasd_devices", Box::pin(DASDDeviceStream::new(dbus).await?)), + ( + "format_jobs", + Box::pin(DASDFormatJobStream::new(dbus).await?), + ), + ]; Ok(stream) } diff --git a/rust/agama-server/src/storage/web/dasd/stream.rs b/rust/agama-server/src/storage/web/dasd/stream.rs index c1e39f7cc4..cfe801fdee 100644 --- a/rust/agama-server/src/storage/web/dasd/stream.rs +++ b/rust/agama-server/src/storage/web/dasd/stream.rs @@ -1,6 +1,6 @@ // FIXME: the code is pretty similar to iscsi::stream. Refactor the stream to reduce the repetition. -use std::{collections::HashMap, task::Poll}; +use std::{collections::HashMap, sync::Arc, task::Poll}; use agama_lib::{ dbus::get_optional_property, @@ -13,7 +13,11 @@ use pin_project::pin_project; use thiserror::Error; use tokio::sync::mpsc::unbounded_channel; use tokio_stream::{wrappers::UnboundedReceiverStream, StreamExt}; -use zbus::zvariant::{ObjectPath, OwnedObjectPath, OwnedValue}; +use zbus::{ + fdo::{PropertiesChanged, PropertiesChangedArgs}, + zvariant::{self, ObjectPath, OwnedObjectPath, OwnedValue}, + MatchRule, Message, MessageStream, MessageType, +}; use crate::{ dbus::{DBusObjectChange, DBusObjectChangesStream, ObjectsCache}, @@ -158,3 +162,94 @@ impl Stream for DASDDeviceStream { }) } } + +/// This stream listens for DASD progress changes and emits an [Event::DASDFormatJobChanged] event. +#[pin_project] +pub struct DASDFormatJobStream { + #[pin] + inner: MessageStream, +} + +impl DASDFormatJobStream { + pub async fn new(connection: &zbus::Connection) -> Result { + let rule = MatchRule::builder() + .msg_type(MessageType::Signal) + .path_namespace("/org/opensuse/Agama/Storage1/jobs")? + .interface("org.freedesktop.DBus.Properties")? + .member("PropertiesChanged")? + .build(); + let inner = MessageStream::for_match_rule(rule, connection, None).await?; + Ok(Self { inner }) + } + + fn handle_change(message: Result, zbus::Error>) -> Option { + let Ok(message) = message else { + return None; + }; + let properties = PropertiesChanged::from_message(message)?; + let args = properties.args().ok()?; + + if args.interface_name.as_str() != "org.opensuse.Agama.Storage1.DASD.Format" { + return None; + } + + let id = properties.path()?.to_string(); + let event = Self::to_event(id, &args); + if event.is_none() { + log::warn!("Could not decode the DASDFormatJobChanged event"); + } + event + } + + fn to_event(path: String, properties_changed: &PropertiesChangedArgs) -> Option { + let dict = properties_changed + .changed_properties() + .get("Summary")? + .downcast_ref::()?; + + // the key is the D-Bus path of the DASD device and the value is the progress + // of the related formatting process + let map = >>::try_from(dict.clone()).ok()?; + + let summary = map.values().next()?; + let summary = summary.downcast_ref::()?; + let fields = summary.fields(); + let total: &u32 = fields.get(0)?.downcast_ref()?; + let step: &u32 = fields.get(1)?.downcast_ref()?; + let done: &bool = fields.get(2)?.downcast_ref()?; + Some(Event::DASDFormatJobChanged { + job_id: path.to_string(), + total: total.clone(), + step: step.clone(), + done: done.clone(), + }) + } +} + +impl Stream for DASDFormatJobStream { + type Item = Event; + + fn poll_next( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + let mut pinned = self.project(); + + Poll::Ready(loop { + let item = ready!(pinned.inner.as_mut().poll_next(cx)); + let next_value = match item { + Some(change) => { + if let Some(event) = Self::handle_change(change) { + Some(event) + } else { + None + } + } + None => break None, + }; + if next_value.is_some() { + break next_value; + } + }) + } +} diff --git a/rust/agama-server/src/web/event.rs b/rust/agama-server/src/web/event.rs index 990eb987eb..67343c9730 100644 --- a/rust/agama-server/src/web/event.rs +++ b/rust/agama-server/src/web/event.rs @@ -95,6 +95,12 @@ pub enum Event { JobRemoved { job: Job, }, + DASDFormatJobChanged { + job_id: String, + total: u32, + step: u32, + done: bool, + }, } pub type EventsSender = Sender; From 40280bcf12e213fba0b424ee95805d7b6d41a2df Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Imobach=20Gonz=C3=A1lez=20Sosa?= Date: Sat, 10 Aug 2024 23:15:16 +0100 Subject: [PATCH 05/97] feat(rust): emit storage jobs events --- rust/agama-server/src/web.rs | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/rust/agama-server/src/web.rs b/rust/agama-server/src/web.rs index 3025f17b9b..07926d98e4 100644 --- a/rust/agama-server/src/web.rs +++ b/rust/agama-server/src/web.rs @@ -13,7 +13,7 @@ use crate::{ software::web::{software_service, software_streams}, storage::web::{storage_service, storage_streams}, users::web::{users_service, users_streams}, - web::common::{issues_stream, progress_stream, service_status_stream}, + web::common::{issues_stream, jobs_stream, progress_stream, service_status_stream}, }; use axum::Router; @@ -141,6 +141,16 @@ async fn run_events_monitor(dbus: zbus::Connection, events: EventsSender) -> Res ) .await?, ); + stream.insert( + "storage-jobs", + jobs_stream( + dbus.clone(), + "org.opensuse.Agama.Storage1", + "/org/opensuse/Agama/Storage1", + "/org/opensuse/Agama/Storage1/jobs", + ) + .await?, + ); stream.insert( "software-status", service_status_stream( From 8f287c475bb937ba438732c5c19e923a25161091 Mon Sep 17 00:00:00 2001 From: YaST Bot Date: Sun, 18 Aug 2024 02:52:18 +0000 Subject: [PATCH 06/97] Update web PO files Agama-weblate commit: c60662df7a490ce910a5197448a392f5cf6cf16f --- web/po/ca.po | 472 ++++++++++-------------- web/po/cs.po | 468 ++++++++++-------------- web/po/de.po | 492 ++++++++++--------------- web/po/es.po | 471 ++++++++++-------------- web/po/fr.po | 801 ++++++++++++++++------------------------- web/po/id.po | 472 +++++++++--------------- web/po/ja.po | 472 ++++++++++-------------- web/po/ka.po | 435 ++++++++-------------- web/po/mk.po | 322 ++--------------- web/po/nb_NO.po | 473 ++++++++++-------------- web/po/nl.po | 475 +++++++++--------------- web/po/pt_BR.po | 471 ++++++++++-------------- web/po/ru.po | 470 ++++++++++-------------- web/po/sv.po | 470 ++++++++++-------------- web/po/tr.po | 345 +++--------------- web/po/uk.po | 322 ++--------------- web/po/zh_Hans.po | 466 ++++++++++-------------- web/src/languages.json | 1 + 18 files changed, 2807 insertions(+), 5091 deletions(-) diff --git a/web/po/ca.po b/web/po/ca.po index a5ee913a8a..1fdca856a7 100644 --- a/web/po/ca.po +++ b/web/po/ca.po @@ -7,7 +7,7 @@ msgid "" msgstr "" "Project-Id-Version: PACKAGE VERSION\n" "Report-Msgid-Bugs-To: \n" -"POT-Creation-Date: 2024-08-08 02:27+0000\n" +"POT-Creation-Date: 2024-08-18 02:29+0000\n" "PO-Revision-Date: 2024-07-25 08:46+0000\n" "Last-Translator: David Medina \n" "Language-Team: Catalan