Skip to content

Commit

Permalink
Issue #12266 - InvocationType improvements and cleanups. (#12551)
Browse files Browse the repository at this point in the history
This is the work for the server-side only, the client side will be done in another pull request.

Previously, `AbstractConnection.getInvocationType()` was called by `AbstractConnection.ReadCallback`, but it was deprecated and is now removed, along with all its overrides.

This mechanism is now replaced by using a specific Callback implementation for each `AbstractConnection` subclass.
For example, `HttpConnection` uses `HttpConnection.FillableCallback` that in turn asks the `InvocationType` to the Server, and therefore the `Handler` tree.

Introduced `AbstractConnection.NonBlocking` for the cases where `onFillable()` is non-blocking.

Restored synchronous code for `ServerFCGIConnection.close()`, ensuring `super.close()` is always called.
Ensuring that in `HttpConnection.close()` `super.close()` is always called.

Fixed promise notification to avoid race between the task (writing an error response) and the promise (resetting the stream) in HTTP/2 and HTTP/3.

Signed-off-by: Simone Bordet <[email protected]>
Co-authored-by: Greg Wilkins <[email protected]>
  • Loading branch information
sbordet and gregw authored Nov 22, 2024
1 parent c56a24f commit 3d68e2b
Show file tree
Hide file tree
Showing 14 changed files with 244 additions and 88 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -460,7 +460,7 @@ public Connection newConnection(EndPoint endPoint, Map<String, Object> context)

protected abstract ProxyProtocolConnection newProxyProtocolConnection(EndPoint endPoint, Map<String, Object> context);

protected abstract static class ProxyProtocolConnection extends AbstractConnection implements Callback
protected abstract static class ProxyProtocolConnection extends AbstractConnection.NonBlocking implements Callback
{
static final Logger LOG = LoggerFactory.getLogger(ProxyProtocolConnection.class);

Expand Down Expand Up @@ -508,12 +508,6 @@ public void failed(Throwable x)
promise.failed(x);
}

@Override
public InvocationType getInvocationType()
{
return InvocationType.NON_BLOCKING;
}

@Override
public void onFillable()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.eclipse.jetty.server.HttpChannel;
import org.eclipse.jetty.server.HttpConfiguration;
import org.eclipse.jetty.util.Attributes;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.StringUtil;
import org.eclipse.jetty.util.thread.ThreadPool;
import org.slf4j.Logger;
Expand All @@ -43,6 +44,7 @@ public class ServerFCGIConnection extends AbstractMetaDataConnection implements
{
private static final Logger LOG = LoggerFactory.getLogger(ServerFCGIConnection.class);

private final Callback fillableCallback = new FillableCallback();
private final HttpChannel.Factory httpChannelFactory = new HttpChannel.DefaultFactory();
private final Attributes attributes = new Lazy();
private final Connector connector;
Expand Down Expand Up @@ -161,7 +163,7 @@ public void clearAttributes()
public void onOpen()
{
super.onOpen();
fillInterested();
fillInterested(fillableCallback);
}

@Override
Expand Down Expand Up @@ -189,7 +191,7 @@ public void onFillable()
else if (read == 0)
{
releaseInputBuffer();
fillInterested();
fillInterested(fillableCallback);
return;
}
else
Expand Down Expand Up @@ -305,7 +307,7 @@ void onCompleted(Throwable failure)
{
releaseInputBuffer();
if (failure == null)
fillInterested();
fillInterested(fillableCallback);
else
getFlusher().shutdown();
}
Expand Down Expand Up @@ -421,4 +423,27 @@ public void close()
super.close();
}
}

private class FillableCallback implements Callback
{
private final InvocationType invocationType = getConnector().getServer().getInvocationType();

@Override
public void succeeded()
{
onFillable();
}

@Override
public void failed(Throwable x)
{
onFillInterestedFailed(x);
}

@Override
public InvocationType getInvocationType()
{
return invocationType;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -174,12 +174,6 @@ public void failed(Throwable x)
close();
promise.failed(x);
}

@Override
public InvocationType getInvocationType()
{
return InvocationType.NON_BLOCKING;
}
}

private static class ConnectionListener implements Connection.Listener
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -432,7 +432,7 @@ else if (filled == 0)
}
networkBuffer = null;
if (interested)
getEndPoint().fillInterested(fillableCallback);
fillInterested(fillableCallback);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,10 @@ public abstract class AbstractConnection implements Connection, Invocable
private static final Logger LOG = LoggerFactory.getLogger(AbstractConnection.class);

