Skip to content

Commit

Permalink
Proxy mode for added for protocol
Browse files Browse the repository at this point in the history
  • Loading branch information
paveldn committed Jan 30, 2024
1 parent d7eb9c9 commit 9fcd93f
Show file tree
Hide file tree
Showing 7 changed files with 165 additions and 76 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
/.vscode/
/bin/

# Prerequisites
*.d
Expand Down
52 changes: 33 additions & 19 deletions include/protocol/haier_protocol.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,20 @@ namespace haier_protocol

enum class HandlerError
{
HANDLER_OK = 0,
UNSUPPORTED_MESSAGE,
UNEXPECTED_MESSAGE,
UNSUPPORTED_SUBCOMMAND,
WRONG_MESSAGE_STRUCTURE,
RUNTIME_ERROR,
UNKNOWN_ERROR,
INVALID_ANSWER,
HANDLER_OK = 0,
UNSUPPORTED_MESSAGE,
UNEXPECTED_MESSAGE,
UNSUPPORTED_SUBCOMMAND,
WRONG_MESSAGE_STRUCTURE,
RUNTIME_ERROR,
UNKNOWN_ERROR,
INVALID_ANSWER,
};

enum class AnswerDestination
{
DIRECT_ANSWER,
PROXY_ANSWER
};

