Skip to content

Commit

Permalink
Add OriginEventPublisher
Browse files Browse the repository at this point in the history
Adds ability for nativelink to publish events to a store/database
that can be used by another service to aggregate and/or visualize
what is happening related to requests, users or other related
metadata. By default bazel's metadata will be published with every
event if present giving the ability to combine with BEP data.
  • Loading branch information
allada committed Dec 10, 2024
1 parent 8782c0b commit 32437c5
Show file tree
Hide file tree
Showing 39 changed files with 13,157 additions and 663 deletions.
866 changes: 321 additions & 545 deletions Cargo.lock

Large diffs are not rendered by default.

51 changes: 51 additions & 0 deletions nativelink-config/src/cas_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,52 @@ pub struct BepConfig {
/// The store name referenced in the `stores` map in the main config.
#[serde(deserialize_with = "convert_string_with_shellexpand")]
pub store: StoreRefName,

/// The config related to identifying the client.
/// The value of this header will be used to identify the caller and
/// will be added to the [`BuildEvent::identity`] field of the message.
/// Default: {see `IdentityHeaderSpec`}
#[serde(default)]
pub experimental_identity_header: IdentityHeaderSpec,
}

/// Describes which request header carries the caller's identity and whether
/// a request lacking that header should be rejected.
#[derive(Deserialize, Clone, Debug, Default)]
pub struct IdentityHeaderSpec {
/// The name of the header to look for the identity in.
/// When omitted from the config this field deserializes to `None`.
/// NOTE(review): the "x-identity" fallback documented below is presumably
/// applied by whoever consumes this struct; it is not set here.
/// Default: "x-identity"
#[serde(default, deserialize_with = "convert_optional_string_with_shellexpand")]
pub header_name: Option<String>,

/// If `true`, the header must be set or the request fails.
/// Default: false
#[serde(default)]
pub required: bool,
}

/// Configuration of the destination that nativelink origin events are
/// published to.
#[derive(Deserialize, Clone, Debug)]
pub struct OriginEventsPublisherConfig {
/// The store to publish nativelink events to.
/// This is the store name referenced in the `stores` map in the main
/// config. The field is required (it has no serde default).
#[serde(deserialize_with = "convert_string_with_shellexpand")]
pub store: StoreRefName,
}

/// Top-level configuration for collecting and publishing origin events.
#[derive(Deserialize, Clone, Debug)]
pub struct OriginEventsConfig {
/// The publisher configuration for origin events.
pub publisher: OriginEventsPublisherConfig,

/// The maximum number of events to queue before applying back pressure.
/// IMPORTANT: Backpressure causes all clients to slow down significantly.
///
/// A value of zero means "use the default". Zero is also what this field
/// deserializes to when it is omitted from the config.
/// NOTE(review): the substitution of the default for zero is not done in
/// this struct; confirm it at the place where the queue is created.
///
/// Default: 65536
#[serde(default, deserialize_with = "convert_numeric_with_shellexpand")]
pub max_event_queue_size: usize,

/// The config related to identifying the client.
/// Default: {see `IdentityHeaderSpec`}
#[serde(default)]
pub identity_header: IdentityHeaderSpec,
}

#[derive(Deserialize, Debug)]
Expand Down Expand Up @@ -738,6 +784,11 @@ pub struct CasConfig {
/// Servers to setup for this process.
pub servers: Vec<ServerConfig>,

/// Experimental - Origin events configuration. This is the service that will
/// collect and publish nativelink events to a store for processing by an
/// external service.
pub experimental_origin_events: Option<OriginEventsConfig>,

/// Any global configurations that apply to all modules live here.
pub global: Option<GlobalConfig>,
}
18 changes: 18 additions & 0 deletions nativelink-proto/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,20 @@ PROTO_NAMES = [
"build.bazel.remote.execution.v2",
"build.bazel.semver",
"com.github.trace_machina.nativelink.remote_execution",
"com.github.trace_machina.nativelink.events",
"google.api",
"google.bytestream",
"google.devtools.build.v1",
"google.longrunning",
"google.rpc",
"build_event_stream",
"command_line",
"devtools.build.lib.packages.metrics",
"blaze",
"options",
"failure_details",
"blaze.invocation_policy",
"blaze.strategy_policy",
]

