Java interop doc update #556

Open
wants to merge 5 commits into
base: main
66 changes: 42 additions & 24 deletions docs/instrumentation/tracing-java-interop.md
@@ -45,28 +45,24 @@ To mitigate this limitation, the context must be shared manually.
## Before we start

Since we need to manually modify the context we need direct access to `Local[F, Context]`.
It can be constructed in the following way:
The easiest way is to call `OtelJava#localContext`:

```scala mdoc:silent
import cats.effect._
import cats.mtl.Local
import cats.syntax.functor._
import org.typelevel.otel4s.instances.local._ // brings Local derived from IOLocal
import org.typelevel.otel4s.oteljava.context.Context
import org.typelevel.otel4s.oteljava.OtelJava
import io.opentelemetry.api.GlobalOpenTelemetry

def createOtel4s[F[_]: Async](implicit L: Local[F, Context]): F[OtelJava[F]] =
  Async[F].delay(GlobalOpenTelemetry.get).map(OtelJava.local[F])

def program[F[_]: Async](otel4s: OtelJava[F])(implicit L: Local[F, Context]): F[Unit] = {
  val _ = (otel4s, L) // both OtelJava and Local[F, Context] are available here
  Async[F].unit
def program[F[_]: Async](implicit L: Local[F, Context]): F[Unit] = {
  Local[F, Context].ask
    .flatMap(context => Async[F].unit)
}

val run: IO[Unit] =
  IOLocal(Context.root).flatMap { implicit ioLocal: IOLocal[Context] =>
    createOtel4s[IO].flatMap(otel4s => program(otel4s))
def run: IO[Unit] =
  OtelJava.autoConfigured[IO]() { otel4s =>
    implicit val local: Local[IO, Context] = otel4s.localContext
Contributor

Suggested change
implicit val local: Local[IO, Context] = otel4s.localContext
import otel4s.localContext

Author

Done

    program[IO]
  }
```

@@ -144,7 +140,8 @@ When you invoke the `gen-random-name` endpoint, the spans will be structured in
## How to use otel4s context with Java SDK

To interoperate with Java libraries that rely on the Java SDK context, you need to activate the context manually.
The following utility method allows you to extract the current otel4s context and set it into the ThreadLocal variable:

The following utility function runs a blocking call with the current otel4s context activated in the ThreadLocal variable:

```scala mdoc:silent:reset
import cats.effect.Sync
@@ -153,13 +150,13 @@ import cats.syntax.flatMap._
import org.typelevel.otel4s.oteljava.context.Context
import io.opentelemetry.context.{Context => JContext}

def useJContext[F[_]: Sync, A](use: JContext => A)(implicit L: Local[F, Context]): F[A] =
  Local[F, Context].ask.flatMap { ctx => // <1>
    Sync[F].delay {
def blockingWithContext[F[_]: Sync, A](use: => A)(implicit L: Local[F, Context]): F[A] =
  Local[F, Context].ask.flatMap { ctx => // <1>
    Sync[F].blocking {
      val jContext: JContext = ctx.underlying // <2>
      val scope = jContext.makeCurrent() // <3>
      val scope = jContext.makeCurrent() // <3>
      try {
        use(jContext)
        use
      } finally {
        scope.close()
      }
@@ -171,15 +168,14 @@ def useJContext[F[_]: Sync, A](use: JContext => A)(implicit L: Local[F, Context]
2) `ctx.underlying` - unwrap otel4s context and get `JContext`
3) `jContext.makeCurrent()` - activate `JContext` within the current thread

**Note:** we use `Sync[F].delay` to handle the side effects.
Depending on your use case, you may prefer `Sync[F].interruptible` or `Sync[F].blocking`.
Similarly, you can write functions for `Sync[F].interruptible` or `Sync[F].delay`.

Now we can run a slightly modified original 'problematic' example:
```scala
tracer.span("test").use { span => // start 'test' span using otel4s
  IO.println(s"Otel4s ctx: ${span.context}") >> useJContext[IO, Unit] { _ =>
  IO.println(s"Otel4s ctx: ${span.context}") >> blockingWithContext {
    val jSpanContext = JSpan.current().getSpanContext // get a span from the ThreadLocal variable
    println(s"Java ctx: $jSpanContext")
    println(s"Java ctx: $jSpanContext")
  }
}
```
@@ -190,8 +186,30 @@ Java ctx: {traceId=06f5d9112efbe711947ebbded1287a30, spanId=26ed80c398cc039f, ..
Otel4s ctx: {traceId=06f5d9112efbe711947ebbded1287a30, spanId=26ed80c398cc039f, ...}
```

As we can see, the tracing information is in sync now,
and you can use Java-instrumented libraries within the `useJContext` block.
As we can see, the tracing information is now available in the ThreadLocal variable too.
Code instrumented using OpenTelemetry's Java API will work inside `blockingWithContext`.

### Calling asynchronous code

When interoperating with asynchronous Java APIs:

```
def asyncWithContext[F[_]: Async, A](k: (Either[Throwable, A] => Unit) => F[Option[F[Unit]]])(implicit L: Local[F, Context]): F[A] =
  Local[F, Context].ask.flatMap { ctx => // <1>
    Async[F].async[A] { cb =>
      Async[F].delay {
        val jContext: JContext = ctx.underlying
        val _ = jContext.makeCurrent()
      }.flatMap(_ => k(cb))
Author

I think this is problematic because there's no guarantee that delay and the subsequent flatMap are executed together. Any ideas?

Contributor

Perhaps we can use evalOn with a custom ExecutionContext:

def tracedContext(ctx: JContext): ExecutionContext =
  new ExecutionContext {
    def execute(runnable: Runnable): Unit = {
      val scope = ctx.makeCurrent()
      try {
        runnable.run()
      } finally {
        scope.close()
      }
    }
    def reportFailure(cause: Throwable): Unit =
      cause.printStackTrace()
  }
  
def asyncWithContext[F[_]: Async, A](k: (Either[Throwable, A] => Unit) => F[Option[F[Unit]]])(implicit L: Local[F, Context]): F[A] =
  Local[F, Context].ask.flatMap { ctx =>
    Async[F].evalOn(
      Async[F].async[A](cb => k(cb)), 
      tracedContext(ctx.underlying)
    )
  }

Contributor

But implications on performance are unknown :(

Contributor

@iRevive iRevive Mar 27, 2024

evalOn is applicable for all scenarios, actually. But again, it may (and highly likely will) affect the performance.

def evalWithJContext[F[_]: Async, A](fa: F[A])(implicit L: Local[F, Context]): F[A] = 
  Local[F, Context].ask.flatMap { ctx =>
    Async[F].evalOn(fa, tracedContext(ctx.underlying))
  }

Author

Good idea :) Why do you think it'll affect performance (assuming fa pretty much immediately calls the async API instead of doing a bunch of CPU work)?

Contributor

AFAIK context shifts may be costly.

The current implementation of the tracedContext is basically a same-thread execution.

Here is the logic for the evalOn in the CE: https://github.com/typelevel/cats-effect/blob/3cf97ae4fd643f333febb4554af2fb603ed2c9d2/core/shared/src/main/scala/cats/effect/IOFiber.scala#L976-L988.

If I get it right, fa will be executed right within the fiber's loop.

@armanbilge perhaps you have some insights?

Author

I've added your suggestion for now. I'm thinking that a typical async API involves using another threadpool so context shift isn't avoidable in that case. (But yeah will be good if an additional context shift can be avoided)

    }
  }
```

Note that we rely on the asynchronous Java code to clean up the `Context` ThreadLocal;
therefore, the Java async API you call must handle the OpenTelemetry `Context` ThreadLocal correctly.

Java library instrumentations can be found [here](https://github.com/open-telemetry/opentelemetry-java-instrumentation),
or you can manually "wrap" their underlying threadpool using `io.opentelemetry.context.Context.wrap`.
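
To illustrate what "wrapping" a thread pool means, here is a minimal sketch that uses only the standard library. It assumes a plain `ThreadLocal[String]` as a stand-in for the OpenTelemetry `Context` storage; `ContextWrapSketch`, `FakeContext`, `wrap`, and `demo` are hypothetical names for illustration only and are not part of otel4s or the OpenTelemetry API. The wrapper activates a captured value before each task and restores the previous value afterwards, which is the cleanup behaviour the note above relies on.

```scala
import java.util.concurrent.{Executor, Executors}
import scala.concurrent.{Await, Promise}
import scala.concurrent.duration._

object ContextWrapSketch {

  // Hypothetical stand-in for a ThreadLocal-backed context storage (assumption, not the real API).
  object FakeContext {
    private val storage: ThreadLocal[String] =
      ThreadLocal.withInitial[String](() => "root")

    def current: String = storage.get()

    // Activates `value` on the current thread and returns a function restoring the previous value.
    def makeCurrent(value: String): () => Unit = {
      val previous = storage.get()
      storage.set(value)
      () => storage.set(previous)
    }
  }

  // Wraps an executor so that every submitted task runs with `captured` as the current value.
  def wrap(captured: String, underlying: Executor): Executor =
    new Executor {
      def execute(task: Runnable): Unit =
        underlying.execute { () =>
          val restore = FakeContext.makeCurrent(captured)
          try task.run()
          finally restore() // clean up so the pool thread does not leak the value
        }
    }

  // Returns the value seen by a wrapped task and by a later, unwrapped task on the same pool thread.
  def demo(): (String, String) = {
    val pool = Executors.newSingleThreadExecutor()
    try {
      val inside = Promise[String]()
      val after = Promise[String]()
      wrap("span-123", pool).execute { () => inside.success(FakeContext.current); () }
      pool.execute { () => after.success(FakeContext.current); () }
      (Await.result(inside.future, 5.seconds), Await.result(after.future, 5.seconds))
    } finally pool.shutdown()
  }
}
```

In this sketch the wrapped task observes the captured value, while the unwrapped task that runs later on the same pool thread sees the initial value again, because the wrapper restores it in `finally`.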

## Pekko HTTP example

1 change: 1 addition & 0 deletions examples/src/main/scala/PekkoHttpExample.scala
@@ -76,6 +76,7 @@ import scala.concurrent.duration._
*/
object PekkoHttpExample extends IOApp.Simple {


  def run: IO[Unit] =
    IOLocal(Context.root).flatMap { implicit ioLocal: IOLocal[Context] =>
      implicit val local: Local[IO, Context] = localForIOLocal