Skip to content

Commit

Permalink
Using Serializer class for avoid message allocations.
Browse files Browse the repository at this point in the history
  • Loading branch information
dchapyshev committed Dec 10, 2023
1 parent 8cb7a4f commit cf47b92
Show file tree
Hide file tree
Showing 40 changed files with 172 additions and 77 deletions.
2 changes: 2 additions & 0 deletions source/base/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -501,6 +501,8 @@ list(APPEND SOURCE_BASE_MEMORY
memory/aligned_memory.h
memory/byte_array.cc
memory/byte_array.h
memory/serializer.cc
memory/serializer.h
memory/local_memory.h
memory/typed_buffer.h
memory/local_memory_impl/bad_local_weak_ptr.h
Expand Down
17 changes: 17 additions & 0 deletions source/base/ipc/ipc_channel.cc
Original file line number Diff line number Diff line change
Expand Up @@ -417,9 +417,13 @@ void IpcChannel::doWrite()
DCHECK_EQ(bytes_transferred, write_size_);
DCHECK(!write_queue_.empty());

ByteArray buffer = std::move(write_queue_.front());

// Delete the sent message from the queue.
write_queue_.pop();

onMessageWritten(std::move(buffer));

// If the queue is not empty, then we send the following message.
if (write_queue_.empty() && !proxy_->reloadWriteQueue(&write_queue_))
return;
Expand Down Expand Up @@ -497,4 +501,17 @@ void IpcChannel::onMessageReceived()
read_size_ = 0;
}

//--------------------------------------------------------------------------------------------------
void IpcChannel::onMessageWritten(ByteArray&& buffer)
{
if (listener_)
{
listener_->onIpcMessageWritten(std::move(buffer));
}
else
{
LOG(LS_ERROR) << "No listener (channel_name=" << channel_name_ << ")";
}
}

} // namespace base
2 changes: 2 additions & 0 deletions source/base/ipc/ipc_channel.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ class IpcChannel

virtual void onIpcDisconnected() = 0;
virtual void onIpcMessageReceived(const ByteArray& buffer) = 0;
virtual void onIpcMessageWritten(ByteArray&& buffer) = 0;
};

std::shared_ptr<IpcChannelProxy> channelProxy();
Expand Down Expand Up @@ -94,6 +95,7 @@ class IpcChannel
void doWrite();
void doReadMessage();
void onMessageReceived();
void onMessageWritten(ByteArray&& buffer);

std::u16string channel_name_;
Stream stream_;
Expand Down
7 changes: 4 additions & 3 deletions source/base/net/tcp_channel.cc
Original file line number Diff line number Diff line change
Expand Up @@ -430,10 +430,10 @@ void TcpChannel::onErrorOccurred(const Location& location, ErrorCode error_code)
}

//--------------------------------------------------------------------------------------------------
void TcpChannel::onMessageWritten(uint8_t channel_id)
void TcpChannel::onMessageWritten(uint8_t channel_id, ByteArray&& buffer)
{
if (listener_)
listener_->onTcpMessageWritten(channel_id, write_queue_.size());
listener_->onTcpMessageWritten(channel_id, std::move(buffer), write_queue_.size());
}

