Skip to content

Commit

Permalink
fix pre-srv-http-filter read proto msg err
Browse files Browse the repository at this point in the history
  • Loading branch information
ericling committed Sep 24, 2019
1 parent 06415c9 commit 1f9383a
Show file tree
Hide file tree
Showing 9 changed files with 79 additions and 23 deletions.
3 changes: 1 addition & 2 deletions include/envoy/server/filter_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -403,12 +403,11 @@ class PrivateProtoNamedHttpFilterConfigFactory : public ProtocolOptionsFactory {
virtual Http::PrivateProtoFilterFactoryCb createPrivateProtoFilterFactoryFromProto(const Protobuf::Message& config,
Server::Configuration::FactoryContext& context) {
UNREFERENCED_PARAMETER(config);
// UNREFERENCED_PARAMETER(stat_prefix);
UNREFERENCED_PARAMETER(context);
NOT_IMPLEMENTED_GCOVR_EXCL_LINE;
}

ProtobufTypes::MessagePtr createEmptyConfigProto() { return ProtobufTypes::MessagePtr{new Envoy::ProtobufWkt::Empty()}; }
virtual ProtobufTypes::MessagePtr createEmptyConfigProto() { return nullptr; }

virtual std::string name() PURE;
};
Expand Down
4 changes: 4 additions & 0 deletions source/common/router/config_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -479,6 +479,7 @@ RouteEntryImplBase::RouteEntryImplBase(const VirtualHostImpl& vhost,
throw EnvoyException(fmt::format("Duplicate upgrade {}", upgrade_config.upgrade_type()));
}
}
ENVOY_LOG(debug,"route entry impl base end");
}

