Skip to content

Commit

Permalink
[phantom-presence][2] add a ping-pong worker to websocket.clj (#208)
Browse files Browse the repository at this point in the history
  • Loading branch information
stopachka authored Sep 13, 2024
1 parent 24516a0 commit 61b0dff
Show file tree
Hide file tree
Showing 2 changed files with 78 additions and 42 deletions.
82 changes: 67 additions & 15 deletions server/src/instant/lib/ring/websocket.clj
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@
(:refer-clojure :exclude [send])
(:require [ring.adapter.undertow.headers :refer [set-headers]]
[instant.util.json :refer [->json]]
[instant.util.tracer :as tracer])
[instant.util.tracer :as tracer]
[instant.util.delay :as delay])
(:import
[io.undertow.server HttpServerExchange]
[io.undertow.websockets
Expand All @@ -21,43 +22,71 @@
BufferedBinaryMessage
BufferedTextMessage
CloseMessage
WebSocketChannel
StreamSourceFrameChannel WebSocketChannel
WebSockets
WebSocketCallback]
[io.undertow.websockets.spi WebSocketHttpExchange]
[org.xnio ChannelListener]
[ring.adapter.undertow Util]
[clojure.lang IPersistentMap]
[io.undertow.websockets.extensions PerMessageDeflateHandshake]
[java.util.concurrent.locks ReentrantLock]))
[java.util.concurrent.locks ReentrantLock]
[java.util.concurrent.atomic AtomicLong]
[java.nio ByteBuffer]
[org.xnio IoUtils]))

(defn ws-listener
"Creates an `AbstractReceiveListener`. This relays calls to
`on-message`, `on-close-message`, and `on-error` callbacks.
See `ws-callback` for more details."
[{:keys [on-message on-close-message on-error channel-wrapper]}]
[{:keys [on-message on-close-message on-error channel-wrapper atomic-last-received-at]}]
(let [on-message (or on-message (constantly nil))
on-error (or on-error (constantly nil))
on-close-message (or on-close-message (constantly nil))]
(proxy [AbstractReceiveListener] []
(onFullTextMessage [^WebSocketChannel channel ^BufferedTextMessage message]

(.set atomic-last-received-at (System/currentTimeMillis))
(on-message {:channel (channel-wrapper channel)
:data (.getData message)}))
(onFullBinaryMessage [^WebSocketChannel channel ^BufferedBinaryMessage message]
(.set atomic-last-received-at (System/currentTimeMillis))
(let [pooled (.getData message)]
(try
(let [payload (.getResource pooled)]
(on-message {:channel (channel-wrapper channel)
:data (Util/toArray payload)}))
(finally (.free pooled)))))
(onPong [^WebSocketChannel channel ^StreamSourceFrameChannel channel]
(tracer/with-span! {:name "socket/pong"}
(.set atomic-last-received-at (System/currentTimeMillis))))
(onCloseMessage [^CloseMessage message ^WebSocketChannel channel]
(on-close-message {:channel (channel-wrapper channel)
:message message}))
(onError [^WebSocketChannel channel ^Throwable error]
(on-error {:channel (channel-wrapper channel)
:error error})))))

(defonce ping-pool (delay/make-pool!))

(defn straight-jacket-run-ping-job [^WebSocketChannel channel
^AtomicLong atomic-last-received-at
idle-timeout-ms]
(tracer/with-span! {:name "socket/ping"}
(try
(let [now (System/currentTimeMillis)
last-received-at (.get atomic-last-received-at)
ms-since-last-message (- now last-received-at)]
(if (> ms-since-last-message idle-timeout-ms)
(tracer/with-span! {:name "socket/close-inactive"}
(IoUtils/safeClose channel))
(WebSockets/sendPingBlocking (ByteBuffer/allocate 0)
channel)))
(catch Exception e
(tracer/record-exception-span! e {:name "socket/ping-err"
:escaping? false})))))

