Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HTTP client request cancellation #714

Merged
merged 4 commits into from
Mar 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions src-java/aleph/utils/RequestCancellationException.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package aleph.utils;

import java.util.concurrent.CancellationException;

public class RequestCancellationException extends CancellationException {

public RequestCancellationException() { }

public RequestCancellationException(String message) {
super(message);
}

public RequestCancellationException(Throwable cause) {
super(cause.getMessage());
initCause(cause);
}

public RequestCancellationException(String message, Throwable cause) {
super(message);
initCause(cause);
}

}
194 changes: 104 additions & 90 deletions src/aleph/http.clj
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
ConnectionTimeoutException
PoolTimeoutException
ReadTimeoutException
RequestCancellationException
RequestTimeoutException)
(io.aleph.dirigiste Pools)
(io.netty.handler.codec Headers)
Expand Down Expand Up @@ -336,6 +337,9 @@
by [clj-http](https://github.com/dakrone/clj-http), and returns a deferred representing
the HTTP response. Also allows for a custom `pool` or `middleware` to be defined.

Putting the returned deferred into an error state will cancel the underlying request if it is
still in flight.

KingMob marked this conversation as resolved.
Show resolved Hide resolved
Param key | Description
-------------------- | -----------------------------------------------------------------------------------------------------------------------------------------------------------------
`connection-timeout` | timeout in milliseconds for the connection to become established
Expand All @@ -358,96 +362,106 @@
middleware identity
connection-timeout 6e4} ;; 60 seconds
:as req}]

