Skip to content

Commit

Permalink
ByLineTextMessageReader/Writer: Try to wait for the ongoing async ope…
Browse files Browse the repository at this point in the history
…ration to finish before disposing StreamReader/Writer.
  • Loading branch information
CXuesong committed Mar 17, 2018
1 parent 3653482 commit 52e0f39
Show file tree
Hide file tree
Showing 5 changed files with 64 additions and 18 deletions.
19 changes: 12 additions & 7 deletions JsonRpc.Streams/ByLineTextMessageReader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -144,17 +144,22 @@ protected override void Dispose(bool disposing)
{
base.Dispose(disposing);
if (Reader == null) return;
readerSemaphore.Dispose();
if (LeaveReaderOpen)
if (underlyingStream != null)
{
if (underlyingStream != null)
Reader.Dispose();
Utility.TryDispose(Reader, readerSemaphore, this);
}
else
if (!LeaveReaderOpen)
{
underlyingStream?.Dispose();
Reader.Dispose();
if (underlyingStream != null)
{
Utility.TryDispose(underlyingStream, readerSemaphore, this);
}
else
{
Utility.TryDispose(Reader, readerSemaphore, this);
}
}
readerSemaphore.Dispose();
underlyingStream = null;
Reader = null;
}
Expand Down
32 changes: 23 additions & 9 deletions JsonRpc.Streams/ByLineTextMessageWriter.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Text;
using System.Threading;
Expand Down Expand Up @@ -101,18 +102,25 @@ public override async Task WriteAsync(Message message, CancellationToken cancell
using (var linkedTokenSource =
CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, DisposalToken))
{
var linkedToken = linkedTokenSource.Token;
var content = message.ToString();
await writerSemaphore.WaitAsync(linkedTokenSource.Token);
await writerSemaphore.WaitAsync(linkedToken);
try
{
await Writer.WriteLineAsync(content);
if (Delimiter != null) await Writer.WriteLineAsync(Delimiter);
linkedToken.ThrowIfCancellationRequested();
if (Delimiter != null)
{
await Writer.WriteLineAsync(Delimiter);
linkedToken.ThrowIfCancellationRequested();
}

await Writer.FlushAsync();
}
catch (ObjectDisposedException)
{
// Throws OperationCanceledException if the cancellation has already been requested.
linkedTokenSource.Token.ThrowIfCancellationRequested();
linkedToken.ThrowIfCancellationRequested();
throw;
}
finally
Expand All @@ -127,16 +135,22 @@ protected override void Dispose(bool disposing)
{
base.Dispose(disposing);
if (Writer == null) return;
if (LeaveWriterOpen)
if (underlyingStream != null)
{
if (underlyingStream != null)
Writer.Dispose();
Utility.TryDispose(Writer, writerSemaphore, this);
}
else
if (!LeaveWriterOpen)
{
underlyingStream?.Dispose();
Writer.Dispose();
if (underlyingStream != null)
{
Utility.TryDispose(underlyingStream, writerSemaphore, this);
}
else
{
Utility.TryDispose(Writer, writerSemaphore, this);
}
}
writerSemaphore.Dispose();
underlyingStream = null;
Writer = null;
}
Expand Down
2 changes: 1 addition & 1 deletion JsonRpc.Streams/JsonRpc.Streams.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
<GenerateDocumentationFile>true</GenerateDocumentationFile>

<GeneratePackageOnBuild>True</GeneratePackageOnBuild>
<Version>0.4.0</Version>
<Version>0.4.1</Version>
<Authors>CXuesong</Authors>
<Company />
<PackageId>CXuesong.JsonRpc.Streams</PackageId>
Expand Down
2 changes: 1 addition & 1 deletion JsonRpc.Streams/MessageWriter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ public abstract class MessageWriter : IDisposable
/// <summary>
/// Asynchronously writes a message.
/// </summary>
/// <param name="message">The meesage to write.</param>
/// <param name="message">The message to write.</param>
/// <param name="cancellationToken">A token that cancels the operation.</param>
/// <remarks>This method should be thread-safe.</remarks>
/// <exception cref="ArgumentNullException"><paramref name="message"/> is <c>null</c>.</exception>
Expand Down
27 changes: 27 additions & 0 deletions JsonRpc.Streams/Utility.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
Expand Down Expand Up @@ -34,5 +35,31 @@ public static int IndexOf<T>(this IList<T> source, IList<T> match, IEqualityComp
}
return -1;
}

// IDisposable: TextReader, TextWriter, or Stream
public static bool TryDispose(IDisposable disposable, SemaphoreSlim waitFor, object source)
{
// Try to wait until the operation indicated by waitFor to finish first, elegantly.
// However, if the operation is blocking for too long, we will just abandon it,
// which may cause disposable.Dispose to fail.
waitFor.Wait(1000);
try
{
disposable.Dispose();
return true;
}
catch (InvalidOperationException ex)
{
// TextWriter can throw InvalidOperationException on disposal
// when an ongoing asynchronous operation
// has not been finished yet.
// Theoretically, we can do nothing about it, but to wait.
// Sadly, we can't.
Debug.WriteLine("{0}: Cannot dispose {1}. {2}", source, disposable, ex.Message);
}

return false;
}

}
}

0 comments on commit 52e0f39

Please sign in to comment.