//--------------------------------------------------------------------------------------------------
Expand Down Expand Up @@ -581,6 +581,7 @@ void TcpChannel::onWrite(const std::error_code& error_code, size_t bytes_transfe
const WriteTask& task = write_queue_.front();
WriteTask::Type task_type = task.type();
uint8_t channel_id = task.channelId();
ByteArray buffer = std::move(task.data());

// Delete the sent message from the queue.
write_queue_.pop();
Expand All @@ -589,7 +590,7 @@ void TcpChannel::onWrite(const std::error_code& error_code, size_t bytes_transfe
bool schedule_write = !write_queue_.empty() || proxy_->reloadWriteQueue(&write_queue_);

if (task_type == WriteTask::Type::USER_DATA)
onMessageWritten(channel_id);
onMessageWritten(channel_id, std::move(buffer));

if (schedule_write)
doWrite();
Expand Down
4 changes: 2 additions & 2 deletions source/base/net/tcp_channel.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ class TcpChannel : public NetworkChannel
virtual void onTcpConnected() = 0;
virtual void onTcpDisconnected(ErrorCode error_code) = 0;
virtual void onTcpMessageReceived(uint8_t channel_id, const ByteArray& buffer) = 0;
virtual void onTcpMessageWritten(uint8_t channel_id, size_t pending) = 0;
virtual void onTcpMessageWritten(uint8_t channel_id, ByteArray&& buffer, size_t pending) = 0;
};

std::shared_ptr<TcpChannelProxy> channelProxy();
Expand Down Expand Up @@ -164,7 +164,7 @@ class TcpChannel : public NetworkChannel

void onErrorOccurred(const Location& location, const std::error_code& error_code);
void onErrorOccurred(const Location& location, ErrorCode error_code);
void onMessageWritten(uint8_t channel_id);
void onMessageWritten(uint8_t channel_id, ByteArray&& buffer);
void onMessageReceived();

void addWriteTask(WriteTask::Type type, uint8_t channel_id, ByteArray&& data);
Expand Down
3 changes: 2 additions & 1 deletion source/base/net/write_task.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,12 @@ class WriteTask
Type type() const { return type_; }
uint8_t channelId() const { return channel_id_; }
const ByteArray& data() const { return data_; }
ByteArray& data() { return data_; }

private:
const Type type_;
const uint8_t channel_id_;
const ByteArray data_;
ByteArray data_;
};

} // namespace base
Expand Down
3 changes: 2 additions & 1 deletion source/base/peer/authenticator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,8 @@ void Authenticator::onTcpMessageReceived(uint8_t /* channel_id */, const ByteArr
}

//--------------------------------------------------------------------------------------------------
void Authenticator::onTcpMessageWritten(uint8_t /* channel_id */, size_t /* pending */)
void Authenticator::onTcpMessageWritten(
uint8_t /* channel_id */, ByteArray&& /* buffer */, size_t /* pending */)
{
if (state() != State::PENDING)
return;
Expand Down
2 changes: 1 addition & 1 deletion source/base/peer/authenticator.h
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ class Authenticator : public TcpChannel::Listener
void onTcpConnected() final;
void onTcpDisconnected(NetworkChannel::ErrorCode error_code) final;
void onTcpMessageReceived(uint8_t channel_id, const ByteArray& buffer) final;
void onTcpMessageWritten(uint8_t channel_id, size_t pending) final;
void onTcpMessageWritten(uint8_t channel_id, ByteArray&& buffer, size_t pending) final;

[[nodiscard]] bool onSessionKeyChanged();

Expand Down
6 changes: 4 additions & 2 deletions source/client/client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ void Client::sendMessage(uint8_t channel_id, const google::protobuf::MessageLite
return;
}

channel_->send(channel_id, base::serialize(message));
channel_->send(channel_id, serializer_.serialize(message));
}

//--------------------------------------------------------------------------------------------------
Expand Down Expand Up @@ -314,8 +314,10 @@ void Client::onTcpMessageReceived(uint8_t channel_id, const base::ByteArray& buf
}

