diff --git a/JsonRpc.Streams/ByLineTextMessageReader.cs b/JsonRpc.Streams/ByLineTextMessageReader.cs index aa104f2..90d5135 100644 --- a/JsonRpc.Streams/ByLineTextMessageReader.cs +++ b/JsonRpc.Streams/ByLineTextMessageReader.cs @@ -144,17 +144,22 @@ protected override void Dispose(bool disposing) { base.Dispose(disposing); if (Reader == null) return; - readerSemaphore.Dispose(); - if (LeaveReaderOpen) + if (underlyingStream != null) { - if (underlyingStream != null) - Reader.Dispose(); + Utility.TryDispose(Reader, readerSemaphore, this); } - else + if (!LeaveReaderOpen) { - underlyingStream?.Dispose(); - Reader.Dispose(); + if (underlyingStream != null) + { + Utility.TryDispose(underlyingStream, readerSemaphore, this); + } + else + { + Utility.TryDispose(Reader, readerSemaphore, this); + } } + readerSemaphore.Dispose(); underlyingStream = null; Reader = null; } diff --git a/JsonRpc.Streams/ByLineTextMessageWriter.cs b/JsonRpc.Streams/ByLineTextMessageWriter.cs index 1a62bea..ad1e621 100644 --- a/JsonRpc.Streams/ByLineTextMessageWriter.cs +++ b/JsonRpc.Streams/ByLineTextMessageWriter.cs @@ -1,5 +1,6 @@ using System; using System.Collections.Generic; +using System.Diagnostics; using System.IO; using System.Text; using System.Threading; @@ -101,18 +102,25 @@ public override async Task WriteAsync(Message message, CancellationToken cancell using (var linkedTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, DisposalToken)) { + var linkedToken = linkedTokenSource.Token; var content = message.ToString(); - await writerSemaphore.WaitAsync(linkedTokenSource.Token); + await writerSemaphore.WaitAsync(linkedToken); try { await Writer.WriteLineAsync(content); - if (Delimiter != null) await Writer.WriteLineAsync(Delimiter); + linkedToken.ThrowIfCancellationRequested(); + if (Delimiter != null) + { + await Writer.WriteLineAsync(Delimiter); + linkedToken.ThrowIfCancellationRequested(); + } + await Writer.FlushAsync(); } catch (ObjectDisposedException) { // Throws OperationCanceledException if the cancellation has already been requested. - linkedTokenSource.Token.ThrowIfCancellationRequested(); + linkedToken.ThrowIfCancellationRequested(); throw; } finally @@ -127,16 +135,22 @@ protected override void Dispose(bool disposing) { base.Dispose(disposing); if (Writer == null) return; - if (LeaveWriterOpen) + if (underlyingStream != null) { - if (underlyingStream != null) - Writer.Dispose(); + Utility.TryDispose(Writer, writerSemaphore, this); } - else + if (!LeaveWriterOpen) { - underlyingStream?.Dispose(); - Writer.Dispose(); + if (underlyingStream != null) + { + Utility.TryDispose(underlyingStream, writerSemaphore, this); + } + else + { + Utility.TryDispose(Writer, writerSemaphore, this); + } } + writerSemaphore.Dispose(); underlyingStream = null; Writer = null; } diff --git a/JsonRpc.Streams/JsonRpc.Streams.csproj b/JsonRpc.Streams/JsonRpc.Streams.csproj index 5c87a59..13e03f2 100644 --- a/JsonRpc.Streams/JsonRpc.Streams.csproj +++ b/JsonRpc.Streams/JsonRpc.Streams.csproj @@ -6,7 +6,7 @@ true True - 0.4.0 + 0.4.1 CXuesong CXuesong.JsonRpc.Streams diff --git a/JsonRpc.Streams/MessageWriter.cs b/JsonRpc.Streams/MessageWriter.cs index b0ba5b2..8acb37f 100644 --- a/JsonRpc.Streams/MessageWriter.cs +++ b/JsonRpc.Streams/MessageWriter.cs @@ -16,7 +16,7 @@ public abstract class MessageWriter : IDisposable /// /// Asynchronously writes a message. /// - /// The meesage to write. + /// The message to write. /// A token that cancels the operation. /// This method should be thread-safe. /// is null. diff --git a/JsonRpc.Streams/Utility.cs b/JsonRpc.Streams/Utility.cs index f3c6225..95f3a49 100644 --- a/JsonRpc.Streams/Utility.cs +++ b/JsonRpc.Streams/Utility.cs @@ -1,5 +1,6 @@ using System; using System.Collections.Generic; +using System.Diagnostics; using System.Text; using System.Threading; using System.Threading.Tasks; @@ -34,5 +35,31 @@ public static int IndexOf(this IList source, IList match, IEqualityComp } return -1; } + + // IDisposable: TextReader, TextWriter, or Stream + public static bool TryDispose(IDisposable disposable, SemaphoreSlim waitFor, object source) + { + // Try to wait until the operation indicated by waitFor to finish first, elegantly. + // However, if the operation is blocking for too long, we will just abandon it, + // which may cause disposable.Dispose to fail. + waitFor.Wait(1000); + try + { + disposable.Dispose(); + return true; + } + catch (InvalidOperationException ex) + { + // TextWriter can throw InvalidOperationException on disposal + // when an ongoing asynchronous operation + // has not been finished yet. + // Theoretically, we can do nothing about it, but to wait. + // Sadly, we can't. + Debug.WriteLine("{0}: Cannot dispose {1}. {2}", source, disposable, ex.Message); + } + + return false; + } + } }