// Message handler type. Expected that function sends answer back.
Expand All @@ -36,16 +42,17 @@ using MessageHandler = std::function<HandlerError(FrameType, const uint8_t*, siz
// argument 2: Incoming message type
// argument 3: Incoming data buffer (nullptr if none)
// argument 4: Incoming data buffer size
// argument 5: Type of answer (direct or proxy)
// return: Result of processing
using AnswerHandler = std::function<HandlerError(FrameType, FrameType, const uint8_t*, size_t)>;
using AnswerHandler = std::function<HandlerError(FrameType, FrameType, const uint8_t*, size_t, AnswerDestination)>;

// Timeout handler type.
// argument 1: Request message type that caused this answer
// return: Result of processing
using TimeoutHandler = std::function<HandlerError(FrameType)>;

HandlerError default_message_handler(FrameType message_type, const uint8_t* data, size_t data_size);
HandlerError default_answer_handler(FrameType message_type, FrameType request_type, const uint8_t* data, size_t data_size);
HandlerError default_answer_handler(FrameType message_type, FrameType request_type, const uint8_t* data, size_t data_size, AnswerDestination dst);
HandlerError default_timeout_handler(FrameType message_type);

class ProtocolHandler
Expand All @@ -71,37 +78,44 @@ class ProtocolHandler
void remove_answer_handler(FrameType message_type);
void set_default_answer_handler(AnswerHandler handler);
void set_timeout_handler(FrameType message_type, TimeoutHandler handler);
void remove_timeout_handler(FrameType message_type);
void remove_timeout_handler(FrameType message_type);
void set_default_timeout_handler(TimeoutHandler handler);
void set_proxy(ProtocolStream&);
void remove_proxy();
bool has_proxy() const;
virtual void loop();
protected:
bool write_message_(const HaierMessage& message, bool use_crc);
size_t drop_extra_messages_(TransportLevelHandler& transport);
enum class ProtocolState
{
IDLE,
WAITING_FOR_ANSWER,
WAITING_FOR_PROXY_ANSWER,
};
struct OutgoingQueueItem
{
const HaierMessage message;
bool use_crc;
int number_of_retries;
std::chrono::milliseconds retry_interval;
bool proxy_message;
};
using OutgoingQueue = std::queue<OutgoingQueueItem>;
TransportLevelHandler transport_;
std::unique_ptr<TransportLevelHandler> proxy_transport_;
std::map<FrameType, MessageHandler> message_handlers_map_;
std::map<FrameType, AnswerHandler> answer_handlers_map_;
std::map<FrameType, TimeoutHandler> timeout_handlers_map_;
OutgoingQueue outgoing_messages_;
MessageHandler default_message_handler_;
AnswerHandler default_answer_handler_;
TimeoutHandler default_timeout_handler_;
ProtocolState state_;
bool processing_message_;
bool incoming_message_crc_status_;
bool answer_sent_;
FrameType last_message_type_;
MessageHandler default_message_handler_{default_message_handler};
AnswerHandler default_answer_handler_{default_answer_handler};
TimeoutHandler default_timeout_handler_{default_timeout_handler};
ProtocolState state_{ProtocolState::IDLE};
bool processing_message_{false};
bool incoming_message_crc_status_{false};
bool answer_sent_{false};
FrameType last_message_type_{FrameType::UNKNOWN_FRAME_TYPE};
std::chrono::milliseconds answer_timeout_interval_;
std::chrono::milliseconds cooldown_interval_;
std::chrono::steady_clock::time_point cooldown_time_point_;
Expand Down
8 changes: 4 additions & 4 deletions include/transport/protocol_transport.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,22 +23,22 @@ class TransportLevelHandler
TransportLevelHandler(const TransportLevelHandler&) = delete;
TransportLevelHandler& operator=(const TransportLevelHandler&) = delete;
explicit TransportLevelHandler(ProtocolStream& stream, size_t buffer_size = MAX_FRAME_SIZE + 0x10) noexcept;
virtual ~TransportLevelHandler();
uint8_t send_data(uint8_t frameType, const uint8_t* data, size_t data_size, bool use_crc=true);
size_t read_data();
void process_data();
size_t available() const noexcept { return this->incoming_queue_.size(); };
bool pop(TimestampedFrame& tframe);
void drop(size_t frames_count);
void reset_protocol() noexcept;
virtual ~TransportLevelHandler();
protected:
void clear_();
void drop_bytes_(size_t size);
ProtocolStream& stream_;
CircularBuffer<uint8_t> buffer_;
size_t pos_;
size_t sep_count_;
bool frame_start_found_;
size_t pos_{0};
size_t sep_count_{0};
bool frame_start_found_{false};
HaierFrame current_frame_;
std::chrono::steady_clock::time_point frame_start_;
std::queue<TimestampedFrame> incoming_queue_;
Expand Down
2 changes: 1 addition & 1 deletion library.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "HaierProtocol",
"version": "0.9.25",
"version": "0.9.26",
"description": "A library to control Haier AC using serial protocol",
"keywords": "haier, air-conditioner, uart, serial",
"repository":
Expand Down
170 changes: 124 additions & 46 deletions src/protocol/haier_protocol.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,18 +11,6 @@ constexpr std::chrono::milliseconds DEFAULT_ANSWER_TIMEOUT = std::chrono::millis
constexpr std::chrono::milliseconds DEFAULT_COOLDOWN_INTERVAL = std::chrono::milliseconds(400);

ProtocolHandler::ProtocolHandler(ProtocolStream &stream) noexcept : transport_(stream),
message_handlers_map_(),
answer_handlers_map_(),
timeout_handlers_map_(),
outgoing_messages_(),
default_message_handler_(default_message_handler),
default_answer_handler_(default_answer_handler),
default_timeout_handler_(default_timeout_handler),
state_(ProtocolState::IDLE),
processing_message_(false),
incoming_message_crc_status_(false),
answer_sent_(false),
last_message_type_(FrameType::UNKNOWN_FRAME_TYPE),
answer_timeout_interval_(DEFAULT_ANSWER_TIMEOUT),
cooldown_interval_(DEFAULT_COOLDOWN_INTERVAL)
{
Expand All @@ -33,24 +21,23 @@ void ProtocolHandler::loop()
{
this->transport_.read_data();
this->transport_.process_data();
if (this->proxy_transport_)
{
this->proxy_transport_->read_data();
this->proxy_transport_->process_data();
}
std::chrono::steady_clock::time_point now = std::chrono::steady_clock::now();
switch (this->state_)
{
case ProtocolState::IDLE:
// Check incoming messages
{
size_t messagesCount = this->transport_.available();
if (messagesCount > 1)
{
// Shouldn't get more than 1 message, drop all except last
HAIER_LOGW("Incoming queue size %d (should be not more than 1). Dropping extra messages", messagesCount);
this->transport_.drop(messagesCount - 1);
messagesCount = 1;
}
if (messagesCount > 0)
size_t messages_dropped = this->drop_extra_messages_(this->transport_);
if (messages_dropped)
HAIER_LOGW("Incoming queue size %d (should be not more than 1). Dropping extra messages", messages_dropped + 1);
TimestampedFrame frame;
if (this->transport_.pop(frame))
{
TimestampedFrame frame;
this->transport_.pop(frame);
FrameType msg_type = (FrameType) frame.frame.get_frame_type();
this->incoming_message_crc_status_ = frame.frame.get_use_crc();
std::map<FrameType, MessageHandler>::const_iterator handler = this->message_handlers_map_.find(msg_type);
Expand All @@ -70,25 +57,46 @@ void ProtocolHandler::loop()
{
HAIER_LOGW("No answer sent in incoming messages handler, message type %02X", msg_type);
}
if (this->proxy_transport_)
{
// If proxy defined redirect frame to proxy transport
this->proxy_transport_->send_data(frame.frame.get_frame_type(), frame.frame.get_data(), frame.frame.get_data_size(), frame.frame.get_use_crc());
}
}
if (this->proxy_transport_)
{
if ((!this->outgoing_messages_.empty()) && (now >= this->cooldown_time_point_) && (now >= this->retry_time_point_))
// Processing incoming proxy messages
messages_dropped = this->drop_extra_messages_(*this->proxy_transport_);
if (messages_dropped)
HAIER_LOGW("Proxy queue size %d (should be not more than 1). Dropping extra messages", messages_dropped + 1);
if (this->proxy_transport_->pop(frame))
{
// Ready to send next message
OutgoingQueueItem &msg = this->outgoing_messages_.front();
if (msg.number_of_retries > 0) {
if (this->write_message_(msg.message, msg.use_crc))
{
this->last_message_type_ = msg.message.get_frame_type();
this->state_ = ProtocolState::WAITING_FOR_ANSWER;
this->answer_time_point_ = now + this->answer_timeout_interval_;
this->retry_time_point_ = now + msg.retry_interval;
}
msg.number_of_retries--;
} else {
this->outgoing_messages_.pop();
this->retry_time_point_ = now;
uint8_t data_size = frame.frame.get_data_size();
this->outgoing_messages_.push({
data_size == 0 ? HaierMessage((FrameType)frame.frame.get_frame_type()) : HaierMessage((FrameType)frame.frame.get_frame_type(), frame.frame.get_data(), data_size),
frame.frame.get_use_crc(),
0,
std::chrono::milliseconds::zero(),
true
});
}
}
if ((!this->outgoing_messages_.empty()) && (now >= this->cooldown_time_point_) && (now >= this->retry_time_point_))
{
// Ready to send next message
OutgoingQueueItem &msg = this->outgoing_messages_.front();
if (msg.number_of_retries > 0) {
if (this->write_message_(msg.message, msg.use_crc))
{
this->last_message_type_ = msg.message.get_frame_type();
this->state_ = msg.proxy_message ? ProtocolState::WAITING_FOR_PROXY_ANSWER : ProtocolState::WAITING_FOR_ANSWER;
this->answer_time_point_ = now + this->answer_timeout_interval_;
this->retry_time_point_ = now + msg.retry_interval;
}
msg.number_of_retries--;
} else {
this->outgoing_messages_.pop();
this->retry_time_point_ = now;
}
}
}
Expand All @@ -113,7 +121,7 @@ void ProtocolHandler::loop()
HAIER_LOGW("Timeout handler error, msg=%02X, err=%d", this->last_message_type_, hres);
}
}
state_ = ProtocolState::IDLE;
this->state_ = ProtocolState::IDLE;
break;
}
if (this->transport_.available() > 0)
Expand All @@ -124,22 +132,72 @@ void ProtocolHandler::loop()
HandlerError hres;
std::map<FrameType, AnswerHandler>::const_iterator handler = this->answer_handlers_map_.find(last_message_type_);
if (handler != this->answer_handlers_map_.end())
hres = handler->second(this->last_message_type_, msg_type, frame.frame.get_data(), frame.frame.get_data_size());
hres = handler->second(this->last_message_type_, msg_type, frame.frame.get_data(), frame.frame.get_data_size(), AnswerDestination::DIRECT_ANSWER);
else
hres = this->default_answer_handler_(this->last_message_type_, msg_type, frame.frame.get_data(), frame.frame.get_data_size());
hres = this->default_answer_handler_(this->last_message_type_, msg_type, frame.frame.get_data(), frame.frame.get_data_size(), AnswerDestination::DIRECT_ANSWER);
if (hres != HandlerError::HANDLER_OK)
{
HAIER_LOGW("Answer handler error, msg=%02X, answ=%02X, err=%d", this->last_message_type_, msg_type, hres);
}
// Answer received, remove message
this->outgoing_messages_.pop();
this->retry_time_point_ = now;
state_ = ProtocolState::IDLE;
this->state_ = ProtocolState::IDLE;
}
break;
case ProtocolState::WAITING_FOR_PROXY_ANSWER:
if (now > this->answer_time_point_)
{
// Proxy answer timeout, no callbacks
OutgoingQueueItem& msg = this->outgoing_messages_.front();
if (msg.number_of_retries == 0) {
// No more retries, remove message
HAIER_LOGW("Proxy message timeout, msg=%02X", this->last_message_type_);
this->outgoing_messages_.pop();
this->retry_time_point_ = now;
}
this->state_ = ProtocolState::IDLE;
break;
}
if (this->proxy_transport_)
{
TimestampedFrame frame;
if (this->transport_.pop(frame))
{
this->proxy_transport_->send_data(frame.frame.get_frame_type(), frame.frame.get_data(), frame.frame.get_data_size(), frame.frame.get_use_crc());
this->state_ = ProtocolState::IDLE;
}
FrameType msg_type = (FrameType) frame.frame.get_frame_type();
HandlerError hres;
std::map<FrameType, AnswerHandler>::const_iterator handler = this->answer_handlers_map_.find(last_message_type_);
if (handler != this->answer_handlers_map_.end())
hres = handler->second(this->last_message_type_, msg_type, frame.frame.get_data(), frame.frame.get_data_size(), AnswerDestination::PROXY_ANSWER);
else
hres = this->default_answer_handler_(this->last_message_type_, msg_type, frame.frame.get_data(), frame.frame.get_data_size(), AnswerDestination::PROXY_ANSWER);
if (hres != HandlerError::HANDLER_OK)
{
HAIER_LOGW("Proxy answer handler error, msg=%02X, answ=%02X, err=%d", this->last_message_type_, msg_type, hres);
}
}
else
this->state_ = ProtocolState::IDLE;
break;
}
}

