Skip to content

Commit

Permalink
GH-524: Avoid unnecessary work in SFTP uploads
Browse files Browse the repository at this point in the history
Streamline ACK checking a bit. Do a quick check for the OK case first
and do the full and proper parsing only if the status is not OK. Also
avoid getting the IDLE_TIMEOUT repeatedly when draining remaining ACKs
when SftpOutputStreamAsync is closed.
  • Loading branch information
tomaswolf committed Aug 6, 2024
1 parent d53fea9 commit c2a4760
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,16 +32,19 @@
import org.apache.sshd.common.SshConstants;
import org.apache.sshd.common.session.Session;
import org.apache.sshd.common.session.helpers.PacketBuffer;
import org.apache.sshd.common.util.GenericUtils;
import org.apache.sshd.common.util.ValidateUtils;
import org.apache.sshd.common.util.buffer.Buffer;
import org.apache.sshd.common.util.buffer.ByteArrayBuffer;
import org.apache.sshd.common.util.io.output.OutputStreamWithChannel;
import org.apache.sshd.core.CoreModuleProperties;
import org.apache.sshd.sftp.client.SftpClient;
import org.apache.sshd.sftp.client.SftpClient.CloseableHandle;
import org.apache.sshd.sftp.client.SftpClient.OpenMode;
import org.apache.sshd.sftp.client.SftpClientHolder;
import org.apache.sshd.sftp.client.SftpMessage;
import org.apache.sshd.sftp.common.SftpConstants;
import org.apache.sshd.sftp.common.SftpException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -327,8 +330,7 @@ private void internalFlush() throws IOException {
}

pendingAcks.removeFirst();
SftpResponse response = SftpResponse.parse(SftpConstants.SSH_FXP_WRITE, buf);
client.checkResponseStatus(SftpConstants.SSH_FXP_WRITE, ack.id, SftpStatus.parse(response));
checkStatus(client, buf);
}

if (buffer == null) {
Expand Down Expand Up @@ -366,6 +368,27 @@ private void internalFlush() throws IOException {
buffer = null;
}

private void checkStatus(AbstractSftpClient client, Buffer buf) throws IOException {
if (buf.available() >= 13) {
int rpos = buf.rpos();
buf.rpos(rpos + 4); // Skip length
int cmd = buf.getUByte();
if (cmd != SftpConstants.SSH_FXP_STATUS) {
throw new SftpException(SftpConstants.SSH_FX_BAD_MESSAGE,
"Unexpected SFTP response; expected SSH_FXP_STATUS but got "
+ SftpConstants.getCommandMessageName(cmd));
}
buf.rpos(rpos + 9); // Skip ahead until after the id
if (buf.getInt() == SftpConstants.SSH_FX_OK) {
return;
}
// Reset and do the full parse
buf.rpos(rpos);
}
SftpResponse response = SftpResponse.parse(SftpConstants.SSH_FXP_WRITE, buf);
client.checkResponseStatus(SftpConstants.SSH_FXP_WRITE, response.getId(), SftpStatus.parse(response));
}

@Override
public void close() throws IOException {
if (!isOpen()) {
Expand All @@ -388,18 +411,26 @@ public void close() throws IOException {
lastMsg = null;
}

Duration idleTimeout = CoreModuleProperties.IDLE_TIMEOUT.getRequired(getClient().getClientSession());
if (GenericUtils.isNegativeOrNull(idleTimeout)) {
idleTimeout = CoreModuleProperties.IDLE_TIMEOUT.getRequiredDefault();
}
AbstractSftpClient client = getClient();
for (int ackIndex = 1; !pendingAcks.isEmpty(); ackIndex++) {
SftpAckData ack = pendingAcks.removeFirst();
if (debugEnabled) {
log.debug("close({}) processing ack #{}: {}", this, ackIndex, ack);
}

SftpResponse response = client.response(SftpConstants.SSH_FXP_WRITE, ack.id);
Buffer buf = client.receive(ack.id, idleTimeout);
if (buf == null) {
log.debug("close({}) no ack response for {}", this, ack);
break;
}
if (debugEnabled) {
log.debug("close({}) processing ack #{} response for {}", this, ackIndex, ack);
}
client.checkResponseStatus(SftpConstants.SSH_FXP_WRITE, response.getId(), SftpStatus.parse(response));
checkStatus(client, buf);
}
} finally {
if (ownsHandle) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,8 @@ public static SftpResponse parse(int cmd, Buffer buffer) throws IOException {
int length = buffer.getInt();
int type = buffer.getUByte();
int id = buffer.getInt();
validateIncomingResponse(cmd, id, type, length, buffer);
// No need to validate the length here: the way we assemble these buffers guarantees that
// the length is reasonable and does not exceed buffer.available().
return new SftpResponse(cmd, id, type, length, buffer);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.sshd.common.util.buffer.Buffer;
import org.apache.sshd.sftp.common.SftpConstants;
import org.apache.sshd.sftp.common.SftpException;

/**
* A representation of a SSH_FXP_STATUS record.
Expand Down Expand Up @@ -69,7 +70,12 @@ static SftpStatus parse(Buffer buffer) {
return new SftpStatus(code, message, language);
}

public static SftpStatus parse(SftpResponse response) {
public static SftpStatus parse(SftpResponse response) throws SftpException {
if (response.getType() != SftpConstants.SSH_FXP_STATUS) {
throw new SftpException(
SftpConstants.SSH_FX_BAD_MESSAGE, "Unexpected SFTP response: expected SSH_FXP_STATUS but got "
+ SftpConstants.getCommandMessageName(response.getType()));
}
return parse(response.getBuffer());
}
}

0 comments on commit c2a4760

Please sign in to comment.