(defn ws-callback
"Creates a `WebsocketConnectionCallback`. This relays data to the
following callbacks:
Expand Down Expand Up @@ -85,26 +114,49 @@
on-error: Called when the server encounters an error sending a message
:channel - The `WebSocketChannel` object
:error - The error Throwable"
[{:keys [on-open on-close listener]
:or {on-open (constantly nil) on-close (constantly nil)}
:error - The error Throwable
We also kick off a ping worker. It sends a `ping` message every
`ping-interval-ms`. If the client doesn't send any message for
`idle-timeout-ms`, we close the connection.
"
[{:keys [on-open on-close listener ping-interval-ms idle-timeout-ms]
:or {on-open (constantly nil)
on-close (constantly nil)
ping-interval-ms 5000
idle-timeout-ms 15000}
:as ws-opts}]
(let [send-lock (ReentrantLock.)
atomic-last-received-at (AtomicLong. (System/currentTimeMillis))
channel-wrapper (fn [ch]
{:undertow-websocket ch
:send-lock send-lock})
listener (if (instance? ChannelListener listener)
listener
(ws-listener (assoc ws-opts :channel-wrapper channel-wrapper)))
close-task (reify ChannelListener
(handleEvent [_this channel]
(on-close (channel-wrapper channel))))]
(ws-listener (assoc ws-opts
:channel-wrapper channel-wrapper
:atomic-last-received-at atomic-last-received-at)))]

(reify WebSocketConnectionCallback
(^void onConnect [_ ^WebSocketHttpExchange exchange ^WebSocketChannel channel]
(on-open {:exchange exchange :channel (channel-wrapper channel)})
(.addCloseTask channel close-task)
(.set (.getReceiveSetter channel) listener)
(.resumeReceives channel)))))
(let [ping-job (delay/repeat-fn
ping-pool
ping-interval-ms
(fn []
(straight-jacket-run-ping-job channel
atomic-last-received-at
idle-timeout-ms)))

close-task (reify ChannelListener
(handleEvent [_this channel]
(.cancel ping-job false)
(on-close (channel-wrapper channel))))]
(.set atomic-last-received-at (System/currentTimeMillis))
(on-open {:exchange exchange
:channel (channel-wrapper channel)})
(.addCloseTask channel close-task)
(.set (.getReceiveSetter channel) listener)
(.resumeReceives channel))))))

(defn ws-request [^HttpServerExchange exchange ^IPersistentMap headers ^WebSocketConnectionCallback callback]
(let [handler (-> (WebSocketProtocolHandshakeHandler. callback)
Expand Down
38 changes: 11 additions & 27 deletions server/src/instant/reactive/session.clj
Original file line number Diff line number Diff line change
Expand Up @@ -492,15 +492,6 @@
;; -----------------
;; Websocket Interop

(defonce ping-pool (delay/make-pool!))

(defn start-ping-job [store-conn id]
(delay/repeat-fn
ping-pool
5000
(fn []
(rs/try-send-event! store-conn id {:op :ping}))))

(defn on-open [store-conn {:keys [id] :as socket}]
(tracer/with-span! {:name "socket/on-open"
:attributes {:session-id (:id socket)}}
Expand All @@ -521,23 +512,17 @@
(defn on-close [store-conn eph-store-atom {:keys [id pending-handlers]}]
(tracer/with-span! {:name "socket/on-close"
:attributes {:session-id id}}
(let [{:keys [ping-job]} (rs/get-socket @store-conn id)]
(if ping-job
(.cancel ping-job false)
(tracer/record-info! {:name "socket/on-close-no-ping-job"
:attributes {:session-id id}}))

(doseq [{:keys [future silence-exceptions op]} @pending-handlers]
(tracer/with-span! {:name "cancel-pending-handler"
:attributes {:op op}}
(silence-exceptions true)
(future-cancel future)))

(let [app-id (-> (rs/get-auth @store-conn id)
:app
:id)]
(eph/leave-by-session-id! eph-store-atom app-id id)
(rs/remove-session! store-conn id)))))
(doseq [{:keys [future silence-exceptions op]} @pending-handlers]
(tracer/with-span! {:name "cancel-pending-handler"
:attributes {:op op}}
(silence-exceptions true)
(future-cancel future)))

(let [app-id (-> (rs/get-auth @store-conn id)
:app
:id)]
(eph/leave-by-session-id! eph-store-atom app-id id)
(rs/remove-session! store-conn id))))

(defn undertow-config
[store-conn eph-store-atom receive-q {:keys [id]}]
Expand All @@ -548,7 +533,6 @@
:http-req http-req
:ws-conn ws-conn
:receive-q receive-q
:ping-job (start-ping-job store-conn id)
:pending-handlers pending-handlers}]
(on-open store-conn socket)))
:on-message (fn [{:keys [data]}]
Expand Down

0 comments on commit 61b0dff

Please sign in to comment.