Fix Blocking Memory Store Usage in Streamed Mode #1144

cescoffier
Collaborator

Fix #1120

This commit addresses issues with using the blocking memory store in streamed responses.

  • Ensures the execution captures whether the caller is running on a worker thread.
  • Switches to worker threads for every emission and completion event when the caller is using a worker thread.
  • Relies on executeBlocking to propagate the context automatically when possible.

Note:

  • The blocking memory store cannot be used when invoked on the event loop; the caller must now be on a worker thread.
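
To illustrate the approach described above, here is a minimal sketch using only plain JDK executors. It is not the code from this PR and does not use Vert.x or `executeBlocking`. The class `WorkerAwareDispatcher`, the `worker-` thread-name prefix, and the `demo` method are all hypothetical names chosen for the example. The sketch records once, at creation time, whether the caller is on a worker thread, and then hands every emission and the completion event to the worker when that was the case.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class WorkerAwareDispatcher {
    // Assumption: in this sketch a worker thread is recognised by its name prefix.
    static final String WORKER_PREFIX = "worker-";

    private final boolean callerOnWorker;
    private final ExecutorService worker;

    WorkerAwareDispatcher(ExecutorService worker) {
        // Captured once, on the thread that starts the streamed call.
        this.callerOnWorker = Thread.currentThread().getName().startsWith(WORKER_PREFIX);
        this.worker = worker;
    }

    // Runs the event on the worker if the caller was on a worker, otherwise inline.
    void dispatch(Runnable event) {
        if (callerOnWorker) {
            worker.execute(event);
        } else {
            event.run();
        }
    }

    static List<String> demo() {
        // A single-threaded executor processes tasks in submission order.
        ExecutorService worker =
                Executors.newSingleThreadExecutor(r -> new Thread(r, WORKER_PREFIX + "1"));
        List<String> seen = Collections.synchronizedList(new ArrayList<>());
        try {
            // The streamed call starts on a worker thread.
            Callable<WorkerAwareDispatcher> create = () -> new WorkerAwareDispatcher(worker);
            WorkerAwareDispatcher dispatcher = worker.submit(create).get();

            // Tokens arrive on a different thread, standing in for the event loop.
            Thread eventLoop = new Thread(() -> {
                for (String token : List.of("a", "b")) {
                    dispatcher.dispatch(
                            () -> seen.add(token + "@" + Thread.currentThread().getName()));
                }
                dispatcher.dispatch(
                        () -> seen.add("done@" + Thread.currentThread().getName()));
            }, "event-loop-1");
            eventLoop.start();
            eventLoop.join();

            worker.shutdown();
            worker.awaitTermination(5, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
        return new ArrayList<>(seen);
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

In this sketch every callback, including the completion one, runs on `worker-1` even though the tokens were produced on `event-loop-1`, which is the situation in which a blocking memory store call would be safe.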

@cescoffier
Collaborator Author

Draft - I would like to implement some tests.

@florian-h05 florian-h05 left a comment

Many thanks - I have verified the fix to work!

@florian-h05

BTW: I suspect using a RetrievalAugmentor for the AIService will cause the same issue as ChatMemory currently does.

@cescoffier
Collaborator Author

@florian-h05 That's why I want to add some tests. I believe it's fine, as it switches to another thread in a slightly odd way (losing the context). I may adapt it at the same time.

@cescoffier cescoffier force-pushed the fix-blocking-memory-store-in-streamed-response branch from 43e09c6 to d68fd30 Compare December 10, 2024 17:56
@cescoffier cescoffier force-pushed the fix-blocking-memory-store-in-streamed-response branch from d68fd30 to 28451d3 Compare December 12, 2024 10:31
@cescoffier cescoffier marked this pull request as ready for review December 12, 2024 10:32
@cescoffier cescoffier requested a review from a team as a code owner December 12, 2024 10:32
/**
 * Utility class for streaming tests.
 */
public class StreamTestUtils {
Collaborator

Very nice!

@cescoffier
Collaborator Author

@geoand ... I may actually have to fix the ordering issue: the completion signal is sent before the items.
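
For readers unfamiliar with this kind of ordering problem: when each event is handed independently to a multi-threaded pool, the completion event can overtake items that were submitted earlier. The sketch below shows one common way to keep submission order, by chaining each event onto the previous one with `CompletableFuture`. It is an illustration only, not necessarily how this PR resolves the issue, and `OrderedEmitter` and `demo` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class OrderedEmitter {
    private final ExecutorService pool;
    // Tail of the chain: the next event only starts after this one has finished.
    private CompletableFuture<Void> tail = CompletableFuture.completedFuture(null);

    OrderedEmitter(ExecutorService pool) {
        this.pool = pool;
    }

    // Appends the event to the chain; it runs on the pool, after all earlier events.
    // Note: if an event throws, later events in the chain are skipped.
    synchronized void submit(Runnable event) {
        tail = tail.thenRunAsync(event, pool);
    }

    // Future that completes once every event submitted so far has run.
    synchronized CompletableFuture<Void> drained() {
        return tail;
    }

    static List<String> demo() {
        // Several threads, so ordering is not guaranteed by the pool itself.
        ExecutorService pool = Executors.newFixedThreadPool(4);
        List<String> received = Collections.synchronizedList(new ArrayList<>());
        OrderedEmitter emitter = new OrderedEmitter(pool);

        for (int i = 0; i < 5; i++) {
            String item = "item-" + i;
            emitter.submit(() -> received.add(item));
        }
        // The completion signal is submitted last and therefore runs last.
        emitter.submit(() -> received.add("complete"));

        emitter.drained().join();
        pool.shutdown();
        return new ArrayList<>(received);
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Chaining trades some parallelism for ordering: events run one at a time, which is usually what a streamed response needs anyway.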

@geoand
Collaborator

geoand commented Dec 13, 2024

Darn.... Happened sooner than we expected :(

@cescoffier cescoffier force-pushed the fix-blocking-memory-store-in-streamed-response branch from 28451d3 to 0690a53 Compare December 13, 2024 07:18
quarkus-bot bot commented Dec 13, 2024

Status for workflow Build (on pull request)

This is the status report for running Build (on pull request) on commit 0690a53.

✅ The latest workflow run for the pull request has completed successfully.

It should be safe to merge provided you have a look at the other checks in the summary.

@geoand geoand merged commit 742b816 into quarkiverse:main Dec 13, 2024
64 checks passed
@cescoffier cescoffier deleted the fix-blocking-memory-store-in-streamed-response branch December 15, 2024 16:01
@florian-h05

@cescoffier The same issue occurs when using the retrieval augmentor:

org.jboss.resteasy.reactive.common.core.BlockingNotAllowedException
        at org.jboss.resteasy.reactive.client.impl.InvocationBuilderImpl.unwrap(InvocationBuilderImpl.java:199)
        at org.jboss.resteasy.reactive.client.impl.InvocationBuilderImpl.method(InvocationBuilderImpl.java:328)
        at io.quarkiverse.langchain4j.openai.common.OpenAiRestApi$$QuarkusRestClientInterface.blockingEmbedding(Unknown Source)
        at io.quarkiverse.langchain4j.openai.common.QuarkusOpenAiClient$5.execute(QuarkusOpenAiClient.java:339)
        at io.quarkiverse.langchain4j.openai.common.QuarkusOpenAiClient$5.execute(QuarkusOpenAiClient.java:336)
        at io.quarkiverse.langchain4j.azure.openai.AzureOpenAiEmbeddingModel.lambda$embedTexts$0(AzureOpenAiEmbeddingModel.java:119)
        at dev.langchain4j.internal.RetryUtils$RetryPolicy.withRetry(RetryUtils.java:192)
        at dev.langchain4j.internal.RetryUtils.withRetry(RetryUtils.java:229)
        at io.quarkiverse.langchain4j.azure.openai.AzureOpenAiEmbeddingModel.embedTexts(AzureOpenAiEmbeddingModel.java:119)
        at io.quarkiverse.langchain4j.azure.openai.AzureOpenAiEmbeddingModel.embedAll(AzureOpenAiEmbeddingModel.java:103)
        at dev.langchain4j.model.embedding.EmbeddingModel.embed(EmbeddingModel.java:34)
        at dev.langchain4j.model.embedding.EmbeddingModel.embed(EmbeddingModel.java:24)

@cescoffier
Collaborator Author

cescoffier commented Jan 8, 2025 via email

Development

Successfully merging this pull request may close these issues.

Non-reactive chat memory crashes response streaming
4 participants