Allow empty page_token for getTree (#1340)
amonshiz authored Sep 9, 2024
1 parent f337391 commit d66d418
Showing 2 changed files with 181 additions and 107 deletions.
29 changes: 17 additions & 12 deletions nativelink-service/src/cas_server.rs
@@ -236,18 +236,23 @@ impl CasServer {
         let mut deque: VecDeque<DigestInfo> = VecDeque::new();
         let mut directories: Vec<Directory> = Vec::new();
         // `page_token` will return the `{hash_str}-{size_bytes}` of the current request's first directory digest.
-        let mut page_token_parts = request.page_token.split('-');
-        let page_token_digest = DigestInfo::try_new(
-            page_token_parts
-                .next()
-                .err_tip(|| "Failed to parse `hash_str` in `page_token`")?,
-            page_token_parts
-                .next()
-                .err_tip(|| "Failed to parse `size_bytes` in `page_token`")?
-                .parse::<i64>()
-                .err_tip(|| "Failed to parse `size_bytes` as i64")?,
-        )
-        .err_tip(|| "Failed to parse `page_token` as `Digest` in `GetTreeRequest`")?;
+        let page_token_digest = match request.page_token.len() {
+            0 => root_digest,
+            _ => {
+                let mut page_token_parts = request.page_token.split('-');
+                DigestInfo::try_new(
+                    page_token_parts
+                        .next()
+                        .err_tip(|| "Failed to parse `hash_str` in `page_token`")?,
+                    page_token_parts
+                        .next()
+                        .err_tip(|| "Failed to parse `size_bytes` in `page_token`")?
+                        .parse::<i64>()
+                        .err_tip(|| "Failed to parse `size_bytes` as i64")?,
+                )
+                .err_tip(|| "Failed to parse `page_token` as `Digest` in `GetTreeRequest`")?
+            }
+        };
         let page_size = request.page_size;
         // If `page_size` is 0, paging is not necessary.
         let mut page_token_matched = page_size == 0;
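The essence of the change, restated outside the diff: an empty `page_token` now falls back to the root digest instead of failing while parsing `size_bytes` (splitting an empty string on '-' yields a single empty part, so the second `.next()` returned `None`). Below is a minimal standalone sketch of that fallback; `parse_page_token` and the plain `(String, i64)` digest tuple are illustrative stand-ins for nativelink's `DigestInfo::try_new` and `err_tip` machinery, not code from this commit:

/// Illustrative stand-in for nativelink's `DigestInfo`.
type Digest = (String, i64);

/// Parse a `{hash_str}-{size_bytes}` page token, falling back to the root
/// digest when the token is empty (the behavior this commit introduces).
fn parse_page_token(page_token: &str, root_digest: &Digest) -> Result<Digest, String> {
    if page_token.is_empty() {
        return Ok(root_digest.clone());
    }
    let mut parts = page_token.split('-');
    let hash = parts
        .next()
        .ok_or("Failed to parse `hash_str` in `page_token`")?;
    let size = parts
        .next()
        .ok_or("Failed to parse `size_bytes` in `page_token`")?
        .parse::<i64>()
        .map_err(|e| format!("Failed to parse `size_bytes` as i64: {e}"))?;
    Ok((hash.to_string(), size))
}

fn main() {
    let root = ("abc123".to_string(), 142);
    // An empty token now resolves to the root digest instead of erroring.
    assert_eq!(parse_page_token("", &root).unwrap(), root);
    // Non-empty tokens are still parsed as `{hash_str}-{size_bytes}`.
    assert_eq!(
        parse_page_token("abc123-142", &root).unwrap(),
        ("abc123".to_string(), 142)
    );
}

Before this commit, callers had to seed the first request's `page_token` with `format!("{root_directory_digest_info}")`, as the old tests below did; the fallback makes that redundant while keeping the old form valid.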
259 changes: 164 additions & 95 deletions nativelink-service/tests/cas_server_test.rs
@@ -383,35 +383,71 @@ async fn get_tree_read_directories_without_paging() -> Result<(), Box<dyn std::e

     // Must work when paging is disabled ( `page_size` is 0 ).
     // It reads all directories at once.
-    let raw_response = cas_server
-        .get_tree(Request::new(GetTreeRequest {
-            instance_name: INSTANCE_NAME.to_string(),
-            page_size: 0,
-            page_token: format!("{root_directory_digest_info}"),
-            root_digest: Some(root_directory_digest_info.into()),
-            digest_function: digest_function::Value::Sha256.into(),
-        }))
-        .await;
-    assert!(raw_response.is_ok());
-    assert_eq!(
-        raw_response
-            .unwrap()
-            .into_inner()
-            .filter_map(|x| async move { Some(x.unwrap()) })
-            .collect::<Vec<_>>()
-            .await,
-        vec![GetTreeResponse {
-            directories: vec![
-                root_directory.clone(),
-                sub_directories[0].clone(),
-                sub_directories[1].clone(),
-                sub_directories[2].clone(),
-                sub_directories[3].clone(),
-                sub_directories[4].clone()
-            ],
-            next_page_token: String::new()
-        }]
-    );
+
+    // First verify that using an empty page token is treated as if the client had sent the root
+    // digest.
+    {
+        let raw_response = cas_server
+            .get_tree(Request::new(GetTreeRequest {
+                instance_name: INSTANCE_NAME.to_string(),
+                page_size: 0,
+                page_token: String::new(),
+                root_digest: Some(root_directory_digest_info.into()),
+                digest_function: digest_function::Value::Sha256.into(),
+            }))
+            .await;
+        assert_eq!(
+            raw_response
+                .unwrap()
+                .into_inner()
+                .filter_map(|x| async move { Some(x.unwrap()) })
+                .collect::<Vec<_>>()
+                .await,
+            vec![GetTreeResponse {
+                directories: vec![
+                    root_directory.clone(),
+                    sub_directories[0].clone(),
+                    sub_directories[1].clone(),
+                    sub_directories[2].clone(),
+                    sub_directories[3].clone(),
+                    sub_directories[4].clone()
+                ],
+                next_page_token: String::new()
+            }]
+        );
+    }
+
+    // Also verify that sending the root digest returns the entire tree as well.
+    {
+        let raw_response = cas_server
+            .get_tree(Request::new(GetTreeRequest {
+                instance_name: INSTANCE_NAME.to_string(),
+                page_size: 0,
+                page_token: format!("{root_directory_digest_info}"),
+                root_digest: Some(root_directory_digest_info.into()),
+                digest_function: digest_function::Value::Sha256.into(),
+            }))
+            .await;
+        assert_eq!(
+            raw_response
+                .unwrap()
+                .into_inner()
+                .filter_map(|x| async move { Some(x.unwrap()) })
+                .collect::<Vec<_>>()
+                .await,
+            vec![GetTreeResponse {
+                directories: vec![
+                    root_directory.clone(),
+                    sub_directories[0].clone(),
+                    sub_directories[1].clone(),
+                    sub_directories[2].clone(),
+                    sub_directories[3].clone(),
+                    sub_directories[4].clone()
+                ],
+                next_page_token: String::new()
+            }]
+        );
+    }

     Ok(())
 }
@@ -434,72 +470,105 @@ async fn get_tree_read_directories_with_paging() -> Result<(), Box<dyn std::erro
     // First, it reads `root_directory` and `sub_directory[0]`.
     // Then, it reads `sub_directory[1]` and `sub_directory[2]`.
     // Finally, it reads `sub_directory[3]` and `sub_directory[4]`.
-    let raw_response = cas_server
-        .get_tree(Request::new(GetTreeRequest {
-            instance_name: INSTANCE_NAME.to_string(),
-            page_size: 2,
-            page_token: format!("{root_directory_digest_info}"),
-            root_digest: Some(root_directory_digest_info.into()),
-            digest_function: digest_function::Value::Sha256.into(),
-        }))
-        .await;
-    assert!(raw_response.is_ok());
-    assert_eq!(
-        raw_response
-            .unwrap()
-            .into_inner()
-            .filter_map(|x| async move { Some(x.unwrap()) })
-            .collect::<Vec<_>>()
-            .await,
-        vec![GetTreeResponse {
-            directories: vec![root_directory.clone(), sub_directories[0].clone()],
-            next_page_token: format!("{}", sub_directory_digest_infos[1]),
-        }]
-    );
-    let raw_response = cas_server
-        .get_tree(Request::new(GetTreeRequest {
-            instance_name: INSTANCE_NAME.to_string(),
-            page_size: 2,
-            page_token: format!("{}", sub_directory_digest_infos[1]),
-            root_digest: Some(root_directory_digest_info.into()),
-            digest_function: digest_function::Value::Sha256.into(),
-        }))
-        .await;
-    assert!(raw_response.is_ok());
-    assert_eq!(
-        raw_response
-            .unwrap()
-            .into_inner()
-            .filter_map(|x| async move { Some(x.unwrap()) })
-            .collect::<Vec<_>>()
-            .await,
-        vec![GetTreeResponse {
-            directories: vec![sub_directories[1].clone(), sub_directories[2].clone()],
-            next_page_token: format!("{}", sub_directory_digest_infos[3]),
-        }]
-    );
-    let raw_response = cas_server
-        .get_tree(Request::new(GetTreeRequest {
-            instance_name: INSTANCE_NAME.to_string(),
-            page_size: 2,
-            page_token: format!("{}", sub_directory_digest_infos[3]),
-            root_digest: Some(root_directory_digest_info.into()),
-            digest_function: digest_function::Value::Sha256.into(),
-        }))
-        .await;
-    assert!(raw_response.is_ok());
-    assert_eq!(
-        raw_response
-            .unwrap()
-            .into_inner()
-            .filter_map(|x| async move { Some(x.unwrap()) })
-            .collect::<Vec<_>>()
-            .await,
-        vec![GetTreeResponse {
-            directories: vec![sub_directories[3].clone(), sub_directories[4].clone()],
-            next_page_token: String::new(),
-        }]
-    );
+
+    // First, verify that an empty initial page token is treated as if the client had sent the
+    // root digest and respects the page size.
+    {
+        let raw_response = cas_server
+            .get_tree(Request::new(GetTreeRequest {
+                instance_name: INSTANCE_NAME.to_string(),
+                page_size: 2,
+                page_token: String::new(),
+                root_digest: Some(root_directory_digest_info.into()),
+                digest_function: digest_function::Value::Sha256.into(),
+            }))
+            .await;
+        assert_eq!(
+            raw_response
+                .unwrap()
+                .into_inner()
+                .filter_map(|x| async move { Some(x.unwrap()) })
+                .collect::<Vec<_>>()
+                .await,
+            vec![GetTreeResponse {
+                directories: vec![root_directory.clone(), sub_directories[0].clone()],
+                next_page_token: format!("{}", sub_directory_digest_infos[1]),
+            }]
+        );
+    }
+
+    // Also verify that sending the root digest as the page token is treated as paging from the
+    // beginning and respects page size.
+    {
+        let raw_response = cas_server
+            .get_tree(Request::new(GetTreeRequest {
+                instance_name: INSTANCE_NAME.to_string(),
+                page_size: 2,
+                page_token: format!("{root_directory_digest_info}"),
+                root_digest: Some(root_directory_digest_info.into()),
+                digest_function: digest_function::Value::Sha256.into(),
+            }))
+            .await;
+        assert_eq!(
+            raw_response
+                .unwrap()
+                .into_inner()
+                .filter_map(|x| async move { Some(x.unwrap()) })
+                .collect::<Vec<_>>()
+                .await,
+            vec![GetTreeResponse {
+                directories: vec![root_directory.clone(), sub_directories[0].clone()],
+                next_page_token: format!("{}", sub_directory_digest_infos[1]),
+            }]
+        );
+    }
+
+    // Verify that paging from a non-initial page token will return the expected content.
+    {
+        let raw_response = cas_server
+            .get_tree(Request::new(GetTreeRequest {
+                instance_name: INSTANCE_NAME.to_string(),
+                page_size: 2,
+                page_token: format!("{}", sub_directory_digest_infos[1]),
+                root_digest: Some(root_directory_digest_info.into()),
+                digest_function: digest_function::Value::Sha256.into(),
+            }))
+            .await;
+        assert_eq!(
+            raw_response
+                .unwrap()
+                .into_inner()
+                .filter_map(|x| async move { Some(x.unwrap()) })
+                .collect::<Vec<_>>()
+                .await,
+            vec![GetTreeResponse {
+                directories: vec![sub_directories[1].clone(), sub_directories[2].clone()],
+                next_page_token: format!("{}", sub_directory_digest_infos[3]),
+            }]
+        );
+
+        let raw_response = cas_server
+            .get_tree(Request::new(GetTreeRequest {
+                instance_name: INSTANCE_NAME.to_string(),
+                page_size: 2,
+                page_token: format!("{}", sub_directory_digest_infos[3]),
+                root_digest: Some(root_directory_digest_info.into()),
+                digest_function: digest_function::Value::Sha256.into(),
+            }))
+            .await;
+        assert_eq!(
+            raw_response
+                .unwrap()
+                .into_inner()
+                .filter_map(|x| async move { Some(x.unwrap()) })
+                .collect::<Vec<_>>()
+                .await,
+            vec![GetTreeResponse {
+                directories: vec![sub_directories[3].clone(), sub_directories[4].clone()],
+                next_page_token: String::new(),
+            }]
+        );
+    }

     Ok(())
 }
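Taken together, the paging tests trace the loop a real client runs against `GetTree`: send a request (now with an empty initial `page_token`), append the returned directories, and feed each response's `next_page_token` back in until it comes back empty. A minimal sketch of that loop follows; the `TreePage` struct, the `fetch_page` closure, and the string "directories" are hypothetical stand-ins for the real gRPC client call and `GetTreeResponse`:

/// Illustrative response type mirroring the fields used in the tests above.
struct TreePage {
    directories: Vec<String>,
    next_page_token: String,
}

/// Drain every page of a paged tree walk. `fetch_page` stands in for a real
/// `GetTree` call; with this commit, the first call may pass an empty token.
fn collect_all_directories(mut fetch_page: impl FnMut(&str) -> TreePage) -> Vec<String> {
    let mut all = Vec::new();
    let mut token = String::new(); // Empty initial token now starts at the root.
    loop {
        let page = fetch_page(&token);
        all.extend(page.directories);
        if page.next_page_token.is_empty() {
            break; // An empty `next_page_token` marks the final page.
        }
        token = page.next_page_token;
    }
    all
}

fn main() {
    // Fake two-page server: the root page first, then a final page.
    let mut fetch = |token: &str| match token {
        "" => TreePage {
            directories: vec!["root".into(), "sub0".into()],
            next_page_token: "sub1-digest".into(),
        },
        _ => TreePage {
            directories: vec!["sub1".into(), "sub2".into()],
            next_page_token: String::new(),
        },
    };
    assert_eq!(
        collect_all_directories(&mut fetch),
        vec!["root", "sub0", "sub1", "sub2"]
    );
}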
