From da5f246668feb8da95791fbf097fbf2726902f65 Mon Sep 17 00:00:00 2001 From: Stefan Rossbach Date: Fri, 31 Jul 2020 02:35:35 +0200 Subject: [PATCH] [INTERNAL][CORE] Redo optimization to Socks5 transport The current code was hard to understand and not really maintainable. Removed the simultaneous connection attempts in both directions. Only one connection attempt is made as is it very likely the the reverse connection attempt will also fail if the first one fails. The logic could be reintroduced but we should ensure that the connection attempt to B->A is made after A->B and not in parallel to keep the logic simple. --- .../saros/net/stream/Socks5StreamService.java | 934 +++--------------- 1 file changed, 155 insertions(+), 779 deletions(-) diff --git a/core/src/saros/net/stream/Socks5StreamService.java b/core/src/saros/net/stream/Socks5StreamService.java index a23ac82336..693a756bbb 100644 --- a/core/src/saros/net/stream/Socks5StreamService.java +++ b/core/src/saros/net/stream/Socks5StreamService.java @@ -1,23 +1,9 @@ package saros.net.stream; import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; import java.lang.reflect.Field; import java.net.Socket; -import java.net.SocketTimeoutException; -import java.util.HashMap; -import java.util.List; -import java.util.Random; -import java.util.concurrent.Callable; -import java.util.concurrent.Exchanger; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; -import java.util.regex.Pattern; +import java.util.Objects; import org.apache.log4j.Logger; import org.jivesoftware.smack.Connection; import org.jivesoftware.smack.XMPPException; @@ -34,49 +20,20 @@ import saros.net.internal.IByteStreamConnectionListener; import saros.net.util.NetworkingUtils; import saros.net.xmpp.JID; -import saros.util.NamedThreadFactory; /** - * Transport class for SOCKS5 bytestreams. When a Request is received always it is tried to - * establish a connection to the side peer, too. A special ID is used to distinguish connect - * requests and response requests. If there is a direct connection, we keep it, the other one - * discarded. If the is no one, a SMACK will establish a mediated connection by the server. + * This stream service establishes Socks5 streams. * - *

Are both connection direct, we use the one of the connect request (same for mediated). - * - *

However, still there might be a server that only supports unidirectional SOCKS5 bytestreams - * (i.e. OpenFire). In that case both mediated unidirectional connections are wrapped into a - * bidirectional one. (see {#link WrappedBidirectionalSocks5BytestreamSession}) - */ -/* - * FIXME DELETE this service, it try to solve things that nowadays are really - * uncommon, i.e on side is reachable and the other side is not. Normally both - * sides are unreachable either due to NAT or firewall settings and so - * connecting to each other will not gain us any advantage. Furthermore the - * implementation is broken by design, i.e how Socks5 connections are - * established. Calling #connect(TO) will not connect us TO but ask TO to - * connect to US ! This is not the fault of this service, it is how XEP-65 is - * defined. - * - * The code is full of race conditions (regarding time) and is hardly - * maintainable. - * - * There are enough software solutions out there that are able to create VPN - * environments (and the Smack Socks5 implementation can use them) that perform - * this task ways better than this logic. + *

See https://xmpp.org/extensions/xep-0065.html for more details. */ public class Socks5StreamService implements IStreamService, BytestreamListener { private static final Logger log = Logger.getLogger(Socks5StreamService.class); - private static final Random ID_GENERATOR = new Random(); - - private static final String RESPONSE_SESSION_ID_PREFIX = "response-socks5"; - - private static final byte BIDIRECTIONAL_TEST_BYTE = 0x1A; - - private static final int MEDIATED_BIDIRECTIONAL_TEST_TIMEOUT = 5000; - + /** + * Property to either enable or disable the Nagle algorithm for Socks5 byte stream. The default + * value is to disable the Nagle algorithm. + */ private static final boolean TCP_NODELAY = Boolean.valueOf(System.getProperty("saros.net.socks5.TCP_NODELAY", "true")); @@ -99,714 +56,217 @@ public class Socks5StreamService implements IStreamService, BytestreamListener { private static final int TOTAL_CONNECT_TIMEOUT = Integer.getInteger("saros.net.socks5.TOTAL_CONNECT_TIMEOUT", 20000); - private HashMap> runningRemoteConnects = - new HashMap>(); - private ExecutorService executorService; - private volatile Socks5BytestreamManager socks5Manager; private volatile IByteStreamConnectionListener connectionListener; + private volatile JID localAddress; - private JID localAddress; - - /** - * @param sessionID - * @param peer - * @return a Future that tries to establish a second connection to the peer's local SOCKS5 proxy - */ - private Future futureToEstablishResponseSession( - final String sessionID, final String peer) { - - return executorService.submit( - new Callable() { - @Override - public Socks5BytestreamSession call() throws Exception { - return (Socks5BytestreamSession) establishResponseSession(sessionID, peer); - } - }); - } - - /** - * Starts a new thread that waits until the connection is established to close it correctly. - * - * @param future - */ - private void waitToCloseResponse(final Future future) { - log.debug(prefix() + "canceling response connection as it is not needed"); - - Thread waitToCloseResponse = - new Thread("CloseUnneededResponseConnection") { - - @Override - public void run() { - try { - closeQuietly(future.get()); - } catch (InterruptedException e) { - // nothing to do here - } catch (ExecutionException e) { - log.debug( - prefix() - + "Exception while waiting to close unneeded connection: " - + e.getMessage()); - } - } - }; - waitToCloseResponse.start(); - } - - private String verboseLocalProxyInfo() { - final Socks5Proxy proxy = NetworkingUtils.getSocks5ProxySafe(); - final StringBuilder builder = new StringBuilder(); - - builder.append("local SOCKS5 proxy"); - - if (!proxy.isRunning()) { - builder.append(" not running"); - return builder.toString(); - } - - builder.append(" running ["); - builder.append("port=").append(proxy.getPort()).append(","); - builder.append(" configured streamhosts="); - builder.append(proxy.getLocalAddresses()); - builder.append("]"); - return builder.toString(); - } - - /* - * the Smack API does not tell us, if a mediated Socks5 connection is - * unidirectional. But some server implementation (OpenFire) may return such - * a connection. In this case we have to wrap the unidirectional - * connections. - */ + // *********** IStreamService impl start + @Override + public IByteStreamConnection connect(final String connectionId, final JID remoteAddress) + throws IOException, InterruptedException { - /** - * Tests one of the bytestreams != null in the opposite direction. It returns it if bidirectional - * or tries to wrap two unidirectional streams if possible. Else an exception is thrown. The - * testing order is defined by the boolean preferInSession. - * - * @pre inSession!=null || outSession!=null - * @param inSession - * @param outSession - * @param preferInSession which stream to test preferable (if != null) - * @return a bidirectional BytestreamSession - * @throws IOException if there is only one unidirectional session - */ - private BytestreamSession testAndGetMediatedBidirectionalBytestream( - Socks5BytestreamSession inSession, - Socks5BytestreamSession outSession, - boolean preferInSession) - throws IOException { + Objects.requireNonNull(connectionId, "connectionId must not be null"); + Objects.requireNonNull(remoteAddress, "remoteAddress must not be null"); - String msg = prefix() + "response connection is mediated, too, "; + if (connectionId.isEmpty()) + throw new IllegalArgumentException("connectionId must not be empty"); - Socks5BytestreamSession session = preferInSession ? inSession : outSession; + log.debug( + "establishing Socks5 bytestream to: " + + remoteAddress + + ", " + + verboseLocalProxyInfo() + + "..."); - // if preferable session did not work, try the other - if (session == null) { - preferInSession = !preferInSession; - session = preferInSession ? inSession : outSession; - } + final Socks5BytestreamManager currentSocks5Manager; + final IByteStreamConnectionListener currentConnectionListener; + final JID currentLocalAddress; - log.debug( - prefix() - + "trying if " - + (preferInSession ? "incoming " : "outgoing") - + " session is bidirectional"); - - if (streamIsBidirectional(session, preferInSession)) { - log.debug( - msg - + "but at least the server allows bidirectional connections. (using " - + (preferInSession ? "incoming session" : "outgoing session") - + ")"); - closeQuietly(preferInSession ? outSession : inSession); - return session; + synchronized (this) { + currentSocks5Manager = socks5Manager; + currentConnectionListener = connectionListener; + currentLocalAddress = localAddress; } - if (inSession == null || outSession == null) { - closeQuietly(inSession); - closeQuietly(outSession); + if (currentSocks5Manager == null + || currentConnectionListener == null + || currentLocalAddress == null) { throw new IOException( - "Could only establish one unidirectional connection but need two for wrapping."); + "service is not initialized - aborting establishing a Socks5 bytestream to: " + + remoteAddress); } - log.debug( - msg - + "and the server does not allow bidirectional connections. Wrapped session established."); - - return new WrappedBidirectionalSocks5BytestreamSession(inSession, outSession); - } - - /** - * Sends and receives an INT to distinguish between bidirectional and unidirectional streams. - * - * @param session - * @param sendFirst - * @return whether a stream is bidirectional - * @throws IOException - */ - private boolean streamIsBidirectional(Socks5BytestreamSession session, boolean sendFirst) - throws IOException { + final Socks5BytestreamSession session; try { - OutputStream out = session.getOutputStream(); - InputStream in = session.getInputStream(); - int test = 0; - - session.setReadTimeout(MEDIATED_BIDIRECTIONAL_TEST_TIMEOUT); - - if (sendFirst) { - out.write(BIDIRECTIONAL_TEST_BYTE); - test = in.read(); - } else { - test = in.read(); - out.write(BIDIRECTIONAL_TEST_BYTE); - } - - if (test == BIDIRECTIONAL_TEST_BYTE) { - log.trace( - prefix() + "stream is bidirectional. (" + (sendFirst ? "sending" : "receiving") + ")"); - return true; - } else { - log.error(prefix() + "stream can send and receive but got wrong result: " + test); - throw new IOException("SOCKS5 bytestream connections got mixed up. Try another transport."); - /* - * Note: a reason here might be a too low TEST_TIMEOUT but the - * exception enables fallback to IBB instead of having the - * stream crash on first use. - */ - } - - } catch (SocketTimeoutException ste) { - /* - * At least we have to wait TEST_STREAM_TIMEOUT to cause a timeout - * on the peer side, too. - * - * Else the first package might be read and the above error occurs - * (test != BIDIRECTIONAL_TEST_BYTE). - */ - try { - Thread.sleep(MEDIATED_BIDIRECTIONAL_TEST_TIMEOUT); - } catch (InterruptedException e) { - // nothing to do here - } + session = currentSocks5Manager.establishSession(remoteAddress.toString(), connectionId); + } catch (XMPPException e) { + throw translateXmppException(e, remoteAddress.toString()); } - /* - * Note: the streams cannot be closed here - even not the unused ones - - * as setting the timeout later on would throw an exception - */ + try { + setNagleAlgorithm(session); - log.debug(prefix() + "stream is unidirectional. Trying to wrap bidirectional one."); + return new BinaryChannelConnection( + currentLocalAddress, + remoteAddress, + connectionId, + new XMPPByteStreamAdapter(session), + session.isDirect() ? StreamMode.SOCKS5_DIRECT : StreamMode.SOCKS5_MEDIATED, + currentConnectionListener); - return false; + } catch (IOException e) { + closeSessionQuietly(session); + throw e; + } } - /** - * Handles a response request. - * - *

The session is exchanged to the connecting thread. - * - * @param request - * @throws XMPPException - * @throws InterruptedException - */ - private void handleResponse(BytestreamRequest request) - throws XMPPException, InterruptedException { - - String peer = request.getFrom(); - log.debug( - prefix() + "receiving response connection from " + peer + ", " + verboseLocalProxyInfo()); - - Socks5BytestreamSession inSession = (Socks5BytestreamSession) request.accept(); - - String sessionID = getSessionID(request.getSessionID()); - - // get running connect - Exchanger exchanger = runningRemoteConnects.get(sessionID); - - if (exchanger == null) { - log.warn(prefix() + "Received response connection without a running connect"); - closeQuietly(inSession); - return; - } + @Override + public synchronized void initialize( + Connection connection, IByteStreamConnectionListener listener) { + + localAddress = new JID(connection.getUser()); + socks5Manager = Socks5BytestreamManager.getBytestreamManager(connection); + socks5Manager.setTargetResponseTimeout(TARGET_RESPONSE_TIMEOUT); + connectionListener = listener; + socks5Manager.addIncomingBytestreamListener(this); + } - try { - exchanger.exchange(inSession, TARGET_RESPONSE_TIMEOUT, TimeUnit.MILLISECONDS); - } catch (InterruptedException e) { - log.debug(prefix() + "Wrapping bidirectional stream was interrupted."); - closeQuietly(inSession); - } catch (TimeoutException e) { - log.error( - prefix() - + "Wrapping bidirectional stream timed out in Request! Shouldn't have happened."); - closeQuietly(inSession); - } + @Override + public synchronized void uninitialize() { + if (socks5Manager == null) return; + socks5Manager.removeIncomingBytestreamListener(this); + socks5Manager = null; + connectionListener = null; + localAddress = null; } - /** - * Accepts a Request and returns an established IByteStreamConnection. - * - *

Immediately tries to establish a second session to the requesting peer but also accepts this - * request to achieve a direct connection although one peer might be behind a NAT. - * - *

A direct connection is used, the other discarded where the requesting session is preferred. - * - *

In case of unidirectional connections both sessions a wrapped into a bidirectional one. - * - * @param request - * @return established BinaryChannel - * @throws XMPPException - * @throws InterruptedException - * @throws IOException - */ - private IByteStreamConnection acceptNewRequest(BytestreamRequest request) - throws XMPPException, IOException, InterruptedException { - String peer = request.getFrom(); + // *********** IStreamService impl end - log.debug(prefix() + "receiving request from " + peer + ", " + verboseLocalProxyInfo()); + @Override + public void incomingBytestreamRequest(final BytestreamRequest request) { - IByteStreamConnectionListener listener = connectionListener; + final String remoteAddress = request.getFrom(); - if (listener == null) throw new IOException(this + " transport is not initialized"); + log.debug("received request to establish a Socks5 bytestream to: " + remoteAddress); - String sessionID = request.getSessionID(); + final IByteStreamConnectionListener currentConnectionListener; + final JID currentLocalAddress; - String connectionIdentifier = getConnectionIdentifier(sessionID); + synchronized (this) { + currentConnectionListener = connectionListener; + currentLocalAddress = localAddress; + } - if (connectionIdentifier == null) { - log.warn( - "rejecting request from " + peer + " , no connection identifier found: " + sessionID); + if (currentConnectionListener == null || currentLocalAddress == null) { + log.warn("service is not initialized - rejecting request from: " + remoteAddress); request.reject(); - return null; + return; } - assert sessionID != null; - - // start to establish response - Future responseFuture = - futureToEstablishResponseSession(sessionID, peer); - - Socks5BytestreamSession inSession = null; + final Socks5BytestreamSession session; try { + ((Socks5BytestreamRequest) request).setTotalConnectTimeout(TOTAL_CONNECT_TIMEOUT); + session = (Socks5BytestreamSession) request.accept(); + } catch (InterruptedException e) { + log.warn("interrupted while accepting request from: " + remoteAddress); - inSession = (Socks5BytestreamSession) request.accept(); - - if (inSession.isDirect()) { - waitToCloseResponse(responseFuture); - configureSocks5Socket(inSession); - - return new BinaryChannelConnection( - localAddress, - new JID(peer), - connectionIdentifier, - new XMPPByteStreamAdapter(inSession), - StreamMode.SOCKS5_DIRECT, - listener); - } else { - log.debug(prefix() + "incoming connection is mediated."); - } - - } catch (Exception e) { - log.warn( - prefix() - + "Couldn't accept request but still trying to establish a response connection: " - + e.getMessage()); + /* This is called by Smack so avoid interrupting the thread again as nobody knows what will happen. */ + // Thread.currentThread().interrupt(); + return; + } catch (XMPPException e) { + log.error("failed to accept request from from: " + remoteAddress, e); + return; } - Socks5BytestreamSession outSession = null; - try { - - outSession = responseFuture.get(); - - if (outSession.isDirect()) { - log.debug(prefix() + "newly established session is direct! Discarding the other."); - closeQuietly(inSession); - configureSocks5Socket(outSession); - - return new BinaryChannelConnection( - localAddress, - new JID(peer), - connectionIdentifier, - new XMPPByteStreamAdapter(outSession), - StreamMode.SOCKS5_DIRECT, - listener); - } + setNagleAlgorithm(session); + + final IByteStreamConnection connection = + new BinaryChannelConnection( + currentLocalAddress, + new JID(remoteAddress), + request.getSessionID(), + new XMPPByteStreamAdapter(session), + session.isDirect() ? StreamMode.SOCKS5_DIRECT : StreamMode.SOCKS5_MEDIATED, + currentConnectionListener); + + /* FIXME race condition, connection fails, triggers connectionClosed + * on the listener and afterwards this is called + */ + currentConnectionListener.connectionChanged(connection.getConnectionID(), connection, true); } catch (IOException e) { - log.error(prefix() + "Socket crashed while initiating sending session (for wrapping)", e); - } catch (ExecutionException e) { - log.error("An error occurred while establishing a response connection ", e.getCause()); - } - - if (inSession == null && outSession == null) - throw new IOException(prefix() + "Neither connection could be established. "); - - BytestreamSession session = - testAndGetMediatedBidirectionalBytestream(inSession, outSession, true); - - return new BinaryChannelConnection( - localAddress, - new JID(peer), - connectionIdentifier, - new XMPPByteStreamAdapter(session), - StreamMode.SOCKS5_MEDIATED, - listener); - } - - /** - * Handles the SOCKS5Bytestream Request and distinguishes between connect requests and response - * requests. - * - *

see handleResponse() and acceptNewRequest() - */ - private IByteStreamConnection acceptRequest(BytestreamRequest request) - throws XMPPException, IOException, InterruptedException { - - ((Socks5BytestreamRequest) request).setTotalConnectTimeout(TOTAL_CONNECT_TIMEOUT); - - if (isResponse(request)) { - handleResponse(request); - return null; - } else { - return acceptNewRequest(request); - } - } - - /** - * Tries to establish a connection to peer and waits for peer to connect. See handleResponse(). - */ - private IByteStreamConnection establishBinaryChannel(String connectionIdentifier, String peer) - throws XMPPException, IOException, InterruptedException { - - Socks5BytestreamManager manager = socks5Manager; - IByteStreamConnectionListener listener = connectionListener; - - if (manager == null || listener == null) - throw new IOException(this + " transport is not initialized"); - - log.debug( - prefix() + "establishing connection to " + peer + ", " + verboseLocalProxyInfo() + "..."); - - // before establishing, we have to put the exchanger to the map - Exchanger exchanger = new Exchanger(); - - String sessionID = generateSessionID(connectionIdentifier, getNextNegotiationID()); - - runningRemoteConnects.put(sessionID, exchanger); - - try { - - Exception exception = null; - Socks5BytestreamSession outSession = null; - // Do we get a working connection? - try { - - outSession = manager.establishSession(peer, sessionID); - - if (outSession.isDirect()) { - configureSocks5Socket(outSession); - return new BinaryChannelConnection( - localAddress, - new JID(peer), - connectionIdentifier, - new XMPPByteStreamAdapter(outSession), - StreamMode.SOCKS5_DIRECT, - listener); - } - - log.debug( - prefix() - + "connection/session is mediated, performing additional connection optimization..."); - - } catch (IOException e) { - exception = e; - log.warn( - prefix() - + "could not establish a connection to " - + peer - + " due to an error in the socket communictation", - e); - } catch (XMPPException e) { - exception = e; - XMPPError error = e.getXMPPError(); - - if (error != null && error.getCode() == 406) { - log.warn( - prefix() - + "could not establish a connection to " - + peer - + ", remote Socks5 transport is disabled or encountered an error: " - + e.getMessage()); - /* - * quit here as it makes no sense to wait for the remote - * side to connect because this will never happen ! - */ - throw e; - } else if (error != null && error.getCode() == 404) { - log.warn( - prefix() - + "could not establish a connection to " - + peer - + ", remote side could not connect to any offered stream hosts: " - + e.getMessage()); - } else { - log.error(prefix() + "could not establish a connection to " + peer, e); - } - } catch (Exception e) { - // FIXME handle the InterruptedException correctly ! - exception = e; - - /* - * catch any possible RuntimeException because we must wait for - * the peer that may attempt to connect - */ - log.error( - prefix() - + "could not connect to " - + peer - + " because of an internal error: " - + exception.getMessage(), - e); - } - - log.debug(prefix() + "waiting for " + peer + " to establish a connection..."); - Socks5BytestreamSession inSession = null; - - // else wait for request - try { - inSession = exchanger.exchange(null, TARGET_RESPONSE_TIMEOUT, TimeUnit.MILLISECONDS); - - if (inSession.isDirect()) { - log.debug(prefix() + "response connection is direct! Discarding the other."); - closeQuietly(outSession); - configureSocks5Socket(inSession); - - return new BinaryChannelConnection( - localAddress, - new JID(peer), - connectionIdentifier, - new XMPPByteStreamAdapter(inSession), - StreamMode.SOCKS5_DIRECT, - listener); - } - - } catch (TimeoutException e) { - closeQuietly(outSession); - String msg = "waiting for a response session timed out (" + TARGET_RESPONSE_TIMEOUT + "ms)"; - if (exception != null) - throw new IOException( - prefix() + msg + " and could not establish a connection from this side, too:", - exception); - else log.debug(msg); - } - - BytestreamSession session = - testAndGetMediatedBidirectionalBytestream(inSession, outSession, false); - - return new BinaryChannelConnection( - localAddress, - new JID(peer), - connectionIdentifier, - new XMPPByteStreamAdapter(session), - StreamMode.SOCKS5_MEDIATED, - listener); - - } finally { - runningRemoteConnects.remove(sessionID); - } - } - - /** - * @param peer - * @return a BytestreamSession with a response ID - * @throws XMPPException - * @throws IOException - * @throws InterruptedException - */ - private BytestreamSession establishResponseSession(String sessionID, String peer) - throws XMPPException, IOException, InterruptedException { - - log.debug(prefix() + "Start to establish new response connection"); - - Socks5BytestreamManager manager = socks5Manager; - - if (manager == null) throw new IOException(this + " transport is not initialized"); - - return manager.establishSession(peer, getResponseSessionID(sessionID)); - } - - // *********** IStreamService impl start - @Override - public IByteStreamConnection connect(String connectionID, JID remoteAddress) - throws IOException, InterruptedException { - - if (connectionID == null) throw new NullPointerException("connectionID is null"); - - if (remoteAddress == null) throw new NullPointerException("remoteAddress is null"); - - if (connectionID.isEmpty()) - throw new IllegalArgumentException("connectionID must not be empty"); - - if (connectionID.contains(String.valueOf(IStreamService.SESSION_ID_DELIMITER))) - throw new IllegalArgumentException( - "connectionID must not contain '" + IStreamService.SESSION_ID_DELIMITER + "'"); - - log.debug("establishing Socks5 bytestream to: " + remoteAddress); - - try { - return establishBinaryChannel(connectionID, remoteAddress.toString()); - } catch (XMPPException e) { - throw new IOException(e); + log.error("failed to setup connection for request from: " + remoteAddress, e); + closeSessionQuietly(session); } } @Override - public void initialize(Connection connection, IByteStreamConnectionListener listener) { - - synchronized (this) { - localAddress = new JID(connection.getUser()); - socks5Manager = createManager(connection); - connectionListener = listener; - socks5Manager.addIncomingBytestreamListener(this); - } - - executorService = - Executors.newCachedThreadPool(new NamedThreadFactory("SOCKS5ConnectionResponse-")); + public String toString() { + return "XMPP-Socks5-Stream-Service"; } - @Override - public void uninitialize() { + private static String verboseLocalProxyInfo() { + final Socks5Proxy proxy = NetworkingUtils.getSocks5ProxySafe(); + final StringBuilder builder = new StringBuilder(); - synchronized (this) { - if (socks5Manager == null) return; + builder.append("local SOCKS5 proxy"); - socks5Manager.removeIncomingBytestreamListener(this); - socks5Manager = null; - connectionListener = null; + if (!proxy.isRunning()) { + builder.append(" not running"); + return builder.toString(); } - assert (executorService != null); - - List notCommenced = executorService.shutdownNow(); - if (notCommenced.size() > 0) - log.warn(prefix() + "threads for response connections found that didn't commence yet"); - executorService = null; + builder.append(" running ["); + builder.append("port=").append(proxy.getPort()).append(","); + builder.append(" configured streamhosts="); + builder.append(proxy.getLocalAddresses()); + builder.append("]"); + return builder.toString(); } - // *********** IStreamService impl end - - @Override - public void incomingBytestreamRequest(BytestreamRequest request) { - log.debug( - "received request to establish a Socks5 bytestream to " - + request.getFrom() - + " [" - + this - + "]"); - + private static void closeSessionQuietly(BytestreamSession session) { try { - - IByteStreamConnection connection = acceptRequest(request); - - if (connection == null) return; - - IByteStreamConnectionListener listener = connectionListener; - - if (listener == null) { - log.warn( - "closing bytestream connection " - + connection - + " because transport " - + this - + " was uninitilized during connection establishment"); - connection.close(); - return; - } - - listener.connectionChanged(connection.getConnectionID(), connection, true); - - } catch (InterruptedException e) { - /* - * do not interrupt here as this is called by SMACK and nobody knows - * how SMACK handle thread interruption - */ - log.warn("interrupted while establishing bytestream connection to " + request.getFrom()); - } catch (Exception e) { - log.error("could not establish bytestream connection to " + request.getFrom(), e); + session.close(); + } catch (IOException e) { + // swallow } } - private Socks5BytestreamManager createManager(Connection connection) { - Socks5BytestreamManager socks5ByteStreamManager = - Socks5BytestreamManager.getBytestreamManager(connection); + private static IOException translateXmppException( + final XMPPException exception, final String remoteAddress) throws IOException { + final XMPPError error = exception.getXMPPError(); - socks5ByteStreamManager.setTargetResponseTimeout(TARGET_RESPONSE_TIMEOUT); + if (error != null && error.getCode() == 406) { + throw new IOException( + "could not establish a Socks5 bytestream to " + + remoteAddress + + ", remote Socks5 support is disabled or encountered an error", + exception); + } else if (error != null && error.getCode() == 404) { + throw new IOException( + "could not establish a Socks5 bytestream to " + + remoteAddress + + ", remote side could not connect to any offered stream hosts", + exception); - return socks5ByteStreamManager; + } else { + throw new IOException( + "could not establish a Socks5 bytestream to " + remoteAddress, exception); + } } /** - * Wraps two Socks5BytestreamSessions in one, where for the first one, "in", the InputStream has - * to work, for the second one, "out", the OutputStream. + * Enables or disables the Nagle algorithm depending on the value of {@linkplain #TCP_NODELAY}. + * + * @param session the byte stream session to modify */ - private static class WrappedBidirectionalSocks5BytestreamSession implements BytestreamSession { - - private Socks5BytestreamSession in; - private Socks5BytestreamSession out; - - public WrappedBidirectionalSocks5BytestreamSession( - Socks5BytestreamSession in, Socks5BytestreamSession out) { - this.in = in; - this.out = out; - } - - @Override - public void close() throws IOException { - IOException e = null; - - try { - in.close(); - } catch (IOException e1) { - e = e1; - } - - try { - out.close(); - } catch (IOException e1) { - e = e1; - } - - if (e != null) throw e; - } + private static void setNagleAlgorithm(final Socks5BytestreamSession session) { - @Override - public InputStream getInputStream() throws IOException { - return in.getInputStream(); - } - - @Override - public OutputStream getOutputStream() throws IOException { - return out.getOutputStream(); - } - - @Override - public int getReadTimeout() throws IOException { - return in.getReadTimeout(); - } - - @Override - public void setReadTimeout(int timeout) throws IOException { - in.setReadTimeout(timeout); - } - } - - private void configureSocks5Socket(Socks5BytestreamSession session) { - - Field socketField = null; - Socket socket; + final Field socketField; + final Socket socket; try { socketField = Socks5BytestreamSession.class.getDeclaredField("socket"); @@ -819,93 +279,9 @@ private void configureSocks5Socket(Socks5BytestreamSession session) { try { socket.setTcpNoDelay(TCP_NODELAY); - log.debug("nagle algorithm for socket disabled: " + TCP_NODELAY); + log.debug("Nagle algorithm for session " + session + " disabled: " + TCP_NODELAY); } catch (Exception e) { log.warn("could not modifiy TCP_NODELAY socket option", e); } - - /* - * avoid lingering to long (is this needed at all, Saros is not a high - * performance server application at all - */ - - // final int lingerTimeout = 10; // seconds - // - // try { - // socket.setSoLinger(true, lingerTimeout); - // log.debug("socket is configured with SO_LINGER timeout: " - // + socket.getSoLinger() + " s"); - // } catch (Exception e) { - // log.warn("could not modify SO_LINGER socket option", e); - // } - - } - - private String prefix() { - return "[SOCKS5Transport] "; - } - - private long getNextNegotiationID() { - return ID_GENERATOR.nextLong() & Long.MAX_VALUE; - } - - private static String generateSessionID(String connectionIdentifier, long negotiationID) { - return connectionIdentifier + SESSION_ID_DELIMITER + negotiationID; - } - - private static String getResponseSessionID(String sessionID) { - - if (sessionID == null || sessionID.isEmpty()) return null; - - if (sessionID.startsWith(RESPONSE_SESSION_ID_PREFIX)) return sessionID; - - return RESPONSE_SESSION_ID_PREFIX + SESSION_ID_DELIMITER + sessionID; - } - - // [response]:connectionID:negotiationID - private static String getConnectionIdentifier(String sessionID) { - - if (sessionID == null || sessionID.isEmpty()) return null; - - String[] sessionIDTokens = sessionID.split(Pattern.quote(String.valueOf(SESSION_ID_DELIMITER))); - - if (sessionIDTokens.length == 2) return sessionIDTokens[0]; - - if (sessionIDTokens.length == 3 && sessionIDTokens[0].equals(RESPONSE_SESSION_ID_PREFIX)) - return sessionIDTokens[1]; - - return null; - } - - // response:connectionID:negotiationID - private static String getSessionID(String responseSessionID) { - - if (responseSessionID == null || responseSessionID.isEmpty()) return null; - - String[] sessionIDTokens = - responseSessionID.split(Pattern.quote(String.valueOf(SESSION_ID_DELIMITER))); - - if (sessionIDTokens.length != 3 || !sessionIDTokens[0].equals(RESPONSE_SESSION_ID_PREFIX)) - return null; - - return sessionIDTokens[1] + SESSION_ID_DELIMITER + sessionIDTokens[2]; - } - - private static boolean isResponse(BytestreamRequest request) { - return request.getSessionID().startsWith(RESPONSE_SESSION_ID_PREFIX); - } - - private static void closeQuietly(BytestreamSession stream) { - if (stream == null) return; - try { - stream.close(); - } catch (Exception e) { - // NOP - } - } - - @Override - public String toString() { - return "XMPP-Socks5-Stream-Service"; } }