[repo depot 2/n] sled agent APIs to manage update artifact storage #6764
@@ -33,6 +33,7 @@ use omicron_common::disk::{DatasetKind, DatasetsConfig};
 use omicron_common::update::ArtifactHash;
 use repo_depot_api::*;
 use sha2::{Digest, Sha256};
+use sled_agent_api::ArtifactPutResponse;
 use sled_storage::dataset::M2_ARTIFACT_DATASET;
 use sled_storage::error::Error as StorageError;
 use sled_storage::manager::StorageHandle;
@@ -175,16 +176,16 @@ impl<T: DatasetsManager> ArtifactStore<T> {
         &self,
         sha256: ArtifactHash,
     ) -> Result<File, Error> {
-        let sha256 = sha256.to_string();
+        let sha256_str = sha256.to_string();
         let mut last_error = None;
         for mountpoint in self.storage.artifact_storage_paths().await? {
-            let path = mountpoint.join(&sha256);
+            let path = mountpoint.join(&sha256_str);
             match File::open(&path).await {
                 Ok(file) => {
                     info!(
                         &self.log,
                         "Retrieved artifact";
-                        "sha256" => &sha256,
+                        "sha256" => &sha256_str,
                         "path" => path.as_str(),
                     );
                     return Ok(file);
@@ -195,11 +196,7 @@ impl<T: DatasetsManager> ArtifactStore<T> {
                 }
             }
         }
-        if let Some(last_error) = last_error {
-            Err(last_error)
-        } else {
-            Err(Error::NotFound { sha256 })
-        }
+        Err(last_error.unwrap_or(Error::NotFound { sha256 }))
     }

     /// List operation (served by Sled Agent API)
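The retrieval path follows a "first success wins" shape: try each dataset's mountpoint in turn, return the first file that opens, and otherwise surface the most recent error, falling back to `NotFound` if the loop never produced one. A minimal, self-contained sketch of that idiom using only std (`open_on_any` and the plain `io::Error` are illustrative stand-ins, not this crate's names):

```rust
use std::fs::File;
use std::io;
use std::path::PathBuf;

/// First successful open wins; otherwise return the most recent error,
/// or a synthetic "not found" if no candidate produced one.
fn open_on_any(mountpoints: &[PathBuf], name: &str) -> io::Result<File> {
    let mut last_error = None;
    for mountpoint in mountpoints {
        match File::open(mountpoint.join(name)) {
            Ok(file) => return Ok(file),
            Err(err) => last_error = Some(err),
        }
    }
    Err(last_error.unwrap_or_else(|| {
        io::Error::new(io::ErrorKind::NotFound, "artifact not found")
    }))
}
```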
@@ -276,9 +273,8 @@ impl<T: DatasetsManager> ArtifactStore<T> {
     /// can be used to write the artifact to all temporary files, then move all
     /// temporary files to their final paths.
     ///
-    /// Most errors during the write process are considered non-fatal errors.
-    /// All non-fatal errors are logged, and the most recently-seen non-fatal
-    /// error is returned by [`ArtifactWriter::finalize`].
+    /// Most errors during the write process are considered non-fatal errors,
+    /// which are logged instead of immediately returned.
     ///
     /// In this method, possible fatal errors are:
     /// - No temporary files could be created.
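To make the fatal/non-fatal split concrete, here is a distilled, std-only sketch of the creation pattern the next two hunks implement (`stage_temp_files` and the `"tmp"` subdirectory are illustrative stand-ins, not the crate's actual identifiers):

```rust
use std::fs::{self, File};
use std::io;
use std::path::PathBuf;

/// Stage a temporary file on every dataset. Per-dataset failures are
/// non-fatal (remembered, then skipped); the operation fails outright only
/// if no temporary file could be created anywhere.
fn stage_temp_files(
    mountpoints: &[PathBuf],
) -> io::Result<(usize, Vec<(File, PathBuf)>)> {
    let mut files = Vec::new();
    let mut last_error = None;
    let mut datasets = 0;
    for mountpoint in mountpoints {
        datasets += 1;
        let temp_dir = mountpoint.join("tmp"); // stand-in for TEMP_SUBDIR
        if let Err(err) = fs::create_dir(&temp_dir) {
            if err.kind() != io::ErrorKind::AlreadyExists {
                last_error = Some(err); // non-fatal: remember and move on
                continue;
            }
        }
        match File::create(temp_dir.join("artifact.partial")) {
            Ok(file) => files.push((file, mountpoint.clone())),
            Err(err) => last_error = Some(err),
        }
    }
    if files.is_empty() {
        // Fatal: nothing was staged on any dataset.
        Err(last_error.unwrap_or_else(|| {
            io::Error::new(io::ErrorKind::NotFound, "no update datasets")
        }))
    } else {
        Ok((datasets, files))
    }
}
```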
@@ -290,7 +286,9 @@ impl<T: DatasetsManager> ArtifactStore<T> {
     ) -> Result<ArtifactWriter, Error> {
         let mut files = Vec::new();
         let mut last_error = None;
+        let mut datasets = 0;
         for mountpoint in self.storage.artifact_storage_paths().await? {
+            datasets += 1;
             let temp_dir = mountpoint.join(TEMP_SUBDIR);
             if let Err(err) = tokio::fs::create_dir(&temp_dir).await {
                 if err.kind() != ErrorKind::AlreadyExists {
@@ -330,11 +328,11 @@ impl<T: DatasetsManager> ArtifactStore<T> {
             Err(last_error.unwrap_or(Error::NoUpdateDataset))
         } else {
             Ok(ArtifactWriter {
+                datasets,
                 hasher: Sha256::new(),
                 files,
                 log: self.log.clone(),
                 sha256,
-                last_error,
             })
         }
     }
@@ -344,7 +342,7 @@ impl<T: DatasetsManager> ArtifactStore<T> {
         &self,
         sha256: ArtifactHash,
         body: StreamingBody,
-    ) -> Result<(), Error> {
+    ) -> Result<ArtifactPutResponse, Error> {
         self.writer(sha256)
             .await?
             .write_stream(body.into_stream().map_err(Error::Body))
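Returning `ArtifactPutResponse` instead of `()` lets the caller distinguish a fully replicated write from a degraded one: the PUT succeeds if at least one copy landed, but `successful_writes < datasets` means redundancy is reduced. A hypothetical caller-side check (not part of this PR):

```rust
/// Hypothetical consumer of the PUT response: treat any successful write as
/// success, but flag reduced redundancy for the operator.
fn check_put_response(datasets: usize, successful_writes: usize) {
    // Per finalize() below, the server returns an error rather than a
    // response with zero successful writes.
    assert!(successful_writes > 0);
    if successful_writes < datasets {
        eprintln!(
            "artifact stored on {successful_writes} of {datasets} \
             datasets; redundancy is degraded"
        );
    }
}
```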
@@ -476,11 +474,11 @@ impl DatasetsManager for StorageHandle {

 /// Abstraction that handles writing to several temporary files.
 struct ArtifactWriter {
+    datasets: usize,
     files: Vec<Option<(NamedUtf8TempFile<File>, Utf8PathBuf)>>,
     hasher: Sha256,
     log: Logger,
     sha256: ArtifactHash,
-    last_error: Option<Error>,
 }

 impl ArtifactWriter {
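The `Vec<Option<...>>` shape of `files` supports the write loop's fan-out: each chunk write takes a file out of its slot, puts it back only on success, and compacts afterwards, so failed copies are dropped (closing and cleaning up their temp files) without disturbing iteration. A minimal sketch of that pattern with plain `std::fs::File`:

```rust
use std::fs::File;
use std::io::Write;

/// Write `chunk` to every surviving file; drop the ones that fail.
fn fan_out_write(files: &mut Vec<Option<File>>, chunk: &[u8]) {
    for slot in files.iter_mut() {
        if let Some(mut file) = slot.take() {
            if file.write_all(chunk).is_ok() {
                *slot = Some(file); // keep the healthy writer
            }
            // On error the file is dropped here, abandoning that copy.
        }
    }
    files.retain(Option::is_some);
}
```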
@@ -490,7 +488,7 @@ impl ArtifactWriter {
     async fn write_stream(
         self,
         stream: impl Stream<Item = Result<impl AsRef<[u8]>, Error>>,
-    ) -> Result<(), Error> {
+    ) -> Result<ArtifactPutResponse, Error> {
         let writer = stream
             .try_fold(self, |mut writer, chunk| async {
                 writer.write(chunk).await?;
@@ -502,15 +500,13 @@ impl ArtifactWriter {

     /// Write `chunk` to all temporary files.
     ///
-    /// Errors in this method are considered non-fatal errors. All non-fatal
-    /// errors are logged, and the most recently-seen non-fatal error is
-    /// returned by [`ArtifactWriter::finalize`].
-    ///
-    /// If all files have failed, this method returns the most recently-seen
-    /// non-fatal error as a fatal error.
+    /// Errors in this method are considered non-fatal errors. All errors
+    /// are logged. If all files have failed, this method returns the most
+    /// recently-seen non-fatal error as a fatal error.
     async fn write(&mut self, chunk: impl AsRef<[u8]>) -> Result<(), Error> {
         self.hasher.update(&chunk);

+        let mut last_error = None;
         for option in &mut self.files {
             if let Some((mut file, mountpoint)) = option.take() {
                 match file.as_file_mut().write_all(chunk.as_ref()).await {
@@ -520,11 +516,7 @@ impl ArtifactWriter {
                     Err(err) => {
                         let path = file.path().to_owned();
                         log_and_store!(
-                            self.last_error,
-                            &self.log,
-                            "write to",
-                            path,
-                            err
+                            last_error, &self.log, "write to", path, err
                         );
                         // `file` and `final_path` are dropped here, cleaning up
                         // the file
@@ -535,18 +527,18 @@ impl ArtifactWriter {

         self.files.retain(Option::is_some);
         if self.files.is_empty() {
-            Err(self.last_error.take().unwrap_or(Error::NoUpdateDataset))
+            Err(last_error.unwrap_or(Error::NoUpdateDataset))
         } else {
             Ok(())
         }
     }

     /// Rename all files to their final paths.
     ///
-    /// Errors in this method are considered non-fatal errors, but this method
-    /// will return the most recently-seen error by any method in the write
-    /// process.
-    async fn finalize(mut self) -> Result<(), Error> {
+    /// Errors in this method are considered non-fatal errors. If all files have
+    /// failed in some way, the most recently-seen error is returned as a fatal
+    /// error.
+    async fn finalize(self) -> Result<ArtifactPutResponse, Error> {
         let digest = self.hasher.finalize();
         if digest.as_slice() != self.sha256.as_ref() {
             return Err(Error::HashMismatch {
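The body of `finalize` (next hunk) publishes each temporary file with the standard crash-consistent sequence: fsync the data, rename into place, then fsync the parent directory so the rename itself survives a crash. Distilled to blocking std calls (a sketch; the real code uses tokio, per-step error logging, and the non-fatal error accounting described above):

```rust
use std::fs::{self, File};
use std::io;
use std::path::Path;

/// Durably publish one staged temp file at its final path.
fn publish(temp_file: &File, temp: &Path, final_path: &Path) -> io::Result<()> {
    temp_file.sync_all()?; // 1. flush file contents to disk
    let parent = final_path.parent().expect("final path has a parent");
    let dir = File::open(parent)?; // 2. open the parent directory
    fs::rename(temp, final_path)?; // 3. atomically rename into place
    dir.sync_all()?; // 4. fsync the directory so the rename is durable
    Ok(())
}
```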
@@ -555,24 +547,21 @@ impl ArtifactWriter {
             });
         }

-        let mut any_success = false;
+        let mut last_error = None;
+        let mut successful_writes = 0;
         for (mut file, mountpoint) in self.files.into_iter().flatten() {
             // 1. fsync the temporary file.
             if let Err(err) = file.as_file_mut().sync_all().await {
                 let path = file.path().to_owned();
-                log_and_store!(self.last_error, &self.log, "sync", path, err);
+                log_and_store!(last_error, &self.log, "sync", path, err);
                 continue;
             }
             // 2. Open the parent directory so we can fsync it.
             let parent_dir = match File::open(&mountpoint).await {
                 Ok(dir) => dir,
                 Err(err) => {
                     log_and_store!(
-                        self.last_error,
-                        &self.log,
-                        "open",
-                        mountpoint,
-                        err
+                        last_error, &self.log, "open", mountpoint, err
                     );
                     continue;
                 }
@@ -592,7 +581,7 @@ impl ArtifactWriter {
                     "from" => err.file.path().as_str(),
                     "to" => final_path.as_str(),
                 );
-                self.last_error = Some(Error::FileRename {
+                last_error = Some(Error::FileRename {
                     from: err.file.path().to_owned(),
                     to: final_path,
                     err: err.error,
@@ -601,30 +590,27 @@ impl ArtifactWriter {
             }
             // 4. fsync the parent directory.
             if let Err(err) = parent_dir.sync_all().await {
-                log_and_store!(
-                    self.last_error,
-                    &self.log,
-                    "sync",
-                    mountpoint,
-                    err
-                );
+                log_and_store!(last_error, &self.log, "sync", mountpoint, err);
                 continue;
             }

-            any_success = true;
+            successful_writes += 1;
Review comment: There's an expectation here that "we're writing a single file" to each M.2, right? I think that's okay, just clarifying, because I think our expected output is the first log line sketched below, and it would be a little confusing to suddenly see something like the second, if we started writing / syncing multiple files.

Reply: Yeah, I don't think you could use …
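The example outputs in this thread presumably contrasted log lines along these lines, given the keys added in this hunk (the formatting is illustrative, not exact slog output):

```text
Wrote artifact; sha256=..., datasets=2, successful_writes=2
```

versus, if multiple files were written and synced per dataset:

```text
Wrote artifact; sha256=..., datasets=2, successful_writes=4
```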
         }

-        if let Some(last_error) = self.last_error {
-            Err(last_error)
-        } else if any_success {
+        if successful_writes > 0 {
             info!(
                 &self.log,
                 "Wrote artifact";
                 "sha256" => &self.sha256.to_string(),
+                "datasets" => self.datasets,
+                "successful_writes" => successful_writes,
             );
-            Ok(())
+            Ok(ArtifactPutResponse {
+                datasets: self.datasets,
+                successful_writes,
+            })
         } else {
-            Err(Error::NoUpdateDataset)
+            Err(last_error.unwrap_or(Error::NoUpdateDataset))
         }
     }
 }
@@ -692,7 +678,7 @@ pub(crate) enum Error {
     Join(#[from] tokio::task::JoinError),

     #[error("Artifact {sha256} not found")]
-    NotFound { sha256: String },
+    NotFound { sha256: ArtifactHash },

     #[error("No update datasets present")]
     NoUpdateDataset,
Review comment: This looks great as a response; I might add some docs indicating what these fields mean to the caller.
Maybe:
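A hedged reconstruction of what such docs might look like (the reviewer's actual suggestion is not shown above; field meanings are inferred from `ArtifactWriter::finalize`):

```rust
/// Response to a successful PUT of an update artifact. (Sketch of the
/// suggested docs, not the reviewer's literal text.)
pub struct ArtifactPutResponse {
    /// The number of update datasets found on this sled.
    pub datasets: usize,
    /// The number of datasets the artifact was successfully written to.
    /// Anything less than `datasets` means the artifact is stored with
    /// reduced redundancy.
    pub successful_writes: usize,
}
```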