Skip to content

Commit

Permalink
Pull request #389: Fix lags on socket close
Browse files Browse the repository at this point in the history
Merge in ADGUARD-CORE-LIBS/dns-libs from fix/doq_lag_on_close to master

Squashed commit of the following:

commit 1181a71894fa3b130a9421040f7f8631614ed2b0
Author: Max Grupper <[email protected]>
Date:   Tue Jun 21 16:13:29 2022 +0300

    fix

commit 494653b99440e28cbfe4331ff6d279b611742ef2
Author: Max Grupper <[email protected]>
Date:   Tue Jun 21 14:49:17 2022 +0300

    use error from utils

commit c0bd7ecbf7ba3ad6af4f873483e7260cb553e33f
Author: Max Grupper <[email protected]>
Date:   Tue Jun 21 14:34:05 2022 +0300

    use correct errcode

commit ab0c0f9cf415e8e9b9af4a0c29a5e92f076b2a9f
Author: Max Grupper <[email protected]>
Date:   Mon Jun 20 20:40:01 2022 +0300

    test
  • Loading branch information
grumaxxx committed Jun 22, 2022
1 parent f73cefa commit 288479d
Show file tree
Hide file tree
Showing 4 changed files with 16 additions and 7 deletions.
14 changes: 9 additions & 5 deletions net/outbound_socks_proxy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -401,7 +401,7 @@ void SocksOProxy::on_read(void *arg, Uint8View data) {
case CS_CONNECTED:
if (self->is_udp_association_connection(conn->id)) {
log_conn(self, conn->id, dbg, "Unexpected data ({} bytes) on UDP association connection", data.size());
self->terminate_udp_association(conn);
self->terminate_udp_association(conn, {{-1, "Unexpected data"}});
} else if (Callbacks cbx = self->get_connection_callbacks_locked(conn); cbx.on_read != nullptr) {
if (conn->parameters.proto == utils::TP_UDP) {
data.remove_prefix(get_full_udp_header_size((Socks5UdpHeader *) data.data()));
Expand Down Expand Up @@ -562,7 +562,7 @@ void SocksOProxy::handle_connection_close(Connection *conn, std::optional<Socket

if (this->is_udp_association_connection(conn->id)) {
if (conn->state != CS_CONNECTED || !error.has_value() || error->code != utils::AG_ETIMEDOUT) {
this->terminate_udp_association(conn);
this->terminate_udp_association(conn, std::move(error));
}
} else if (callbacks.on_close != nullptr) {
callbacks.on_close(callbacks.arg, std::move(error));
Expand All @@ -580,7 +580,7 @@ void SocksOProxy::on_udp_association_established(Connection *assoc_conn, SocketA
} else {
log_conn(this, assoc_conn->id, dbg, "UDP association is not found");
m_guard.unlock();
this->terminate_udp_association(assoc_conn);
this->terminate_udp_association(assoc_conn, {{-1, "UDP association is not found"}});
return;
}

Expand All @@ -602,7 +602,7 @@ void SocksOProxy::on_udp_association_established(Connection *assoc_conn, SocketA
}
}

void SocksOProxy::terminate_udp_association(Connection *assoc_conn) {
void SocksOProxy::terminate_udp_association(Connection *assoc_conn, std::optional<Socket::Error> error) {
log_conn(this, assoc_conn->id, trace, "...");

std::vector<Callbacks> udp_connections_callbacks;
Expand All @@ -623,7 +623,11 @@ void SocksOProxy::terminate_udp_association(Connection *assoc_conn) {

for (auto &cbx : udp_connections_callbacks) {
if (cbx.on_close != nullptr) {
cbx.on_close(cbx.arg, {{-1, "UDP association terminated"}});
if (error.has_value()) {
cbx.on_close(cbx.arg, {{error->code, "UDP association terminated: " + error->description}});
} else {
cbx.on_close(cbx.arg, {{-1, "UDP association terminated"}});
}
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion net/outbound_socks_proxy.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ class SocksOProxy : public OutboundProxy {
[[nodiscard]] bool is_udp_association_connection(uint32_t conn_id) const;
void handle_connection_close(Connection *conn, std::optional<Socket::Error> error);
void on_udp_association_established(Connection *assoc_conn, SocketAddress bound_addr);
void terminate_udp_association(Connection *assoc_conn);
void terminate_udp_association(Connection *assoc_conn, std::optional<Socket::Error> error);
void terminate_udp_association_silently(Connection *assoc_conn, std::optional<uint32_t> initiated_conn_id);
Callbacks get_connection_callbacks_locked(Connection *conn);

Expand Down
6 changes: 5 additions & 1 deletion upstream/upstream_doq.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -480,8 +480,12 @@ void DoqUpstream::on_socket_close(void *arg, std::optional<Socket::Error> error)
if (!self->m_conn_state.is_peer_selected()) {
self->disqualify_server_address(peer);
}
if (error->code == utils::AG_ECONNREFUSED) {
self->m_socket_error = true;
}
} else {
dbglog(self->m_log, "Connection to {} closed", peer.str());
self->m_socket_error = false;
}

auto drop = self->m_conn_state.extract_socket(ctx);
Expand Down Expand Up @@ -1083,7 +1087,7 @@ void DoqUpstream::disconnect(std::string_view reason) {

std::scoped_lock l(m_global);
for (auto &cur : m_requests) {
if (cur.second.is_onfly) {
if (cur.second.is_onfly || m_socket_error) {
tracelog(m_log, "Call condvar for request, id: {}", cur.first);
cur.second.cond.notify_all();
}
Expand Down
1 change: 1 addition & 0 deletions upstream/upstream_doq.h
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,7 @@ class DoqUpstream : public Upstream {
static std::atomic_int64_t m_next_request_id;
std::array<uint8_t, 32> m_static_secret;
TlsSessionCache m_tls_session_cache;
std::atomic_bool m_socket_error{false};
};

} // ag

0 comments on commit 288479d

Please sign in to comment.