From 25b26fb16e1ed3859a6a34904be6a4ec3dc68e38 Mon Sep 17 00:00:00 2001 From: ericling Date: Thu, 26 Sep 2019 20:24:07 +0800 Subject: [PATCH] http-dubbo-filter test ok --- source/common/http/codec_client.cc | 13 ++-- source/common/http/codec_client.h | 17 +++--- source/common/http/codec_wrappers.h | 2 +- source/common/http/conn_manager_impl.cc | 34 +++++++++-- source/common/http/conn_manager_impl.h | 18 +++--- source/common/http/http1/codec_impl.cc | 1 + source/common/router/router.cc | 1 + .../http/dubbo_proxy/active_message.cc | 7 ++- .../filters/http/dubbo_proxy/config.cc | 2 + .../filters/http/dubbo_proxy/conn_manager.cc | 46 ++++++--------- .../filters/http/dubbo_proxy/conn_manager.h | 7 ++- .../filters/http/dubbo_proxy/decoder.cc | 59 +++++++++++++++++-- .../filters/http/dubbo_proxy/decoder.h | 9 ++- .../network/http_connection_manager/config.cc | 1 + 14 files changed, 151 insertions(+), 66 deletions(-) diff --git a/source/common/http/codec_client.cc b/source/common/http/codec_client.cc index 3d338ca8ee93..3e16d44ef096 100644 --- a/source/common/http/codec_client.cc +++ b/source/common/http/codec_client.cc @@ -51,15 +51,18 @@ void CodecClient::createPreSrvFilterChain(Http::PrivateProtoFilterChainFactoryCa void CodecClient::addClientFilter(Http::PrivateProtoFilterSharedPtr filter) { // set callback - privateProtoFilterPtr filterCallbackPtr(new privateProtoFilterCallbacks(*this)); - filter->setDecoderFilterCallbacks(*filterCallbackPtr); - filter->setEncoderFilterCallbacks(*filterCallbackPtr); +// privateProtoFilterPtr filterCallbackPtr(new privateProtoFilterCallbacks(*this)); +// filter->setDecoderFilterCallbacks(*filterCallbackPtr); +// filter->setEncoderFilterCallbacks(*filterCallbackPtr); ClientStreamFilterPtr wrapper(new ClientStreamFilter(*this, filter)); + filter->setDecoderFilterCallbacks(*wrapper); + filter->setEncoderFilterCallbacks(*wrapper); wrapper->moveIntoListBack(std::move(wrapper), pre_client_filters_); } -Network::Connection& CodecClient::privateProtoFilterCallbacks::connection() { - return *codec_client_.connection_; + +Network::Connection& CodecClient::ClientStreamFilter::connection() { + return *codec_client_.connection_; } void CodecClient::close() { connection_->close(Network::ConnectionCloseType::NoFlush); } diff --git a/source/common/http/codec_client.h b/source/common/http/codec_client.h index f43f644edc71..8bbe807f48f1 100644 --- a/source/common/http/codec_client.h +++ b/source/common/http/codec_client.h @@ -52,7 +52,7 @@ class CodecClient : public Logger::Loggable, public Event::DeferredDeletable { public: // private proto filter list - struct ClientStreamFilter : LinkedObject { + struct ClientStreamFilter : public PrivateProtoFilterCallbacks,LinkedObject { ClientStreamFilter(CodecClient& codec_client, PrivateProtoFilterSharedPtr filter) : codec_client_(codec_client), handle_(filter) {} CodecClient& codec_client_; @@ -67,6 +67,7 @@ class CodecClient : public Logger::Loggable, PrivateProtoFilterDataStatus status = handle_->encodeClientData(data, end_stream); return status; } + Network::Connection& connection() override ; }; typedef std::unique_ptr ClientStreamFilterPtr; std::list pre_client_filters_; @@ -82,13 +83,13 @@ class CodecClient : public Logger::Loggable, void addPreSrvDecodeFilter(Http::PrivateProtoFilterSharedPtr filter ABSL_ATTRIBUTE_UNUSED) override {} void decodePrivateProtoData(Buffer::Instance& data, bool end_stream) ; //wrapper for filter callbacks - struct privateProtoFilterCallbacks: public PrivateProtoFilterCallbacks { - privateProtoFilterCallbacks(CodecClient& codec_client) - : codec_client_(codec_client) {} - Network::Connection& connection() override ; - CodecClient& codec_client_; - }; - typedef std::unique_ptr privateProtoFilterPtr; +// struct privateProtoFilterCallbacks: public PrivateProtoFilterCallbacks { +// privateProtoFilterCallbacks(CodecClient& codec_client) +// : codec_client_(codec_client) {} +// Network::Connection& connection() override ; +// CodecClient& codec_client_; +// }; +// typedef std::unique_ptr privateProtoFilterPtr; /** * Type of HTTP codec to use. diff --git a/source/common/http/codec_wrappers.h b/source/common/http/codec_wrappers.h index d2db22f4c347..fafa853ae50f 100644 --- a/source/common/http/codec_wrappers.h +++ b/source/common/http/codec_wrappers.h @@ -90,7 +90,7 @@ class StreamEncoderWrapper : public StreamEncoder, Logger::LoggablesetDecoderFilterCallbacks(*filterPtr); + ENVOY_LOG(debug,"ConnectionManagerImpl::addPreSrvDecodeFilter,{}", static_cast(this)); +// privateProtoFilterPtr filterPtr(new privateProtoFilterCallbacks(*this)); +// filter->setDecoderFilterCallbacks(*filterPtr); +// +// ENVOY_LOG(debug,"filter cb conn manager pointer {},filterptr {}", +// static_cast(&(filterPtr->connection_manager_)), +// static_cast(&(*filterPtr))); // wrapper and add to list PreSrvStreamFilterPtr wrapper(new PreSrvStreamFilter(*this, filter)); + filter->setDecoderFilterCallbacks(*wrapper); wrapper->moveIntoListBack(std::move(wrapper), pre_srv_decoder_filters_); + + ENVOY_LOG(debug,"add to list end"); + ENVOY_LOG(debug,"decoder filter connection impl addr {}", static_cast(&(pre_srv_decoder_filters_.front()->connection_manager_))); } -Network::Connection& ConnectionManagerImpl::privateProtoFilterCallbacks::connection() { +//Network::Connection& ConnectionManagerImpl::privateProtoFilterCallbacks::connection() { +// ENVOY_LOG(debug,"privateProtoFilterCallbacks ,read_callbacks {}", static_cast(&connection_manager_.read_callbacks_)); +// return connection_manager_.read_callbacks_->connection(); +//} + +Network::Connection& ConnectionManagerImpl::PreSrvStreamFilter::connection() { +// ENVOY_LOG(debug,"PreSrvStreamFilter,read_callbacks {}", static_cast(&connection_manager_.read_callbacks_)); return connection_manager_.read_callbacks_->connection(); } @@ -293,6 +308,8 @@ StreamDecoder& ConnectionManagerImpl::newStream(StreamEncoder& response_encoder, } Network::FilterStatus ConnectionManagerImpl::onData(Buffer::Instance& data, bool end_stream) { + ENVOY_LOG(debug,"ConnectionManagerImpl::onData"); + // decode private proto data decodePrivateProtoData(data,end_stream); @@ -1395,9 +1412,14 @@ void ConnectionManagerImpl::ActiveStream::encodeHeaders(ActiveStreamEncoderFilte // Now actually encode via the codec. stream_info_.onFirstDownstreamTxByteSent(); - response_encoder_->encodeHeaders( - headers, - encoding_headers_only_ || (end_stream && continue_data_entry == encoder_filters_.end())); + if (Http::Utility::isPrivateProtoHeader(headers)) { + ENVOY_STREAM_LOG(debug, "rsponse header has private proto field,ignore encode", *this); + } else { + response_encoder_->encodeHeaders( + headers, + encoding_headers_only_ || (end_stream && continue_data_entry == encoder_filters_.end())); + } + if (continue_data_entry != encoder_filters_.end()) { // We use the continueEncoding() code since it will correctly handle not calling // encodeHeaders() again. Fake setting StopSingleIteration since the continueEncoding() code diff --git a/source/common/http/conn_manager_impl.h b/source/common/http/conn_manager_impl.h index a1fb9e75fc52..2fef51121a07 100644 --- a/source/common/http/conn_manager_impl.h +++ b/source/common/http/conn_manager_impl.h @@ -84,13 +84,13 @@ class ConnectionManagerImpl : Logger::Loggable, void encodePrivateProtoData(Buffer::Instance& data, bool end_stream); void addClientFilter(Http::PrivateProtoFilterSharedPtr filter ABSL_ATTRIBUTE_UNUSED) override {} // wrapper of private proto callbacks - struct privateProtoFilterCallbacks: public PrivateProtoFilterCallbacks { - privateProtoFilterCallbacks(ConnectionManagerImpl& connection_manager) - : connection_manager_(connection_manager) {} - Network::Connection& connection() override; - ConnectionManagerImpl& connection_manager_; - }; - typedef std::unique_ptr privateProtoFilterPtr; +// struct privateProtoFilterCallbacks: public PrivateProtoFilterCallbacks { +// privateProtoFilterCallbacks(ConnectionManagerImpl& connection_manager) +// : connection_manager_(connection_manager) {} +// Network::Connection& connection() override; +// ConnectionManagerImpl& connection_manager_; +// }; +// typedef std::unique_ptr privateProtoFilterPtr; // Network::ConnectionCallbacks void onEvent(Network::ConnectionEvent event) override; @@ -553,7 +553,7 @@ class ConnectionManagerImpl : Logger::Loggable, /** * Wrapper for a pre srv stream decoder filter. */ - struct PreSrvStreamFilter : LinkedObject { + struct PreSrvStreamFilter : public PrivateProtoFilterCallbacks, LinkedObject { PreSrvStreamFilter(ConnectionManagerImpl& connection_manager, PrivateProtoFilterSharedPtr filter) : connection_manager_(connection_manager), handle_(filter) {} ConnectionManagerImpl& connection_manager_; @@ -569,6 +569,8 @@ class ConnectionManagerImpl : Logger::Loggable, PrivateProtoFilterDataStatus status = handle_->encodeData(data, end_stream); return status; } + + Network::Connection& connection() override; }; typedef std::unique_ptr PreSrvStreamFilterPtr; diff --git a/source/common/http/http1/codec_impl.cc b/source/common/http/http1/codec_impl.cc index e7151be18ca5..6e0637d9f34d 100644 --- a/source/common/http/http1/codec_impl.cc +++ b/source/common/http/http1/codec_impl.cc @@ -165,6 +165,7 @@ void StreamEncoderImpl::encodeData(Buffer::Instance& data, bool end_stream) { } void StreamEncoderImpl::encodeRawData(Buffer::Instance& data, bool end_stream) { + ENVOY_LOG(debug,"stream encoder impl,encode raw data"); if (data.length() > 0) { connection_.buffer().move(data); } diff --git a/source/common/router/router.cc b/source/common/router/router.cc index 514aca9fd04e..d9c83a25218e 100644 --- a/source/common/router/router.cc +++ b/source/common/router/router.cc @@ -1,3 +1,4 @@ + #include "common/router/router.h" #include diff --git a/source/extensions/filters/http/dubbo_proxy/active_message.cc b/source/extensions/filters/http/dubbo_proxy/active_message.cc index 9a9ad767caff..18e249bf0d7f 100644 --- a/source/extensions/filters/http/dubbo_proxy/active_message.cc +++ b/source/extensions/filters/http/dubbo_proxy/active_message.cc @@ -21,7 +21,7 @@ DubboFilters::UpstreamResponseStatus ActiveResponseDecoder::onData(Buffer::Insta ENVOY_LOG(debug, "dubbo response: the received reply data length is {}", data.length()); bool underflow = false; - decoder_->onData(data, underflow); +// decoder_->onData(data, underflow); ASSERT(complete_ || underflow); return response_status_; @@ -187,8 +187,13 @@ ActiveMessage::ActiveMessage(ConnectionManager& parent) request_id_(-1), stream_id_(parent.random_generator().random()), stream_info_(parent.time_system()), pending_stream_decoded_(false), local_response_sent_(false) { +// ENVOY_LOG(debug,"active message new"); parent_.stats().request_active_.inc(); +// ENVOY_LOG(debug,"set adddr {}", static_cast(&stream_info_)); +// ENVOY_LOG(debug,"parent adddr {}", static_cast(&parent_)); +// ENVOY_LOG(debug,"conn addr {}", static_cast(&parent_.connection())); stream_info_.setDownstreamLocalAddress(parent_.connection().localAddress()); +// ENVOY_LOG(debug,"set adddr 2"); stream_info_.setDownstreamRemoteAddress(parent_.connection().remoteAddress()); } diff --git a/source/extensions/filters/http/dubbo_proxy/config.cc b/source/extensions/filters/http/dubbo_proxy/config.cc index 65a869988f1a..37fa4cb051e7 100644 --- a/source/extensions/filters/http/dubbo_proxy/config.cc +++ b/source/extensions/filters/http/dubbo_proxy/config.cc @@ -25,6 +25,8 @@ Http::PrivateProtoFilterFactoryCb DubboProxyFilterConfigFactory::createFilterFac return [filter_config, &context](Http::PrivateProtoFilterChainFactoryCallbacks& callbacks) -> void { callbacks.addPreSrvDecodeFilter(std::make_shared( *filter_config, context.random(), context.dispatcher().timeSource())); + callbacks.addClientFilter(std::make_shared( + *filter_config, context.random(), context.dispatcher().timeSource())); }; } diff --git a/source/extensions/filters/http/dubbo_proxy/conn_manager.cc b/source/extensions/filters/http/dubbo_proxy/conn_manager.cc index 1c13e99681b9..c6766fa3c8f3 100644 --- a/source/extensions/filters/http/dubbo_proxy/conn_manager.cc +++ b/source/extensions/filters/http/dubbo_proxy/conn_manager.cc @@ -28,6 +28,7 @@ ConnectionManager::ConnectionManager(Config& config, Runtime::RandomGenerator& r decoder_(std::make_unique(*protocol_, *this)) {} Http::PrivateProtoFilterDataStatus ConnectionManager::decodeData(Buffer::Instance& data, bool end_stream) { + ENVOY_LOG(debug,"dubbo: decode server data, {}", static_cast(this)); Network::FilterStatus status = onData(data,end_stream); if (status == Network::FilterStatus::StopIteration) { return Http::PrivateProtoFilterDataStatus::StopIteration; @@ -36,6 +37,9 @@ Http::PrivateProtoFilterDataStatus ConnectionManager::decodeData(Buffer::Instanc } Http::PrivateProtoFilterDataStatus ConnectionManager::decodeClientData(Buffer::Instance& data, bool end_stream) { + ENVOY_LOG(debug,"dubbo: decode client data,{}", static_cast(this)); + setIsResponse(true); + Network::FilterStatus status = onData(data,end_stream); if (status == Network::FilterStatus::StopIteration) { return Http::PrivateProtoFilterDataStatus::StopIteration; @@ -62,18 +66,22 @@ void ConnectionManager::setDecoderFilterCallbacks(Http::PrivateProtoFilterCallba void ConnectionManager::setEncoderFilterCallbacks(Http::PrivateProtoFilterCallbacks& callbacks) { private_proto_encoder_filter_callbacks_ = &callbacks; } +void ConnectionManager::setIsResponse(bool is_response) { + is_response_ = is_response; +} Network::FilterStatus ConnectionManager::onData(Buffer::Instance& data, bool end_stream) { + ENVOY_LOG(trace, "dubbo: read {} bytes", data.length()); request_buffer_.move(data); - ENVOY_LOG(trace, "dubbo: after move data length {} ", data.length()); dispatch(); //after dispath,put http buff to data data.prepend(http_buffer_); http_buffer_.drain(http_buffer_.length()); - ENVOY_LOG(trace, "dubbo: after encap to http,data length {} ", data.length()); + ENVOY_LOG(trace, "dubbo: after encap to http,data length {},http buff length {},data: {} ", + data.length(),http_buffer_.length(),data.toString()); if (end_stream) { ENVOY_CONN_LOG(trace, "downstream half-closed", private_proto_decoder_filter_callbacks_->connection()); @@ -128,15 +136,18 @@ StreamHandler& ConnectionManager::newStream() { ENVOY_LOG(debug, "dubbo: create the new decoder event handler"); ActiveMessagePtr new_message(std::make_unique(*this)); - new_message->createFilterChain(); +// new_message->createFilterChain(); new_message->moveIntoList(std::move(new_message), active_message_list_); return **active_message_list_.begin(); } +Network::Connection& ConnectionManager::connection() { + return private_proto_decoder_filter_callbacks_->connection(); +} + void ConnectionManager::onHeartbeat(MessageMetadataSharedPtr metadata) { stats_.request_event_.inc(); -// if (read_callbacks_->connection().state() != Network::Connection::State::Open) { if (private_proto_decoder_filter_callbacks_->connection().state() != Network::Connection::State::Open) { ENVOY_LOG(warn, "dubbo: downstream connection is closed or closing"); return; @@ -149,30 +160,9 @@ void ConnectionManager::onHeartbeat(MessageMetadataSharedPtr metadata) { Buffer::OwnedImpl response_buffer; heartbeat.encode(*metadata, *protocol_, response_buffer); -// read_callbacks_->connection().write(response_buffer, false); private_proto_decoder_filter_callbacks_->connection().write(response_buffer, false); } -void ConnectionManager::encapHttpPkg() { - ActiveStream* as = decoder_->stateMachine().getActiveStream(); - const auto invocation = dynamic_cast(&(as->metadata_->invocation_info())); - if (invocation->hasHeaders()) { - Http::Utility::encapHttpRequest("/", - read_callbacks_->connection().remoteAddress()->asString(), - const_cast(invocation->headers()), - as->context_->message_origin_data(), - http_buffer_); - } else { - Http::HeaderMapImpl header_map; - Http::Utility::encapHttpRequest("/", - read_callbacks_->connection().remoteAddress()->asString(), - header_map, - as->context_->message_origin_data(), - http_buffer_); - } - -} - void ConnectionManager::dispatch() { if (0 == request_buffer_.length()) { ENVOY_LOG(warn, "dubbo: it's empty data"); @@ -184,13 +174,11 @@ void ConnectionManager::dispatch() { return; } + decoder_->setIsResponse(is_response_); try { bool underflow = false; while (!underflow) { - decoder_->onData(request_buffer_, underflow); - - // one pkg decode ok,encap to http. - encapHttpPkg(); + decoder_->onData(request_buffer_, underflow,http_buffer_,private_proto_decoder_filter_callbacks_->connection()); } return; } catch (const EnvoyException& ex) { diff --git a/source/extensions/filters/http/dubbo_proxy/conn_manager.h b/source/extensions/filters/http/dubbo_proxy/conn_manager.h index ae88a3069eec..99277a016a60 100644 --- a/source/extensions/filters/http/dubbo_proxy/conn_manager.h +++ b/source/extensions/filters/http/dubbo_proxy/conn_manager.h @@ -68,7 +68,7 @@ class ConnectionManager : public Network::ReadFilter, void setEncoderFilterCallbacks(Http::PrivateProtoFilterCallbacks& callbacks) override; void setDecoderFilterCallbacks(Http::PrivateProtoFilterCallbacks& callbacks) override; - + void setIsResponse(bool); // Network::ReadFilter Network::FilterStatus onData(Buffer::Instance& data, bool end_stream) override; @@ -85,7 +85,8 @@ class ConnectionManager : public Network::ReadFilter, void onHeartbeat(MessageMetadataSharedPtr metadata) override; DubboFilterStats& stats() const { return stats_; } - Network::Connection& connection() const { return read_callbacks_->connection(); } +// Network::Connection& connection() const { return read_callbacks_->connection(); } + Network::Connection& connection(); TimeSource& time_system() const { return time_system_; } Runtime::RandomGenerator& random_generator() const { return random_generator_; } Config& config() const { return config_; } @@ -102,7 +103,6 @@ class ConnectionManager : public Network::ReadFilter, private: void dispatch(); - void encapHttpPkg(); void resetAllMessages(bool local_reset); Buffer::OwnedImpl request_buffer_; @@ -124,6 +124,7 @@ class ConnectionManager : public Network::ReadFilter, Http::PrivateProtoFilterCallbacks* private_proto_decoder_filter_callbacks_{}; Http::PrivateProtoFilterCallbacks* private_proto_encoder_filter_callbacks_{}; Buffer::OwnedImpl http_buffer_; + bool is_response_{false}; }; } // namespace DubboProxy diff --git a/source/extensions/filters/http/dubbo_proxy/decoder.cc b/source/extensions/filters/http/dubbo_proxy/decoder.cc index 27b9ff5250ff..9c587ccf780d 100644 --- a/source/extensions/filters/http/dubbo_proxy/decoder.cc +++ b/source/extensions/filters/http/dubbo_proxy/decoder.cc @@ -2,6 +2,7 @@ #include "common/common/macros.h" #include "common/http/utility.h" +#include "extensions/filters/http/dubbo_proxy/serializer_impl.h" namespace Envoy { namespace Extensions { @@ -12,6 +13,8 @@ DecoderStateMachine::DecoderStatus DecoderStateMachine::onDecodeStreamHeader(Buffer::Instance& buffer) { ASSERT(!active_stream_); + ENVOY_LOG(debug, "dubbo decoder: onDecodeStreamHeader,buffer {}", buffer.toString()); + auto metadata = std::make_shared(); auto ret = protocol_.decodeHeader(buffer, metadata); if (!ret.second) { @@ -33,6 +36,7 @@ DecoderStateMachine::onDecodeStreamHeader(Buffer::Instance& buffer) { return {ProtocolState::Done}; } + ENVOY_LOG(debug, "dubbo decoder: protocol {},newstream ", protocol_.name()); active_stream_ = delegate_.newStream(metadata, context); ASSERT(active_stream_); context->message_origin_data().move(buffer, context->header_size()); @@ -53,7 +57,9 @@ DecoderStateMachine::onDecodeStreamData(Buffer::Instance& buffer) { active_stream_->context_->message_origin_data().move(buffer, active_stream_->context_->body_size()); active_stream_->onStreamDecoded(); - active_stream_ = nullptr; + + // change: not set here, +// active_stream_ = nullptr; ENVOY_LOG(debug, "dubbo decoder: ends the deserialization of the message"); return {ProtocolState::Done}; @@ -92,7 +98,45 @@ DecoderBase::DecoderBase(Protocol& protocol) : protocol_(protocol) {} DecoderBase::~DecoderBase() { complete(); } -FilterStatus DecoderBase::onData(Buffer::Instance& data, bool& buffer_underflow) { +void DecoderBase::encapHttpPkg(Buffer::Instance& http_buff,Network::Connection& connection) { + ENVOY_LOG(debug,"encap http request"); + ActiveStream* as = stateMachine().getActiveStream(); + + const auto invocation = dynamic_cast(&(as->metadata_->invocation_info())); + + if (invocation->hasHeaders()) { + Http::Utility::encapHttpRequest("/", + connection.remoteAddress()->asString(), + const_cast(invocation->headers()), + as->context_->message_origin_data(), + http_buff); + } else { + Http::HeaderMapImpl header_map; + Http::Utility::encapHttpRequest("/", + connection.remoteAddress()->asString(), + header_map, + as->context_->message_origin_data(), + http_buff); + } + +} + +void DecoderBase::encapHttpResponse(Buffer::Instance& http_buff) { + ENVOY_LOG(debug,"encap http response,stateMachine {}", static_cast(&(*state_machine_))); + ActiveStream* as = stateMachine().getActiveStream(); + Http::HeaderMapImpl header_map; + Http::Utility::encapHttpResponse( + header_map, + as->context_->message_origin_data(), + http_buff); + +} + +void DecoderBase::setIsResponse(bool is_response) { + is_response_ = is_response; +} + +FilterStatus DecoderBase::onData(Buffer::Instance& data, bool& buffer_underflow,Buffer::Instance& http_buff,Network::Connection& connection) { ENVOY_LOG(debug, "dubbo decoder: {} bytes available", data.length()); buffer_underflow = false; @@ -117,11 +161,18 @@ FilterStatus DecoderBase::onData(Buffer::Instance& data, bool& buffer_underflow) ASSERT(rv == ProtocolState::Done); + //on whole dubbo pkg,encap to http pkg + if (is_response_) { + encapHttpResponse(http_buff); + } else { + encapHttpPkg(http_buff,connection); + } + state_machine_->resetActiveStream(); + complete(); buffer_underflow = (data.length() == 0); - - ENVOY_LOG(debug, "dubbo decoder: data length {}", data.length()); + ENVOY_LOG(debug, "dubbo decoder: data length {},buffer_underflow {}", data.length(),buffer_underflow); return FilterStatus::Continue; } diff --git a/source/extensions/filters/http/dubbo_proxy/decoder.h b/source/extensions/filters/http/dubbo_proxy/decoder.h index ba6660ee08bd..5314e404094b 100644 --- a/source/extensions/filters/http/dubbo_proxy/decoder.h +++ b/source/extensions/filters/http/dubbo_proxy/decoder.h @@ -97,6 +97,7 @@ class DecoderStateMachine : public Logger::Loggable { void setCurrentState(ProtocolState state) { state_ = state; } ActiveStream* getActiveStream() { return active_stream_;} + void resetActiveStream() { active_stream_ = nullptr;} private: struct DecoderStatus { @@ -138,8 +139,13 @@ class DecoderBase : public DecoderStateMachine::Delegate, * @param data a Buffer containing Dubbo protocol data * @throw EnvoyException on Dubbo protocol errors */ - FilterStatus onData(Buffer::Instance& data, bool& buffer_underflow); + FilterStatus onData(Buffer::Instance& data, bool& buffer_underflow,Buffer::Instance& http_buff,Network::Connection& connection); + void encapHttpPkg(Buffer::Instance& http_buff,Network::Connection& connection); + + void encapHttpResponse(Buffer::Instance& http_buff); + + void setIsResponse(bool); const Protocol& protocol() { return protocol_; } DecoderStateMachine& stateMachine() {return *state_machine_;} @@ -158,6 +164,7 @@ class DecoderBase : public DecoderStateMachine::Delegate, DecoderStateMachinePtr state_machine_; bool decode_started_{false}; + bool is_response_{false}; }; /** diff --git a/source/extensions/filters/network/http_connection_manager/config.cc b/source/extensions/filters/network/http_connection_manager/config.cc index 34c4ea7615ab..4a292238dfd8 100644 --- a/source/extensions/filters/network/http_connection_manager/config.cc +++ b/source/extensions/filters/network/http_connection_manager/config.cc @@ -417,6 +417,7 @@ void HttpConnectionManagerConfig::createFilterChain(Http::FilterChainFactoryCall } void HttpConnectionManagerConfig::createPreSrvFilterChain(Http::PrivateProtoFilterChainFactoryCallbacks& callbacks) { + ENVOY_LOG(debug,"create pre srv filter chain"); for (const Http::PrivateProtoFilterFactoryCb& factory : pre_srv_filter_factories_) { factory(callbacks); }