Skip to content

Commit

Permalink
Added handler (proxy) class for passing to message loop (fixes crashe…
Browse files Browse the repository at this point in the history
…s in some cases).
  • Loading branch information
dchapyshev committed Dec 15, 2023
1 parent fb0554b commit 5741d50
Show file tree
Hide file tree
Showing 5 changed files with 394 additions and 134 deletions.
254 changes: 179 additions & 75 deletions source/base/ipc/ipc_channel.cc
Original file line number Diff line number Diff line change
Expand Up @@ -110,10 +110,73 @@ SessionId serverSessionIdImpl(HANDLE pipe_handle)

} // namespace

class IpcChannel::Handler
{
public:
explicit Handler(IpcChannel* channel);
~Handler();

void dettach();

void onWriteSize(const std::error_code& error_code, size_t bytes_transferred);
void onWriteData(const std::error_code& error_code, size_t bytes_transferred);
void onReadSize(const std::error_code& error_code, size_t bytes_transferred);
void onReadData(const std::error_code& error_code, size_t bytes_transferred);

private:
IpcChannel* channel_;
DISALLOW_COPY_AND_ASSIGN(Handler);
};

//--------------------------------------------------------------------------------------------------
IpcChannel::Handler::Handler(IpcChannel* channel)
: channel_(channel)
{
DCHECK(channel_);
}

//--------------------------------------------------------------------------------------------------
IpcChannel::Handler::~Handler() = default;

//--------------------------------------------------------------------------------------------------
void IpcChannel::Handler::dettach()
{
channel_ = nullptr;
}

//--------------------------------------------------------------------------------------------------
void IpcChannel::Handler::onWriteSize(const std::error_code& error_code, size_t bytes_transferred)
{
if (channel_)
channel_->onWriteSize(error_code, bytes_transferred);
}

//--------------------------------------------------------------------------------------------------
void IpcChannel::Handler::onWriteData(const std::error_code& error_code, size_t bytes_transferred)
{
if (channel_)
channel_->onWriteData(error_code, bytes_transferred);
}

//--------------------------------------------------------------------------------------------------
void IpcChannel::Handler::onReadSize(const std::error_code& error_code, size_t bytes_transferred)
{
if (channel_)
channel_->onReadSize(error_code, bytes_transferred);
}

//--------------------------------------------------------------------------------------------------
void IpcChannel::Handler::onReadData(const std::error_code& error_code, size_t bytes_transferred)
{
if (channel_)
channel_->onReadData(error_code, bytes_transferred);
}

//--------------------------------------------------------------------------------------------------
IpcChannel::IpcChannel()
: stream_(MessageLoop::current()->pumpAsio()->ioContext()),
proxy_(new IpcChannelProxy(MessageLoop::current()->taskRunner(), this))
proxy_(new IpcChannelProxy(MessageLoop::current()->taskRunner(), this)),
handler_(base::make_local_shared<Handler>(this))
{
LOG(LS_INFO) << "Ctor";
}
Expand All @@ -123,7 +186,8 @@ IpcChannel::IpcChannel(std::u16string_view channel_name, Stream&& stream)
: channel_name_(channel_name),
stream_(std::move(stream)),
proxy_(new IpcChannelProxy(MessageLoop::current()->taskRunner(), this)),
is_connected_(true)
is_connected_(true),
handler_(base::make_local_shared<Handler>(this))
{
LOG(LS_INFO) << "Ctor";

Expand All @@ -143,7 +207,6 @@ IpcChannel::~IpcChannel()
proxy_ = nullptr;

listener_ = nullptr;

disconnect();
}

Expand Down Expand Up @@ -242,6 +305,8 @@ void IpcChannel::disconnect()
{
DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);

handler_->dettach();

if (!is_connected_)
{
LOG(LS_INFO) << "Channel not in connected state";
Expand Down Expand Up @@ -294,7 +359,7 @@ void IpcChannel::resume()
onMessageReceived();

DCHECK_EQ(read_size_, 0);
doReadMessage();
doReadSize();
}