//--------------------------------------------------------------------------------------------------
void Client::onTcpMessageWritten(uint8_t channel_id, size_t pending)
void Client::onTcpMessageWritten(uint8_t channel_id, base::ByteArray&& buffer, size_t pending)
{
serializer_.addBuffer(std::move(buffer));

if (channel_id == proto::HOST_CHANNEL_ID_SESSION)
{
onSessionMessageWritten(channel_id, pending);
Expand Down
4 changes: 3 additions & 1 deletion source/client/client.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

#include "base/waitable_timer.h"
#include "base/version.h"
#include "base/memory/serializer.h"
#include "client/client_config.h"
#include "client/client_session_state.h"
#include "client/router_controller.h"
Expand Down Expand Up @@ -82,7 +83,7 @@ class Client
void onTcpConnected() override;
void onTcpDisconnected(base::NetworkChannel::ErrorCode error_code) override;
void onTcpMessageReceived(uint8_t channel_id, const base::ByteArray& buffer) override;
void onTcpMessageWritten(uint8_t channel_id, size_t pending) override;
void onTcpMessageWritten(uint8_t channel_id, base::ByteArray&& buffer, size_t pending) override;

// RouterController::Delegate implementation.
void onRouterConnected(const base::Version& router_version) override;
Expand All @@ -109,6 +110,7 @@ class Client
State state_ = State::CREATED;

bool is_connected_to_router_ = false;
base::Serializer serializer_;
};

} // namespace client
Expand Down
4 changes: 2 additions & 2 deletions source/client/online_checker/online_checker_direct.cc
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ class OnlineCheckerDirect::Instance : public base::TcpChannel::Listener
void onTcpConnected() override;
void onTcpDisconnected(base::NetworkChannel::ErrorCode error_code) override;
void onTcpMessageReceived(uint8_t channel_id, const base::ByteArray& buffer) override;
void onTcpMessageWritten(uint8_t channel_id, size_t pending) override;
void onTcpMessageWritten(uint8_t channel_id, base::ByteArray&& buffer, size_t pending) override;

private:
void onFinished(const base::Location& location, bool online);
Expand Down Expand Up @@ -164,7 +164,7 @@ void OnlineCheckerDirect::Instance::onTcpMessageReceived(

//--------------------------------------------------------------------------------------------------
void OnlineCheckerDirect::Instance::onTcpMessageWritten(
uint8_t /* channel_id */, size_t /* pending */)
uint8_t /* channel_id */, base::ByteArray&& /* buffer */, size_t /* pending */)
{
// Nothing
}
Expand Down
3 changes: 2 additions & 1 deletion source/client/online_checker/online_checker_router.cc
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,8 @@ void OnlineCheckerRouter::onTcpMessageReceived(
}

//--------------------------------------------------------------------------------------------------
void OnlineCheckerRouter::onTcpMessageWritten(uint8_t /* channel_id */, size_t /* pending */)
void OnlineCheckerRouter::onTcpMessageWritten(
uint8_t /* channel_id */, base::ByteArray&& /* buffer */, size_t /* pending */)
{
// Nothing
}
Expand Down
2 changes: 1 addition & 1 deletion source/client/online_checker/online_checker_router.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ class OnlineCheckerRouter : public base::TcpChannel::Listener
void onTcpConnected() override;
void onTcpDisconnected(base::NetworkChannel::ErrorCode error_code) override;
void onTcpMessageReceived(uint8_t channel_id, const base::ByteArray& buffer) override;
void onTcpMessageWritten(uint8_t channel_id, size_t pending) override;
void onTcpMessageWritten(uint8_t channel_id, base::ByteArray&& buffer, size_t pending) override;

private:
void checkNextComputer();
Expand Down
3 changes: 2 additions & 1 deletion source/client/router.cc
Original file line number Diff line number Diff line change
Expand Up @@ -377,7 +377,8 @@ void Router::onTcpMessageReceived(uint8_t /* channel_id */, const base::ByteArra
}

//--------------------------------------------------------------------------------------------------
void Router::onTcpMessageWritten(uint8_t /* channel_id */, size_t /* pending */)
void Router::onTcpMessageWritten(
uint8_t /* channel_id */, base::ByteArray&& /* buffer */, size_t /* pending */)
{
// Not used.
}
Expand Down
2 changes: 1 addition & 1 deletion source/client/router.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ class Router : public base::TcpChannel::Listener
void onTcpConnected() override;
void onTcpDisconnected(base::NetworkChannel::ErrorCode error_code) override;
void onTcpMessageReceived(uint8_t channel_id, const base::ByteArray& buffer) override;
void onTcpMessageWritten(uint8_t channel_id, size_t pending) override;
void onTcpMessageWritten(uint8_t channel_id, base::ByteArray&& buffer, size_t pending) override;

