[repo depot 2/n] sled agent APIs to manage update artifact storage #6764

Merged · 33 commits · Oct 31, 2024
Changes shown below are from 2 of the 33 commits.

Commits
a5052d0  sled agent APIs to manage update artifact storage (iliana, Oct 2, 2024)
ce1bc42  fn datasets -> fn dataset_mountpoints (iliana, Oct 4, 2024)
5ad16a1  be more resilient in the face of io errors (iliana, Oct 4, 2024)
485ee40  clean up temporary files on startup (iliana, Oct 4, 2024)
26f4107  naming consistency (iliana, Oct 4, 2024)
893980e  log.cleanup_successful(); (iliana, Oct 4, 2024)
03a51c7  Merge remote-tracking branch 'origin/main' into iliana/tuf-repo-depot (iliana, Oct 15, 2024)
efcfb92  document ArtifactStore (iliana, Oct 11, 2024)
e8b2673  fn put -> put_impl (iliana, Oct 11, 2024)
3b15f3b  copy_from_depot should take a URL (iliana, Oct 15, 2024)
c909649  reduce semantic satiation (iliana, Oct 15, 2024)
4325077  remove default type parameter (iliana, Oct 15, 2024)
86b8047  StorageBackend -> DatasetsManager; attempt clean up (iliana, Oct 15, 2024)
599089a  create reqwest client at startup, not on first use (iliana, Oct 15, 2024)
f624e5d  don't embed source error strings (iliana, Oct 15, 2024)
b44d06b  fewer contextless errors (iliana, Oct 15, 2024)
013e67f  another docstring (iliana, Oct 15, 2024)
5eefb6e  add the repo depot API to api-manifest.toml (iliana, Oct 15, 2024)
aebfe32  add list artifacts operation (iliana, Oct 16, 2024)
d743727  Merge remote-tracking branch 'origin/main' into iliana/tuf-repo-depot (iliana, Oct 28, 2024)
2ae3743  create an update artifact dataset on both M.2s (iliana, Oct 28, 2024)
45c4cac  PUT artifacts to all artifact datasets (iliana, Oct 28, 2024)
3c2f866  change list API to return a count of each artifact (iliana, Oct 29, 2024)
743a67b  make copy_from_depot create a task and return (iliana, Oct 29, 2024)
58b9cbe  ls-apis expectorate (iliana, Oct 29, 2024)
6906679  Merge remote-tracking branch 'origin/main' into iliana/tuf-repo-depot (iliana, Oct 30, 2024)
cd7dc7b  review comments (iliana, Oct 30, 2024)
e967f02  propagate non-fatal write errors to `finalize()` (iliana, Oct 30, 2024)
508861d  expectoraaaaate (iliana, Oct 30, 2024)
9f96f71  Merge remote-tracking branch 'origin/main' into iliana/tuf-repo-depot (iliana, Oct 31, 2024)
32afac5  improved API responses for PUT/POST (iliana, Oct 31, 2024)
4d00c16  document ArtifactPutResponse fields (iliana, Oct 31, 2024)
9b3691a  Merge remote-tracking branch 'origin/main' into iliana/tuf-repo-depot (iliana, Oct 31, 2024)
44 changes: 40 additions & 4 deletions openapi/sled-agent.json
@@ -65,8 +65,15 @@
"required": true
},
"responses": {
"204": {
"description": "resource updated"
"200": {
"description": "successful operation",
"content": {
"application/json": {
"schema": {
"$ref": "#/components/schemas/ArtifactPutResponse"
}
}
}
},
"4XX": {
"$ref": "#/components/responses/Error"
@@ -127,8 +134,15 @@
"required": true
},
"responses": {
"204": {
"description": "resource updated"
"202": {
"description": "successfully enqueued operation",
"content": {
"application/json": {
"schema": {
"$ref": "#/components/schemas/ArtifactCopyFromDepotResponse"
}
}
}
},
"4XX": {
"$ref": "#/components/responses/Error"
@@ -1557,6 +1571,28 @@
"depot_base_url"
]
},
"ArtifactCopyFromDepotResponse": {
"type": "object"
},
"ArtifactPutResponse": {
"type": "object",
"properties": {
"datasets": {
"type": "integer",
"format": "uint",
"minimum": 0
},
"successful_writes": {
"type": "integer",
"format": "uint",
"minimum": 0
}
},
"required": [
"datasets",
"successful_writes"
]
},
"Baseboard": {
"description": "Describes properties that should uniquely identify a Gimlet.",
"oneOf": [
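For orientation, the two new schemas above are small: ArtifactCopyFromDepotResponse is an empty object, and ArtifactPutResponse carries the two counters. A minimal sketch of the resulting JSON (illustrative only; the struct is redeclared here so the example is self-contained rather than importing the real type from sled-agent/api/src/lib.rs):

use serde::Serialize;
use serde_json::json;

// Stand-in mirroring the ArtifactPutResponse schema above, redeclared only to
// keep this example self-contained.
#[derive(Serialize)]
struct ArtifactPutResponse {
    datasets: usize,
    successful_writes: usize,
}

fn main() {
    // A PUT that lands on both M.2 artifact datasets serializes as
    // {"datasets": 2, "successful_writes": 2}.
    let resp = ArtifactPutResponse { datasets: 2, successful_writes: 2 };
    assert_eq!(
        serde_json::to_value(&resp).unwrap(),
        json!({ "datasets": 2, "successful_writes": 2 })
    );
    // The 202 body for artifact_copy_from_depot is just an empty object: {}.
}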
20 changes: 15 additions & 5 deletions sled-agent/api/src/lib.rs
@@ -7,9 +7,10 @@ use std::time::Duration;

use camino::Utf8PathBuf;
use dropshot::{
FreeformBody, HttpError, HttpResponseCreated, HttpResponseDeleted,
HttpResponseHeaders, HttpResponseOk, HttpResponseUpdatedNoContent, Path,
Query, RequestContext, StreamingBody, TypedBody,
FreeformBody, HttpError, HttpResponseAccepted, HttpResponseCreated,
HttpResponseDeleted, HttpResponseHeaders, HttpResponseOk,
HttpResponseUpdatedNoContent, Path, Query, RequestContext, StreamingBody,
TypedBody,
};
use nexus_sled_agent_shared::inventory::{
Inventory, OmicronZonesConfig, SledRole,
@@ -319,7 +320,7 @@ pub trait SledAgentApi {
rqctx: RequestContext<Self::Context>,
path_params: Path<ArtifactPathParam>,
body: TypedBody<ArtifactCopyFromDepotBody>,
) -> Result<HttpResponseUpdatedNoContent, HttpError>;
) -> Result<HttpResponseAccepted<ArtifactCopyFromDepotResponse>, HttpError>;

#[endpoint {
method = PUT,
@@ -329,7 +330,7 @@
rqctx: RequestContext<Self::Context>,
path_params: Path<ArtifactPathParam>,
body: StreamingBody,
) -> Result<HttpResponseUpdatedNoContent, HttpError>;
) -> Result<HttpResponseOk<ArtifactPutResponse>, HttpError>;

#[endpoint {
method = DELETE,
@@ -596,6 +597,15 @@ pub struct ArtifactCopyFromDepotBody {
pub depot_base_url: String,
}

#[derive(Serialize, JsonSchema)]
pub struct ArtifactCopyFromDepotResponse {}

#[derive(Debug, Serialize, JsonSchema)]
pub struct ArtifactPutResponse {
pub datasets: usize,
pub successful_writes: usize,
Review comment (Collaborator):

This looks great as a response; I might add some docs indicating what these fields mean to the caller.

Maybe:

Suggested change (add doc comments to the two fields above):

/// The number of valid M.2 artifact datasets we found on the sled.
/// There is typically one of these datasets for each functional M.2.
pub datasets: usize,
/// The number of valid writes to the M.2 artifact datasets. This should
/// be less than or equal to the number of artifact datasets.
pub successful_writes: usize,

}

#[derive(Deserialize, JsonSchema)]
pub struct VmmIssueDiskSnapshotRequestPathParam {
pub propolis_id: PropolisUuid,
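To show how the new response types thread through, here is a minimal sketch of an artifact_put handler satisfying the updated trait signature. This is not the implementation from this PR: it assumes a hypothetical SledAgent context type with an artifact_store() accessor, assumes the path-parameter struct exposes a sha256: ArtifactHash field, and converts the store's error into an HttpError by hand; it otherwise uses the types imported in the diff above.

async fn artifact_put(
    rqctx: RequestContext<SledAgent>,
    path_params: Path<ArtifactPathParam>,
    body: StreamingBody,
) -> Result<HttpResponseOk<ArtifactPutResponse>, HttpError> {
    let sled_agent = rqctx.context();
    let sha256 = path_params.into_inner().sha256;
    // Stream the body into every artifact dataset; the store reports how many
    // datasets it saw and how many writes succeeded.
    let response = sled_agent
        .artifact_store()
        .put_body(sha256, body)
        .await
        .map_err(|err| HttpError::for_internal_error(err.to_string()))?;
    Ok(HttpResponseOk(response))
}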
92 changes: 39 additions & 53 deletions sled-agent/src/artifact_store.rs
@@ -33,6 +33,7 @@ use omicron_common::disk::{DatasetKind, DatasetsConfig};
use omicron_common::update::ArtifactHash;
use repo_depot_api::*;
use sha2::{Digest, Sha256};
use sled_agent_api::ArtifactPutResponse;
use sled_storage::dataset::M2_ARTIFACT_DATASET;
use sled_storage::error::Error as StorageError;
use sled_storage::manager::StorageHandle;
@@ -175,16 +176,16 @@ impl<T: DatasetsManager> ArtifactStore<T> {
&self,
sha256: ArtifactHash,
) -> Result<File, Error> {
let sha256 = sha256.to_string();
let sha256_str = sha256.to_string();
let mut last_error = None;
for mountpoint in self.storage.artifact_storage_paths().await? {
let path = mountpoint.join(&sha256);
let path = mountpoint.join(&sha256_str);
match File::open(&path).await {
Ok(file) => {
info!(
&self.log,
"Retrieved artifact";
"sha256" => &sha256,
"sha256" => &sha256_str,
"path" => path.as_str(),
);
return Ok(file);
@@ -195,11 +196,7 @@ impl<T: DatasetsManager> ArtifactStore<T> {
}
}
}
if let Some(last_error) = last_error {
Err(last_error)
} else {
Err(Error::NotFound { sha256 })
}
Err(last_error.unwrap_or(Error::NotFound { sha256 }))
}

/// List operation (served by Sled Agent API)
@@ -276,9 +273,8 @@ impl<T: DatasetsManager> ArtifactStore<T> {
/// can be used to write the artifact to all temporary files, then move all
/// temporary files to their final paths.
///
/// Most errors during the write process are considered non-fatal errors.
/// All non-fatal errors are logged, and the most recently-seen non-fatal
/// error is returned by [`ArtifactWriter::finalize`].
/// Most errors during the write process are considered non-fatal errors,
/// which are logged instead of immediately returned.
///
/// In this method, possible fatal errors are:
/// - No temporary files could be created.
@@ -290,7 +286,9 @@
) -> Result<ArtifactWriter, Error> {
let mut files = Vec::new();
let mut last_error = None;
let mut datasets = 0;
for mountpoint in self.storage.artifact_storage_paths().await? {
datasets += 1;
let temp_dir = mountpoint.join(TEMP_SUBDIR);
if let Err(err) = tokio::fs::create_dir(&temp_dir).await {
if err.kind() != ErrorKind::AlreadyExists {
@@ -330,11 +328,11 @@ impl<T: DatasetsManager> ArtifactStore<T> {
Err(last_error.unwrap_or(Error::NoUpdateDataset))
} else {
Ok(ArtifactWriter {
datasets,
hasher: Sha256::new(),
files,
log: self.log.clone(),
sha256,
last_error,
})
}
}
@@ -344,7 +342,7 @@ impl<T: DatasetsManager> ArtifactStore<T> {
&self,
sha256: ArtifactHash,
body: StreamingBody,
) -> Result<(), Error> {
) -> Result<ArtifactPutResponse, Error> {
self.writer(sha256)
.await?
.write_stream(body.into_stream().map_err(Error::Body))
@@ -476,11 +474,11 @@ impl DatasetsManager for StorageHandle {

/// Abstraction that handles writing to several temporary files.
struct ArtifactWriter {
datasets: usize,
files: Vec<Option<(NamedUtf8TempFile<File>, Utf8PathBuf)>>,
hasher: Sha256,
log: Logger,
sha256: ArtifactHash,
last_error: Option<Error>,
}

impl ArtifactWriter {
@@ -490,7 +488,7 @@ impl ArtifactWriter {
async fn write_stream(
self,
stream: impl Stream<Item = Result<impl AsRef<[u8]>, Error>>,
) -> Result<(), Error> {
) -> Result<ArtifactPutResponse, Error> {
let writer = stream
.try_fold(self, |mut writer, chunk| async {
writer.write(chunk).await?;
@@ -502,15 +500,13 @@

/// Write `chunk` to all temporary files.
///
/// Errors in this method are considered non-fatal errors. All non-fatal
/// errors are logged, and the most recently-seen non-fatal error is
/// returned by [`ArtifactWriter::finalize`].
///
/// If all files have failed, this method returns the most recently-seen
/// non-fatal error as a fatal error.
/// Errors in this method are considered non-fatal errors. All errors
/// are logged. If all files have failed, this method returns the most
/// recently-seen non-fatal error as a fatal error.
async fn write(&mut self, chunk: impl AsRef<[u8]>) -> Result<(), Error> {
self.hasher.update(&chunk);

let mut last_error = None;
for option in &mut self.files {
if let Some((mut file, mountpoint)) = option.take() {
match file.as_file_mut().write_all(chunk.as_ref()).await {
@@ -520,11 +516,7 @@
Err(err) => {
let path = file.path().to_owned();
log_and_store!(
self.last_error,
&self.log,
"write to",
path,
err
last_error, &self.log, "write to", path, err
);
// `file` and `final_path` are dropped here, cleaning up
// the file
@@ -535,18 +527,18 @@

self.files.retain(Option::is_some);
if self.files.is_empty() {
Err(self.last_error.take().unwrap_or(Error::NoUpdateDataset))
Err(last_error.unwrap_or(Error::NoUpdateDataset))
} else {
Ok(())
}
}

/// Rename all files to their final paths.
///
/// Errors in this method are considered non-fatal errors, but this method
/// will return the most recently-seen error by any method in the write
/// process.
async fn finalize(mut self) -> Result<(), Error> {
/// Errors in this method are considered non-fatal errors. If all files have
/// failed in some way, the most recently-seen error is returned as a fatal
/// error.
async fn finalize(self) -> Result<ArtifactPutResponse, Error> {
let digest = self.hasher.finalize();
if digest.as_slice() != self.sha256.as_ref() {
return Err(Error::HashMismatch {
@@ -555,24 +547,21 @@
});
}

let mut any_success = false;
let mut last_error = None;
let mut successful_writes = 0;
for (mut file, mountpoint) in self.files.into_iter().flatten() {
// 1. fsync the temporary file.
if let Err(err) = file.as_file_mut().sync_all().await {
let path = file.path().to_owned();
log_and_store!(self.last_error, &self.log, "sync", path, err);
log_and_store!(last_error, &self.log, "sync", path, err);
continue;
}
// 2. Open the parent directory so we can fsync it.
let parent_dir = match File::open(&mountpoint).await {
Ok(dir) => dir,
Err(err) => {
log_and_store!(
self.last_error,
&self.log,
"open",
mountpoint,
err
last_error, &self.log, "open", mountpoint, err
);
continue;
}
@@ -592,7 +581,7 @@
"from" => err.file.path().as_str(),
"to" => final_path.as_str(),
);
self.last_error = Some(Error::FileRename {
last_error = Some(Error::FileRename {
from: err.file.path().to_owned(),
to: final_path,
err: err.error,
@@ -601,30 +590,27 @@
}
// 4. fsync the parent directory.
if let Err(err) = parent_dir.sync_all().await {
log_and_store!(
self.last_error,
&self.log,
"sync",
mountpoint,
err
);
log_and_store!(last_error, &self.log, "sync", mountpoint, err);
continue;
}

any_success = true;
successful_writes += 1;
Review comment (Collaborator):

There's an expectation here that "we're writing a single file" to each M.2, right?

I think that's okay; just clarifying, because I think our expected output is:

  • datasets = 2, successful_writes = 2

And it would be a little confusing to suddenly see:

  • datasets = 2, successful_writes = 4

or something like that, if we started writing / syncing multiple files

Reply (Contributor, Author):

Yeah, I don't think you could use ArtifactWriter or any of the endpoints in a way where you're writing more than one artifact; everything is keyed by sha256.

}

if let Some(last_error) = self.last_error {
Err(last_error)
} else if any_success {
if successful_writes > 0 {
info!(
&self.log,
"Wrote artifact";
"sha256" => &self.sha256.to_string(),
"datasets" => self.datasets,
"successful_writes" => successful_writes,
);
Ok(())
Ok(ArtifactPutResponse {
datasets: self.datasets,
successful_writes,
})
} else {
Err(Error::NoUpdateDataset)
Err(last_error.unwrap_or(Error::NoUpdateDataset))
}
}
}
@@ -692,7 +678,7 @@ pub(crate) enum Error {
Join(#[from] tokio::task::JoinError),

#[error("Artifact {sha256} not found")]
NotFound { sha256: String },
NotFound { sha256: ArtifactHash },

#[error("No update datasets present")]
NoUpdateDataset,
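Tying back to the review thread above: each PUT stores exactly one artifact, keyed by its sha256, on every artifact dataset, so on success successful_writes ranges from 1 up to datasets (typically 2, one per functional M.2). A hypothetical caller-side helper, not part of this PR, that flags a write which succeeded but did not reach every dataset:

// Hypothetical helper (not from this PR): a PUT can succeed even when some
// datasets failed, so callers may want to notice partial coverage.
fn artifact_fully_replicated(resp: &ArtifactPutResponse) -> bool {
    // On a healthy sled with two M.2 artifact datasets this is 2 == 2.
    resp.successful_writes == resp.datasets
}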