(executor/with-executor response-executor
((middleware
(fn [req]
(let [k (client/req->domain req)
start (System/currentTimeMillis)]

;; acquire a connection
(-> (flow/acquire pool k)
(maybe-timeout! pool-timeout)

;; pool timeout triggered
(d/catch' TimeoutException
(fn [^Throwable e]
(d/error-deferred (PoolTimeoutException. e))))

(d/chain'
(fn [conn]

;; get the wrapper for the connection, which may or may not be realized yet
(-> (first conn)
(maybe-timeout! connection-timeout)

;; connection timeout triggered, dispose of the connetion
(d/catch' TimeoutException
(fn [^Throwable e]
(log/error e "Timed out waiting for connection to be established")
(flow/dispose pool k conn)
(d/error-deferred (ConnectionTimeoutException. e))))

;; connection failed, bail out
(d/catch'
(fn [e]
(log/error e "Connection failure")
(flow/dispose pool k conn)
(d/error-deferred e)))

;; actually make the request now
(d/chain'
(fn [conn']
(when-not (nil? conn')
(let [end (System/currentTimeMillis)]
(-> (conn' req)
(maybe-timeout! request-timeout)

;; request timeout triggered, dispose of the connection
(d/catch' TimeoutException
(fn [^Throwable e]
(flow/dispose pool k conn)
(d/error-deferred (RequestTimeoutException. e))))

;; request failed, dispose of the connection
(d/catch'
(fn [e]
(log/trace "Request failed. Disposing of connection...")
(flow/dispose pool k conn)
(d/error-deferred e)))

;; clean up the connection
(d/chain'
(fn cleanup-conn [rsp]

;; either destroy/dispose of the conn, or release it back for reuse
(-> (:aleph/destroy-conn? rsp)
(maybe-timeout! read-timeout)

(d/catch' TimeoutException
(fn [^Throwable e]
(log/trace "Request timed out. Disposing of connection...")
(flow/dispose pool k conn)
(d/error-deferred (ReadTimeoutException. e))))

(d/chain'
(fn [early?]
(if (or early?
(not (:aleph/keep-alive? rsp))
(<= 400 (:status rsp)))
(do
(log/trace "Connection finished. Disposing...")
(flow/dispose pool k conn))
(flow/release pool k conn)))))
(-> rsp
(dissoc :aleph/destroy-conn?)
(assoc :connection-time (- end start)))))))))

(fn handle-response [rsp]
(->> rsp
(middleware/handle-cookies req)
(middleware/handle-redirects request req)))))))))))
req))))
(let [dispose-conn! (atom (fn []))
result (d/deferred response-executor)
response (executor/with-executor response-executor
((middleware
(fn [req]
(let [k (client/req->domain req)
start (System/currentTimeMillis)]

;; acquire a connection
(-> (flow/acquire pool k)
(maybe-timeout! pool-timeout)

;; pool timeout triggered
(d/catch' TimeoutException
(fn [^Throwable e]
(d/error-deferred (PoolTimeoutException. e))))

(d/chain'
(fn [conn]
;; NOTE: All error handlers below delegate disposal of the
;; connection to the error handler on `result` which uses this
;; function.
(reset! dispose-conn! (fn [] (flow/dispose pool k conn)))

(if (realized? result)
;; to account for race condition between setting `dispose-conn!`
;; and putting `result` into error state for cancellation
(@dispose-conn!)
;; get the wrapper for the connection, which may or may not be realized yet
(-> (first conn)
(maybe-timeout! connection-timeout)

;; connection timeout triggered
(d/catch' TimeoutException
(fn [^Throwable e]
(d/error-deferred (ConnectionTimeoutException. e))))

;; actually make the request now
(d/chain'
(fn [conn']
(when-not (nil? conn')
(let [end (System/currentTimeMillis)]
(-> (conn' req)
(maybe-timeout! request-timeout)

;; request timeout triggered
(d/catch' TimeoutException
(fn [^Throwable e]
(d/error-deferred (RequestTimeoutException. e))))

;; clean up the connection
(d/chain'
(fn cleanup-conn [rsp]

;; either destroy/dispose of the conn, or release it back for reuse
(-> (:aleph/destroy-conn? rsp)
(maybe-timeout! read-timeout)

;; read timeout triggered
(d/catch' TimeoutException
(fn [^Throwable e]
(log/trace "Request timed out.")
(d/error-deferred (ReadTimeoutException. e))))

(d/chain'
(fn [early?]
(if (or early?
(not (:aleph/keep-alive? rsp))
(<= 400 (:status rsp)))
(do
(log/trace "Connection finished. Disposing...")
(flow/dispose pool k conn))
(flow/release pool k conn)))))
(-> rsp
(dissoc :aleph/destroy-conn?)
(assoc :connection-time (- end start)))))))))

(fn handle-response [rsp]
(->> rsp
(middleware/handle-cookies req)
(middleware/handle-redirects request req))))))))))))
req))]
(d/connect response result)
arnaudgeiser marked this conversation as resolved.
Show resolved Hide resolved
(d/catch' result
(fn [e]
(log/trace e "Request failed. Disposing of connection...")
arnaudgeiser marked this conversation as resolved.
Show resolved Hide resolved
(@dispose-conn!)
(d/error-deferred e)))
KingMob marked this conversation as resolved.
Show resolved Hide resolved
result)))

(defn cancel-request!
"Accepts a response deferred as returned by `request` and closes the underlying TCP connection. If
the request had already completed by the time this function is invoked, it has no effect (as per
Manifold deferred semantics). If cancellation succeeded, the deferred will be put into error state
with an `aleph.utils.RequestCancellationException` instance.

Note that the request may already have been (partially) processed by the server at the point of
cancellation."
[r]
(d/error! r (RequestCancellationException. "Request cancelled")))

(defn- req
([method url]
Expand Down
15 changes: 15 additions & 0 deletions test/aleph/http_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
(:import
(aleph.utils
ConnectionTimeoutException
RequestCancellationException
RequestTimeoutException)
(clojure.lang
ExceptionInfo)
Expand Down Expand Up @@ -1439,6 +1440,20 @@
(is (instance? IllegalArgumentException result))
(is (= "use-h2c? may only be true when HTTP/2 is enabled." (ex-message result))))))

(deftest test-in-flight-request-cancellation
(let [conn-established (promise)
conn-closed (promise)]
(with-raw-handler (fn [req]
(deliver conn-established true)
(s/on-closed (:body req)
(fn []
(deliver conn-closed true))))
(let [rsp (http-get "/")]
(is (= true (deref conn-established 1000 :timeout)))
(http/cancel-request! rsp)
(is (= true (deref conn-closed 1000 :timeout)))
(is (thrown? RequestCancellationException (deref rsp 1000 :timeout)))))))

(deftest ^:leak test-leak-in-raw-stream-handler
;; NOTE: Expecting 2 leaks because `with-raw-handler` will run its body for both http1 and
;; http2. It would be nicer to put this assertion into the body but the http1 server seems to
Expand Down