Skip to content

Commit

Permalink
OTel API/configuration changes
Browse files Browse the repository at this point in the history
  • Loading branch information
benibus committed Apr 5, 2024
1 parent 3a386c3 commit 2550c53
Show file tree
Hide file tree
Showing 3 changed files with 65 additions and 97 deletions.
78 changes: 28 additions & 50 deletions cpp/src/arrow/telemetry/logging.cc
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,6 @@
#include <opentelemetry/sdk/logs/batch_log_record_processor_options.h>
#include <opentelemetry/sdk/logs/exporter.h>
#include <opentelemetry/sdk/logs/logger_provider.h>
#include <opentelemetry/sdk/resource/resource.h>
#include <opentelemetry/sdk/resource/resource_detector.h>
#include <opentelemetry/sdk/resource/semantic_conventions.h>
#include <opentelemetry/trace/tracer.h>

#include <opentelemetry/exporters/otlp/protobuf_include_prefix.h>
Expand All @@ -57,7 +54,7 @@ namespace telemetry {

namespace {

namespace SemanticConventions = otel::sdk::resource::SemanticConventions;
using internal::LogExporterOptions;

constexpr const char kLoggingBackendEnvVar[] = "ARROW_LOGGING_BACKEND";

Expand Down Expand Up @@ -150,12 +147,12 @@ std::unique_ptr<otel::sdk::logs::LogRecordExporter> MakeExporter(
}

std::unique_ptr<otel::sdk::logs::LogRecordExporter> MakeExporterFromEnv(
const LoggerProviderOptions& options) {
const LogExporterOptions& exporter_options) {
auto maybe_env_var = arrow::internal::GetEnvVar(kLoggingBackendEnvVar);
if (maybe_env_var.ok()) {
auto env_var = maybe_env_var.ValueOrDie();
auto* default_ostream =
options.default_export_stream ? options.default_export_stream : &std::cerr;
exporter_options.default_stream ? exporter_options.default_stream : &std::cerr;
if (env_var == "ostream") {
// TODO: Currently disabled as the log records returned by otel's ostream exporter
// don't maintain copies of their attributes, leading to lifetime issues. If/when
Expand Down Expand Up @@ -191,36 +188,13 @@ std::unique_ptr<otel::sdk::logs::LogRecordProcessor> MakeLogRecordProcessor(
options);
}

otel::sdk::resource::Resource MakeResource(const ServiceAttributes& service_attributes) {
// TODO: We could also include process info...
// - SemanticConventions::kProcessPid
// - SemanticConventions::kProcessExecutableName
// - SemanticConventions::kProcessExecutablePath
// - SemanticConventions::kProcessOwner
// - SemanticConventions::kProcessCommandArgs
otel::sdk::resource::ResourceAttributes resource_attributes{};

auto set_attr = [&](const char* key, const std::optional<std::string>& val) {
if (val.has_value()) resource_attributes.SetAttribute(key, val.value());
};
set_attr(SemanticConventions::kServiceName, service_attributes.name);
set_attr(SemanticConventions::kServiceNamespace, service_attributes.name_space);
set_attr(SemanticConventions::kServiceInstanceId, service_attributes.instance_id);
set_attr(SemanticConventions::kServiceVersion, service_attributes.version);

auto resource = otel::sdk::resource::Resource::Create(resource_attributes);
auto env_resource = otel::sdk::resource::OTELResourceDetector().Detect();
return resource.Merge(env_resource);
}

otel_shared_ptr<otel::logs::LoggerProvider> MakeLoggerProvider(
const LoggerProviderOptions& options) {
auto exporter = MakeExporterFromEnv(options);
const LogExporterOptions& exporter_options) {
auto exporter = MakeExporterFromEnv(exporter_options);
if (exporter) {
auto processor = MakeLogRecordProcessor(std::move(exporter));
auto resource = MakeResource(options.service_attributes);
return otel_shared_ptr<otel::sdk::logs::LoggerProvider>(
new otel::sdk::logs::LoggerProvider(std::move(processor), resource));
new otel::sdk::logs::LoggerProvider(std::move(processor)));
}
return otel_shared_ptr<otel::logs::LoggerProvider>(
new otel::logs::NoopLoggerProvider{});
Expand Down Expand Up @@ -274,7 +248,7 @@ class OtelLogger : public Logger {
}

bool Flush(std::chrono::microseconds timeout) override {
return GlobalLoggerProvider::Flush(timeout);
return OtelLoggerProvider::Flush(timeout);
}

bool is_enabled() const override { return true; }
Expand All @@ -295,21 +269,7 @@ class OtelLogger : public Logger {

} // namespace

Status GlobalLoggerProvider::Initialize(const LoggerProviderOptions& options) {
otel::logs::Provider::SetLoggerProvider(MakeLoggerProvider(options));
return Status::OK();
}

bool GlobalLoggerProvider::ShutDown() {
auto provider = otel::logs::Provider::GetLoggerProvider();
if (auto sdk_provider =
dynamic_cast<otel::sdk::logs::LoggerProvider*>(provider.get())) {
return sdk_provider->Shutdown();
}
return false;
}

bool GlobalLoggerProvider::Flush(std::chrono::microseconds timeout) {
bool OtelLoggerProvider::Flush(std::chrono::microseconds timeout) {
auto provider = otel::logs::Provider::GetLoggerProvider();
if (auto sdk_provider =
dynamic_cast<otel::sdk::logs::LoggerProvider*>(provider.get())) {
Expand All @@ -318,7 +278,7 @@ bool GlobalLoggerProvider::Flush(std::chrono::microseconds timeout) {
return false;
}

Result<std::shared_ptr<Logger>> GlobalLoggerProvider::MakeLogger(
Result<std::shared_ptr<Logger>> OtelLoggerProvider::MakeLogger(
std::string_view name, const LoggingOptions& options,
const AttributeHolder& attributes) {
auto ot_logger = otel::logs::Provider::GetLoggerProvider()->GetLogger(
Expand All @@ -327,10 +287,28 @@ Result<std::shared_ptr<Logger>> GlobalLoggerProvider::MakeLogger(
return std::make_shared<OtelLogger>(options, std::move(ot_logger));
}

Result<std::shared_ptr<Logger>> GlobalLoggerProvider::MakeLogger(
Result<std::shared_ptr<Logger>> OtelLoggerProvider::MakeLogger(
std::string_view name, const LoggingOptions& options) {
return MakeLogger(name, options, EmptyAttributeHolder{});
}

namespace internal {

Status InitializeOtelLoggerProvider(const LogExporterOptions& exporter_options) {
otel::logs::Provider::SetLoggerProvider(MakeLoggerProvider(exporter_options));
return Status::OK();
}

bool ShutdownOtelLoggerProvider() {
auto provider = otel::logs::Provider::GetLoggerProvider();
if (auto sdk_provider =
dynamic_cast<otel::sdk::logs::LoggerProvider*>(provider.get())) {
return sdk_provider->Shutdown();
}
return false;
}

} // namespace internal

} // namespace telemetry
} // namespace arrow
75 changes: 33 additions & 42 deletions cpp/src/arrow/telemetry/logging.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,11 @@
#include <chrono>
#include <iosfwd>
#include <memory>
#include <optional>
#include <string>
#include <string_view>

#include "arrow/result.h"
#include "arrow/status.h"
#include "arrow/util/config.h"
#include "arrow/util/logging.h"
#include "arrow/util/logging_v2.h"
#include "arrow/util/macros.h"
Expand All @@ -39,41 +37,9 @@ class AttributeHolder;

using LogLevel = util::ArrowLogLevel;

/// \brief Attributes to be set in an OpenTelemetry resource
///
/// The OTEL_RESOURCE_ATTRIBUTES envvar can be used to set additional attributes
struct ServiceAttributes {
static constexpr char kDefaultName[] = "arrow.unknown_service";
static constexpr char kDefaultNamespace[] = "org.apache";
static constexpr char kDefaultInstanceId[] = "arrow.unknown_id";
static constexpr char kDefaultVersion[] = ARROW_VERSION_STRING;

std::optional<std::string> name = kDefaultName;
std::optional<std::string> name_space = kDefaultNamespace;
std::optional<std::string> instance_id = kDefaultInstanceId;
std::optional<std::string> version = kDefaultVersion;

static ServiceAttributes Defaults() { return ServiceAttributes{}; }
};

struct LoggerProviderOptions {
/// \brief Attributes to set for the LoggerProvider's Resource
ServiceAttributes service_attributes = ServiceAttributes::Defaults();

/// \brief Default stream to use for the ostream/arrow_otlp_ostream log record exporters
///
/// If null, stderr will be used
std::ostream* default_export_stream = NULLPTR;

static LoggerProviderOptions Defaults() { return LoggerProviderOptions{}; }
};

constexpr LogLevel kDefaultSeverityThreshold = LogLevel::ARROW_WARNING;
constexpr LogLevel kDefaultSeverity = LogLevel::ARROW_INFO;

struct LoggingOptions {
/// \brief Minimum severity required to emit an OpenTelemetry log record
LogLevel severity_threshold = kDefaultSeverityThreshold;
LogLevel severity_threshold = LogLevel::ARROW_INFO;

/// \brief Minimum severity required to immediately attempt to flush pending log records
LogLevel flush_severity = LogLevel::ARROW_ERROR;
Expand All @@ -96,7 +62,7 @@ struct EventId {
};

struct LogDescriptor {
LogLevel severity = kDefaultSeverity;
LogLevel severity = LogLevel::ARROW_INFO;

std::chrono::system_clock::time_point timestamp = std::chrono::system_clock::now();

Expand Down Expand Up @@ -159,13 +125,15 @@ class ARROW_EXPORT Logger : public util::Logger {
virtual std::string_view name() const = 0;
};

class ARROW_EXPORT GlobalLoggerProvider {
/// \brief A wrapper interface for `opentelemetry::logs::Provider`
/// \details Application authors will typically want to set the global OpenTelemetry
/// logger provider themselves after configuring an exporter, processor, resource etc.
/// This API will then defer to the provider returned by
/// `opentelemetry::logs::Provider::GetLoggerProvider`
class ARROW_EXPORT OtelLoggerProvider {
public:
static Status Initialize(
const LoggerProviderOptions& = LoggerProviderOptions::Defaults());

static bool ShutDown();

/// \brief Attempt to flush the log record processor associated with the provider
/// \return `true` if the flush occured
static bool Flush(std::chrono::microseconds timeout = std::chrono::microseconds::max());

static Result<std::shared_ptr<Logger>> MakeLogger(
Expand All @@ -175,5 +143,28 @@ class ARROW_EXPORT GlobalLoggerProvider {
const AttributeHolder& attributes);
};

namespace internal {

// These utilities are primarily intended for Arrow developers

struct LogExporterOptions {
/// \brief Default stream to use for the ostream/arrow_otlp_ostream log record exporters
/// \details If null, stderr will be used
std::ostream* default_stream = NULLPTR;

static LogExporterOptions Defaults() { return LogExporterOptions{}; }
};

/// \brief Initialize the global OpenTelemetry logger provider with a default exporter
/// (based on the ARROW_LOGGING_BACKEND envvar) and batch processor
ARROW_EXPORT Status InitializeOtelLoggerProvider(
const LogExporterOptions& exporter_options = LogExporterOptions::Defaults());

/// \brief Attempt to shut down the global OpenTelemetry logger provider
/// \return `true` if shutdown was successful
ARROW_EXPORT bool ShutdownOtelLoggerProvider();

} // namespace internal

} // namespace telemetry
} // namespace arrow
9 changes: 4 additions & 5 deletions cpp/src/arrow/telemetry/telemetry_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -49,20 +49,19 @@ class OtelEnvironment : public ::testing::Environment {
otel::nostd::shared_ptr<otel::context::propagation::TextMapPropagator>(
new otel::trace::propagation::HttpTraceContext()));

auto provider_options = LoggerProviderOptions::Defaults();
ASSERT_OK(GlobalLoggerProvider::Initialize(provider_options));
ASSERT_OK(internal::InitializeOtelLoggerProvider());
auto logging_options = LoggingOptions::Defaults();
logging_options.severity_threshold = LogLevel::ARROW_TRACE;
logging_options.flush_severity = LogLevel::ARROW_TRACE;
ASSERT_OK_AND_ASSIGN(
auto logger,
GlobalLoggerProvider::MakeLogger(
OtelLoggerProvider::MakeLogger(
kLoggerName, logging_options,
AttributeList{Attribute{"fooInt", 42}, Attribute{"barStr", "fourty two"}}));
ASSERT_OK(util::LoggerRegistry::RegisterLogger(logger->name(), logger));
}

void TearDown() override { EXPECT_TRUE(GlobalLoggerProvider::ShutDown()); }
void TearDown() override { EXPECT_TRUE(internal::ShutdownOtelLoggerProvider()); }
};

static ::testing::Environment* kOtelEnvironment =
Expand All @@ -79,7 +78,7 @@ void Log(Args&&... args) {
class TestLogging : public ::testing::Test {
public:
void SetUp() override {
tracer_ = internal::tracing::GetTracer();
tracer_ = arrow::internal::tracing::GetTracer();
span_ = tracer_->StartSpan("test-logging");
}

Expand Down

0 comments on commit 2550c53

Please sign in to comment.