-
Notifications
You must be signed in to change notification settings - Fork 29
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Incorporate Zeusd for CPU and DRAM monitoring in ZeusMonitor #150
base: master
Are you sure you want to change the base?
Changes from 14 commits
b75e892
9959712
bf67e56
c57c02a
c9440f3
465c88e
863d802
9dff236
f6b0dd1
557f6c7
db2d474
ce89581
5e57954
ea2d8ee
97d9f0c
552fafa
4ea715c
71189a8
4cbe58f
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
@@ -18,14 +18,18 @@ | |||||||||||||||||
import os | ||||||||||||||||||
import time | ||||||||||||||||||
import warnings | ||||||||||||||||||
from pathlib import Path | ||||||||||||||||||
from functools import lru_cache | ||||||||||||||||||
from glob import glob | ||||||||||||||||||
from multiprocessing.sharedctypes import Synchronized | ||||||||||||||||||
from typing import Sequence | ||||||||||||||||||
|
||||||||||||||||||
import httpx | ||||||||||||||||||
|
||||||||||||||||||
import zeus.device.cpu.common as cpu_common | ||||||||||||||||||
from zeus.device.cpu.common import CpuDramMeasurement | ||||||||||||||||||
from zeus.device.exception import ZeusBaseCPUError | ||||||||||||||||||
from zeus.device.exception import ZeusdError | ||||||||||||||||||
from zeus.utils.logging import get_logger | ||||||||||||||||||
|
||||||||||||||||||
logger = get_logger(name=__name__) | ||||||||||||||||||
|
@@ -261,6 +265,65 @@ def supportsGetDramEnergyConsumption(self) -> bool: | |||||||||||||||||
return self.dram is not None | ||||||||||||||||||
|
||||||||||||||||||
|
||||||||||||||||||
class ZeusdRAPLCPU(RAPLCPU): | ||||||||||||||||||
"""A RAPLCPU that interfaces with RAPL via zeusd. | ||||||||||||||||||
|
||||||||||||||||||
The parent RAPLCPU class requires root privileges to interface with RAPL. | ||||||||||||||||||
ZeusdRAPLCPU (this class) overrides RAPLCPU's methods so that they instead send requests | ||||||||||||||||||
to the Zeus daemon, which will interface with RAPL on behalf of ZeusdRAPLCPU. As a result, | ||||||||||||||||||
ZeusdRAPLCPU does not need root privileges to monitor CPU and DRAM energy consumption. | ||||||||||||||||||
|
||||||||||||||||||
See [here](https://ml.energy/zeus/getting_started/#system-privileges) | ||||||||||||||||||
for details on system privileges required. | ||||||||||||||||||
""" | ||||||||||||||||||
|
||||||||||||||||||
def __init__( | ||||||||||||||||||
self, | ||||||||||||||||||
cpu_index: int, | ||||||||||||||||||
zeusd_sock_path: str = "/var/run/zeusd.sock", | ||||||||||||||||||
) -> None: | ||||||||||||||||||
"""Initialize the Intel CPU with a specified index.""" | ||||||||||||||||||
self.cpu_index = cpu_index | ||||||||||||||||||
jaywonchung marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||||||||||||||
|
||||||||||||||||||
self._client = httpx.Client(transport=httpx.HTTPTransport(uds=zeusd_sock_path)) | ||||||||||||||||||
self._url_prefix = f"http://zeusd/cpu/{cpu_index}" | ||||||||||||||||||
|
||||||||||||||||||
def getTotalEnergyConsumption(self) -> CpuDramMeasurement: | ||||||||||||||||||
"""Returns the total energy consumption of the specified powerzone. Units: mJ.""" | ||||||||||||||||||
resp = self._client.post( | ||||||||||||||||||
self._url_prefix + "/get_index_energy", | ||||||||||||||||||
json={ | ||||||||||||||||||
"cpu": True, | ||||||||||||||||||
"dram": True, | ||||||||||||||||||
}, | ||||||||||||||||||
) | ||||||||||||||||||
if resp.status_code != 200: | ||||||||||||||||||
raise ZeusdError(f"Failed to get total energy consumption: {resp.text}") | ||||||||||||||||||
|
||||||||||||||||||
data = resp.json() | ||||||||||||||||||
cpu_uj = data.get("cpu_energy_uj") | ||||||||||||||||||
dram_uj = data.get("dram_energy_uj") | ||||||||||||||||||
cpu_mj = None if cpu_uj is None else cpu_uj / 1000 | ||||||||||||||||||
dram_mj = None if dram_uj is None else dram_uj / 1000 | ||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @wbjin Quick question -- Here we've constructed the request with Assuming that the above is true,
Suggested change
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thank you for pointing that out -- this was my mistake. I don't think There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We might go even further here. In the constructor, we can call There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yup, if |
||||||||||||||||||
|
||||||||||||||||||
return CpuDramMeasurement(cpu_mj=cpu_mj, dram_mj=dram_mj) | ||||||||||||||||||
|
||||||||||||||||||
def supportsGetDramEnergyConsumption(self) -> bool: | ||||||||||||||||||
"""Returns True if the specified CPU powerzone supports retrieving the subpackage energy consumption.""" | ||||||||||||||||||
resp = self._client.get( | ||||||||||||||||||
self._url_prefix + "/supportsDramEnergy", | ||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. All other routes use camel case for the path. Please make this camel case as well. |
||||||||||||||||||
) | ||||||||||||||||||
if resp.status_code != 200: | ||||||||||||||||||
raise ZeusdError( | ||||||||||||||||||
f"Failed to get whether DRAM energy is supported: {resp.text}" | ||||||||||||||||||
) | ||||||||||||||||||
data = resp.json() | ||||||||||||||||||
dram_available = data.get("dram_available") | ||||||||||||||||||
if dram_available is None: | ||||||||||||||||||
raise ZeusdError("Failed to get whether DRAM energy is supported.") | ||||||||||||||||||
return dram_available | ||||||||||||||||||
|
||||||||||||||||||
|
||||||||||||||||||
class RAPLCPUs(cpu_common.CPUs): | ||||||||||||||||||
"""RAPL CPU Manager object, containing individual RAPLCPU objects, abstracting RAPL calls and handling related exceptions.""" | ||||||||||||||||||
|
||||||||||||||||||
|
@@ -281,13 +344,33 @@ def _init_cpus(self) -> None: | |||||||||||||||||
"""Initialize all Intel CPUs.""" | ||||||||||||||||||
self._cpus = [] | ||||||||||||||||||
|
||||||||||||||||||
cpu_indices = [] | ||||||||||||||||||
|
||||||||||||||||||
def sort_key(dir): | ||||||||||||||||||
return int(dir.split(":")[1]) | ||||||||||||||||||
|
||||||||||||||||||
for dir in sorted(glob(f"{self.rapl_dir}/intel-rapl:*"), key=sort_key): | ||||||||||||||||||
parts = dir.split(":") | ||||||||||||||||||
if len(parts) > 1 and parts[1].isdigit(): | ||||||||||||||||||
self._cpus.append(RAPLCPU(int(parts[1]), self.rapl_dir)) | ||||||||||||||||||
cpu_indices.append(int(parts[1])) | ||||||||||||||||||
|
||||||||||||||||||
# If `ZEUSD_SOCK_PATH` is set, always use ZeusdRAPLCPU | ||||||||||||||||||
if (sock_path := os.environ.get("ZEUSD_SOCK_PATH")) is not None: | ||||||||||||||||||
if not Path(sock_path).exists(): | ||||||||||||||||||
raise ZeusdError( | ||||||||||||||||||
f"ZEUSD_SOCK_PATH points to non-existent file: {sock_path}" | ||||||||||||||||||
) | ||||||||||||||||||
if not Path(sock_path).is_socket(): | ||||||||||||||||||
raise ZeusdError(f"ZEUSD_SOCK_PATH is not a socket: {sock_path}") | ||||||||||||||||||
if not os.access(sock_path, os.W_OK): | ||||||||||||||||||
raise ZeusdError(f"ZEUSD_SOCK_PATH is not writable: {sock_path}") | ||||||||||||||||||
self._cpus = [ | ||||||||||||||||||
ZeusdRAPLCPU(cpu_index, sock_path) for cpu_index in cpu_indices | ||||||||||||||||||
] | ||||||||||||||||||
else: | ||||||||||||||||||
self._cpus = [ | ||||||||||||||||||
RAPLCPU(cpu_index, self.rapl_dir) for cpu_index in cpu_indices | ||||||||||||||||||
] | ||||||||||||||||||
|
||||||||||||||||||
def __del__(self) -> None: | ||||||||||||||||||
"""Shuts down the Intel CPU monitoring.""" | ||||||||||||||||||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -35,6 +35,19 @@ pub struct RaplResponse { | |
pub dram_energy_uj: Option<u64>, | ||
} | ||
|
||
#[derive(Serialize, Deserialize, Debug)] | ||
pub struct DramResponse { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Let's rename this to |
||
pub dram_available: bool, | ||
} | ||
|
||
/// Unified CPU response type | ||
#[derive(Serialize, Deserialize, Debug)] | ||
#[serde(untagged)] | ||
pub enum CpuResponse { | ||
Rapl(RaplResponse), | ||
Dram(DramResponse), | ||
} | ||
|
||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is very clean. Good job! |
||
pub trait CpuManager { | ||
/// Get the number of CPUs available. | ||
fn device_count() -> Result<usize, ZeusdError>; | ||
|
@@ -55,7 +68,7 @@ pub trait CpuManager { | |
|
||
pub type CpuCommandRequest = ( | ||
CpuCommand, | ||
Option<Sender<Result<RaplResponse, ZeusdError>>>, | ||
Option<Sender<Result<CpuResponse, ZeusdError>>>, | ||
Instant, | ||
Span, | ||
); | ||
|
@@ -89,7 +102,7 @@ impl CpuManagementTasks { | |
cpu_id: usize, | ||
command: CpuCommand, | ||
request_start_time: Instant, | ||
) -> Result<RaplResponse, ZeusdError> { | ||
) -> Result<CpuResponse, ZeusdError> { | ||
if cpu_id >= self.senders.len() { | ||
return Err(ZeusdError::CpuNotFoundError(cpu_id)); | ||
} | ||
|
@@ -128,6 +141,8 @@ impl CpuManagementTasks { | |
pub enum CpuCommand { | ||
/// Get the CPU and DRAM energy measurement for the CPU index | ||
GetIndexEnergy { cpu: bool, dram: bool }, | ||
/// Return if the specified CPU supports DRAM energy measurement | ||
SupportsDramEnergy, | ||
/// Stop the monitoring task for CPU and DRAM if they have been started. | ||
StopMonitoring, | ||
} | ||
|
@@ -156,7 +171,7 @@ impl CpuCommand { | |
&self, | ||
device: &mut T, | ||
_request_arrival_time: Instant, | ||
) -> Result<RaplResponse, ZeusdError> | ||
) -> Result<CpuResponse, ZeusdError> | ||
where | ||
T: CpuManager, | ||
{ | ||
|
@@ -172,17 +187,24 @@ impl CpuCommand { | |
} else { | ||
None | ||
}; | ||
Ok(RaplResponse { | ||
// Wrap the RaplResponse in CpuResponse::Rapl | ||
Ok(CpuResponse::Rapl(RaplResponse { | ||
cpu_energy_uj, | ||
dram_energy_uj, | ||
}) | ||
})) | ||
} | ||
Self::SupportsDramEnergy => { | ||
// Wrap the DramResponse in CpuResponse::Dram | ||
Ok(CpuResponse::Dram(DramResponse { | ||
dram_available: device.is_dram_available(), | ||
})) | ||
} | ||
Self::StopMonitoring {} => { | ||
Self::StopMonitoring => { | ||
device.stop_monitoring(); | ||
Ok(RaplResponse { | ||
Ok(CpuResponse::Rapl(RaplResponse { | ||
cpu_energy_uj: Some(0), | ||
dram_energy_uj: Some(0), | ||
}) | ||
})) | ||
} | ||
} | ||
} | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -48,6 +48,29 @@ async fn get_index_energy_handler( | |
Ok(HttpResponse::Ok().json(measurement)) | ||
} | ||
|
||
#[actix_web::get("/{cpu_id}/supportsDramEnergy")] | ||
#[tracing::instrument( | ||
skip(cpu_id, _device_tasks), | ||
fields( | ||
cpu_id = %cpu_id, | ||
) | ||
)] | ||
Comment on lines
+52
to
+57
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I feel like There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah, it would be useful to be able to see the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thanks. Let me handle |
||
async fn supports_dram_energy_handler( | ||
cpu_id: web::Path<usize>, | ||
_device_tasks: web::Data<CpuManagementTasks>, | ||
) -> Result<HttpResponse, ZeusdError> { | ||
let now = Instant::now(); | ||
tracing::info!("Received request"); | ||
let cpu_id = cpu_id.into_inner(); | ||
|
||
let answer = _device_tasks | ||
.send_command_blocking(cpu_id, CpuCommand::SupportsDramEnergy, now) | ||
.await?; | ||
|
||
Ok(HttpResponse::Ok().json(answer)) | ||
} | ||
|
||
pub fn cpu_routes(cfg: &mut web::ServiceConfig) { | ||
cfg.service(get_index_energy_handler); | ||
cfg.service(supports_dram_energy_handler); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please merge this with the line above.