rust_binary(
Expand Down Expand Up @@ -69,6 +78,7 @@ genrule(
srcs = [
"build/bazel/remote/execution/v2/remote_execution.proto",
"build/bazel/semver/semver.proto",
"com/github/trace_machina/nativelink/remote_execution/events.proto",
"com/github/trace_machina/nativelink/remote_execution/worker_api.proto",
"google/api/annotations.proto",
"google/api/client.proto",
Expand All @@ -86,6 +96,14 @@ genrule(
"google/protobuf/timestamp.proto",
"google/protobuf/wrappers.proto",
"google/rpc/status.proto",
"src/main/java/com/google/devtools/build/lib/buildeventstream/proto/build_event_stream.proto",
"src/main/java/com/google/devtools/build/lib/packages/metrics/package_load_metrics.proto",
"src/main/protobuf/command_line.proto",
"src/main/protobuf/action_cache.proto",
"src/main/protobuf/option_filters.proto",
"src/main/protobuf/failure_details.proto",
"src/main/protobuf/invocation_policy.proto",
"src/main/protobuf/strategy_policy.proto",
],
outs = ["{}.pb.rs".format(name) for name in PROTO_NAMES],
cmd = select({
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,218 @@
// Copyright 2024 The NativeLink Authors. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

syntax = "proto3";

package com.github.trace_machina.nativelink.events;

import "build/bazel/remote/execution/v2/remote_execution.proto";
import "google/bytestream/bytestream.proto";
import "google/devtools/build/v1/publish_build_event.proto";
import "google/longrunning/operations.proto";
import "google/protobuf/empty.proto";
import "google/protobuf/timestamp.proto";
import "google/rpc/status.proto";

/// Same as build.bazel.remote.execution.v2.BatchUpdateBlobsRequest, but with
/// the `data` field removed and a `data_len` field added, so that events
/// record only the size of each blob and not its contents.
message BatchUpdateBlobsRequestOverride {
/// Per-blob entry of the batch (payload replaced by its length).
message Request {
build.bazel.remote.execution.v2.Digest digest = 1;
reserved 2; // Was data, but we are only tracking the length in `data_len`.
build.bazel.remote.execution.v2.Compressor.Value compressor = 3;

// Override/new field to track the length of the data.
uint64 data_len = 15; // Using 15 to stay at 1 byte, but higher than 3.
}

string instance_name = 1;
repeated Request requests = 2;
// NOTE(review): numbers 3-4 are skipped, presumably to keep field numbers
// aligned with the upstream message this one mirrors.
build.bazel.remote.execution.v2.DigestFunction.Value digest_function = 5;
}

/// Same as build.bazel.remote.execution.v2.BatchReadBlobsResponse, but with
/// the `data` field removed and a `data_len` field added, so that events
/// record only the size of each blob and not its contents.
message BatchReadBlobsResponseOverride {
/// Per-blob entry of the batch (payload replaced by its length).
message Response {
build.bazel.remote.execution.v2.Digest digest = 1;
reserved 2; // Was data, but we are only tracking the length in `data_len`.
// Note: `compressor` is 4 and `status` is 3; the numbers are not in
// declaration order.
build.bazel.remote.execution.v2.Compressor.Value compressor = 4;
google.rpc.Status status = 3;

// Override/new field to track the length of the data.
uint64 data_len = 15; // Using 15 to stay at 1 byte, but higher than the fields above.
}
repeated Response responses = 1;
}

/// Same as google.bytestream.WriteRequest, but with the `data` field removed
/// and a `data_len` field added, so that events record only the size of the
/// written chunk and not its contents.
message WriteRequestOverride {
string resource_name = 1;
int64 write_offset = 2;
bool finish_write = 3;
reserved 10; // Was data, but we are only tracking the length in `data_len`.

// Override/new field to track the length of the data.
uint64 data_len = 15; // Using 15 to stay at 1 byte; it does not collide with 1-3 or reserved 10.
}

/// Outcome of one item on a GetTree response stream: either the
/// `GetTreeResponse` that was produced or the error status.
/// Carried by `OriginEvent.get_tree_stream_response`.
message GetTreeStreamResponse {
oneof response {
build.bazel.remote.execution.v2.GetTreeResponse success = 1;
google.rpc.Status error = 2;
}

// Placeholder marking the next free field number. Replace it (and reserve
// the following number) when adding a field.
reserved 3; // NextId.
}

/// Outcome of one item on a ByteStream Read response stream.
/// Carried by `OriginEvent.read_stream_response`.
message ReadStreamResponse {
oneof response {
// NOTE(review): presumably the number of bytes in the streamed chunk,
// following the `data_len` convention of the *Override messages; confirm
// against the code that populates it.
uint64 success = 1;
google.rpc.Status error = 2;
}

// Placeholder marking the next free field number. Replace it (and reserve
// the following number) when adding a field.
reserved 3; // NextId.
}

/// Outcome of one item on a ByteStream Write request stream: either the
/// received write request (with its payload replaced by `data_len`) or the
/// error status.
/// Carried by `OriginEvent.write_stream_request`.
message WriteStreamRequest {
// NOTE(review): this oneof is named `response` although the message
// describes a request. Renaming it would change the generated code, so it
// is left as is.
oneof response {
WriteRequestOverride success = 1;
google.rpc.Status error = 2;
}

// Placeholder marking the next free field number. Replace it (and reserve
// the following number) when adding a field.
reserved 3; // NextId.
}

/// Outcome of one item on an Execute/WaitExecution response stream: either
/// the `google.longrunning.Operation` that was produced or the error status.
/// Carried by `OriginEvent.execute_stream_response` and
/// `OriginEvent.wait_execute_stream_response`.
message ExecuteStreamResponse {
oneof response {
google.longrunning.Operation success = 1;
google.rpc.Status error = 2;
}

// Placeholder marking the next free field number. Replace it (and reserve
// the following number) when adding a field.
reserved 3; // NextId.
}

/// Nativelink event that has occurred.
message OriginEvent {
/// The version of this message.
uint32 version = 1;

/// The event UUIDv6. This is a unique identifier for the event for the
/// server that generated the event.
/// Note: The timestamp of when the event occurred is encoded in the UUID.
string event_id = 2;

/// [optional] The parent event UUID. This is used to track the
/// parent event that generated this event. This is useful for
/// tracking the flow of events.
string parent_event_id = 3;

/// If the client is bazel, this is the metadata that was sent with the
/// request. This is useful for tracking the flow of events.
build.bazel.remote.execution.v2.RequestMetadata bazel_request_metadata = 4;

/// The identity of the caller that generated the event. This will be
/// populated with the value of the header specified by
/// `IdentityHeaderSpec::header_name`.
string identity = 5;

// These are reserved for data that is going to be frequently used.
// Fields <=15 save 1 byte in the encoding, so we are reserving all fields
// not used <=15 for future fields that will be sent a lot.
// To use one, take a number out of this range and shrink the range.
reserved 6 to 15; // Allowed to be used.

/// The event that occurred.
///
/// Each RPC contributes a `*_request`, a `*_response` and, where it can
/// fail, a `*_error` variant. Streaming RPCs additionally have a
/// `*_stream_*` variant for the individual stream items; the side of those
/// RPCs that is a stream is recorded as `google.protobuf.Empty`.
oneof event {
build.bazel.remote.execution.v2.GetCapabilitiesRequest get_capabilities_request = 16;
build.bazel.remote.execution.v2.ServerCapabilities get_capabilities_response = 17;
// Note: GetCapabilitiesRequest currently cannot error.

build.bazel.remote.execution.v2.GetActionResultRequest get_action_result_request = 18;
build.bazel.remote.execution.v2.ActionResult get_action_result_response = 19;
google.rpc.Status get_action_result_error = 20;

build.bazel.remote.execution.v2.UpdateActionResultRequest update_action_result_request = 21;
build.bazel.remote.execution.v2.ActionResult update_action_result_response = 22;
google.rpc.Status update_action_result_error = 23;

build.bazel.remote.execution.v2.FindMissingBlobsRequest find_missing_blobs_request = 24;
build.bazel.remote.execution.v2.FindMissingBlobsResponse find_missing_blobs_response = 25;
google.rpc.Status find_missing_blobs_error = 26;

build.bazel.remote.execution.v2.BatchReadBlobsRequest batch_read_blobs_request = 27;
BatchReadBlobsResponseOverride batch_read_blobs_response = 28;
google.rpc.Status batch_read_blobs_error = 29;

BatchUpdateBlobsRequestOverride batch_update_blobs_request = 30;
build.bazel.remote.execution.v2.BatchUpdateBlobsResponse batch_update_blobs_response = 31;
google.rpc.Status batch_update_blobs_error = 32;

build.bazel.remote.execution.v2.GetTreeRequest get_tree_request = 33;
google.protobuf.Empty get_tree_response = 34;
google.rpc.Status get_tree_error = 35;
GetTreeStreamResponse get_tree_stream_response = 36;

google.bytestream.ReadRequest read_request = 37;
google.protobuf.Empty read_response = 38;
google.rpc.Status read_error = 39;
ReadStreamResponse read_stream_response = 40;

google.protobuf.Empty write_request = 41;
google.bytestream.WriteResponse write_response = 42;
google.rpc.Status write_error = 43;
WriteStreamRequest write_stream_request = 44;

google.bytestream.QueryWriteStatusRequest query_write_status_request = 45;
google.bytestream.QueryWriteStatusResponse query_write_status_response = 46;
google.rpc.Status query_write_status_error = 47;

build.bazel.remote.execution.v2.ExecuteRequest execute_request = 48;
google.protobuf.Empty execute_response = 49;
google.rpc.Status execute_error = 50;
ExecuteStreamResponse execute_stream_response = 51;

// NOTE(review): the two fields below use the prefix `wait_execute_` while
// the two above them use `wait_execution_`. The names are kept because
// renaming would change the generated code.
build.bazel.remote.execution.v2.WaitExecutionRequest wait_execution_request = 52;
google.protobuf.Empty wait_execution_response = 53;
google.rpc.Status wait_execute_error = 54;
ExecuteStreamResponse wait_execute_stream_response = 55;
}

// Placeholder marking the next free field number. Replace it (and reserve
// the following number) when adding a field.
reserved 56; // NextId.
}

/// Batch of events that have occurred.
message OriginEvents {
/// The events contained in this batch.
repeated OriginEvent events = 1;

// Placeholder marking the next free field number. Replace it (and reserve
// the following number) when adding a field.
reserved 2; // NextId.
}

/// Bep event that has occurred.
message BepEvent {
/// The version of this message.
uint32 version = 1;

/// The identity of the caller that generated the event. This will be
/// populated with the value of the header specified by
/// `IdentityHeaderSpec::header_name`.
string identity = 2;

/// The event that occurred: one of the two request types of the
/// `google.devtools.build.v1` publish-build-event API.
oneof event {
google.devtools.build.v1.PublishLifecycleEventRequest lifecycle_event = 3;
google.devtools.build.v1.PublishBuildToolEventStreamRequest build_tool_event = 4;
}

// Placeholder marking the next free field number. Replace it (and reserve
// the following number) when adding a field.
reserved 5; // NextId.
}
9 changes: 8 additions & 1 deletion nativelink-proto/gen_lib_rs_tool.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,14 @@
// This file is auto-generated. To update it, run:
// `bazel run nativelink-proto:update_protos`
#![allow(clippy::default_trait_access, clippy::doc_markdown)]
#![allow(
clippy::default_trait_access,
clippy::doc_lazy_continuation,
clippy::doc_markdown,
clippy::doc_markdown,
clippy::large_enum_variant,
rustdoc::invalid_html_tags,
)]
"""


Expand Down
Loading

0 comments on commit 32437c5

Please sign in to comment.