//--------------------------------------------------------------------------------------------------
Expand All @@ -308,7 +373,7 @@ void IpcChannel::send(ByteArray&& buffer)
write_queue_.emplace(std::move(buffer));

if (schedule_write)
doWrite();
doWriteSize();
}

//--------------------------------------------------------------------------------------------------
Expand Down Expand Up @@ -380,7 +445,7 @@ void IpcChannel::onErrorOccurred(const Location& location, const std::error_code
}

//--------------------------------------------------------------------------------------------------
void IpcChannel::doWrite()
void IpcChannel::doWriteSize()
{
write_size_ = static_cast<uint32_t>(write_queue_.front().size());

Expand All @@ -390,100 +455,139 @@ void IpcChannel::doWrite()
return;
}

asio::async_write(stream_, asio::buffer(&write_size_, sizeof(write_size_)),
[this](const std::error_code& error_code, size_t bytes_transferred)
asio::async_write(stream_,
asio::buffer(&write_size_, sizeof(write_size_)),
std::bind(&Handler::onWriteSize,
handler_,
std::placeholders::_1,
std::placeholders::_2));
}

//--------------------------------------------------------------------------------------------------
void IpcChannel::onWriteSize(const std::error_code& error_code, size_t bytes_transferred)
{
if (error_code)
{
if (error_code)
{
onErrorOccurred(FROM_HERE, error_code);
return;
}
onErrorOccurred(FROM_HERE, error_code);
return;
}

DCHECK_EQ(bytes_transferred, sizeof(write_size_));
doWriteData();
}

//--------------------------------------------------------------------------------------------------
void IpcChannel::doWriteData()
{
DCHECK(!write_queue_.empty());

DCHECK_EQ(bytes_transferred, sizeof(write_size_));
DCHECK(!write_queue_.empty());
const ByteArray& buffer = write_queue_.front();

const ByteArray& buffer = write_queue_.front();
// Send the buffer to the recipient.
asio::async_write(stream_, asio::buffer(buffer.data(), buffer.size()),
std::bind(&Handler::onWriteData,
handler_,
std::placeholders::_1,
std::placeholders::_2));
}

// Send the buffer to the recipient.
asio::async_write(stream_, asio::buffer(buffer.data(), buffer.size()),
[this](const std::error_code& error_code, size_t bytes_transferred)
{
if (error_code)
{
onErrorOccurred(FROM_HERE, error_code);
return;
}
//--------------------------------------------------------------------------------------------------
void IpcChannel::onWriteData(const std::error_code& error_code, size_t bytes_transferred)
{
if (error_code)
{
onErrorOccurred(FROM_HERE, error_code);
return;
}

DCHECK_EQ(bytes_transferred, write_size_);
DCHECK(!write_queue_.empty());
DCHECK_EQ(bytes_transferred, write_size_);
DCHECK(!write_queue_.empty());

ByteArray buffer = std::move(write_queue_.front());
ByteArray buffer = std::move(write_queue_.front());

// Delete the sent message from the queue.
write_queue_.pop();
// Delete the sent message from the queue.
write_queue_.pop();

onMessageWritten(std::move(buffer));
onMessageWritten(std::move(buffer));

// If the queue is not empty, then we send the following message.
if (write_queue_.empty() && !proxy_->reloadWriteQueue(&write_queue_))
return;
// If the queue is not empty, then we send the following message.
if (write_queue_.empty() && !proxy_->reloadWriteQueue(&write_queue_))
return;

doWrite();
});
});
doWriteSize();
}

