Skip to content

Commit

Permalink
Add new logging_v2 APIs to arrow, integrate with OTel
Browse files Browse the repository at this point in the history
  • Loading branch information
benibus committed Mar 29, 2024
1 parent 1e7374d commit 872537b
Show file tree
Hide file tree
Showing 8 changed files with 568 additions and 90 deletions.
1 change: 1 addition & 0 deletions cpp/src/arrow/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -509,6 +509,7 @@ set(ARROW_UTIL_SRCS
util/io_util.cc
util/list_util.cc
util/logging.cc
util/logging_v2.cc
util/key_value_metadata.cc
util/memory.cc
util/mutex.cc
Expand Down
49 changes: 10 additions & 39 deletions cpp/src/arrow/telemetry/logging.cc
Original file line number Diff line number Diff line change
Expand Up @@ -226,13 +226,6 @@ otel_shared_ptr<otel::logs::LoggerProvider> MakeLoggerProvider(
new otel::logs::NoopLoggerProvider{});
}

class NoopLogger : public Logger {
public:
void Log(const LogDescriptor&) override {}
bool Flush(std::chrono::microseconds) override { return true; }
std::string_view name() const override { return "noop"; }
};

class OtelLogger : public Logger {
public:
OtelLogger(LoggingOptions options, otel_shared_ptr<otel::logs::Logger> ot_logger)
Expand Down Expand Up @@ -276,14 +269,20 @@ class OtelLogger : public Logger {
logger_->EmitLogRecord(std::move(log));

if (desc.severity >= options_.flush_severity) {
Logger::Flush();
util::Logger::Flush();
}
}

bool Flush(std::chrono::microseconds timeout) override {
return GlobalLoggerProvider::Flush(timeout);
}

bool is_enabled() const override { return true; }

util::ArrowLogLevel severity_threshold() const override {
return options_.severity_threshold;
}

std::string_view name() const override {
otel_string_view s = logger_->GetName();
return std::string_view(s.data(), s.length());
Expand Down Expand Up @@ -319,47 +318,19 @@ bool GlobalLoggerProvider::Flush(std::chrono::microseconds timeout) {
return false;
}

Result<std::unique_ptr<Logger>> GlobalLoggerProvider::MakeLogger(
Result<std::shared_ptr<Logger>> GlobalLoggerProvider::MakeLogger(
std::string_view name, const LoggingOptions& options,
const AttributeHolder& attributes) {
auto ot_logger = otel::logs::Provider::GetLoggerProvider()->GetLogger(
ToOtel(name), /*library_name=*/"", /*library_version=*/"", /*schema_url=*/"",
OtelAttributeHolder(attributes));
return std::make_unique<OtelLogger>(options, std::move(ot_logger));
return std::make_shared<OtelLogger>(options, std::move(ot_logger));
}

Result<std::unique_ptr<Logger>> GlobalLoggerProvider::MakeLogger(
Result<std::shared_ptr<Logger>> GlobalLoggerProvider::MakeLogger(
std::string_view name, const LoggingOptions& options) {
return MakeLogger(name, options, EmptyAttributeHolder{});
}

std::unique_ptr<Logger> GlobalLogger::logger_ = nullptr;

std::unique_ptr<Logger> MakeNoopLogger() { return std::make_unique<NoopLogger>(); }

class LogMessage::Impl {
public:
Impl(LogLevel severity, Logger* logger) : logger(logger), severity(severity) {}

~Impl() {
if (logger) {
auto body = stream.str();
LogDescriptor desc;
desc.body = body;
desc.severity = severity;
logger->Log(desc);
}
}

Logger* logger;
LogLevel severity;
std::stringstream stream;
};

LogMessage::LogMessage(LogLevel severity, Logger* logger)
: impl_(std::make_shared<Impl>(severity, logger)) {}

std::ostream& LogMessage::Stream() { return impl_->stream; }

} // namespace telemetry
} // namespace arrow
58 changes: 12 additions & 46 deletions cpp/src/arrow/telemetry/logging.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
#include "arrow/status.h"
#include "arrow/util/config.h"
#include "arrow/util/logging.h"
#include "arrow/util/logging_v2.h"
#include "arrow/util/macros.h"
#include "arrow/util/visibility.h"

Expand Down Expand Up @@ -106,12 +107,20 @@ struct LogDescriptor {
const AttributeHolder* attributes = NULLPTR;
};

class ARROW_EXPORT Logger {
class ARROW_EXPORT Logger : public util::Logger {
public:
virtual ~Logger() = default;

virtual void Log(const LogDescriptor&) = 0;

void Log(const util::LogDetails& details) override {
LogDescriptor desc;
desc.body = details.message;
desc.severity = details.severity;
desc.timestamp = details.timestamp;
this->Log(desc);
}

void Log(LogLevel severity, std::string_view body, const AttributeHolder& attributes,
EventId event_id = EventId::Invalid()) {
LogDescriptor desc;
Expand Down Expand Up @@ -147,14 +156,9 @@ class ARROW_EXPORT Logger {
this->Log(desc);
}

virtual bool Flush(std::chrono::microseconds timeout) = 0;
bool Flush() { return this->Flush(std::chrono::microseconds::max()); }

virtual std::string_view name() const = 0;
};

ARROW_EXPORT std::unique_ptr<Logger> MakeNoopLogger();

class ARROW_EXPORT GlobalLoggerProvider {
public:
static Status Initialize(
Expand All @@ -164,50 +168,12 @@ class ARROW_EXPORT GlobalLoggerProvider {

static bool Flush(std::chrono::microseconds timeout = std::chrono::microseconds::max());

static Result<std::unique_ptr<Logger>> MakeLogger(
static Result<std::shared_ptr<Logger>> MakeLogger(
std::string_view name, const LoggingOptions& options = LoggingOptions::Defaults());
static Result<std::unique_ptr<Logger>> MakeLogger(std::string_view name,
static Result<std::shared_ptr<Logger>> MakeLogger(std::string_view name,
const LoggingOptions& options,
const AttributeHolder& attributes);
};

class ARROW_EXPORT GlobalLogger {
public:
static Logger* Get() {
if (!logger_) {
logger_ = MakeNoopLogger();
}
return logger_.get();
}

static void Set(std::unique_ptr<Logger> logger) {
if (logger) {
logger_ = std::move(logger);
} else {
logger_ = MakeNoopLogger();
}
}

private:
static std::unique_ptr<Logger> logger_;
};

class ARROW_EXPORT LogMessage {
public:
explicit LogMessage(LogLevel, Logger*);

std::ostream& Stream();

template <typename T>
LogMessage& operator<<(const T& t) {
Stream() << t;
return *this;
}

private:
class Impl;
std::shared_ptr<Impl> impl_;
};

} // namespace telemetry
} // namespace arrow
13 changes: 8 additions & 5 deletions cpp/src/arrow/telemetry/telemetry_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ namespace telemetry {

class OtelEnvironment : public ::testing::Environment {
public:
static constexpr std::string_view kLoggerName = "arrow-telemetry-test";

void SetUp() override {
std::vector<std::unique_ptr<otel::sdk::trace::SpanProcessor>> processors;
auto tracer_provider = otel::nostd::shared_ptr<otel::sdk::trace::TracerProvider>(
Expand All @@ -55,9 +57,9 @@ class OtelEnvironment : public ::testing::Environment {
ASSERT_OK_AND_ASSIGN(
auto logger,
GlobalLoggerProvider::MakeLogger(
"arrow-telemetry-test", logging_options,
kLoggerName, logging_options,
AttributeList{Attribute{"fooInt", 42}, Attribute{"barStr", "fourty two"}}));
GlobalLogger::Set(std::move(logger));
ASSERT_OK(util::LoggerRegistry::RegisterLogger(logger->name(), logger));
}

void TearDown() override { EXPECT_TRUE(GlobalLoggerProvider::ShutDown()); }
Expand All @@ -68,7 +70,10 @@ static ::testing::Environment* kOtelEnvironment =

template <typename... Args>
void Log(Args&&... args) {
GlobalLogger::Get()->Log(std::forward<Args>(args)...);
auto logger = std::dynamic_pointer_cast<telemetry::Logger>(
util::LoggerRegistry::GetLogger(OtelEnvironment::kLoggerName));
ASSERT_NE(logger, nullptr);
logger->Log(std::forward<Args>(args)...);
}

class TestLogging : public ::testing::Test {
Expand All @@ -91,8 +96,6 @@ TEST_F(TestLogging, Basics) {
Log(LogLevel::ARROW_WARNING, "baz bal",
AttributeList{Attribute{"intAttr", 24}, Attribute{"boolAttr", true},
Attribute{"strAttr", std::string("ab") + "c"}});
LogMessage(LogLevel::ARROW_INFO, GlobalLogger::Get()) << "This is a "
<< "log message";
}

} // namespace telemetry
Expand Down
1 change: 1 addition & 0 deletions cpp/src/arrow/util/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ add_arrow_test(utility-test
iterator_test.cc
list_util_test.cc
logging_test.cc
logging_v2_test.cc
queue_test.cc
range_test.cc
ree_util_test.cc
Expand Down
Loading

0 comments on commit 872537b

Please sign in to comment.