size_t ProtocolHandler::drop_extra_messages_(TransportLevelHandler& transport)
{
size_t messagesCount = this->transport_.available();
if (messagesCount > 1)
{
// Shouldn't get more than 1 message, drop all except last
this->transport_.drop(messagesCount - 1);
return messagesCount - 1;
}
return 0;
}


bool ProtocolHandler::write_message_(const HaierMessage &message, bool use_crc)
{
size_t buf_size = message.get_buffer_size();
Expand Down Expand Up @@ -182,7 +240,7 @@ void ProtocolHandler::set_cooldown_interval(std::chrono::milliseconds answer_tim

void ProtocolHandler::send_message(const HaierMessage& message, bool use_crc, uint8_t num_repeats, std::chrono::milliseconds interval)
{
this->outgoing_messages_.push({ message, use_crc, std::min(num_repeats, MAX_PACKET_RETRIES) + 1, interval });
this->outgoing_messages_.push({ message, use_crc, std::min(num_repeats, MAX_PACKET_RETRIES) + 1, interval, false });
}

void ProtocolHandler::send_answer(const HaierMessage &answer)
Expand Down Expand Up @@ -256,6 +314,22 @@ void ProtocolHandler::set_default_timeout_handler(TimeoutHandler handler)
this->default_timeout_handler_ = handler;
}

void ProtocolHandler::set_proxy(ProtocolStream& proxy_stream)
{
this->proxy_transport_.reset(new TransportLevelHandler(proxy_stream));
}

void ProtocolHandler::remove_proxy()
{
this->proxy_transport_.reset();
}

bool ProtocolHandler::has_proxy() const
{
return this->proxy_transport_.operator bool();
}


/// <summary>
/// Default message handler, log everything and return UNSUPPORTED_MESSAGE
/// </summary>
Expand All @@ -276,11 +350,15 @@ HandlerError default_message_handler(FrameType message_type, const uint8_t *data
/// <param name="message_type">Type of incoming message</param>
/// <param name="data">Incoming message data</param>
/// <param name="data_size">Size of incoming data</param>
/// <parem name="dst">Answer destination (direct or proxy</param>
/// <returns>Error code</returns>
HandlerError default_answer_handler(FrameType requestType, FrameType message_type, const uint8_t *data, size_t data_size)
HandlerError default_answer_handler(FrameType requestType, FrameType message_type, const uint8_t *data, size_t data_size, AnswerDestination dst)
{
HAIER_LOGW("Unsupported answer to %02X received: type %02X data: %s", requestType, message_type, data_size > 0 ? buf_to_hex(data, data_size).c_str() : "<empty>");
return HandlerError::UNSUPPORTED_MESSAGE;
if (dst == AnswerDestination::DIRECT_ANSWER) {
HAIER_LOGW("Unsupported answer to %02X received: type %02X data: %s", requestType, message_type, data_size > 0 ? buf_to_hex(data, data_size).c_str() : "<empty>");
return HandlerError::UNSUPPORTED_MESSAGE;
}
return HandlerError::HANDLER_OK;
}

/// <summary>
Expand Down
Loading

0 comments on commit 9fcd93f

Please sign in to comment.