Skip to content

Commit

Permalink
Add OriginEventPublisher
Browse files Browse the repository at this point in the history
Adds ability for nativelink to publish events to a store/database
that can be used by another service to aggregate and/or visualize
what is happening related to requests, users or other related
metadata. By default bazel's metadata will be published with every
event if present giving the ability to combine with BEP data.
  • Loading branch information
allada committed Dec 11, 2024
1 parent a0ca341 commit d057882
Show file tree
Hide file tree
Showing 39 changed files with 12,857 additions and 663 deletions.
866 changes: 321 additions & 545 deletions Cargo.lock

Large diffs are not rendered by default.

51 changes: 51 additions & 0 deletions nativelink-config/src/cas_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,52 @@ pub struct BepConfig {
/// The store name referenced in the `stores` map in the main config.
#[serde(deserialize_with = "convert_string_with_shellexpand")]
pub store: StoreRefName,

/// The config related to identifying the client.
/// The value of this header will be used to identify the caller and
/// will be added to the `BuildEvent::identity` field of the message.
/// Default: {see `IdentityHeaderSpec`}
#[serde(default)]
pub experimental_identity_header: IdentityHeaderSpec,
}

#[derive(Deserialize, Clone, Debug, Default)]
pub struct IdentityHeaderSpec {
/// The name of the header to look for the identity in.
/// Default: "x-identity"
#[serde(default, deserialize_with = "convert_optional_string_with_shellexpand")]
pub header_name: Option<String>,

/// If the header is required to be set or fail the request.
#[serde(default)]
pub required: bool,
}

#[derive(Deserialize, Clone, Debug)]
pub struct OriginEventsPublisherSpec {
/// The store to publish nativelink events to.
/// The store name referenced in the `stores` map in the main config.
#[serde(deserialize_with = "convert_string_with_shellexpand")]
pub store: StoreRefName,
}

#[derive(Deserialize, Clone, Debug)]
pub struct OriginEventsSpec {
/// The publisher configuration for origin events.
pub publisher: OriginEventsPublisherSpec,

/// The maximum number of events to queue before applying back pressure.
/// IMPORTANT: Backpressure causes all clients to slow down significantly.
/// Zero is default.
///
/// Default: 65536 (zero defaults to this)
#[serde(default, deserialize_with = "convert_numeric_with_shellexpand")]
pub max_event_queue_size: usize,

/// The config related to identifying the client.
/// Default: {see `IdentityHeaderSpec`}
#[serde(default)]
pub identity_header: IdentityHeaderSpec,
}

#[derive(Deserialize, Debug)]
Expand Down Expand Up @@ -738,6 +784,11 @@ pub struct CasConfig {
/// Servers to setup for this process.
pub servers: Vec<ServerConfig>,

/// Experimental - Origin events configuration. This is the service that will
/// collect and publish nativelink events to a store for processing by an
/// external service.
pub experimental_origin_events: Option<OriginEventsSpec>,

/// Any global configurations that apply to all modules live here.
pub global: Option<GlobalConfig>,
}
18 changes: 18 additions & 0 deletions nativelink-proto/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,20 @@ PROTO_NAMES = [
"build.bazel.remote.execution.v2",
"build.bazel.semver",
"com.github.trace_machina.nativelink.remote_execution",
"com.github.trace_machina.nativelink.events",
"google.api",
"google.bytestream",
"google.devtools.build.v1",
"google.longrunning",
"google.rpc",
"build_event_stream",
"command_line",
"devtools.build.lib.packages.metrics",
"blaze",
"options",
"failure_details",
"blaze.invocation_policy",
"blaze.strategy_policy",
]

