From d6aae930276b49e66993abcbed74e9111feb2836 Mon Sep 17 00:00:00 2001 From: Cass Fridkin Date: Tue, 24 Sep 2024 22:01:48 -0600 Subject: [PATCH 1/6] implement list api --- Cargo.lock | 1 + nativelink-config/src/cas_server.rs | 9 + nativelink-service/BUILD.bazel | 3 + nativelink-service/Cargo.toml | 2 + nativelink-service/src/lib.rs | 1 + nativelink-service/src/operations_server.rs | 293 ++++++++++++++++++++ src/bin/nativelink.rs | 7 +- 7 files changed, 315 insertions(+), 1 deletion(-) create mode 100644 nativelink-service/src/operations_server.rs diff --git a/Cargo.lock b/Cargo.lock index e56790fe9..614d3668c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1890,6 +1890,7 @@ dependencies = [ "http-body-util", "hyper 1.4.1", "hyper-util", + "lru", "maplit", "nativelink-config", "nativelink-error", diff --git a/nativelink-config/src/cas_server.rs b/nativelink-config/src/cas_server.rs index 60576696d..4f9fb495c 100644 --- a/nativelink-config/src/cas_server.rs +++ b/nativelink-config/src/cas_server.rs @@ -263,6 +263,9 @@ pub struct ServicesConfig { /// This is the service for health status check. pub health: Option, + + /// This is the service for the Operations API + pub operations: Option, } #[derive(Deserialize, Debug)] @@ -741,3 +744,9 @@ pub struct CasConfig { /// Any global configurations that apply to all modules live here. pub global: Option, } + +#[derive(Deserialize, Debug)] +#[serde(deny_unknown_fields)] +pub struct OperationsConfig { + pub enabled: bool, +} diff --git a/nativelink-service/BUILD.bazel b/nativelink-service/BUILD.bazel index f18e55bc4..8bfebd67a 100644 --- a/nativelink-service/BUILD.bazel +++ b/nativelink-service/BUILD.bazel @@ -17,6 +17,7 @@ rust_library( "src/execution_server.rs", "src/health_server.rs", "src/lib.rs", + "src/operations_server.rs", "src/worker_api_server.rs", ], visibility = ["//visibility:public"], @@ -33,8 +34,10 @@ rust_library( "@crates//:http-body", "@crates//:http-body-util", "@crates//:hyper-1.4.1", + "@crates//:lru", "@crates//:parking_lot", "@crates//:prost", + "@crates//:prost-types", "@crates//:serde_json5", "@crates//:tokio", "@crates//:tonic", diff --git a/nativelink-service/Cargo.toml b/nativelink-service/Cargo.toml index 95cfbe1cb..9d9e21ef8 100644 --- a/nativelink-service/Cargo.toml +++ b/nativelink-service/Cargo.toml @@ -16,9 +16,11 @@ futures = { version = "0.3.30", default-features = false } http-body = "1.0.1" http-body-util = "0.1.2" hyper = { version = "1.4.1" } +lru = { version = "0.12.3", default-features = false } serde_json5 = "0.1.0" parking_lot = "0.12.3" prost = { version = "0.13.1", default-features = false } +prost-types = { version = "0.13.1" } tokio = { version = "1.38.0", features = ["fs", "rt-multi-thread", "signal", "io-util"], default-features = false } tokio-stream = { version = "0.1.15", features = ["fs"], default-features = false } tonic = { version = "0.12.0", features = ["transport", "tls"], default-features = false } diff --git a/nativelink-service/src/lib.rs b/nativelink-service/src/lib.rs index 534b55072..a8f56d266 100644 --- a/nativelink-service/src/lib.rs +++ b/nativelink-service/src/lib.rs @@ -19,4 +19,5 @@ pub mod capabilities_server; pub mod cas_server; pub mod execution_server; pub mod health_server; +pub mod operations_server; pub mod worker_api_server; diff --git a/nativelink-service/src/operations_server.rs b/nativelink-service/src/operations_server.rs new file mode 100644 index 000000000..ac20fa9e0 --- /dev/null +++ b/nativelink-service/src/operations_server.rs @@ -0,0 +1,293 @@ +// Copyright 2024 The NativeLink Authors. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::{HashMap, VecDeque}; +use std::num::NonZero; +use std::sync::Arc; + +use futures::StreamExt; +use lru::LruCache; +use nativelink_error::{Code, Error}; +use nativelink_proto::google::longrunning::operation::Result as OperationResult; +use nativelink_proto::google::longrunning::operations_server::{Operations, OperationsServer}; +use nativelink_proto::google::longrunning::{ + CancelOperationRequest, DeleteOperationRequest, GetOperationRequest, ListOperationsRequest, + ListOperationsResponse, Operation, WaitOperationRequest, +}; +use nativelink_proto::google::rpc; +use nativelink_util::action_messages::ActionStage; +use nativelink_util::operation_state_manager::{ + ActionStateResult, ClientStateManager, OperationFilter, +}; +use prost_types::Any; +use tokio::sync::Mutex; +use tonic::{Request, Response, Status}; +use uuid::Uuid; + +const LIST_OPERATIONS_MAXIMUM_PAGE_SIZE: i32 = 100; +const LIST_OPERATIONS_DEFAULT_PAGE_SIZE: usize = 50; +const NUM_CACHED_RESPONSES: NonZero = unsafe { NonZero::new_unchecked(1024) }; +const NO_MORE_PAGES_TOKEN: &str = "NO_MORE_PAGES"; + +pub struct OpsServer { + schedulers: HashMap>, + cache: Mutex>>>, +} + +impl Clone for OpsServer { + fn clone(&self) -> Self { + Self::new(self.schedulers.clone()) + } + + fn clone_from(&mut self, source: &Self) { + self.schedulers.clone_from(&source.schedulers); + } +} + +impl OpsServer { + pub fn new(schedulers: HashMap>) -> Self { + Self { + schedulers, + cache: Mutex::new(LruCache::new(NUM_CACHED_RESPONSES)), + } + } + + pub fn into_service(self) -> OperationsServer { + OperationsServer::new(self) + } + + async fn list_operations_inner( + &self, + scheduler: &Arc, + page_size: usize, + page_uuid: Option, + filter: OperationFilter, + ) -> Result { + let mut cache = self.cache.lock().await; + let mut action_state_results = if let Some(uuid) = page_uuid { + cache.pop(&uuid).ok_or_else(|| { + Error::new( + Code::NotFound, + format!("Couldn't find page with token {}", uuid), + ) + })? + } else { + scheduler.filter_operations(filter).await?.collect().await + }; + + let rest = action_state_results.split_off(page_size.min(action_state_results.len())); + let next_page_token = if !rest.is_empty() { + let next_page_uuid = Uuid::new_v4(); + let token = next_page_uuid.to_string(); + + cache.push(next_page_uuid, rest); + token + } else { + NO_MORE_PAGES_TOKEN.to_string() + }; + + drop(cache); + + let mut out = Vec::with_capacity(action_state_results.len()); + for action_state_result in action_state_results { + let info = action_state_result.as_action_info().await?; + let state = action_state_result.as_state().await?; + + let name = info.unique_qualifier.digest().to_string(); + let metadata = None; + let (done, result) = match &state.stage { + ActionStage::Completed(action_result) => { + let result = if action_result.exit_code == 0 { + OperationResult::Response(Any::default()) + } else { + OperationResult::Error(rpc::Status { + code: action_result.exit_code, + message: action_result.message.clone(), + details: vec![], + }) + }; + + (true, Some(result)) + } + ActionStage::CompletedFromCache(cached_action_result) => { + let result = if cached_action_result.exit_code == 0 { + OperationResult::Response(Any::default()) + } else { + OperationResult::Error(rpc::Status { + code: cached_action_result.exit_code, + message: String::from_utf8_lossy(&cached_action_result.stderr_raw) + .into_owned(), + details: vec![], + }) + }; + + (true, Some(result)) + } + _ => (false, None), + }; + + out.push(Operation { + name, + metadata, + done, + result, + }); + } + + Ok(ListOperationsResponse { + operations: out, + next_page_token, + }) + } +} + +#[tonic::async_trait] +impl Operations for OpsServer { + /// Lists operations that match the specified filter in the request. If the + /// server doesn't support this method, it returns `UNIMPLEMENTED`. + /// + /// NOTE: the `name` binding allows API services to override the binding + /// to use different resource name schemes, such as `users/*/operations`. To + /// override the binding, API services can add a binding such as + /// `"/v1/{name=users/*}/operations"` to their service configuration. + /// For backwards compatibility, the default name includes the operations + /// collection id, however overriding users must ensure the name binding + /// is the parent resource, without the operations collection id. + async fn list_operations( + &self, + request: Request, + ) -> Result, Status> { + let ListOperationsRequest { + name, + filter: filter_string, + page_size, + page_token, + } = request.into_inner(); + + let normalized_page_size = if page_size < 0 || page_size > LIST_OPERATIONS_MAXIMUM_PAGE_SIZE + { + return Err(Status::out_of_range(format!( + "page size {} out of range 0..=100", + page_size + ))); + } else if page_size == 0 { + LIST_OPERATIONS_DEFAULT_PAGE_SIZE + } else { + page_size + .try_into() + .expect("a positive number between 0-100 to fit in u32") + }; + + let Some(scheduler) = self + .schedulers + .iter() + .find_map(|(scheduler_name, scheduler)| { + let n = scheduler_name.len(); + if name.starts_with(scheduler_name.as_str()) + && name.as_bytes().get(n).is_some_and(|b| *b == b'/') + { + Some(scheduler) + } else { + None + } + }) + else { + return Err(Status::not_found(format!( + "couldn't find a scheduler named {}", + &name + ))); + }; + + let filter = if filter_string.is_empty() { + OperationFilter::default() + } else { + return Err(Status::unimplemented("filtering not implemented yet")); + }; + + let page_uuid = if page_token.is_empty() { + None + } else if page_token == NO_MORE_PAGES_TOKEN { + return Ok(Response::new(ListOperationsResponse { + operations: vec![], + next_page_token: NO_MORE_PAGES_TOKEN.to_string(), + })); + } else { + match page_token.parse() { + Ok(uuid) => Some(uuid), + Err(e) => { + return Err(Status::invalid_argument(format!( + "Invalid page token {page_token}: {e}" + ))) + } + } + }; + + let message = self + .list_operations_inner(scheduler, normalized_page_size, page_uuid, filter) + .await?; + + Ok(Response::new(message)) + } + + /// Gets the latest state of a long-running operation. Clients can use this + /// method to poll the operation result at intervals as recommended by the API + /// service. + async fn get_operation( + &self, + request: Request, + ) -> Result, Status> { + Err(Status::unimplemented("todo")) + } + /// Deletes a long-running operation. This method indicates that the client is + /// no longer interested in the operation result. It does not cancel the + /// operation. If the server doesn't support this method, it returns + /// `google.rpc.Code.UNIMPLEMENTED`. + async fn delete_operation( + &self, + request: Request, + ) -> Result, Status> { + Err(Status::unimplemented("unimplemented")) + } + /// Starts asynchronous cancellation on a long-running operation. The server + /// makes a best effort to cancel the operation, but success is not + /// guaranteed. If the server doesn't support this method, it returns + /// `google.rpc.Code.UNIMPLEMENTED`. Clients can use + /// [Operations.GetOperation][google.longrunning.Operations.GetOperation] or + /// other methods to check whether the cancellation succeeded or whether the + /// operation completed despite cancellation. On successful cancellation, + /// the operation is not deleted; instead, it becomes an operation with + /// an [Operation.error][google.longrunning.Operation.error] value with a [google.rpc.Status.code][google.rpc.Status.code] of 1, + /// corresponding to `Code.CANCELLED`. + async fn cancel_operation( + &self, + request: Request, + ) -> Result, Status> { + Err(Status::unimplemented("unimplemented")) + } + /// Waits for the specified long-running operation until it is done or reaches + /// at most a specified timeout, returning the latest state. If the operation + /// is already done, the latest state is immediately returned. If the timeout + /// specified is greater than the default HTTP/RPC timeout, the HTTP/RPC + /// timeout is used. If the server does not support this method, it returns + /// `google.rpc.Code.UNIMPLEMENTED`. + /// Note that this method is on a best-effort basis. It may return the latest + /// state before the specified timeout (including immediately), meaning even an + /// immediate response is no guarantee that the operation is done. + async fn wait_operation( + &self, + request: Request, + ) -> Result, Status> { + Err(Status::unimplemented("todo")) + } +} diff --git a/src/bin/nativelink.rs b/src/bin/nativelink.rs index 20ac7f26f..e43e25f3d 100644 --- a/src/bin/nativelink.rs +++ b/src/bin/nativelink.rs @@ -43,6 +43,7 @@ use nativelink_service::capabilities_server::CapabilitiesServer; use nativelink_service::cas_server::CasServer; use nativelink_service::execution_server::ExecutionServer; use nativelink_service::health_server::HealthServer; +use nativelink_service::operations_server::OpsServer; use nativelink_service::worker_api_server::WorkerApiServer; use nativelink_store::default_store_factory::store_factory; use nativelink_store::store_manager::StoreManager; @@ -437,7 +438,11 @@ async fn inner_main( }) }) .err_tip(|| "Could not create BEP service")?, - ); + ) + .add_optional_service(services.operations.and_then(|cfg| { + cfg.enabled + .then(|| OpsServer::new(action_schedulers.clone()).into_service()) + })); let health_registry = health_registry_builder.lock().await.build(); From ae7981f8f283bdc0623262ef66dcfebd7449c0c4 Mon Sep 17 00:00:00 2001 From: Cass Fridkin Date: Tue, 24 Sep 2024 22:16:53 -0600 Subject: [PATCH 2/6] implement "get" --- nativelink-service/src/operations_server.rs | 123 +++++++++++++------- 1 file changed, 79 insertions(+), 44 deletions(-) diff --git a/nativelink-service/src/operations_server.rs b/nativelink-service/src/operations_server.rs index ac20fa9e0..fe693d459 100644 --- a/nativelink-service/src/operations_server.rs +++ b/nativelink-service/src/operations_server.rs @@ -26,7 +26,7 @@ use nativelink_proto::google::longrunning::{ ListOperationsResponse, Operation, WaitOperationRequest, }; use nativelink_proto::google::rpc; -use nativelink_util::action_messages::ActionStage; +use nativelink_util::action_messages::{ActionStage, OperationId}; use nativelink_util::operation_state_manager::{ ActionStateResult, ClientStateManager, OperationFilter, }; @@ -101,48 +101,8 @@ impl OpsServer { let mut out = Vec::with_capacity(action_state_results.len()); for action_state_result in action_state_results { - let info = action_state_result.as_action_info().await?; - let state = action_state_result.as_state().await?; - - let name = info.unique_qualifier.digest().to_string(); - let metadata = None; - let (done, result) = match &state.stage { - ActionStage::Completed(action_result) => { - let result = if action_result.exit_code == 0 { - OperationResult::Response(Any::default()) - } else { - OperationResult::Error(rpc::Status { - code: action_result.exit_code, - message: action_result.message.clone(), - details: vec![], - }) - }; - - (true, Some(result)) - } - ActionStage::CompletedFromCache(cached_action_result) => { - let result = if cached_action_result.exit_code == 0 { - OperationResult::Response(Any::default()) - } else { - OperationResult::Error(rpc::Status { - code: cached_action_result.exit_code, - message: String::from_utf8_lossy(&cached_action_result.stderr_raw) - .into_owned(), - details: vec![], - }) - }; - - (true, Some(result)) - } - _ => (false, None), - }; - - out.push(Operation { - name, - metadata, - done, - result, - }); + let operation = translate_action_stage_result(action_state_result).await?; + out.push(operation); } Ok(ListOperationsResponse { @@ -150,6 +110,33 @@ impl OpsServer { next_page_token, }) } + + async fn get_operation_inner( + &self, + client_operation_id: OperationId, + ) -> Result { + for scheduler in self.schedulers.values() { + if let Some(action_state_result) = scheduler + .filter_operations(OperationFilter { + client_operation_id: Some(client_operation_id.clone()), + ..Default::default() + }) + .await? + .next() + .await + { + return translate_action_stage_result(action_state_result).await; + } + } + + Err(Error::new( + Code::NotFound, + format!( + "Couldn't find operation with ID {}", + client_operation_id.into_string() + ), + )) + } } #[tonic::async_trait] @@ -247,7 +234,9 @@ impl Operations for OpsServer { &self, request: Request, ) -> Result, Status> { - Err(Status::unimplemented("todo")) + let GetOperationRequest { name } = request.into_inner(); + let message = self.get_operation_inner(OperationId::String(name)).await?; + Ok(Response::new(message)) } /// Deletes a long-running operation. This method indicates that the client is /// no longer interested in the operation result. It does not cancel the @@ -291,3 +280,49 @@ impl Operations for OpsServer { Err(Status::unimplemented("todo")) } } + +async fn translate_action_stage_result( + action_state_result: Box, +) -> Result { + let info = action_state_result.as_action_info().await?; + let state = action_state_result.as_state().await?; + + let name = info.unique_qualifier.digest().to_string(); + let metadata = None; + let (done, result) = match &state.stage { + ActionStage::Completed(action_result) => { + let result = if action_result.exit_code == 0 { + OperationResult::Response(Any::default()) + } else { + OperationResult::Error(rpc::Status { + code: action_result.exit_code, + message: action_result.message.clone(), + details: vec![], + }) + }; + + (true, Some(result)) + } + ActionStage::CompletedFromCache(cached_action_result) => { + let result = if cached_action_result.exit_code == 0 { + OperationResult::Response(Any::default()) + } else { + OperationResult::Error(rpc::Status { + code: cached_action_result.exit_code, + message: String::from_utf8_lossy(&cached_action_result.stderr_raw).into_owned(), + details: vec![], + }) + }; + + (true, Some(result)) + } + _ => (false, None), + }; + + Ok(Operation { + name, + metadata, + done, + result, + }) +} From 4b117cf6c362101dc5de16b59fdb494cb0173371 Mon Sep 17 00:00:00 2001 From: Cass Fridkin Date: Tue, 24 Sep 2024 23:06:13 -0600 Subject: [PATCH 3/6] implement wait --- nativelink-service/src/operations_server.rs | 94 +++++++++++++++++++-- 1 file changed, 87 insertions(+), 7 deletions(-) diff --git a/nativelink-service/src/operations_server.rs b/nativelink-service/src/operations_server.rs index fe693d459..0e900605a 100644 --- a/nativelink-service/src/operations_server.rs +++ b/nativelink-service/src/operations_server.rs @@ -14,11 +14,13 @@ use std::collections::{HashMap, VecDeque}; use std::num::NonZero; +use std::str::FromStr; use std::sync::Arc; +use std::time::Duration; -use futures::StreamExt; +use futures::{StreamExt, TryStreamExt}; use lru::LruCache; -use nativelink_error::{Code, Error}; +use nativelink_error::{Code, Error, ResultExt}; use nativelink_proto::google::longrunning::operation::Result as OperationResult; use nativelink_proto::google::longrunning::operations_server::{Operations, OperationsServer}; use nativelink_proto::google::longrunning::{ @@ -32,6 +34,7 @@ use nativelink_util::operation_state_manager::{ }; use prost_types::Any; use tokio::sync::Mutex; +use tonic::metadata::MetadataValue; use tonic::{Request, Response, Status}; use uuid::Uuid; @@ -39,6 +42,10 @@ const LIST_OPERATIONS_MAXIMUM_PAGE_SIZE: i32 = 100; const LIST_OPERATIONS_DEFAULT_PAGE_SIZE: usize = 50; const NUM_CACHED_RESPONSES: NonZero = unsafe { NonZero::new_unchecked(1024) }; const NO_MORE_PAGES_TOKEN: &str = "NO_MORE_PAGES"; +const WAIT_OPERATION_DEFAULT_TIMEOUT: prost_types::Duration = prost_types::Duration { + seconds: 20, + nanos: 0, +}; pub struct OpsServer { schedulers: HashMap>, @@ -137,6 +144,46 @@ impl OpsServer { ), )) } + + async fn wait_operation_inner(&self, operation_id: OperationId) -> Result { + let mut action_state_result_maybe = None; + for scheduler in self.schedulers.values() { + if let Some(action_state_result) = scheduler + .filter_operations(OperationFilter { + client_operation_id: Some(operation_id.clone()), + ..Default::default() + }) + .await? + .next() + .await + { + action_state_result_maybe = Some(action_state_result); + break; + } + } + + let Some(mut action_state_result) = action_state_result_maybe else { + return Err(Error::new( + Code::NotFound, + format!( + "Couldn't find operation with ID {}", + operation_id.into_string() + ), + )); + }; + + loop { + let mut state = action_state_result.as_state().await?; + match state.stage { + ActionStage::Completed(_) | ActionStage::CompletedFromCache(_) => { + return translate_action_stage_result(action_state_result).await + } + _ => { + state = action_state_result.changed().await?; + } + } + } + } } #[tonic::async_trait] @@ -238,15 +285,16 @@ impl Operations for OpsServer { let message = self.get_operation_inner(OperationId::String(name)).await?; Ok(Response::new(message)) } + /// Deletes a long-running operation. This method indicates that the client is /// no longer interested in the operation result. It does not cancel the /// operation. If the server doesn't support this method, it returns /// `google.rpc.Code.UNIMPLEMENTED`. async fn delete_operation( &self, - request: Request, + _: Request, ) -> Result, Status> { - Err(Status::unimplemented("unimplemented")) + Err(Status::unimplemented("UNIMPLEMENTED")) } /// Starts asynchronous cancellation on a long-running operation. The server /// makes a best effort to cancel the operation, but success is not @@ -260,9 +308,9 @@ impl Operations for OpsServer { /// corresponding to `Code.CANCELLED`. async fn cancel_operation( &self, - request: Request, + _: Request, ) -> Result, Status> { - Err(Status::unimplemented("unimplemented")) + Err(Status::unimplemented("UNIMPLEMENTED")) } /// Waits for the specified long-running operation until it is done or reaches /// at most a specified timeout, returning the latest state. If the operation @@ -277,7 +325,39 @@ impl Operations for OpsServer { &self, request: Request, ) -> Result, Status> { - Err(Status::unimplemented("todo")) + let rpc_timeout: Duration = + if let Some(grpc_timeout_header) = request.metadata().get("grpc-timeout") { + grpc_timeout_header + .to_str() + .map_err(|e| Status::invalid_argument(format!("invalid grpc-timeout: {e}")))? + .parse::() + .map_err(|e| Status::invalid_argument(format!("invalid grpc-timeout: {e}")))? + .try_into() + .map_err(|e| Status::invalid_argument(format!("invalid grpc-timeout: {e}")))? + } else { + WAIT_OPERATION_DEFAULT_TIMEOUT + .try_into() + .expect("a positive timeout to translate") + }; + + let WaitOperationRequest { + name, + timeout: message_timeout_maybe, + } = request.into_inner(); + + let message_timeout: Duration = message_timeout_maybe + .unwrap_or(WAIT_OPERATION_DEFAULT_TIMEOUT) + .try_into() + .map_err(|e| Status::invalid_argument(format!("invalid timeout: {e}")))?; + + let timeout = rpc_timeout.min(message_timeout); + let operation_id = OperationId::String(name); + + let message = tokio::time::timeout(timeout, self.wait_operation_inner(operation_id)) + .await + .map_err(|_| Status::deadline_exceeded("timeout elapsed"))??; + + Ok(Response::new(message)) } } From 8fd9166c2fd6c964fddf5b4f0f8848c332771a45 Mon Sep 17 00:00:00 2001 From: Cass Fridkin Date: Tue, 24 Sep 2024 23:09:14 -0600 Subject: [PATCH 4/6] minor cleanup --- nativelink-service/src/operations_server.rs | 49 ++++----------------- 1 file changed, 8 insertions(+), 41 deletions(-) diff --git a/nativelink-service/src/operations_server.rs b/nativelink-service/src/operations_server.rs index 0e900605a..d382b606c 100644 --- a/nativelink-service/src/operations_server.rs +++ b/nativelink-service/src/operations_server.rs @@ -14,13 +14,12 @@ use std::collections::{HashMap, VecDeque}; use std::num::NonZero; -use std::str::FromStr; use std::sync::Arc; use std::time::Duration; -use futures::{StreamExt, TryStreamExt}; +use futures::StreamExt; use lru::LruCache; -use nativelink_error::{Code, Error, ResultExt}; +use nativelink_error::{Code, Error}; use nativelink_proto::google::longrunning::operation::Result as OperationResult; use nativelink_proto::google::longrunning::operations_server::{Operations, OperationsServer}; use nativelink_proto::google::longrunning::{ @@ -34,7 +33,6 @@ use nativelink_util::operation_state_manager::{ }; use prost_types::Any; use tokio::sync::Mutex; -use tonic::metadata::MetadataValue; use tonic::{Request, Response, Status}; use uuid::Uuid; @@ -74,6 +72,7 @@ impl OpsServer { OperationsServer::new(self) } + /// List operations matching a given filter. async fn list_operations_inner( &self, scheduler: &Arc, @@ -118,6 +117,7 @@ impl OpsServer { }) } + /// Get an operation, if it exists. async fn get_operation_inner( &self, client_operation_id: OperationId, @@ -145,6 +145,7 @@ impl OpsServer { )) } + /// Wait (potentially forever) for an operation to complete. async fn wait_operation_inner(&self, operation_id: OperationId) -> Result { let mut action_state_result_maybe = None; for scheduler in self.schedulers.values() { @@ -172,8 +173,8 @@ impl OpsServer { )); }; + let mut state = action_state_result.as_state().await?; loop { - let mut state = action_state_result.as_state().await?; match state.stage { ActionStage::Completed(_) | ActionStage::CompletedFromCache(_) => { return translate_action_stage_result(action_state_result).await @@ -188,16 +189,6 @@ impl OpsServer { #[tonic::async_trait] impl Operations for OpsServer { - /// Lists operations that match the specified filter in the request. If the - /// server doesn't support this method, it returns `UNIMPLEMENTED`. - /// - /// NOTE: the `name` binding allows API services to override the binding - /// to use different resource name schemes, such as `users/*/operations`. To - /// override the binding, API services can add a binding such as - /// `"/v1/{name=users/*}/operations"` to their service configuration. - /// For backwards compatibility, the default name includes the operations - /// collection id, however overriding users must ensure the name binding - /// is the parent resource, without the operations collection id. async fn list_operations( &self, request: Request, @@ -274,9 +265,6 @@ impl Operations for OpsServer { Ok(Response::new(message)) } - /// Gets the latest state of a long-running operation. Clients can use this - /// method to poll the operation result at intervals as recommended by the API - /// service. async fn get_operation( &self, request: Request, @@ -286,41 +274,20 @@ impl Operations for OpsServer { Ok(Response::new(message)) } - /// Deletes a long-running operation. This method indicates that the client is - /// no longer interested in the operation result. It does not cancel the - /// operation. If the server doesn't support this method, it returns - /// `google.rpc.Code.UNIMPLEMENTED`. async fn delete_operation( &self, _: Request, ) -> Result, Status> { Err(Status::unimplemented("UNIMPLEMENTED")) } - /// Starts asynchronous cancellation on a long-running operation. The server - /// makes a best effort to cancel the operation, but success is not - /// guaranteed. If the server doesn't support this method, it returns - /// `google.rpc.Code.UNIMPLEMENTED`. Clients can use - /// [Operations.GetOperation][google.longrunning.Operations.GetOperation] or - /// other methods to check whether the cancellation succeeded or whether the - /// operation completed despite cancellation. On successful cancellation, - /// the operation is not deleted; instead, it becomes an operation with - /// an [Operation.error][google.longrunning.Operation.error] value with a [google.rpc.Status.code][google.rpc.Status.code] of 1, - /// corresponding to `Code.CANCELLED`. + async fn cancel_operation( &self, _: Request, ) -> Result, Status> { Err(Status::unimplemented("UNIMPLEMENTED")) } - /// Waits for the specified long-running operation until it is done or reaches - /// at most a specified timeout, returning the latest state. If the operation - /// is already done, the latest state is immediately returned. If the timeout - /// specified is greater than the default HTTP/RPC timeout, the HTTP/RPC - /// timeout is used. If the server does not support this method, it returns - /// `google.rpc.Code.UNIMPLEMENTED`. - /// Note that this method is on a best-effort basis. It may return the latest - /// state before the specified timeout (including immediately), meaning even an - /// immediate response is no guarantee that the operation is done. + async fn wait_operation( &self, request: Request, From 098f21f970920535ba815f4b0812e5a9d5c4837c Mon Sep 17 00:00:00 2001 From: Cass Fridkin Date: Tue, 24 Sep 2024 23:53:53 -0600 Subject: [PATCH 5/6] Make `list` check all schedulers --- nativelink-service/src/operations_server.rs | 49 ++++++++------------- 1 file changed, 19 insertions(+), 30 deletions(-) diff --git a/nativelink-service/src/operations_server.rs b/nativelink-service/src/operations_server.rs index d382b606c..502416b43 100644 --- a/nativelink-service/src/operations_server.rs +++ b/nativelink-service/src/operations_server.rs @@ -17,7 +17,7 @@ use std::num::NonZero; use std::sync::Arc; use std::time::Duration; -use futures::StreamExt; +use futures::{stream, StreamExt, TryStreamExt}; use lru::LruCache; use nativelink_error::{Code, Error}; use nativelink_proto::google::longrunning::operation::Result as OperationResult; @@ -75,21 +75,32 @@ impl OpsServer { /// List operations matching a given filter. async fn list_operations_inner( &self, - scheduler: &Arc, page_size: usize, page_uuid: Option, filter: OperationFilter, ) -> Result { - let mut cache = self.cache.lock().await; let mut action_state_results = if let Some(uuid) = page_uuid { - cache.pop(&uuid).ok_or_else(|| { + self.cache.lock().await.pop(&uuid).ok_or_else(|| { Error::new( Code::NotFound, format!("Couldn't find page with token {}", uuid), ) })? } else { - scheduler.filter_operations(filter).await?.collect().await + let schedulers = self.schedulers.values().cloned(); + stream::iter(schedulers) + .then(|scheduler| { + let filter = filter.clone(); + async move { + let operations = scheduler.filter_operations(filter).await?; + Ok(operations.collect::>().await) + } + }) + .try_fold(VecDeque::new(), |mut queue, mut operations| async move { + queue.extend(operations.drain(..)); + Ok::<_, Error>(queue) + }) + .await? }; let rest = action_state_results.split_off(page_size.min(action_state_results.len())); @@ -97,14 +108,12 @@ impl OpsServer { let next_page_uuid = Uuid::new_v4(); let token = next_page_uuid.to_string(); - cache.push(next_page_uuid, rest); + self.cache.lock().await.push(next_page_uuid, rest); token } else { NO_MORE_PAGES_TOKEN.to_string() }; - drop(cache); - let mut out = Vec::with_capacity(action_state_results.len()); for action_state_result in action_state_results { let operation = translate_action_stage_result(action_state_result).await?; @@ -194,10 +203,10 @@ impl Operations for OpsServer { request: Request, ) -> Result, Status> { let ListOperationsRequest { - name, filter: filter_string, page_size, page_token, + .. } = request.into_inner(); let normalized_page_size = if page_size < 0 || page_size > LIST_OPERATIONS_MAXIMUM_PAGE_SIZE @@ -214,26 +223,6 @@ impl Operations for OpsServer { .expect("a positive number between 0-100 to fit in u32") }; - let Some(scheduler) = self - .schedulers - .iter() - .find_map(|(scheduler_name, scheduler)| { - let n = scheduler_name.len(); - if name.starts_with(scheduler_name.as_str()) - && name.as_bytes().get(n).is_some_and(|b| *b == b'/') - { - Some(scheduler) - } else { - None - } - }) - else { - return Err(Status::not_found(format!( - "couldn't find a scheduler named {}", - &name - ))); - }; - let filter = if filter_string.is_empty() { OperationFilter::default() } else { @@ -259,7 +248,7 @@ impl Operations for OpsServer { }; let message = self - .list_operations_inner(scheduler, normalized_page_size, page_uuid, filter) + .list_operations_inner(normalized_page_size, page_uuid, filter) .await?; Ok(Response::new(message)) From cfd749af6b215669ed121be53397a08f5aa08926 Mon Sep 17 00:00:00 2001 From: Cass Fridkin Date: Tue, 24 Sep 2024 23:58:11 -0600 Subject: [PATCH 6/6] Make bazel lints happy --- nativelink-service/src/operations_server.rs | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/nativelink-service/src/operations_server.rs b/nativelink-service/src/operations_server.rs index 502416b43..81e8b9d1f 100644 --- a/nativelink-service/src/operations_server.rs +++ b/nativelink-service/src/operations_server.rs @@ -83,7 +83,7 @@ impl OpsServer { self.cache.lock().await.pop(&uuid).ok_or_else(|| { Error::new( Code::NotFound, - format!("Couldn't find page with token {}", uuid), + format!("Couldn't find page with token {uuid}"), ) })? } else { @@ -209,11 +209,10 @@ impl Operations for OpsServer { .. } = request.into_inner(); - let normalized_page_size = if page_size < 0 || page_size > LIST_OPERATIONS_MAXIMUM_PAGE_SIZE + let normalized_page_size = if !(0..=LIST_OPERATIONS_MAXIMUM_PAGE_SIZE).contains(&page_size) { return Err(Status::out_of_range(format!( - "page size {} out of range 0..=100", - page_size + "page size {page_size} out of range 0..=100", ))); } else if page_size == 0 { LIST_OPERATIONS_DEFAULT_PAGE_SIZE