//--------------------------------------------------------------------------------------------------
void IpcChannel::doReadMessage()
void IpcChannel::doReadSize()
{
asio::async_read(stream_, asio::buffer(&read_size_, sizeof(read_size_)),
[this](const std::error_code& error_code, size_t bytes_transferred)
asio::async_read(stream_,
asio::buffer(&read_size_, sizeof(read_size_)),
std::bind(&Handler::onReadSize,
handler_,
std::placeholders::_1,
std::placeholders::_2));
}

//--------------------------------------------------------------------------------------------------
void IpcChannel::onReadSize(const std::error_code& error_code, size_t bytes_transferred)
{
if (error_code)
{
if (error_code)
{
onErrorOccurred(FROM_HERE, error_code);
return;
}
onErrorOccurred(FROM_HERE, error_code);
return;
}

DCHECK_EQ(bytes_transferred, sizeof(read_size_));
DCHECK_EQ(bytes_transferred, sizeof(read_size_));

if (!read_size_ || read_size_ > kMaxMessageSize)
{
onErrorOccurred(FROM_HERE, asio::error::message_size);
return;
}
if (!read_size_ || read_size_ > kMaxMessageSize)
{
onErrorOccurred(FROM_HERE, asio::error::message_size);
return;
}

if (read_buffer_.capacity() < read_size_)
{
read_buffer_.clear();
read_buffer_.reserve(read_size_);
}
doReadData();
}

read_buffer_.resize(read_size_);
//--------------------------------------------------------------------------------------------------
void IpcChannel::doReadData()
{
if (read_buffer_.capacity() < read_size_)
{
read_buffer_.clear();
read_buffer_.reserve(read_size_);
}

asio::async_read(stream_, asio::buffer(read_buffer_.data(), read_buffer_.size()),
[this](const std::error_code& error_code, size_t bytes_transferred)
{
if (error_code)
{
onErrorOccurred(FROM_HERE, error_code);
return;
}
read_buffer_.resize(read_size_);

DCHECK_EQ(bytes_transferred, read_size_);
asio::async_read(stream_,
asio::buffer(read_buffer_.data(), read_buffer_.size()),
std::bind(&Handler::onReadData,
handler_,
std::placeholders::_1,
std::placeholders::_2));
}

if (is_paused_)
return;
//--------------------------------------------------------------------------------------------------
void IpcChannel::onReadData(const std::error_code& error_code, size_t bytes_transferred)
{
if (error_code)
{
onErrorOccurred(FROM_HERE, error_code);
return;
}

onMessageReceived();
DCHECK_EQ(bytes_transferred, read_size_);

if (is_paused_)
return;
if (is_paused_)
return;

onMessageReceived();

DCHECK_EQ(read_size_, 0);
doReadMessage();
});
});
if (is_paused_)
return;

DCHECK_EQ(read_size_, 0);
doReadSize();
}

//--------------------------------------------------------------------------------------------------
Expand Down
15 changes: 13 additions & 2 deletions source/base/ipc/ipc_channel.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include "base/process_handle.h"
#include "base/session_id.h"
#include "base/memory/byte_array.h"
#include "base/memory/local_memory.h"
#include "base/threading/thread_checker.h"

#if defined(OS_WIN)
Expand Down Expand Up @@ -92,8 +93,15 @@ class IpcChannel
static std::u16string channelName(std::u16string_view channel_id);

void onErrorOccurred(const Location& location, const std::error_code& error_code);
void doWrite();
void doReadMessage();
void doWriteSize();
void onWriteSize(const std::error_code& error_code, size_t bytes_transferred);
void doWriteData();
void onWriteData(const std::error_code& error_code, size_t bytes_transferred);
void doReadSize();
void onReadSize(const std::error_code& error_code, size_t bytes_transferred);
void doReadData();
void onReadData(const std::error_code& error_code, size_t bytes_transferred);

void onMessageReceived();
void onMessageWritten(ByteArray&& buffer);

Expand All @@ -115,6 +123,9 @@ class IpcChannel
ProcessId peer_process_id_ = kNullProcessId;
SessionId peer_session_id_ = kInvalidSessionId;

class Handler;
base::local_shared_ptr<Handler> handler_;

THREAD_CHECKER(thread_checker_);

DISALLOW_COPY_AND_ASSIGN(IpcChannel);
Expand Down
2 changes: 1 addition & 1 deletion source/base/ipc/ipc_channel_proxy.cc
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ void IpcChannelProxy::scheduleWrite()
if (!reloadWriteQueue(&channel_->write_queue_))
return;

channel_->doWrite();
channel_->doWriteSize();
}

//--------------------------------------------------------------------------------------------------
Expand Down
Loading

0 comments on commit 5741d50

Please sign in to comment.