Skip to content

Commit

Permalink
Add OTel+logging to FlightSQL test app/server
Browse files Browse the repository at this point in the history
  • Loading branch information
benibus committed Apr 5, 2024
1 parent 2550c53 commit 7bbd51f
Show file tree
Hide file tree
Showing 2 changed files with 109 additions and 1 deletion.
67 changes: 66 additions & 1 deletion cpp/src/arrow/flight/sql/test_app_cli.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
// specific language governing permissions and limitations
// under the License.

#include "arrow/util/config.h"

#include <gflags/gflags.h>
#define BOOST_NO_CXX98_FUNCTION_BASE // ARROW-17805
#include <boost/algorithm/string.hpp>
Expand All @@ -25,18 +27,33 @@
#include "arrow/array/builder_binary.h"
#include "arrow/array/builder_primitive.h"
#include "arrow/flight/api.h"
#include "arrow/flight/client_tracing_middleware.h"
#include "arrow/flight/sql/api.h"
#include "arrow/io/memory.h"
#include "arrow/pretty_print.h"
#include "arrow/status.h"
#include "arrow/table.h"

#ifdef ARROW_TELEMETRY
#include "arrow/telemetry/logging.h"
#include "arrow/util/tracing_internal.h"

#include <opentelemetry/context/propagation/global_propagator.h>
#include <opentelemetry/context/propagation/text_map_propagator.h>
#include <opentelemetry/sdk/trace/processor.h>
#include <opentelemetry/sdk/trace/tracer_provider.h>
#include <opentelemetry/trace/propagation/http_trace_context.h>
#include <opentelemetry/trace/provider.h>
#include <opentelemetry/trace/scope.h>
#endif

using arrow::Result;
using arrow::Schema;
using arrow::Status;
using arrow::flight::ClientAuthHandler;
using arrow::flight::FlightCallOptions;
using arrow::flight::FlightClient;
using arrow::flight::FlightClientOptions;
using arrow::flight::FlightDescriptor;
using arrow::flight::FlightEndpoint;
using arrow::flight::FlightInfo;
Expand All @@ -58,6 +75,49 @@ DEFINE_string(catalog, "", "Catalog");
DEFINE_string(schema, "", "Schema");
DEFINE_string(table, "", "Table");

#ifdef ARROW_TELEMETRY
class OtelScope {
public:
explicit OtelScope(opentelemetry::trace::Scope scope) : scope_(std::move(scope)) {}

static Result<std::unique_ptr<OtelScope>> Make() {
// Implicitly sets up TracerProvider
auto tracer = arrow::internal::tracing::GetTracer();

opentelemetry::context::propagation::GlobalTextMapPropagator::SetGlobalPropagator(
opentelemetry::nostd::shared_ptr<
opentelemetry::context::propagation::TextMapPropagator>(
new opentelemetry::trace::propagation::HttpTraceContext()));

ARROW_RETURN_NOT_OK(arrow::telemetry::internal::InitializeOtelLoggerProvider());

auto logging_options = arrow::telemetry::LoggingOptions::Defaults();
logging_options.severity_threshold = arrow::telemetry::LogLevel::ARROW_TRACE;
// Flush after every log message
logging_options.flush_severity = arrow::telemetry::LogLevel::ARROW_TRACE;
ARROW_ASSIGN_OR_RAISE(auto logger, arrow::telemetry::OtelLoggerProvider::MakeLogger(
"FlightGrpcClient", logging_options));
ARROW_RETURN_NOT_OK(
arrow::util::LoggerRegistry::RegisterLogger(logger->name(), logger));

opentelemetry::trace::StartSpanOptions span_options;
span_options.kind = opentelemetry::trace::SpanKind::kClient;
auto span = tracer->StartSpan("flight-sql-test-app", span_options);
auto scope = tracer->WithActiveSpan(span);

return std::make_unique<OtelScope>(std::move(scope));
}

private:
opentelemetry::trace::Scope scope_;
};
#else
class OtelScope {
public:
static Result<std::unique_ptr<OtelScope>> Make() { return nullptr; }
};
#endif