bool RouteEntryImplBase::evaluateRuntimeMatch(const uint64_t random_value) const {
Expand All @@ -500,11 +501,14 @@ void RouteEntryImplBase::processPreClientFilter(const envoy::api::v2::route::Htt

// Now see if there is a factory that will accept the config.
auto& factory = Envoy::Config::Utility::getAndCheckFactory<Server::Configuration::PrivateProtoNamedHttpFilterConfigFactory>(string_name);
ENVOY_LOG(debug, "check factory {}",factory.name());

Http::PrivateProtoFilterFactoryCb callback;
ProtobufTypes::MessagePtr message = Envoy::Config::Utility::translateToFactoryConfig(
proto_config, context_.messageValidationVisitor(), factory);
callback = factory.createPrivateProtoFilterFactoryFromProto(*message,context_);
filter_factories.push_back(callback);
ENVOY_LOG(debug, "filter factory size : {}", filter_factories.size());
}

bool RouteEntryImplBase::matchRoute(const Http::HeaderMap& headers, uint64_t random_value) const {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,15 @@ class PrivateProtoFactoryBase : public Server::Configuration::PrivateProtoNamedH
Http::PrivateProtoFilterFactoryCb
createPrivateProtoFilterFactoryFromProto(const Protobuf::Message& proto_config,
Server::Configuration::FactoryContext& context) override {
// ENVOY_LOG(debug,"private proto factory base create filter factory proto,mesg {}",proto_config.DebugString());
return createFilterFactoryFromProtoTyped(
MessageUtil::downcastAndValidate<const ConfigProto&>(proto_config), context);
}

ProtobufTypes::MessagePtr createEmptyConfigProto() override {
return std::make_unique<ConfigProto>();
}

std::string name() override { return name_; }

protected:
Expand Down
35 changes: 18 additions & 17 deletions source/extensions/filters/http/dubbo_proxy/config.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@ namespace HttpFilters {
namespace DubboProxy {

Http::PrivateProtoFilterFactoryCb DubboProxyFilterConfigFactory::createFilterFactoryFromProtoTyped(
const envoy::config::filter::network::dubbo_proxy::v2alpha1::DubboProxy& proto_config,
Server::Configuration::FactoryContext& context) {

const envoy::config::filter::network::dubbo_proxy::v2alpha1::DubboProxy& proto_config ABSL_ATTRIBUTE_UNUSED,
Server::Configuration::FactoryContext& context ABSL_ATTRIBUTE_UNUSED) {
ENVOY_LOG(debug,"dubbo: proxy create filter factory");
std::shared_ptr<Config> filter_config(std::make_shared<ConfigImpl>(proto_config, context));

return [filter_config, &context](Http::PrivateProtoFilterChainFactoryCallbacks& callbacks) -> void {
Expand Down Expand Up @@ -102,20 +102,21 @@ ConfigImpl::ConfigImpl(const DubboProxyConfig& config,
serialization_type_(
SerializationTypeMapper::lookupSerializationType(config.serialization_type())),
protocol_type_(ProtocolTypeMapper::lookupProtocolType(config.protocol_type())) {
auto type = RouteMatcherTypeMapper::lookupRouteMatcherType(config.protocol_type());
route_matcher_ = Router::NamedRouteMatcherConfigFactory::getFactory(type).createRouteMatcher(
config.route_config(), context);
if (config.dubbo_filters().empty()) {
ENVOY_LOG(debug, "using default router filter");

envoy::config::filter::network::dubbo_proxy::v2alpha1::DubboFilter router_config;
router_config.set_name(DubboFilters::DubboFilterNames::get().ROUTER);
registerFilter(router_config);
} else {
for (const auto& filter_config : config.dubbo_filters()) {
registerFilter(filter_config);
}
}
ENVOY_LOG(debug,"dubbo config impl create");
// auto type = RouteMatcherTypeMapper::lookupRouteMatcherType(config.protocol_type());
// route_matcher_ = Router::NamedRouteMatcherConfigFactory::getFactory(type).createRouteMatcher(
// config.route_config(), context);
// if (config.dubbo_filters().empty()) {
// ENVOY_LOG(debug, "using default router filter");
//
// envoy::config::filter::network::dubbo_proxy::v2alpha1::DubboFilter router_config;
// router_config.set_name(DubboFilters::DubboFilterNames::get().ROUTER);
// registerFilter(router_config);
// } else {
// for (const auto& filter_config : config.dubbo_filters()) {
// registerFilter(filter_config);
// }
// }
}

void ConfigImpl::createFilterChain(DubboFilters::FilterChainFactoryCallbacks& callbacks) {
Expand Down
43 changes: 39 additions & 4 deletions source/extensions/filters/http/dubbo_proxy/conn_manager.cc
Original file line number Diff line number Diff line change
@@ -1,15 +1,18 @@
#include "extensions/filters/http/dubbo_proxy/conn_manager.h"

#include <cstdint>
//#include <tclDecls.h>

#include "envoy/common/exception.h"

#include "common/common/fmt.h"
//#include "common/common/fmt.h"
#include "common/http/utility.h"

#include "extensions/filters/http/dubbo_proxy/app_exception.h"
#include "extensions/filters/http/dubbo_proxy/dubbo_hessian2_serializer_impl.h"
#include "extensions/filters/http/dubbo_proxy/dubbo_protocol_impl.h"
//#include "extensions/filters/http/dubbo_proxy/dubbo_protocol_impl.h"
#include "extensions/filters/http/dubbo_proxy/heartbeat_response.h"
#include "extensions/filters/http/dubbo_proxy/serializer_impl.h"

namespace Envoy {
namespace Extensions {
Expand Down Expand Up @@ -63,8 +66,15 @@ void ConnectionManager::setEncoderFilterCallbacks(Http::PrivateProtoFilterCallba
Network::FilterStatus ConnectionManager::onData(Buffer::Instance& data, bool end_stream) {
ENVOY_LOG(trace, "dubbo: read {} bytes", data.length());
request_buffer_.move(data);
ENVOY_LOG(trace, "dubbo: after move data length {} ", data.length());

dispatch();

//after dispath,put http buff to data
data.prepend(http_buffer_);
http_buffer_.drain(http_buffer_.length());
ENVOY_LOG(trace, "dubbo: after encap to http,data length {} ", data.length());

if (end_stream) {
ENVOY_CONN_LOG(trace, "downstream half-closed", private_proto_decoder_filter_callbacks_->connection());

Expand Down Expand Up @@ -126,7 +136,8 @@ StreamHandler& ConnectionManager::newStream() {
void ConnectionManager::onHeartbeat(MessageMetadataSharedPtr metadata) {
stats_.request_event_.inc();

if (read_callbacks_->connection().state() != Network::Connection::State::Open) {
// if (read_callbacks_->connection().state() != Network::Connection::State::Open) {
if (private_proto_decoder_filter_callbacks_->connection().state() != Network::Connection::State::Open) {
ENVOY_LOG(warn, "dubbo: downstream connection is closed or closing");
return;
}
Expand All @@ -138,7 +149,28 @@ void ConnectionManager::onHeartbeat(MessageMetadataSharedPtr metadata) {
Buffer::OwnedImpl response_buffer;
heartbeat.encode(*metadata, *protocol_, response_buffer);

read_callbacks_->connection().write(response_buffer, false);
// read_callbacks_->connection().write(response_buffer, false);
private_proto_decoder_filter_callbacks_->connection().write(response_buffer, false);
}

void ConnectionManager::encapHttpPkg() {
ActiveStream* as = decoder_->stateMachine().getActiveStream();
const auto invocation = dynamic_cast<const RpcInvocationImpl*>(&(as->metadata_->invocation_info()));
if (invocation->hasHeaders()) {
Http::Utility::encapHttpRequest("/",
read_callbacks_->connection().remoteAddress()->asString(),
const_cast<Http::HeaderMap &>(invocation->headers()),
as->context_->message_origin_data(),
http_buffer_);
} else {
Http::HeaderMapImpl header_map;
Http::Utility::encapHttpRequest("/",
read_callbacks_->connection().remoteAddress()->asString(),
header_map,
as->context_->message_origin_data(),
http_buffer_);
}

}

void ConnectionManager::dispatch() {
Expand All @@ -156,6 +188,9 @@ void ConnectionManager::dispatch() {
bool underflow = false;
while (!underflow) {
decoder_->onData(request_buffer_, underflow);

// one pkg decode ok,encap to http.
encapHttpPkg();
}
return;
} catch (const EnvoyException& ex) {
Expand Down
2 changes: 2 additions & 0 deletions source/extensions/filters/http/dubbo_proxy/conn_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ class ConnectionManager : public Network::ReadFilter,

private:
void dispatch();
void encapHttpPkg();
void resetAllMessages(bool local_reset);

Buffer::OwnedImpl request_buffer_;
Expand All @@ -122,6 +123,7 @@ class ConnectionManager : public Network::ReadFilter,

Http::PrivateProtoFilterCallbacks* private_proto_decoder_filter_callbacks_{};
Http::PrivateProtoFilterCallbacks* private_proto_encoder_filter_callbacks_{};
Buffer::OwnedImpl http_buffer_;
};

} // namespace DubboProxy
Expand Down
3 changes: 3 additions & 0 deletions source/extensions/filters/http/dubbo_proxy/decoder.cc
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#include "extensions/filters/http/dubbo_proxy/decoder.h"

#include "common/common/macros.h"
#include "common/http/utility.h"

namespace Envoy {
namespace Extensions {
Expand Down Expand Up @@ -118,6 +119,8 @@ FilterStatus DecoderBase::onData(Buffer::Instance& data, bool& buffer_underflow)

complete();
buffer_underflow = (data.length() == 0);


ENVOY_LOG(debug, "dubbo decoder: data length {}", data.length());
return FilterStatus::Continue;
}
Expand Down
4 changes: 4 additions & 0 deletions source/extensions/filters/http/dubbo_proxy/decoder.h
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,8 @@ class DecoderStateMachine : public Logger::Loggable<Logger::Id::dubbo> {
*/
void setCurrentState(ProtocolState state) { state_ = state; }

ActiveStream* getActiveStream() { return active_stream_;}

private:
struct DecoderStatus {
DecoderStatus() = default;
Expand Down Expand Up @@ -140,6 +142,8 @@ class DecoderBase : public DecoderStateMachine::Delegate,

const Protocol& protocol() { return protocol_; }

DecoderStateMachine& stateMachine() {return *state_machine_;}

// It is assumed that all of the protocol parsing are stateless,
// if there is a state of the need to provide the reset interface call here.
void reset();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -361,6 +361,7 @@ void HttpConnectionManagerConfig::processFilter(
ENVOY_LOG(debug, "protobuf config: createFilterFactoryFromProto");
ProtobufTypes::MessagePtr message = Config::Utility::translateToFactoryConfig(
proto_config, context_.messageValidationVisitor(), factory);
ENVOY_LOG(debug, "protobuf config: message {}",message->DebugString());
callback = factory.createFilterFactoryFromProto(*message, stats_prefix_, context_);
}
filter_factories.push_back(callback);
Expand All @@ -378,10 +379,12 @@ void HttpConnectionManagerConfig::processPreSrvFilter(

// Now see if there is a factory that will accept the config.
auto& factory = Config::Utility::getAndCheckFactory<Server::Configuration::PrivateProtoNamedHttpFilterConfigFactory>(string_name);
ENVOY_LOG(debug, " facotory name: {}", factory.name());
Http::PrivateProtoFilterFactoryCb callback;
ProtobufTypes::MessagePtr message = Config::Utility::translateToFactoryConfig(
proto_config, context_.messageValidationVisitor(), factory);
callback = factory.createPrivateProtoFilterFactoryFromProto(*message,context_);
ENVOY_LOG(debug, " pre srv config msg : {}", message->DebugString());
filter_factories.push_back(callback);
}

Expand Down

0 comments on commit 1f9383a

Please sign in to comment.