Skip to content
This repository has been archived by the owner on Aug 2, 2020. It is now read-only.

Commit

Permalink
introduce a new variable to maintain connection state
Browse files Browse the repository at this point in the history
  • Loading branch information
stdrc committed Mar 9, 2020
1 parent 11ba790 commit 60f1de1
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 35 deletions.
6 changes: 4 additions & 2 deletions src/cqhttp/plugins/web/websocket_reverse.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ namespace cqhttp::plugins {
void hook_after_event(EventContext<cq::Event> &ctx) override;

bool good() const override {
return (!api_ || api_->started()) && (!event_ || event_->started())
&& (!universal_ || universal_->started());
return (!api_ || api_->connected()) && (!event_ || event_->connected())
&& (!universal_ || universal_->connected());
}

private:
Expand All @@ -45,6 +45,7 @@ namespace cqhttp::plugins {
virtual void stop();

virtual bool started() const { return started_; }
virtual bool connected() const { return connected_; }

protected:
virtual void init();
Expand All @@ -60,6 +61,7 @@ namespace cqhttp::plugins {
bool reconnect_on_code_1000_;

std::atomic_bool started_ = false;
std::atomic_bool connected_ = false;

union Client {
std::shared_ptr<SimpleWeb::SocketClient<SimpleWeb::WS>> ws;
Expand Down
77 changes: 44 additions & 33 deletions src/cqhttp/plugins/web/websocket_reverse_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,15 @@ namespace cqhttp::plugins {
}
client->on_close =
[&](shared_ptr<typename WsClientT::Connection> connection, const int code, const string &reason) {
connected_ = false;
if (reconnect_on_code_1000_ || code != 1000) {
logging::debug(TAG,
u8"反向 WebSocket 连接断开,close code: " + to_string(code) + ",reason:" + reason);
notify_should_reconnect();
}
};
client->on_error = [&](shared_ptr<typename WsClientT::Connection>, const SimpleWeb::error_code &e) {
connected_ = false;
logging::debug(TAG, u8"反向 WebSocket 连接发生错误,error code: " + to_string(e.value()));
notify_should_reconnect();
};
Expand Down Expand Up @@ -68,13 +70,15 @@ namespace cqhttp::plugins {
logging::debug(TAG, u8"反向 WebSocket 建立连接失败");
notify_should_reconnect();
}
connected_ = false;
started_ = false;
});
logging::info_success(TAG, u8"开启反向 WebSocket 客户端(" + name() + u8")成功,开始连接 " + url_);
}
}

void WebSocketReverse::ClientBase::disconnect() {
connected_ = false;
if (started_) {
if (client_is_wss_.value() == false) {
client_.ws->stop();
Expand Down Expand Up @@ -171,11 +175,13 @@ namespace cqhttp::plugins {

if (client_is_wss_.has_value()) {
if (client_is_wss_.value() == false) {
client_.ws->on_open = [&](auto) { connected_ = true; };
client_.ws->on_message = [&connection_mutex = client_.ws->connection_mutex](auto connection,
auto message) {
api_on_message<WsClient>(connection_mutex, connection, message);
};
} else {
client_.wss->on_open = [&](auto) { connected_ = true; };
client_.wss->on_message = [&connection_mutex = client_.wss->connection_mutex](auto connection,
auto message) {
api_on_message<WssClient>(connection_mutex, connection, message);
Expand All @@ -189,51 +195,56 @@ namespace cqhttp::plugins {

if (client_is_wss_.has_value()) {
if (client_is_wss_.value() == false) {
client_.ws->on_open = [](const shared_ptr<WsClient::Connection> connection) {
client_.ws->on_open = [&](const shared_ptr<WsClient::Connection> connection) {
connected_ = true;
emit_lifecycle_meta_event(MetaEvent::SubType::LIFECYCLE_CONNECT);
};
} else {
client_.ws->on_open = [](const shared_ptr<WsClient::Connection> connection) {
client_.ws->on_open = [&](const shared_ptr<WsClient::Connection> connection) {
connected_ = true;
emit_lifecycle_meta_event(MetaEvent::SubType::LIFECYCLE_CONNECT);
};
}
}
}

void WebSocketReverse::EventClient::push_event(const json &payload) {
if (started_) {
logging::debug(TAG, u8"开始通过反向 WebSocket 客户端上报事件");
if (!connected_) {
logging::info(TAG, u8"反向 WebSocket 连接尚未建立,无法上报");
return;
}

const auto send_cb = [=](const SimpleWeb::error_code &ec) {
if (!ec) {
logging::info_success(TAG, u8"通过反向 WebSocket 客户端上报数据到 " + url_ + u8" 成功");
} else {
logging::warning(TAG,
u8"通过反向 WebSocket 客户端上报数据到 " + url_ + u8" 失败,错误码:"
+ std::to_string(ec.value()) + u8",将尝试重连");
std::unique_lock<std::mutex> lock(mutex_);
should_reconnect_ = true;
}
};
try {
if (client_is_wss_.value() == false) {
const auto out_message = make_shared<WsClient::OutMessage>();
*out_message << payload.dump();
// the WsClient class is modified by us ("connection" property made public),
// so we must maintain the lock manually
unique_lock<mutex> lock(client_.ws->connection_mutex);
client_.ws->connection->send(out_message, send_cb); // TODO: send 失败应当重新连接
lock.unlock();
} else {
const auto out_message = make_shared<WssClient::OutMessage>();
*out_message << payload.dump();
unique_lock<mutex> lock(client_.wss->connection_mutex);
client_.wss->connection->send(out_message, send_cb);
lock.unlock();
}
} catch (...) {
logging::warning(TAG, u8"通过反向 WebSocket 客户端上报数据到 " + url_ + u8" 失败");
logging::debug(TAG, u8"开始通过反向 WebSocket 客户端上报事件");

const auto send_cb = [=](const SimpleWeb::error_code &ec) {
if (!ec) {
logging::info_success(TAG, u8"通过反向 WebSocket 客户端上报数据到 " + url_ + u8" 成功");
} else {
logging::warning(TAG,
u8"通过反向 WebSocket 客户端上报数据到 " + url_ + u8" 失败,错误码:"
+ std::to_string(ec.value()) + u8",将尝试重连");
std::unique_lock<std::mutex> lock(mutex_);
should_reconnect_ = true;
}
};
try {
if (client_is_wss_.value() == false) {
const auto out_message = make_shared<WsClient::OutMessage>();
*out_message << payload.dump();
// the WsClient class is modified by us ("connection" property made public),
// so we must maintain the lock manually
unique_lock<mutex> lock(client_.ws->connection_mutex);
client_.ws->connection->send(out_message, send_cb); // TODO: send 失败应当重新连接
lock.unlock();
} else {
const auto out_message = make_shared<WssClient::OutMessage>();
*out_message << payload.dump();
unique_lock<mutex> lock(client_.wss->connection_mutex);
client_.wss->connection->send(out_message, send_cb);
lock.unlock();
}
} catch (...) {
logging::warning(TAG, u8"通过反向 WebSocket 客户端上报数据到 " + url_ + u8" 失败");
}
}

Expand Down

0 comments on commit 60f1de1

Please sign in to comment.