Skip to content

Commit

Permalink
SftpFile: disallow concurrent reads/writes. (#171)
Browse files Browse the repository at this point in the history
  • Loading branch information
tmds authored May 29, 2024
1 parent 371d600 commit ba41694
Showing 1 changed file with 58 additions and 21 deletions.
79 changes: 58 additions & 21 deletions src/Common/SftpFile.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
using System.Threading;
using System.Threading.Tasks;
using System.IO;
using System.Diagnostics;

namespace Tmds.Ssh;

Expand All @@ -20,6 +21,8 @@ public sealed class SftpFile : Stream
// The position is updated at the start of the operation to support concurrent requests.
private long _position;

private int _inProgress;

internal SftpFile(SftpClient client, byte[] handle)
{
_client = client;
Expand Down Expand Up @@ -68,17 +71,16 @@ public override async ValueTask<int> ReadAsync(Memory<byte> buffer, Cancellation
{
ThrowIfDisposed();

long readOffset = Interlocked.Add(ref _position, buffer.Length) - buffer.Length;
int bytesRead = 0;
SetInProgress(true);
try
{
bytesRead = await _client.ReadFileAsync(Handle, readOffset, buffer, cancellationToken).ConfigureAwait(false);

int bytesRead = await _client.ReadFileAsync(Handle, _position, buffer, cancellationToken).ConfigureAwait(false);
_position += bytesRead;
return bytesRead;
}
finally
{
Interlocked.Add(ref _position, bytesRead - buffer.Length);
SetInProgress(false);
}
}

Expand All @@ -99,15 +101,15 @@ public async override ValueTask WriteAsync(ReadOnlyMemory<byte> buffer, Cancella
{
ThrowIfDisposed();

long writeOffset = Interlocked.Add(ref _position, buffer.Length) - buffer.Length;
SetInProgress(true);
try
{
await _client.WriteFileAsync(Handle, writeOffset, buffer, cancellationToken).ConfigureAwait(false);
await _client.WriteFileAsync(Handle, _position, buffer, cancellationToken).ConfigureAwait(false);
_position += buffer.Length;
}
catch
finally
{
Interlocked.Add(ref _position, -buffer.Length);
throw;
SetInProgress(false);
}
}

Expand Down Expand Up @@ -147,18 +149,32 @@ public async ValueTask SetAttributesAsync(
{
ThrowIfDisposed();

await _client.SetAttributesForHandleAsync(
handle: Handle,
length: length,
ids: ids,
permissions: permissions,
times: times,
extendedAttributes: extendedAttributes,
cancellationToken).ConfigureAwait(false);

if (_position > length)
if (length.HasValue)
{
SetInProgress(true);
}
try
{
_position = length.Value;
await _client.SetAttributesForHandleAsync(
handle: Handle,
length: length,
ids: ids,
permissions: permissions,
times: times,
extendedAttributes: extendedAttributes,
cancellationToken).ConfigureAwait(false);

if (_position > length)
{
_position = length.Value;
}
}
finally
{
if (length.HasValue)
{
SetInProgress(false);
}
}
}

Expand Down Expand Up @@ -194,4 +210,25 @@ public override long Seek(long offset, SeekOrigin origin)

public override void SetLength(long value)
=> throw new NotSupportedException();

private void SetInProgress(bool value)
{
if (value)
{
if (Interlocked.CompareExchange(ref _inProgress, 1, 0) != 0)
{
ThrowConcurrentOperations();
}
}
else
{
Debug.Assert(_inProgress == 1);
Volatile.Write(ref _inProgress, 0);
}
}

private void ThrowConcurrentOperations()
{
throw new InvalidOperationException("Concurrent read/write operations are not allowed.");
}
}

0 comments on commit ba41694

Please sign in to comment.