Incorporate Zeusd for CPU and DRAM monitoring in ZeusMonitor #150

Open: wants to merge 19 commits into base: master (showing changes from 14 commits)
85 changes: 84 additions & 1 deletion zeus/device/cpu/rapl.py
@@ -18,14 +18,18 @@
import os
import time
import warnings
from pathlib import Path
from functools import lru_cache
from glob import glob
from multiprocessing.sharedctypes import Synchronized
from typing import Sequence

import httpx

import zeus.device.cpu.common as cpu_common
from zeus.device.cpu.common import CpuDramMeasurement
from zeus.device.exception import ZeusBaseCPUError
from zeus.device.exception import ZeusdError
Review comment (Member):
Please merge this with the line above.

from zeus.utils.logging import get_logger

logger = get_logger(name=__name__)
@@ -261,6 +265,65 @@ def supportsGetDramEnergyConsumption(self) -> bool:
return self.dram is not None


class ZeusdRAPLCPU(RAPLCPU):
"""A RAPLCPU that interfaces with RAPL via zeusd.

The parent RAPLCPU class requires root privileges to interface with RAPL.
ZeusdRAPLCPU (this class) overrides RAPLCPU's methods so that they instead send requests
to the Zeus daemon, which will interface with RAPL on behalf of ZeusdRAPLCPU. As a result,
ZeusdRAPLCPU does not need root privileges to monitor CPU and DRAM energy consumption.

See [here](https://ml.energy/zeus/getting_started/#system-privileges)
for details on system privileges required.
"""

def __init__(
self,
cpu_index: int,
zeusd_sock_path: str = "/var/run/zeusd.sock",
) -> None:
"""Initialize the Intel CPU with a specified index."""
self.cpu_index = cpu_index

self._client = httpx.Client(transport=httpx.HTTPTransport(uds=zeusd_sock_path))
self._url_prefix = f"http://zeusd/cpu/{cpu_index}"

def getTotalEnergyConsumption(self) -> CpuDramMeasurement:
"""Returns the total energy consumption of the specified powerzone. Units: mJ."""
resp = self._client.post(
self._url_prefix + "/get_index_energy",
json={
"cpu": True,
"dram": True,
},
)
if resp.status_code != 200:
raise ZeusdError(f"Failed to get total energy consumption: {resp.text}")

data = resp.json()
cpu_uj = data.get("cpu_energy_uj")
dram_uj = data.get("dram_energy_uj")
cpu_mj = None if cpu_uj is None else cpu_uj / 1000
dram_mj = None if dram_uj is None else dram_uj / 1000
Review comment (Member):
@wbjin Quick question: here we've constructed the request with cpu and dram both set to True. In that case, can cpu_energy_uj in the response ever be None? If there were an error on the daemon side, it would be propagated back to the Python client, so the status code wouldn't be 200. On the other hand, dram_energy_uj may legitimately be None because the underlying RAPL device may not support DRAM measurement. Just wanted to confirm this with you.

Assuming the above is true, cpu_energy_uj ending up as None means something went wrong, and Zeus should error out.

Suggested change
- cpu_uj = data.get("cpu_energy_uj")
- dram_uj = data.get("dram_energy_uj")
- cpu_mj = None if cpu_uj is None else cpu_uj / 1000
- dram_mj = None if dram_uj is None else dram_uj / 1000
+ cpu_mj = data["cpu_energy_uj"] / 1000
+ dram_uj = data.get("dram_energy_uj")
+ dram_mj = dram_uj / 1000 if dram_uj is not None else None

Review comment (Author):
Thank you for pointing that out -- this was my mistake. I don't think cpu_energy_uj can ever be None here if zeusd is working correctly. But @wbjin please confirm if this sounds right!

Review comment (Member):
We might go even further here. In the constructor, we can call supportsGetDramEnergyConsumption (via the daemon) once and cache the result inside the class. Subsequent calls to supportsGetDramEnergyConsumption will return the cached value. Also, in getTotalEnergyConsumption, we would be able to know whether None is a valid response value for the dram field.

Review comment (Collaborator):
Yup, if cpu and dram are both set to True in the request, cpu_energy_uj shouldn't be None, while dram_energy_uj can be None if DRAM measurement isn't supported.
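
The conclusion of this thread can be sketched in plain Python. This is a minimal sketch under stated assumptions, not the PR's final code: `EnergyMj` and `parse_energy_response` are hypothetical names, `dram_supported` stands in for a value the constructor would cache from one `supportsGetDramEnergyConsumption` call, and the built-in `ValueError` stands in for `ZeusdError`. The field names `cpu_energy_uj` and `dram_energy_uj` are the ones shown in the diff.

```python
from dataclasses import dataclass
from typing import Any, Mapping, Optional


@dataclass(frozen=True)
class EnergyMj:
    """Hypothetical stand-in for CpuDramMeasurement (values in mJ)."""

    cpu_mj: float
    dram_mj: Optional[float]


def parse_energy_response(data: Mapping[str, Any], dram_supported: bool) -> EnergyMj:
    """Convert a daemon response from microjoules to millijoules.

    A missing CPU value is always treated as an error. A missing DRAM
    value is an error only when DRAM measurement is known to be supported.
    """
    cpu_uj = data.get("cpu_energy_uj")
    if cpu_uj is None:
        # ValueError is a placeholder for ZeusdError in this sketch.
        raise ValueError("cpu_energy_uj missing from daemon response")

    dram_uj = data.get("dram_energy_uj")
    if dram_uj is None and dram_supported:
        raise ValueError("dram_energy_uj missing although DRAM is supported")

    # 1 mJ = 1000 uJ
    return EnergyMj(
        cpu_mj=cpu_uj / 1000,
        dram_mj=None if dram_uj is None else dram_uj / 1000,
    )
```

Passing the cached flag in explicitly keeps the parsing step free of I/O, so the "is `None` valid here?" decision can be checked without a running daemon.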


return CpuDramMeasurement(cpu_mj=cpu_mj, dram_mj=dram_mj)

def supportsGetDramEnergyConsumption(self) -> bool:
"""Returns True if the specified CPU powerzone supports retrieving the subpackage energy consumption."""
resp = self._client.get(
self._url_prefix + "/supportsDramEnergy",
Review comment (Member):
All other routes use snake case for the path (e.g. /get_index_energy). Please make this snake case as well.

)
if resp.status_code != 200:
raise ZeusdError(
f"Failed to get whether DRAM energy is supported: {resp.text}"
)
data = resp.json()
dram_available = data.get("dram_available")
if dram_available is None:
raise ZeusdError("Failed to get whether DRAM energy is supported.")
return dram_available


class RAPLCPUs(cpu_common.CPUs):
"""RAPL CPU Manager object, containing individual RAPLCPU objects, abstracting RAPL calls and handling related exceptions."""

@@ -281,13 +344,33 @@ def _init_cpus(self) -> None:
"""Initialize all Intel CPUs."""
self._cpus = []

cpu_indices = []

def sort_key(dir):
return int(dir.split(":")[1])

for dir in sorted(glob(f"{self.rapl_dir}/intel-rapl:*"), key=sort_key):
parts = dir.split(":")
if len(parts) > 1 and parts[1].isdigit():
- self._cpus.append(RAPLCPU(int(parts[1]), self.rapl_dir))
+ cpu_indices.append(int(parts[1]))

# If `ZEUSD_SOCK_PATH` is set, always use ZeusdRAPLCPU
if (sock_path := os.environ.get("ZEUSD_SOCK_PATH")) is not None:
if not Path(sock_path).exists():
raise ZeusdError(
f"ZEUSD_SOCK_PATH points to non-existent file: {sock_path}"
)
if not Path(sock_path).is_socket():
raise ZeusdError(f"ZEUSD_SOCK_PATH is not a socket: {sock_path}")
if not os.access(sock_path, os.W_OK):
raise ZeusdError(f"ZEUSD_SOCK_PATH is not writable: {sock_path}")
self._cpus = [
ZeusdRAPLCPU(cpu_index, sock_path) for cpu_index in cpu_indices
]
else:
self._cpus = [
RAPLCPU(cpu_index, self.rapl_dir) for cpu_index in cpu_indices
]
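
The three checks applied to `ZEUSD_SOCK_PATH` above can be exercised in isolation. The following is a minimal sketch, assuming a hypothetical helper name `check_zeusd_socket` and using built-in exception types in place of `ZeusdError`; it only mirrors the order of the checks in the diff (exists, is a socket, is writable).

```python
import os
from pathlib import Path


def check_zeusd_socket(sock_path: str) -> None:
    """Raise unless `sock_path` is an existing, writable Unix domain socket.

    Built-in exceptions are placeholders for ZeusdError in this sketch.
    """
    path = Path(sock_path)
    if not path.exists():
        raise FileNotFoundError(f"socket path does not exist: {sock_path}")
    if not path.is_socket():
        raise ValueError(f"path is not a socket: {sock_path}")
    # Connecting to a Unix domain socket requires write permission on it.
    if not os.access(sock_path, os.W_OK):
        raise PermissionError(f"socket is not writable: {sock_path}")
```

Failing early with a specific message is arguably friendlier than letting the first HTTP request fail with a generic connection error.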

def __del__(self) -> None:
"""Shuts down the Intel CPU monitoring."""
38 changes: 30 additions & 8 deletions zeusd/src/devices/cpu/mod.rs
@@ -35,6 +35,19 @@ pub struct RaplResponse {
pub dram_energy_uj: Option<u64>,
}

#[derive(Serialize, Deserialize, Debug)]
pub struct DramResponse {
Review comment (Member):
Let's rename this to DramAvailabilityResponse.

pub dram_available: bool,
}

/// Unified CPU response type
#[derive(Serialize, Deserialize, Debug)]
#[serde(untagged)]
pub enum CpuResponse {
Rapl(RaplResponse),
Dram(DramResponse),
}

Review comment (Member):
This is very clean. Good job!
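
For readers on the Python side, it may help to see what the untagged enum means on the wire. With `#[serde(untagged)]`, the variant name is not serialized, so the JSON body is just the inner struct: either the two energy fields or the single `dram_available` field. The sketch below is illustrative only; `classify_cpu_response` is a hypothetical helper, and the field names are taken from the structs in this diff.

```python
from typing import Any, Mapping


def classify_cpu_response(payload: Mapping[str, Any]) -> str:
    """Tell the two response shapes apart by the fields they carry."""
    # DramResponse serializes as {"dram_available": <bool>}.
    if "dram_available" in payload:
        return "dram_availability"
    # RaplResponse serializes as {"cpu_energy_uj": ..., "dram_energy_uj": ...},
    # where either value may be null.
    if "cpu_energy_uj" in payload or "dram_energy_uj" in payload:
        return "rapl"
    raise ValueError(f"unrecognized response shape: {sorted(payload)}")
```

In practice each route returns only one shape, so a client normally knows which to expect; the helper just makes the absence of a tag explicit.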

pub trait CpuManager {
/// Get the number of CPUs available.
fn device_count() -> Result<usize, ZeusdError>;
@@ -55,7 +68,7 @@ pub trait CpuManager {

pub type CpuCommandRequest = (
CpuCommand,
- Option<Sender<Result<RaplResponse, ZeusdError>>>,
+ Option<Sender<Result<CpuResponse, ZeusdError>>>,
Instant,
Span,
);
@@ -89,7 +102,7 @@ impl CpuManagementTasks {
cpu_id: usize,
command: CpuCommand,
request_start_time: Instant,
- ) -> Result<RaplResponse, ZeusdError> {
+ ) -> Result<CpuResponse, ZeusdError> {
if cpu_id >= self.senders.len() {
return Err(ZeusdError::CpuNotFoundError(cpu_id));
}
@@ -128,6 +141,8 @@ impl CpuManagementTasks {
pub enum CpuCommand {
/// Get the CPU and DRAM energy measurement for the CPU index
GetIndexEnergy { cpu: bool, dram: bool },
/// Return if the specified CPU supports DRAM energy measurement
SupportsDramEnergy,
/// Stop the monitoring task for CPU and DRAM if they have been started.
StopMonitoring,
}
@@ -156,7 +171,7 @@ impl CpuCommand {
&self,
device: &mut T,
_request_arrival_time: Instant,
- ) -> Result<RaplResponse, ZeusdError>
+ ) -> Result<CpuResponse, ZeusdError>
where
T: CpuManager,
{
@@ -172,17 +187,24 @@ impl CpuCommand {
} else {
None
};
- Ok(RaplResponse {
+ // Wrap the RaplResponse in CpuResponse::Rapl
+ Ok(CpuResponse::Rapl(RaplResponse {
cpu_energy_uj,
dram_energy_uj,
- })
+ }))
}
+ Self::SupportsDramEnergy => {
+ // Wrap the DramResponse in CpuResponse::Dram
+ Ok(CpuResponse::Dram(DramResponse {
+ dram_available: device.is_dram_available(),
+ }))
+ }
- Self::StopMonitoring {} => {
+ Self::StopMonitoring => {
device.stop_monitoring();
- Ok(RaplResponse {
+ Ok(CpuResponse::Rapl(RaplResponse {
cpu_energy_uj: Some(0),
dram_energy_uj: Some(0),
- })
+ }))
}
}
}
23 changes: 23 additions & 0 deletions zeusd/src/routes/cpu.rs
@@ -48,6 +48,29 @@ async fn get_index_energy_handler(
Ok(HttpResponse::Ok().json(measurement))
}

#[actix_web::get("/{cpu_id}/supportsDramEnergy")]
#[tracing::instrument(
skip(cpu_id, _device_tasks),
fields(
cpu_id = %cpu_id,
)
)]
Review comment (Member), on lines +52 to +57:
I feel like cpu_id shouldn't be in the skip list. Applies to other route handlers as well. @wbjin WDYT?

Review comment (Collaborator):
Yeah, it would be useful to see the cpu_id for each request. I think the same applies to the GPU routes in /src/routes/gpu.rs, which skip gpu_id.

Review comment (Member):
Thanks. Let me handle gpu_ids in a separate PR.

async fn supports_dram_energy_handler(
cpu_id: web::Path<usize>,
_device_tasks: web::Data<CpuManagementTasks>,
) -> Result<HttpResponse, ZeusdError> {
let now = Instant::now();
tracing::info!("Received request");
let cpu_id = cpu_id.into_inner();

let answer = _device_tasks
.send_command_blocking(cpu_id, CpuCommand::SupportsDramEnergy, now)
.await?;

Ok(HttpResponse::Ok().json(answer))
}

pub fn cpu_routes(cfg: &mut web::ServiceConfig) {
cfg.service(get_index_energy_handler);
cfg.service(supports_dram_energy_handler);
}
19 changes: 19 additions & 0 deletions zeusd/tests/cpu.rs
@@ -1,5 +1,6 @@
mod helpers;

use zeusd::devices::cpu::DramResponse;
use zeusd::devices::cpu::RaplResponse;
use zeusd::routes::cpu::GetIndexEnergy;

@@ -154,3 +155,21 @@ async fn test_invalid_requests() {
.expect("Failed to send request");
assert_eq!(resp.status(), 400);
}

#[tokio::test]
async fn test_supports_dram_energy() {
let app = TestApp::start().await;
let url = format!("http://127.0.0.1:{}/cpu/0/supportsDramEnergy", app.port);
let client = reqwest::Client::new();

let resp = client
.get(url)
.send()
.await
.expect("Failed to send request");
assert_eq!(resp.status(), 200);

let dram_response: DramResponse = serde_json::from_str(&resp.text().await.unwrap())
.expect("Failed to deserialize response body");
assert_eq!(dram_response.dram_available, true);
}
2 changes: 1 addition & 1 deletion zeusd/tests/helpers/mod.rs
@@ -266,7 +266,7 @@ impl_zeusd_request_cpu!(GetIndexEnergy);
/// A test application that starts a server over TCP and provides helper methods
/// for sending requests and fetching what happened to the fake GPUs.
pub struct TestApp {
- port: u16,
+ pub port: u16,
observers: Vec<TestGpuObserver>,
cpu_injectors: Vec<TestCpuInjector>,
}