From 523ee33784c2dfdd5a988cdf3cb4843a66d92244 Mon Sep 17 00:00:00 2001 From: "Nathan (Blaise) Bruer" Date: Mon, 22 Apr 2024 13:34:42 -0500 Subject: [PATCH] Migrate all logging to the tracing library (#871) This is a significant internal change that moves us completely away from the last of the "log" crate and over to "tracing". The major advantage of doing this is that we can now trace functions across future boundaries using tracing's instrumentation. All tokio::spawn usage is now required to be properly instrumented. closes #870 --- Cargo.lock | 3 - .../src/cache_lookup_scheduler.rs | 8 +- nativelink-scheduler/src/grpc_scheduler.rs | 31 ++- nativelink-scheduler/src/simple_scheduler.rs | 126 ++++++++---- nativelink-service/BUILD.bazel | 1 - nativelink-service/Cargo.toml | 1 - nativelink-service/src/ac_server.rs | 70 +++---- nativelink-service/src/bytestream_server.rs | 109 ++++++----- nativelink-service/src/capabilities_server.rs | 13 +- nativelink-service/src/cas_server.rs | 101 +++++----- nativelink-service/src/execution_server.rs | 49 +++-- nativelink-service/src/worker_api_server.rs | 124 ++++++------ nativelink-store/BUILD.bazel | 1 - nativelink-store/Cargo.toml | 1 - .../src/completeness_checking_store.rs | 12 +- nativelink-store/src/dedup_store.rs | 13 +- nativelink-store/src/filesystem_store.rs | 157 ++++++++------- nativelink-store/src/grpc_store.rs | 7 +- nativelink-store/src/ref_store.rs | 20 +- nativelink-store/src/s3_store.rs | 31 +-- nativelink-util/BUILD.bazel | 1 - nativelink-util/Cargo.toml | 1 - nativelink-util/src/connection_manager.rs | 30 ++- nativelink-util/src/evicting_map.rs | 9 +- nativelink-util/src/fs.rs | 8 +- nativelink-util/src/retry.rs | 12 +- nativelink-worker/src/local_worker.rs | 63 ++++-- .../src/running_actions_manager.rs | 180 ++++++++++++------ nativelink-worker/src/worker_utils.rs | 7 +- .../utils/mock_running_actions_manager.rs | 4 + src/bin/nativelink.rs | 93 ++++++--- 31 files changed, 764 insertions(+), 522 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index a376815b2..74582e0b3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1824,7 +1824,6 @@ dependencies = [ "bytes", "futures", "hyper", - "log", "maplit", "nativelink-config", "nativelink-error", @@ -1869,7 +1868,6 @@ dependencies = [ "http-body 1.0.0", "hyper", "hyper-rustls", - "log", "lz4_flex", "memory-stats", "nativelink-config", @@ -1903,7 +1901,6 @@ dependencies = [ "bytes", "futures", "hex", - "log", "lru", "mock_instant", "nativelink-config", diff --git a/nativelink-scheduler/src/cache_lookup_scheduler.rs b/nativelink-scheduler/src/cache_lookup_scheduler.rs index c9e0dd154..fc243616a 100644 --- a/nativelink-scheduler/src/cache_lookup_scheduler.rs +++ b/nativelink-scheduler/src/cache_lookup_scheduler.rs @@ -35,7 +35,7 @@ use tokio::select; use tokio::sync::watch; use tokio_stream::wrappers::WatchStream; use tonic::Request; -use tracing::warn; +use tracing::{event, Level}; use crate::action_scheduler::ActionScheduler; use crate::platform_property_manager::PlatformPropertyManager; @@ -180,7 +180,11 @@ impl ActionScheduler for CacheLookupScheduler { return; } Err(err) => { - warn!("Error while calling `has` on `ac_store` in `CacheLookupScheduler`'s `add_action` function: {}", err); + event!( + Level::WARN, + ?err, + "Error while calling `has` on `ac_store` in `CacheLookupScheduler`'s `add_action` function" + ); } _ => {} } diff --git a/nativelink-scheduler/src/grpc_scheduler.rs b/nativelink-scheduler/src/grpc_scheduler.rs index 4373b8d9f..8f012dba3 100644 ---
a/nativelink-scheduler/src/grpc_scheduler.rs +++ b/nativelink-scheduler/src/grpc_scheduler.rs @@ -40,7 +40,7 @@ use tokio::select; use tokio::sync::watch; use tokio::time::sleep; use tonic::{Request, Streaming}; -use tracing::{error, info, warn}; +use tracing::{error_span, event, Instrument, Level}; use crate::action_scheduler::ActionScheduler; use crate::platform_property_manager::PlatformPropertyManager; @@ -123,7 +123,10 @@ impl GrpcScheduler { loop { select!( _ = tx.closed() => { - info!("Client disconnected in GrpcScheduler"); + event!( + Level::INFO, + "Client disconnected in GrpcScheduler" + ); return; } response = result_stream.message() => { @@ -135,16 +138,27 @@ impl GrpcScheduler { match response.try_into() { Ok(response) => { if let Err(err) = tx.send(Arc::new(response)) { - info!("Client disconnected in GrpcScheduler: {}", err); + event!( + Level::INFO, + ?err, + "Client error in GrpcScheduler" + ); return; } } - Err(err) => error!("Error converting response to ActionState in GrpcScheduler: {}", err), + Err(err) => { + event!( + Level::ERROR, + ?err, + "Error converting response to ActionState in GrpcScheduler" + ); + }, } } ) } - }); + } + .instrument(error_span!("stream_state"))); return Ok(rx); } Err(make_err!( @@ -264,9 +278,10 @@ impl ActionScheduler for GrpcScheduler { match result_stream { Ok(result_stream) => Some(result_stream), Err(err) => { - warn!( - "Error response looking up action with upstream scheduler: {}", - err + event!( + Level::WARN, + ?err, + "Error looking up action with upstream scheduler" ); None } diff --git a/nativelink-scheduler/src/simple_scheduler.rs b/nativelink-scheduler/src/simple_scheduler.rs index 9e22397be..1c08ec041 100644 --- a/nativelink-scheduler/src/simple_scheduler.rs +++ b/nativelink-scheduler/src/simple_scheduler.rs @@ -38,7 +38,7 @@ use parking_lot::{Mutex, MutexGuard}; use tokio::sync::{watch, Notify}; use tokio::task::JoinHandle; use tokio::time::Duration; -use tracing::{error, warn}; +use tracing::{event, Level}; use crate::action_scheduler::ActionScheduler; use crate::platform_property_manager::PlatformPropertyManager; @@ -122,10 +122,12 @@ impl Workers { let res = worker .send_initial_connection_result() .err_tip(|| "Failed to send initial connection result to worker"); - if let Err(e) = &res { - error!( - "Worker connection appears to have been closed while adding to pool : {:?}", - e + if let Err(err) = &res { + event!( + Level::ERROR, + ?worker_id, + ?err, + "Worker connection appears to have been closed while adding to pool" ); } res @@ -379,15 +381,22 @@ impl SimpleSchedulerImpl { // Don't remove this task, instead we keep them around for a bit just in case // the client disconnected and will reconnect and ask for same job to be executed // again. 
- warn!( - "Action {} has no more listeners during evict_worker()", - action_info.digest().hash_str() + event!( + Level::WARN, + ?action_info, + ?worker_id, + "Action has no more listeners during evict_worker()" ); } } None => { self.metrics.retry_action_but_action_missing.inc(); - error!("Worker stated it was running an action, but it was not in the active_actions : Worker: {:?}, ActionInfo: {:?}", worker_id, action_info); + event!( + Level::ERROR, + ?action_info, + ?worker_id, + "Worker stated it was running an action, but it was not in the active_actions" + ); } } } @@ -435,9 +444,10 @@ impl SimpleSchedulerImpl { self.queued_actions.keys().rev().cloned().collect(); for action_info in action_infos { let Some(awaited_action) = self.queued_actions.get(action_info.as_ref()) else { - error!( - "queued_actions out of sync with itself for action {}", - action_info.digest().hash_str() + event!( + Level::ERROR, + ?action_info, + "queued_actions out of sync with itself" ); continue; }; @@ -454,13 +464,20 @@ impl SimpleSchedulerImpl { worker.notify_update(WorkerUpdate::RunAction(action_info.clone())); if notify_worker_result.is_err() { // Remove worker, as it is no longer receiving messages and let it try to find another worker. - let err = make_err!( - Code::Internal, - "Worker command failed, removing worker {}", - worker_id + event!( + Level::WARN, + ?worker_id, + ?action_info, + "Worker command failed, removing worker", + ); + self.immediate_evict_worker( + &worker_id, + make_err!( + Code::Internal, + "Worker command failed, removing worker {}", + worker_id + ), ); - warn!("{:?}", err); - self.immediate_evict_worker(&worker_id, err); return; } @@ -482,9 +499,11 @@ impl SimpleSchedulerImpl { // Don't remove this task, instead we keep them around for a bit just in case // the client disconnected and will reconnect and ask for same job to be executed // again. - warn!( - "Action {} has no more listeners", - awaited_action.action_info.digest().hash_str() + event!( + Level::WARN, + ?action_info, + ?worker_id, + "Action has no more listeners during do_try_match()" ); } awaited_action.attempts += 1; @@ -505,7 +524,12 @@ impl SimpleSchedulerImpl { self.metrics .update_action_with_internal_error_no_action .inc(); - error!("Could not find action info in active actions : {action_info_hash_key:?}"); + event!( + Level::ERROR, + ?action_info_hash_key, + ?worker_id, + "Could not find action info in active actions" + ); return; }; @@ -518,13 +542,24 @@ impl SimpleSchedulerImpl { running_action.attempts -= 1; } let Some(running_action_worker_id) = running_action.worker_id else { - return error!( - "Got a result from a worker that should not be running the action, Removing worker. Expected action to be unassigned got worker {worker_id}" + event!( + Level::ERROR, + ?action_info_hash_key, + ?worker_id, + "Got a result from a worker that should not be running the action, Removing worker. Expected action to be unassigned got worker", ); + return; }; if running_action_worker_id == *worker_id { // Don't set the error on an action that's running somewhere else. 
- warn!("Internal error for worker {}: {}", worker_id, err); + event!( + Level::WARN, + ?action_info_hash_key, + ?worker_id, + ?running_action_worker_id, + ?err, + "Internal worker error", + ); running_action.last_error = Some(err.clone()); } else { self.metrics @@ -561,11 +596,17 @@ impl SimpleSchedulerImpl { ) -> Result<(), Error> { if !action_stage.has_action_result() { self.metrics.update_action_missing_action_result.inc(); + event!( + Level::ERROR, + ?action_info_hash_key, + ?worker_id, + ?action_stage, + "Worker sent error while updating action. Removing worker" + ); let err = make_err!( Code::Internal, "Worker '{worker_id}' set the action_stage of running action {action_info_hash_key:?} to {action_stage:?}. Removing worker.", ); - error!("{:?}", err); self.immediate_evict_worker(worker_id, err.clone()); return Err(err); } @@ -590,7 +631,14 @@ impl SimpleSchedulerImpl { "Got a result from a worker that should not be running the action, Removing worker. Expected action to be unassigned got worker {worker_id}", ), }; - error!("{:?}", err); + event!( + Level::ERROR, + ?action_info, + ?worker_id, + ?running_action.worker_id, + ?err, + "Got a result from a worker that should not be running the action, Removing worker" + ); // First put it back in our active_actions or we will drop the task. self.active_actions.insert(action_info, running_action); self.immediate_evict_worker(worker_id, err.clone()); @@ -606,9 +654,11 @@ impl SimpleSchedulerImpl { if !running_action.current_state.stage.is_finished() { if send_result.is_err() { self.metrics.update_action_no_more_listeners.inc(); - warn!( - "Action {} has no more listeners during update_action()", - action_info.digest().hash_str() + event!( + Level::WARN, + ?action_info, + ?worker_id, + "Action has no more listeners during update_action()" ); } // If the operation is not finished it means the worker is still working on it, so put it @@ -903,12 +953,18 @@ impl WorkerScheduler for SimpleScheduler { }) .collect(); for worker_id in &worker_ids_to_remove { - let err = make_err!( - Code::Internal, - "Worker {worker_id} timed out, removing from pool" + event!( + Level::WARN, + ?worker_id, + "Worker timed out, removing from pool" + ); + inner.immediate_evict_worker( + worker_id, + make_err!( + Code::Internal, + "Worker {worker_id} timed out, removing from pool" + ), ); - warn!("{:?}", err); - inner.immediate_evict_worker(worker_id, err); } Ok(()) diff --git a/nativelink-service/BUILD.bazel b/nativelink-service/BUILD.bazel index 7dcdc3ab9..32a838097 100644 --- a/nativelink-service/BUILD.bazel +++ b/nativelink-service/BUILD.bazel @@ -29,7 +29,6 @@ rust_library( "@crates//:bytes", "@crates//:futures", "@crates//:hyper", - "@crates//:log", "@crates//:parking_lot", "@crates//:prost", "@crates//:rand", diff --git a/nativelink-service/Cargo.toml b/nativelink-service/Cargo.toml index 8a851d31e..99c439d11 100644 --- a/nativelink-service/Cargo.toml +++ b/nativelink-service/Cargo.toml @@ -15,7 +15,6 @@ bytes = "1.6.0" futures = "0.3.30" hyper = { version = "0.14.28" } serde_json5 = "0.1.0" -log = "0.4.21" parking_lot = "0.12.1" prost = "0.12.4" rand = "0.8.5" diff --git a/nativelink-service/src/ac_server.rs b/nativelink-service/src/ac_server.rs index ea09ba498..24a4bfc07 100644 --- a/nativelink-service/src/ac_server.rs +++ b/nativelink-service/src/ac_server.rs @@ -13,9 +13,9 @@ // limitations under the License. 
use std::collections::HashMap; +use std::fmt::Debug; use std::pin::Pin; use std::sync::Arc; -use std::time::Instant; use bytes::BytesMut; use nativelink_config::cas_server::{AcStoreConfig, InstanceName}; @@ -33,7 +33,7 @@ use nativelink_util::common::DigestInfo; use nativelink_util::store_trait::Store; use prost::Message; use tonic::{Request, Response, Status}; -use tracing::{error, info}; +use tracing::{event, instrument, Level}; #[derive(Clone)] pub struct AcStoreInfo { @@ -45,6 +45,12 @@ pub struct AcServer { stores: HashMap, } +impl Debug for AcServer { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("AcServer").finish() + } +} + impl AcServer { pub fn new( config: &HashMap, @@ -157,60 +163,38 @@ impl AcServer { #[tonic::async_trait] impl ActionCache for AcServer { + #[allow(clippy::blocks_in_conditions)] + #[instrument( + ret(level = Level::INFO), + level = Level::ERROR, + skip_all, + fields(request = ?grpc_request.get_ref()) + )] async fn get_action_result( &self, grpc_request: Request, ) -> Result, Status> { - let now = Instant::now(); - info!( - "\x1b[0;31mget_action_result Req\x1b[0m: {:?}", - grpc_request.get_ref() - ); - let hash = grpc_request - .get_ref() - .action_digest - .as_ref() - .map(|v| v.hash.to_string()); let resp = self.inner_get_action_result(grpc_request).await; - let d = now.elapsed().as_secs_f32(); if resp.is_err() && resp.as_ref().err().unwrap().code != Code::NotFound { - error!( - "\x1b[0;31mget_action_result Resp\x1b[0m: {} {:?} {:?}", - d, hash, resp - ); - } else { - info!( - "\x1b[0;31mget_action_result Resp\x1b[0m: {} {:?} {:?}", - d, hash, resp - ); + event!(Level::ERROR, return = ?resp); } return resp.map_err(|e| e.into()); } + #[allow(clippy::blocks_in_conditions)] + #[instrument( + err, + ret(level = Level::INFO), + level = Level::ERROR, + skip_all, + fields(request = ?grpc_request.get_ref()) + )] async fn update_action_result( &self, grpc_request: Request, ) -> Result, Status> { - let now = Instant::now(); - info!( - "\x1b[0;31mupdate_action_result Req\x1b[0m: {:?}", - grpc_request.get_ref() - ); - let resp = self.inner_update_action_result(grpc_request).await; - let d = now.elapsed().as_secs_f32(); - if resp.is_err() { - log::error!( - "\x1b[0;31mupdate_action_result Resp\x1b[0m: {} {:?}", - d, - resp - ); - } else { - log::info!( - "\x1b[0;31mupdate_action_result Resp\x1b[0m: {} {:?}", - d, - resp - ); - } - return resp.map_err(|e| e.into()); + self.inner_update_action_result(grpc_request) + .await + .map_err(|e| e.into()) } } diff --git a/nativelink-service/src/bytestream_server.rs b/nativelink-service/src/bytestream_server.rs index d84617511..b94f080cb 100644 --- a/nativelink-service/src/bytestream_server.rs +++ b/nativelink-service/src/bytestream_server.rs @@ -18,7 +18,7 @@ use std::fmt::{Debug, Formatter}; use std::pin::Pin; use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::Arc; -use std::time::{Duration, Instant}; +use std::time::Duration; use futures::future::{pending, BoxFuture}; use futures::stream::unfold; @@ -45,7 +45,7 @@ use parking_lot::Mutex; use tokio::task::AbortHandle; use tokio::time::sleep; use tonic::{Request, Response, Status, Streaming}; -use tracing::{enabled, error, info, Level}; +use tracing::{enabled, error_span, event, instrument, Instrument, Level}; /// If this value changes update the documentation in the config definition. 
const DEFAULT_PERSIST_STREAM_ON_DISCONNECT_TIMEOUT: Duration = Duration::from_secs(60); @@ -100,9 +100,10 @@ impl<'a> Drop for ActiveStreamGuard<'a> { let mut active_uploads = self.bytestream_server.active_uploads.lock(); let uuid = stream_state.uuid.clone(); let Some(active_uploads_slot) = active_uploads.get_mut(&uuid) else { - error!( - "Failed to find active upload for UUID: {}. This should never happen.", - uuid + event!( + Level::ERROR, + err = "Failed to find active upload. This should never happen.", + uuid = ?uuid, ); return; }; @@ -113,7 +114,7 @@ impl<'a> Drop for ActiveStreamGuard<'a> { (*sleep_fn)().await; if let Some(active_uploads) = weak_active_uploads.upgrade() { let mut active_uploads = active_uploads.lock(); - info!("Removing idle stream {uuid}"); + event!(Level::INFO, msg = "Removing idle stream", uuid = ?uuid); active_uploads.remove(&uuid); } }) @@ -213,7 +214,7 @@ impl ByteStreamServer { return Err(make_input_err!("Cannot upload same UUID simultaneously")); }; let bytes_received = maybe_idle_stream.0.clone(); - info!("Joining existing stream {}", entry.key()); + event!(Level::INFO, msg = "Joining existing stream", entry = ?entry.key()); return Ok(idle_stream.into_active_stream(bytes_received, self)); } Entry::Vacant(entry) => { @@ -307,7 +308,10 @@ impl ByteStreamServer { }), }); - Ok(Response::new(Box::pin(unfold(state, move |state| async { + let read_stream_span = error_span!("read_stream"); + + Ok(Response::new(Box::pin(unfold(state, move |state| { + async { let mut state = state?; // If None our stream is done. let mut response = ReadResponse::default(); { @@ -327,7 +331,11 @@ impl ByteStreamServer { return Some((Err(err.into()), None)); } response.data = bytes; - info!("\x1b[0;31mBytestream Read Chunk Resp\x1b[0m: {:?}", response); + if enabled!(Level::DEBUG) { + event!(Level::INFO, response = ?response); + } else { + event!(Level::INFO, response.data = format!("<redacted len={}>", response.data.len())); + } break; } Err(mut e) => { @@ -351,7 +359,7 @@ impl ByteStreamServer { // message as it will be the most relevant. e.messages.truncate(1); } - info!("\x1b[0;31mBytestream Read Chunk Resp\x1b[0m: Error {:?}", e); + event!(Level::ERROR, response = ?e); return Some((Err(e.into()), None)) } } @@ -374,9 +382,19 @@ impl ByteStreamServer { } } Some((Ok(response), Some(state))) + }.instrument(read_stream_span.clone()) })))) } + // We instrument tracing here as well as below because `stream` has a hash on it + // that is extracted from the first stream message. If we only implemented it below + // we would not have the hash available to us.
+ #[instrument( + ret(level = Level::INFO), + level = Level::ERROR, + skip(self), + fields(stream.first_msg = "") + )] async fn inner_write( &self, mut stream: WriteRequestStreamWrapper, Status>, @@ -566,79 +584,66 @@ impl ByteStreamServer { #[tonic::async_trait] impl ByteStream for ByteStreamServer { type ReadStream = ReadStream; + + #[allow(clippy::blocks_in_conditions)] + #[instrument( + err, + level = Level::ERROR, + skip_all, + fields(request = ?grpc_request.get_ref()) + )] async fn read( &self, grpc_request: Request, ) -> Result, Status> { - info!("\x1b[0;31mRead Req\x1b[0m: {:?}", grpc_request.get_ref()); - let now = Instant::now(); let resp = self .inner_read(grpc_request) .await .err_tip(|| "Failed on read() command") .map_err(|e| e.into()); - let d = now.elapsed().as_secs_f32(); - if let Err(err) = resp.as_ref() { - error!("\x1b[0;31mRead Resp\x1b[0m: {} {:?}", d, err); - } else { - info!("\x1b[0;31mRead Resp\x1b[0m: {}", d); + if resp.is_ok() { + event!(Level::DEBUG, return = "Ok()"); } resp } + #[allow(clippy::blocks_in_conditions)] + #[instrument( + err, + level = Level::ERROR, + skip_all, + fields(request = ?grpc_request.get_ref()) + )] async fn write( &self, grpc_request: Request>, ) -> Result, Status> { - let now = Instant::now(); let stream = WriteRequestStreamWrapper::from(grpc_request.into_inner()) .await .err_tip(|| "Could not unwrap first stream message") .map_err(Into::::into)?; - let hash = if enabled!(Level::DEBUG) { - Some(stream.hash.clone()) - } else { - None - }; - - info!("\x1b[0;31mWrite Req\x1b[0m: {:?}", hash); - let resp = self - .inner_write(stream) + self.inner_write(stream) .await .err_tip(|| "In ByteStreamServer::write()") - .map_err(|e| e.into()); - - let d = now.elapsed().as_secs_f32(); - if let Err(err) = resp.as_ref() { - error!("\x1b[0;31mWrite Resp\x1b[0m: {} {:?} {:?}", d, hash, err); - } else { - info!("\x1b[0;31mWrite Resp\x1b[0m: {} {:?}", d, hash); - } - resp + .map_err(|e| e.into()) } + #[allow(clippy::blocks_in_conditions)] + #[instrument( + err, + ret(level = Level::INFO), + level = Level::ERROR, + skip_all, + fields(request = ?grpc_request.get_ref()) + )] async fn query_write_status( &self, grpc_request: Request, ) -> Result, Status> { - let now = Instant::now(); - let query_request = grpc_request.into_inner(); - - let resp = self - .inner_query_write_status(&query_request) + self.inner_query_write_status(&grpc_request.into_inner()) .await .err_tip(|| "Failed on query_write_status() command") - .map_err(|e| e.into()); - - let d = now.elapsed().as_secs_f32(); - if resp.is_err() { - error!("\x1b[0;31mQuery Req\x1b[0m: {:?}", query_request); - error!("\x1b[0;31mQuery Resp\x1b[0m: {} {:?}", d, resp); - } else { - info!("\x1b[0;31mQuery Req\x1b[0m: {:?}", query_request); - info!("\x1b[0;31mQuery Resp\x1b[0m: {} {:?}", d, resp); - } - resp + .map_err(|e| e.into()) } } diff --git a/nativelink-service/src/capabilities_server.rs b/nativelink-service/src/capabilities_server.rs index f775d29da..78e2fe62b 100644 --- a/nativelink-service/src/capabilities_server.rs +++ b/nativelink-service/src/capabilities_server.rs @@ -31,6 +31,7 @@ use nativelink_proto::build::bazel::semver::SemVer; use nativelink_scheduler::action_scheduler::ActionScheduler; use nativelink_util::digest_hasher::default_digest_hasher_func; use tonic::{Request, Response, Status}; +use tracing::{instrument, Level}; const MAX_BATCH_TOTAL_SIZE: i64 = 64 * 1024; @@ -82,11 +83,19 @@ impl CapabilitiesServer { #[tonic::async_trait] impl Capabilities for CapabilitiesServer { + 
#[allow(clippy::blocks_in_conditions)] + #[instrument( + err, + ret(level = Level::INFO), + level = Level::ERROR, + skip_all, + fields(request = ?grpc_request.get_ref()) + )] async fn get_capabilities( &self, - request: Request, + grpc_request: Request, ) -> Result, Status> { - let instance_name = request.into_inner().instance_name; + let instance_name = grpc_request.into_inner().instance_name; let maybe_supported_node_properties = self .supported_node_properties_for_instance .get(&instance_name); diff --git a/nativelink-service/src/cas_server.rs b/nativelink-service/src/cas_server.rs index f04213517..81ec1ef02 100644 --- a/nativelink-service/src/cas_server.rs +++ b/nativelink-service/src/cas_server.rs @@ -15,7 +15,6 @@ use std::collections::HashMap; use std::pin::Pin; use std::sync::Arc; -use std::time::Instant; use bytes::Bytes; use futures::stream::{FuturesUnordered, Stream}; @@ -36,7 +35,7 @@ use nativelink_store::store_manager::StoreManager; use nativelink_util::common::DigestInfo; use nativelink_util::store_trait::Store; use tonic::{Request, Response, Status}; -use tracing::{error, info}; +use tracing::{event, instrument, Level}; pub struct CasServer { stores: HashMap>, @@ -248,94 +247,80 @@ impl CasServer { #[tonic::async_trait] impl ContentAddressableStorage for CasServer { + type GetTreeStream = GetTreeStream; + + #[allow(clippy::blocks_in_conditions)] + #[instrument( + err, + ret(level = Level::INFO), + level = Level::ERROR, + skip_all, + fields(request = ?grpc_request.get_ref()) + )] async fn find_missing_blobs( &self, grpc_request: Request, ) -> Result, Status> { - info!( - "\x1b[0;31mfind_missing_blobs Req\x1b[0m: {:?}", - grpc_request.get_ref() - ); - let now = Instant::now(); - let resp = self - .inner_find_missing_blobs(grpc_request) + self.inner_find_missing_blobs(grpc_request) .await .err_tip(|| "Failed on find_missing_blobs() command") - .map_err(|e| e.into()); - let d = now.elapsed().as_secs_f32(); - if resp.is_err() { - error!("\x1b[0;31mfind_missing_blobs Resp\x1b[0m: {} {:?}", d, resp); - } else { - info!("\x1b[0;31mfind_missing_blobs Resp\x1b[0m: {} {:?}", d, resp); - } - resp + .map_err(|e| e.into()) } + #[allow(clippy::blocks_in_conditions)] + #[instrument( + err, + ret(level = Level::INFO), + level = Level::ERROR, + skip_all, + fields(request = ?grpc_request.get_ref()) + )] async fn batch_update_blobs( &self, grpc_request: Request, ) -> Result, Status> { - info!( - "\x1b[0;31mbatch_update_blobs Req\x1b[0m: {:?}", - grpc_request.get_ref() - ); - let now = Instant::now(); - let resp = self - .inner_batch_update_blobs(grpc_request) + self.inner_batch_update_blobs(grpc_request) .await .err_tip(|| "Failed on batch_update_blobs() command") - .map_err(|e| e.into()); - let d = now.elapsed().as_secs_f32(); - if resp.is_err() { - error!("\x1b[0;31mbatch_update_blobs Resp\x1b[0m: {} {:?}", d, resp); - } else { - info!("\x1b[0;31mbatch_update_blobs Resp\x1b[0m: {} {:?}", d, resp); - } - resp + .map_err(|e| e.into()) } + #[allow(clippy::blocks_in_conditions)] + #[instrument( + err, + ret(level = Level::INFO), + level = Level::ERROR, + skip_all, + fields(request = ?grpc_request.get_ref()) + )] async fn batch_read_blobs( &self, grpc_request: Request, ) -> Result, Status> { - info!( - "\x1b[0;31mbatch_read_blobs Req\x1b[0m: {:?}", - grpc_request.get_ref() - ); - let now = Instant::now(); - let resp = self - .inner_batch_read_blobs(grpc_request) + self.inner_batch_read_blobs(grpc_request) .await .err_tip(|| "Failed on batch_read_blobs() command") - .map_err(|e| e.into()); - let d 
= now.elapsed().as_secs_f32(); - if resp.is_err() { - error!("\x1b[0;31mbatch_read_blobs Resp\x1b[0m: {} {:?}", d, resp); - } else { - info!("\x1b[0;31mbatch_read_blobs Resp\x1b[0m: {} {:?}", d, resp); - } - resp + .map_err(|e| e.into()) } - type GetTreeStream = GetTreeStream; + #[allow(clippy::blocks_in_conditions)] + #[instrument( + err, + level = Level::ERROR, + skip_all, + fields(request = ?grpc_request.get_ref()) + )] async fn get_tree( &self, grpc_request: Request, ) -> Result, Status> { - info!( - "\x1b[0;31mget_tree Req\x1b[0m: {:?}", - grpc_request.get_ref() - ); - let now = Instant::now(); - let resp: Result, Status> = self + let resp = self .inner_get_tree(grpc_request) .await .err_tip(|| "Failed on get_tree() command") .map_err(|e| e.into()); - let d = now.elapsed().as_secs_f32(); - match &resp { - Err(err) => error!("\x1b[0;31mget_tree Resp\x1b[0m: {} : {:?}", d, err), - Ok(_) => info!("\x1b[0;31mget_tree Resp\x1b[0m: {}", d), + if resp.is_ok() { + event!(Level::DEBUG, return = "Ok()"); } resp } diff --git a/nativelink-service/src/execution_server.rs b/nativelink-service/src/execution_server.rs index cd26d4f93..a403636b4 100644 --- a/nativelink-service/src/execution_server.rs +++ b/nativelink-service/src/execution_server.rs @@ -15,7 +15,7 @@ use std::collections::HashMap; use std::pin::Pin; use std::sync::Arc; -use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH}; +use std::time::{Duration, SystemTime, UNIX_EPOCH}; use futures::{Stream, StreamExt}; use nativelink_config::cas_server::{ExecutionConfig, InstanceName}; @@ -41,7 +41,7 @@ use rand::{thread_rng, Rng}; use tokio::sync::watch; use tokio_stream::wrappers::WatchStream; use tonic::{Request, Response, Status}; -use tracing::{error, info}; +use tracing::{event, instrument, Level}; struct InstanceInfo { scheduler: Arc, @@ -184,7 +184,7 @@ impl ExecutionServer { fn to_execute_stream(receiver: watch::Receiver>) -> Response { let receiver_stream = Box::pin(WatchStream::new(receiver).map(|action_update| { - info!("\x1b[0;31mexecute Resp Stream\x1b[0m: {:?}", action_update); + event!(Level::INFO, ?action_update, "Execute Resp Stream",); Ok(Into::::into(action_update.as_ref().clone())) })); tonic::Response::new(receiver_stream) @@ -263,36 +263,45 @@ impl ExecutionServer { #[tonic::async_trait] impl Execution for ExecutionServer { type ExecuteStream = ExecuteStream; + type WaitExecutionStream = ExecuteStream; + #[allow(clippy::blocks_in_conditions)] + #[instrument( + err, + level = Level::ERROR, + skip_all, + fields(request = ?grpc_request.get_ref()) + )] async fn execute( &self, grpc_request: Request, ) -> Result, Status> { - // TODO(blaise.bruer) This is a work in progress, remote execution likely won't work yet. 
- info!("\x1b[0;31mexecute Req\x1b[0m: {:?}", grpc_request.get_ref()); - let now = Instant::now(); - let resp = self - .inner_execute(grpc_request) + self.inner_execute(grpc_request) .await .err_tip(|| "Failed on execute() command") - .map_err(|e| e.into()); - let d = now.elapsed().as_secs_f32(); - if let Err(err) = &resp { - error!("\x1b[0;31mexecute Resp\x1b[0m: {} {:?}", d, err); - } else { - info!("\x1b[0;31mexecute Resp\x1b[0m: {}", d); - } - resp + .map_err(|e| e.into()) } - type WaitExecutionStream = ExecuteStream; + #[allow(clippy::blocks_in_conditions)] + #[instrument( + err, + level = Level::ERROR, + skip_all, + fields(request = ?grpc_request.get_ref()) + )] async fn wait_execution( &self, - request: Request, + grpc_request: Request, ) -> Result, Status> { - self.inner_wait_execution(request) + let resp = self + .inner_wait_execution(grpc_request) .await .err_tip(|| "Failed on wait_execution() command") - .map_err(|e| e.into()) + .map_err(|e| e.into()); + + if resp.is_ok() { + event!(Level::DEBUG, return = "Ok()"); + } + resp } } diff --git a/nativelink-service/src/worker_api_server.rs b/nativelink-service/src/worker_api_server.rs index f2ee583a7..47bcca454 100644 --- a/nativelink-service/src/worker_api_server.rs +++ b/nativelink-service/src/worker_api_server.rs @@ -15,7 +15,7 @@ use std::collections::HashMap; use std::pin::Pin; use std::sync::Arc; -use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH}; +use std::time::{Duration, SystemTime, UNIX_EPOCH}; use futures::stream::unfold; use futures::Stream; @@ -35,7 +35,7 @@ use nativelink_util::platform_properties::PlatformProperties; use tokio::sync::mpsc; use tokio::time::interval; use tonic::{Request, Response, Status}; -use tracing::{error, info, warn}; +use tracing::{error_span, event, instrument, Instrument, Level}; use uuid::Uuid; pub type ConnectWorkerStream = @@ -67,17 +67,21 @@ impl WorkerApiServer { .expect("Error: system time is now behind unix epoch"); match weak_scheduler.upgrade() { Some(scheduler) => { - if let Err(e) = + if let Err(err) = scheduler.remove_timedout_workers(timestamp.as_secs()).await { - error!("Error while running remove_timedout_workers : {:?}", e); + event!( + Level::ERROR, + ?err, + "Failed to remove_timedout_workers", + ); } } // If we fail to upgrade, our service is probably destroyed, so return. 
None => return, } } - }); + }.instrument(error_span!("worker_api_server"))); } Self::new_with_now_fn( @@ -159,9 +163,10 @@ impl WorkerApiServer { if let Some(update_for_worker) = rx.recv().await { return Some((Ok(update_for_worker), (rx, worker_id))); } - warn!( - "UpdateForWorker channel was closed, thus closing connection to worker node : {}", - worker_id + event!( + Level::WARN, + ?worker_id, + "UpdateForWorker channel was closed, thus closing connection to worker node", ); None @@ -231,83 +236,76 @@ impl WorkerApiServer { #[tonic::async_trait] impl WorkerApi for WorkerApiServer { type ConnectWorkerStream = ConnectWorkerStream; + + #[allow(clippy::blocks_in_conditions)] + #[instrument( + err, + level = Level::ERROR, + skip_all, + fields(request = ?grpc_request.get_ref()) + )] async fn connect_worker( &self, grpc_request: Request, ) -> Result, Status> { - let now = Instant::now(); - info!( - "\x1b[0;31mconnect_worker Req\x1b[0m: {:?}", - grpc_request.get_ref() - ); - let supported_properties = grpc_request.into_inner(); - let resp = self.inner_connect_worker(supported_properties).await; - let d = now.elapsed().as_secs_f32(); - if let Err(err) = resp.as_ref() { - error!("\x1b[0;31mconnect_worker Resp\x1b[0m: {} {:?}", d, err); - } else { - info!("\x1b[0;31mconnect_worker Resp\x1b[0m: {}", d); + let resp = self + .inner_connect_worker(grpc_request.into_inner()) + .await + .map_err(|e| e.into()); + if resp.is_ok() { + event!(Level::DEBUG, return = "Ok()"); } - return resp.map_err(|e| e.into()); + resp } + #[allow(clippy::blocks_in_conditions)] + #[instrument( + err, + ret(level = Level::INFO), + level = Level::ERROR, + skip_all, + fields(request = ?grpc_request.get_ref()) + )] async fn keep_alive( &self, grpc_request: Request, ) -> Result, Status> { - let now = Instant::now(); - info!( - "\x1b[0;31mkeep_alive Req\x1b[0m: {:?}", - grpc_request.get_ref() - ); - let keep_alive_request = grpc_request.into_inner(); - let resp = self.inner_keep_alive(keep_alive_request).await; - let d = now.elapsed().as_secs_f32(); - if let Err(err) = resp.as_ref() { - error!("\x1b[0;31mkeep_alive Resp\x1b[0m: {} {:?}", d, err); - } else { - info!("\x1b[0;31mkeep_alive Resp\x1b[0m: {}", d); - } - return resp.map_err(|e| e.into()); + self.inner_keep_alive(grpc_request.into_inner()) + .await + .map_err(|e| e.into()) } + #[allow(clippy::blocks_in_conditions)] + #[instrument( + err, + ret(level = Level::INFO), + level = Level::ERROR, + skip_all, + fields(request = ?grpc_request.get_ref()) + )] async fn going_away( &self, grpc_request: Request, ) -> Result, Status> { - let now = Instant::now(); - info!( - "\x1b[0;31mgoing_away Req\x1b[0m: {:?}", - grpc_request.get_ref() - ); - let going_away_request = grpc_request.into_inner(); - let resp = self.inner_going_away(going_away_request).await; - let d = now.elapsed().as_secs_f32(); - if let Err(err) = resp.as_ref() { - error!("\x1b[0;31mgoing_away Resp\x1b[0m: {} {:?}", d, err); - } else { - info!("\x1b[0;31mgoing_away Resp\x1b[0m: {}", d); - } - return resp.map_err(|e| e.into()); + self.inner_going_away(grpc_request.into_inner()) + .await + .map_err(|e| e.into()) } + #[allow(clippy::blocks_in_conditions)] + #[instrument( + err, + ret(level = Level::INFO), + level = Level::ERROR, + skip_all, + fields(request = ?grpc_request.get_ref()) + )] async fn execution_response( &self, grpc_request: Request, ) -> Result, Status> { - let now = Instant::now(); - info!( - "\x1b[0;31mexecution_response Req\x1b[0m: {:?}", - grpc_request.get_ref() - ); - let execute_result = 
grpc_request.into_inner(); - let resp = self.inner_execution_response(execute_result).await; - let d = now.elapsed().as_secs_f32(); - if let Err(err) = resp.as_ref() { - error!("\x1b[0;31mexecution_response Resp\x1b[0m: {} {:?}", d, err); - } else { - info!("\x1b[0;31mexecution_response Resp\x1b[0m: {}", d); - } - return resp.map_err(|e| e.into()); + self.inner_execution_response(grpc_request.into_inner()) + .await + .map_err(|e| e.into()) } } diff --git a/nativelink-store/BUILD.bazel b/nativelink-store/BUILD.bazel index adf0f67ef..21ac9c159 100644 --- a/nativelink-store/BUILD.bazel +++ b/nativelink-store/BUILD.bazel @@ -52,7 +52,6 @@ rust_library( "@crates//:http-body", "@crates//:hyper", "@crates//:hyper-rustls", - "@crates//:log", "@crates//:lz4_flex", "@crates//:parking_lot", "@crates//:prost", diff --git a/nativelink-store/Cargo.toml b/nativelink-store/Cargo.toml index 1ea647768..bc8da14b2 100644 --- a/nativelink-store/Cargo.toml +++ b/nativelink-store/Cargo.toml @@ -24,7 +24,6 @@ hex = "0.4.3" http-body = "1.0.0" hyper = { version = "0.14.28" } hyper-rustls = { version = "0.24.2", features = ["webpki-tokio"] } -log = "0.4.21" lz4_flex = "0.11.3" parking_lot = "0.12.1" prost = "0.12.4" diff --git a/nativelink-store/src/completeness_checking_store.rs b/nativelink-store/src/completeness_checking_store.rs index 35360319c..ff7ff9f77 100644 --- a/nativelink-store/src/completeness_checking_store.rs +++ b/nativelink-store/src/completeness_checking_store.rs @@ -29,7 +29,7 @@ use nativelink_util::health_utils::{default_health_status_indicator, HealthStatu use nativelink_util::store_trait::{Store, UploadSizeInfo}; use parking_lot::Mutex; use tokio::sync::Notify; -use tracing::warn; +use tracing::{event, Level}; use crate::ac_utils::{get_and_decode_digest, get_size_and_decode_digest}; @@ -288,12 +288,16 @@ async fn inner_has_with_results( maybe_result = futures.next() => { match maybe_result { Some(Ok(())) => {} - Some(Err((e, i))) => { + Some(Err((err, i))) => { state_mux.lock().results[i] = None; // Note: Don't return the errors. We just flag the result as // missing but show a warning if it's not a NotFound. - if e.code != Code::NotFound { - warn!("{e:?}"); + if err.code != Code::NotFound { + event!( + Level::WARN, + ?err, + "Error checking existence of digest" + ); } } None => { diff --git a/nativelink-store/src/dedup_store.rs b/nativelink-store/src/dedup_store.rs index 4a45d9231..799f00285 100644 --- a/nativelink-store/src/dedup_store.rs +++ b/nativelink-store/src/dedup_store.rs @@ -29,7 +29,7 @@ use nativelink_util::store_trait::{Store, UploadSizeInfo}; use serde::{Deserialize, Serialize}; use tokio_util::codec::FramedRead; use tokio_util::io::StreamReader; -use tracing::warn; +use tracing::{event, Level}; // NOTE: If these change update the comments in `stores.rs` to reflect // the new defaults. @@ -110,11 +110,12 @@ impl DedupStore { }; match self.bincode_options.deserialize::(&data) { - Err(e) => { - warn!( - "Failed to deserialize index in dedup store : {} - {:?}", - digest.hash_str(), - e + Err(err) => { + event!( + Level::WARN, + ?digest, + ?err, + "Failed to deserialize index in dedup store", ); // We return the equivalent of NotFound here so the client is happy. 
return Ok(None); diff --git a/nativelink-store/src/filesystem_store.rs b/nativelink-store/src/filesystem_store.rs index 6d3e16f14..dad11e1a3 100644 --- a/nativelink-store/src/filesystem_store.rs +++ b/nativelink-store/src/filesystem_store.rs @@ -39,7 +39,7 @@ use tokio::io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt, SeekFrom}; use tokio::task::spawn_blocking; use tokio::time::{sleep, timeout, Sleep}; use tokio_stream::wrappers::ReadDirStream; -use tracing::{error, info, warn}; +use tracing::{event, trace_span, Instrument, Level}; use crate::cas_utils::is_zero_digest; @@ -112,21 +112,21 @@ impl Drop for EncodedFilePath { shared_context .active_drop_spawns .fetch_add(1, Ordering::Relaxed); - tokio::spawn(async move { - info!( - "\x1b[0;31mFilesystem Store\x1b[0m: Deleting: {:?}", - file_path - ); - let result = fs::remove_file(&file_path) - .await - .err_tip(|| format!("Failed to remove file {file_path:?}")); - if let Err(err) = result { - error!("\x1b[0;31mFilesystem Store\x1b[0m: {:?}", err); + tokio::spawn( + async move { + event!(Level::INFO, ?file_path, "File deleted",); + let result = fs::remove_file(&file_path) + .await + .err_tip(|| format!("Failed to remove file {file_path:?}")); + if let Err(err) = result { + event!(Level::ERROR, ?file_path, ?err, "Failed to delete file",); + } + shared_context + .active_drop_spawns + .fetch_sub(1, Ordering::Relaxed); } - shared_context - .active_drop_spawns - .fetch_sub(1, Ordering::Relaxed); - }); + .instrument(trace_span!("delete_file")), + ); } } @@ -218,7 +218,13 @@ impl FileEntry for FileEntryImpl { if let Err(remove_err) = remove_result { err = err.merge(remove_err); } - warn!("\x1b[0;31mFilesystem Store\x1b[0m: {:?}", err); + event!( + Level::WARN, + ?err, + ?block_size, + ?temp_full_path, + "Failed to create file", + ); Err(err) .err_tip(|| format!("Failed to create {temp_full_path:?} in filesystem store")) }) @@ -335,8 +341,8 @@ impl LenEntry for FileEntryImpl { })? }) .await; - if let Err(e) = result { - error!("{e}"); + if let Err(err) = result { + event!(Level::ERROR, ?err, "Failed to touch file",); return false; } true @@ -362,18 +368,23 @@ impl LenEntry for FileEntryImpl { let to_path = to_full_path_from_digest(&encoded_file_path.shared_context.temp_path, &new_digest); - info!( - "\x1b[0;31mFilesystem Store\x1b[0m: Unref {}, moving file {:?} to {:?}", - encoded_file_path.digest.hash_str(), - from_path, - to_path - ); if let Err(err) = fs::rename(&from_path, &to_path).await { - warn!( - "Failed to rename file from {:?} to {:?} : {:?}", - from_path, to_path, err + event!( + Level::WARN, + digest = ?encoded_file_path.digest, + ?from_path, + ?to_path, + ?err, + "Failed to rename file", ); } else { + event!( + Level::INFO, + digest = ?encoded_file_path.digest, + ?from_path, + ?to_path, + "Renamed file", + ); encoded_file_path.path_type = PathType::Temp; encoded_file_path.digest = new_digest; } @@ -479,9 +490,11 @@ async fn add_files_to_cache( ) .await; if let Err(err) = result { - warn!( - "Could not add file to eviction cache, so deleting: {} - {:?}", - file_name, err + event!( + Level::WARN, + ?file_name, + ?err, + "Failed to add file to eviction cache", ); // Ignore result. 
let _ = @@ -501,10 +514,7 @@ async fn prune_temp_path(temp_path: &str) -> Result<(), Error> { while let Some(dir_entry) = read_dir_stream.next().await { let path = dir_entry?.path(); if let Err(err) = fs::remove_file(&path).await { - warn!( - "Failed to delete file in filesystem store {:?} : {:?}", - &path, err - ); + event!(Level::WARN, ?path, ?err, "Failed to delete file",); } } Ok(()) @@ -658,43 +668,52 @@ impl FilesystemStore { // We need to guarantee that this will get to the end even if the parent future is dropped. // See: https://github.com/TraceMachina/nativelink/issues/495 - tokio::spawn(async move { - let mut encoded_file_path = entry.get_encoded_file_path().write().await; - let final_path = get_file_path_raw( - &PathType::Content, - encoded_file_path.shared_context.as_ref(), - &digest, - ); - - evicting_map.insert(digest, entry.clone()).await; + tokio::spawn( + async move { + let mut encoded_file_path = entry.get_encoded_file_path().write().await; + let final_path = get_file_path_raw( + &PathType::Content, + encoded_file_path.shared_context.as_ref(), + &digest, + ); - let from_path = encoded_file_path.get_file_path(); - // Internally tokio spawns fs commands onto a blocking thread anyways. - // Since we are already on a blocking thread, we just need the `fs` wrapper to manage - // an open-file permit (ensure we don't open too many files at once). - let result = (rename_fn)(&from_path, &final_path) - .err_tip(|| format!("Failed to rename temp file to final path {final_path:?}")); - - // In the event our move from temp file to final file fails we need to ensure we remove - // the entry from our map. - // Remember: At this point it is possible for another thread to have a reference to - // `entry`, so we can't delete the file, only drop() should ever delete files. - if let Err(err) = result { - warn!("Error while renaming file: {err} - {from_path:?} -> {final_path:?}"); - // Warning: To prevent deadlock we need to release our lock or during `remove_if()` - // it will call `unref()`, which triggers a write-lock on `encoded_file_path`. - drop(encoded_file_path); - // It is possible that the item in our map is no longer the item we inserted, - // So, we need to conditionally remove it only if the pointers are the same. - evicting_map - .remove_if(&digest, |map_entry| Arc::::ptr_eq(map_entry, &entry)) - .await; - return Err(err); + evicting_map.insert(digest, entry.clone()).await; + + let from_path = encoded_file_path.get_file_path(); + // Internally tokio spawns fs commands onto a blocking thread anyways. + // Since we are already on a blocking thread, we just need the `fs` wrapper to manage + // an open-file permit (ensure we don't open too many files at once). + let result = (rename_fn)(&from_path, &final_path) + .err_tip(|| format!("Failed to rename temp file to final path {final_path:?}")); + + // In the event our move from temp file to final file fails we need to ensure we remove + // the entry from our map. + // Remember: At this point it is possible for another thread to have a reference to + // `entry`, so we can't delete the file, only drop() should ever delete files. + if let Err(err) = result { + event!( + Level::ERROR, + ?err, + ?from_path, + ?final_path, + "Failed to rename file", + ); + // Warning: To prevent deadlock we need to release our lock or during `remove_if()` + // it will call `unref()`, which triggers a write-lock on `encoded_file_path`. 
+ drop(encoded_file_path); + // It is possible that the item in our map is no longer the item we inserted, + // So, we need to conditionally remove it only if the pointers are the same. + evicting_map + .remove_if(&digest, |map_entry| Arc::::ptr_eq(map_entry, &entry)) + .await; + return Err(err); + } + encoded_file_path.path_type = PathType::Content; + encoded_file_path.digest = digest; + Ok(()) } - encoded_file_path.path_type = PathType::Content; - encoded_file_path.digest = digest; - Ok(()) - }) + .instrument(trace_span!("emplace_file")), + ) .await .err_tip(|| "Failed to create spawn in filesystem store update_file")? } diff --git a/nativelink-store/src/grpc_store.rs b/nativelink-store/src/grpc_store.rs index d33803f40..ed3cd686d 100644 --- a/nativelink-store/src/grpc_store.rs +++ b/nativelink-store/src/grpc_store.rs @@ -51,7 +51,7 @@ use rand::rngs::OsRng; use rand::Rng; use tokio::time::sleep; use tonic::{IntoRequest, Request, Response, Status, Streaming}; -use tracing::error; +use tracing::{event, Level}; use uuid::Uuid; // This store is usually a pass-through store, but can also be used as a CAS store. Using it as an @@ -582,7 +582,10 @@ impl Store for GrpcStore { let stream = Box::pin(unfold(local_state, |mut local_state| async move { if local_state.did_error { - error!("GrpcStore::update() polled stream after error was returned."); + event!( + Level::ERROR, + "GrpcStore::update() polled stream after error was returned" + ); return None; } let data = match local_state diff --git a/nativelink-store/src/ref_store.rs b/nativelink-store/src/ref_store.rs index 314dd7c8e..9206bb331 100644 --- a/nativelink-store/src/ref_store.rs +++ b/nativelink-store/src/ref_store.rs @@ -22,7 +22,7 @@ use nativelink_util::buf_channel::{DropCloserReadHalf, DropCloserWriteHalf}; use nativelink_util::common::DigestInfo; use nativelink_util::health_utils::{default_health_status_indicator, HealthStatusIndicator}; use nativelink_util::store_trait::{Store, UploadSizeInfo}; -use tracing::error; +use tracing::{event, Level}; use crate::store_manager::StoreManager; @@ -139,8 +139,13 @@ impl Store for RefStore { fn inner_store(&self, digest: Option) -> &'_ dyn Store { match self.get_store() { Ok(store) => store.inner_store(digest), - Err(e) => { - error!("Failed to get store for digest: {e:?}"); + Err(err) => { + event!( + Level::ERROR, + ?digest, + ?err, + "Failed to get store for digest", + ); self } } @@ -149,8 +154,13 @@ impl Store for RefStore { fn inner_store_arc(self: Arc, digest: Option) -> Arc { match self.get_store() { Ok(store) => store.clone().inner_store_arc(digest), - Err(e) => { - error!("Failed to get store for digest: {e:?}"); + Err(err) => { + event!( + Level::ERROR, + ?digest, + ?err, + "Failed to get store for digest", + ); self } } diff --git a/nativelink-store/src/s3_store.rs b/nativelink-store/src/s3_store.rs index 6e11409df..cd886ad43 100644 --- a/nativelink-store/src/s3_store.rs +++ b/nativelink-store/src/s3_store.rs @@ -55,7 +55,7 @@ use tokio::io::{AsyncRead, AsyncWrite, ReadBuf}; use tokio::net::TcpStream; use tokio::sync::{mpsc, SemaphorePermit}; use tokio::time::sleep; -use tracing::info; +use tracing::{event, Level}; use crate::cas_utils::is_zero_digest; @@ -434,20 +434,29 @@ impl Store for S3Store { }; // If we failed to upload the file, check to see if we can retry. - let retry_result = result.map_or_else(|mut e| { + let retry_result = result.map_or_else(|mut err| { // Ensure our code is Code::Aborted, so the client can retry if possible. 
- e.code = Code::Aborted; + err.code = Code::Aborted; let bytes_received = reader.get_bytes_received(); if let Err(try_reset_err) = reader.try_reset_stream() { - let e = e + event!( + Level::ERROR, + ?bytes_received, + err = ?try_reset_err, + "Unable to reset stream after failed upload in S3Store::update" + ); + return RetryResult::Err(err .merge(try_reset_err) - .append(format!("Failed to retry upload with {bytes_received} bytes received in S3Store::update")); - log::error!("{e:?}"); - return RetryResult::Err(e); + .append(format!("Failed to retry upload with {bytes_received} bytes received in S3Store::update"))); } - let e = e.append(format!("Retry on upload happened with {bytes_received} bytes received in S3Store::update")); - log::info!("{e:?}"); - RetryResult::Retry(e) + let err = err.append(format!("Retry on upload happened with {bytes_received} bytes received in S3Store::update")); + event!( + Level::INFO, + ?err, + ?bytes_received, + "Retryable S3 error" + ); + RetryResult::Retry(err) }, |()| RetryResult::Ok(())); Some((retry_result, reader)) })) @@ -622,7 +631,7 @@ impl Store for S3Store { Code::Aborted, "Failed to abort multipart upload in S3 store : {e:?}" ); - info!("{err:?}"); + event!(Level::INFO, ?err, "Multipart upload error"); Err(err) }, |_| Ok(()), diff --git a/nativelink-util/BUILD.bazel b/nativelink-util/BUILD.bazel index 3e1c87a64..f72e8d34f 100644 --- a/nativelink-util/BUILD.bazel +++ b/nativelink-util/BUILD.bazel @@ -41,7 +41,6 @@ rust_library( "@crates//:bytes", "@crates//:futures", "@crates//:hex", - "@crates//:log", "@crates//:lru", "@crates//:parking_lot", "@crates//:pin-project-lite", diff --git a/nativelink-util/Cargo.toml b/nativelink-util/Cargo.toml index fce2b7c75..40f7a762f 100644 --- a/nativelink-util/Cargo.toml +++ b/nativelink-util/Cargo.toml @@ -14,7 +14,6 @@ blake3 = { version = "1.5.1", features = ["mmap"] } bytes = "1.6.0" futures = "0.3.30" hex = "0.4.3" -log = "0.4.21" lru = "0.12.3" parking_lot = "0.12.1" pin-project-lite = "0.2.14" diff --git a/nativelink-util/src/connection_manager.rs b/nativelink-util/src/connection_manager.rs index 4a7248df4..7002e83dd 100644 --- a/nativelink-util/src/connection_manager.rs +++ b/nativelink-util/src/connection_manager.rs @@ -24,7 +24,7 @@ use nativelink_config::stores::Retry; use nativelink_error::{make_err, Code, Error}; use tokio::sync::{mpsc, oneshot}; use tonic::transport::{channel, Channel, Endpoint}; -use tracing::{debug, error, info, warn}; +use tracing::{event, Level}; use crate::retry::{self, Retrier, RetryResult}; @@ -244,7 +244,11 @@ impl ConnectionManagerWorker { let Some((current_connection_index, endpoint)) = self.endpoints.get_mut(endpoint_index) else { // Unknown endpoint, this should never happen. 
- error!("Connection to unknown endpoint {endpoint_index} requested."); + event!( + Level::ERROR, + ?endpoint_index, + "Connection to unknown endpoint requested" + ); return; }; let is_backoff = connection_index.is_some(); @@ -253,14 +257,18 @@ impl ConnectionManagerWorker { *current_connection_index }); if is_backoff { - warn!( - "Connection {connection_index} failed to {:?}, reconnecting.", - endpoint.uri() + event!( + Level::WARN, + ?connection_index, + endpoint = ?endpoint.uri(), + "Connection failed, reconnecting" ); } else { - info!( - "Creating new connection {connection_index} to {:?}.", - endpoint.uri() + event!( + Level::INFO, + ?connection_index, + endpoint = ?endpoint.uri(), + "Creating new connection" ); } let identifier = ChannelIdentifier { @@ -439,7 +447,11 @@ impl tonic::codegen::Service } } Err(err) => { - debug!("Error while creating connection on channel: {err:?}"); + event!( + Level::DEBUG, + ?err, + "Error while creating connection on channel" + ); let _ = self.connection_tx.send(ConnectionRequest::Error(( self.channel.identifier, self.pending_channel.take().is_some(), diff --git a/nativelink-util/src/evicting_map.rs b/nativelink-util/src/evicting_map.rs index 46de82729..ee941073d 100644 --- a/nativelink-util/src/evicting_map.rs +++ b/nativelink-util/src/evicting_map.rs @@ -24,7 +24,7 @@ use futures::{future, join, StreamExt}; use lru::LruCache; use nativelink_config::stores::EvictionPolicy; use serde::{Deserialize, Serialize}; -use tracing::info; +use tracing::{event, Level}; use crate::common::DigestInfo; use crate::metrics_utils::{CollectorState, Counter, CounterWithTime, MetricsComponent}; @@ -275,7 +275,7 @@ where .lru .pop_lru() .expect("Tried to peek() then pop() but failed"); - info!("\x1b[0;31mEvicting Map\x1b[0m: Evicting {}", key.hash_str()); + event!(Level::INFO, ?key, "Evicting",); state.remove(&eviction_item, false).await; peek_entry = if let Some((_, entry)) = state.lru.peek_lru() { @@ -300,10 +300,7 @@ where let mut state = self.state.lock().await; let (key, eviction_item) = state.lru.pop_entry(digest)?; - info!( - "\x1b[0;31mEvicting Map\x1b[0m: Touch failed, evicting {}", - key.hash_str() - ); + event!(Level::INFO, ?key, "Touch failed, evicting",); state.remove(&eviction_item, false).await; None } diff --git a/nativelink-util/src/fs.rs b/nativelink-util/src/fs.rs index e9bba313b..ab36bf488 100644 --- a/nativelink-util/src/fs.rs +++ b/nativelink-util/src/fs.rs @@ -33,7 +33,7 @@ use tokio::io::{ }; use tokio::sync::{Semaphore, SemaphorePermit}; use tokio::time::timeout; -use tracing::error; +use tracing::{event, Level}; /// Default read buffer size when reading to/from disk. 
pub const DEFAULT_READ_BUFF_SIZE: usize = 16384; @@ -310,9 +310,11 @@ where pub fn set_open_file_limit(limit: usize) { let current_total = TOTAL_FILE_SEMAPHORES.load(Ordering::Acquire); if limit < current_total { - error!( + event!( + Level::ERROR, "set_open_file_limit({}) must be greater than {}", - limit, current_total + limit, + current_total ); return; } diff --git a/nativelink-util/src/retry.rs b/nativelink-util/src/retry.rs index 42a32c33f..0721eff4b 100644 --- a/nativelink-util/src/retry.rs +++ b/nativelink-util/src/retry.rs @@ -20,7 +20,7 @@ use futures::future::Future; use futures::stream::StreamExt; use nativelink_config::stores::{ErrorCode, Retry}; use nativelink_error::{make_err, Code, Error}; -use tracing::info; +use tracing::{event, Level}; struct ExponentialBackoff { current: Duration, @@ -154,14 +154,14 @@ impl Retrier { Some(RetryResult::Err(e)) => { return Err(e.append(format!("On attempt {attempt}"))); } - Some(RetryResult::Retry(e)) => { - if !self.should_retry(&e.code) { - info!("Not retrying permanent error on attempt {attempt}: {e:?}"); - return Err(e); + Some(RetryResult::Retry(err)) => { + if !self.should_retry(&err.code) { + event!(Level::ERROR, ?attempt, ?err, "Not retrying permanent error"); + return Err(err); } (self.sleep_fn)( iter.next() - .ok_or(e.append(format!("On attempt {attempt}")))?, + .ok_or(err.append(format!("On attempt {attempt}")))?, ) .await } diff --git a/nativelink-worker/src/local_worker.rs b/nativelink-worker/src/local_worker.rs index 2be295cb1..bf54bc40a 100644 --- a/nativelink-worker/src/local_worker.rs +++ b/nativelink-worker/src/local_worker.rs @@ -43,7 +43,7 @@ use tokio::sync::mpsc; use tokio::time::sleep; use tokio_stream::wrappers::UnboundedReceiverStream; use tonic::Streaming; -use tracing::{error, warn}; +use tracing::{error_span, event, instrument, Instrument, Level}; use crate::running_actions_manager::{ ExecutionConfiguration, Metrics as RunningActionManagerMetrics, RunningAction, @@ -218,10 +218,14 @@ impl<'a, T: WorkerApiClientTrait, U: RunningActionsManager> LocalWorkerImpl<'a, e ))?; - self.running_actions_manager - .kill_action(action_id) - .await - .err_tip(|| format!("Failed to send kill request for action {}", hex::encode(action_id)))? + if let Err(err) = self.running_actions_manager.kill_action(action_id).await { + event!( + Level::ERROR, + action_id = hex::encode(action_id), + ?err, + "Failed to send kill request for action" + ); + }; } Update::StartAction(start_execute) => { self.metrics.start_actions_received.inc(); @@ -248,7 +252,12 @@ impl<'a, T: WorkerApiClientTrait, U: RunningActionsManager> LocalWorkerImpl<'a, actions_in_transit.fetch_sub(1, Ordering::Release); r }) - .and_then(|action| + .and_then(|action| { + event!( + Level::INFO, + action_id = hex::encode(action.get_action_id()), + "Received request to run action" + ); action .clone() .prepare_action() @@ -262,7 +271,7 @@ impl<'a, T: WorkerApiClientTrait, U: RunningActionsManager> LocalWorkerImpl<'a, } result }) - ).await + }).await }); let running_actions_manager = self.running_actions_manager.clone(); @@ -274,7 +283,12 @@ impl<'a, T: WorkerApiClientTrait, U: RunningActionsManager> LocalWorkerImpl<'a, // Save in the action cache before notifying the scheduler that we've completed. 
if let Some(digest_info) = action_digest.clone().and_then(|action_digest| action_digest.try_into().ok()) { if let Err(err) = running_actions_manager.cache_action_result(digest_info, &mut action_result, try_hasher?).await { - error!("\x1b[0;31mError saving action in store\x1b[0m: {} - {:?}", err, action_digest); + event!( + Level::ERROR, + ?err, + ?action_digest, + "Error saving action in store", + ); } } let action_stage = ActionStage::Completed(action_result); @@ -305,10 +319,14 @@ impl<'a, T: WorkerApiClientTrait, U: RunningActionsManager> LocalWorkerImpl<'a, self.actions_in_transit.fetch_add(1, Ordering::Release); futures.push( - tokio::spawn(start_action_fut).map(move |res| { + tokio::spawn(start_action_fut.instrument(error_span!("worker_start_action"))).map(move |res| { let res = res.err_tip(|| "Failed to launch spawn")?; if let Err(err) = &res { - error!("\x1b[0;31mError executing action\x1b[0m: {}", err); + event!( + Level::ERROR, + ?err, + "Error executing action", + ); } add_future_channel .send(make_publish_future(res).boxed()) @@ -479,14 +497,15 @@ impl LocalWorker { Ok((worker_id, update_for_worker_stream)) } + #[instrument(skip(self), level = Level::INFO)] pub async fn run(mut self) -> Result<(), Error> { let sleep_fn = self .sleep_fn .take() .err_tip(|| "Could not unwrap sleep_fn in LocalWorker::run")?; let sleep_fn_pin = Pin::new(&sleep_fn); - let error_handler = Box::pin(move |e: Error| async move { - error!("{:?}", e); + let error_handler = Box::pin(move |err| async move { + event!(Level::ERROR, ?err, "Error"); (sleep_fn_pin)(Duration::from_secs_f32(CONNECTION_RETRY_DELAY_S)).await; }); @@ -518,10 +537,14 @@ impl LocalWorker { update_for_worker_stream, ), }; - warn!("Worker {} connected to scheduler", inner.worker_id); + event!( + Level::WARN, + worker_id = %inner.worker_id, + "Worker registered with scheduler" + ); // Now listen for connections and run all other services. - if let Err(e) = inner.run(update_for_worker_stream).await { + if let Err(err) = inner.run(update_for_worker_stream).await { 'no_more_actions: { // Ensure there are no actions in transit before we try to kill // all our actions. @@ -533,18 +556,16 @@ impl LocalWorker { } (sleep_fn_pin)(Duration::from_secs_f32(sleep_duration)).await; } - let e = make_err!( - Code::Internal, - "Actions in transit did not reach zero before we disconnected from the scheduler." - ); - error!("{e:?}"); - return Err(e); + const ERROR_MSG: &str = "Actions in transit did not reach zero before we disconnected from the scheduler"; + event!(Level::ERROR, ERROR_MSG); + return Err(err.append(ERROR_MSG)); } + event!(Level::ERROR, ?err, "Worker disconnected from scheduler"); // Kill off any existing actions because if we re-connect, we'll // get some more and it might resource lock us. self.running_actions_manager.kill_all().await; - (error_handler)(e).await; + (error_handler)(err).await; continue; // Try to connect again. } } diff --git a/nativelink-worker/src/running_actions_manager.rs b/nativelink-worker/src/running_actions_manager.rs index 8c39cd405..324851333 100644 --- a/nativelink-worker/src/running_actions_manager.rs +++ b/nativelink-worker/src/running_actions_manager.rs @@ -13,6 +13,7 @@ // limitations under the License. 
 use std::borrow::Cow;
+use std::cmp::min;
 use std::collections::vec_deque::VecDeque;
 use std::collections::HashMap;
 use std::ffi::{OsStr, OsString};
@@ -75,7 +76,7 @@ use tokio::sync::{oneshot, watch};
 use tokio::task::spawn_blocking;
 use tokio_stream::wrappers::ReadDirStream;
 use tonic::Request;
-use tracing::{error, info};
+use tracing::{enabled, error_span, event, Instrument, Level};
 use uuid::Uuid;
 
 pub type ActionId = [u8; 32];
@@ -524,8 +525,42 @@ async fn process_side_channel_file(
     }))
 }
 
+async fn do_cleanup(
+    running_actions_manager: &RunningActionsManagerImpl,
+    action_id: &ActionId,
+    action_directory: &str,
+) -> Result<(), Error> {
+    event!(Level::INFO, "Worker cleaning up");
+    // Note: We need to be careful to keep trying to cleanup even if one of the steps fails.
+    let remove_dir_result = fs::remove_dir_all(action_directory)
+        .await
+        .err_tip(|| format!("Could not remove working directory {action_directory}"));
+    if let Err(err) = running_actions_manager.cleanup_action(action_id) {
+        event!(
+            Level::ERROR,
+            action_id = hex::encode(action_id),
+            ?err,
+            "Error cleaning up action"
+        );
+        return Result::<(), Error>::Err(err).merge(remove_dir_result);
+    }
+    if let Err(err) = remove_dir_result {
+        event!(
+            Level::ERROR,
+            action_id = hex::encode(action_id),
+            ?err,
+            "Error removing working directory"
+        );
+        return Err(err);
+    }
+    Ok(())
+}
+
 #[async_trait]
 pub trait RunningAction: Sync + Send + Sized + Unpin + 'static {
+    /// Returns the action id of the action.
+    fn get_action_id(&self) -> &ActionId;
+
     /// Anything that needs to execute before the action is actually executed should happen here.
     async fn prepare_action(self: Arc<Self>) -> Result<Arc<Self>, Error>;
@@ -695,7 +730,7 @@ impl RunningActionImpl {
             ))
             .await?;
         }
-        info!("\x1b[0;31mWorker Received Command\x1b[0m: {:?}", command);
+        event!(Level::INFO, ?command, "Worker received command",);
         {
             let mut state = self.state.lock();
             state.command_proto = Some(command);
@@ -737,7 +772,7 @@ impl RunningActionImpl {
         } else {
             command_proto.arguments.iter().map(AsRef::as_ref).collect()
         };
-        info!("\x1b[0;31mWorker Executing\x1b[0m: {:?}", &args);
+        event!(Level::INFO, ?args, "Executing command",);
         let mut command_builder = process::Command::new(args[0]);
         command_builder
             .args(&args[1..])
@@ -831,7 +866,8 @@ impl RunningActionImpl {
             .err_tip(|| "Expected stderr to exist on command this should never happen")?;
 
         let mut child_process_guard = guard(child_process, |mut child_process| {
-            error!(
+            event!(
+                Level::ERROR,
                 "Child process was not cleaned up before dropping the call to execute(), killing in background spawn."
             );
             tokio::spawn(async move { child_process.kill().await });
@@ -872,8 +908,12 @@ impl RunningActionImpl {
                 _ = &mut sleep_fut => {
                     self.running_actions_manager.metrics.task_timeouts.inc();
                     killed_action = true;
-                    if let Err(e) = child_process_guard.start_kill() {
-                        error!("Could not kill process in RunningActionsManager for timeout : {:?}", e);
+                    if let Err(err) = child_process_guard.start_kill() {
+                        event!(
+                            Level::ERROR,
+                            ?err,
+                            "Could not kill process in RunningActionsManager for action timeout",
+                        );
                     }
                     {
                         let mut state = self.state.lock();
@@ -939,15 +979,18 @@ impl RunningActionImpl {
                 },
                 _ = &mut kill_channel_rx => {
                     killed_action = true;
-                    if let Err(e) = child_process_guard.start_kill() {
-                        error!(
-                            "Could not kill process in RunningActionsManager for action {} : {:?}",
-                            hex::encode(self.action_id),
-                            e);
+                    if let Err(err) = child_process_guard.start_kill() {
+                        event!(
+                            Level::ERROR,
+                            action_id = hex::encode(self.action_id),
+                            ?err,
+                            "Could not kill process",
+                        );
                     } else {
-                        error!(
-                            "Could not get child process id, maybe already dead? for action {}",
-                            hex::encode(self.action_id)
+                        event!(
+                            Level::ERROR,
+                            action_id = hex::encode(self.action_id),
+                            "Could not get child process id, maybe already dead?",
                         );
                     }
                     {
@@ -967,7 +1010,7 @@ impl RunningActionImpl {
     }
 
     async fn inner_upload_results(self: Arc<Self>) -> Result<Arc<Self>, Error> {
-        info!("\x1b[0;31mWorker Uploading Results\x1b[0m");
+        event!(Level::INFO, "Worker uploading results",);
         let (mut command_proto, execution_result, mut execution_metadata) = {
             let mut state = self.state.lock();
             state.execution_metadata.output_upload_start_timestamp =
@@ -1105,12 +1148,18 @@ impl RunningActionImpl {
         let mut output_file_symlinks = vec![];
 
         if execution_result.exit_code != 0 {
-            error!(
-                "Command returned exit code {} : {} {}",
-                execution_result.exit_code,
-                std::str::from_utf8(&execution_result.stdout).unwrap_or(""),
-                std::str::from_utf8(&execution_result.stderr).unwrap_or("")
-            );
+            // Don't convert our stdout/stderr to strings unless we need to.
+            if enabled!(Level::ERROR) {
+                let stdout = std::str::from_utf8(&execution_result.stdout).unwrap_or("");
+                let stderr = std::str::from_utf8(&execution_result.stderr).unwrap_or("");
+                event!(
+                    Level::ERROR,
+                    exit_code = ?execution_result.exit_code,
+                    stdout = ?stdout[..min(stdout.len(), 1000)],
+                    stderr = ?stderr[..min(stderr.len(), 1000)],
+                    "Command returned non-zero exit code",
+                );
+            }
         }
 
         let stdout_digest_fut = self.metrics().upload_stdout.wrap(async {
@@ -1181,32 +1230,6 @@ impl RunningActionImpl {
         Ok(self)
     }
 
-    async fn inner_cleanup(self: Arc<Self>) -> Result<Arc<Self>, Error> {
-        info!("\x1b[0;31mWorker Cleanup\x1b[0m");
-        // Note: We need to be careful to keep trying to cleanup even if one of the steps fails.
-        let remove_dir_result = fs::remove_dir_all(&self.action_directory)
-            .await
-            .err_tip(|| {
-                format!(
-                    "Could not remove working directory {}",
-                    self.action_directory
-                )
-            });
-        self.did_cleanup.store(true, Ordering::Relaxed);
-        if let Err(e) = self.running_actions_manager.cleanup_action(&self.action_id) {
-            error!("Error cleaning up action: {e:?}");
-            return Result::<Arc<Self>, Error>::Err(e).merge(remove_dir_result.map(|_| self));
-        }
-        if let Err(e) = remove_dir_result {
-            error!(
-                "Error removing working for action {} directory: {e:?}",
-                hex::encode(self.action_id)
-            );
-            return Err(e);
-        }
-        Ok(self)
-    }
-
     async fn inner_get_finished_result(self: Arc<Self>) -> Result<ActionResult, Error> {
         let mut state = self.state.lock();
         state
@@ -1218,15 +1241,43 @@ impl RunningActionImpl {
 
 impl Drop for RunningActionImpl {
     fn drop(&mut self) {
-        assert!(
-            self.did_cleanup.load(Ordering::Relaxed),
-            "RunningActionImpl did not cleanup. This is a violation of how RunningActionImpl's requirements"
+        if self.did_cleanup.load(Ordering::Acquire) {
+            return;
+        }
+        event!(
+            Level::ERROR,
+            action_id = hex::encode(self.action_id),
+            "RunningActionImpl did not cleanup. This is a violation of the requirements, will attempt to do it in the background."
+        );
+        let running_actions_manager = self.running_actions_manager.clone();
+        let action_id = self.action_id;
+        let action_directory = self.action_directory.clone();
+        tokio::spawn(
+            async move {
+                let Err(err) =
+                    do_cleanup(&running_actions_manager, &action_id, &action_directory).await
+                else {
+                    return;
+                };
+                event!(
+                    Level::ERROR,
+                    action_id = hex::encode(action_id),
+                    ?action_directory,
+                    ?err,
+                    "Error cleaning up action"
+                );
+            }
+            .instrument(error_span!("RunningActionImpl::drop")),
         );
     }
 }
 
 #[async_trait]
 impl RunningAction for RunningActionImpl {
+    fn get_action_id(&self) -> &ActionId {
+        &self.action_id
+    }
+
     async fn prepare_action(self: Arc<Self>) -> Result<Arc<Self>, Error> {
         self.metrics()
             .clone()
@@ -1255,7 +1306,16 @@ impl RunningAction for RunningActionImpl {
         self.metrics()
             .clone()
             .cleanup
-            .wrap(Self::inner_cleanup(self))
+            .wrap(async move {
+                let result = do_cleanup(
+                    &self.running_actions_manager,
+                    &self.action_id,
+                    &self.action_directory,
+                )
+                .await;
+                self.did_cleanup.store(true, Ordering::Release);
+                result.map(move |()| self)
+            })
             .await
     }
 
@@ -1697,15 +1757,21 @@ impl RunningActionsManagerImpl {
         // Note: We do not capture metrics on this call, only `.kill_all()`.
         // Important: When the future returns the process may still be running.
     async fn kill_action(action: Arc<RunningActionImpl>) {
+        event!(
+            Level::WARN,
+            action_id = ?hex::encode(action.action_id),
+            "Sending kill to running action",
+        );
         let kill_channel_tx = {
             let mut action_state = action.state.lock();
             action_state.kill_channel_tx.take()
         };
         if let Some(kill_channel_tx) = kill_channel_tx {
             if kill_channel_tx.send(()).is_err() {
-                error!(
-                    "Error sending kill to running action {}",
-                    hex::encode(action.action_id)
+                event!(
+                    Level::ERROR,
+                    action_id = ?hex::encode(action.action_id),
+                    "Error sending kill to running action",
                 );
             }
         }
@@ -1730,7 +1796,11 @@ impl RunningActionsManager for RunningActionsManagerImpl {
             .and_then(|time| time.try_into().ok())
             .unwrap_or(SystemTime::UNIX_EPOCH);
         let action_info = self.create_action_info(start_execute, queued_timestamp).await?;
-        info!("\x1b[0;31mWorker Received Action\x1b[0m: {:?}", action_info);
+        event!(
+            Level::INFO,
+            ?action_info,
+            "Worker received action",
+        );
         let action_id = action_info.unique_qualifier.get_hash();
         let action_directory = self.make_action_directory(&action_id).await?;
         let execution_metadata = ExecutionMetadata {
diff --git a/nativelink-worker/src/worker_utils.rs b/nativelink-worker/src/worker_utils.rs
index fc9e9984e..8021a3720 100644
--- a/nativelink-worker/src/worker_utils.rs
+++ b/nativelink-worker/src/worker_utils.rs
@@ -24,7 +24,7 @@ use nativelink_error::{make_err, make_input_err, Error, ResultExt};
 use nativelink_proto::build::bazel::remote::execution::v2::platform::Property;
 use nativelink_proto::com::github::trace_machina::nativelink::remote_execution::SupportedProperties;
 use tokio::process;
-use tracing::info;
+use tracing::{event, Level};
 
 pub async fn make_supported_properties(
     worker_properties: &HashMap,
@@ -61,10 +61,7 @@ pub async fn make_supported_properties(
         process.stdin(Stdio::null());
 
         let err_fn = || format!("Error executing property_name {property_name} command");
-        info!(
-            "Spawning process for cmd: '{}' for property: '{}'",
-            cmd, property_name
-        );
+        event!(Level::INFO, cmd, property_name, "Spawning process",);
         let process_output = process.output().await.err_tip(err_fn)?;
         if !process_output.status.success() {
             return Err(make_err!(
diff --git a/nativelink-worker/tests/utils/mock_running_actions_manager.rs b/nativelink-worker/tests/utils/mock_running_actions_manager.rs
index 1e61da83e..24a0c533d 100644
--- a/nativelink-worker/tests/utils/mock_running_actions_manager.rs
+++ b/nativelink-worker/tests/utils/mock_running_actions_manager.rs
@@ -347,6 +347,10 @@ impl MockRunningAction {
 
 #[async_trait]
 impl RunningAction for MockRunningAction {
+    fn get_action_id(&self) -> &ActionId {
+        unreachable!("not implemented for tests");
+    }
+
     async fn prepare_action(self: Arc<Self>) -> Result<Arc<Self>, Error> {
         self.tx_call
             .send(RunningActionCalls::PrepareAction)
diff --git a/src/bin/nativelink.rs b/src/bin/nativelink.rs
index a00fa6f80..47f5a8d09 100644
--- a/src/bin/nativelink.rs
+++ b/src/bin/nativelink.rs
@@ -66,7 +66,7 @@ use tokio_rustls::TlsAcceptor;
 use tonic::codec::CompressionEncoding;
 use tonic::transport::Server as TonicServer;
 use tower::util::ServiceExt;
-use tracing::{error, warn};
+use tracing::{error_span, event, Instrument, Level};
 use tracing_subscriber::filter::{EnvFilter, LevelFilter};
 
 #[global_allocator]
@@ -640,54 +640,91 @@ async fn inner_main(
                 http.http2_max_header_list_size(value);
             }
-            warn!("Ready, listening on {}", socket_addr);
+            event!(Level::WARN, "Ready, listening on {socket_addr}",);
             root_futures.push(Box::pin(async move {
                 loop {
                     // Wait for client to connect.
                     let (tcp_stream, remote_addr) = match tcp_listener.accept().await {
                         Ok(result) => result,
-                        Err(e) => {
-                            error!(
-                                "{:?}",
-                                Result::<(), _>::Err(e).err_tip(|| "Failed to accept tcp connection")
-                            );
+                        Err(err) => {
+                            event!(Level::ERROR, ?err, "Failed to accept tcp connection");
                             continue;
                         }
                     };
+                    event!(
+                        target: "nativelink::services",
+                        Level::INFO,
+                        ?remote_addr,
+                        ?socket_addr,
+                        "Client connected"
+                    );
                     connected_clients_mux.inner.lock().insert(remote_addr);
                     connected_clients_mux.counter.inc();
 
                     // This is the safest way to guarantee that if our future
                     // is ever dropped we will cleanup our data.
                     let scope_guard = guard(
-                        connected_clients_mux.clone(),
-                        move |connected_clients_mux| {
-                            connected_clients_mux.inner.lock().remove(&remote_addr);
+                        Arc::downgrade(&connected_clients_mux),
+                        move |weak_connected_clients_mux| {
+                            event!(
+                                target: "nativelink::services",
+                                Level::INFO,
+                                ?remote_addr,
+                                ?socket_addr,
+                                "Client disconnected"
+                            );
+                            if let Some(connected_clients_mux) = weak_connected_clients_mux.upgrade() {
+                                connected_clients_mux.inner.lock().remove(&remote_addr);
+                            }
                         },
                     );
 
                     let (http, svc) = (http.clone(), svc.clone());
                     let fut = if let Some(tls_acceptor) = &maybe_tls_acceptor {
                         let tls_stream = match tls_acceptor.accept(tcp_stream).await {
                             Ok(result) => result,
-                            Err(e) => {
-                                error!(
-                                    "{:?}",
-                                    Result::<(), _>::Err(e).err_tip(|| "Failed to accept tls stream")
-                                );
+                            Err(err) => {
+                                event!(Level::ERROR, ?err, "Failed to accept tls stream");
                                 continue;
                             }
                         };
                         http.serve_connection(tls_stream, svc).left_future()
                     } else {
                         http.serve_connection(tcp_stream, svc).right_future()
-                    };
-                    tokio::spawn(async move {
-                        // Move it into our spawn, so if our spawn dies the cleanup happens.
-                        let _guard = scope_guard;
-                        if let Err(e) = fut.await {
-                            error!("Failed running service : {:?}", e);
+                    }
+                    .map_ok_or_else(
+                        |err| {
+                            use std::error::Error;
+                            if let Some(inner_err) = err.source() {
+                                if let Some(io_err) = inner_err.downcast_ref::<std::io::Error>() {
+                                    if io_err.kind() == std::io::ErrorKind::NotConnected {
+                                        return Ok(());
+                                    }
+                                }
+                            }
+                            Err(err)
+                        },
+                        Ok,
+                    );
+                    tokio::spawn(
+                        async move {
+                            // Move it into our spawn, so if our spawn dies the cleanup happens.
+                            let _guard = scope_guard;
+                            if let Err(err) = fut.await {
+                                event!(
+                                    target: "nativelink::services",
+                                    Level::ERROR,
+                                    ?err,
+                                    "Failed running service"
+                                );
+                            }
                         }
-                    });
+                        .instrument(error_span!(
+                            target: "nativelink::services",
+                            "http_connection",
+                            ?remote_addr,
+                            ?socket_addr
+                        )),
+                    );
                 }
             }));
         }
@@ -756,8 +793,8 @@ async fn inner_main(
                 }
                 let worker_metrics = root_worker_metrics.sub_registry_with_prefix(&name);
                 local_worker.register_metrics(worker_metrics);
-                worker_names.insert(name);
-                tokio::spawn(local_worker.run())
+                worker_names.insert(name.clone());
+                tokio::spawn(local_worker.run().instrument(error_span!("worker", ?name)))
            }
         };
         root_futures.push(Box::pin(spawn_fut.map_ok_or_else(|e| Err(e.into()), |v| v)));
@@ -792,16 +829,16 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
             .with(
                 tracing_subscriber::fmt::layer()
                     .pretty()
-                    .with_thread_ids(true)
-                    .with_thread_names(true)
+                    // .with_span_events(FmtSpan::CLOSE)
+                    .with_timer(tracing_subscriber::fmt::time::time())
                     .with_filter(env_filter),
             )
             .init();
     } else {
         tracing_subscriber::fmt()
             .pretty()
-            .with_thread_ids(true)
-            .with_thread_names(true)
+            // .with_span_events(FmtSpan::CLOSE)
+            .with_timer(tracing_subscriber::fmt::time::time())
            .with_env_filter(env_filter)
            .init();
    }