Skip to content

Commit

Permalink
http-dubbo-filter test ok
Browse files Browse the repository at this point in the history
  • Loading branch information
ericling committed Sep 26, 2019
1 parent 1f9383a commit 25b26fb
Show file tree
Hide file tree
Showing 14 changed files with 151 additions and 66 deletions.
13 changes: 8 additions & 5 deletions source/common/http/codec_client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -51,15 +51,18 @@ void CodecClient::createPreSrvFilterChain(Http::PrivateProtoFilterChainFactoryCa

void CodecClient::addClientFilter(Http::PrivateProtoFilterSharedPtr filter) {
// set callback
privateProtoFilterPtr filterCallbackPtr(new privateProtoFilterCallbacks(*this));
filter->setDecoderFilterCallbacks(*filterCallbackPtr);
filter->setEncoderFilterCallbacks(*filterCallbackPtr);
// privateProtoFilterPtr filterCallbackPtr(new privateProtoFilterCallbacks(*this));
// filter->setDecoderFilterCallbacks(*filterCallbackPtr);
// filter->setEncoderFilterCallbacks(*filterCallbackPtr);

ClientStreamFilterPtr wrapper(new ClientStreamFilter(*this, filter));
filter->setDecoderFilterCallbacks(*wrapper);
filter->setEncoderFilterCallbacks(*wrapper);
wrapper->moveIntoListBack(std::move(wrapper), pre_client_filters_);
}
Network::Connection& CodecClient::privateProtoFilterCallbacks::connection() {
return *codec_client_.connection_;

Network::Connection& CodecClient::ClientStreamFilter::connection() {
return *codec_client_.connection_;
}

void CodecClient::close() { connection_->close(Network::ConnectionCloseType::NoFlush); }
Expand Down
17 changes: 9 additions & 8 deletions source/common/http/codec_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ class CodecClient : public Logger::Loggable<Logger::Id::client>,
public Event::DeferredDeletable {
public:
// private proto filter list
struct ClientStreamFilter : LinkedObject<ClientStreamFilter> {
struct ClientStreamFilter : public PrivateProtoFilterCallbacks,LinkedObject<ClientStreamFilter> {
ClientStreamFilter(CodecClient& codec_client, PrivateProtoFilterSharedPtr filter)
: codec_client_(codec_client), handle_(filter) {}
CodecClient& codec_client_;
Expand All @@ -67,6 +67,7 @@ class CodecClient : public Logger::Loggable<Logger::Id::client>,
PrivateProtoFilterDataStatus status = handle_->encodeClientData(data, end_stream);
return status;
}
Network::Connection& connection() override ;
};
typedef std::unique_ptr<ClientStreamFilter> ClientStreamFilterPtr;
std::list<ClientStreamFilterPtr> pre_client_filters_;
Expand All @@ -82,13 +83,13 @@ class CodecClient : public Logger::Loggable<Logger::Id::client>,
void addPreSrvDecodeFilter(Http::PrivateProtoFilterSharedPtr filter ABSL_ATTRIBUTE_UNUSED) override {}
void decodePrivateProtoData(Buffer::Instance& data, bool end_stream) ;
//wrapper for filter callbacks
struct privateProtoFilterCallbacks: public PrivateProtoFilterCallbacks {
privateProtoFilterCallbacks(CodecClient& codec_client)
: codec_client_(codec_client) {}
Network::Connection& connection() override ;
CodecClient& codec_client_;
};
typedef std::unique_ptr<privateProtoFilterCallbacks> privateProtoFilterPtr;
// struct privateProtoFilterCallbacks: public PrivateProtoFilterCallbacks {
// privateProtoFilterCallbacks(CodecClient& codec_client)
// : codec_client_(codec_client) {}
// Network::Connection& connection() override ;
// CodecClient& codec_client_;
// };
// typedef std::unique_ptr<privateProtoFilterCallbacks> privateProtoFilterPtr;

/**
* Type of HTTP codec to use.
Expand Down
2 changes: 1 addition & 1 deletion source/common/http/codec_wrappers.h
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ class StreamEncoderWrapper : public StreamEncoder, Logger::Loggable<Logger::Id::

void encodeRawData(Buffer::Instance& data, bool end_stream) override {
ENVOY_LOG(debug,"stream encoder wrapper inner encode raw data");
encodeData(data,end_stream);
inner_.encodeRawData(data,end_stream);
}

void encodeTrailers(const HeaderMap& trailers) override {
Expand Down
34 changes: 28 additions & 6 deletions source/common/http/conn_manager_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -117,15 +117,30 @@ ConnectionManagerImpl::ConnectionManagerImpl(ConnectionManagerConfig& config,
}

void ConnectionManagerImpl::addPreSrvDecodeFilter(Http::PrivateProtoFilterSharedPtr filter) {
privateProtoFilterPtr filterPtr(new privateProtoFilterCallbacks(*this));
filter->setDecoderFilterCallbacks(*filterPtr);
ENVOY_LOG(debug,"ConnectionManagerImpl::addPreSrvDecodeFilter,{}", static_cast<void*>(this));
// privateProtoFilterPtr filterPtr(new privateProtoFilterCallbacks(*this));
// filter->setDecoderFilterCallbacks(*filterPtr);
//
// ENVOY_LOG(debug,"filter cb conn manager pointer {},filterptr {}",
// static_cast<void*>(&(filterPtr->connection_manager_)),
// static_cast<void*>(&(*filterPtr)));

// wrapper and add to list
PreSrvStreamFilterPtr wrapper(new PreSrvStreamFilter(*this, filter));
filter->setDecoderFilterCallbacks(*wrapper);
wrapper->moveIntoListBack(std::move(wrapper), pre_srv_decoder_filters_);

ENVOY_LOG(debug,"add to list end");
ENVOY_LOG(debug,"decoder filter connection impl addr {}", static_cast<void*>(&(pre_srv_decoder_filters_.front()->connection_manager_)));
}

Network::Connection& ConnectionManagerImpl::privateProtoFilterCallbacks::connection() {
//Network::Connection& ConnectionManagerImpl::privateProtoFilterCallbacks::connection() {
// ENVOY_LOG(debug,"privateProtoFilterCallbacks ,read_callbacks {}", static_cast<void*>(&connection_manager_.read_callbacks_));
// return connection_manager_.read_callbacks_->connection();
//}

Network::Connection& ConnectionManagerImpl::PreSrvStreamFilter::connection() {
// ENVOY_LOG(debug,"PreSrvStreamFilter,read_callbacks {}", static_cast<void*>(&connection_manager_.read_callbacks_));
return connection_manager_.read_callbacks_->connection();
}

Expand Down Expand Up @@ -293,6 +308,8 @@ StreamDecoder& ConnectionManagerImpl::newStream(StreamEncoder& response_encoder,
}

Network::FilterStatus ConnectionManagerImpl::onData(Buffer::Instance& data, bool end_stream) {
ENVOY_LOG(debug,"ConnectionManagerImpl::onData");

// decode private proto data
decodePrivateProtoData(data,end_stream);

Expand Down Expand Up @@ -1395,9 +1412,14 @@ void ConnectionManagerImpl::ActiveStream::encodeHeaders(ActiveStreamEncoderFilte

// Now actually encode via the codec.
stream_info_.onFirstDownstreamTxByteSent();
response_encoder_->encodeHeaders(
headers,
encoding_headers_only_ || (end_stream && continue_data_entry == encoder_filters_.end()));
if (Http::Utility::isPrivateProtoHeader(headers)) {
ENVOY_STREAM_LOG(debug, "rsponse header has private proto field,ignore encode", *this);
} else {
response_encoder_->encodeHeaders(
headers,
encoding_headers_only_ || (end_stream && continue_data_entry == encoder_filters_.end()));
}

if (continue_data_entry != encoder_filters_.end()) {
// We use the continueEncoding() code since it will correctly handle not calling
// encodeHeaders() again. Fake setting StopSingleIteration since the continueEncoding() code
Expand Down
18 changes: 10 additions & 8 deletions source/common/http/conn_manager_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -84,13 +84,13 @@ class ConnectionManagerImpl : Logger::Loggable<Logger::Id::http>,
void encodePrivateProtoData(Buffer::Instance& data, bool end_stream);
void addClientFilter(Http::PrivateProtoFilterSharedPtr filter ABSL_ATTRIBUTE_UNUSED) override {}
// wrapper of private proto callbacks
struct privateProtoFilterCallbacks: public PrivateProtoFilterCallbacks {
privateProtoFilterCallbacks(ConnectionManagerImpl& connection_manager)
: connection_manager_(connection_manager) {}
Network::Connection& connection() override;
ConnectionManagerImpl& connection_manager_;
};
typedef std::unique_ptr<privateProtoFilterCallbacks> privateProtoFilterPtr;
// struct privateProtoFilterCallbacks: public PrivateProtoFilterCallbacks {
// privateProtoFilterCallbacks(ConnectionManagerImpl& connection_manager)
// : connection_manager_(connection_manager) {}
// Network::Connection& connection() override;
// ConnectionManagerImpl& connection_manager_;
// };
// typedef std::unique_ptr<privateProtoFilterCallbacks> privateProtoFilterPtr;

// Network::ConnectionCallbacks
void onEvent(Network::ConnectionEvent event) override;
Expand Down Expand Up @@ -553,7 +553,7 @@ class ConnectionManagerImpl : Logger::Loggable<Logger::Id::http>,
/**
* Wrapper for a pre srv stream decoder filter.
*/
struct PreSrvStreamFilter : LinkedObject<PreSrvStreamFilter> {
struct PreSrvStreamFilter : public PrivateProtoFilterCallbacks, LinkedObject<PreSrvStreamFilter> {
PreSrvStreamFilter(ConnectionManagerImpl& connection_manager, PrivateProtoFilterSharedPtr filter)
: connection_manager_(connection_manager), handle_(filter) {}
ConnectionManagerImpl& connection_manager_;
Expand All @@ -569,6 +569,8 @@ class ConnectionManagerImpl : Logger::Loggable<Logger::Id::http>,
PrivateProtoFilterDataStatus status = handle_->encodeData(data, end_stream);
return status;
}

Network::Connection& connection() override;
};

typedef std::unique_ptr<PreSrvStreamFilter> PreSrvStreamFilterPtr;
Expand Down
1 change: 1 addition & 0 deletions source/common/http/http1/codec_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,7 @@ void StreamEncoderImpl::encodeData(Buffer::Instance& data, bool end_stream) {
}

void StreamEncoderImpl::encodeRawData(Buffer::Instance& data, bool end_stream) {
ENVOY_LOG(debug,"stream encoder impl,encode raw data");
if (data.length() > 0) {
connection_.buffer().move(data);
}
Expand Down
1 change: 1 addition & 0 deletions source/common/router/router.cc
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@

#include "common/router/router.h"

#include <chrono>
Expand Down
7 changes: 6 additions & 1 deletion source/extensions/filters/http/dubbo_proxy/active_message.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ DubboFilters::UpstreamResponseStatus ActiveResponseDecoder::onData(Buffer::Insta
ENVOY_LOG(debug, "dubbo response: the received reply data length is {}", data.length());

bool underflow = false;
decoder_->onData(data, underflow);
// decoder_->onData(data, underflow);
ASSERT(complete_ || underflow);

return response_status_;
Expand Down Expand Up @@ -187,8 +187,13 @@ ActiveMessage::ActiveMessage(ConnectionManager& parent)
request_id_(-1), stream_id_(parent.random_generator().random()),
stream_info_(parent.time_system()), pending_stream_decoded_(false),
local_response_sent_(false) {
// ENVOY_LOG(debug,"active message new");
parent_.stats().request_active_.inc();
// ENVOY_LOG(debug,"set adddr {}", static_cast<void*>(&stream_info_));
// ENVOY_LOG(debug,"parent adddr {}", static_cast<void*>(&parent_));
// ENVOY_LOG(debug,"conn addr {}", static_cast<void*>(&parent_.connection()));
stream_info_.setDownstreamLocalAddress(parent_.connection().localAddress());
// ENVOY_LOG(debug,"set adddr 2");
stream_info_.setDownstreamRemoteAddress(parent_.connection().remoteAddress());
}

Expand Down
2 changes: 2 additions & 0 deletions source/extensions/filters/http/dubbo_proxy/config.cc
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ Http::PrivateProtoFilterFactoryCb DubboProxyFilterConfigFactory::createFilterFac
return [filter_config, &context](Http::PrivateProtoFilterChainFactoryCallbacks& callbacks) -> void {
callbacks.addPreSrvDecodeFilter(std::make_shared<ConnectionManager>(
*filter_config, context.random(), context.dispatcher().timeSource()));
callbacks.addClientFilter(std::make_shared<ConnectionManager>(
*filter_config, context.random(), context.dispatcher().timeSource()));
};
}

Expand Down
46 changes: 17 additions & 29 deletions source/extensions/filters/http/dubbo_proxy/conn_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ ConnectionManager::ConnectionManager(Config& config, Runtime::RandomGenerator& r
decoder_(std::make_unique<RequestDecoder>(*protocol_, *this)) {}

Http::PrivateProtoFilterDataStatus ConnectionManager::decodeData(Buffer::Instance& data, bool end_stream) {
ENVOY_LOG(debug,"dubbo: decode server data, {}", static_cast<void*>(this));
Network::FilterStatus status = onData(data,end_stream);
if (status == Network::FilterStatus::StopIteration) {
return Http::PrivateProtoFilterDataStatus::StopIteration;
Expand All @@ -36,6 +37,9 @@ Http::PrivateProtoFilterDataStatus ConnectionManager::decodeData(Buffer::Instanc
}

Http::PrivateProtoFilterDataStatus ConnectionManager::decodeClientData(Buffer::Instance& data, bool end_stream) {
ENVOY_LOG(debug,"dubbo: decode client data,{}", static_cast<void*>(this));
setIsResponse(true);

Network::FilterStatus status = onData(data,end_stream);
if (status == Network::FilterStatus::StopIteration) {
return Http::PrivateProtoFilterDataStatus::StopIteration;
Expand All @@ -62,18 +66,22 @@ void ConnectionManager::setDecoderFilterCallbacks(Http::PrivateProtoFilterCallba
void ConnectionManager::setEncoderFilterCallbacks(Http::PrivateProtoFilterCallbacks& callbacks) {
private_proto_encoder_filter_callbacks_ = &callbacks;
}
void ConnectionManager::setIsResponse(bool is_response) {
is_response_ = is_response;
}

Network::FilterStatus ConnectionManager::onData(Buffer::Instance& data, bool end_stream) {

ENVOY_LOG(trace, "dubbo: read {} bytes", data.length());
request_buffer_.move(data);
ENVOY_LOG(trace, "dubbo: after move data length {} ", data.length());

dispatch();

//after dispath,put http buff to data
data.prepend(http_buffer_);
http_buffer_.drain(http_buffer_.length());
ENVOY_LOG(trace, "dubbo: after encap to http,data length {} ", data.length());
ENVOY_LOG(trace, "dubbo: after encap to http,data length {},http buff length {},data: {} ",
data.length(),http_buffer_.length(),data.toString());

if (end_stream) {
ENVOY_CONN_LOG(trace, "downstream half-closed", private_proto_decoder_filter_callbacks_->connection());
Expand Down Expand Up @@ -128,15 +136,18 @@ StreamHandler& ConnectionManager::newStream() {
ENVOY_LOG(debug, "dubbo: create the new decoder event handler");

ActiveMessagePtr new_message(std::make_unique<ActiveMessage>(*this));
new_message->createFilterChain();
// new_message->createFilterChain();
new_message->moveIntoList(std::move(new_message), active_message_list_);
return **active_message_list_.begin();
}

Network::Connection& ConnectionManager::connection() {
return private_proto_decoder_filter_callbacks_->connection();
}

void ConnectionManager::onHeartbeat(MessageMetadataSharedPtr metadata) {
stats_.request_event_.inc();

// if (read_callbacks_->connection().state() != Network::Connection::State::Open) {
if (private_proto_decoder_filter_callbacks_->connection().state() != Network::Connection::State::Open) {
ENVOY_LOG(warn, "dubbo: downstream connection is closed or closing");
return;
Expand All @@ -149,30 +160,9 @@ void ConnectionManager::onHeartbeat(MessageMetadataSharedPtr metadata) {
Buffer::OwnedImpl response_buffer;
heartbeat.encode(*metadata, *protocol_, response_buffer);

// read_callbacks_->connection().write(response_buffer, false);
private_proto_decoder_filter_callbacks_->connection().write(response_buffer, false);
}

void ConnectionManager::encapHttpPkg() {
ActiveStream* as = decoder_->stateMachine().getActiveStream();
const auto invocation = dynamic_cast<const RpcInvocationImpl*>(&(as->metadata_->invocation_info()));
if (invocation->hasHeaders()) {
Http::Utility::encapHttpRequest("/",
read_callbacks_->connection().remoteAddress()->asString(),
const_cast<Http::HeaderMap &>(invocation->headers()),
as->context_->message_origin_data(),
http_buffer_);
} else {
Http::HeaderMapImpl header_map;
Http::Utility::encapHttpRequest("/",
read_callbacks_->connection().remoteAddress()->asString(),
header_map,
as->context_->message_origin_data(),
http_buffer_);
}

}

void ConnectionManager::dispatch() {
if (0 == request_buffer_.length()) {
ENVOY_LOG(warn, "dubbo: it's empty data");
Expand All @@ -184,13 +174,11 @@ void ConnectionManager::dispatch() {
return;
}

decoder_->setIsResponse(is_response_);
try {
bool underflow = false;
while (!underflow) {
decoder_->onData(request_buffer_, underflow);

// one pkg decode ok,encap to http.
encapHttpPkg();
decoder_->onData(request_buffer_, underflow,http_buffer_,private_proto_decoder_filter_callbacks_->connection());
}
return;
} catch (const EnvoyException& ex) {
Expand Down
7 changes: 4 additions & 3 deletions source/extensions/filters/http/dubbo_proxy/conn_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ class ConnectionManager : public Network::ReadFilter,

void setEncoderFilterCallbacks(Http::PrivateProtoFilterCallbacks& callbacks) override;
void setDecoderFilterCallbacks(Http::PrivateProtoFilterCallbacks& callbacks) override;

void setIsResponse(bool);

// Network::ReadFilter
Network::FilterStatus onData(Buffer::Instance& data, bool end_stream) override;
Expand All @@ -85,7 +85,8 @@ class ConnectionManager : public Network::ReadFilter,
void onHeartbeat(MessageMetadataSharedPtr metadata) override;

DubboFilterStats& stats() const { return stats_; }
Network::Connection& connection() const { return read_callbacks_->connection(); }
// Network::Connection& connection() const { return read_callbacks_->connection(); }
Network::Connection& connection();
TimeSource& time_system() const { return time_system_; }
Runtime::RandomGenerator& random_generator() const { return random_generator_; }
Config& config() const { return config_; }
Expand All @@ -102,7 +103,6 @@ class ConnectionManager : public Network::ReadFilter,

private:
void dispatch();
void encapHttpPkg();
void resetAllMessages(bool local_reset);

Buffer::OwnedImpl request_buffer_;
Expand All @@ -124,6 +124,7 @@ class ConnectionManager : public Network::ReadFilter,
Http::PrivateProtoFilterCallbacks* private_proto_decoder_filter_callbacks_{};
Http::PrivateProtoFilterCallbacks* private_proto_encoder_filter_callbacks_{};
Buffer::OwnedImpl http_buffer_;
bool is_response_{false};
};

} // namespace DubboProxy
Expand Down
Loading

0 comments on commit 25b26fb

Please sign in to comment.