Status PrintResultsForEndpoint(FlightSqlClient& client,
const FlightCallOptions& call_options,
const FlightEndpoint& endpoint) {
Expand Down Expand Up @@ -101,8 +161,13 @@ Status PrintResults(FlightSqlClient& client, const FlightCallOptions& call_optio
}

Status RunMain() {
ARROW_ASSIGN_OR_RAISE(auto otel_scope, OtelScope::Make());

ARROW_ASSIGN_OR_RAISE(auto location, Location::ForGrpcTcp(FLAGS_host, FLAGS_port));
ARROW_ASSIGN_OR_RAISE(auto client, FlightClient::Connect(location));
auto client_options = FlightClientOptions::Defaults();
client_options.middleware.push_back(
arrow::flight::MakeTracingClientMiddlewareFactory());
ARROW_ASSIGN_OR_RAISE(auto client, FlightClient::Connect(location, client_options));

FlightCallOptions call_options;

Expand Down
43 changes: 43 additions & 0 deletions cpp/src/arrow/flight/sql/test_server_cli.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
// specific language governing permissions and limitations
// under the License.

#include "arrow/util/config.h"

#include <gflags/gflags.h>

#include <csignal>
Expand All @@ -23,16 +25,57 @@
#include <string>

#include "arrow/flight/server.h"
#include "arrow/flight/server_tracing_middleware.h"
#include "arrow/flight/sql/example/sqlite_server.h"
#include "arrow/io/test_common.h"
#include "arrow/util/logging.h"

#ifdef ARROW_TELEMETRY
#include "arrow/telemetry/logging.h"

#include <opentelemetry/context/propagation/global_propagator.h>
#include <opentelemetry/context/propagation/text_map_propagator.h>
#include <opentelemetry/trace/propagation/http_trace_context.h>
#endif

DEFINE_int32(port, 31337, "Server port to listen on");

#ifdef ARROW_TELEMETRY
arrow::Status SetupOTel() {
opentelemetry::context::propagation::GlobalTextMapPropagator::SetGlobalPropagator(
opentelemetry::nostd::shared_ptr<
opentelemetry::context::propagation::TextMapPropagator>(
new opentelemetry::trace::propagation::HttpTraceContext()));

ARROW_RETURN_NOT_OK(arrow::telemetry::internal::InitializeOtelLoggerProvider());

auto logging_options = arrow::telemetry::LoggingOptions::Defaults();
logging_options.severity_threshold = arrow::telemetry::LogLevel::ARROW_TRACE;
// Flush after every log message
logging_options.flush_severity = arrow::telemetry::LogLevel::ARROW_TRACE;
ARROW_ASSIGN_OR_RAISE(auto logger1, arrow::telemetry::OtelLoggerProvider::MakeLogger(
"FlightGrpcServer", logging_options));
ARROW_ASSIGN_OR_RAISE(auto logger2, arrow::telemetry::OtelLoggerProvider::MakeLogger(
"FlightSqlServer", logging_options));
ARROW_RETURN_NOT_OK(
arrow::util::LoggerRegistry::RegisterLogger(logger1->name(), logger1));
ARROW_RETURN_NOT_OK(
arrow::util::LoggerRegistry::RegisterLogger(logger2->name(), logger2));

return arrow::Status::OK();
}
#else
arrow::Status SetupOTel() { return arrow::Status::OK(); }
#endif

arrow::Status RunMain() {
ARROW_RETURN_NOT_OK(SetupOTel());

ARROW_ASSIGN_OR_RAISE(auto location,
arrow::flight::Location::ForGrpcTcp("0.0.0.0", FLAGS_port));
arrow::flight::FlightServerOptions options(location);
options.middleware.emplace_back("tracing",
arrow::flight::MakeTracingServerMiddlewareFactory());

std::shared_ptr<arrow::flight::sql::example::SQLiteFlightSqlServer> server;
ARROW_ASSIGN_OR_RAISE(server,
Expand Down

0 comments on commit 7bbd51f

Please sign in to comment.