private:
std::shared_ptr<base::TaskRunner> io_task_runner_;
Expand Down
3 changes: 2 additions & 1 deletion source/client/router_controller.cc
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,8 @@ void RouterController::onTcpMessageReceived(uint8_t /* channel_id */, const base
}

//--------------------------------------------------------------------------------------------------
void RouterController::onTcpMessageWritten(uint8_t /* channel_id */, size_t /* pending */)
void RouterController::onTcpMessageWritten(
uint8_t /* channel_id */, base::ByteArray&& /* buffer */, size_t /* pending */)
{
// Nothing
}
Expand Down
2 changes: 1 addition & 1 deletion source/client/router_controller.h
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ class RouterController
void onTcpConnected() override;
void onTcpDisconnected(base::NetworkChannel::ErrorCode error_code) override;
void onTcpMessageReceived(uint8_t channel_id, const base::ByteArray& buffer) override;
void onTcpMessageWritten(uint8_t channel_id, size_t pending) override;
void onTcpMessageWritten(uint8_t channel_id, base::ByteArray&& buffer, size_t pending) override;

// base::RelayPeer::Delegate implementation.
void onRelayConnectionReady(std::unique_ptr<base::TcpChannel> channel) override;
Expand Down
10 changes: 9 additions & 1 deletion source/host/client_session.cc
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,12 @@ void ClientSession::sendMessage(uint8_t channel_id, base::ByteArray&& buffer)
channel_->send(channel_id, std::move(buffer));
}

//--------------------------------------------------------------------------------------------------
void ClientSession::sendMessage(uint8_t channel_id, const google::protobuf::MessageLite& message)
{
channel_->send(channel_id, serializer_.serialize(message));
}

//--------------------------------------------------------------------------------------------------
void ClientSession::onTcpConnected()
{
Expand Down Expand Up @@ -199,8 +205,10 @@ void ClientSession::onTcpMessageReceived(uint8_t channel_id, const base::ByteArr
}

//--------------------------------------------------------------------------------------------------
void ClientSession::onTcpMessageWritten(uint8_t channel_id, size_t pending)
void ClientSession::onTcpMessageWritten(uint8_t channel_id, base::ByteArray&& buffer, size_t pending)
{
serializer_.addBuffer(std::move(buffer));

if (channel_id == proto::HOST_CHANNEL_ID_SESSION)
{
onWritten(channel_id, pending);
Expand Down
5 changes: 4 additions & 1 deletion source/host/client_session.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

#include "base/session_id.h"
#include "base/version.h"
#include "base/memory/serializer.h"
#include "base/net/tcp_channel.h"
#include "proto/desktop_extensions.pb.h"
#include "proto/text_chat.pb.h"
Expand Down Expand Up @@ -98,12 +99,13 @@ class ClientSession : public base::TcpChannel::Listener

std::shared_ptr<base::TcpChannelProxy> channelProxy();
void sendMessage(uint8_t channel_id, base::ByteArray&& buffer);
void sendMessage(uint8_t channel_id, const google::protobuf::MessageLite& message);

// base::TcpChannel::Listener implementation.
void onTcpConnected() override;
void onTcpDisconnected(base::NetworkChannel::ErrorCode error_code) override;
void onTcpMessageReceived(uint8_t channel_id, const base::ByteArray& buffer) override;
void onTcpMessageWritten(uint8_t channel_id, size_t pending) override;
void onTcpMessageWritten(uint8_t channel_id, base::ByteArray&& buffer, size_t pending) override;

size_t pendingMessages() const;

Expand All @@ -120,6 +122,7 @@ class ClientSession : public base::TcpChannel::Listener
std::string display_name_;

std::unique_ptr<base::TcpChannel> channel_;
base::Serializer serializer_;
};

} // namespace host
Expand Down
Loading

0 comments on commit cf47b92

Please sign in to comment.