Skip to content

Commit

Permalink
opentelemetry: update to async
Browse files Browse the repository at this point in the history
  • Loading branch information
naisanzaa committed Apr 3, 2024
1 parent 0a8a8c9 commit de3a352
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 20 deletions.
47 changes: 36 additions & 11 deletions automon/integrations/openTelemetryWrapper/client.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import json

from opentelemetry.util import types
from opentelemetry.trace import Status, StatusCode

from automon.log import logging

from .config import OpenTelemetryConfig
Expand All @@ -14,28 +16,47 @@ class OpenTelemetryClient(object):
def __init__(self):
self.config = OpenTelemetryConfig()

def add_event(self, name: str, attributes: types.Attributes = None, **kwargs):
return self.config.current_span.add_event(name=name, attributes=attributes, **kwargs)
async def add_event(self, name: str, attributes: types.Attributes = None, **kwargs):
logger.debug(dict(name=name, attributes=attributes, kwargs=kwargs))
span = await self.current_span()
return span.add_event(name=name, attributes=attributes, **kwargs)

async def clear(self):
logger.debug('clear')
return await self.config.clear()

async def current_span(self):
logger.debug('current_span')
return await self.config.current_span()

async def is_ready(self):
if await self.config.is_ready():
return True

async def get_finished_spans(self):
logger.debug('get_finished_spans')
return await self.config.get_finished_spans()

async def pop_finished_spans(self):
logger.debug('pop_finished_spans')
return await self.config.pop_finished_spans()

def record_exception(self, exception: Exception):
return self.config.current_span.record_exception(exception=exception)

def start_as_current_span(self, name: str, attributes: types.Attributes = None, **kwargs):
async def record_exception(self, exception: Exception):
logger.error(f'{exception}')
span = await self.current_span()
span.set_status(Status(StatusCode.ERROR))
return span.record_exception(exception=exception)

async def start_as_current_span(
self, name: str,
attributes: types.Attributes = None,
**kwargs
):
logger.debug(dict(name=name, attributes=attributes, kwargs=kwargs))
return self.config.tracer.start_as_current_span(
name=name, attributes=attributes, **kwargs)
name=name,
attributes=attributes,
**kwargs)

async def start_consumer(self):
"""adds spans from memory to queue"""
Expand All @@ -50,10 +71,14 @@ async def start_producer(self):
return

async def test(self):
with self.start_as_current_span(name='rootSpan') as trace_root:
with self.start_as_current_span(name='childSpan') as trace_child:
self.add_event('AAAAAAAA')
self.add_event('BBBBBBBB')
with await self.start_as_current_span(name='rootSpan') as trace_root:
await self.add_event('AAAAAAAA')

with await self.start_as_current_span(name='childSpan') as trace_child:
await self.add_event('AAAAAAAA')
await self.add_event('BBBBBBBB')

await self.add_event('BBBBBBBB')

return True

Expand Down
15 changes: 8 additions & 7 deletions automon/integrations/openTelemetryWrapper/config.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import asyncio
import opentelemetry

from opentelemetry import trace
from opentelemetry.trace import Status, StatusCode
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import SimpleSpanProcessor
from opentelemetry.sdk.trace.export.in_memory_span_exporter import InMemorySpanExporter
Expand All @@ -19,25 +18,27 @@ def __init__(self):
self.processor = SimpleSpanProcessor(self.memory_processor)
self.provider.add_span_processor(self.processor)

trace.set_tracer_provider(self.provider)
opentelemetry.trace.set_tracer_provider(self.provider)

self.tracer = trace.get_tracer(__name__)
self.tracer = opentelemetry.trace.get_tracer(__name__)

self.queue_consumer = asyncio.Queue()
self.queue_producer = asyncio.Queue()

async def clear(self):
logger.debug('clear')
return self.memory_processor.clear()

@property
def current_span(self):
return trace.get_current_span()
async def current_span(self):
logger.debug('get_current_span')
return opentelemetry.trace.get_current_span()

async def is_ready(self):
if self.provider and self.memory_processor and self.processor:
return True

async def get_finished_spans(self):
logger.debug('get_finished_spans')
return self.memory_processor.get_finished_spans()

async def pop_finished_spans(self):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,12 @@ class MyTestCase(unittest.TestCase):

def test_something(self):
self.assertTrue(asyncio.run(
self.client.test())
)
self.client.test()))

spans = asyncio.run(
self.client.get_finished_spans())

pass


if __name__ == '__main__':
Expand Down

0 comments on commit de3a352

Please sign in to comment.