From 3fdd926d39e674055a6ba489c8ce1588ea4df57c Mon Sep 17 00:00:00 2001 From: Stephen Hodgson Date: Thu, 20 Jun 2024 21:36:52 -0400 Subject: [PATCH] OpenAI-DotNet 8.1.0 (#334) - Fixed streaming event race conditions where the subscriber to the stream would finish before steam events were executed - Refactored streaming events callbacks from `Action` to `Func` - Added `Exception` data to `OpenAI.Error` response - Added `ChatEndpoint.StreamCompletionAsync` with `Func` overload --- OpenAI-DotNet-Tests/TestFixture_03_Threads.cs | 84 +++++++++++++++++-- OpenAI-DotNet-Tests/TestFixture_04_Chat.cs | 10 ++- .../TestServices/WeatherService.cs | 7 ++ .../Assistants/AssistantExtensions.cs | 12 ++- OpenAI-DotNet/Chat/ChatEndpoint.cs | 31 +++++-- OpenAI-DotNet/Chat/ChatResponse.cs | 9 +- OpenAI-DotNet/Common/Error.cs | 13 +++ OpenAI-DotNet/Common/Function.cs | 3 + .../Extensions/BaseEndpointExtensions.cs | 15 ++-- OpenAI-DotNet/OpenAI-DotNet.csproj | 7 +- OpenAI-DotNet/Threads/CodeInterpreter.cs | 2 + .../Threads/CodeInterpreterOutputs.cs | 2 + OpenAI-DotNet/Threads/FunctionCall.cs | 2 + OpenAI-DotNet/Threads/MessageResponse.cs | 2 +- OpenAI-DotNet/Threads/RunResponse.cs | 2 +- OpenAI-DotNet/Threads/RunStepResponse.cs | 2 +- OpenAI-DotNet/Threads/ThreadExtensions.cs | 48 +++++++++-- OpenAI-DotNet/Threads/ThreadsEndpoint.cs | 62 ++++++++++---- OpenAI-DotNet/Threads/ToolCall.cs | 3 + README.md | 66 ++++++++------- 20 files changed, 295 insertions(+), 87 deletions(-) diff --git a/OpenAI-DotNet-Tests/TestFixture_03_Threads.cs b/OpenAI-DotNet-Tests/TestFixture_03_Threads.cs index 3b74f3f2..f1eedc0d 100644 --- a/OpenAI-DotNet-Tests/TestFixture_03_Threads.cs +++ b/OpenAI-DotNet-Tests/TestFixture_03_Threads.cs @@ -257,7 +257,7 @@ public async Task Test_03_03_01_CreateRun_Streaming() var message = await thread.CreateMessageAsync("I need to solve the equation `3x + 11 = 14`. Can you help me?"); Assert.NotNull(message); - var run = await thread.CreateRunAsync(assistant, streamEvent => + var run = await thread.CreateRunAsync(assistant, async streamEvent => { Console.WriteLine(streamEvent.ToJsonString()); @@ -300,11 +300,9 @@ public async Task Test_03_03_01_CreateRun_Streaming() case Error errorEvent: Assert.NotNull(errorEvent); break; - //default: - // handle event not already processed by library - // var @event = JsonSerializer.Deserialize(streamEvent.ToJsonString()); - //break; } + + await Task.CompletedTask; }); Assert.IsNotNull(run); @@ -343,7 +341,7 @@ public async Task Test_03_03_02_CreateRun_Streaming_ToolCalls() try { - async void StreamEventHandler(IServerSentEvent streamEvent) + async Task StreamEventHandler(IServerSentEvent streamEvent) { try { @@ -469,9 +467,10 @@ public async Task Test_04_02_CreateThreadAndRun_Streaming() try { var run = await assistant.CreateThreadAndRunAsync("I need to solve the equation `3x + 11 = 14`. Can you help me?", - @event => + async @event => { Console.WriteLine(@event.ToJsonString()); + await Task.CompletedTask; }); Assert.IsNotNull(run); thread = await run.GetThreadAsync(); @@ -500,7 +499,76 @@ public async Task Test_04_02_CreateThreadAndRun_Streaming() } [Test] - public async Task Test_04_03_CreateThreadAndRun_SubmitToolOutput() + public async Task Test_04_03_CreateThreadAndRun_Streaming_ToolCalls() + { + Assert.NotNull(OpenAIClient.ThreadsEndpoint); + + var tools = new List + { + Tool.GetOrCreateTool(typeof(DateTimeUtility), nameof(DateTimeUtility.GetDateTime)) + }; + var assistantRequest = new CreateAssistantRequest( + instructions: "You are a helpful assistant.", + tools: tools); + var assistant = await OpenAIClient.AssistantsEndpoint.CreateAssistantAsync(assistantRequest); + Assert.IsNotNull(assistant); + ThreadResponse thread = null; + // check if any exceptions thrown in stream event handler + var exceptionThrown = false; + + try + { + async Task StreamEventHandler(IServerSentEvent streamEvent) + { + Console.WriteLine($"{streamEvent.ToJsonString()}"); + + try + { + switch (streamEvent) + { + case ThreadResponse threadResponse: + thread = threadResponse; + break; + case RunResponse runResponse: + if (runResponse.Status == RunStatus.RequiresAction) + { + var toolOutputs = await assistant.GetToolOutputsAsync(runResponse); + var toolRun = await runResponse.SubmitToolOutputsAsync(toolOutputs, StreamEventHandler); + Assert.NotNull(toolRun); + Assert.IsTrue(toolRun.Status == RunStatus.Completed); + } + break; + case Error errorResponse: + throw errorResponse.Exception ?? new Exception(errorResponse.Message); + } + } + catch (Exception e) + { + Console.WriteLine(e); + exceptionThrown = true; + } + } + + var run = await assistant.CreateThreadAndRunAsync("What date is it?", StreamEventHandler); + Assert.NotNull(thread); + Assert.IsNotNull(run); + Assert.IsFalse(exceptionThrown); + Assert.IsTrue(run.Status == RunStatus.Completed); + } + finally + { + await assistant.DeleteAsync(deleteToolResources: thread == null); + + if (thread != null) + { + var isDeleted = await thread.DeleteAsync(deleteToolResources: true); + Assert.IsTrue(isDeleted); + } + } + } + + [Test] + public async Task Test_04_04_CreateThreadAndRun_SubmitToolOutput() { var tools = new List { diff --git a/OpenAI-DotNet-Tests/TestFixture_04_Chat.cs b/OpenAI-DotNet-Tests/TestFixture_04_Chat.cs index 636402c7..e1cff903 100644 --- a/OpenAI-DotNet-Tests/TestFixture_04_Chat.cs +++ b/OpenAI-DotNet-Tests/TestFixture_04_Chat.cs @@ -144,7 +144,10 @@ public async Task Test_02_01_GetChatToolCompletion() Console.WriteLine($"{message.Role}: {message.Content}"); } - var tools = Tool.GetAllAvailableTools(false, forceUpdate: true, clearCache: true); + var tools = new List + { + Tool.GetOrCreateTool(typeof(WeatherService), nameof(WeatherService.GetCurrentWeatherAsync)) + }; var chatRequest = new ChatRequest(messages, tools: tools, toolChoice: "none"); var response = await OpenAIClient.ChatEndpoint.GetCompletionAsync(chatRequest); Assert.IsNotNull(response); @@ -211,7 +214,10 @@ public async Task Test_02_02_GetChatToolCompletion_Streaming() Console.WriteLine($"{message.Role}: {message.Content}"); } - var tools = Tool.GetAllAvailableTools(false); + var tools = new List + { + Tool.GetOrCreateTool(typeof(WeatherService), nameof(WeatherService.GetCurrentWeatherAsync)) + }; var chatRequest = new ChatRequest(messages, tools: tools, toolChoice: "none"); var response = await OpenAIClient.ChatEndpoint.StreamCompletionAsync(chatRequest, partialResponse => { diff --git a/OpenAI-DotNet-Tests/TestServices/WeatherService.cs b/OpenAI-DotNet-Tests/TestServices/WeatherService.cs index 6499c4f6..e74535d1 100644 --- a/OpenAI-DotNet-Tests/TestServices/WeatherService.cs +++ b/OpenAI-DotNet-Tests/TestServices/WeatherService.cs @@ -31,4 +31,11 @@ public static async Task GetCurrentWeatherAsync( public static int CelsiusToFahrenheit(int celsius) => (celsius * 9 / 5) + 32; } + + internal static class DateTimeUtility + { + [Function("Get the current date and time.")] + public static async Task GetDateTime() + => await Task.FromResult(DateTimeOffset.Now.ToString()); + } } diff --git a/OpenAI-DotNet/Assistants/AssistantExtensions.cs b/OpenAI-DotNet/Assistants/AssistantExtensions.cs index e5890018..f83dcb5e 100644 --- a/OpenAI-DotNet/Assistants/AssistantExtensions.cs +++ b/OpenAI-DotNet/Assistants/AssistantExtensions.cs @@ -63,15 +63,23 @@ from vectorStoreId in assistant.ToolResources?.FileSearch?.VectorStoreIds return deleteTasks.TrueForAll(task => task.Result); } + [Obsolete("use new overload with Func instead.")] + public static async Task CreateThreadAndRunAsync(this AssistantResponse assistant, CreateThreadRequest request, Action streamEventHandler, CancellationToken cancellationToken = default) + => await CreateThreadAndRunAsync(assistant, request, streamEventHandler == null ? null : async serverSentEvent => + { + streamEventHandler.Invoke(serverSentEvent); + await Task.CompletedTask; + }, cancellationToken); + /// /// Create a thread and run it. /// /// . /// Optional, . - /// Optional, stream callback handler. + /// Optional, stream callback handler. /// Optional, . /// . - public static async Task CreateThreadAndRunAsync(this AssistantResponse assistant, CreateThreadRequest request = null, Action streamEventHandler = null, CancellationToken cancellationToken = default) + public static async Task CreateThreadAndRunAsync(this AssistantResponse assistant, CreateThreadRequest request = null, Func streamEventHandler = null, CancellationToken cancellationToken = default) => await assistant.Client.ThreadsEndpoint.CreateThreadAndRunAsync(new CreateThreadAndRunRequest(assistant.Id, createThreadRequest: request), streamEventHandler, cancellationToken).ConfigureAwait(false); #region Tools diff --git a/OpenAI-DotNet/Chat/ChatEndpoint.cs b/OpenAI-DotNet/Chat/ChatEndpoint.cs index d8937b1c..85a47d2a 100644 --- a/OpenAI-DotNet/Chat/ChatEndpoint.cs +++ b/OpenAI-DotNet/Chat/ChatEndpoint.cs @@ -44,7 +44,7 @@ public async Task GetCompletionAsync(ChatRequest chatRequest, Canc /// Created a completion for the chat message and stream the results to the as they come in. /// /// The chat request which contains the message content. - /// An action to be called as each new result arrives. + /// An to be invoked as each new result arrives. /// /// Optional, If set, an additional chunk will be streamed before the 'data: [DONE]' message. /// The 'usage' field on this chunk shows the token usage statistics for the entire request, @@ -54,12 +54,34 @@ public async Task GetCompletionAsync(ChatRequest chatRequest, Canc /// Optional, . /// . public async Task StreamCompletionAsync(ChatRequest chatRequest, Action resultHandler, bool streamUsage = false, CancellationToken cancellationToken = default) + => await StreamCompletionAsync(chatRequest, async response => + { + resultHandler.Invoke(response); + await Task.CompletedTask; + }, streamUsage, cancellationToken); + + /// + /// Created a completion for the chat message and stream the results to the as they come in. + /// + /// The chat request which contains the message content. + /// A to to be invoked as each new result arrives. + /// + /// Optional, If set, an additional chunk will be streamed before the 'data: [DONE]' message. + /// The 'usage' field on this chunk shows the token usage statistics for the entire request, + /// and the 'choices' field will always be an empty array. All other chunks will also include a 'usage' field, + /// but with a null value. + /// + /// Optional, . + /// . + public async Task StreamCompletionAsync(ChatRequest chatRequest, Func resultHandler, bool streamUsage = false, CancellationToken cancellationToken = default) { + if (chatRequest == null) { throw new ArgumentNullException(nameof(chatRequest)); } + if (resultHandler == null) { throw new ArgumentNullException(nameof(resultHandler)); } chatRequest.Stream = true; chatRequest.StreamOptions = streamUsage ? new StreamOptions() : null; ChatResponse chatResponse = null; using var payload = JsonSerializer.Serialize(chatRequest, OpenAIClient.JsonSerializationOptions).ToJsonStringContent(); - using var response = await this.StreamEventsAsync(GetUrl("/completions"), payload, (sseResponse, ssEvent) => + using var response = await this.StreamEventsAsync(GetUrl("/completions"), payload, async (sseResponse, ssEvent) => { var partialResponse = sseResponse.Deserialize(ssEvent, client); @@ -72,13 +94,12 @@ public async Task StreamCompletionAsync(ChatRequest chatRequest, A chatResponse.AppendFrom(partialResponse); } - resultHandler?.Invoke(partialResponse); - + await resultHandler.Invoke(partialResponse); }, cancellationToken); if (chatResponse == null) { return null; } chatResponse.SetResponseData(response.Headers, client); - resultHandler?.Invoke(chatResponse); + await resultHandler.Invoke(chatResponse); return chatResponse; } diff --git a/OpenAI-DotNet/Chat/ChatResponse.cs b/OpenAI-DotNet/Chat/ChatResponse.cs index f26a71da..bdd76176 100644 --- a/OpenAI-DotNet/Chat/ChatResponse.cs +++ b/OpenAI-DotNet/Chat/ChatResponse.cs @@ -77,7 +77,14 @@ internal void AppendFrom(ChatResponse other) { if (other is null) { return; } - if (!string.IsNullOrWhiteSpace(other.Id)) + if (!string.IsNullOrWhiteSpace(Id)) + { + if (Id != other.Id) + { + throw new InvalidOperationException($"Attempting to append a different object than the original! {Id} != {other.Id}"); + } + } + else { Id = other.Id; } diff --git a/OpenAI-DotNet/Common/Error.cs b/OpenAI-DotNet/Common/Error.cs index b6eabec1..641b649a 100644 --- a/OpenAI-DotNet/Common/Error.cs +++ b/OpenAI-DotNet/Common/Error.cs @@ -1,5 +1,6 @@ // Licensed under the MIT License. See LICENSE in the project root for license information. +using System; using System.Text; using System.Text.Json.Serialization; @@ -7,6 +8,15 @@ namespace OpenAI { public sealed class Error : BaseResponse, IServerSentEvent { + public Error() { } + + internal Error(Exception e) + { + Type = e.GetType().Name; + Message = e.Message; + Exception = e; + } + /// /// An error code identifying the error type. /// @@ -50,6 +60,9 @@ public sealed class Error : BaseResponse, IServerSentEvent [JsonIgnore] public string Object => "error"; + [JsonIgnore] + public Exception Exception { get; } + public override string ToString() { var builder = new StringBuilder(); diff --git a/OpenAI-DotNet/Common/Function.cs b/OpenAI-DotNet/Common/Function.cs index 7ebd3a42..1cb0352f 100644 --- a/OpenAI-DotNet/Common/Function.cs +++ b/OpenAI-DotNet/Common/Function.cs @@ -154,6 +154,7 @@ public static Function FromFunc [JsonInclude] [JsonPropertyName("description")] + [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingDefault)] public string Description { get; private set; } private string parametersString; @@ -166,6 +167,7 @@ public static Function FromFunc [JsonInclude] [JsonPropertyName("parameters")] + [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingDefault)] public JsonNode Parameters { get @@ -190,6 +192,7 @@ public JsonNode Parameters /// [JsonInclude] [JsonPropertyName("arguments")] + [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingDefault)] public JsonNode Arguments { get diff --git a/OpenAI-DotNet/Extensions/BaseEndpointExtensions.cs b/OpenAI-DotNet/Extensions/BaseEndpointExtensions.cs index ad19f951..44570e0d 100644 --- a/OpenAI-DotNet/Extensions/BaseEndpointExtensions.cs +++ b/OpenAI-DotNet/Extensions/BaseEndpointExtensions.cs @@ -21,7 +21,7 @@ internal static class BaseEndpointExtensions /// /// https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events/Using_server-sent_events /// - public static async Task StreamEventsAsync(this OpenAIBaseEndpoint baseEndpoint, string endpoint, StringContent payload, Action eventCallback, CancellationToken cancellationToken) + public static async Task StreamEventsAsync(this OpenAIBaseEndpoint baseEndpoint, string endpoint, StringContent payload, Func eventCallback, CancellationToken cancellationToken) { using var request = new HttpRequestMessage(HttpMethod.Post, endpoint); request.Content = payload; @@ -34,7 +34,7 @@ public static async Task StreamEventsAsync(this OpenAIBaseE try { - while (await reader.ReadLineAsync() is { } streamData) + while (await reader.ReadLineAsync().ConfigureAwait(false) is { } streamData) { if (isEndOfStream) { @@ -56,11 +56,10 @@ public static async Task StreamEventsAsync(this OpenAIBaseE string value; string data; - Match match = matches[i]; + var match = matches[i]; // If the field type is not provided, treat it as a comment type = ServerSentEvent.EventMap.GetValueOrDefault(match.Groups[nameof(type)].Value.Trim(), ServerSentEventKind.Comment); - // The UTF-8 decode algorithm strips one leading UTF-8 Byte Order Mark (BOM), if any. value = match.Groups[nameof(value)].Value.TrimStart(' '); data = match.Groups[nameof(data)].Value; @@ -104,17 +103,17 @@ public static async Task StreamEventsAsync(this OpenAIBaseE { var previousEvent = events.Pop(); previousEvent.Data = @event.Value; - eventCallback?.Invoke(response, previousEvent); events.Push(previousEvent); + await eventCallback.Invoke(response, previousEvent).ConfigureAwait(false); } else { + events.Push(@event); + if (type != ServerSentEventKind.Event) { - eventCallback?.Invoke(response, @event); + await eventCallback.Invoke(response, @event).ConfigureAwait(false); } - - events.Push(@event); } } } diff --git a/OpenAI-DotNet/OpenAI-DotNet.csproj b/OpenAI-DotNet/OpenAI-DotNet.csproj index 6653566e..3d43b967 100644 --- a/OpenAI-DotNet/OpenAI-DotNet.csproj +++ b/OpenAI-DotNet/OpenAI-DotNet.csproj @@ -28,8 +28,13 @@ More context [on Roger Pincombe's blog](https://rogerpincombe.com/openai-dotnet- OpenAI-DotNet.pfx True True - 8.0.3 + 8.1.0 +Version 8.1.0 +- Fixed streaming event race conditions where the subscriber to the stream would finish before steam events were executed +- Refactored streaming events callbacks from Action<IServerSentEvent> to Func<IServerSentEvent, Task> +- Added Exception data to OpenAI.Error response +- Added ChatEndpoint.StreamCompletionAsync with Func<ChatResponse, Task> overload Version 8.0.3 - Fixed Thread.MessageResponse and Thread.RunStepResponse Delta stream event objects not being properly populated - Added Thread.MessageDelta.PrintContent() diff --git a/OpenAI-DotNet/Threads/CodeInterpreter.cs b/OpenAI-DotNet/Threads/CodeInterpreter.cs index e1713536..901ab72b 100644 --- a/OpenAI-DotNet/Threads/CodeInterpreter.cs +++ b/OpenAI-DotNet/Threads/CodeInterpreter.cs @@ -13,6 +13,7 @@ public sealed class CodeInterpreter /// [JsonInclude] [JsonPropertyName("input")] + [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingDefault)] public string Input { get; private set; } private List outputs; @@ -24,6 +25,7 @@ public sealed class CodeInterpreter /// [JsonInclude] [JsonPropertyName("outputs")] + [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingDefault)] public IReadOnlyList Outputs { get => outputs; diff --git a/OpenAI-DotNet/Threads/CodeInterpreterOutputs.cs b/OpenAI-DotNet/Threads/CodeInterpreterOutputs.cs index a2d73c82..575bcfa1 100644 --- a/OpenAI-DotNet/Threads/CodeInterpreterOutputs.cs +++ b/OpenAI-DotNet/Threads/CodeInterpreterOutputs.cs @@ -24,6 +24,7 @@ public sealed class CodeInterpreterOutputs : IAppendable /// [JsonInclude] [JsonPropertyName("logs")] + [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingDefault)] public string Logs { get; private set; } /// @@ -31,6 +32,7 @@ public sealed class CodeInterpreterOutputs : IAppendable /// [JsonInclude] [JsonPropertyName("image")] + [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingDefault)] public ImageFile Image { get; private set; } public void AppendFrom(CodeInterpreterOutputs other) diff --git a/OpenAI-DotNet/Threads/FunctionCall.cs b/OpenAI-DotNet/Threads/FunctionCall.cs index 1c2029ef..db3ab538 100644 --- a/OpenAI-DotNet/Threads/FunctionCall.cs +++ b/OpenAI-DotNet/Threads/FunctionCall.cs @@ -18,6 +18,7 @@ public sealed class FunctionCall /// [JsonInclude] [JsonPropertyName("arguments")] + [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingDefault)] public string Arguments { get; private set; } /// @@ -25,6 +26,7 @@ public sealed class FunctionCall /// [JsonInclude] [JsonPropertyName("output")] + [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingDefault)] public string Output { get; private set; } internal void AppendFrom(FunctionCall other) diff --git a/OpenAI-DotNet/Threads/MessageResponse.cs b/OpenAI-DotNet/Threads/MessageResponse.cs index a24e55b7..0c0600ed 100644 --- a/OpenAI-DotNet/Threads/MessageResponse.cs +++ b/OpenAI-DotNet/Threads/MessageResponse.cs @@ -185,7 +185,7 @@ internal void AppendFrom(MessageResponse other) { if (Id != other.Id) { - throw new InvalidOperationException("Attempting to append a different object than the original!"); + throw new InvalidOperationException($"Attempting to append a different object than the original! {Id} != {other.Id}"); } } else diff --git a/OpenAI-DotNet/Threads/RunResponse.cs b/OpenAI-DotNet/Threads/RunResponse.cs index 7027c254..bffb3b93 100644 --- a/OpenAI-DotNet/Threads/RunResponse.cs +++ b/OpenAI-DotNet/Threads/RunResponse.cs @@ -296,7 +296,7 @@ internal void AppendFrom(RunResponse other) { if (Id != other.Id) { - throw new InvalidOperationException("Attempting to append a different object than the original!"); + throw new InvalidOperationException($"Attempting to append a different object than the original! {Id} != {other.Id}"); } } else diff --git a/OpenAI-DotNet/Threads/RunStepResponse.cs b/OpenAI-DotNet/Threads/RunStepResponse.cs index de19c4c1..c71c7574 100644 --- a/OpenAI-DotNet/Threads/RunStepResponse.cs +++ b/OpenAI-DotNet/Threads/RunStepResponse.cs @@ -195,7 +195,7 @@ internal void AppendFrom(RunStepResponse other) { if (Id != other.Id) { - throw new InvalidOperationException("Attempting to append a different object than the original!"); + throw new InvalidOperationException($"Attempting to append a different object than the original! {Id} != {other.Id}"); } } else diff --git a/OpenAI-DotNet/Threads/ThreadExtensions.cs b/OpenAI-DotNet/Threads/ThreadExtensions.cs index 28c7ed14..375abce2 100644 --- a/OpenAI-DotNet/Threads/ThreadExtensions.cs +++ b/OpenAI-DotNet/Threads/ThreadExtensions.cs @@ -177,26 +177,42 @@ public static async Task RetrieveFileAsync(this MessageResp #region Runs + [Obsolete("use new overload with Func instead.")] + public static async Task CreateRunAsync(this ThreadResponse thread, CreateRunRequest request, Action streamEventHandler, CancellationToken cancellationToken = default) + => await thread.CreateRunAsync(request, async streamEvent => + { + streamEventHandler?.Invoke(streamEvent); + await Task.CompletedTask; + }, cancellationToken); + /// /// Create a run. /// /// . /// . - /// Optional, stream callback handler. + /// Optional, stream callback handler. /// Optional, . /// . - public static async Task CreateRunAsync(this ThreadResponse thread, CreateRunRequest request = null, Action streamEventHandler = null, CancellationToken cancellationToken = default) + public static async Task CreateRunAsync(this ThreadResponse thread, CreateRunRequest request = null, Func streamEventHandler = null, CancellationToken cancellationToken = default) => await thread.Client.ThreadsEndpoint.CreateRunAsync(thread, request, streamEventHandler, cancellationToken).ConfigureAwait(false); + [Obsolete("use new overload with Func instead.")] + public static async Task CreateRunAsync(this ThreadResponse thread, AssistantResponse assistant, Action streamEventHandler, CancellationToken cancellationToken = default) + => await thread.CreateRunAsync(assistant, async streamEvent => + { + streamEventHandler?.Invoke(streamEvent); + await Task.CompletedTask; + }, cancellationToken); + /// /// Create a run. /// /// . /// The to use for the run. - /// Optional, stream callback handler. + /// Optional, stream callback handler. /// Optional, . /// . - public static async Task CreateRunAsync(this ThreadResponse thread, AssistantResponse assistant, Action streamEventHandler = null, CancellationToken cancellationToken = default) + public static async Task CreateRunAsync(this ThreadResponse thread, AssistantResponse assistant, Func streamEventHandler = null, CancellationToken cancellationToken = default) { var request = new CreateRunRequest( assistant, @@ -286,6 +302,14 @@ public static async Task WaitForStatusChangeAsync(this RunResponse return result; } + [Obsolete("use new overload with Func instead.")] + public static async Task SubmitToolOutputsAsync(this RunResponse run, SubmitToolOutputsRequest request, Action streamEventHandler, CancellationToken cancellationToken = default) + => await run.SubmitToolOutputsAsync(request, async streamEvent => + { + streamEventHandler?.Invoke(streamEvent); + await Task.CompletedTask; + }, cancellationToken); + /// /// When a run has the status: "requires_action" and required_action.type is submit_tool_outputs, /// this endpoint can be used to submit the outputs from the tool calls once they're all completed. @@ -293,12 +317,20 @@ public static async Task WaitForStatusChangeAsync(this RunResponse /// /// to submit outputs for. /// . - /// Optional, stream callback handler. + /// Optional, stream callback handler. /// Optional, . /// . - public static async Task SubmitToolOutputsAsync(this RunResponse run, SubmitToolOutputsRequest request, Action streamEventHandler = null, CancellationToken cancellationToken = default) + public static async Task SubmitToolOutputsAsync(this RunResponse run, SubmitToolOutputsRequest request, Func streamEventHandler = null, CancellationToken cancellationToken = default) => await run.Client.ThreadsEndpoint.SubmitToolOutputsAsync(run.ThreadId, run.Id, request, streamEventHandler, cancellationToken).ConfigureAwait(false); + [Obsolete("use new overload with Func instead.")] + public static async Task SubmitToolOutputsAsync(this RunResponse run, IEnumerable outputs, Action streamEventHandler, CancellationToken cancellationToken = default) + => await run.SubmitToolOutputsAsync(new SubmitToolOutputsRequest(outputs), async streamEvent => + { + streamEventHandler?.Invoke(streamEvent); + await Task.CompletedTask; + }, cancellationToken); + /// /// When a run has the status: "requires_action" and required_action.type is submit_tool_outputs, /// this endpoint can be used to submit the outputs from the tool calls once they're all completed. @@ -306,10 +338,10 @@ public static async Task SubmitToolOutputsAsync(this RunResponse ru /// /// to submit outputs for. /// s - /// Optional, stream callback handler. + /// Optional, stream callback handler. /// Optional, . /// . - public static async Task SubmitToolOutputsAsync(this RunResponse run, IEnumerable outputs, Action streamEventHandler = null, CancellationToken cancellationToken = default) + public static async Task SubmitToolOutputsAsync(this RunResponse run, IEnumerable outputs, Func streamEventHandler = null, CancellationToken cancellationToken = default) => await run.SubmitToolOutputsAsync(new SubmitToolOutputsRequest(outputs), streamEventHandler, cancellationToken).ConfigureAwait(false); /// diff --git a/OpenAI-DotNet/Threads/ThreadsEndpoint.cs b/OpenAI-DotNet/Threads/ThreadsEndpoint.cs index 08043854..c6223fc9 100644 --- a/OpenAI-DotNet/Threads/ThreadsEndpoint.cs +++ b/OpenAI-DotNet/Threads/ThreadsEndpoint.cs @@ -191,15 +191,23 @@ public async Task> ListRunsAsync(string threadId, List return response.Deserialize>(responseAsString, client); } + [Obsolete("use new overload with Func instead.")] + public async Task CreateRunAsync(string threadId, CreateRunRequest request, Action streamEventHandler, CancellationToken cancellationToken = default) + => await CreateRunAsync(threadId, request, streamEventHandler == null ? null : serverSentEvent => + { + streamEventHandler.Invoke(serverSentEvent); + return Task.CompletedTask; + }, cancellationToken); + /// /// Create a run. /// /// The id of the thread to run. /// . - /// Optional, stream callback handler. + /// Optional, stream callback handler. /// Optional, . /// . - public async Task CreateRunAsync(string threadId, CreateRunRequest request = null, Action streamEventHandler = null, CancellationToken cancellationToken = default) + public async Task CreateRunAsync(string threadId, CreateRunRequest request = null, Func streamEventHandler = null, CancellationToken cancellationToken = default) { if (request == null || string.IsNullOrWhiteSpace(request.AssistantId)) { @@ -221,14 +229,22 @@ public async Task CreateRunAsync(string threadId, CreateRunRequest return response.Deserialize(responseAsString, client); } + [Obsolete("use new overload with Func instead.")] + public async Task CreateThreadAndRunAsync(CreateThreadAndRunRequest request, Action streamEventHandler, CancellationToken cancellationToken = default) + => await CreateThreadAndRunAsync(request, streamEventHandler == null ? null : serverSentEvent => + { + streamEventHandler.Invoke(serverSentEvent); + return Task.CompletedTask; + }, cancellationToken); + /// /// Create a thread and run it in one request. /// /// . - /// Optional, stream callback handler. + /// Optional, stream callback handler. /// Optional, . /// . - public async Task CreateThreadAndRunAsync(CreateThreadAndRunRequest request = null, Action streamEventHandler = null, CancellationToken cancellationToken = default) + public async Task CreateThreadAndRunAsync(CreateThreadAndRunRequest request = null, Func streamEventHandler = null, CancellationToken cancellationToken = default) { if (request == null || string.IsNullOrWhiteSpace(request.AssistantId)) { @@ -285,6 +301,14 @@ public async Task ModifyRunAsync(string threadId, string runId, IRe return response.Deserialize(responseAsString, client); } + [Obsolete("use new overload with Func instead.")] + public async Task SubmitToolOutputsAsync(string threadId, string runId, SubmitToolOutputsRequest request, Action streamEventHandler, CancellationToken cancellationToken = default) + => await SubmitToolOutputsAsync(threadId, runId, request, streamEventHandler == null ? null : serverSentEvent => + { + streamEventHandler.Invoke(serverSentEvent); + return Task.CompletedTask; + }, cancellationToken); + /// /// When a run has the status: "requires_action" and required_action.type is submit_tool_outputs, /// this endpoint can be used to submit the outputs from the tool calls once they're all completed. @@ -293,10 +317,10 @@ public async Task ModifyRunAsync(string threadId, string runId, IRe /// The id of the thread to which this run belongs. /// The id of the run that requires the tool output submission. /// . - /// Optional, stream callback handler. + /// Optional, stream callback handler. /// Optional, . /// . - public async Task SubmitToolOutputsAsync(string threadId, string runId, SubmitToolOutputsRequest request, Action streamEventHandler = null, CancellationToken cancellationToken = default) + public async Task SubmitToolOutputsAsync(string threadId, string runId, SubmitToolOutputsRequest request, Func streamEventHandler = null, CancellationToken cancellationToken = default) { request.Stream = streamEventHandler != null; using var payload = JsonSerializer.Serialize(request, OpenAIClient.JsonSerializationOptions).ToJsonStringContent(); @@ -408,19 +432,20 @@ public async Task RetrieveFileAsync(string threadId, string #endregion Files (Obsolete) - private async Task StreamRunAsync(string endpoint, StringContent payload, Action streamEventHandler, CancellationToken cancellationToken = default) + private async Task StreamRunAsync(string endpoint, StringContent payload, Func streamEventHandler, CancellationToken cancellationToken = default) { RunResponse run = null; RunStepResponse runStep = null; MessageResponse message = null; - using var response = await this.StreamEventsAsync(endpoint, payload, (sseResponse, ssEvent) => + + using var response = await this.StreamEventsAsync(endpoint, payload, async (sseResponse, ssEvent) => { try { switch (ssEvent.Value.GetValue()) { case "thread.created": - streamEventHandler?.Invoke(sseResponse.Deserialize(ssEvent, client)); + await streamEventHandler.Invoke(sseResponse.Deserialize(ssEvent, client)); return; case "thread.run.created": case "thread.run.queued": @@ -443,7 +468,7 @@ private async Task StreamRunAsync(string endpoint, StringContent pa run.AppendFrom(partialRun); } - streamEventHandler?.Invoke(run); + await streamEventHandler.Invoke(run); return; case "thread.run.step.created": case "thread.run.step.in_progress": @@ -454,7 +479,8 @@ private async Task StreamRunAsync(string endpoint, StringContent pa case "thread.run.step.expired": var partialRunStep = sseResponse.Deserialize(ssEvent, client); - if (runStep == null) + if (runStep == null || + runStep.Id != partialRunStep.Id) { runStep = partialRunStep; } @@ -463,7 +489,7 @@ private async Task StreamRunAsync(string endpoint, StringContent pa runStep.AppendFrom(partialRunStep); } - streamEventHandler?.Invoke(runStep); + await streamEventHandler.Invoke(runStep); return; case "thread.message.created": case "thread.message.in_progress": @@ -472,7 +498,8 @@ private async Task StreamRunAsync(string endpoint, StringContent pa case "thread.message.incomplete": var partialMessage = sseResponse.Deserialize(ssEvent, client); - if (message == null) + if (message == null || + message.Id != partialMessage.Id) { message = partialMessage; } @@ -481,24 +508,25 @@ private async Task StreamRunAsync(string endpoint, StringContent pa message.AppendFrom(partialMessage); } - streamEventHandler?.Invoke(message); + await streamEventHandler.Invoke(message); return; case "error": - streamEventHandler?.Invoke(sseResponse.Deserialize(ssEvent, client)); + await streamEventHandler.Invoke(sseResponse.Deserialize(ssEvent, client)); return; default: // if not properly handled raise it up to caller to deal with it themselves. - streamEventHandler.Invoke(ssEvent); + await streamEventHandler.Invoke(ssEvent); return; } } catch (Exception e) { - Console.WriteLine(e); + await streamEventHandler.Invoke(new Error(e)); } }, cancellationToken); if (run == null) { return null; } + run = await run.WaitForStatusChangeAsync(timeout: -1, cancellationToken: cancellationToken); run.SetResponseData(response.Headers, client); return run; } diff --git a/OpenAI-DotNet/Threads/ToolCall.cs b/OpenAI-DotNet/Threads/ToolCall.cs index 34425d7d..03452d28 100644 --- a/OpenAI-DotNet/Threads/ToolCall.cs +++ b/OpenAI-DotNet/Threads/ToolCall.cs @@ -34,6 +34,7 @@ public sealed class ToolCall : IAppendable /// [JsonInclude] [JsonPropertyName("function")] + [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingDefault)] public FunctionCall FunctionCall { get; private set; } /// @@ -41,6 +42,7 @@ public sealed class ToolCall : IAppendable /// [JsonInclude] [JsonPropertyName("code_interpreter")] + [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingDefault)] public CodeInterpreter CodeInterpreter { get; private set; } /// @@ -51,6 +53,7 @@ public sealed class ToolCall : IAppendable /// [JsonInclude] [JsonPropertyName("file_search")] + [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingDefault)] public IReadOnlyDictionary FileSearch { get; private set; } /// diff --git a/README.md b/README.md index e01e94c1..afcff31a 100644 --- a/README.md +++ b/README.md @@ -42,8 +42,7 @@ Install-Package OpenAI-DotNet ### Table of Contents -- [Authentication](#authentication) :new: :warning: :construction: -- [OpenAIClient](#handling-openaiclient-and-httpclient-lifecycle) +- [Authentication](#authentication) - [Azure OpenAI](#azure-openai) - [Azure Active Directory Authentication](#azure-active-directory-authentication) - [OpenAI API Proxy](#openai-api-proxy) @@ -51,17 +50,17 @@ Install-Package OpenAI-DotNet - [List Models](#list-models) - [Retrieve Models](#retrieve-model) - [Delete Fine Tuned Model](#delete-fine-tuned-model) -- [Assistants](#assistants) :new: :warning: :construction: +- [Assistants](#assistants) :warning: :construction: - [List Assistants](#list-assistants) - [Create Assistant](#create-assistant) - [Retrieve Assistant](#retrieve-assistant) - [Modify Assistant](#modify-assistant) - [Delete Assistant](#delete-assistant) - - [Assistant Streaming](#assistant-streaming) :new: - - [Threads](#threads) :new: :warning: :construction: + - [Assistant Streaming](#assistant-streaming) :warning: :construction: + - [Threads](#threads) :warning: :construction: - [Create Thread](#create-thread) - [Create Thread and Run](#create-thread-and-run) - - [Streaming](#create-thread-and-run-streaming) :new: + - [Streaming](#create-thread-and-run-streaming) :warning: :construction: - [Retrieve Thread](#retrieve-thread) - [Modify Thread](#modify-thread) - [Delete Thread](#delete-thread) @@ -73,29 +72,29 @@ Install-Package OpenAI-DotNet - [Thread Runs](#thread-runs) - [List Runs](#list-thread-runs) - [Create Run](#create-thread-run) - - [Streaming](#create-thread-run-streaming) :new: + - [Streaming](#create-thread-run-streaming) :warning: :construction: - [Retrieve Run](#retrieve-thread-run) - [Modify Run](#modify-thread-run) - [Submit Tool Outputs to Run](#thread-submit-tool-outputs-to-run) - [List Run Steps](#list-thread-run-steps) - [Retrieve Run Step](#retrieve-thread-run-step) - [Cancel Run](#cancel-thread-run) - - [Vector Stores](#vector-stores) :new: - - [List Vector Stores](#list-vector-stores) :new: - - [Create Vector Store](#create-vector-store) :new: - - [Retrieve Vector Store](#retrieve-vector-store) :new: - - [Modify Vector Store](#modify-vector-store) :new: - - [Delete Vector Store](#delete-vector-store) :new: - - [Vector Store Files](#vector-store-files) :new: - - [List Vector Store Files](#list-vector-store-files) :new: - - [Create Vector Store File](#create-vector-store-file) :new: - - [Retrieve Vector Store File](#retrieve-vector-store-file) :new: - - [Delete Vector Store File](#delete-vector-store-file) :new: - - [Vector Store File Batches](#vector-store-file-batches) :new: - - [Create Vector Store File Batch](#create-vector-store-file-batch) :new: - - [Retrieve Vector Store File Batch](#retrieve-vector-store-file-batch) :new: - - [List Files In Vector Store Batch](#list-files-in-vector-store-batch) :new: - - [Cancel Vector Store File Batch](#cancel-vector-store-file-batch) :new: + - [Vector Stores](#vector-stores) + - [List Vector Stores](#list-vector-stores) + - [Create Vector Store](#create-vector-store) + - [Retrieve Vector Store](#retrieve-vector-store) + - [Modify Vector Store](#modify-vector-store) + - [Delete Vector Store](#delete-vector-store) + - [Vector Store Files](#vector-store-files) + - [List Vector Store Files](#list-vector-store-files) + - [Create Vector Store File](#create-vector-store-file) + - [Retrieve Vector Store File](#retrieve-vector-store-file) + - [Delete Vector Store File](#delete-vector-store-file) + - [Vector Store File Batches](#vector-store-file-batches) + - [Create Vector Store File Batch](#create-vector-store-file-batch) + - [Retrieve Vector Store File Batch](#retrieve-vector-store-file-batch) + - [List Files In Vector Store Batch](#list-files-in-vector-store-batch) + - [Cancel Vector Store File Batch](#cancel-vector-store-file-batch) - [Chat](#chat) - [Chat Completions](#chat-completions) - [Streaming](#chat-streaming) @@ -104,6 +103,7 @@ Install-Package OpenAI-DotNet - [Json Mode](#chat-json-mode) - [Audio](#audio) - [Create Speech](#create-speech) + - [Stream Speech](#stream-speech) - [Create Transcription](#create-transcription) - [Create Translation](#create-translation) - [Images](#images) :warning: :construction: @@ -122,11 +122,11 @@ Install-Package OpenAI-DotNet - [Retrieve Fine Tune Job Info](#retrieve-fine-tune-job-info) - [Cancel Fine Tune Job](#cancel-fine-tune-job) - [List Fine Tune Job Events](#list-fine-tune-job-events) -- [Batches](#batches) :new: - - [List Batches](#list-batches) :new: - - [Create Batch](#create-batch) :new: - - [Retrieve Batch](#retrieve-batch) :new: - - [Cancel Batch](#cancel-batch) :new: +- [Batches](#batches) + - [List Batches](#list-batches) + - [Create Batch](#create-batch) + - [Retrieve Batch](#retrieve-batch) + - [Cancel Batch](#cancel-batch) - [Embeddings](#embeddings) - [Create Embedding](#create-embeddings) - [Moderations](#moderations) @@ -476,7 +476,7 @@ Assert.IsTrue(isDeleted); #### [Assistant Streaming](https://platform.openai.com/docs/api-reference/assistants-streaming) > [!NOTE] -> Assistant stream events can be easily added to existing thread calls by passing `Action streamEventHandler` callback to any existing method that supports streaming. +> Assistant stream events can be easily added to existing thread calls by passing `Func streamEventHandler` callback to any existing method that supports streaming. #### [Threads](https://platform.openai.com/docs/api-reference/threads) @@ -526,7 +526,7 @@ var tools = new List var assistantRequest = new CreateAssistantRequest(tools: tools, instructions: "You are a helpful weather assistant. Use the appropriate unit based on geographical location."); var assistant = await api.AssistantsEndpoint.CreateAssistantAsync(assistantRequest); ThreadResponse thread = null; -async void StreamEventHandler(IServerSentEvent streamEvent) +async Task StreamEventHandler(IServerSentEvent streamEvent) { switch (streamEvent) { @@ -720,9 +720,10 @@ var assistant = await api.AssistantsEndpoint.CreateAssistantAsync( responseFormat: ChatResponseFormat.Json)); var thread = await api.ThreadsEndpoint.CreateThreadAsync(); var message = await thread.CreateMessageAsync("I need to solve the equation `3x + 11 = 14`. Can you help me?"); -var run = await thread.CreateRunAsync(assistant, streamEvent => +var run = await thread.CreateRunAsync(assistant, async streamEvent => { Console.WriteLine(streamEvent.ToJsonString()); + await Task.CompletedTask; }); var messages = await thread.ListMessagesAsync(); @@ -1054,9 +1055,10 @@ var messages = new List new Message(Role.User, "Where was it played?"), }; var chatRequest = new ChatRequest(messages); -var response = await api.ChatEndpoint.StreamCompletionAsync(chatRequest, partialResponse => +var response = await api.ChatEndpoint.StreamCompletionAsync(chatRequest, async partialResponse => { Console.Write(partialResponse.FirstChoice.Delta.ToString()); + await Task.CompletedTask; }); var choice = response.FirstChoice; Console.WriteLine($"[{choice.Index}] {choice.Message.Role}: {choice.Message} | Finish Reason: {choice.FinishReason}");