Skip to content

Commit

Permalink
Remove StringOrBase64 (#1112)
Browse files Browse the repository at this point in the history
* Remove StringOrBase64

* Fix cargo fmt

* Use serde_json::from_reader

* Align error text

* Fix cargo clippy
  • Loading branch information
Mallets authored Jun 10, 2024
1 parent 7d5d1d4 commit 13ed78e
Show file tree
Hide file tree
Showing 8 changed files with 65 additions and 94 deletions.
6 changes: 3 additions & 3 deletions plugins/zenoh-plugin-rest/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use http_types::Method;
use serde::{Deserialize, Serialize};
use tide::{http::Mime, sse::Sender, Request, Response, Server, StatusCode};
use zenoh::{
bytes::{StringOrBase64, ZBytes},
bytes::ZBytes,
encoding::Encoding,
internal::{
plugins::{RunningPluginTrait, ZenohPlugin},
Expand Down Expand Up @@ -76,11 +76,11 @@ fn payload_to_json(payload: &ZBytes, encoding: &Encoding) -> serde_json::Value {
payload
.deserialize::<serde_json::Value>()
.unwrap_or_else(|_| {
serde_json::Value::String(StringOrBase64::from(payload).into_string())
serde_json::Value::String(base64_encode(&Cow::from(payload)))
})
}
// otherwise convert to JSON string
_ => serde_json::Value::String(StringOrBase64::from(payload).into_string()),
_ => serde_json::Value::String(base64_encode(&Cow::from(payload))),
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// ZettaScale Zenoh Team, <[email protected]>
//
use std::{
borrow::Cow,
cmp::Ordering,
collections::{BTreeSet, HashMap, HashSet},
str,
Expand All @@ -20,8 +21,8 @@ use std::{

use async_std::sync::Arc;
use zenoh::{
bytes::StringOrBase64, key_expr::OwnedKeyExpr, prelude::*, sample::Sample, selector::Selector,
time::Timestamp, value::Value, Session,
key_expr::OwnedKeyExpr, prelude::*, sample::Sample, selector::Selector, time::Timestamp,
value::Value, Session,
};

use super::{digest::*, Snapshotter};
Expand Down Expand Up @@ -234,8 +235,11 @@ impl AlignQueryable {
tracing::trace!(
"[ALIGN QUERYABLE] Received ('{}': '{}' @ {:?})",
sample.key_expr().as_str(),
StringOrBase64::from(sample.payload()),
sample.timestamp()
sample
.payload()
.deserialize::<Cow<str>>()
.unwrap_or(Cow::Borrowed("<malformed>")),
sample.timestamp(),
);
if let Some(timestamp) = sample.timestamp() {
match timestamp.cmp(&logentry.timestamp) {
Expand Down
19 changes: 11 additions & 8 deletions plugins/zenoh-plugin-storage-manager/src/replica/aligner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,14 @@
//

use std::{
borrow::Cow,
collections::{HashMap, HashSet},
str,
};

use async_std::sync::{Arc, RwLock};
use flume::{Receiver, Sender};
use zenoh::{
bytes::StringOrBase64,
key_expr::{KeyExpr, OwnedKeyExpr},
prelude::*,
sample::{Sample, SampleBuilder},
Expand Down Expand Up @@ -216,15 +216,15 @@ impl Aligner {
let mut other_intervals: HashMap<u64, u64> = HashMap::new();
// expecting sample.payload to be a vec of intervals with their checksum
for each in reply_content {
match serde_json::from_str(&StringOrBase64::from(each.payload())) {
match serde_json::from_reader(each.payload().reader()) {
Ok((i, c)) => {
other_intervals.insert(i, c);
}
Err(e) => {
tracing::error!("[ALIGNER] Error decoding reply: {}", e);
no_err = false;
}
};
}
}
(other_intervals, no_err)
} else {
Expand Down Expand Up @@ -262,15 +262,15 @@ impl Aligner {
let (reply_content, mut no_err) = self.perform_query(other_rep, properties).await;
let mut other_subintervals: HashMap<u64, u64> = HashMap::new();
for each in reply_content {
match serde_json::from_str(&StringOrBase64::from(each.payload())) {
match serde_json::from_reader(each.payload().reader()) {
Ok((i, c)) => {
other_subintervals.insert(i, c);
}
Err(e) => {
tracing::error!("[ALIGNER] Error decoding reply: {}", e);
no_err = false;
}
};
}
}
(other_subintervals, no_err)
};
Expand Down Expand Up @@ -303,15 +303,15 @@ impl Aligner {
let (reply_content, mut no_err) = self.perform_query(other_rep, properties).await;
let mut other_content: HashMap<u64, Vec<LogEntry>> = HashMap::new();
for each in reply_content {
match serde_json::from_str(&StringOrBase64::from(each.payload())) {
match serde_json::from_reader(each.payload().reader()) {
Ok((i, c)) => {
other_content.insert(i, c);
}
Err(e) => {
tracing::error!("[ALIGNER] Error decoding reply: {}", e);
no_err = false;
}
};
}
}
// get subintervals diff
let result = this.get_full_content_diff(other_content);
Expand Down Expand Up @@ -343,7 +343,10 @@ impl Aligner {
tracing::trace!(
"[ALIGNER] Received ('{}': '{}')",
sample.key_expr().as_str(),
StringOrBase64::from(sample.payload())
sample
.payload()
.deserialize::<Cow<str>>()
.unwrap_or(Cow::Borrowed("<malformed>"))
);
return_val.push(sample);
}
Expand Down
29 changes: 14 additions & 15 deletions plugins/zenoh-plugin-storage-manager/src/replica/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,7 @@

use std::{
collections::{HashMap, HashSet},
str,
str::FromStr,
str::{self, FromStr},
time::{Duration, SystemTime},
};

Expand All @@ -44,9 +43,7 @@ pub use aligner::Aligner;
pub use digest::{Digest, DigestConfig, EraType, LogEntry};
pub use snapshotter::Snapshotter;
pub use storage::{ReplicationService, StorageService};
use zenoh::{
bytes::StringOrBase64, key_expr::OwnedKeyExpr, sample::Locality, time::Timestamp, Session,
};
use zenoh::{key_expr::OwnedKeyExpr, sample::Locality, time::Timestamp, Session};

const ERA: &str = "era";
const INTERVALS: &str = "intervals";
Expand Down Expand Up @@ -227,21 +224,23 @@ impl Replica {
};
let from = &sample.key_expr().as_str()
[Replica::get_digest_key(&self.key_expr, ALIGN_PREFIX).len() + 1..];
tracing::trace!(
"[DIGEST_SUB] From {} Received {} ('{}': '{}')",
from,
sample.kind(),
sample.key_expr().as_str(),
StringOrBase64::from(sample.payload())
);
let digest: Digest = match serde_json::from_str(&StringOrBase64::from(sample.payload()))
{

let digest: Digest = match serde_json::from_reader(sample.payload().reader()) {
Ok(digest) => digest,
Err(e) => {
tracing::error!("[DIGEST_SUB] Error in decoding the digest: {}", e);
continue;
}
};

tracing::trace!(
"[DIGEST_SUB] From {} Received {} ('{}': '{:?}')",
from,
sample.kind(),
sample.key_expr().as_str(),
digest,
);

let ts = digest.timestamp;
let to_be_processed = self
.processing_needed(
Expand All @@ -260,7 +259,7 @@ impl Replica {
tracing::error!("[DIGEST_SUB] Error sending digest to aligner: {}", e)
}
}
};
}
received.insert(from.to_string(), ts);
}
}
Expand Down
12 changes: 6 additions & 6 deletions plugins/zenoh-plugin-storage-manager/tests/operations.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,12 @@
// 1. normal case, just some wild card puts and deletes on existing keys and ensure it works
// 2. check for dealing with out of order updates

use std::{str::FromStr, thread::sleep};
use std::{borrow::Cow, str::FromStr, thread::sleep};

use async_std::task;
use zenoh::{
bytes::StringOrBase64, internal::zasync_executor_init, prelude::*, query::Reply,
sample::Sample, time::Timestamp, Config, Session,
internal::zasync_executor_init, prelude::*, query::Reply, sample::Sample, time::Timestamp,
Config, Session,
};
use zenoh_plugin_trait::Plugin;

Expand Down Expand Up @@ -96,7 +96,7 @@ async fn test_updates_in_order() {
// expects exactly one sample
let data = get_data(&session, "operation/test/a").await;
assert_eq!(data.len(), 1);
assert_eq!(StringOrBase64::from(data[0].payload()).as_str(), "1");
assert_eq!(data[0].payload().deserialize::<Cow<str>>().unwrap(), "1");

put_data(
&session,
Expand All @@ -112,7 +112,7 @@ async fn test_updates_in_order() {
// expects exactly one sample
let data = get_data(&session, "operation/test/b").await;
assert_eq!(data.len(), 1);
assert_eq!(StringOrBase64::from(data[0].payload()).as_str(), "2");
assert_eq!(data[0].payload().deserialize::<Cow<str>>().unwrap(), "2");

delete_data(
&session,
Expand All @@ -131,7 +131,7 @@ async fn test_updates_in_order() {
// expects exactly one sample
let data = get_data(&session, "operation/test/b").await;
assert_eq!(data.len(), 1);
assert_eq!(StringOrBase64::from(data[0].payload()).as_str(), "2");
assert_eq!(data[0].payload().deserialize::<Cow<str>>().unwrap(), "2");
assert_eq!(data[0].key_expr().as_str(), "operation/test/b");

drop(storage);
Expand Down
28 changes: 20 additions & 8 deletions plugins/zenoh-plugin-storage-manager/tests/wildcard.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,13 @@
// 1. normal case, just some wild card puts and deletes on existing keys and ensure it works
// 2. check for dealing with out of order updates

use std::{str::FromStr, thread::sleep};
use std::{borrow::Cow, str::FromStr, thread::sleep};

// use std::collections::HashMap;
use async_std::task;
use zenoh::{
bytes::StringOrBase64, internal::zasync_executor_init, prelude::*, query::Reply,
sample::Sample, time::Timestamp, Config, Session,
internal::zasync_executor_init, prelude::*, query::Reply, sample::Sample, time::Timestamp,
Config, Session,
};
use zenoh_plugin_trait::Plugin;

Expand Down Expand Up @@ -113,7 +113,7 @@ async fn test_wild_card_in_order() {
let data = get_data(&session, "wild/test/*").await;
assert_eq!(data.len(), 1);
assert_eq!(data[0].key_expr().as_str(), "wild/test/a");
assert_eq!(StringOrBase64::from(data[0].payload()).as_str(), "2");
assert_eq!(data[0].payload().deserialize::<Cow<str>>().unwrap(), "2");

put_data(
&session,
Expand All @@ -131,8 +131,20 @@ async fn test_wild_card_in_order() {
assert_eq!(data.len(), 2);
assert!(["wild/test/a", "wild/test/b"].contains(&data[0].key_expr().as_str()));
assert!(["wild/test/a", "wild/test/b"].contains(&data[1].key_expr().as_str()));
assert!(["2", "3"].contains(&StringOrBase64::from(data[0].payload()).as_str()));
assert!(["2", "3"].contains(&StringOrBase64::from(data[1].payload()).as_str()));
assert!(["2", "3"].contains(
&data[0]
.payload()
.deserialize::<Cow<str>>()
.unwrap()
.as_ref()
));
assert!(["2", "3"].contains(
&data[1]
.payload()
.deserialize::<Cow<str>>()
.unwrap()
.as_ref()
));

put_data(
&session,
Expand All @@ -150,8 +162,8 @@ async fn test_wild_card_in_order() {
assert_eq!(data.len(), 2);
assert!(["wild/test/a", "wild/test/b"].contains(&data[0].key_expr().as_str()));
assert!(["wild/test/a", "wild/test/b"].contains(&data[1].key_expr().as_str()));
assert_eq!(StringOrBase64::from(data[0].payload()).as_str(), "4");
assert_eq!(StringOrBase64::from(data[1].payload()).as_str(), "4");
assert_eq!(data[0].payload().deserialize::<Cow<str>>().unwrap(), "4");
assert_eq!(data[1].payload().deserialize::<Cow<str>>().unwrap(), "4");

delete_data(
&session,
Expand Down
49 changes: 1 addition & 48 deletions zenoh/src/api/bytes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

//! ZBytes primitives.
use std::{
borrow::Cow, convert::Infallible, fmt::Debug, marker::PhantomData, ops::Deref, str::Utf8Error,
borrow::Cow, convert::Infallible, fmt::Debug, marker::PhantomData, str::Utf8Error,
string::FromUtf8Error, sync::Arc,
};

Expand Down Expand Up @@ -1806,53 +1806,6 @@ where
}
}

// For convenience to always convert a Value in the examples
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum StringOrBase64 {
String(String),
Base64(String),
}

impl StringOrBase64 {
pub fn into_string(self) -> String {
match self {
StringOrBase64::String(s) | StringOrBase64::Base64(s) => s,
}
}
}

impl Deref for StringOrBase64 {
type Target = String;

fn deref(&self) -> &Self::Target {
match self {
Self::String(s) | Self::Base64(s) => s,
}
}
}

impl std::fmt::Display for StringOrBase64 {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.write_str(self)
}
}

impl From<&ZBytes> for StringOrBase64 {
fn from(v: &ZBytes) -> Self {
use base64::{engine::general_purpose::STANDARD as b64_std_engine, Engine};
match v.deserialize::<String>() {
Ok(s) => StringOrBase64::String(s),
Err(_) => StringOrBase64::Base64(b64_std_engine.encode(v.into::<Vec<u8>>())),
}
}
}

impl From<&mut ZBytes> for StringOrBase64 {
fn from(v: &mut ZBytes) -> Self {
StringOrBase64::from(&*v)
}
}

// Protocol attachment extension
impl<const ID: u8> From<ZBytes> for AttachmentType<ID> {
fn from(this: ZBytes) -> Self {
Expand Down
4 changes: 2 additions & 2 deletions zenoh/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -240,8 +240,8 @@ pub mod encoding {
/// Payload primitives
pub mod bytes {
pub use crate::api::bytes::{
Deserialize, OptionZBytes, Serialize, StringOrBase64, ZBytes, ZBytesIterator, ZBytesReader,
ZBytesWriter, ZDeserializeError, ZSerde,
Deserialize, OptionZBytes, Serialize, ZBytes, ZBytesIterator, ZBytesReader, ZBytesWriter,
ZDeserializeError, ZSerde,
};
}

Expand Down

0 comments on commit 13ed78e

Please sign in to comment.