Skip to content

Commit

Permalink
Fixed bugs related to protocols and encoding. (#154)
Browse files Browse the repository at this point in the history
  • Loading branch information
AdachiAndShimamura authored Sep 1, 2023
1 parent ec7ca4d commit d966a5e
Show file tree
Hide file tree
Showing 4 changed files with 71 additions and 78 deletions.
45 changes: 19 additions & 26 deletions dubbo/src/triple/client/triple.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,17 +142,16 @@ impl TripleClient {
M1: Message + Send + Sync + 'static + Serialize,
M2: Message + Send + Sync + 'static + for<'a> Deserialize<'a> + Default,
{
let is_json = false;
let (decoder, encoder): (
Box<dyn Decoder<Item = M2, Error = Status> + Send + 'static>,
Box<dyn Encoder<Error = Status, Item = M1> + Send + 'static>,
) = get_codec(is_json);
) = get_codec("application/grpc+proto");
let req = req.map(|m| stream::once(future::ready(m)));
let body_stream = encode(
encoder,
req.into_inner().map(Ok),
self.send_compression_encoding,
is_json,
true,
)
.into_stream();
let body = hyper::Body::wrap_stream(body_stream);
Expand All @@ -179,9 +178,8 @@ impl TripleClient {

match response {
Ok(v) => {
let resp = v.map(|body| {
Decoding::new(body, decoder, self.send_compression_encoding, is_json)
});
let resp = v
.map(|body| Decoding::new(body, decoder, self.send_compression_encoding, true));
let (mut parts, body) = Response::from_http(resp).into_parts();

futures_util::pin_mut!(body);
Expand Down Expand Up @@ -215,17 +213,16 @@ impl TripleClient {
M1: Message + Send + Sync + 'static + Serialize,
M2: Message + Send + Sync + 'static + for<'a> Deserialize<'a> + Default,
{
let is_json = false;
let (decoder, encoder): (
Box<dyn Decoder<Item = M2, Error = Status> + Send + 'static>,
Box<dyn Encoder<Error = Status, Item = M1> + Send + 'static>,
) = get_codec(is_json);
) = get_codec("application/grpc+proto");
let req = req.into_streaming_request();
let en = encode(
encoder,
req.into_inner().map(Ok),
self.send_compression_encoding,
is_json,
true,
)
.into_stream();
let body = hyper::Body::wrap_stream(en);
Expand All @@ -251,9 +248,8 @@ impl TripleClient {

match response {
Ok(v) => {
let resp = v.map(|body| {
Decoding::new(body, decoder, self.send_compression_encoding, is_json)
});
let resp = v
.map(|body| Decoding::new(body, decoder, self.send_compression_encoding, true));

Ok(Response::from_http(resp))
}
Expand All @@ -271,17 +267,16 @@ impl TripleClient {
M1: Message + Send + Sync + 'static + Serialize,
M2: Message + Send + Sync + 'static + for<'a> Deserialize<'a> + Default,
{
let is_json = false;
let (decoder, encoder): (
Box<dyn Decoder<Item = M2, Error = Status> + Send + 'static>,
Box<dyn Encoder<Error = Status, Item = M1> + Send + 'static>,
) = get_codec(is_json);
) = get_codec("application/grpc+proto");
let req = req.into_streaming_request();
let en = encode(
encoder,
req.into_inner().map(Ok),
self.send_compression_encoding,
is_json,
true,
)
.into_stream();
let body = hyper::Body::wrap_stream(en);
Expand All @@ -308,9 +303,8 @@ impl TripleClient {

match response {
Ok(v) => {
let resp = v.map(|body| {
Decoding::new(body, decoder, self.send_compression_encoding, is_json)
});
let resp = v
.map(|body| Decoding::new(body, decoder, self.send_compression_encoding, true));
let (mut parts, body) = Response::from_http(resp).into_parts();

futures_util::pin_mut!(body);
Expand Down Expand Up @@ -344,17 +338,16 @@ impl TripleClient {
M1: Message + Send + Sync + 'static + Serialize,
M2: Message + Send + Sync + 'static + for<'a> Deserialize<'a> + Default,
{
let is_json = false;
let (decoder, encoder): (
Box<dyn Decoder<Item = M2, Error = Status> + Send + 'static>,
Box<dyn Encoder<Error = Status, Item = M1> + Send + 'static>,
) = get_codec(is_json);
) = get_codec("application/grpc+proto");
let req = req.map(|m| stream::once(future::ready(m)));
let en = encode(
encoder,
req.into_inner().map(Ok),
self.send_compression_encoding,
is_json,
true,
)
.into_stream();
let body = hyper::Body::wrap_stream(en);
Expand All @@ -379,9 +372,8 @@ impl TripleClient {

match response {
Ok(v) => {
let resp = v.map(|body| {
Decoding::new(body, decoder, self.send_compression_encoding, is_json)
});
let resp = v
.map(|body| Decoding::new(body, decoder, self.send_compression_encoding, true));

Ok(Response::from_http(resp))
}
Expand All @@ -391,7 +383,7 @@ impl TripleClient {
}

pub fn get_codec<M1, M2>(
is_json: bool,
content_type: &str,
) -> (
Box<dyn Decoder<Item = M2, Error = Status> + Send + 'static>,
Box<dyn Encoder<Error = Status, Item = M1> + Send + 'static>,
Expand All @@ -400,7 +392,8 @@ where
M1: Message + Send + Sync + 'static + Serialize,
M2: Message + Send + Sync + 'static + for<'a> Deserialize<'a> + Default,
{
match is_json {
//Determine whether to use JSON as the serialization method.
match content_type.ends_with("json") {
true => {
let mut codec = SerdeCodec::<M1, M2>::default();
(Box::new(codec.decoder()), Box::new(codec.encoder()))
Expand Down
23 changes: 12 additions & 11 deletions dubbo/src/triple/decode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,13 @@ pub struct Decoding<T> {
trailers: Option<Metadata>,
compress: Option<CompressionEncoding>,
decompress_buf: BytesMut,
is_json: bool,
decode_as_grpc: bool,
}

#[derive(PartialEq)]
enum State {
ReadHeader,
ReadJSON,
ReadHttpBody,
ReadBody { len: usize, is_compressed: bool },
Error,
}
Expand All @@ -54,12 +54,13 @@ impl<T> Decoding<T> {
body: B,
decoder: Box<dyn Decoder<Item = T, Error = crate::status::Status> + Send + 'static>,
compress: Option<CompressionEncoding>,
is_json: bool,
decode_as_grpc: bool,
) -> Self
where
B: Body + Send + 'static,
B::Error: Into<crate::Error>,
{
//Determine whether to use the gRPC mode to handle request data
Self {
state: State::ReadHeader,
body: body
Expand All @@ -76,7 +77,7 @@ impl<T> Decoding<T> {
trailers: None,
compress,
decompress_buf: BytesMut::new(),
is_json,
decode_as_grpc,
}
}

Expand All @@ -98,12 +99,12 @@ impl<T> Decoding<T> {
trailer.map(|data| data.map(Metadata::from_headers))
}

pub fn decode_json(&mut self) -> Result<Option<T>, crate::status::Status> {
pub fn decode_http(&mut self) -> Result<Option<T>, crate::status::Status> {
if self.state == State::ReadHeader {
self.state = State::ReadJSON;
self.state = State::ReadHttpBody;
return Ok(None);
}
if let State::ReadJSON = self.state {
if let State::ReadHttpBody = self.state {
if self.buf.is_empty() {
return Ok(None);
}
Expand Down Expand Up @@ -138,7 +139,7 @@ impl<T> Decoding<T> {
Ok(None)
}

pub fn decode_proto(&mut self) -> Result<Option<T>, crate::status::Status> {
pub fn decode_grpc(&mut self) -> Result<Option<T>, crate::status::Status> {
if self.state == State::ReadHeader {
// buffer is full
if self.buf.remaining() < super::consts::HEADER_SIZE {
Expand Down Expand Up @@ -215,10 +216,10 @@ impl<T> Decoding<T> {
}

pub fn decode_chunk(&mut self) -> Result<Option<T>, crate::status::Status> {
if self.is_json {
self.decode_json()
if self.decode_as_grpc {
self.decode_grpc()
} else {
self.decode_proto()
self.decode_http()
}
}
}
Expand Down
34 changes: 17 additions & 17 deletions dubbo/src/triple/encode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ pub fn encode<E, B>(
mut encoder: Box<dyn Encoder<Error = Status, Item = E> + Send + 'static>,
resp_body: B,
compression_encoding: Option<CompressionEncoding>,
is_json: bool,
encode_as_grpc: bool,
) -> impl TryStream<Ok = Bytes, Error = Status>
where
B: Stream<Item = Result<E, Status>>,
Expand All @@ -48,10 +48,10 @@ where
loop {
match resp_body.next().await {
Some(Ok(item)) => {
if !is_json {
if encode_as_grpc {
buf.reserve(super::consts::HEADER_SIZE);
unsafe {
buf.advance_mut(super::consts::HEADER_SIZE);
unsafe {
buf.advance_mut(super::consts::HEADER_SIZE);
}
}
// 编码数据到缓冲中
Expand All @@ -67,18 +67,18 @@ where
} else {
encoder.encode(item, &mut EncodeBuf::new(&mut buf)).map_err(|_e| crate::status::Status::new(crate::status::Code::Internal, "encode error".to_string()));
}
let result=match is_json{
let result=match encode_as_grpc{
true=>{
buf.clone()
}
false=>{
let len = buf.len() - super::consts::HEADER_SIZE;
{
let mut buf = &mut buf[..super::consts::HEADER_SIZE];
buf.put_u8(enable_compress as u8);
buf.put_u32(len as u32);
{
let mut buf = &mut buf[..super::consts::HEADER_SIZE];
buf.put_u8(enable_compress as u8);
buf.put_u32(len as u32);
}
buf.split_to(len + super::consts::HEADER_SIZE)
}
buf.split_to(len + super::consts::HEADER_SIZE)
false=>{
buf.clone()
}
};
yield Ok(result.freeze());
Expand All @@ -94,25 +94,25 @@ pub fn encode_server<E, B>(
encoder: Box<dyn Encoder<Error = Status, Item = E> + Send + 'static>,
body: B,
compression_encoding: Option<CompressionEncoding>,
is_json: bool,
encode_as_grpc: bool,
) -> EncodeBody<impl Stream<Item = Result<Bytes, Status>>>
where
B: Stream<Item = Result<E, Status>>,
{
let s = encode(encoder, body, compression_encoding, is_json).into_stream();
let s = encode(encoder, body, compression_encoding, encode_as_grpc).into_stream();
EncodeBody::new_server(s)
}

pub fn encode_client<E, B>(
encoder: Box<dyn Encoder<Error = Status, Item = E> + Send + 'static>,
body: B,
compression_encoding: Option<CompressionEncoding>,
is_json: bool,
is_grpc: bool,
) -> EncodeBody<impl Stream<Item = Result<Bytes, Status>>>
where
B: Stream<Item = E>,
{
let s = encode(encoder, body.map(Ok), compression_encoding, is_json).into_stream();
let s = encode(encoder, body.map(Ok), compression_encoding, is_grpc).into_stream();
EncodeBody::new_client(s)
}

Expand Down
Loading

0 comments on commit d966a5e

Please sign in to comment.