Skip to content

Commit

Permalink
Optimize bytes serde performance. (#118)
Browse files Browse the repository at this point in the history
  • Loading branch information
gftea authored Dec 23, 2023
1 parent de21bbe commit f852da2
Show file tree
Hide file tree
Showing 14 changed files with 61 additions and 101 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/regression_test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ on:
- 'prepare_release.sh'
- 'regression_test.sh'
workflow_dispatch:
branches: ["main"]

pull_request:
branches: ["main"]
paths-ignore:
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/test_coverage.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ on:
- 'prepare_release.sh'
- 'regression_test.sh'
workflow_dispatch:
branches: ["main"]

pull_request:
branches: ["main"]
paths-ignore:
Expand Down
52 changes: 37 additions & 15 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,7 @@ resolver = "2"
# debug = false
# lto = true
# codegen-units = 1


[workspace.dependencies]
serde_bytes_ng = { version = "0.1.1" }
1 change: 1 addition & 0 deletions amqp_serde/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,5 +14,6 @@ documentation = "https://docs.rs/amqp_serde/latest/amqp_serde/"
[dependencies]
serde = { version = "1.0", features = ["derive"] }
bytes = { version = "1.0" }
serde_bytes_ng = { workspace = true }

[dev-dependencies]
6 changes: 3 additions & 3 deletions amqp_serde/src/de.rs
Original file line number Diff line number Diff line change
Expand Up @@ -737,7 +737,7 @@ mod tests {
fn test_eof() {
#[derive(Deserialize)]
struct Frame(u32);
let input = vec![0x00];
let input = [0x00];
let _: Frame = from_bytes(&input[..]).unwrap();
}

Expand All @@ -746,7 +746,7 @@ mod tests {
fn test_missing_length() {
#[derive(Deserialize)]
struct Frame(Vec<u8>);
let input = vec![0, 1, 2];
let input = [0, 1, 2];
let _: Frame = from_bytes(&input[..]).unwrap();
}

Expand All @@ -755,7 +755,7 @@ mod tests {
fn test_syntax_err() {
#[derive(Deserialize)]
struct Frame(u8, Vec<u8>);
let input = vec![9, 0, 0];
let input = [9, 0, 0];
let _: Frame = from_bytes(&input[..]).unwrap();
}

Expand Down
68 changes: 1 addition & 67 deletions amqp_serde/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ impl fmt::Display for DecimalValue {
/// AMQP byte array type.
#[derive(Serialize, Deserialize, PartialEq, Eq, Debug, Clone)]
pub struct ByteArray(LongUint, Vec<u8>);
pub struct ByteArray(LongUint, #[serde(with = "serde_bytes_ng")] Vec<u8>);
impl TryFrom<Vec<u8>> for ByteArray {
type Error = TryFromIntError;

Expand Down Expand Up @@ -492,72 +492,6 @@ impl AsRef<HashMap<FieldName, FieldValue>> for FieldTable {
}
}

/////////////////////////////////////////////////////////////////////////////
// #[derive(Serialize, Deserialize, PartialEq, Debug, Clone)]
// pub struct FieldTable(HashMap<FieldName, FieldValue>);
// impl Default for FieldTable {
// fn default() -> Self {
// Self(HashMap::new())
// }
// }

// impl Deref for FieldTable {
// type Target = HashMap<FieldName, FieldValue>;

// fn deref(&self) -> &Self::Target {
// &self.0
// }
// }

// impl FieldTable {
// pub fn new() -> Self {
// Self(HashMap::new())
// }

// pub fn insert<V>(&mut self, k: String, v: V)
// where
// V: Any,
// {
// let k: FieldName = k.try_into().unwrap();
// let v = &v as &dyn Any;

// if v.is::<bool>() {
// let v = v.downcast_ref::<bool>().unwrap();
// let old = self.0.insert(k, FieldValue::t(v.clone()));
// } else if v.is::<i8>() {
// let v = v.downcast_ref::<i8>().unwrap();
// let old = self.0.insert(k, FieldValue::b(v.clone()));
// } else if v.is::<u8>() {
// let v = v.downcast_ref::<u8>().unwrap();
// let old = self.0.insert(k, FieldValue::B(v.clone()));
// } else if v.is::<i16>() {
// let v = v.downcast_ref::<bool>().unwrap();
// let old = self.0.insert(k, FieldValue::t(v.clone()));
// } else if v.is::<u16>() {
// } else if v.is::<i32>() {
// } else if v.is::<u32>() {
// } else if v.is::<i64>() {
// } else if v.is::<f32>() {
// } else if v.is::<f64>() {
// } else if v.is::<DecimalValue>() {
// } else if v.is::<String>() {
// // RabbitMQ does not have "short string" type in field value,
// let v = v.downcast_ref::<String>().unwrap();
// let old = self
// .0
// .insert(k, FieldValue::S(v.clone().try_into().unwrap()));
// } else if v.is::<FieldArray>() {
// } else if v.is::<u64>() { // RabbitMQ do not have "Unsigned 64-bit" field value, so `u64` can be uniquely mapped to TimeStamp
// } else if v.is::<Self>() {
// } else if v.is::<()>() {
// } else if v.is::<ByteArray>() {
// } else {
// panic!("unsupported value type {:?} ", v);
// }

// }
// }

/////////////////////////////////////////////////////////////////////////////
// AMQP domains
/// Note: it is different from definition in [`RabbitMQ Definition`].
Expand Down
1 change: 1 addition & 0 deletions amqprs/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ amqp_serde = { path = "../amqp_serde", version = "0.4" }
async-trait = "0.1"
tracing = { version = "0.1", optional = true }
uriparse = { version = "0.6", optional = true }
serde_bytes_ng = { workspace = true }

# SSL/TLS dependencies
tokio-rustls = { version = "0.23", optional = true }
Expand Down
1 change: 0 additions & 1 deletion amqprs/src/api/channel/basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -970,7 +970,6 @@ mod tests {
consumer::DefaultConsumer,
},
frame::BasicProperties,
DELIVERY_MODE_TRANSIENT,
};
use tokio::time;

Expand Down
2 changes: 1 addition & 1 deletion amqprs/src/api/channel/confim.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ mod tests {
channel::BasicPublishArguments,
connection::{Connection, OpenConnectionArguments},
test_utils::setup_logging,
BasicProperties, DELIVERY_MODE_TRANSIENT,
BasicProperties,
};

use super::ConfirmSelectArguments;
Expand Down
6 changes: 3 additions & 3 deletions amqprs/src/api/channel/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -424,7 +424,7 @@ mod tests {

#[ignore = "https://github.com/gftea/amqprs/issues/69"]
#[tokio::test]
async fn test_channel_is_not_cloneable() {
async fn test_channel_cloneable() {
// default: `IS_CLONEABLE = false` for all types
trait NotCloneable {
const IS_CLONEABLE: bool = false;
Expand All @@ -438,8 +438,8 @@ mod tests {
impl<T: Clone> Wrapper<T> {
const IS_CLONEABLE: bool = true;
}

assert_eq!(false, <Wrapper<Channel>>::IS_CLONEABLE);
// Prevent clippy to report assertion on const value.
assert_eq!(<Wrapper<Channel>>::IS_CLONEABLE.to_string(), "true");
}

#[tokio::test]
Expand Down
2 changes: 1 addition & 1 deletion amqprs/src/api/channel/tx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ mod tests {
callbacks::{DefaultChannelCallback, DefaultConnectionCallback},
channel::BasicPublishArguments,
connection::{Connection, OpenConnectionArguments},
BasicProperties, DELIVERY_MODE_TRANSIENT,
BasicProperties,
};

#[tokio::test]
Expand Down
14 changes: 6 additions & 8 deletions amqprs/src/api/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -607,13 +607,11 @@ impl Connection {
FieldValue::S("0.1".try_into().unwrap()),
);
let mut client_properties_capabilities = FieldTable::new();
client_properties_capabilities.insert(
"consumer_cancel_notify".try_into().unwrap(),
true.into()
);
client_properties_capabilities
.insert("consumer_cancel_notify".try_into().unwrap(), true.into());
client_properties.insert(
"capabilities".try_into().unwrap(),
FieldValue::F(client_properties_capabilities)
FieldValue::F(client_properties_capabilities),
);

// S: `Start` C: `StartOk`
Expand Down Expand Up @@ -1301,9 +1299,9 @@ mod tests {
let conn1 = Connection::open(&args).await.unwrap();
let conn2 = conn1.clone();
tokio::spawn(async move {
assert_eq!(true, conn2.is_open());
assert!(conn2.is_open());
});
assert_eq!(true, conn1.is_open());
assert!(conn1.is_open());
}
// wait for finished, otherwise runtime exit before all tasks are done
time::sleep(time::Duration::from_millis(100)).await;
Expand Down Expand Up @@ -1348,7 +1346,7 @@ mod tests {
jh.push(thread::spawn(|| generate_connection_name("testdomain")));
}
for h in jh {
assert_eq!(true, res.insert(h.join().unwrap()));
assert!(res.insert(h.join().unwrap()));
}
}

Expand Down
1 change: 1 addition & 0 deletions amqprs/src/frame/content_body.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use super::Frame;

#[derive(Debug, Serialize)]
pub struct ContentBody {
#[serde(with = "serde_bytes_ng")]
pub(crate) inner: Vec<u8>,
}

Expand Down

0 comments on commit f852da2

Please sign in to comment.