diff --git a/nativelink-util/BUILD.bazel b/nativelink-util/BUILD.bazel index dd409deb8..cfee928a4 100644 --- a/nativelink-util/BUILD.bazel +++ b/nativelink-util/BUILD.bazel @@ -90,6 +90,7 @@ rust_test_suite( "tests/fs_test.rs", "tests/health_utils_test.rs", "tests/operation_id_tests.rs", + "tests/origin_event_test.rs", "tests/proto_stream_utils_test.rs", "tests/resource_info_test.rs", "tests/retry_test.rs", diff --git a/nativelink-util/src/origin_event.rs b/nativelink-util/src/origin_event.rs index 49a13e95e..6878af23a 100644 --- a/nativelink-util/src/origin_event.rs +++ b/nativelink-util/src/origin_event.rs @@ -47,14 +47,68 @@ const ORIGIN_EVENT_VERSION: u32 = 0; static NODE_ID: OnceLock<[u8; 6]> = OnceLock::new(); +/// Returns a unique ID for the given event. +/// This ID is used to identify the event type. +/// The max value that could be output is 0x0FFF, +/// meaning you may use the first nibble for other +/// purposes. +#[inline] +pub fn get_id_for_event(event: &Event) -> [u8; 2] { + match &event.event { + None => [0x00, 0x00], + Some(event::Event::Request(req)) => match req.event { + None => [0x01, 0x00], + Some(request_event::Event::GetCapabilitiesRequest(_)) => [0x01, 0x01], + Some(request_event::Event::GetActionResultRequest(_)) => [0x01, 0x02], + Some(request_event::Event::UpdateActionResultRequest(_)) => [0x01, 0x03], + Some(request_event::Event::FindMissingBlobsRequest(_)) => [0x01, 0x04], + Some(request_event::Event::BatchReadBlobsRequest(_)) => [0x01, 0x05], + Some(request_event::Event::BatchUpdateBlobsRequest(_)) => [0x01, 0x06], + Some(request_event::Event::GetTreeRequest(_)) => [0x01, 0x07], + Some(request_event::Event::ReadRequest(_)) => [0x01, 0x08], + Some(request_event::Event::WriteRequest(())) => [0x01, 0x09], + Some(request_event::Event::QueryWriteStatusRequest(_)) => [0x01, 0x0A], + Some(request_event::Event::ExecuteRequest(_)) => [0x01, 0x0B], + Some(request_event::Event::WaitExecutionRequest(_)) => [0x01, 0x0C], + }, + Some(event::Event::Response(res)) => match res.event { + None => [0x02, 0x00], + Some(response_event::Event::Error(_)) => [0x02, 0x01], + Some(response_event::Event::ServerCapabilities(_)) => [0x02, 0x02], + Some(response_event::Event::ActionResult(_)) => [0x02, 0x03], + Some(response_event::Event::FindMissingBlobsResponse(_)) => [0x02, 0x04], + Some(response_event::Event::BatchReadBlobsResponse(_)) => [0x02, 0x05], + Some(response_event::Event::BatchUpdateBlobsResponse(_)) => [0x02, 0x06], + Some(response_event::Event::WriteResponse(_)) => [0x02, 0x07], + Some(response_event::Event::QueryWriteStatusResponse(_)) => [0x02, 0x08], + Some(response_event::Event::Empty(())) => [0x02, 0x09], + }, + Some(event::Event::Stream(stream)) => match stream.event { + None => [0x03, 0x00], + Some(stream_event::Event::Error(_)) => [0x03, 0x01], + Some(stream_event::Event::GetTreeResponse(_)) => [0x03, 0x02], + Some(stream_event::Event::DataLength(_)) => [0x03, 0x03], + Some(stream_event::Event::WriteRequest(_)) => [0x03, 0x04], + Some(stream_event::Event::Operation(_)) => [0x03, 0x05], + }, + } +} + /// Returns a unique node ID for this process. -pub fn get_node_id() -> &'static [u8; 6] { - NODE_ID.get_or_init(|| { +pub fn get_node_id(event: Option<&Event>) -> [u8; 6] { + let mut node_id = *NODE_ID.get_or_init(|| { let mut rng = rand::thread_rng(); let mut out = [0; 6]; rng.fill_bytes(&mut out); out - }) + }); + let Some(event) = event else { + return node_id; + }; + let event_id = get_id_for_event(event); + node_id[0] = (node_id[0] & 0xF0) | event_id[0]; + node_id[1] = event_id[1]; + node_id } pub struct OriginEventCollector { @@ -77,7 +131,7 @@ impl OriginEventCollector { } async fn publish_origin_event(&self, event: Event, parent_event_id: Option) -> Uuid { - let event_id = Uuid::now_v6(get_node_id()); + let event_id = Uuid::now_v6(&get_node_id(Some(&event))); let parent_event_id = parent_event_id.map_or_else(String::new, |id| id.as_hyphenated().to_string()); // Failing to send this event means that the receiver has been dropped. diff --git a/nativelink-util/src/origin_event_publisher.rs b/nativelink-util/src/origin_event_publisher.rs index ebdfde521..83fdbb946 100644 --- a/nativelink-util/src/origin_event_publisher.rs +++ b/nativelink-util/src/origin_event_publisher.rs @@ -84,7 +84,7 @@ impl OriginEventPublisher { } async fn handle_batch(&self, batch: &mut Vec) { - let uuid = Uuid::now_v6(get_node_id()); + let uuid = Uuid::now_v6(&get_node_id(None)); let events = OriginEvents { // Clippy wants us to use use `mem::take`, but this would // move all capacity as well to the new vector. Since it is diff --git a/nativelink-util/tests/origin_event_test.rs b/nativelink-util/tests/origin_event_test.rs new file mode 100644 index 000000000..7e845ebda --- /dev/null +++ b/nativelink-util/tests/origin_event_test.rs @@ -0,0 +1,164 @@ +// Copyright 2024 The NativeLink Authors. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use nativelink_macro::nativelink_test; +use nativelink_proto::com::github::trace_machina::nativelink::events::{ + event, request_event, response_event, stream_event, Event, RequestEvent, ResponseEvent, + StreamEvent, +}; +use nativelink_util::origin_event::get_id_for_event; + +macro_rules! event_assert { + ($event:ident, $val:expr) => { + assert_eq!( + get_expected_value(&$event), + get_id_for_event(&$event), + "Incorrect event id for {}", + stringify!($val) + ); + }; +} + +macro_rules! test_event { + (Request, None) => { + let event = Event { + event: Some(event::Event::Request(RequestEvent { event: None })), + }; + event_assert!(event, Request(None)); + }; + (Request, $enum_type:ident) => { + let event = Event { + event: Some(event::Event::Request(RequestEvent { + event: Some(request_event::Event::$enum_type(Default::default())), + })), + }; + event_assert!(event, Request($enum_type)); + }; + (Response, None) => { + let event = Event { + event: Some(event::Event::Response(ResponseEvent { event: None })), + }; + event_assert!(event, Response(None)); + }; + (Response, $enum_type:ident) => { + let event = Event { + event: Some(event::Event::Response(ResponseEvent { + event: Some(response_event::Event::$enum_type(Default::default())), + })), + }; + event_assert!(event, Response($enum_type)); + }; + (Stream, None) => { + let event = Event { + event: Some(event::Event::Stream(StreamEvent { event: None })), + }; + event_assert!(event, Stream(None)); + }; + (Stream, $enum_type:ident) => { + let event = Event { + event: Some(event::Event::Stream(StreamEvent { + event: Some(stream_event::Event::$enum_type(Default::default())), + })), + }; + event_assert!(event, Stream($enum_type)); + }; +} + +#[nativelink_test] +fn get_id_for_event_test() { + fn get_expected_value(event: &Event) -> [u8; 2] { + match &event.event { + None => [0x00, 0x00], + Some(event::Event::Request(req)) => { + match req.event { + None => [0x01, 0x00], + Some(request_event::Event::GetCapabilitiesRequest(_)) => [0x01, 0x01], + Some(request_event::Event::GetActionResultRequest(_)) => [0x01, 0x02], + Some(request_event::Event::UpdateActionResultRequest(_)) => [0x01, 0x03], + Some(request_event::Event::FindMissingBlobsRequest(_)) => [0x01, 0x04], + Some(request_event::Event::BatchReadBlobsRequest(_)) => [0x01, 0x05], + Some(request_event::Event::BatchUpdateBlobsRequest(_)) => [0x01, 0x06], + Some(request_event::Event::GetTreeRequest(_)) => [0x01, 0x07], + Some(request_event::Event::ReadRequest(_)) => [0x01, 0x08], + Some(request_event::Event::WriteRequest(())) => [0x01, 0x09], + Some(request_event::Event::QueryWriteStatusRequest(_)) => [0x01, 0x0A], + Some(request_event::Event::ExecuteRequest(_)) => [0x01, 0x0B], + Some(request_event::Event::WaitExecutionRequest(_)) => [0x01, 0x0C], + // Don't forget to add new entries to test cases. + } + } + Some(event::Event::Response(res)) => { + match res.event { + None => [0x02, 0x00], + Some(response_event::Event::Error(_)) => [0x02, 0x01], + Some(response_event::Event::ServerCapabilities(_)) => [0x02, 0x02], + Some(response_event::Event::ActionResult(_)) => [0x02, 0x03], + Some(response_event::Event::FindMissingBlobsResponse(_)) => [0x02, 0x04], + Some(response_event::Event::BatchReadBlobsResponse(_)) => [0x02, 0x05], + Some(response_event::Event::BatchUpdateBlobsResponse(_)) => [0x02, 0x06], + Some(response_event::Event::WriteResponse(_)) => [0x02, 0x07], + Some(response_event::Event::QueryWriteStatusResponse(_)) => [0x02, 0x08], + Some(response_event::Event::Empty(())) => [0x02, 0x09], + // Don't forget to add new entries to test cases. + } + } + Some(event::Event::Stream(stream)) => { + match stream.event { + None => [0x03, 0x00], + Some(stream_event::Event::Error(_)) => [0x03, 0x01], + Some(stream_event::Event::GetTreeResponse(_)) => [0x03, 0x02], + Some(stream_event::Event::DataLength(_)) => [0x03, 0x03], + Some(stream_event::Event::WriteRequest(_)) => [0x03, 0x04], + Some(stream_event::Event::Operation(_)) => [0x03, 0x05], + // Don't forget to add new entries to test cases. + } + } + } + } + + let event = Event { event: None }; + event_assert!(event, None); + + test_event!(Request, None); + test_event!(Request, GetCapabilitiesRequest); + test_event!(Request, GetActionResultRequest); + test_event!(Request, UpdateActionResultRequest); + test_event!(Request, FindMissingBlobsRequest); + test_event!(Request, BatchReadBlobsRequest); + test_event!(Request, BatchUpdateBlobsRequest); + test_event!(Request, GetTreeRequest); + test_event!(Request, ReadRequest); + test_event!(Request, WriteRequest); + test_event!(Request, QueryWriteStatusRequest); + test_event!(Request, ExecuteRequest); + test_event!(Request, WaitExecutionRequest); + + test_event!(Response, None); + test_event!(Response, Error); + test_event!(Response, ServerCapabilities); + test_event!(Response, ActionResult); + test_event!(Response, FindMissingBlobsResponse); + test_event!(Response, BatchReadBlobsResponse); + test_event!(Response, BatchUpdateBlobsResponse); + test_event!(Response, WriteResponse); + test_event!(Response, QueryWriteStatusResponse); + test_event!(Response, Empty); + + test_event!(Stream, None); + test_event!(Stream, Error); + test_event!(Stream, GetTreeResponse); + test_event!(Stream, DataLength); + test_event!(Stream, WriteRequest); + test_event!(Stream, Operation); +}