rust_binary(
Expand Down Expand Up @@ -69,6 +78,7 @@ genrule(
srcs = [
"build/bazel/remote/execution/v2/remote_execution.proto",
"build/bazel/semver/semver.proto",
"com/github/trace_machina/nativelink/remote_execution/events.proto",
"com/github/trace_machina/nativelink/remote_execution/worker_api.proto",
"google/api/annotations.proto",
"google/api/client.proto",
Expand All @@ -86,6 +96,14 @@ genrule(
"google/protobuf/timestamp.proto",
"google/protobuf/wrappers.proto",
"google/rpc/status.proto",
"src/main/java/com/google/devtools/build/lib/buildeventstream/proto/build_event_stream.proto",
"src/main/java/com/google/devtools/build/lib/packages/metrics/package_load_metrics.proto",
"src/main/protobuf/command_line.proto",
"src/main/protobuf/action_cache.proto",
"src/main/protobuf/option_filters.proto",
"src/main/protobuf/failure_details.proto",
"src/main/protobuf/invocation_policy.proto",
"src/main/protobuf/strategy_policy.proto",
],
outs = ["{}.pb.rs".format(name) for name in PROTO_NAMES],
cmd = select({
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,179 @@
// Copyright 2024 The NativeLink Authors. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

syntax = "proto3";

package com.github.trace_machina.nativelink.events;

import "build/bazel/remote/execution/v2/remote_execution.proto";
import "google/bytestream/bytestream.proto";
import "google/devtools/build/v1/publish_build_event.proto";
import "google/longrunning/operations.proto";
import "google/protobuf/empty.proto";
import "google/rpc/status.proto";

/// Same as build.bazel.remote.execution.v2.BatchUpdateBlobsRequest,
/// but without the data field, and add a `data_len` field.
message BatchUpdateBlobsRequestOverride {
message Request {
build.bazel.remote.execution.v2.Digest digest = 1;
reserved 2; // Was data, but we are only tracking the length in `data_len`.
build.bazel.remote.execution.v2.Compressor.Value compressor = 3;

// Override/new field to track the length of the data.
uint64 data_len = 15; // Using 15 to stay at 1 byte, but higher than 3.
}

string instance_name = 1;
repeated Request requests = 2;
build.bazel.remote.execution.v2.DigestFunction.Value digest_function = 5;
}

/// Same as build.bazel.remote.execution.v2.BatchReadBlobsResponse,
/// but without the data field, and add a `data_len` field.
message BatchReadBlobsResponseOverride {
message Response {
build.bazel.remote.execution.v2.Digest digest = 1;
reserved 2; // Was data, but we are only tracking the length in `data_len`.
build.bazel.remote.execution.v2.Compressor.Value compressor = 4;
google.rpc.Status status = 3;

// Override/new field to track the length of the data.
uint64 data_len = 15; // Using 15 to stay at 1 byte, but higher than 3.
}
repeated Response responses = 1;
}

/// Same as google.bytestream.WriteRequest, but without the data field,
/// and add a `data_len` field.
message WriteRequestOverride {
string resource_name = 1;
int64 write_offset = 2;
bool finish_write = 3;
reserved 10; // Was data, but we are only tracking the length in `data_len`.

// Override/new field to track the length of the data.
uint64 data_len = 15; // Using 15 to stay at 1 byte, but higher than 3.
}

message RequestEvent {
oneof event {
build.bazel.remote.execution.v2.GetCapabilitiesRequest get_capabilities_request = 1;
build.bazel.remote.execution.v2.GetActionResultRequest get_action_result_request = 2;
build.bazel.remote.execution.v2.UpdateActionResultRequest update_action_result_request = 3;
build.bazel.remote.execution.v2.FindMissingBlobsRequest find_missing_blobs_request = 4;
build.bazel.remote.execution.v2.BatchReadBlobsRequest batch_read_blobs_request = 5;
BatchUpdateBlobsRequestOverride batch_update_blobs_request = 6;
build.bazel.remote.execution.v2.GetTreeRequest get_tree_request = 7;
google.bytestream.ReadRequest read_request = 8;
google.protobuf.Empty write_request = 9;
google.bytestream.QueryWriteStatusRequest query_write_status_request = 10;
build.bazel.remote.execution.v2.ExecuteRequest execute_request = 11;
build.bazel.remote.execution.v2.WaitExecutionRequest wait_execution_request = 12;
}
}

message ResponseEvent {
oneof event {
google.rpc.Status error = 1;
build.bazel.remote.execution.v2.ServerCapabilities server_capabilities = 2;
build.bazel.remote.execution.v2.ActionResult action_result = 3;
build.bazel.remote.execution.v2.FindMissingBlobsResponse find_missing_blobs_response = 4;
BatchReadBlobsResponseOverride batch_read_blobs_response = 5;
build.bazel.remote.execution.v2.BatchUpdateBlobsResponse batch_update_blobs_response = 6;
google.bytestream.WriteResponse write_response = 7;
google.bytestream.QueryWriteStatusResponse query_write_status_response = 8;

google.protobuf.Empty empty = 9;
}

reserved 10; // NextId.
}

message StreamEvent {
oneof event {
google.rpc.Status error = 1;
build.bazel.remote.execution.v2.GetTreeResponse get_tree_response = 2;
uint64 data_length = 3;
WriteRequestOverride write_request = 4;
google.longrunning.Operation operation = 5;
}

reserved 6; // NextId.
}

message Event {
oneof event {
RequestEvent request = 1;
ResponseEvent response = 2;
StreamEvent stream = 3;
}

reserved 4; // NextId.
}

/// Nativelink event that has occurred.
message OriginEvent {
/// The version of this message.
uint32 version = 1;

/// The event UUIDv6. This is a unique identifier for the event for the
/// server that generated the event.
/// Note: The timestamp of when the event occurred is encoded in the UUID.
string event_id = 2;

/// [optional] The parent event UUID. This is used to track the
/// parent event that generated this event. This is useful for
/// tracking the flow of events.
string parent_event_id = 3;

/// If the client is bazel, this is the meatadata that was sent with the
/// request. This is useful for tracking the flow of events.
build.bazel.remote.execution.v2.RequestMetadata bazel_request_metadata = 4;

/// The identity header that generated the event. This will be populated with
/// the value of the specified by the `IdentityHeaderSpec::header_name`.
string identity = 5;

/// The event that occurred.
Event event = 6;

reserved 7; // NextId.
}

/// Batch of events that have occurred.
message OriginEvents {
repeated OriginEvent events = 1;

reserved 2; // NextId.
}

/// Bep event that has occurred.
message BepEvent {
/// The version of this message.
uint32 version = 1;

/// The identity header that generated the event. This will be populated
/// with the header value keyed by the specified by the
/// `IdentityHeaderSpec::header_name`.
string identity = 2;

/// The event that occurred.
oneof event {
google.devtools.build.v1.PublishLifecycleEventRequest lifecycle_event = 3;
google.devtools.build.v1.PublishBuildToolEventStreamRequest build_tool_event = 4;
}

reserved 5; // NextId.
}
9 changes: 8 additions & 1 deletion nativelink-proto/gen_lib_rs_tool.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,14 @@
// This file is auto-generated. To update it, run:
// `bazel run nativelink-proto:update_protos`
#![allow(clippy::default_trait_access, clippy::doc_markdown)]
#![allow(
clippy::default_trait_access,
clippy::doc_lazy_continuation,
clippy::doc_markdown,
clippy::doc_markdown,
clippy::large_enum_variant,
rustdoc::invalid_html_tags
)]
"""


Expand Down
Loading

0 comments on commit d057882

Please sign in to comment.