Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add scala future test case using multiple threads #1274

Closed
wants to merge 1 commit into from

Conversation

henrikno
Copy link

@henrikno henrikno commented Jul 6, 2020

What does this PR do?

I wanted to give the scala futures implementation a spin but, hit a snag when using it with multiple threads.
It seems like the transaction is not activated/deactivated at the right time in some cases.
The existing test cases only use a single transaction so it wouldn't catch it.

Sometimes it fails because current tracer != the tracer passed as parameter.
Also sometimes num is different some the transaction name.
Example output:
thread=35, trace='Transaction7' 00-852e1e7ca03813413465fc7c21b4ab5e-42a231f21779d2aa-01 (4cd1e659) start transaction num=20
num should be the same as the number in the transaction name. It looks like the transaction from one future is still active when starting a different unrelated future.

The test sometimes passes. The Thread.sleep is not necessary for it to fail but makes it happen more often.

Not intended to be merged as-is, just to show the issue.

Checklist

  • This is an enhancement of existing features, or a new feature in existing plugins
    • I have updated CHANGELOG.asciidoc
    • I have added tests that prove my fix is effective or that my feature works
    • Added an API method or config option? Document in which version this will be introduced
    • I have made corresponding changes to the documentation
  • This is a bugfix
  • This is a new plugin
    • I have updated CHANGELOG.asciidoc
    • My code follows the style guidelines of this project
    • I have made corresponding changes to the documentation
    • I have added tests that prove my fix is effective or that my feature works
    • New and existing unit tests pass locally with my changes
    • I have updated CHANGELOG.asciidoc
    • I have updated supported-technologies.asciidoc
    • Added an API method or config option? Document in which version this will be introduced
    • Added an instrumentation plugin? Describe how you made sure that old, non-supported versions are not instrumented by accident.
  • This is something else

Sometimes fails because current tracer != the tracer passed as parameter. Also
sometimes num is different some the transaction name.
@apmmachine
Copy link
Contributor

apmmachine commented Jul 6, 2020

💔 Build Failed

the below badges are clickable and redirect to their specific view in the CI or DOCS
Pipeline View Test View Changes Artifacts preview preview

Expand to view the summary

Build stats

  • Start Time: 2023-07-03T11:39:28.352+0000

  • Duration: 17 min 48 sec

Steps errors 6

Expand to view the steps failures

Load a resource file from a library
  • Took 0 min 0 sec . View more details here
  • Description: approval-list/elastic/apm-agent-java.yml
mvn install
  • Took 10 min 2 sec . View more details here
  • Description: ./mvnw clean install -DskipTests=true -Dmaven.javadoc.skip=true
mvn install
  • Took 0 min 27 sec . View more details here
  • Description: ./mvnw clean install -DskipTests=true -Dmaven.javadoc.skip=true
mvn install
  • Took 0 min 27 sec . View more details here
  • Description: ./mvnw clean install -DskipTests=true -Dmaven.javadoc.skip=true
mvn install
  • Took 0 min 28 sec . View more details here
  • Description: ./mvnw clean install -DskipTests=true -Dmaven.javadoc.skip=true
mvn install
  • Took 0 min 27 sec . View more details here
  • Description: ./mvnw clean install -DskipTests=true -Dmaven.javadoc.skip=true

❕ Flaky test report

No test was executed to be analysed.

🤖 GitHub comments

Expand to view the GitHub comments

To re-run your PR in the CI, just comment with:

  • /test : Re-trigger the build.

  • run benchmark tests : Run the benchmark tests.

  • run jdk compatibility tests : Run the JDK Compatibility tests.

  • run integration tests : Run the Agent Integration tests.

  • run end-to-end tests : Run the APM-ITs.

  • run windows tests : Build & tests on windows.

  • run elasticsearch-ci/docs : Re-trigger the docs validation. (use unformatted text in the comment!)

@milanvdm
Copy link
Contributor

milanvdm commented Jul 7, 2020

I will have a look later today :) Looks interesting!

You are indeed correct the current scenario is only tested with 1 transaction. Although I've tested this with a dummy-application as well that has multiple transactions going on at the same time: https://github.com/milanvdm/scala-elastic-apm/tree/master/src/main/scala/me/milan/main/future