private final List<Listener> _listeners = new CopyOnWriteArrayList<>();
private final Callback _fillableCallback = new FillableCallback();
private final long _created = System.currentTimeMillis();
private final EndPoint _endPoint;
private final Executor _executor;
private final Callback _readCallback;
private int _inputBufferSize = 2048;

protected AbstractConnection(EndPoint endPoint, Executor executor)
Expand All @@ -48,16 +48,6 @@ protected AbstractConnection(EndPoint endPoint, Executor executor)
throw new IllegalArgumentException("Executor must not be null!");
_endPoint = endPoint;
_executor = executor;
_readCallback = new ReadCallback();
}

@Deprecated
@Override
public InvocationType getInvocationType()
{
// TODO consider removing the #fillInterested method from the connection and only use #fillInterestedCallback
// so a connection need not be Invocable
return Invocable.super.getInvocationType();
}

@Override
Expand Down Expand Up @@ -90,25 +80,27 @@ protected Executor getExecutor()
}

/**
* <p>Utility method to be called to register read interest.</p>
* <p>After a call to this method, {@link #onFillable()} or {@link #onFillInterestedFailed(Throwable)}
* will be called back as appropriate.</p>
* <p>Registers read interest using the default {@link Callback} with {@link Invocable.InvocationType#BLOCKING}.</p>
* <p>When read readiness is signaled, {@link #onFillable()} or {@link #onFillInterestedFailed(Throwable)}
* will be invoked.</p>
* <p>This method should be used sparingly, mainly from {@link #onOpen()}, and {@link #fillInterested(Callback)}
* should be preferred instead, passing a {@link Callback} that specifies the {@link Invocable.InvocationType}
* for each specific case where read interest needs to be registered.</p>
*
* @see #fillInterested(Callback)
* @see #onFillable()
* @see #onFillInterestedFailed(Throwable)
*/
public void fillInterested()
{
if (LOG.isDebugEnabled())
LOG.debug("fillInterested {}", this);
getEndPoint().fillInterested(_readCallback);
fillInterested(_fillableCallback);
}

