-
Notifications
You must be signed in to change notification settings - Fork 39
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Java interop doc update #556
base: main
Are you sure you want to change the base?
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -45,28 +45,24 @@ To mitigate this limitation, the context must be shared manually. | |
## Before we start | ||
|
||
Since we need to manually modify the context we need direct access to `Local[F, Context]`. | ||
It can be constructed in the following way: | ||
The easiest way is to call `OtelJava#localContext`: | ||
|
||
```scala mdoc:silent | ||
import cats.effect._ | ||
import cats.mtl.Local | ||
import cats.syntax.functor._ | ||
import org.typelevel.otel4s.instances.local._ // brings Local derived from IOLocal | ||
import org.typelevel.otel4s.oteljava.context.Context | ||
import org.typelevel.otel4s.oteljava.OtelJava | ||
import io.opentelemetry.api.GlobalOpenTelemetry | ||
|
||
def createOtel4s[F[_]: Async](implicit L: Local[F, Context]): F[OtelJava[F]] = | ||
Async[F].delay(GlobalOpenTelemetry.get).map(OtelJava.local[F]) | ||
|
||
def program[F[_]: Async](otel4s: OtelJava[F])(implicit L: Local[F, Context]): F[Unit] = { | ||
val _ = (otel4s, L) // both OtelJava and Local[F, Context] are available here | ||
Async[F].unit | ||
def program[F[_]: Async](implicit L: Local[F, Context]): F[Unit] = { | ||
Local[F, Context].ask | ||
.flatMap(context => Async[F].unit) | ||
} | ||
|
||
val run: IO[Unit] = | ||
IOLocal(Context.root).flatMap { implicit ioLocal: IOLocal[Context] => | ||
createOtel4s[IO].flatMap(otel4s => program(otel4s)) | ||
def run: IO[Unit] = | ||
OtelJava.autoConfigured[IO]() { otel4s => | ||
implicit val local: Local[IO, Context] = otel4s.localContext | ||
program[IO] | ||
} | ||
``` | ||
|
||
|
@@ -144,7 +140,8 @@ When you invoke the `gen-random-name` endpoint, the spans will be structured in | |
## How to use otel4s context with Java SDK | ||
|
||
To interoperate with Java libraries that rely on the Java SDK context, you need to activate the context manually. | ||
The following utility method allows you to extract the current otel4s context and set it into the ThreadLocal variable: | ||
|
||
The following utility function will run a blocking call | ||
|
||
```scala mdoc:silent:reset | ||
import cats.effect.Sync | ||
|
@@ -153,13 +150,13 @@ import cats.syntax.flatMap._ | |
import org.typelevel.otel4s.oteljava.context.Context | ||
import io.opentelemetry.context.{Context => JContext} | ||
|
||
def useJContext[F[_]: Sync, A](use: JContext => A)(implicit L: Local[F, Context]): F[A] = | ||
Local[F, Context].ask.flatMap { ctx => // <1> | ||
Sync[F].delay { | ||
def blockingWithContext[F[_]: Sync, A](use: => A)(implicit L: Local[F, Context]): F[A] = | ||
Local[F, Context].ask.flatMap { ctx => // <1> | ||
Sync[F].blocking { | ||
val jContext: JContext = ctx.underlying // <2> | ||
val scope = jContext.makeCurrent() // <3> | ||
val scope = jContext.makeCurrent() // <3> | ||
try { | ||
use(jContext) | ||
use | ||
} finally { | ||
scope.close() | ||
} | ||
|
@@ -171,15 +168,14 @@ def useJContext[F[_]: Sync, A](use: JContext => A)(implicit L: Local[F, Context] | |
2) `ctx.underlying` - unwrap otel4s context and get `JContext` | ||
3) `jContext.makeCurrent()` - activate `JContext` within the current thread | ||
|
||
**Note:** we use `Sync[F].delay` to handle the side effects. | ||
Depending on your use case, you may prefer `Sync[F].interruptible` or `Sync[F].blocking`. | ||
Similarly you can write functions for `Sync[F].interruptible` or `Sync[F].delay`. | ||
|
||
Now we can run a slightly modified original 'problematic' example: | ||
```scala | ||
tracer.span("test").use { span => // start 'test' span using otel4s | ||
IO.println(s"Otel4s ctx: ${span.context}") >> useJContext[IO, Unit] { _ => | ||
IO.println(s"Otel4s ctx: ${span.context}") >> blockingWithContext { | ||
val jSpanContext = JSpan.current().getSpanContext // get a span from the ThreadLocal variable | ||
println(s"Java ctx: $jSpanContext") | ||
println(s"Java ctx: $jSpanContext") | ||
} | ||
} | ||
``` | ||
|
@@ -190,8 +186,30 @@ Java ctx: {traceId=06f5d9112efbe711947ebbded1287a30, spanId=26ed80c398cc039f, .. | |
Otel4s ctx: {traceId=06f5d9112efbe711947ebbded1287a30, spanId=26ed80c398cc039f, ...} | ||
``` | ||
|
||
As we can see, the tracing information is in sync now, | ||
and you can use Java-instrumented libraries within the `useJContext` block. | ||
As we can see, the tracing information is now available in ThreadLocal now too. | ||
Code instrumented using OpenTelemetry's Java API will work inside `blockingWithContext`. | ||
|
||
### Calling asynchronous code | ||
|
||
When interopting with asynchronous Java APIs: | ||
|
||
``` | ||
def asyncWithContext[F[_]: Async, A](k: (Either[Throwable, A] => Unit) => F[Option[F[Unit]]])(implicit L: Local[F, Context]): F[A] = | ||
Local[F, Context].ask.flatMap { ctx => // <1> | ||
Async[F].async[A] { cb => | ||
Async[F].delay { | ||
val jContext: JContext = ctx.underlying | ||
val _ = jContext.makeCurrent() | ||
}.flatMap(_ => k(cb)) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think this is problematic because there's no guarantee that delay and the subsequent flatMap are executed together. Any ideas? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Perhaps we can use def tracedContext(ctx: JContext): ExecutionContext =
new ExecutionContext {
def execute(runnable: Runnable): Unit = {
val scope = ctx.makeCurrent()
try {
runnable.run()
} finally {
scope.close()
}
}
def reportFailure(cause: Throwable): Unit =
cause.printStackTrace()
}
def asyncWithContext[F[_]: Async, A](k: (Either[Throwable, A] => Unit) => F[Option[F[Unit]]])(implicit L: Local[F, Context]): F[A] =
Local[F, Context].ask.flatMap { ctx =>
Async[F].evalOn(
Async[F].async[A](cb => k(cb)),
tracedContext(ctx.underlying)
)
} There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. But implications on performance are unknown :( There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
def evalWithJContext[F[_]: Async, A](fa: F[A])(implicit L: Local[F, Context]): F[A] =
Local[F, Context].ask.flatMap { ctx =>
Async[F].evalOn(fa, tracedContext(ctx.underlying))
} There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Good idea :) Why do you think it'll affect performance (assuming There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. AFAIK context shifts may be costly. The current implementation of the Here is the logic for the If I get it right, @armanbilge perhaps you have some insides? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I've added your suggestion for now. I'm thinking that a typical async API involves using another threadpool so context shift isn't avoidable in that case. (But yeah will be good if an additional context shift can be avoided) |
||
} | ||
} | ||
``` | ||
|
||
Note that we rely on the async java code to clean up the `Context` threadlocal, | ||
therefore the java async API you call must handle the OpenTelemetry `Context` threadlocal correctly. | ||
|
||
Java library instrumentations can be found [here](https://github.com/open-telemetry/opentelemetry-java-instrumentation), | ||
or you can manually "wrap" their underlying threadpool using `io.opentelemetry.context.Context.wrap`. | ||
|
||
## Pekko HTTP example | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done