def startFuture(num: Int): Future[(Transaction, Int)] = {
Future {
println(s"thread=${Thread.currentThread().getId}, trace=${tracer.currentTransaction()} before num=$num")
val transaction = tracer.startRootTransaction(getClass.getClassLoader).withName("Transaction" + num).activate()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This transaction is never deactivated on the thread and can therefore leak into other operations.

A more sensible test scenario would be to start a transaction before doing the multi-threaded operations and check if the transaction successfully propagates to the futures executed in different threads. I've tried this out and it even works when commenting out the instrumentations in apm-agent-plugins/apm-scala-concurrent-plugin/src/main/resources/META-INF/services/co.elastic.apm.agent.bci.ElasticApmInstrumentation. That's because Scala relies on the Java concurrent framework that we already instrument.

The instrumentations in this module are only needed when the context gets lost along the way which seems to be the case for some AsyncHttpClient callbacks.

val transaction = tracer.startRootTransaction(getClass.getClassLoader).withName("Transaction").activate()
val futures = (1 to 10).map(x => Future {
  Thread.sleep(10)
  println(s"thread=${Thread.currentThread().getId}, trace=${tracer.currentTransaction()}")
  val x = transaction == tracer.currentTransaction()
  assertEquals(transaction, tracer.currentTransaction())
  (transaction, x)
})
val future = Future.sequence(futures)
val result = Await.result(future, 10.seconds).toList
transaction.deactivate().end()
assertEquals(result.forall(x => x._2), true)

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This transaction is never deactivated on the thread and can therefore leak into other operations.
But if I deactivate it, it won't propagate... right?

How do I "activate" the transaction inside the future, so that it's passed on to the next .map() operation, but while deactivating it so that it's not passed on to unrelated operation that happens on the same thread?

Looks like your example is using a single transaction. What I'm trying to simulate is a "http server" scenario, which has multiple transactions corresponding to http requests concurrently. If I take your example and make it run that 10 times in different threads I get the same issue.
https://gist.github.com/henrikno/71e54b7c2d43e63936fd33fdd45411bb
Interestingly it happens less often with ForkJoinPool than with FixedThreadPool.

If the test is wrong, could you help explain how it should be used with multiple "requests"/threads handing different transactions concurrently (each of which will typically spawn nested futures to do the actual work)?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The test in the linked gist looks good. I had to increase the number of threads though. Seems like it does expose some edge cases where the context isn't properly transferred.

To make investigation easier, we'd ideally want a test that's a bit simpler but that still fails.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ill spend some time on this tomorrow 🙂

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@felixbarny I am not sure how the elastic-agent handles the following case for JavaFutures:

  1. Create multiple transactions (could be on different or the same threads) - https://gist.github.com/henrikno/71e54b7c2d43e63936fd33fdd45411bb#file-test-scala-L41
  2. After you have multiple transactions active, create a new Future - https://gist.github.com/henrikno/71e54b7c2d43e63936fd33fdd45411bb#file-test-scala-L43

In this case, the problem seems to be, how does the Future from (2) know which transaction to use?

That's how I currently understand the issue show-cased on the Gist.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There can only be one transaction active on a thread at a time. I think that's maintained as each Future is either executed in its own thread concurrently or steals time from the main thread and executes synchronously.

how does the Future from (2) know which transaction to use?

The map schedules the Future for execution (by calling eventually Executor#execute). Whichever transaction is active on the current thread when the Future is scheduled is going to be restored when the Future is executed.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@felixbarny That map is not actually being called on any Future. Therefore, it will actually create a new Future and use whatever Transaction is active at that point.

I think that is the exact problem here. Transactions are started in parallel but are not followed up on correctly to pass the correct context.

So somehow, we need to have Transactions created inside of a Future, actually be linked to that Future and continue the logic with map, flatMap on that Future.

I'm not sure if the above not already happens though, say: Future(startTransaction()).flatMap(logic) maybe always runs on the same thread and use the same transaction. But Im not sure about that.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In my tests, I have commented out the scala-specific Future instrumentation. The tests do fail more often when doing that but when using a ForkJoinPool, it works a bit more reliably.

Calling map ends up calling Executor#execute and executing a Future ends up calling Runnable#run. These methods are instrumented by the java-concurrent plugin which captures and restores the context. Something must be going wrong in some cases there.

So somehow, we need to have Transactions created inside of a Future, actually be linked to that Future and continue the logic with map, flatMap on that Future.

When transactions are activated, they are attached to the current thread. If on that thread, a future gets scheduled for execution via map, the transaction gets associated with that future and will be restored when the future is executed.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@felixbarny So since the Future that creates the transaction will be the same Runnable that executes the underlying Futures using that transaction, the context should be passed along correctly?

Thanks for the information as always :)

@milanvdm
Copy link
Contributor

@felixbarny I've spent some time investigating this issue again :)

Kamon's implementation has some nice descriptions on how and why they instrument certain methods to make the Future context propagation work correctly.

Seems like currently the part of https://github.com/kamon-io/Kamon/blob/master/instrumentation/kamon-scala-future/src/main/scala-2.13/kamon/instrumentation/futures/scala/FutureChainingInstrumentation.scala#L39-L49 is implemented in the elastic-agent but the other cases are missing. Might be the cause of certain cases not working atm.

I tried implementing these parts but Im stuck on https://github.com/kamon-io/Kamon/blob/master/instrumentation/kamon-scala-future/src/main/scala-2.13/kamon/instrumentation/futures/scala/FutureChainingInstrumentation.scala#L56 where it cleans a context on a certain match. I cannot figure out how that should be done in the elastic-agent.

@felixbarny
Copy link
Member

That's going to be tricky as the Elastic agent allows to be attached at runtime. At that point, the type initializer of Future has most likely already been executed. Also, the Kamon agent adds interfaces/mixins such as HasContext which also can't be done using runtime attachment.

Maybe an alternative could be to avoid context propagation by checking if the future is (as in reference equality (==)) scala.concurrent.Future.unit.

@milanvdm
Copy link
Contributor

milanvdm commented Jun 29, 2021

@felixbarny Been debugging this a bit more again :)

I have some strange behavior that I have a question around:

The following code is instrumenting a specific method:

@Nullable
        @Advice.OnMethodEnter(suppress = Throwable.class, inline = false)
        public static Object onEnter(@Advice.This Object thiz) {
            logger.warn(promisesToContext.toString());
            AbstractSpan<?> context = promisesToContext.remove(thiz);
            if (context != null) {
                logger.warn("==============");
                logger.warn("ACTIVATE Run on " + Thread.currentThread().getId() + " - " + context);
                context.activate();
                logger.warn(tracer.currentTransaction().toString());
                logger.warn("==============");
                // decrements the reference we incremented to avoid that the parent context gets recycled before the promise is run
                // because we have activated it, we can be sure it doesn't get recycled until we deactivate in the OnMethodExit advice
                context.decrementReferences();
            }
            return context;

But in the logs I get the following:

2021-06-29 22:38:13,721 [ForkJoinPool-1-worker-13] WARN  co.elastic.apm.agent.cache.WeakKeySoftValueLoadingCache - ==============
2021-06-29 22:38:13,721 [ForkJoinPool-1-worker-13] WARN  co.elastic.apm.agent.cache.WeakKeySoftValueLoadingCache - ACTIVATE Run on 30 - '10' 00-97d38655e32835994ec9d4277289de50-0e7cd552f87e9631-01 (7c2f827d)
2021-06-29 22:38:13,721 [ForkJoinPool-1-worker-13] WARN  co.elastic.apm.agent.cache.WeakKeySoftValueLoadingCache - '6' 00-ae27a70994d59099ff6d696c1b290a03-6bb34461a653557e-01 (fef99bd)
2021-06-29 22:38:13,721 [ForkJoinPool-1-worker-13] WARN  co.elastic.apm.agent.cache.WeakKeySoftValueLoadingCache - ==============
thread=30 transactionNumber=10, trace='6' 00-ae27a70994d59099ff6d696c1b290a03-6bb34461a653557e-01 (fef99bd) before futureNumber=1, 6

So even after calling context.activate();, the same thread will return a different context when calling tracer.currentTransaction().toString(). I am probably missing some understanding on how the context.activate() works?

@felixbarny
Copy link
Member

You'll need to call tracer.getActive(), which returns a AbstractSpan<?> (either a Span or a Transaction). The method tracer.currentTransaction() returns the transaction. Transactions are the entry point into the service, whereas spans are operations within that.

@milanvdm
Copy link
Contributor

milanvdm commented Jun 30, 2021

@felixbarny Im not sure how the getActive and the currentTransaction can give a different result though.

In the following test:

val fs = (1 to 10).map(transactionNumber => Future {
      Thread.sleep(10)

      val transaction = tracer.startRootTransaction(getClass.getClassLoader).withName(transactionNumber.toString).activate()

      println(s"thread=${Thread.currentThread().getId} transaction=$transactionNumber, span=${tracer.getActive}, trace=${tracer.currentTransaction()} starting transaction")

      val futures = (1 to 1)
        .map(futureNumber => Future {
          Thread.sleep(10)

          val currentTransactionNumber = tracer.currentTransaction().getNameAsString

          println(s"thread=${Thread.currentThread().getId} transactionNumber=$transactionNumber, span=${tracer.getActive}. trace=${tracer.currentTransaction()} before futureNumber=$futureNumber, $currentTransactionNumber")

          assertEquals(transaction, tracer.currentTransaction())
          assertEquals(currentTransactionNumber.toInt, transactionNumber)

          (transaction, futureNumber)
        }

Since there are only rootTransactions started, shouldnt both methods always return the same transaction (as no spans are ever created as a child of the root-transaction)?

The test is failing due to this. It is activating the correct context at the right times. So getActive will correctly propagate the contexts between the Futures. But the currentTransaction method doesn't return the correct context for some reason.

@felixbarny
Copy link
Member

Seems you are activating multiple transactions that belong to different traces on the same thread. That's an illegal state but it seems we currently don't guard against that. What happens when calling currentTransaction() is that the bottom-most span's transaction is returned.

public Transaction currentTransaction() {
final AbstractSpan<?> bottomOfStack = activeStack.get().peekLast();
return bottomOfStack != null ? bottomOfStack.getTransaction() : null;
}

That means if you activate multiple transactions, the first activated one will be returned.
I believe the specific issue in your example is that you don't deactivate the transaction after the inner map operation.

@milanvdm
Copy link
Contributor

milanvdm commented Jul 11, 2021

@felixbarny It indeed looks like the spans are correctly passed across threads and being linked to the correct Future. Meaning that tracer.getActive() will give the expected results.

As you mention, the difference is with the tracer.currentTransaction. In the current implementation, tracer.currentTransaction will always return the transaction that was created on the thread that the currentTransaction is run on.

So here I am a bit lost. I've got a solution that correctly activates and deactivates spans across different threads correctly for getActive. So I dont see how (and why) tracer.getActive and tracer.currentTransaction return different results.
How can I deactivate and activate transactions for currentTransaction in the same way I do for getActive?

@eyalkoren
Copy link
Contributor

eyalkoren commented Jul 13, 2021

I didn't read through everything, but I hope I can help:
We maintain a stack of transactions/spans for each thread. getActive returns the topmost span/transaction. currentTransaction returns the bottommost transaction or the parent transaction if the bottom is a span.
A span/transaction is pushed to the stack when you activate it and popped when you deactivate it. This has nothing to do with where the transactions/spans are created or ended.

If you run multiple futures on a thread pool, you must make sure any activated transaction is also deactivated before the thread is returned to the pool to run the next task. Otherwise, the next time you activate a transaction on that thread (either explicitly or through the context propagation mechanism), you would have two activated transactions, which is an illegal state and in which case getActive() != currentTransaction().

@SylvainJuge SylvainJuge added the size:small Small (S) tasks label Jan 31, 2022
@eyalkoren
Copy link
Contributor

@henrikno a lot has changed since this was opened. Is this still relevant? Would you want to follow up on this, or should we close it for now?

@botelastic
Copy link

botelastic bot commented Sep 1, 2023

Hi! We just realized that we haven't looked into this issue in a while. We're sorry! We're labeling this issue as stalled to make it hit our filters and make sure we get back to it in as soon as possible. In the meantime, it'd be extremely helpful if you could take a look at it as well and confirm its relevance. A simple comment with a nice emoji will be enough :+1. Thank you for your contribution!

@botelastic botelastic bot added the stalled label Sep 1, 2023
@botelastic
Copy link

botelastic bot commented Sep 15, 2023

Hi! This issue has been stale for a while and we're going to close it as part of our cleanup procedure. We appreciate your contribution and would like to apologize if we have not been able to review it, due to the current heavy load of the team. Feel free to re-open this issue if you think it should stay open. Thank you for your contribution!

@botelastic botelastic bot closed this Sep 15, 2023
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

7 participants