From 1f9383a2fe904450275ac8cb10725e0038e8ccb6 Mon Sep 17 00:00:00 2001 From: ericling Date: Tue, 24 Sep 2019 20:38:30 +0800 Subject: [PATCH] fix pre-srv-http-filter read proto msg err --- include/envoy/server/filter_config.h | 3 +- source/common/router/config_impl.cc | 4 ++ .../http/common/private_proto_factory_base.h | 5 +++ .../filters/http/dubbo_proxy/config.cc | 35 +++++++-------- .../filters/http/dubbo_proxy/conn_manager.cc | 43 +++++++++++++++++-- .../filters/http/dubbo_proxy/conn_manager.h | 2 + .../filters/http/dubbo_proxy/decoder.cc | 3 ++ .../filters/http/dubbo_proxy/decoder.h | 4 ++ .../network/http_connection_manager/config.cc | 3 ++ 9 files changed, 79 insertions(+), 23 deletions(-) diff --git a/include/envoy/server/filter_config.h b/include/envoy/server/filter_config.h index 45a1fbecae7d..7c3fa7fac8d8 100644 --- a/include/envoy/server/filter_config.h +++ b/include/envoy/server/filter_config.h @@ -403,12 +403,11 @@ class PrivateProtoNamedHttpFilterConfigFactory : public ProtocolOptionsFactory { virtual Http::PrivateProtoFilterFactoryCb createPrivateProtoFilterFactoryFromProto(const Protobuf::Message& config, Server::Configuration::FactoryContext& context) { UNREFERENCED_PARAMETER(config); -// UNREFERENCED_PARAMETER(stat_prefix); UNREFERENCED_PARAMETER(context); NOT_IMPLEMENTED_GCOVR_EXCL_LINE; } - ProtobufTypes::MessagePtr createEmptyConfigProto() { return ProtobufTypes::MessagePtr{new Envoy::ProtobufWkt::Empty()}; } + virtual ProtobufTypes::MessagePtr createEmptyConfigProto() { return nullptr; } virtual std::string name() PURE; }; diff --git a/source/common/router/config_impl.cc b/source/common/router/config_impl.cc index 1d7fae264306..47afa13c1fa2 100644 --- a/source/common/router/config_impl.cc +++ b/source/common/router/config_impl.cc @@ -479,6 +479,7 @@ RouteEntryImplBase::RouteEntryImplBase(const VirtualHostImpl& vhost, throw EnvoyException(fmt::format("Duplicate upgrade {}", upgrade_config.upgrade_type())); } } + ENVOY_LOG(debug,"route entry impl base end"); } bool RouteEntryImplBase::evaluateRuntimeMatch(const uint64_t random_value) const { @@ -500,11 +501,14 @@ void RouteEntryImplBase::processPreClientFilter(const envoy::api::v2::route::Htt // Now see if there is a factory that will accept the config. auto& factory = Envoy::Config::Utility::getAndCheckFactory(string_name); + ENVOY_LOG(debug, "check factory {}",factory.name()); + Http::PrivateProtoFilterFactoryCb callback; ProtobufTypes::MessagePtr message = Envoy::Config::Utility::translateToFactoryConfig( proto_config, context_.messageValidationVisitor(), factory); callback = factory.createPrivateProtoFilterFactoryFromProto(*message,context_); filter_factories.push_back(callback); + ENVOY_LOG(debug, "filter factory size : {}", filter_factories.size()); } bool RouteEntryImplBase::matchRoute(const Http::HeaderMap& headers, uint64_t random_value) const { diff --git a/source/extensions/filters/http/common/private_proto_factory_base.h b/source/extensions/filters/http/common/private_proto_factory_base.h index bf64fe79d126..21494b023a43 100644 --- a/source/extensions/filters/http/common/private_proto_factory_base.h +++ b/source/extensions/filters/http/common/private_proto_factory_base.h @@ -17,10 +17,15 @@ class PrivateProtoFactoryBase : public Server::Configuration::PrivateProtoNamedH Http::PrivateProtoFilterFactoryCb createPrivateProtoFilterFactoryFromProto(const Protobuf::Message& proto_config, Server::Configuration::FactoryContext& context) override { +// ENVOY_LOG(debug,"private proto factory base create filter factory proto,mesg {}",proto_config.DebugString()); return createFilterFactoryFromProtoTyped( MessageUtil::downcastAndValidate(proto_config), context); } + ProtobufTypes::MessagePtr createEmptyConfigProto() override { + return std::make_unique(); + } + std::string name() override { return name_; } protected: diff --git a/source/extensions/filters/http/dubbo_proxy/config.cc b/source/extensions/filters/http/dubbo_proxy/config.cc index c8663af312a7..65a869988f1a 100644 --- a/source/extensions/filters/http/dubbo_proxy/config.cc +++ b/source/extensions/filters/http/dubbo_proxy/config.cc @@ -17,9 +17,9 @@ namespace HttpFilters { namespace DubboProxy { Http::PrivateProtoFilterFactoryCb DubboProxyFilterConfigFactory::createFilterFactoryFromProtoTyped( - const envoy::config::filter::network::dubbo_proxy::v2alpha1::DubboProxy& proto_config, - Server::Configuration::FactoryContext& context) { - + const envoy::config::filter::network::dubbo_proxy::v2alpha1::DubboProxy& proto_config ABSL_ATTRIBUTE_UNUSED, + Server::Configuration::FactoryContext& context ABSL_ATTRIBUTE_UNUSED) { + ENVOY_LOG(debug,"dubbo: proxy create filter factory"); std::shared_ptr filter_config(std::make_shared(proto_config, context)); return [filter_config, &context](Http::PrivateProtoFilterChainFactoryCallbacks& callbacks) -> void { @@ -102,20 +102,21 @@ ConfigImpl::ConfigImpl(const DubboProxyConfig& config, serialization_type_( SerializationTypeMapper::lookupSerializationType(config.serialization_type())), protocol_type_(ProtocolTypeMapper::lookupProtocolType(config.protocol_type())) { - auto type = RouteMatcherTypeMapper::lookupRouteMatcherType(config.protocol_type()); - route_matcher_ = Router::NamedRouteMatcherConfigFactory::getFactory(type).createRouteMatcher( - config.route_config(), context); - if (config.dubbo_filters().empty()) { - ENVOY_LOG(debug, "using default router filter"); - - envoy::config::filter::network::dubbo_proxy::v2alpha1::DubboFilter router_config; - router_config.set_name(DubboFilters::DubboFilterNames::get().ROUTER); - registerFilter(router_config); - } else { - for (const auto& filter_config : config.dubbo_filters()) { - registerFilter(filter_config); - } - } + ENVOY_LOG(debug,"dubbo config impl create"); +// auto type = RouteMatcherTypeMapper::lookupRouteMatcherType(config.protocol_type()); +// route_matcher_ = Router::NamedRouteMatcherConfigFactory::getFactory(type).createRouteMatcher( +// config.route_config(), context); +// if (config.dubbo_filters().empty()) { +// ENVOY_LOG(debug, "using default router filter"); +// +// envoy::config::filter::network::dubbo_proxy::v2alpha1::DubboFilter router_config; +// router_config.set_name(DubboFilters::DubboFilterNames::get().ROUTER); +// registerFilter(router_config); +// } else { +// for (const auto& filter_config : config.dubbo_filters()) { +// registerFilter(filter_config); +// } +// } } void ConfigImpl::createFilterChain(DubboFilters::FilterChainFactoryCallbacks& callbacks) { diff --git a/source/extensions/filters/http/dubbo_proxy/conn_manager.cc b/source/extensions/filters/http/dubbo_proxy/conn_manager.cc index 7fbcc0e5043b..1c13e99681b9 100644 --- a/source/extensions/filters/http/dubbo_proxy/conn_manager.cc +++ b/source/extensions/filters/http/dubbo_proxy/conn_manager.cc @@ -1,15 +1,18 @@ #include "extensions/filters/http/dubbo_proxy/conn_manager.h" #include +//#include #include "envoy/common/exception.h" -#include "common/common/fmt.h" +//#include "common/common/fmt.h" +#include "common/http/utility.h" #include "extensions/filters/http/dubbo_proxy/app_exception.h" #include "extensions/filters/http/dubbo_proxy/dubbo_hessian2_serializer_impl.h" -#include "extensions/filters/http/dubbo_proxy/dubbo_protocol_impl.h" +//#include "extensions/filters/http/dubbo_proxy/dubbo_protocol_impl.h" #include "extensions/filters/http/dubbo_proxy/heartbeat_response.h" +#include "extensions/filters/http/dubbo_proxy/serializer_impl.h" namespace Envoy { namespace Extensions { @@ -63,8 +66,15 @@ void ConnectionManager::setEncoderFilterCallbacks(Http::PrivateProtoFilterCallba Network::FilterStatus ConnectionManager::onData(Buffer::Instance& data, bool end_stream) { ENVOY_LOG(trace, "dubbo: read {} bytes", data.length()); request_buffer_.move(data); + ENVOY_LOG(trace, "dubbo: after move data length {} ", data.length()); + dispatch(); + //after dispath,put http buff to data + data.prepend(http_buffer_); + http_buffer_.drain(http_buffer_.length()); + ENVOY_LOG(trace, "dubbo: after encap to http,data length {} ", data.length()); + if (end_stream) { ENVOY_CONN_LOG(trace, "downstream half-closed", private_proto_decoder_filter_callbacks_->connection()); @@ -126,7 +136,8 @@ StreamHandler& ConnectionManager::newStream() { void ConnectionManager::onHeartbeat(MessageMetadataSharedPtr metadata) { stats_.request_event_.inc(); - if (read_callbacks_->connection().state() != Network::Connection::State::Open) { +// if (read_callbacks_->connection().state() != Network::Connection::State::Open) { + if (private_proto_decoder_filter_callbacks_->connection().state() != Network::Connection::State::Open) { ENVOY_LOG(warn, "dubbo: downstream connection is closed or closing"); return; } @@ -138,7 +149,28 @@ void ConnectionManager::onHeartbeat(MessageMetadataSharedPtr metadata) { Buffer::OwnedImpl response_buffer; heartbeat.encode(*metadata, *protocol_, response_buffer); - read_callbacks_->connection().write(response_buffer, false); +// read_callbacks_->connection().write(response_buffer, false); + private_proto_decoder_filter_callbacks_->connection().write(response_buffer, false); +} + +void ConnectionManager::encapHttpPkg() { + ActiveStream* as = decoder_->stateMachine().getActiveStream(); + const auto invocation = dynamic_cast(&(as->metadata_->invocation_info())); + if (invocation->hasHeaders()) { + Http::Utility::encapHttpRequest("/", + read_callbacks_->connection().remoteAddress()->asString(), + const_cast(invocation->headers()), + as->context_->message_origin_data(), + http_buffer_); + } else { + Http::HeaderMapImpl header_map; + Http::Utility::encapHttpRequest("/", + read_callbacks_->connection().remoteAddress()->asString(), + header_map, + as->context_->message_origin_data(), + http_buffer_); + } + } void ConnectionManager::dispatch() { @@ -156,6 +188,9 @@ void ConnectionManager::dispatch() { bool underflow = false; while (!underflow) { decoder_->onData(request_buffer_, underflow); + + // one pkg decode ok,encap to http. + encapHttpPkg(); } return; } catch (const EnvoyException& ex) { diff --git a/source/extensions/filters/http/dubbo_proxy/conn_manager.h b/source/extensions/filters/http/dubbo_proxy/conn_manager.h index 20fe4022ed8c..ae88a3069eec 100644 --- a/source/extensions/filters/http/dubbo_proxy/conn_manager.h +++ b/source/extensions/filters/http/dubbo_proxy/conn_manager.h @@ -102,6 +102,7 @@ class ConnectionManager : public Network::ReadFilter, private: void dispatch(); + void encapHttpPkg(); void resetAllMessages(bool local_reset); Buffer::OwnedImpl request_buffer_; @@ -122,6 +123,7 @@ class ConnectionManager : public Network::ReadFilter, Http::PrivateProtoFilterCallbacks* private_proto_decoder_filter_callbacks_{}; Http::PrivateProtoFilterCallbacks* private_proto_encoder_filter_callbacks_{}; + Buffer::OwnedImpl http_buffer_; }; } // namespace DubboProxy diff --git a/source/extensions/filters/http/dubbo_proxy/decoder.cc b/source/extensions/filters/http/dubbo_proxy/decoder.cc index 7e638f81f432..27b9ff5250ff 100644 --- a/source/extensions/filters/http/dubbo_proxy/decoder.cc +++ b/source/extensions/filters/http/dubbo_proxy/decoder.cc @@ -1,6 +1,7 @@ #include "extensions/filters/http/dubbo_proxy/decoder.h" #include "common/common/macros.h" +#include "common/http/utility.h" namespace Envoy { namespace Extensions { @@ -118,6 +119,8 @@ FilterStatus DecoderBase::onData(Buffer::Instance& data, bool& buffer_underflow) complete(); buffer_underflow = (data.length() == 0); + + ENVOY_LOG(debug, "dubbo decoder: data length {}", data.length()); return FilterStatus::Continue; } diff --git a/source/extensions/filters/http/dubbo_proxy/decoder.h b/source/extensions/filters/http/dubbo_proxy/decoder.h index e69634009192..ba6660ee08bd 100644 --- a/source/extensions/filters/http/dubbo_proxy/decoder.h +++ b/source/extensions/filters/http/dubbo_proxy/decoder.h @@ -96,6 +96,8 @@ class DecoderStateMachine : public Logger::Loggable { */ void setCurrentState(ProtocolState state) { state_ = state; } + ActiveStream* getActiveStream() { return active_stream_;} + private: struct DecoderStatus { DecoderStatus() = default; @@ -140,6 +142,8 @@ class DecoderBase : public DecoderStateMachine::Delegate, const Protocol& protocol() { return protocol_; } + DecoderStateMachine& stateMachine() {return *state_machine_;} + // It is assumed that all of the protocol parsing are stateless, // if there is a state of the need to provide the reset interface call here. void reset(); diff --git a/source/extensions/filters/network/http_connection_manager/config.cc b/source/extensions/filters/network/http_connection_manager/config.cc index 2b06874f0387..34c4ea7615ab 100644 --- a/source/extensions/filters/network/http_connection_manager/config.cc +++ b/source/extensions/filters/network/http_connection_manager/config.cc @@ -361,6 +361,7 @@ void HttpConnectionManagerConfig::processFilter( ENVOY_LOG(debug, "protobuf config: createFilterFactoryFromProto"); ProtobufTypes::MessagePtr message = Config::Utility::translateToFactoryConfig( proto_config, context_.messageValidationVisitor(), factory); + ENVOY_LOG(debug, "protobuf config: message {}",message->DebugString()); callback = factory.createFilterFactoryFromProto(*message, stats_prefix_, context_); } filter_factories.push_back(callback); @@ -378,10 +379,12 @@ void HttpConnectionManagerConfig::processPreSrvFilter( // Now see if there is a factory that will accept the config. auto& factory = Config::Utility::getAndCheckFactory(string_name); + ENVOY_LOG(debug, " facotory name: {}", factory.name()); Http::PrivateProtoFilterFactoryCb callback; ProtobufTypes::MessagePtr message = Config::Utility::translateToFactoryConfig( proto_config, context_.messageValidationVisitor(), factory); callback = factory.createPrivateProtoFilterFactoryFromProto(*message,context_); + ENVOY_LOG(debug, " pre srv config msg : {}", message->DebugString()); filter_factories.push_back(callback); }