Skip to content

Commit

Permalink
Added BatchSpanProcessor metrics on drop and export span (#635)
Browse files Browse the repository at this point in the history
* added metrics for drop and export Span processor

* added span size gauge

* addressed PR comments

* Tried to fix the indent

* addressed PR comment

---------

Co-authored-by: Ignacio Bonafonte <[email protected]>
Co-authored-by: Bryce Buchanan <[email protected]>
  • Loading branch information
3 people authored Nov 22, 2024
1 parent 13c3857 commit 32b7b1e
Show file tree
Hide file tree
Showing 4 changed files with 176 additions and 60 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ public class DefaultStableMeter : StableMeter {
}
}

private class NoopLongGaugeBuilder : LongGaugeBuilder {
private class NoopLongGaugeBuilder : LongGaugeBuilder {
func buildWithCallback(_ callback: @escaping (ObservableLongMeasurement) -> Void) -> ObservableLongGauge {
NoopObservableLongGauge()
}
Expand Down Expand Up @@ -127,11 +127,11 @@ public class DefaultStableMeter : StableMeter {
func ofDoubles() -> DoubleCounterBuilder {
NoopDoubleCounterBuilder()
}

func build() -> LongCounter {
NoopLongCounter()
}

func buildWithCallback(_ callback: @escaping (ObservableLongMeasurement) -> Void) -> ObservableLongCounter {
NoopObservableLongCounter()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,4 @@ public class LongGaugeBuilderSdk : LongGaugeBuilder, InstrumentBuilder {
public func buildWithCallback(_ callback: @escaping (OpenTelemetryApi.ObservableLongMeasurement) -> Void) -> OpenTelemetryApi.ObservableLongGauge {
registerLongAsynchronousInstrument(type: type, updater: callback)
}




}
175 changes: 130 additions & 45 deletions Sources/OpenTelemetrySdk/Trace/SpanProcessors/BatchSpanProcessor.swift
Original file line number Diff line number Diff line change
Expand Up @@ -15,21 +15,36 @@ import OpenTelemetryApi
/// exports the spans to wake up and start a new export cycle.
/// This batchSpanProcessor can cause high contention in a very high traffic service.
public struct BatchSpanProcessor: SpanProcessor {
fileprivate static let SPAN_PROCESSOR_TYPE_LABEL: String = "processorType"
fileprivate static let SPAN_PROCESSOR_DROPPED_LABEL: String = "dropped"
fileprivate static let SPAN_PROCESSOR_TYPE_VALUE: String = BatchSpanProcessor.name

fileprivate var worker: BatchWorker


fileprivate var worker: BatchWorker

public init(spanExporter: SpanExporter, scheduleDelay: TimeInterval = 5, exportTimeout: TimeInterval = 30,
maxQueueSize: Int = 2048, maxExportBatchSize: Int = 512, willExportCallback: ((inout [SpanData]) -> Void)? = nil)
{
worker = BatchWorker(spanExporter: spanExporter,
scheduleDelay: scheduleDelay,
exportTimeout: exportTimeout,
maxQueueSize: maxQueueSize,
maxExportBatchSize: maxExportBatchSize,
willExportCallback: willExportCallback)
worker.start()
}
public static var name: String {
String(describing: Self.self)
}

public init(
spanExporter: SpanExporter,
meterProvider: StableMeterProvider,
scheduleDelay: TimeInterval = 5,
exportTimeout: TimeInterval = 30,
maxQueueSize: Int = 2048,
maxExportBatchSize: Int = 512,
willExportCallback: ((inout [SpanData]) -> Void)? = nil
) {
worker = BatchWorker(
spanExporter: spanExporter,
meterProvider: meterProvider,
scheduleDelay: scheduleDelay,
exportTimeout: exportTimeout,
maxQueueSize: maxQueueSize,
maxExportBatchSize: maxExportBatchSize,
willExportCallback: willExportCallback
)
worker.start()
}

public let isStartRequired = false
public let isEndRequired = true
Expand Down Expand Up @@ -57,40 +72,105 @@ public struct BatchSpanProcessor: SpanProcessor {
/// the data.
/// The list of batched data is protected by a NSCondition which ensures full concurrency.
private class BatchWorker: Thread {
let spanExporter: SpanExporter
let scheduleDelay: TimeInterval
let maxQueueSize: Int
let exportTimeout: TimeInterval
let maxExportBatchSize: Int
let willExportCallback: ((inout [SpanData]) -> Void)?
let halfMaxQueueSize: Int
private let cond = NSCondition()
var spanList = [ReadableSpan]()
var queue: OperationQueue

init(spanExporter: SpanExporter, scheduleDelay: TimeInterval, exportTimeout: TimeInterval, maxQueueSize: Int, maxExportBatchSize: Int, willExportCallback: ((inout [SpanData]) -> Void)?) {
self.spanExporter = spanExporter
self.scheduleDelay = scheduleDelay
self.exportTimeout = exportTimeout
self.maxQueueSize = maxQueueSize
halfMaxQueueSize = maxQueueSize >> 1
self.maxExportBatchSize = maxExportBatchSize
self.willExportCallback = willExportCallback
queue = OperationQueue()
queue.name = "BatchWorker Queue"
queue.maxConcurrentOperationCount = 1
}
let spanExporter: SpanExporter
let meterProvider: StableMeterProvider
let scheduleDelay: TimeInterval
let maxQueueSize: Int
let exportTimeout: TimeInterval
let maxExportBatchSize: Int
let willExportCallback: ((inout [SpanData]) -> Void)?
let halfMaxQueueSize: Int
private let cond = NSCondition()
var spanList = [ReadableSpan]()
var queue: OperationQueue

private var queueSizeGauge: ObservableLongGauge?
private var spanGaugeObserver: ObservableLongGauge?

private var processedSpansCounter: LongCounter?
private let droppedAttrs: [String: AttributeValue]
private let exportedAttrs: [String: AttributeValue]
private let spanGaugeBuilder: LongGaugeBuilder
init(
spanExporter: SpanExporter,
meterProvider: StableMeterProvider,
scheduleDelay: TimeInterval,
exportTimeout: TimeInterval,
maxQueueSize: Int,
maxExportBatchSize: Int,
willExportCallback: ((inout [SpanData]) -> Void)?
) {
self.spanExporter = spanExporter
self.meterProvider = meterProvider
self.scheduleDelay = scheduleDelay
self.exportTimeout = exportTimeout
self.maxQueueSize = maxQueueSize
halfMaxQueueSize = maxQueueSize >> 1
self.maxExportBatchSize = maxExportBatchSize
self.willExportCallback = willExportCallback
queue = OperationQueue()
queue.name = "BatchWorker Queue"
queue.maxConcurrentOperationCount = 1

let meter = meterProvider.meterBuilder(name: "io.opentelemetry.sdk.trace").build()

var longGaugeSdk = meter.gaugeBuilder(name: "queueSize").ofLongs() as? LongGaugeBuilderSdk
longGaugeSdk = longGaugeSdk?.setDescription("The number of items queued")
longGaugeSdk = longGaugeSdk?.setUnit("1")
self.queueSizeGauge = longGaugeSdk?.buildWithCallback { result in
result.record(
value: maxQueueSize,
attributes: [
BatchSpanProcessor.SPAN_PROCESSOR_TYPE_LABEL: .string(BatchSpanProcessor.SPAN_PROCESSOR_TYPE_VALUE)
]
)
}

self.spanGaugeBuilder = meter.gaugeBuilder(name: "spanSize")
.ofLongs()

var longCounterSdk = meter.counterBuilder(name: "processedSpans") as? LongCounterMeterBuilderSdk
longCounterSdk = longCounterSdk?.setUnit("1")
longCounterSdk = longCounterSdk?.setDescription("The number of spans processed by the BatchSpanProcessor. [dropped=true if they were dropped due to high throughput]")
processedSpansCounter = longCounterSdk?.build()

droppedAttrs = [
BatchSpanProcessor.SPAN_PROCESSOR_TYPE_LABEL: .string(BatchSpanProcessor.SPAN_PROCESSOR_TYPE_VALUE),
BatchSpanProcessor.SPAN_PROCESSOR_DROPPED_LABEL: .bool(true)
]
exportedAttrs = [
BatchSpanProcessor.SPAN_PROCESSOR_TYPE_LABEL: .string(BatchSpanProcessor.SPAN_PROCESSOR_TYPE_VALUE),
BatchSpanProcessor.SPAN_PROCESSOR_DROPPED_LABEL: .bool(false)
]

// Subscribe to new gauge observer
self.spanGaugeObserver = self.spanGaugeBuilder
.buildWithCallback { [count = spanList.count] result in
result.record(
value: count,
attributes: [
BatchSpanProcessor.SPAN_PROCESSOR_TYPE_LABEL: .string(BatchSpanProcessor.SPAN_PROCESSOR_TYPE_VALUE)
]
)
}
}

deinit {
// Cleanup all gauge observer
self.queueSizeGauge?.close()
self.spanGaugeObserver?.close()
}

func addSpan(span: ReadableSpan) {
cond.lock()
defer { cond.unlock() }

if spanList.count == maxQueueSize {
// TODO: Record a counter for dropped spans.
processedSpansCounter?.add(value: 1, attribute: droppedAttrs)
return
}
// TODO: Record a gauge for referenced spans.
spanList.append(span)

// Notify the worker thread that at half of the queue is available. It will take
// time anyway for the thread to wake up.
if spanList.count >= halfMaxQueueSize {
Expand Down Expand Up @@ -148,11 +228,16 @@ private class BatchWorker: Thread {
timeoutTimer.cancel()
}

private func exportAction(spanList: [ReadableSpan], explicitTimeout: TimeInterval? = nil) {
stride(from: 0, to: spanList.endIndex, by: maxExportBatchSize).forEach {
var spansToExport = spanList[$0 ..< min($0 + maxExportBatchSize, spanList.count)].map { $0.toSpanData() }
willExportCallback?(&spansToExport)
spanExporter.export(spans: spansToExport, explicitTimeout: explicitTimeout)
private func exportAction(spanList: [ReadableSpan], explicitTimeout: TimeInterval? = nil) {
stride(from: 0, to: spanList.endIndex, by: maxExportBatchSize).forEach {
var spansToExport = spanList[$0 ..< min($0 + maxExportBatchSize, spanList.count)].map { $0.toSpanData() }
willExportCallback?(&spansToExport)
let result = spanExporter.export(spans: spansToExport, explicitTimeout: explicitTimeout)
if result == .success {
cond.lock()
processedSpansCounter?.add(value: spanList.count, attribute: exportedAttrs)
cond.unlock()
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,15 +44,22 @@ class BatchSpansProcessorTests: XCTestCase {
}

func testStartEndRequirements() {
let spansProcessor = BatchSpanProcessor(spanExporter: WaitingSpanExporter(numberToWaitFor: 0))
let spansProcessor = BatchSpanProcessor(
spanExporter: WaitingSpanExporter(numberToWaitFor: 0),
meterProvider: DefaultStableMeterProvider.instance
)
XCTAssertFalse(spansProcessor.isStartRequired)
XCTAssertTrue(spansProcessor.isEndRequired)
}

func testExportDifferentSampledSpans() {
let waitingSpanExporter = WaitingSpanExporter(numberToWaitFor: 2)

tracerSdkFactory.addSpanProcessor(BatchSpanProcessor(spanExporter: waitingSpanExporter, scheduleDelay: maxScheduleDelay))
tracerSdkFactory.addSpanProcessor(BatchSpanProcessor(
spanExporter: waitingSpanExporter,
meterProvider: DefaultStableMeterProvider.instance,
scheduleDelay: maxScheduleDelay)
)
let span1 = createSampledEndedSpan(spanName: spanName1)
let span2 = createSampledEndedSpan(spanName: spanName2)
let exported = waitingSpanExporter.waitForExport()
Expand All @@ -63,7 +70,12 @@ class BatchSpansProcessorTests: XCTestCase {
func testExportMoreSpansThanTheBufferSize() {
let waitingSpanExporter = WaitingSpanExporter(numberToWaitFor: 6)

tracerSdkFactory.addSpanProcessor(BatchSpanProcessor(spanExporter: waitingSpanExporter, scheduleDelay: maxScheduleDelay, maxQueueSize: 6, maxExportBatchSize: 2))
tracerSdkFactory.addSpanProcessor(BatchSpanProcessor(
spanExporter: waitingSpanExporter,
meterProvider: DefaultStableMeterProvider.instance,
scheduleDelay: maxScheduleDelay,
maxQueueSize: 6, maxExportBatchSize: 2)
)

let span1 = createSampledEndedSpan(spanName: spanName1)
let span2 = createSampledEndedSpan(spanName: spanName1)
Expand All @@ -82,7 +94,13 @@ class BatchSpansProcessorTests: XCTestCase {

func testForceExport() {
let waitingSpanExporter = WaitingSpanExporter(numberToWaitFor: 1)
let batchSpansProcessor = BatchSpanProcessor(spanExporter: waitingSpanExporter, scheduleDelay: 10, maxQueueSize: 10000, maxExportBatchSize: 2000)
let batchSpansProcessor = BatchSpanProcessor(
spanExporter: waitingSpanExporter,
meterProvider: DefaultStableMeterProvider.instance,
scheduleDelay: 10,
maxQueueSize: 10000,
maxExportBatchSize: 2000
)
tracerSdkFactory.addSpanProcessor(batchSpansProcessor)

for _ in 0 ..< 100 {
Expand All @@ -96,7 +114,10 @@ class BatchSpansProcessorTests: XCTestCase {
func testExportSpansToMultipleServices() {
let waitingSpanExporter = WaitingSpanExporter(numberToWaitFor: 2)
let waitingSpanExporter2 = WaitingSpanExporter(numberToWaitFor: 2)
tracerSdkFactory.addSpanProcessor(BatchSpanProcessor(spanExporter: MultiSpanExporter(spanExporters: [waitingSpanExporter, waitingSpanExporter2]), scheduleDelay: maxScheduleDelay))
tracerSdkFactory.addSpanProcessor(BatchSpanProcessor(
spanExporter: MultiSpanExporter(spanExporters: [waitingSpanExporter, waitingSpanExporter2]),
meterProvider: DefaultStableMeterProvider.instance,
scheduleDelay: maxScheduleDelay))

let span1 = createSampledEndedSpan(spanName: spanName1)
let span2 = createSampledEndedSpan(spanName: spanName2)
Expand All @@ -110,7 +131,13 @@ class BatchSpansProcessorTests: XCTestCase {
let maxQueuedSpans = 8
let waitingSpanExporter = WaitingSpanExporter(numberToWaitFor: maxQueuedSpans)

tracerSdkFactory.addSpanProcessor(BatchSpanProcessor(spanExporter: MultiSpanExporter(spanExporters: [waitingSpanExporter, blockingSpanExporter]), scheduleDelay: maxScheduleDelay, maxQueueSize: maxQueuedSpans, maxExportBatchSize: maxQueuedSpans / 2))
tracerSdkFactory.addSpanProcessor(BatchSpanProcessor(
spanExporter: MultiSpanExporter(spanExporters: [waitingSpanExporter, blockingSpanExporter]),
meterProvider: DefaultStableMeterProvider.instance,
scheduleDelay: maxScheduleDelay,
maxQueueSize: maxQueuedSpans,
maxExportBatchSize: maxQueuedSpans / 2)
)

var spansToExport = [SpanData]()
// Wait to block the worker thread in the BatchSampledSpansProcessor. This ensures that no items
Expand Down Expand Up @@ -162,7 +189,11 @@ class BatchSpansProcessorTests: XCTestCase {
func testExportNotSampledSpans() {
let waitingSpanExporter = WaitingSpanExporter(numberToWaitFor: 1)

tracerSdkFactory.addSpanProcessor(BatchSpanProcessor(spanExporter: waitingSpanExporter, scheduleDelay: maxScheduleDelay))
tracerSdkFactory.addSpanProcessor(BatchSpanProcessor(
spanExporter: waitingSpanExporter,
meterProvider: DefaultStableMeterProvider.instance,
scheduleDelay: maxScheduleDelay)
)

createNotSampledEndedSpan(spanName: spanName1)
createNotSampledEndedSpan(spanName: spanName2)
Expand All @@ -181,7 +212,11 @@ class BatchSpansProcessorTests: XCTestCase {
let waitingSpanExporter = WaitingSpanExporter(numberToWaitFor: 1)

// Set the export delay to zero, for no timeout, in order to confirm the #flush() below works
tracerSdkFactory.addSpanProcessor(BatchSpanProcessor(spanExporter: waitingSpanExporter, scheduleDelay: 0.1))
tracerSdkFactory.addSpanProcessor(BatchSpanProcessor(
spanExporter: waitingSpanExporter,
meterProvider: DefaultStableMeterProvider.instance,
scheduleDelay: 0.1)
)

let span2 = createSampledEndedSpan(spanName: spanName2)

Expand Down

0 comments on commit 32b7b1e

Please sign in to comment.