/**
* <p>Utility method to be called to register read interest.</p>
* <p>After a call to this method, {@link #onFillable()} or {@link #onFillInterestedFailed(Throwable)}
* will be called back as appropriate.</p>
* <p>Registers read interest with the given callback.</p>
* <p>When read readiness is signaled, the callback will be completed.</p>
*
* @see #onFillable()
* @param callback the callback to complete when read readiness is signaled
*/
public void fillInterested(Callback callback)
{
Expand All @@ -130,7 +122,7 @@ public boolean isFillInterested()
/**
* <p>Callback method invoked when the endpoint is ready to be read.</p>
*
* @see #fillInterested()
* @see #fillInterested(Callback)
*/
public abstract void onFillable();

Expand All @@ -139,7 +131,7 @@ public boolean isFillInterested()
*
* @param cause the exception that caused the failure
*/
protected void onFillInterestedFailed(Throwable cause)
public void onFillInterestedFailed(Throwable cause)
{
if (LOG.isDebugEnabled())
LOG.debug("onFillInterestedFailed {}", this, cause);
Expand Down Expand Up @@ -286,7 +278,39 @@ public String toConnectionString()
return String.format("%s@%x", getClass().getSimpleName(), hashCode());
}

private class ReadCallback implements Callback, Invocable
public abstract static class NonBlocking extends AbstractConnection
{
private final Callback _nonBlockingReadCallback = new NonBlockingFillableCallback();

public NonBlocking(EndPoint endPoint, Executor executor)
{
super(endPoint, executor);
}

/**
* <p>Registers read interest using the default {@link Callback} with {@link Invocable.InvocationType#NON_BLOCKING}.</p>
* <p>When read readiness is signaled, {@link #onFillable()} or {@link #onFillInterestedFailed(Throwable)}
* will be invoked.</p>
* <p>This method should be used sparingly, and {@link #fillInterested(Callback)}
* should be preferred instead, passing a {@link Callback} that specifies the {@link Invocable.InvocationType}
* for each specific case where read interest needs to be registered.</p> */
@Override
public void fillInterested()
{
fillInterested(_nonBlockingReadCallback);
}

private class NonBlockingFillableCallback extends FillableCallback
{
@Override
public InvocationType getInvocationType()
{
return InvocationType.NON_BLOCKING;
}
}
}

private class FillableCallback implements Callback
{
@Override
public void succeeded()
Expand All @@ -305,11 +329,5 @@ public String toString()
{
return String.format("%s@%x{%s}", getClass().getSimpleName(), hashCode(), AbstractConnection.this);
}

@Override
public InvocationType getInvocationType()
{
return AbstractConnection.this.getInvocationType();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class NegotiatingClientConnection extends AbstractConnection
public abstract class NegotiatingClientConnection extends AbstractConnection.NonBlocking
{
private static final Logger LOG = LoggerFactory.getLogger(NegotiatingClientConnection.class);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ public Connection newConnection(Connector connector, EndPoint endPoint)
return configure(new DetectorConnection(endPoint, connector), connector, endPoint);
}

private class DetectorConnection extends AbstractConnection implements Connection.UpgradeFrom, Connection.UpgradeTo
private class DetectorConnection extends AbstractConnection.NonBlocking implements Connection.UpgradeFrom, Connection.UpgradeTo
{
private final Connector _connector;
private final RetainableByteBuffer _buffer;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class NegotiatingServerConnection extends AbstractConnection
public abstract class NegotiatingServerConnection extends AbstractConnection.NonBlocking
{
private static final Logger LOG = LoggerFactory.getLogger(NegotiatingServerConnection.class);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ public Connection newConnection(Connector connector, EndPoint endp)
return configure(new ProxyProtocolV1Connection(endp, connector, nextConnectionFactory), connector, endp);
}

private static class ProxyProtocolV1Connection extends AbstractConnection implements Connection.UpgradeFrom, Connection.UpgradeTo
private static class ProxyProtocolV1Connection extends AbstractConnection.NonBlocking implements Connection.UpgradeFrom, Connection.UpgradeTo
{
// 0 1 2 3 4 5 6
// 98765432109876543210987654321
Expand Down Expand Up @@ -451,7 +451,7 @@ public Connection newConnection(Connector connector, EndPoint endp)
return configure(new ProxyProtocolV2Connection(endp, connector, nextConnectionFactory), connector, endp);
}

private class ProxyProtocolV2Connection extends AbstractConnection implements Connection.UpgradeFrom, Connection.UpgradeTo
private class ProxyProtocolV2Connection extends AbstractConnection.NonBlocking implements Connection.UpgradeFrom, Connection.UpgradeTo
{
private static final int HEADER_LENGTH = 16;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -513,7 +513,7 @@ protected void connectionFailed(SelectableChannel channel, final Throwable ex, f
}
}

protected static class ConnectContext
public static class ConnectContext
{
private final ConcurrentMap<String, Object> context = new ConcurrentHashMap<>();
private final Request request;
Expand Down Expand Up @@ -659,7 +659,7 @@ protected void write(EndPoint endPoint, ByteBuffer buffer, Callback callback)
}
}

private abstract static class TunnelConnection extends AbstractConnection
private abstract static class TunnelConnection extends AbstractConnection.NonBlocking
{
private final IteratingCallback pipe = new ProxyIteratingCallback();
private final ByteBufferPool bufferPool;
Expand Down Expand Up @@ -781,6 +781,12 @@ protected void onCompleteFailure(Throwable cause)
buffer = Retainable.release(buffer);
}

@Override
public InvocationType getInvocationType()
{
return InvocationType.NON_BLOCKING;
}

private void disconnect(Throwable x)
{
TunnelConnection.this.close(x);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -445,11 +445,16 @@ private Runnable onFailure(Throwable x, boolean remote)
// If not handled, then we just fail the request callback
if (!_handled && _handling == null)
{
if (LOG.isDebugEnabled())
LOG.debug("failing request not yet handled {} {}", _request, this);
Callback callback = _request._callback;
task = () -> callback.failed(x);
}
else
{
if (LOG.isDebugEnabled())
LOG.debug("failing request {} {}", _request, this);

// Set the failure to arrange for any subsequent reads or demands to fail.
if (_readFailure == null)
_readFailure = Content.Chunk.from(x, true);
Expand Down
Loading

0 comments on commit 3d68e2b

Please sign in to comment.