diff --git a/Sources/OpenTelemetryApi/Metrics/Stable/DefaultStableMeter.swift b/Sources/OpenTelemetryApi/Metrics/Stable/DefaultStableMeter.swift index f13a20ae..52a1d413 100644 --- a/Sources/OpenTelemetryApi/Metrics/Stable/DefaultStableMeter.swift +++ b/Sources/OpenTelemetryApi/Metrics/Stable/DefaultStableMeter.swift @@ -60,7 +60,7 @@ public class DefaultStableMeter : StableMeter { } } - private class NoopLongGaugeBuilder : LongGaugeBuilder { + private class NoopLongGaugeBuilder : LongGaugeBuilder { func buildWithCallback(_ callback: @escaping (ObservableLongMeasurement) -> Void) -> ObservableLongGauge { NoopObservableLongGauge() } @@ -127,11 +127,11 @@ public class DefaultStableMeter : StableMeter { func ofDoubles() -> DoubleCounterBuilder { NoopDoubleCounterBuilder() } - + func build() -> LongCounter { NoopLongCounter() } - + func buildWithCallback(_ callback: @escaping (ObservableLongMeasurement) -> Void) -> ObservableLongCounter { NoopObservableLongCounter() } diff --git a/Sources/OpenTelemetrySdk/Metrics/Stable/LongGaugeBuilderSdk.swift b/Sources/OpenTelemetrySdk/Metrics/Stable/LongGaugeBuilderSdk.swift index fa9c0e96..54721426 100644 --- a/Sources/OpenTelemetrySdk/Metrics/Stable/LongGaugeBuilderSdk.swift +++ b/Sources/OpenTelemetrySdk/Metrics/Stable/LongGaugeBuilderSdk.swift @@ -33,8 +33,4 @@ public class LongGaugeBuilderSdk : LongGaugeBuilder, InstrumentBuilder { public func buildWithCallback(_ callback: @escaping (OpenTelemetryApi.ObservableLongMeasurement) -> Void) -> OpenTelemetryApi.ObservableLongGauge { registerLongAsynchronousInstrument(type: type, updater: callback) } - - - - } diff --git a/Sources/OpenTelemetrySdk/Trace/SpanProcessors/BatchSpanProcessor.swift b/Sources/OpenTelemetrySdk/Trace/SpanProcessors/BatchSpanProcessor.swift index a393e3d9..c9906fb1 100644 --- a/Sources/OpenTelemetrySdk/Trace/SpanProcessors/BatchSpanProcessor.swift +++ b/Sources/OpenTelemetrySdk/Trace/SpanProcessors/BatchSpanProcessor.swift @@ -15,21 +15,36 @@ import OpenTelemetryApi /// exports the spans to wake up and start a new export cycle. /// This batchSpanProcessor can cause high contention in a very high traffic service. public struct BatchSpanProcessor: SpanProcessor { + fileprivate static let SPAN_PROCESSOR_TYPE_LABEL: String = "processorType" + fileprivate static let SPAN_PROCESSOR_DROPPED_LABEL: String = "dropped" + fileprivate static let SPAN_PROCESSOR_TYPE_VALUE: String = BatchSpanProcessor.name + + fileprivate var worker: BatchWorker - - fileprivate var worker: BatchWorker - - public init(spanExporter: SpanExporter, scheduleDelay: TimeInterval = 5, exportTimeout: TimeInterval = 30, - maxQueueSize: Int = 2048, maxExportBatchSize: Int = 512, willExportCallback: ((inout [SpanData]) -> Void)? = nil) - { - worker = BatchWorker(spanExporter: spanExporter, - scheduleDelay: scheduleDelay, - exportTimeout: exportTimeout, - maxQueueSize: maxQueueSize, - maxExportBatchSize: maxExportBatchSize, - willExportCallback: willExportCallback) - worker.start() - } + public static var name: String { + String(describing: Self.self) + } + + public init( + spanExporter: SpanExporter, + meterProvider: StableMeterProvider, + scheduleDelay: TimeInterval = 5, + exportTimeout: TimeInterval = 30, + maxQueueSize: Int = 2048, + maxExportBatchSize: Int = 512, + willExportCallback: ((inout [SpanData]) -> Void)? = nil + ) { + worker = BatchWorker( + spanExporter: spanExporter, + meterProvider: meterProvider, + scheduleDelay: scheduleDelay, + exportTimeout: exportTimeout, + maxQueueSize: maxQueueSize, + maxExportBatchSize: maxExportBatchSize, + willExportCallback: willExportCallback + ) + worker.start() + } public let isStartRequired = false public let isEndRequired = true @@ -57,40 +72,105 @@ public struct BatchSpanProcessor: SpanProcessor { /// the data. /// The list of batched data is protected by a NSCondition which ensures full concurrency. private class BatchWorker: Thread { - let spanExporter: SpanExporter - let scheduleDelay: TimeInterval - let maxQueueSize: Int - let exportTimeout: TimeInterval - let maxExportBatchSize: Int - let willExportCallback: ((inout [SpanData]) -> Void)? - let halfMaxQueueSize: Int - private let cond = NSCondition() - var spanList = [ReadableSpan]() - var queue: OperationQueue - - init(spanExporter: SpanExporter, scheduleDelay: TimeInterval, exportTimeout: TimeInterval, maxQueueSize: Int, maxExportBatchSize: Int, willExportCallback: ((inout [SpanData]) -> Void)?) { - self.spanExporter = spanExporter - self.scheduleDelay = scheduleDelay - self.exportTimeout = exportTimeout - self.maxQueueSize = maxQueueSize - halfMaxQueueSize = maxQueueSize >> 1 - self.maxExportBatchSize = maxExportBatchSize - self.willExportCallback = willExportCallback - queue = OperationQueue() - queue.name = "BatchWorker Queue" - queue.maxConcurrentOperationCount = 1 - } + let spanExporter: SpanExporter + let meterProvider: StableMeterProvider + let scheduleDelay: TimeInterval + let maxQueueSize: Int + let exportTimeout: TimeInterval + let maxExportBatchSize: Int + let willExportCallback: ((inout [SpanData]) -> Void)? + let halfMaxQueueSize: Int + private let cond = NSCondition() + var spanList = [ReadableSpan]() + var queue: OperationQueue + + private var queueSizeGauge: ObservableLongGauge? + private var spanGaugeObserver: ObservableLongGauge? + + private var processedSpansCounter: LongCounter? + private let droppedAttrs: [String: AttributeValue] + private let exportedAttrs: [String: AttributeValue] + private let spanGaugeBuilder: LongGaugeBuilder + init( + spanExporter: SpanExporter, + meterProvider: StableMeterProvider, + scheduleDelay: TimeInterval, + exportTimeout: TimeInterval, + maxQueueSize: Int, + maxExportBatchSize: Int, + willExportCallback: ((inout [SpanData]) -> Void)? + ) { + self.spanExporter = spanExporter + self.meterProvider = meterProvider + self.scheduleDelay = scheduleDelay + self.exportTimeout = exportTimeout + self.maxQueueSize = maxQueueSize + halfMaxQueueSize = maxQueueSize >> 1 + self.maxExportBatchSize = maxExportBatchSize + self.willExportCallback = willExportCallback + queue = OperationQueue() + queue.name = "BatchWorker Queue" + queue.maxConcurrentOperationCount = 1 + + let meter = meterProvider.meterBuilder(name: "io.opentelemetry.sdk.trace").build() + + var longGaugeSdk = meter.gaugeBuilder(name: "queueSize").ofLongs() as? LongGaugeBuilderSdk + longGaugeSdk = longGaugeSdk?.setDescription("The number of items queued") + longGaugeSdk = longGaugeSdk?.setUnit("1") + self.queueSizeGauge = longGaugeSdk?.buildWithCallback { result in + result.record( + value: maxQueueSize, + attributes: [ + BatchSpanProcessor.SPAN_PROCESSOR_TYPE_LABEL: .string(BatchSpanProcessor.SPAN_PROCESSOR_TYPE_VALUE) + ] + ) + } + + self.spanGaugeBuilder = meter.gaugeBuilder(name: "spanSize") + .ofLongs() + + var longCounterSdk = meter.counterBuilder(name: "processedSpans") as? LongCounterMeterBuilderSdk + longCounterSdk = longCounterSdk?.setUnit("1") + longCounterSdk = longCounterSdk?.setDescription("The number of spans processed by the BatchSpanProcessor. [dropped=true if they were dropped due to high throughput]") + processedSpansCounter = longCounterSdk?.build() + + droppedAttrs = [ + BatchSpanProcessor.SPAN_PROCESSOR_TYPE_LABEL: .string(BatchSpanProcessor.SPAN_PROCESSOR_TYPE_VALUE), + BatchSpanProcessor.SPAN_PROCESSOR_DROPPED_LABEL: .bool(true) + ] + exportedAttrs = [ + BatchSpanProcessor.SPAN_PROCESSOR_TYPE_LABEL: .string(BatchSpanProcessor.SPAN_PROCESSOR_TYPE_VALUE), + BatchSpanProcessor.SPAN_PROCESSOR_DROPPED_LABEL: .bool(false) + ] + + // Subscribe to new gauge observer + self.spanGaugeObserver = self.spanGaugeBuilder + .buildWithCallback { [count = spanList.count] result in + result.record( + value: count, + attributes: [ + BatchSpanProcessor.SPAN_PROCESSOR_TYPE_LABEL: .string(BatchSpanProcessor.SPAN_PROCESSOR_TYPE_VALUE) + ] + ) + } + } + deinit { + // Cleanup all gauge observer + self.queueSizeGauge?.close() + self.spanGaugeObserver?.close() + } + func addSpan(span: ReadableSpan) { cond.lock() defer { cond.unlock() } if spanList.count == maxQueueSize { - // TODO: Record a counter for dropped spans. + processedSpansCounter?.add(value: 1, attribute: droppedAttrs) return } - // TODO: Record a gauge for referenced spans. spanList.append(span) + // Notify the worker thread that at half of the queue is available. It will take // time anyway for the thread to wake up. if spanList.count >= halfMaxQueueSize { @@ -148,11 +228,16 @@ private class BatchWorker: Thread { timeoutTimer.cancel() } - private func exportAction(spanList: [ReadableSpan], explicitTimeout: TimeInterval? = nil) { - stride(from: 0, to: spanList.endIndex, by: maxExportBatchSize).forEach { - var spansToExport = spanList[$0 ..< min($0 + maxExportBatchSize, spanList.count)].map { $0.toSpanData() } - willExportCallback?(&spansToExport) - spanExporter.export(spans: spansToExport, explicitTimeout: explicitTimeout) + private func exportAction(spanList: [ReadableSpan], explicitTimeout: TimeInterval? = nil) { + stride(from: 0, to: spanList.endIndex, by: maxExportBatchSize).forEach { + var spansToExport = spanList[$0 ..< min($0 + maxExportBatchSize, spanList.count)].map { $0.toSpanData() } + willExportCallback?(&spansToExport) + let result = spanExporter.export(spans: spansToExport, explicitTimeout: explicitTimeout) + if result == .success { + cond.lock() + processedSpansCounter?.add(value: spanList.count, attribute: exportedAttrs) + cond.unlock() + } + } } - } } diff --git a/Tests/OpenTelemetrySdkTests/Trace/Export/BatchSpansProcessorTests.swift b/Tests/OpenTelemetrySdkTests/Trace/Export/BatchSpansProcessorTests.swift index 47507f0e..26217290 100644 --- a/Tests/OpenTelemetrySdkTests/Trace/Export/BatchSpansProcessorTests.swift +++ b/Tests/OpenTelemetrySdkTests/Trace/Export/BatchSpansProcessorTests.swift @@ -44,7 +44,10 @@ class BatchSpansProcessorTests: XCTestCase { } func testStartEndRequirements() { - let spansProcessor = BatchSpanProcessor(spanExporter: WaitingSpanExporter(numberToWaitFor: 0)) + let spansProcessor = BatchSpanProcessor( + spanExporter: WaitingSpanExporter(numberToWaitFor: 0), + meterProvider: DefaultStableMeterProvider.instance + ) XCTAssertFalse(spansProcessor.isStartRequired) XCTAssertTrue(spansProcessor.isEndRequired) } @@ -52,7 +55,11 @@ class BatchSpansProcessorTests: XCTestCase { func testExportDifferentSampledSpans() { let waitingSpanExporter = WaitingSpanExporter(numberToWaitFor: 2) - tracerSdkFactory.addSpanProcessor(BatchSpanProcessor(spanExporter: waitingSpanExporter, scheduleDelay: maxScheduleDelay)) + tracerSdkFactory.addSpanProcessor(BatchSpanProcessor( + spanExporter: waitingSpanExporter, + meterProvider: DefaultStableMeterProvider.instance, + scheduleDelay: maxScheduleDelay) + ) let span1 = createSampledEndedSpan(spanName: spanName1) let span2 = createSampledEndedSpan(spanName: spanName2) let exported = waitingSpanExporter.waitForExport() @@ -63,7 +70,12 @@ class BatchSpansProcessorTests: XCTestCase { func testExportMoreSpansThanTheBufferSize() { let waitingSpanExporter = WaitingSpanExporter(numberToWaitFor: 6) - tracerSdkFactory.addSpanProcessor(BatchSpanProcessor(spanExporter: waitingSpanExporter, scheduleDelay: maxScheduleDelay, maxQueueSize: 6, maxExportBatchSize: 2)) + tracerSdkFactory.addSpanProcessor(BatchSpanProcessor( + spanExporter: waitingSpanExporter, + meterProvider: DefaultStableMeterProvider.instance, + scheduleDelay: maxScheduleDelay, + maxQueueSize: 6, maxExportBatchSize: 2) + ) let span1 = createSampledEndedSpan(spanName: spanName1) let span2 = createSampledEndedSpan(spanName: spanName1) @@ -82,7 +94,13 @@ class BatchSpansProcessorTests: XCTestCase { func testForceExport() { let waitingSpanExporter = WaitingSpanExporter(numberToWaitFor: 1) - let batchSpansProcessor = BatchSpanProcessor(spanExporter: waitingSpanExporter, scheduleDelay: 10, maxQueueSize: 10000, maxExportBatchSize: 2000) + let batchSpansProcessor = BatchSpanProcessor( + spanExporter: waitingSpanExporter, + meterProvider: DefaultStableMeterProvider.instance, + scheduleDelay: 10, + maxQueueSize: 10000, + maxExportBatchSize: 2000 + ) tracerSdkFactory.addSpanProcessor(batchSpansProcessor) for _ in 0 ..< 100 { @@ -96,7 +114,10 @@ class BatchSpansProcessorTests: XCTestCase { func testExportSpansToMultipleServices() { let waitingSpanExporter = WaitingSpanExporter(numberToWaitFor: 2) let waitingSpanExporter2 = WaitingSpanExporter(numberToWaitFor: 2) - tracerSdkFactory.addSpanProcessor(BatchSpanProcessor(spanExporter: MultiSpanExporter(spanExporters: [waitingSpanExporter, waitingSpanExporter2]), scheduleDelay: maxScheduleDelay)) + tracerSdkFactory.addSpanProcessor(BatchSpanProcessor( + spanExporter: MultiSpanExporter(spanExporters: [waitingSpanExporter, waitingSpanExporter2]), + meterProvider: DefaultStableMeterProvider.instance, + scheduleDelay: maxScheduleDelay)) let span1 = createSampledEndedSpan(spanName: spanName1) let span2 = createSampledEndedSpan(spanName: spanName2) @@ -110,7 +131,13 @@ class BatchSpansProcessorTests: XCTestCase { let maxQueuedSpans = 8 let waitingSpanExporter = WaitingSpanExporter(numberToWaitFor: maxQueuedSpans) - tracerSdkFactory.addSpanProcessor(BatchSpanProcessor(spanExporter: MultiSpanExporter(spanExporters: [waitingSpanExporter, blockingSpanExporter]), scheduleDelay: maxScheduleDelay, maxQueueSize: maxQueuedSpans, maxExportBatchSize: maxQueuedSpans / 2)) + tracerSdkFactory.addSpanProcessor(BatchSpanProcessor( + spanExporter: MultiSpanExporter(spanExporters: [waitingSpanExporter, blockingSpanExporter]), + meterProvider: DefaultStableMeterProvider.instance, + scheduleDelay: maxScheduleDelay, + maxQueueSize: maxQueuedSpans, + maxExportBatchSize: maxQueuedSpans / 2) + ) var spansToExport = [SpanData]() // Wait to block the worker thread in the BatchSampledSpansProcessor. This ensures that no items @@ -162,7 +189,11 @@ class BatchSpansProcessorTests: XCTestCase { func testExportNotSampledSpans() { let waitingSpanExporter = WaitingSpanExporter(numberToWaitFor: 1) - tracerSdkFactory.addSpanProcessor(BatchSpanProcessor(spanExporter: waitingSpanExporter, scheduleDelay: maxScheduleDelay)) + tracerSdkFactory.addSpanProcessor(BatchSpanProcessor( + spanExporter: waitingSpanExporter, + meterProvider: DefaultStableMeterProvider.instance, + scheduleDelay: maxScheduleDelay) + ) createNotSampledEndedSpan(spanName: spanName1) createNotSampledEndedSpan(spanName: spanName2) @@ -181,7 +212,11 @@ class BatchSpansProcessorTests: XCTestCase { let waitingSpanExporter = WaitingSpanExporter(numberToWaitFor: 1) // Set the export delay to zero, for no timeout, in order to confirm the #flush() below works - tracerSdkFactory.addSpanProcessor(BatchSpanProcessor(spanExporter: waitingSpanExporter, scheduleDelay: 0.1)) + tracerSdkFactory.addSpanProcessor(BatchSpanProcessor( + spanExporter: waitingSpanExporter, + meterProvider: DefaultStableMeterProvider.instance, + scheduleDelay: 0.1) + ) let span2 = createSampledEndedSpan(spanName: spanName2)