diff --git a/Cargo.lock b/Cargo.lock index a376815b2..74582e0b3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1824,7 +1824,6 @@ dependencies = [ "bytes", "futures", "hyper", - "log", "maplit", "nativelink-config", "nativelink-error", @@ -1869,7 +1868,6 @@ dependencies = [ "http-body 1.0.0", "hyper", "hyper-rustls", - "log", "lz4_flex", "memory-stats", "nativelink-config", @@ -1903,7 +1901,6 @@ dependencies = [ "bytes", "futures", "hex", - "log", "lru", "mock_instant", "nativelink-config", diff --git a/nativelink-scheduler/src/cache_lookup_scheduler.rs b/nativelink-scheduler/src/cache_lookup_scheduler.rs index c9e0dd154..fc243616a 100644 --- a/nativelink-scheduler/src/cache_lookup_scheduler.rs +++ b/nativelink-scheduler/src/cache_lookup_scheduler.rs @@ -35,7 +35,7 @@ use tokio::select; use tokio::sync::watch; use tokio_stream::wrappers::WatchStream; use tonic::Request; -use tracing::warn; +use tracing::{event, Level}; use crate::action_scheduler::ActionScheduler; use crate::platform_property_manager::PlatformPropertyManager; @@ -180,7 +180,11 @@ impl ActionScheduler for CacheLookupScheduler { return; } Err(err) => { - warn!("Error while calling `has` on `ac_store` in `CacheLookupScheduler`'s `add_action` function: {}", err); + event!( + Level::WARN, + ?err, + "Error while calling `has` on `ac_store` in `CacheLookupScheduler`'s `add_action` function" + ); } _ => {} } diff --git a/nativelink-scheduler/src/grpc_scheduler.rs b/nativelink-scheduler/src/grpc_scheduler.rs index 4373b8d9f..8f012dba3 100644 --- a/nativelink-scheduler/src/grpc_scheduler.rs +++ b/nativelink-scheduler/src/grpc_scheduler.rs @@ -40,7 +40,7 @@ use tokio::select; use tokio::sync::watch; use tokio::time::sleep; use tonic::{Request, Streaming}; -use tracing::{error, info, warn}; +use tracing::{error_span, event, Instrument, Level}; use crate::action_scheduler::ActionScheduler; use crate::platform_property_manager::PlatformPropertyManager; @@ -123,7 +123,10 @@ impl GrpcScheduler { loop { select!( _ = tx.closed() => { - info!("Client disconnected in GrpcScheduler"); + event!( + Level::INFO, + "Client disconnected in GrpcScheduler" + ); return; } response = result_stream.message() => { @@ -135,16 +138,27 @@ impl GrpcScheduler { match response.try_into() { Ok(response) => { if let Err(err) = tx.send(Arc::new(response)) { - info!("Client disconnected in GrpcScheduler: {}", err); + event!( + Level::INFO, + ?err, + "Client error in GrpcScheduler" + ); return; } } - Err(err) => error!("Error converting response to ActionState in GrpcScheduler: {}", err), + Err(err) => { + event!( + Level::ERROR, + ?err, + "Error converting response to ActionState in GrpcScheduler" + ); + }, } } ) } - }); + } + .instrument(error_span!("stream_state"))); return Ok(rx); } Err(make_err!( @@ -264,9 +278,10 @@ impl ActionScheduler for GrpcScheduler { match result_stream { Ok(result_stream) => Some(result_stream), Err(err) => { - warn!( - "Error response looking up action with upstream scheduler: {}", - err + event!( + Level::WARN, + ?err, + "Error looking up action with upstream scheduler" ); None } diff --git a/nativelink-scheduler/src/simple_scheduler.rs b/nativelink-scheduler/src/simple_scheduler.rs index 9e22397be..1c08ec041 100644 --- a/nativelink-scheduler/src/simple_scheduler.rs +++ b/nativelink-scheduler/src/simple_scheduler.rs @@ -38,7 +38,7 @@ use parking_lot::{Mutex, MutexGuard}; use tokio::sync::{watch, Notify}; use tokio::task::JoinHandle; use tokio::time::Duration; -use tracing::{error, warn}; +use tracing::{event, Level}; use 
crate::action_scheduler::ActionScheduler; use crate::platform_property_manager::PlatformPropertyManager; @@ -122,10 +122,12 @@ impl Workers { let res = worker .send_initial_connection_result() .err_tip(|| "Failed to send initial connection result to worker"); - if let Err(e) = &res { - error!( - "Worker connection appears to have been closed while adding to pool : {:?}", - e + if let Err(err) = &res { + event!( + Level::ERROR, + ?worker_id, + ?err, + "Worker connection appears to have been closed while adding to pool" ); } res @@ -379,15 +381,22 @@ impl SimpleSchedulerImpl { // Don't remove this task, instead we keep them around for a bit just in case // the client disconnected and will reconnect and ask for same job to be executed // again. - warn!( - "Action {} has no more listeners during evict_worker()", - action_info.digest().hash_str() + event!( + Level::WARN, + ?action_info, + ?worker_id, + "Action has no more listeners during evict_worker()" ); } } None => { self.metrics.retry_action_but_action_missing.inc(); - error!("Worker stated it was running an action, but it was not in the active_actions : Worker: {:?}, ActionInfo: {:?}", worker_id, action_info); + event!( + Level::ERROR, + ?action_info, + ?worker_id, + "Worker stated it was running an action, but it was not in the active_actions" + ); } } } @@ -435,9 +444,10 @@ impl SimpleSchedulerImpl { self.queued_actions.keys().rev().cloned().collect(); for action_info in action_infos { let Some(awaited_action) = self.queued_actions.get(action_info.as_ref()) else { - error!( - "queued_actions out of sync with itself for action {}", - action_info.digest().hash_str() + event!( + Level::ERROR, + ?action_info, + "queued_actions out of sync with itself" ); continue; }; @@ -454,13 +464,20 @@ impl SimpleSchedulerImpl { worker.notify_update(WorkerUpdate::RunAction(action_info.clone())); if notify_worker_result.is_err() { // Remove worker, as it is no longer receiving messages and let it try to find another worker. - let err = make_err!( - Code::Internal, - "Worker command failed, removing worker {}", - worker_id + event!( + Level::WARN, + ?worker_id, + ?action_info, + "Worker command failed, removing worker", + ); + self.immediate_evict_worker( + &worker_id, + make_err!( + Code::Internal, + "Worker command failed, removing worker {}", + worker_id + ), ); - warn!("{:?}", err); - self.immediate_evict_worker(&worker_id, err); return; } @@ -482,9 +499,11 @@ impl SimpleSchedulerImpl { // Don't remove this task, instead we keep them around for a bit just in case // the client disconnected and will reconnect and ask for same job to be executed // again. - warn!( - "Action {} has no more listeners", - awaited_action.action_info.digest().hash_str() + event!( + Level::WARN, + ?action_info, + ?worker_id, + "Action has no more listeners during do_try_match()" ); } awaited_action.attempts += 1; @@ -505,7 +524,12 @@ impl SimpleSchedulerImpl { self.metrics .update_action_with_internal_error_no_action .inc(); - error!("Could not find action info in active actions : {action_info_hash_key:?}"); + event!( + Level::ERROR, + ?action_info_hash_key, + ?worker_id, + "Could not find action info in active actions" + ); return; }; @@ -518,13 +542,24 @@ impl SimpleSchedulerImpl { running_action.attempts -= 1; } let Some(running_action_worker_id) = running_action.worker_id else { - return error!( - "Got a result from a worker that should not be running the action, Removing worker. 
Expected action to be unassigned got worker {worker_id}" + event!( + Level::ERROR, + ?action_info_hash_key, + ?worker_id, + "Got a result from a worker that should not be running the action, Removing worker. Expected action to be unassigned got worker", ); + return; }; if running_action_worker_id == *worker_id { // Don't set the error on an action that's running somewhere else. - warn!("Internal error for worker {}: {}", worker_id, err); + event!( + Level::WARN, + ?action_info_hash_key, + ?worker_id, + ?running_action_worker_id, + ?err, + "Internal worker error", + ); running_action.last_error = Some(err.clone()); } else { self.metrics @@ -561,11 +596,17 @@ impl SimpleSchedulerImpl { ) -> Result<(), Error> { if !action_stage.has_action_result() { self.metrics.update_action_missing_action_result.inc(); + event!( + Level::ERROR, + ?action_info_hash_key, + ?worker_id, + ?action_stage, + "Worker sent error while updating action. Removing worker" + ); let err = make_err!( Code::Internal, "Worker '{worker_id}' set the action_stage of running action {action_info_hash_key:?} to {action_stage:?}. Removing worker.", ); - error!("{:?}", err); self.immediate_evict_worker(worker_id, err.clone()); return Err(err); } @@ -590,7 +631,14 @@ impl SimpleSchedulerImpl { "Got a result from a worker that should not be running the action, Removing worker. Expected action to be unassigned got worker {worker_id}", ), }; - error!("{:?}", err); + event!( + Level::ERROR, + ?action_info, + ?worker_id, + ?running_action.worker_id, + ?err, + "Got a result from a worker that should not be running the action, Removing worker" + ); // First put it back in our active_actions or we will drop the task. self.active_actions.insert(action_info, running_action); self.immediate_evict_worker(worker_id, err.clone()); @@ -606,9 +654,11 @@ impl SimpleSchedulerImpl { if !running_action.current_state.stage.is_finished() { if send_result.is_err() { self.metrics.update_action_no_more_listeners.inc(); - warn!( - "Action {} has no more listeners during update_action()", - action_info.digest().hash_str() + event!( + Level::WARN, + ?action_info, + ?worker_id, + "Action has no more listeners during update_action()" ); } // If the operation is not finished it means the worker is still working on it, so put it @@ -903,12 +953,18 @@ impl WorkerScheduler for SimpleScheduler { }) .collect(); for worker_id in &worker_ids_to_remove { - let err = make_err!( - Code::Internal, - "Worker {worker_id} timed out, removing from pool" + event!( + Level::WARN, + ?worker_id, + "Worker timed out, removing from pool" + ); + inner.immediate_evict_worker( + worker_id, + make_err!( + Code::Internal, + "Worker {worker_id} timed out, removing from pool" + ), ); - warn!("{:?}", err); - inner.immediate_evict_worker(worker_id, err); } Ok(()) diff --git a/nativelink-service/BUILD.bazel b/nativelink-service/BUILD.bazel index 7dcdc3ab9..32a838097 100644 --- a/nativelink-service/BUILD.bazel +++ b/nativelink-service/BUILD.bazel @@ -29,7 +29,6 @@ rust_library( "@crates//:bytes", "@crates//:futures", "@crates//:hyper", - "@crates//:log", "@crates//:parking_lot", "@crates//:prost", "@crates//:rand", diff --git a/nativelink-service/Cargo.toml b/nativelink-service/Cargo.toml index 8a851d31e..99c439d11 100644 --- a/nativelink-service/Cargo.toml +++ b/nativelink-service/Cargo.toml @@ -15,7 +15,6 @@ bytes = "1.6.0" futures = "0.3.30" hyper = { version = "0.14.28" } serde_json5 = "0.1.0" -log = "0.4.21" parking_lot = "0.12.1" prost = "0.12.4" rand = "0.8.5" diff --git 
a/nativelink-service/src/ac_server.rs b/nativelink-service/src/ac_server.rs index ea09ba498..24a4bfc07 100644 --- a/nativelink-service/src/ac_server.rs +++ b/nativelink-service/src/ac_server.rs @@ -13,9 +13,9 @@ // limitations under the License. use std::collections::HashMap; +use std::fmt::Debug; use std::pin::Pin; use std::sync::Arc; -use std::time::Instant; use bytes::BytesMut; use nativelink_config::cas_server::{AcStoreConfig, InstanceName}; @@ -33,7 +33,7 @@ use nativelink_util::common::DigestInfo; use nativelink_util::store_trait::Store; use prost::Message; use tonic::{Request, Response, Status}; -use tracing::{error, info}; +use tracing::{event, instrument, Level}; #[derive(Clone)] pub struct AcStoreInfo { @@ -45,6 +45,12 @@ pub struct AcServer { stores: HashMap, } +impl Debug for AcServer { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("AcServer").finish() + } +} + impl AcServer { pub fn new( config: &HashMap, @@ -157,60 +163,38 @@ impl AcServer { #[tonic::async_trait] impl ActionCache for AcServer { + #[allow(clippy::blocks_in_conditions)] + #[instrument( + ret(level = Level::INFO), + level = Level::ERROR, + skip_all, + fields(request = ?grpc_request.get_ref()) + )] async fn get_action_result( &self, grpc_request: Request, ) -> Result, Status> { - let now = Instant::now(); - info!( - "\x1b[0;31mget_action_result Req\x1b[0m: {:?}", - grpc_request.get_ref() - ); - let hash = grpc_request - .get_ref() - .action_digest - .as_ref() - .map(|v| v.hash.to_string()); let resp = self.inner_get_action_result(grpc_request).await; - let d = now.elapsed().as_secs_f32(); if resp.is_err() && resp.as_ref().err().unwrap().code != Code::NotFound { - error!( - "\x1b[0;31mget_action_result Resp\x1b[0m: {} {:?} {:?}", - d, hash, resp - ); - } else { - info!( - "\x1b[0;31mget_action_result Resp\x1b[0m: {} {:?} {:?}", - d, hash, resp - ); + event!(Level::ERROR, return = ?resp); } return resp.map_err(|e| e.into()); } + #[allow(clippy::blocks_in_conditions)] + #[instrument( + err, + ret(level = Level::INFO), + level = Level::ERROR, + skip_all, + fields(request = ?grpc_request.get_ref()) + )] async fn update_action_result( &self, grpc_request: Request, ) -> Result, Status> { - let now = Instant::now(); - info!( - "\x1b[0;31mupdate_action_result Req\x1b[0m: {:?}", - grpc_request.get_ref() - ); - let resp = self.inner_update_action_result(grpc_request).await; - let d = now.elapsed().as_secs_f32(); - if resp.is_err() { - log::error!( - "\x1b[0;31mupdate_action_result Resp\x1b[0m: {} {:?}", - d, - resp - ); - } else { - log::info!( - "\x1b[0;31mupdate_action_result Resp\x1b[0m: {} {:?}", - d, - resp - ); - } - return resp.map_err(|e| e.into()); + self.inner_update_action_result(grpc_request) + .await + .map_err(|e| e.into()) } } diff --git a/nativelink-service/src/bytestream_server.rs b/nativelink-service/src/bytestream_server.rs index d84617511..b94f080cb 100644 --- a/nativelink-service/src/bytestream_server.rs +++ b/nativelink-service/src/bytestream_server.rs @@ -18,7 +18,7 @@ use std::fmt::{Debug, Formatter}; use std::pin::Pin; use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::Arc; -use std::time::{Duration, Instant}; +use std::time::Duration; use futures::future::{pending, BoxFuture}; use futures::stream::unfold; @@ -45,7 +45,7 @@ use parking_lot::Mutex; use tokio::task::AbortHandle; use tokio::time::sleep; use tonic::{Request, Response, Status, Streaming}; -use tracing::{enabled, error, info, Level}; +use tracing::{enabled, error_span, event, instrument, 
Instrument, Level}; /// If this value changes update the documentation in the config definition. const DEFAULT_PERSIST_STREAM_ON_DISCONNECT_TIMEOUT: Duration = Duration::from_secs(60); @@ -100,9 +100,10 @@ impl<'a> Drop for ActiveStreamGuard<'a> { let mut active_uploads = self.bytestream_server.active_uploads.lock(); let uuid = stream_state.uuid.clone(); let Some(active_uploads_slot) = active_uploads.get_mut(&uuid) else { - error!( - "Failed to find active upload for UUID: {}. This should never happen.", - uuid + event!( + Level::ERROR, + err = "Failed to find active upload. This should never happen.", + uuid = ?uuid, ); return; }; @@ -113,7 +114,7 @@ impl<'a> Drop for ActiveStreamGuard<'a> { (*sleep_fn)().await; if let Some(active_uploads) = weak_active_uploads.upgrade() { let mut active_uploads = active_uploads.lock(); - info!("Removing idle stream {uuid}"); + event!(Level::INFO, msg = "Removing idle stream", uuid = ?uuid); active_uploads.remove(&uuid); } }) @@ -213,7 +214,7 @@ impl ByteStreamServer { return Err(make_input_err!("Cannot upload same UUID simultaneously")); }; let bytes_received = maybe_idle_stream.0.clone(); - info!("Joining existing stream {}", entry.key()); + event!(Level::INFO, msg = "Joining existing stream", entry = ?entry.key()); return Ok(idle_stream.into_active_stream(bytes_received, self)); } Entry::Vacant(entry) => { @@ -307,7 +308,10 @@ impl ByteStreamServer { }), }); - Ok(Response::new(Box::pin(unfold(state, move |state| async { + let read_stream_span = error_span!("read_stream"); + + Ok(Response::new(Box::pin(unfold(state, move |state| { + async { let mut state = state?; // If None our stream is done. let mut response = ReadResponse::default(); { @@ -327,7 +331,11 @@ impl ByteStreamServer { return Some((Err(err.into()), None)); } response.data = bytes; - info!("\x1b[0;31mBytestream Read Chunk Resp\x1b[0m: {:?}", response); + if enabled!(Level::DEBUG) { + event!(Level::INFO, response = ?response); + } else { + event!(Level::INFO, response.data = format!("", response.data.len())); + } break; } Err(mut e) => { @@ -351,7 +359,7 @@ impl ByteStreamServer { // message as it will be the most relevant. e.messages.truncate(1); } - info!("\x1b[0;31mBytestream Read Chunk Resp\x1b[0m: Error {:?}", e); + event!(Level::ERROR, response = ?e); return Some((Err(e.into()), None)) } } @@ -374,9 +382,19 @@ impl ByteStreamServer { } } Some((Ok(response), Some(state))) + }.instrument(read_stream_span.clone()) })))) } + // We instrument tracing here as well as below because `stream` has a hash on it + // that is extracted from the first stream message. If we only implemented it below + // we would not have the hash available to us. 
+ #[instrument( + ret(level = Level::INFO), + level = Level::ERROR, + skip(self), + fields(stream.first_msg = "") + )] async fn inner_write( &self, mut stream: WriteRequestStreamWrapper, Status>, @@ -566,79 +584,66 @@ impl ByteStreamServer { #[tonic::async_trait] impl ByteStream for ByteStreamServer { type ReadStream = ReadStream; + + #[allow(clippy::blocks_in_conditions)] + #[instrument( + err, + level = Level::ERROR, + skip_all, + fields(request = ?grpc_request.get_ref()) + )] async fn read( &self, grpc_request: Request, ) -> Result, Status> { - info!("\x1b[0;31mRead Req\x1b[0m: {:?}", grpc_request.get_ref()); - let now = Instant::now(); let resp = self .inner_read(grpc_request) .await .err_tip(|| "Failed on read() command") .map_err(|e| e.into()); - let d = now.elapsed().as_secs_f32(); - if let Err(err) = resp.as_ref() { - error!("\x1b[0;31mRead Resp\x1b[0m: {} {:?}", d, err); - } else { - info!("\x1b[0;31mRead Resp\x1b[0m: {}", d); + if resp.is_ok() { + event!(Level::DEBUG, return = "Ok()"); } resp } + #[allow(clippy::blocks_in_conditions)] + #[instrument( + err, + level = Level::ERROR, + skip_all, + fields(request = ?grpc_request.get_ref()) + )] async fn write( &self, grpc_request: Request>, ) -> Result, Status> { - let now = Instant::now(); let stream = WriteRequestStreamWrapper::from(grpc_request.into_inner()) .await .err_tip(|| "Could not unwrap first stream message") .map_err(Into::::into)?; - let hash = if enabled!(Level::DEBUG) { - Some(stream.hash.clone()) - } else { - None - }; - - info!("\x1b[0;31mWrite Req\x1b[0m: {:?}", hash); - let resp = self - .inner_write(stream) + self.inner_write(stream) .await .err_tip(|| "In ByteStreamServer::write()") - .map_err(|e| e.into()); - - let d = now.elapsed().as_secs_f32(); - if let Err(err) = resp.as_ref() { - error!("\x1b[0;31mWrite Resp\x1b[0m: {} {:?} {:?}", d, hash, err); - } else { - info!("\x1b[0;31mWrite Resp\x1b[0m: {} {:?}", d, hash); - } - resp + .map_err(|e| e.into()) } + #[allow(clippy::blocks_in_conditions)] + #[instrument( + err, + ret(level = Level::INFO), + level = Level::ERROR, + skip_all, + fields(request = ?grpc_request.get_ref()) + )] async fn query_write_status( &self, grpc_request: Request, ) -> Result, Status> { - let now = Instant::now(); - let query_request = grpc_request.into_inner(); - - let resp = self - .inner_query_write_status(&query_request) + self.inner_query_write_status(&grpc_request.into_inner()) .await .err_tip(|| "Failed on query_write_status() command") - .map_err(|e| e.into()); - - let d = now.elapsed().as_secs_f32(); - if resp.is_err() { - error!("\x1b[0;31mQuery Req\x1b[0m: {:?}", query_request); - error!("\x1b[0;31mQuery Resp\x1b[0m: {} {:?}", d, resp); - } else { - info!("\x1b[0;31mQuery Req\x1b[0m: {:?}", query_request); - info!("\x1b[0;31mQuery Resp\x1b[0m: {} {:?}", d, resp); - } - resp + .map_err(|e| e.into()) } } diff --git a/nativelink-service/src/capabilities_server.rs b/nativelink-service/src/capabilities_server.rs index f775d29da..78e2fe62b 100644 --- a/nativelink-service/src/capabilities_server.rs +++ b/nativelink-service/src/capabilities_server.rs @@ -31,6 +31,7 @@ use nativelink_proto::build::bazel::semver::SemVer; use nativelink_scheduler::action_scheduler::ActionScheduler; use nativelink_util::digest_hasher::default_digest_hasher_func; use tonic::{Request, Response, Status}; +use tracing::{instrument, Level}; const MAX_BATCH_TOTAL_SIZE: i64 = 64 * 1024; @@ -82,11 +83,19 @@ impl CapabilitiesServer { #[tonic::async_trait] impl Capabilities for CapabilitiesServer { + 
#[allow(clippy::blocks_in_conditions)] + #[instrument( + err, + ret(level = Level::INFO), + level = Level::ERROR, + skip_all, + fields(request = ?grpc_request.get_ref()) + )] async fn get_capabilities( &self, - request: Request, + grpc_request: Request, ) -> Result, Status> { - let instance_name = request.into_inner().instance_name; + let instance_name = grpc_request.into_inner().instance_name; let maybe_supported_node_properties = self .supported_node_properties_for_instance .get(&instance_name); diff --git a/nativelink-service/src/cas_server.rs b/nativelink-service/src/cas_server.rs index f04213517..81ec1ef02 100644 --- a/nativelink-service/src/cas_server.rs +++ b/nativelink-service/src/cas_server.rs @@ -15,7 +15,6 @@ use std::collections::HashMap; use std::pin::Pin; use std::sync::Arc; -use std::time::Instant; use bytes::Bytes; use futures::stream::{FuturesUnordered, Stream}; @@ -36,7 +35,7 @@ use nativelink_store::store_manager::StoreManager; use nativelink_util::common::DigestInfo; use nativelink_util::store_trait::Store; use tonic::{Request, Response, Status}; -use tracing::{error, info}; +use tracing::{event, instrument, Level}; pub struct CasServer { stores: HashMap>, @@ -248,94 +247,80 @@ impl CasServer { #[tonic::async_trait] impl ContentAddressableStorage for CasServer { + type GetTreeStream = GetTreeStream; + + #[allow(clippy::blocks_in_conditions)] + #[instrument( + err, + ret(level = Level::INFO), + level = Level::ERROR, + skip_all, + fields(request = ?grpc_request.get_ref()) + )] async fn find_missing_blobs( &self, grpc_request: Request, ) -> Result, Status> { - info!( - "\x1b[0;31mfind_missing_blobs Req\x1b[0m: {:?}", - grpc_request.get_ref() - ); - let now = Instant::now(); - let resp = self - .inner_find_missing_blobs(grpc_request) + self.inner_find_missing_blobs(grpc_request) .await .err_tip(|| "Failed on find_missing_blobs() command") - .map_err(|e| e.into()); - let d = now.elapsed().as_secs_f32(); - if resp.is_err() { - error!("\x1b[0;31mfind_missing_blobs Resp\x1b[0m: {} {:?}", d, resp); - } else { - info!("\x1b[0;31mfind_missing_blobs Resp\x1b[0m: {} {:?}", d, resp); - } - resp + .map_err(|e| e.into()) } + #[allow(clippy::blocks_in_conditions)] + #[instrument( + err, + ret(level = Level::INFO), + level = Level::ERROR, + skip_all, + fields(request = ?grpc_request.get_ref()) + )] async fn batch_update_blobs( &self, grpc_request: Request, ) -> Result, Status> { - info!( - "\x1b[0;31mbatch_update_blobs Req\x1b[0m: {:?}", - grpc_request.get_ref() - ); - let now = Instant::now(); - let resp = self - .inner_batch_update_blobs(grpc_request) + self.inner_batch_update_blobs(grpc_request) .await .err_tip(|| "Failed on batch_update_blobs() command") - .map_err(|e| e.into()); - let d = now.elapsed().as_secs_f32(); - if resp.is_err() { - error!("\x1b[0;31mbatch_update_blobs Resp\x1b[0m: {} {:?}", d, resp); - } else { - info!("\x1b[0;31mbatch_update_blobs Resp\x1b[0m: {} {:?}", d, resp); - } - resp + .map_err(|e| e.into()) } + #[allow(clippy::blocks_in_conditions)] + #[instrument( + err, + ret(level = Level::INFO), + level = Level::ERROR, + skip_all, + fields(request = ?grpc_request.get_ref()) + )] async fn batch_read_blobs( &self, grpc_request: Request, ) -> Result, Status> { - info!( - "\x1b[0;31mbatch_read_blobs Req\x1b[0m: {:?}", - grpc_request.get_ref() - ); - let now = Instant::now(); - let resp = self - .inner_batch_read_blobs(grpc_request) + self.inner_batch_read_blobs(grpc_request) .await .err_tip(|| "Failed on batch_read_blobs() command") - .map_err(|e| e.into()); - let d 
= now.elapsed().as_secs_f32(); - if resp.is_err() { - error!("\x1b[0;31mbatch_read_blobs Resp\x1b[0m: {} {:?}", d, resp); - } else { - info!("\x1b[0;31mbatch_read_blobs Resp\x1b[0m: {} {:?}", d, resp); - } - resp + .map_err(|e| e.into()) } - type GetTreeStream = GetTreeStream; + #[allow(clippy::blocks_in_conditions)] + #[instrument( + err, + level = Level::ERROR, + skip_all, + fields(request = ?grpc_request.get_ref()) + )] async fn get_tree( &self, grpc_request: Request, ) -> Result, Status> { - info!( - "\x1b[0;31mget_tree Req\x1b[0m: {:?}", - grpc_request.get_ref() - ); - let now = Instant::now(); - let resp: Result, Status> = self + let resp = self .inner_get_tree(grpc_request) .await .err_tip(|| "Failed on get_tree() command") .map_err(|e| e.into()); - let d = now.elapsed().as_secs_f32(); - match &resp { - Err(err) => error!("\x1b[0;31mget_tree Resp\x1b[0m: {} : {:?}", d, err), - Ok(_) => info!("\x1b[0;31mget_tree Resp\x1b[0m: {}", d), + if resp.is_ok() { + event!(Level::DEBUG, return = "Ok()"); } resp } diff --git a/nativelink-service/src/execution_server.rs b/nativelink-service/src/execution_server.rs index cd26d4f93..a403636b4 100644 --- a/nativelink-service/src/execution_server.rs +++ b/nativelink-service/src/execution_server.rs @@ -15,7 +15,7 @@ use std::collections::HashMap; use std::pin::Pin; use std::sync::Arc; -use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH}; +use std::time::{Duration, SystemTime, UNIX_EPOCH}; use futures::{Stream, StreamExt}; use nativelink_config::cas_server::{ExecutionConfig, InstanceName}; @@ -41,7 +41,7 @@ use rand::{thread_rng, Rng}; use tokio::sync::watch; use tokio_stream::wrappers::WatchStream; use tonic::{Request, Response, Status}; -use tracing::{error, info}; +use tracing::{event, instrument, Level}; struct InstanceInfo { scheduler: Arc, @@ -184,7 +184,7 @@ impl ExecutionServer { fn to_execute_stream(receiver: watch::Receiver>) -> Response { let receiver_stream = Box::pin(WatchStream::new(receiver).map(|action_update| { - info!("\x1b[0;31mexecute Resp Stream\x1b[0m: {:?}", action_update); + event!(Level::INFO, ?action_update, "Execute Resp Stream",); Ok(Into::::into(action_update.as_ref().clone())) })); tonic::Response::new(receiver_stream) @@ -263,36 +263,45 @@ impl ExecutionServer { #[tonic::async_trait] impl Execution for ExecutionServer { type ExecuteStream = ExecuteStream; + type WaitExecutionStream = ExecuteStream; + #[allow(clippy::blocks_in_conditions)] + #[instrument( + err, + level = Level::ERROR, + skip_all, + fields(request = ?grpc_request.get_ref()) + )] async fn execute( &self, grpc_request: Request, ) -> Result, Status> { - // TODO(blaise.bruer) This is a work in progress, remote execution likely won't work yet. 
- info!("\x1b[0;31mexecute Req\x1b[0m: {:?}", grpc_request.get_ref()); - let now = Instant::now(); - let resp = self - .inner_execute(grpc_request) + self.inner_execute(grpc_request) .await .err_tip(|| "Failed on execute() command") - .map_err(|e| e.into()); - let d = now.elapsed().as_secs_f32(); - if let Err(err) = &resp { - error!("\x1b[0;31mexecute Resp\x1b[0m: {} {:?}", d, err); - } else { - info!("\x1b[0;31mexecute Resp\x1b[0m: {}", d); - } - resp + .map_err(|e| e.into()) } - type WaitExecutionStream = ExecuteStream; + #[allow(clippy::blocks_in_conditions)] + #[instrument( + err, + level = Level::ERROR, + skip_all, + fields(request = ?grpc_request.get_ref()) + )] async fn wait_execution( &self, - request: Request, + grpc_request: Request, ) -> Result, Status> { - self.inner_wait_execution(request) + let resp = self + .inner_wait_execution(grpc_request) .await .err_tip(|| "Failed on wait_execution() command") - .map_err(|e| e.into()) + .map_err(|e| e.into()); + + if resp.is_ok() { + event!(Level::DEBUG, return = "Ok()"); + } + resp } } diff --git a/nativelink-service/src/worker_api_server.rs b/nativelink-service/src/worker_api_server.rs index f2ee583a7..47bcca454 100644 --- a/nativelink-service/src/worker_api_server.rs +++ b/nativelink-service/src/worker_api_server.rs @@ -15,7 +15,7 @@ use std::collections::HashMap; use std::pin::Pin; use std::sync::Arc; -use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH}; +use std::time::{Duration, SystemTime, UNIX_EPOCH}; use futures::stream::unfold; use futures::Stream; @@ -35,7 +35,7 @@ use nativelink_util::platform_properties::PlatformProperties; use tokio::sync::mpsc; use tokio::time::interval; use tonic::{Request, Response, Status}; -use tracing::{error, info, warn}; +use tracing::{error_span, event, instrument, Instrument, Level}; use uuid::Uuid; pub type ConnectWorkerStream = @@ -67,17 +67,21 @@ impl WorkerApiServer { .expect("Error: system time is now behind unix epoch"); match weak_scheduler.upgrade() { Some(scheduler) => { - if let Err(e) = + if let Err(err) = scheduler.remove_timedout_workers(timestamp.as_secs()).await { - error!("Error while running remove_timedout_workers : {:?}", e); + event!( + Level::ERROR, + ?err, + "Failed to remove_timedout_workers", + ); } } // If we fail to upgrade, our service is probably destroyed, so return. 
None => return, } } - }); + }.instrument(error_span!("worker_api_server"))); } Self::new_with_now_fn( @@ -159,9 +163,10 @@ impl WorkerApiServer { if let Some(update_for_worker) = rx.recv().await { return Some((Ok(update_for_worker), (rx, worker_id))); } - warn!( - "UpdateForWorker channel was closed, thus closing connection to worker node : {}", - worker_id + event!( + Level::WARN, + ?worker_id, + "UpdateForWorker channel was closed, thus closing connection to worker node", ); None @@ -231,83 +236,76 @@ impl WorkerApiServer { #[tonic::async_trait] impl WorkerApi for WorkerApiServer { type ConnectWorkerStream = ConnectWorkerStream; + + #[allow(clippy::blocks_in_conditions)] + #[instrument( + err, + level = Level::ERROR, + skip_all, + fields(request = ?grpc_request.get_ref()) + )] async fn connect_worker( &self, grpc_request: Request, ) -> Result, Status> { - let now = Instant::now(); - info!( - "\x1b[0;31mconnect_worker Req\x1b[0m: {:?}", - grpc_request.get_ref() - ); - let supported_properties = grpc_request.into_inner(); - let resp = self.inner_connect_worker(supported_properties).await; - let d = now.elapsed().as_secs_f32(); - if let Err(err) = resp.as_ref() { - error!("\x1b[0;31mconnect_worker Resp\x1b[0m: {} {:?}", d, err); - } else { - info!("\x1b[0;31mconnect_worker Resp\x1b[0m: {}", d); + let resp = self + .inner_connect_worker(grpc_request.into_inner()) + .await + .map_err(|e| e.into()); + if resp.is_ok() { + event!(Level::DEBUG, return = "Ok()"); } - return resp.map_err(|e| e.into()); + resp } + #[allow(clippy::blocks_in_conditions)] + #[instrument( + err, + ret(level = Level::INFO), + level = Level::ERROR, + skip_all, + fields(request = ?grpc_request.get_ref()) + )] async fn keep_alive( &self, grpc_request: Request, ) -> Result, Status> { - let now = Instant::now(); - info!( - "\x1b[0;31mkeep_alive Req\x1b[0m: {:?}", - grpc_request.get_ref() - ); - let keep_alive_request = grpc_request.into_inner(); - let resp = self.inner_keep_alive(keep_alive_request).await; - let d = now.elapsed().as_secs_f32(); - if let Err(err) = resp.as_ref() { - error!("\x1b[0;31mkeep_alive Resp\x1b[0m: {} {:?}", d, err); - } else { - info!("\x1b[0;31mkeep_alive Resp\x1b[0m: {}", d); - } - return resp.map_err(|e| e.into()); + self.inner_keep_alive(grpc_request.into_inner()) + .await + .map_err(|e| e.into()) } + #[allow(clippy::blocks_in_conditions)] + #[instrument( + err, + ret(level = Level::INFO), + level = Level::ERROR, + skip_all, + fields(request = ?grpc_request.get_ref()) + )] async fn going_away( &self, grpc_request: Request, ) -> Result, Status> { - let now = Instant::now(); - info!( - "\x1b[0;31mgoing_away Req\x1b[0m: {:?}", - grpc_request.get_ref() - ); - let going_away_request = grpc_request.into_inner(); - let resp = self.inner_going_away(going_away_request).await; - let d = now.elapsed().as_secs_f32(); - if let Err(err) = resp.as_ref() { - error!("\x1b[0;31mgoing_away Resp\x1b[0m: {} {:?}", d, err); - } else { - info!("\x1b[0;31mgoing_away Resp\x1b[0m: {}", d); - } - return resp.map_err(|e| e.into()); + self.inner_going_away(grpc_request.into_inner()) + .await + .map_err(|e| e.into()) } + #[allow(clippy::blocks_in_conditions)] + #[instrument( + err, + ret(level = Level::INFO), + level = Level::ERROR, + skip_all, + fields(request = ?grpc_request.get_ref()) + )] async fn execution_response( &self, grpc_request: Request, ) -> Result, Status> { - let now = Instant::now(); - info!( - "\x1b[0;31mexecution_response Req\x1b[0m: {:?}", - grpc_request.get_ref() - ); - let execute_result = 
grpc_request.into_inner(); - let resp = self.inner_execution_response(execute_result).await; - let d = now.elapsed().as_secs_f32(); - if let Err(err) = resp.as_ref() { - error!("\x1b[0;31mexecution_response Resp\x1b[0m: {} {:?}", d, err); - } else { - info!("\x1b[0;31mexecution_response Resp\x1b[0m: {}", d); - } - return resp.map_err(|e| e.into()); + self.inner_execution_response(grpc_request.into_inner()) + .await + .map_err(|e| e.into()) } } diff --git a/nativelink-store/BUILD.bazel b/nativelink-store/BUILD.bazel index adf0f67ef..21ac9c159 100644 --- a/nativelink-store/BUILD.bazel +++ b/nativelink-store/BUILD.bazel @@ -52,7 +52,6 @@ rust_library( "@crates//:http-body", "@crates//:hyper", "@crates//:hyper-rustls", - "@crates//:log", "@crates//:lz4_flex", "@crates//:parking_lot", "@crates//:prost", diff --git a/nativelink-store/Cargo.toml b/nativelink-store/Cargo.toml index 1ea647768..bc8da14b2 100644 --- a/nativelink-store/Cargo.toml +++ b/nativelink-store/Cargo.toml @@ -24,7 +24,6 @@ hex = "0.4.3" http-body = "1.0.0" hyper = { version = "0.14.28" } hyper-rustls = { version = "0.24.2", features = ["webpki-tokio"] } -log = "0.4.21" lz4_flex = "0.11.3" parking_lot = "0.12.1" prost = "0.12.4" diff --git a/nativelink-store/src/completeness_checking_store.rs b/nativelink-store/src/completeness_checking_store.rs index 35360319c..ff7ff9f77 100644 --- a/nativelink-store/src/completeness_checking_store.rs +++ b/nativelink-store/src/completeness_checking_store.rs @@ -29,7 +29,7 @@ use nativelink_util::health_utils::{default_health_status_indicator, HealthStatu use nativelink_util::store_trait::{Store, UploadSizeInfo}; use parking_lot::Mutex; use tokio::sync::Notify; -use tracing::warn; +use tracing::{event, Level}; use crate::ac_utils::{get_and_decode_digest, get_size_and_decode_digest}; @@ -288,12 +288,16 @@ async fn inner_has_with_results( maybe_result = futures.next() => { match maybe_result { Some(Ok(())) => {} - Some(Err((e, i))) => { + Some(Err((err, i))) => { state_mux.lock().results[i] = None; // Note: Don't return the errors. We just flag the result as // missing but show a warning if it's not a NotFound. - if e.code != Code::NotFound { - warn!("{e:?}"); + if err.code != Code::NotFound { + event!( + Level::WARN, + ?err, + "Error checking existence of digest" + ); } } None => { diff --git a/nativelink-store/src/dedup_store.rs b/nativelink-store/src/dedup_store.rs index 4a45d9231..799f00285 100644 --- a/nativelink-store/src/dedup_store.rs +++ b/nativelink-store/src/dedup_store.rs @@ -29,7 +29,7 @@ use nativelink_util::store_trait::{Store, UploadSizeInfo}; use serde::{Deserialize, Serialize}; use tokio_util::codec::FramedRead; use tokio_util::io::StreamReader; -use tracing::warn; +use tracing::{event, Level}; // NOTE: If these change update the comments in `stores.rs` to reflect // the new defaults. @@ -110,11 +110,12 @@ impl DedupStore { }; match self.bincode_options.deserialize::(&data) { - Err(e) => { - warn!( - "Failed to deserialize index in dedup store : {} - {:?}", - digest.hash_str(), - e + Err(err) => { + event!( + Level::WARN, + ?digest, + ?err, + "Failed to deserialize index in dedup store", ); // We return the equivalent of NotFound here so the client is happy. 
return Ok(None); diff --git a/nativelink-store/src/filesystem_store.rs b/nativelink-store/src/filesystem_store.rs index 6d3e16f14..dad11e1a3 100644 --- a/nativelink-store/src/filesystem_store.rs +++ b/nativelink-store/src/filesystem_store.rs @@ -39,7 +39,7 @@ use tokio::io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt, SeekFrom}; use tokio::task::spawn_blocking; use tokio::time::{sleep, timeout, Sleep}; use tokio_stream::wrappers::ReadDirStream; -use tracing::{error, info, warn}; +use tracing::{event, trace_span, Instrument, Level}; use crate::cas_utils::is_zero_digest; @@ -112,21 +112,21 @@ impl Drop for EncodedFilePath { shared_context .active_drop_spawns .fetch_add(1, Ordering::Relaxed); - tokio::spawn(async move { - info!( - "\x1b[0;31mFilesystem Store\x1b[0m: Deleting: {:?}", - file_path - ); - let result = fs::remove_file(&file_path) - .await - .err_tip(|| format!("Failed to remove file {file_path:?}")); - if let Err(err) = result { - error!("\x1b[0;31mFilesystem Store\x1b[0m: {:?}", err); + tokio::spawn( + async move { + event!(Level::INFO, ?file_path, "File deleted",); + let result = fs::remove_file(&file_path) + .await + .err_tip(|| format!("Failed to remove file {file_path:?}")); + if let Err(err) = result { + event!(Level::ERROR, ?file_path, ?err, "Failed to delete file",); + } + shared_context + .active_drop_spawns + .fetch_sub(1, Ordering::Relaxed); } - shared_context - .active_drop_spawns - .fetch_sub(1, Ordering::Relaxed); - }); + .instrument(trace_span!("delete_file")), + ); } } @@ -218,7 +218,13 @@ impl FileEntry for FileEntryImpl { if let Err(remove_err) = remove_result { err = err.merge(remove_err); } - warn!("\x1b[0;31mFilesystem Store\x1b[0m: {:?}", err); + event!( + Level::WARN, + ?err, + ?block_size, + ?temp_full_path, + "Failed to create file", + ); Err(err) .err_tip(|| format!("Failed to create {temp_full_path:?} in filesystem store")) }) @@ -335,8 +341,8 @@ impl LenEntry for FileEntryImpl { })? }) .await; - if let Err(e) = result { - error!("{e}"); + if let Err(err) = result { + event!(Level::ERROR, ?err, "Failed to touch file",); return false; } true @@ -362,18 +368,23 @@ impl LenEntry for FileEntryImpl { let to_path = to_full_path_from_digest(&encoded_file_path.shared_context.temp_path, &new_digest); - info!( - "\x1b[0;31mFilesystem Store\x1b[0m: Unref {}, moving file {:?} to {:?}", - encoded_file_path.digest.hash_str(), - from_path, - to_path - ); if let Err(err) = fs::rename(&from_path, &to_path).await { - warn!( - "Failed to rename file from {:?} to {:?} : {:?}", - from_path, to_path, err + event!( + Level::WARN, + digest = ?encoded_file_path.digest, + ?from_path, + ?to_path, + ?err, + "Failed to rename file", ); } else { + event!( + Level::INFO, + digest = ?encoded_file_path.digest, + ?from_path, + ?to_path, + "Renamed file", + ); encoded_file_path.path_type = PathType::Temp; encoded_file_path.digest = new_digest; } @@ -479,9 +490,11 @@ async fn add_files_to_cache( ) .await; if let Err(err) = result { - warn!( - "Could not add file to eviction cache, so deleting: {} - {:?}", - file_name, err + event!( + Level::WARN, + ?file_name, + ?err, + "Failed to add file to eviction cache", ); // Ignore result. 
let _ = @@ -501,10 +514,7 @@ async fn prune_temp_path(temp_path: &str) -> Result<(), Error> { while let Some(dir_entry) = read_dir_stream.next().await { let path = dir_entry?.path(); if let Err(err) = fs::remove_file(&path).await { - warn!( - "Failed to delete file in filesystem store {:?} : {:?}", - &path, err - ); + event!(Level::WARN, ?path, ?err, "Failed to delete file",); } } Ok(()) @@ -658,43 +668,52 @@ impl FilesystemStore { // We need to guarantee that this will get to the end even if the parent future is dropped. // See: https://github.com/TraceMachina/nativelink/issues/495 - tokio::spawn(async move { - let mut encoded_file_path = entry.get_encoded_file_path().write().await; - let final_path = get_file_path_raw( - &PathType::Content, - encoded_file_path.shared_context.as_ref(), - &digest, - ); - - evicting_map.insert(digest, entry.clone()).await; + tokio::spawn( + async move { + let mut encoded_file_path = entry.get_encoded_file_path().write().await; + let final_path = get_file_path_raw( + &PathType::Content, + encoded_file_path.shared_context.as_ref(), + &digest, + ); - let from_path = encoded_file_path.get_file_path(); - // Internally tokio spawns fs commands onto a blocking thread anyways. - // Since we are already on a blocking thread, we just need the `fs` wrapper to manage - // an open-file permit (ensure we don't open too many files at once). - let result = (rename_fn)(&from_path, &final_path) - .err_tip(|| format!("Failed to rename temp file to final path {final_path:?}")); - - // In the event our move from temp file to final file fails we need to ensure we remove - // the entry from our map. - // Remember: At this point it is possible for another thread to have a reference to - // `entry`, so we can't delete the file, only drop() should ever delete files. - if let Err(err) = result { - warn!("Error while renaming file: {err} - {from_path:?} -> {final_path:?}"); - // Warning: To prevent deadlock we need to release our lock or during `remove_if()` - // it will call `unref()`, which triggers a write-lock on `encoded_file_path`. - drop(encoded_file_path); - // It is possible that the item in our map is no longer the item we inserted, - // So, we need to conditionally remove it only if the pointers are the same. - evicting_map - .remove_if(&digest, |map_entry| Arc::::ptr_eq(map_entry, &entry)) - .await; - return Err(err); + evicting_map.insert(digest, entry.clone()).await; + + let from_path = encoded_file_path.get_file_path(); + // Internally tokio spawns fs commands onto a blocking thread anyways. + // Since we are already on a blocking thread, we just need the `fs` wrapper to manage + // an open-file permit (ensure we don't open too many files at once). + let result = (rename_fn)(&from_path, &final_path) + .err_tip(|| format!("Failed to rename temp file to final path {final_path:?}")); + + // In the event our move from temp file to final file fails we need to ensure we remove + // the entry from our map. + // Remember: At this point it is possible for another thread to have a reference to + // `entry`, so we can't delete the file, only drop() should ever delete files. + if let Err(err) = result { + event!( + Level::ERROR, + ?err, + ?from_path, + ?final_path, + "Failed to rename file", + ); + // Warning: To prevent deadlock we need to release our lock or during `remove_if()` + // it will call `unref()`, which triggers a write-lock on `encoded_file_path`. 
+ drop(encoded_file_path); + // It is possible that the item in our map is no longer the item we inserted, + // So, we need to conditionally remove it only if the pointers are the same. + evicting_map + .remove_if(&digest, |map_entry| Arc::::ptr_eq(map_entry, &entry)) + .await; + return Err(err); + } + encoded_file_path.path_type = PathType::Content; + encoded_file_path.digest = digest; + Ok(()) } - encoded_file_path.path_type = PathType::Content; - encoded_file_path.digest = digest; - Ok(()) - }) + .instrument(trace_span!("emplace_file")), + ) .await .err_tip(|| "Failed to create spawn in filesystem store update_file")? } diff --git a/nativelink-store/src/grpc_store.rs b/nativelink-store/src/grpc_store.rs index d33803f40..ed3cd686d 100644 --- a/nativelink-store/src/grpc_store.rs +++ b/nativelink-store/src/grpc_store.rs @@ -51,7 +51,7 @@ use rand::rngs::OsRng; use rand::Rng; use tokio::time::sleep; use tonic::{IntoRequest, Request, Response, Status, Streaming}; -use tracing::error; +use tracing::{event, Level}; use uuid::Uuid; // This store is usually a pass-through store, but can also be used as a CAS store. Using it as an @@ -582,7 +582,10 @@ impl Store for GrpcStore { let stream = Box::pin(unfold(local_state, |mut local_state| async move { if local_state.did_error { - error!("GrpcStore::update() polled stream after error was returned."); + event!( + Level::ERROR, + "GrpcStore::update() polled stream after error was returned" + ); return None; } let data = match local_state diff --git a/nativelink-store/src/ref_store.rs b/nativelink-store/src/ref_store.rs index 314dd7c8e..9206bb331 100644 --- a/nativelink-store/src/ref_store.rs +++ b/nativelink-store/src/ref_store.rs @@ -22,7 +22,7 @@ use nativelink_util::buf_channel::{DropCloserReadHalf, DropCloserWriteHalf}; use nativelink_util::common::DigestInfo; use nativelink_util::health_utils::{default_health_status_indicator, HealthStatusIndicator}; use nativelink_util::store_trait::{Store, UploadSizeInfo}; -use tracing::error; +use tracing::{event, Level}; use crate::store_manager::StoreManager; @@ -139,8 +139,13 @@ impl Store for RefStore { fn inner_store(&self, digest: Option) -> &'_ dyn Store { match self.get_store() { Ok(store) => store.inner_store(digest), - Err(e) => { - error!("Failed to get store for digest: {e:?}"); + Err(err) => { + event!( + Level::ERROR, + ?digest, + ?err, + "Failed to get store for digest", + ); self } } @@ -149,8 +154,13 @@ impl Store for RefStore { fn inner_store_arc(self: Arc, digest: Option) -> Arc { match self.get_store() { Ok(store) => store.clone().inner_store_arc(digest), - Err(e) => { - error!("Failed to get store for digest: {e:?}"); + Err(err) => { + event!( + Level::ERROR, + ?digest, + ?err, + "Failed to get store for digest", + ); self } } diff --git a/nativelink-store/src/s3_store.rs b/nativelink-store/src/s3_store.rs index 6e11409df..cd886ad43 100644 --- a/nativelink-store/src/s3_store.rs +++ b/nativelink-store/src/s3_store.rs @@ -55,7 +55,7 @@ use tokio::io::{AsyncRead, AsyncWrite, ReadBuf}; use tokio::net::TcpStream; use tokio::sync::{mpsc, SemaphorePermit}; use tokio::time::sleep; -use tracing::info; +use tracing::{event, Level}; use crate::cas_utils::is_zero_digest; @@ -434,20 +434,29 @@ impl Store for S3Store { }; // If we failed to upload the file, check to see if we can retry. - let retry_result = result.map_or_else(|mut e| { + let retry_result = result.map_or_else(|mut err| { // Ensure our code is Code::Aborted, so the client can retry if possible. 
- e.code = Code::Aborted; + err.code = Code::Aborted; let bytes_received = reader.get_bytes_received(); if let Err(try_reset_err) = reader.try_reset_stream() { - let e = e + event!( + Level::ERROR, + ?bytes_received, + err = ?try_reset_err, + "Unable to reset stream after failed upload in S3Store::update" + ); + return RetryResult::Err(err .merge(try_reset_err) - .append(format!("Failed to retry upload with {bytes_received} bytes received in S3Store::update")); - log::error!("{e:?}"); - return RetryResult::Err(e); + .append(format!("Failed to retry upload with {bytes_received} bytes received in S3Store::update"))); } - let e = e.append(format!("Retry on upload happened with {bytes_received} bytes received in S3Store::update")); - log::info!("{e:?}"); - RetryResult::Retry(e) + let err = err.append(format!("Retry on upload happened with {bytes_received} bytes received in S3Store::update")); + event!( + Level::INFO, + ?err, + ?bytes_received, + "Retryable S3 error" + ); + RetryResult::Retry(err) }, |()| RetryResult::Ok(())); Some((retry_result, reader)) })) @@ -622,7 +631,7 @@ impl Store for S3Store { Code::Aborted, "Failed to abort multipart upload in S3 store : {e:?}" ); - info!("{err:?}"); + event!(Level::INFO, ?err, "Multipart upload error"); Err(err) }, |_| Ok(()), diff --git a/nativelink-util/BUILD.bazel b/nativelink-util/BUILD.bazel index 3e1c87a64..f72e8d34f 100644 --- a/nativelink-util/BUILD.bazel +++ b/nativelink-util/BUILD.bazel @@ -41,7 +41,6 @@ rust_library( "@crates//:bytes", "@crates//:futures", "@crates//:hex", - "@crates//:log", "@crates//:lru", "@crates//:parking_lot", "@crates//:pin-project-lite", diff --git a/nativelink-util/Cargo.toml b/nativelink-util/Cargo.toml index fce2b7c75..40f7a762f 100644 --- a/nativelink-util/Cargo.toml +++ b/nativelink-util/Cargo.toml @@ -14,7 +14,6 @@ blake3 = { version = "1.5.1", features = ["mmap"] } bytes = "1.6.0" futures = "0.3.30" hex = "0.4.3" -log = "0.4.21" lru = "0.12.3" parking_lot = "0.12.1" pin-project-lite = "0.2.14" diff --git a/nativelink-util/src/connection_manager.rs b/nativelink-util/src/connection_manager.rs index 4a7248df4..7002e83dd 100644 --- a/nativelink-util/src/connection_manager.rs +++ b/nativelink-util/src/connection_manager.rs @@ -24,7 +24,7 @@ use nativelink_config::stores::Retry; use nativelink_error::{make_err, Code, Error}; use tokio::sync::{mpsc, oneshot}; use tonic::transport::{channel, Channel, Endpoint}; -use tracing::{debug, error, info, warn}; +use tracing::{event, Level}; use crate::retry::{self, Retrier, RetryResult}; @@ -244,7 +244,11 @@ impl ConnectionManagerWorker { let Some((current_connection_index, endpoint)) = self.endpoints.get_mut(endpoint_index) else { // Unknown endpoint, this should never happen. 
- error!("Connection to unknown endpoint {endpoint_index} requested."); + event!( + Level::ERROR, + ?endpoint_index, + "Connection to unknown endpoint requested" + ); return; }; let is_backoff = connection_index.is_some(); @@ -253,14 +257,18 @@ impl ConnectionManagerWorker { *current_connection_index }); if is_backoff { - warn!( - "Connection {connection_index} failed to {:?}, reconnecting.", - endpoint.uri() + event!( + Level::WARN, + ?connection_index, + endpoint = ?endpoint.uri(), + "Connection failed, reconnecting" ); } else { - info!( - "Creating new connection {connection_index} to {:?}.", - endpoint.uri() + event!( + Level::INFO, + ?connection_index, + endpoint = ?endpoint.uri(), + "Creating new connection" ); } let identifier = ChannelIdentifier { @@ -439,7 +447,11 @@ impl tonic::codegen::Service } } Err(err) => { - debug!("Error while creating connection on channel: {err:?}"); + event!( + Level::DEBUG, + ?err, + "Error while creating connection on channel" + ); let _ = self.connection_tx.send(ConnectionRequest::Error(( self.channel.identifier, self.pending_channel.take().is_some(), diff --git a/nativelink-util/src/evicting_map.rs b/nativelink-util/src/evicting_map.rs index 46de82729..ee941073d 100644 --- a/nativelink-util/src/evicting_map.rs +++ b/nativelink-util/src/evicting_map.rs @@ -24,7 +24,7 @@ use futures::{future, join, StreamExt}; use lru::LruCache; use nativelink_config::stores::EvictionPolicy; use serde::{Deserialize, Serialize}; -use tracing::info; +use tracing::{event, Level}; use crate::common::DigestInfo; use crate::metrics_utils::{CollectorState, Counter, CounterWithTime, MetricsComponent}; @@ -275,7 +275,7 @@ where .lru .pop_lru() .expect("Tried to peek() then pop() but failed"); - info!("\x1b[0;31mEvicting Map\x1b[0m: Evicting {}", key.hash_str()); + event!(Level::INFO, ?key, "Evicting",); state.remove(&eviction_item, false).await; peek_entry = if let Some((_, entry)) = state.lru.peek_lru() { @@ -300,10 +300,7 @@ where let mut state = self.state.lock().await; let (key, eviction_item) = state.lru.pop_entry(digest)?; - info!( - "\x1b[0;31mEvicting Map\x1b[0m: Touch failed, evicting {}", - key.hash_str() - ); + event!(Level::INFO, ?key, "Touch failed, evicting",); state.remove(&eviction_item, false).await; None } diff --git a/nativelink-util/src/fs.rs b/nativelink-util/src/fs.rs index e9bba313b..ab36bf488 100644 --- a/nativelink-util/src/fs.rs +++ b/nativelink-util/src/fs.rs @@ -33,7 +33,7 @@ use tokio::io::{ }; use tokio::sync::{Semaphore, SemaphorePermit}; use tokio::time::timeout; -use tracing::error; +use tracing::{event, Level}; /// Default read buffer size when reading to/from disk. 
pub const DEFAULT_READ_BUFF_SIZE: usize = 16384; @@ -310,9 +310,11 @@ where pub fn set_open_file_limit(limit: usize) { let current_total = TOTAL_FILE_SEMAPHORES.load(Ordering::Acquire); if limit < current_total { - error!( + event!( + Level::ERROR, "set_open_file_limit({}) must be greater than {}", - limit, current_total + limit, + current_total ); return; } diff --git a/nativelink-util/src/retry.rs b/nativelink-util/src/retry.rs index 42a32c33f..0721eff4b 100644 --- a/nativelink-util/src/retry.rs +++ b/nativelink-util/src/retry.rs @@ -20,7 +20,7 @@ use futures::future::Future; use futures::stream::StreamExt; use nativelink_config::stores::{ErrorCode, Retry}; use nativelink_error::{make_err, Code, Error}; -use tracing::info; +use tracing::{event, Level}; struct ExponentialBackoff { current: Duration, @@ -154,14 +154,14 @@ impl Retrier { Some(RetryResult::Err(e)) => { return Err(e.append(format!("On attempt {attempt}"))); } - Some(RetryResult::Retry(e)) => { - if !self.should_retry(&e.code) { - info!("Not retrying permanent error on attempt {attempt}: {e:?}"); - return Err(e); + Some(RetryResult::Retry(err)) => { + if !self.should_retry(&err.code) { + event!(Level::ERROR, ?attempt, ?err, "Not retrying permanent error"); + return Err(err); } (self.sleep_fn)( iter.next() - .ok_or(e.append(format!("On attempt {attempt}")))?, + .ok_or(err.append(format!("On attempt {attempt}")))?, ) .await } diff --git a/nativelink-worker/src/local_worker.rs b/nativelink-worker/src/local_worker.rs index 2be295cb1..bf54bc40a 100644 --- a/nativelink-worker/src/local_worker.rs +++ b/nativelink-worker/src/local_worker.rs @@ -43,7 +43,7 @@ use tokio::sync::mpsc; use tokio::time::sleep; use tokio_stream::wrappers::UnboundedReceiverStream; use tonic::Streaming; -use tracing::{error, warn}; +use tracing::{error_span, event, instrument, Instrument, Level}; use crate::running_actions_manager::{ ExecutionConfiguration, Metrics as RunningActionManagerMetrics, RunningAction, @@ -218,10 +218,14 @@ impl<'a, T: WorkerApiClientTrait, U: RunningActionsManager> LocalWorkerImpl<'a, e ))?; - self.running_actions_manager - .kill_action(action_id) - .await - .err_tip(|| format!("Failed to send kill request for action {}", hex::encode(action_id)))? + if let Err(err) = self.running_actions_manager.kill_action(action_id).await { + event!( + Level::ERROR, + action_id = hex::encode(action_id), + ?err, + "Failed to send kill request for action" + ); + }; } Update::StartAction(start_execute) => { self.metrics.start_actions_received.inc(); @@ -248,7 +252,12 @@ impl<'a, T: WorkerApiClientTrait, U: RunningActionsManager> LocalWorkerImpl<'a, actions_in_transit.fetch_sub(1, Ordering::Release); r }) - .and_then(|action| + .and_then(|action| { + event!( + Level::INFO, + action_id = hex::encode(action.get_action_id()), + "Received request to run action" + ); action .clone() .prepare_action() @@ -262,7 +271,7 @@ impl<'a, T: WorkerApiClientTrait, U: RunningActionsManager> LocalWorkerImpl<'a, } result }) - ).await + }).await }); let running_actions_manager = self.running_actions_manager.clone(); @@ -274,7 +283,12 @@ impl<'a, T: WorkerApiClientTrait, U: RunningActionsManager> LocalWorkerImpl<'a, // Save in the action cache before notifying the scheduler that we've completed. 
if let Some(digest_info) = action_digest.clone().and_then(|action_digest| action_digest.try_into().ok()) { if let Err(err) = running_actions_manager.cache_action_result(digest_info, &mut action_result, try_hasher?).await { - error!("\x1b[0;31mError saving action in store\x1b[0m: {} - {:?}", err, action_digest); + event!( + Level::ERROR, + ?err, + ?action_digest, + "Error saving action in store", + ); } } let action_stage = ActionStage::Completed(action_result); @@ -305,10 +319,14 @@ impl<'a, T: WorkerApiClientTrait, U: RunningActionsManager> LocalWorkerImpl<'a, self.actions_in_transit.fetch_add(1, Ordering::Release); futures.push( - tokio::spawn(start_action_fut).map(move |res| { + tokio::spawn(start_action_fut.instrument(error_span!("worker_start_action"))).map(move |res| { let res = res.err_tip(|| "Failed to launch spawn")?; if let Err(err) = &res { - error!("\x1b[0;31mError executing action\x1b[0m: {}", err); + event!( + Level::ERROR, + ?err, + "Error executing action", + ); } add_future_channel .send(make_publish_future(res).boxed()) @@ -479,14 +497,15 @@ impl LocalWorker { Ok((worker_id, update_for_worker_stream)) } + #[instrument(skip(self), level = Level::INFO)] pub async fn run(mut self) -> Result<(), Error> { let sleep_fn = self .sleep_fn .take() .err_tip(|| "Could not unwrap sleep_fn in LocalWorker::run")?; let sleep_fn_pin = Pin::new(&sleep_fn); - let error_handler = Box::pin(move |e: Error| async move { - error!("{:?}", e); + let error_handler = Box::pin(move |err| async move { + event!(Level::ERROR, ?err, "Error"); (sleep_fn_pin)(Duration::from_secs_f32(CONNECTION_RETRY_DELAY_S)).await; }); @@ -518,10 +537,14 @@ impl LocalWorker { update_for_worker_stream, ), }; - warn!("Worker {} connected to scheduler", inner.worker_id); + event!( + Level::WARN, + worker_id = %inner.worker_id, + "Worker registered with scheduler" + ); // Now listen for connections and run all other services. - if let Err(e) = inner.run(update_for_worker_stream).await { + if let Err(err) = inner.run(update_for_worker_stream).await { 'no_more_actions: { // Ensure there are no actions in transit before we try to kill // all our actions. @@ -533,18 +556,16 @@ impl LocalWorker { } (sleep_fn_pin)(Duration::from_secs_f32(sleep_duration)).await; } - let e = make_err!( - Code::Internal, - "Actions in transit did not reach zero before we disconnected from the scheduler." - ); - error!("{e:?}"); - return Err(e); + const ERROR_MSG: &str = "Actions in transit did not reach zero before we disconnected from the scheduler"; + event!(Level::ERROR, ERROR_MSG); + return Err(err.append(ERROR_MSG)); } + event!(Level::ERROR, ?err, "Worker disconnected from scheduler"); // Kill off any existing actions because if we re-connect, we'll // get some more and it might resource lock us. self.running_actions_manager.kill_all().await; - (error_handler)(e).await; + (error_handler)(err).await; continue; // Try to connect again. } } diff --git a/nativelink-worker/src/running_actions_manager.rs b/nativelink-worker/src/running_actions_manager.rs index 8c39cd405..324851333 100644 --- a/nativelink-worker/src/running_actions_manager.rs +++ b/nativelink-worker/src/running_actions_manager.rs @@ -13,6 +13,7 @@ // limitations under the License. 
use std::borrow::Cow; +use std::cmp::min; use std::collections::vec_deque::VecDeque; use std::collections::HashMap; use std::ffi::{OsStr, OsString}; @@ -75,7 +76,7 @@ use tokio::sync::{oneshot, watch}; use tokio::task::spawn_blocking; use tokio_stream::wrappers::ReadDirStream; use tonic::Request; -use tracing::{error, info}; +use tracing::{enabled, error_span, event, Instrument, Level}; use uuid::Uuid; pub type ActionId = [u8; 32]; @@ -524,8 +525,42 @@ async fn process_side_channel_file( })) } +async fn do_cleanup( + running_actions_manager: &RunningActionsManagerImpl, + action_id: &ActionId, + action_directory: &str, +) -> Result<(), Error> { + event!(Level::INFO, "Worker cleaning up"); + // Note: We need to be careful to keep trying to cleanup even if one of the steps fails. + let remove_dir_result = fs::remove_dir_all(action_directory) + .await + .err_tip(|| format!("Could not remove working directory {action_directory}")); + if let Err(err) = running_actions_manager.cleanup_action(action_id) { + event!( + Level::ERROR, + action_id = hex::encode(action_id), + ?err, + "Error cleaning up action" + ); + return Result::<(), Error>::Err(err).merge(remove_dir_result); + } + if let Err(err) = remove_dir_result { + event!( + Level::ERROR, + action_id = hex::encode(action_id), + ?err, + "Error removing working directory" + ); + return Err(err); + } + Ok(()) +} + #[async_trait] pub trait RunningAction: Sync + Send + Sized + Unpin + 'static { + /// Returns the action id of the action. + fn get_action_id(&self) -> &ActionId; + /// Anything that needs to execute before the actions is actually executed should happen here. async fn prepare_action(self: Arc<Self>) -> Result<Arc<Self>, Error>; @@ -695,7 +730,7 @@ impl RunningActionImpl { )) .await?; } - info!("\x1b[0;31mWorker Received Command\x1b[0m: {:?}", command); + event!(Level::INFO, ?command, "Worker received command",); { let mut state = self.state.lock(); state.command_proto = Some(command); @@ -737,7 +772,7 @@ impl RunningActionImpl { } else { command_proto.arguments.iter().map(AsRef::as_ref).collect() }; - info!("\x1b[0;31mWorker Executing\x1b[0m: {:?}", &args); + event!(Level::INFO, ?args, "Executing command",); let mut command_builder = process::Command::new(args[0]); command_builder .args(&args[1..]) @@ -831,7 +866,8 @@ impl RunningActionImpl { .err_tip(|| "Expected stderr to exist on command this should never happen")?; let mut child_process_guard = guard(child_process, |mut child_process| { - error!( + event!( + Level::ERROR, "Child process was not cleaned up before dropping the call to execute(), killing in background spawn."
); tokio::spawn(async move { child_process.kill().await }); @@ -872,8 +908,12 @@ impl RunningActionImpl { _ = &mut sleep_fut => { self.running_actions_manager.metrics.task_timeouts.inc(); killed_action = true; - if let Err(e) = child_process_guard.start_kill() { - error!("Could not kill process in RunningActionsManager for timeout : {:?}", e); + if let Err(err) = child_process_guard.start_kill() { + event!( + Level::ERROR, + ?err, + "Could not kill process in RunningActionsManager for action timeout", + ); } { let mut state = self.state.lock(); @@ -939,15 +979,18 @@ impl RunningActionImpl { }, _ = &mut kill_channel_rx => { killed_action = true; - if let Err(e) = child_process_guard.start_kill() { - error!( - "Could not kill process in RunningActionsManager for action {} : {:?}", - hex::encode(self.action_id), - e); + if let Err(err) = child_process_guard.start_kill() { + event!( + Level::ERROR, + action_id = hex::encode(self.action_id), + ?err, + "Could not kill process", + ); } else { - error!( - "Could not get child process id, maybe already dead? for action {}", - hex::encode(self.action_id) + event!( + Level::ERROR, + action_id = hex::encode(self.action_id), + "Could not get child process id, maybe already dead?", ); } { @@ -967,7 +1010,7 @@ impl RunningActionImpl { } async fn inner_upload_results(self: Arc<Self>) -> Result<Arc<Self>, Error> { - info!("\x1b[0;31mWorker Uploading Results\x1b[0m"); + event!(Level::INFO, "Worker uploading results",); let (mut command_proto, execution_result, mut execution_metadata) = { let mut state = self.state.lock(); state.execution_metadata.output_upload_start_timestamp = @@ -1105,12 +1148,18 @@ impl RunningActionImpl { let mut output_file_symlinks = vec![]; if execution_result.exit_code != 0 { - error!( - "Command returned exit code {} : {} {}", - execution_result.exit_code, - std::str::from_utf8(&execution_result.stdout).unwrap_or(""), - std::str::from_utf8(&execution_result.stderr).unwrap_or("") - ); + // Don't convert our stdout/stderr to strings unless we need to. + if enabled!(Level::ERROR) { + let stdout = std::str::from_utf8(&execution_result.stdout).unwrap_or(""); + let stderr = std::str::from_utf8(&execution_result.stderr).unwrap_or(""); + event!( + Level::ERROR, + exit_code = ?execution_result.exit_code, + stdout = ?stdout[..min(stdout.len(), 1000)], + stderr = ?stderr[..min(stderr.len(), 1000)], + "Command returned non-zero exit code", + ); + } } let stdout_digest_fut = self.metrics().upload_stdout.wrap(async { @@ -1181,32 +1230,6 @@ impl RunningActionImpl { Ok(self) } - async fn inner_cleanup(self: Arc<Self>) -> Result<Arc<Self>, Error> { - info!("\x1b[0;31mWorker Cleanup\x1b[0m"); - // Note: We need to be careful to keep trying to cleanup even if one of the steps fails.
- let remove_dir_result = fs::remove_dir_all(&self.action_directory) - .await - .err_tip(|| { - format!( - "Could not remove working directory {}", - self.action_directory - ) - }); - self.did_cleanup.store(true, Ordering::Relaxed); - if let Err(e) = self.running_actions_manager.cleanup_action(&self.action_id) { - error!("Error cleaning up action: {e:?}"); - return Result::<Arc<Self>, Error>::Err(e).merge(remove_dir_result.map(|_| self)); - } - if let Err(e) = remove_dir_result { - error!( - "Error removing working for action {} directory: {e:?}", - hex::encode(self.action_id) - ); - return Err(e); - } - Ok(self) - } - async fn inner_get_finished_result(self: Arc<Self>) -> Result<ActionResult, Error> { let mut state = self.state.lock(); state @@ -1218,15 +1241,43 @@ impl RunningActionImpl { impl Drop for RunningActionImpl { fn drop(&mut self) { - assert!( - self.did_cleanup.load(Ordering::Relaxed), - "RunningActionImpl did not cleanup. This is a violation of how RunningActionImpl's requirements" + if self.did_cleanup.load(Ordering::Acquire) { + return; + } + event!( + Level::ERROR, + action_id = hex::encode(self.action_id), + "RunningActionImpl did not cleanup. This is a violation of the requirements, will attempt to do it in the background." + ); + let running_actions_manager = self.running_actions_manager.clone(); + let action_id = self.action_id; + let action_directory = self.action_directory.clone(); + tokio::spawn( + async move { + let Err(err) = + do_cleanup(&running_actions_manager, &action_id, &action_directory).await + else { + return; + }; + event!( + Level::ERROR, + action_id = hex::encode(action_id), + ?action_directory, + ?err, + "Error cleaning up action" + ); + } + .instrument(error_span!("RunningActionImpl::drop")), ); } } #[async_trait] impl RunningAction for RunningActionImpl { + fn get_action_id(&self) -> &ActionId { + &self.action_id + } + async fn prepare_action(self: Arc<Self>) -> Result<Arc<Self>, Error> { self.metrics() .clone() @@ -1255,7 +1306,16 @@ impl RunningAction for RunningActionImpl { self.metrics() .clone() .cleanup - .wrap(Self::inner_cleanup(self)) + .wrap(async move { + let result = do_cleanup( + &self.running_actions_manager, + &self.action_id, + &self.action_directory, + ) + .await; + self.did_cleanup.store(true, Ordering::Release); + result.map(move |()| self) + }) .await } @@ -1697,15 +1757,21 @@ impl RunningActionsManagerImpl { // Note: We do not capture metrics on this call, only `.kill_all()`. // Important: When the future returns the process may still be running.
async fn kill_action(action: Arc<RunningActionImpl>) { + event!( + Level::WARN, + action_id = ?hex::encode(action.action_id), + "Sending kill to running action", + ); let kill_channel_tx = { let mut action_state = action.state.lock(); action_state.kill_channel_tx.take() }; if let Some(kill_channel_tx) = kill_channel_tx { if kill_channel_tx.send(()).is_err() { - error!( - "Error sending kill to running action {}", - hex::encode(action.action_id) + event!( + Level::ERROR, + action_id = ?hex::encode(action.action_id), + "Error sending kill to running action", ); } } @@ -1730,7 +1796,11 @@ impl RunningActionsManager for RunningActionsManagerImpl { .and_then(|time| time.try_into().ok()) .unwrap_or(SystemTime::UNIX_EPOCH); let action_info = self.create_action_info(start_execute, queued_timestamp).await?; - info!("\x1b[0;31mWorker Received Action\x1b[0m: {:?}", action_info); + event!( + Level::INFO, + ?action_info, + "Worker received action", + ); let action_id = action_info.unique_qualifier.get_hash(); let action_directory = self.make_action_directory(&action_id).await?; let execution_metadata = ExecutionMetadata { diff --git a/nativelink-worker/src/worker_utils.rs b/nativelink-worker/src/worker_utils.rs index fc9e9984e..8021a3720 100644 --- a/nativelink-worker/src/worker_utils.rs +++ b/nativelink-worker/src/worker_utils.rs @@ -24,7 +24,7 @@ use nativelink_error::{make_err, make_input_err, Error, ResultExt}; use nativelink_proto::build::bazel::remote::execution::v2::platform::Property; use nativelink_proto::com::github::trace_machina::nativelink::remote_execution::SupportedProperties; use tokio::process; -use tracing::info; +use tracing::{event, Level}; pub async fn make_supported_properties( worker_properties: &HashMap, @@ -61,10 +61,7 @@ pub async fn make_supported_properties( process.stdin(Stdio::null()); let err_fn = || format!("Error executing property_name {property_name} command"); - info!( - "Spawning process for cmd: '{}' for property: '{}'", - cmd, property_name - ); + event!(Level::INFO, cmd, property_name, "Spawning process",); let process_output = process.output().await.err_tip(err_fn)?; if !process_output.status.success() { return Err(make_err!( diff --git a/nativelink-worker/tests/utils/mock_running_actions_manager.rs b/nativelink-worker/tests/utils/mock_running_actions_manager.rs index 1e61da83e..24a0c533d 100644 --- a/nativelink-worker/tests/utils/mock_running_actions_manager.rs +++ b/nativelink-worker/tests/utils/mock_running_actions_manager.rs @@ -347,6 +347,10 @@ impl MockRunningAction { #[async_trait] impl RunningAction for MockRunningAction { + fn get_action_id(&self) -> &ActionId { + unreachable!("not implemented for tests"); + } + async fn prepare_action(self: Arc<Self>) -> Result<Arc<Self>, Error> { self.tx_call .send(RunningActionCalls::PrepareAction) diff --git a/src/bin/nativelink.rs b/src/bin/nativelink.rs index a00fa6f80..47f5a8d09 100644 --- a/src/bin/nativelink.rs +++ b/src/bin/nativelink.rs @@ -66,7 +66,7 @@ use tokio_rustls::TlsAcceptor; use tonic::codec::CompressionEncoding; use tonic::transport::Server as TonicServer; use tower::util::ServiceExt; -use tracing::{error, warn}; +use tracing::{error_span, event, Instrument, Level}; use tracing_subscriber::filter::{EnvFilter, LevelFilter}; #[global_allocator] @@ -640,54 +640,91 @@ async fn inner_main( http.http2_max_header_list_size(value); } - warn!("Ready, listening on {}", socket_addr); + event!(Level::WARN, "Ready, listening on {socket_addr}",); root_futures.push(Box::pin(async move { loop { // Wait for client to connect.
let (tcp_stream, remote_addr) = match tcp_listener.accept().await { Ok(result) => result, - Err(e) => { - error!( - "{:?}", - Result::<(), _>::Err(e).err_tip(|| "Failed to accept tcp connection") - ); + Err(err) => { + event!(Level::ERROR, ?err, "Failed to accept tcp connection"); continue; } }; + event!( + target: "nativelink::services", + Level::INFO, + ?remote_addr, + ?socket_addr, + "Client connected" + ); connected_clients_mux.inner.lock().insert(remote_addr); connected_clients_mux.counter.inc(); // This is the safest way to guarantee that if our future // is ever dropped we will cleanup our data. let scope_guard = guard( - connected_clients_mux.clone(), - move |connected_clients_mux| { - connected_clients_mux.inner.lock().remove(&remote_addr); + Arc::downgrade(&connected_clients_mux), + move |weak_connected_clients_mux| { + event!( + target: "nativelink::services", + Level::INFO, + ?remote_addr, + ?socket_addr, + "Client disconnected" + ); + if let Some(connected_clients_mux) = weak_connected_clients_mux.upgrade() { + connected_clients_mux.inner.lock().remove(&remote_addr); + } }, ); let (http, svc) = (http.clone(), svc.clone()); let fut = if let Some(tls_acceptor) = &maybe_tls_acceptor { let tls_stream = match tls_acceptor.accept(tcp_stream).await { Ok(result) => result, - Err(e) => { - error!( - "{:?}", - Result::<(), _>::Err(e).err_tip(|| "Failed to accept tls stream") - ); + Err(err) => { + event!(Level::ERROR, ?err, "Failed to accept tls stream"); continue; } }; http.serve_connection(tls_stream, svc).left_future() } else { http.serve_connection(tcp_stream, svc).right_future() - }; - tokio::spawn(async move { - // Move it into our spawn, so if our spawn dies the cleanup happens. - let _guard = scope_guard; - if let Err(e) = fut.await { - error!("Failed running service : {:?}", e); + } + .map_ok_or_else( + |err| { + use std::error::Error; + if let Some(inner_err) = err.source() { + if let Some(io_err) = inner_err.downcast_ref::<std::io::Error>() { + if io_err.kind() == std::io::ErrorKind::NotConnected { + return Ok(()); + } + } + } + Err(err) + }, + Ok, + ); + tokio::spawn( + async move { + // Move it into our spawn, so if our spawn dies the cleanup happens. + let _guard = scope_guard; + if let Err(err) = fut.await { + event!( + target: "nativelink::services", + Level::ERROR, + ?err, + "Failed running service" + ); + } } - }); + .instrument(error_span!( + target: "nativelink::services", + "http_connection", + ?remote_addr, + ?socket_addr + )), + ); } })); } @@ -756,8 +793,8 @@ async fn inner_main( } let worker_metrics = root_worker_metrics.sub_registry_with_prefix(&name); local_worker.register_metrics(worker_metrics); - worker_names.insert(name); - tokio::spawn(local_worker.run()) + worker_names.insert(name.clone()); + tokio::spawn(local_worker.run().instrument(error_span!("worker", ?name))) } }; root_futures.push(Box::pin(spawn_fut.map_ok_or_else(|e| Err(e.into()), |v| v))); @@ -792,16 +829,16 @@ fn main() -> Result<(), Box<dyn std::error::Error>> { .with( tracing_subscriber::fmt::layer() .pretty() - .with_thread_ids(true) - .with_thread_names(true) + // .with_span_events(FmtSpan::CLOSE) + .with_timer(tracing_subscriber::fmt::time::time()) .with_filter(env_filter), ) .init(); } else { tracing_subscriber::fmt() .pretty() - .with_thread_ids(true) - .with_thread_names(true) + // .with_span_events(FmtSpan::CLOSE) + .with_timer(tracing_subscriber::fmt::time::time()) .with_env_filter(env_filter) .init(); }
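Note (not part of the patch above): the recurring pattern in this diff is replacing `log`-style macros with `tracing`'s structured `event!` calls, guarding expensive formatting with `enabled!`, and attaching spans to spawned work via `Instrument`. Below is a minimal, self-contained sketch of that pattern, assuming `tokio`, `tracing`, and `tracing-subscriber` dependencies; the names `do_work`, `request_id`, and `payload` are illustrative placeholders, not NativeLink APIs.

    use std::time::Duration;

    use tracing::{enabled, error_span, event, instrument, Instrument, Level};

    // Structured fields (`?err`, `request_id`) replace format-string interpolation,
    // mirroring the `event!(Level::ERROR, ?err, "...")` calls introduced in the diff.
    #[instrument(skip(payload), level = "info")]
    async fn do_work(request_id: u64, payload: Vec<u8>) -> Result<(), std::io::Error> {
        // Guard expensive formatting behind `enabled!`, like the stdout/stderr
        // truncation added in `inner_upload_results`.
        if enabled!(Level::DEBUG) {
            let preview = String::from_utf8_lossy(&payload[..payload.len().min(16)]);
            event!(Level::DEBUG, %preview, "payload preview");
        }
        tokio::time::sleep(Duration::from_millis(10)).await;
        Ok(())
    }

    #[tokio::main]
    async fn main() {
        tracing_subscriber::fmt().pretty().init();
        // Spawned tasks lose the caller's span unless one is attached explicitly,
        // hence the `.instrument(error_span!(...))` calls added throughout the diff.
        let handle = tokio::spawn(
            do_work(42, b"hello".to_vec()).instrument(error_span!("worker", request_id = 42)),
        );
        if let Err(err) = handle.await.expect("task panicked") {
            event!(Level::ERROR, ?err, "work failed");
        }
    }

As in the patch, fields are recorded with the `?` (Debug) or `%` (Display) sigils rather than interpolated into the message string, which keeps the message constant and leaves the data queryable by the subscriber.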