Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Sample api rework #858

Merged
merged 75 commits into from
Apr 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
75 commits
Select commit Hold shift + click to select a range
5b18594
replaced sample new to put/delete
milyin Mar 19, 2024
1038beb
interceptors removed
milyin Mar 19, 2024
09a84b3
interceptors removed
milyin Mar 19, 2024
886c37c
storage sample added
milyin Mar 19, 2024
780c82a
some compile error fixes
milyin Mar 19, 2024
af0d167
removed interceptor proxy
milyin Mar 19, 2024
067823d
sample builders
milyin Mar 20, 2024
4f1ba2f
compiles
milyin Mar 20, 2024
9a71712
Merge branch 'protocol_changes' into sample_api_rework
milyin Mar 22, 2024
d7cb97a
SampleBuilderTrait
milyin Mar 22, 2024
a05b93d
reply builder unfinished
milyin Mar 23, 2024
0992ff8
replybuilder unfinished
milyin Mar 23, 2024
62378ad
new reply_sample
milyin Mar 24, 2024
cc580a5
sample decompose, opt setters
milyin Mar 24, 2024
2708402
samples, plugins updated
milyin Mar 24, 2024
b80fd0a
interceptors removed from plugin storage API
milyin Mar 24, 2024
265d1cf
Merge branch 'remove_storage_interceptors' into sample_api_rework
milyin Mar 24, 2024
7d2abd4
deconstruct sample api used
milyin Mar 24, 2024
2b1071f
comment, clippy fix
milyin Mar 24, 2024
3386237
clippy fix
milyin Mar 24, 2024
f52140a
zenoh-ext links zenoh with unstable
milyin Mar 24, 2024
a629c76
samplefields used
milyin Mar 24, 2024
1945492
restored old storage manager code
milyin Mar 25, 2024
48d8d77
separate qosbuilder trait
milyin Mar 25, 2024
322a4e0
removed `with_keyexpr` from trait
milyin Mar 25, 2024
9515c7d
put, delete builder
milyin Mar 25, 2024
4e14cf9
build fixes
milyin Mar 26, 2024
5b9e3bd
Merge branch 'protocol_changes' into sample_api_rework
milyin Mar 26, 2024
8cd60d0
Publication updated
milyin Mar 26, 2024
00e0a59
build fix
milyin Mar 26, 2024
e601271
reply_sample restored
milyin Mar 27, 2024
ea4020d
build fixes
milyin Mar 27, 2024
5f0b531
clippy warning fix
milyin Mar 27, 2024
8c90028
Merge branch 'clippy_truncate_warning_fix' into sample_api_rework
milyin Mar 27, 2024
fdec8dc
Merge branch 'protocol_changes' into sample_api_rework
milyin Mar 27, 2024
b6a243d
Merge branch 'protocol_changes' into sample_api_rework
milyin Mar 27, 2024
ce5b610
sample api for GetBuilder
milyin Mar 28, 2024
0bce160
restored "express" name
milyin Mar 28, 2024
3620c3a
removed 'timestamp_opt'
milyin Mar 28, 2024
aafd2a4
with removed, into<opttion<>> added
milyin Mar 28, 2024
fb6509d
into to encoding returned
milyin Mar 28, 2024
2ff6bc2
example build fix
milyin Mar 28, 2024
5bbef9c
with removed
milyin Mar 28, 2024
c427ac7
resolvable removed from simple builders
milyin Mar 28, 2024
10baf8c
doctests fixed
milyin Mar 28, 2024
e74caa4
Merge branch 'protocol_changes' into sample_api_rework
milyin Mar 28, 2024
48cb96b
sample bulider in separarte module
milyin Mar 28, 2024
ddb93a2
separate module
milyin Mar 28, 2024
ab96aab
SampleBuilder put/delete
milyin Mar 28, 2024
82c1c99
set value api
milyin Mar 28, 2024
b5a1f6b
with removed
milyin Mar 28, 2024
1c95157
commented code removed
milyin Mar 28, 2024
d9eb96a
map-from removed
milyin Mar 28, 2024
0ee0908
Merge branch 'protocol_changes' into sample_api_rework
milyin Mar 28, 2024
e4501f4
build warnings fixed
milyin Mar 28, 2024
43a4937
SampleBuilder uses generics
Mallets Mar 29, 2024
6c305a1
Improve Query builders with generics
Mallets Mar 29, 2024
bca953d
Reorg sample files
Mallets Mar 29, 2024
9d1a540
Remove error op struct in SampleBuilder
Mallets Mar 29, 2024
7904d09
Add forgotten file
Mallets Mar 29, 2024
1eede12
Merge branch 'protocol_changes' into sample_api_rework
milyin Mar 31, 2024
8d282a8
Merge pull request #882 from eclipse-zenoh/sample_api_rework_builder
milyin Mar 31, 2024
2992d45
Merge branch 'sample_api_rework' of github.com:eclipse-zenoh/zenoh in…
milyin Mar 31, 2024
ab349b2
support of TryIntoKeyexpr
milyin Apr 1, 2024
e4c4be1
removed "op" namespace to align naming with ReplyBuilder
milyin Apr 1, 2024
d631f76
publication builder shortened
milyin Apr 1, 2024
9b8aaa6
parametrized publication builder
milyin Apr 1, 2024
bbe07f7
removed PutPublication, DeletePublication
milyin Apr 1, 2024
4d0f6e5
removed extra uses
milyin Apr 1, 2024
23931f9
more cleanup
milyin Apr 1, 2024
8f8eb25
typedefs for complex builder types (#890)
milyin Apr 5, 2024
eb1a80a
Fix use and unstable visibility
Mallets Apr 5, 2024
a43e451
Add payload and encoding accessors for Query
Mallets Apr 5, 2024
1ad8c84
cargo fmt --all
Mallets Apr 5, 2024
232177f
Merge pull request #907 from eclipse-zenoh/sample_api_rework_unstable
milyin Apr 5, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 25 additions & 25 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

17 changes: 8 additions & 9 deletions examples/examples/z_get.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,14 @@ async fn main() {
let session = zenoh::open(config).res().await.unwrap();

println!("Sending Query '{selector}'...");
let replies = match value {
Some(value) => session.get(&selector).with_value(value),
None => session.get(&selector),
}
.target(target)
.timeout(timeout)
.res()
.await
.unwrap();
let replies = session
.get(&selector)
.value(value)
.target(target)
.timeout(timeout)
.res()
.await
.unwrap();
while let Ok(reply) = replies.recv_async().await {
match reply.sample {
Ok(sample) => {
Expand Down
4 changes: 2 additions & 2 deletions examples/examples/z_pub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,12 @@ async fn main() {
println!("Putting Data ('{}': '{}')...", &key_expr, buf);
let mut put = publisher.put(buf);
if let Some(attachment) = &attachment {
put = put.with_attachment(
put = put.attachment(Some(
attachment
.split('&')
.map(|pair| split_once(pair, '='))
.collect(),
)
))
}
put.res().await.unwrap();
}
Expand Down
10 changes: 2 additions & 8 deletions plugins/zenoh-plugin-rest/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -419,7 +419,7 @@ async fn query(mut req: Request<(Arc<Session>, String)>) -> tide::Result<Respons
.content_type()
.map(|m| Encoding::from(m.to_string()))
.unwrap_or_default();
query = query.with_value(Value::from(body).with_encoding(encoding));
query = query.payload(body).encoding(encoding);
}
match query.res().await {
Ok(receiver) => {
Expand Down Expand Up @@ -463,13 +463,7 @@ async fn write(mut req: Request<(Arc<Session>, String)>) -> tide::Result<Respons
// @TODO: Define the right congestion control value
let session = &req.state().0;
let res = match method_to_kind(req.method()) {
SampleKind::Put => {
session
.put(&key_expr, bytes)
.with_encoding(encoding)
.res()
.await
}
SampleKind::Put => session.put(&key_expr, bytes).encoding(encoding).res().await,
SampleKind::Delete => session.delete(&key_expr).res().await,
};
match res {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,8 +127,8 @@ impl AlignQueryable {
AlignData::Data(k, (v, ts)) => {
query
.reply(k, v.payload)
.with_encoding(v.encoding)
.with_timestamp(ts)
.encoding(v.encoding)
.timestamp(ts)
.res()
.await
.unwrap();
Expand Down
8 changes: 5 additions & 3 deletions plugins/zenoh-plugin-storage-manager/src/replica/aligner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use std::str;
use zenoh::key_expr::{KeyExpr, OwnedKeyExpr};
use zenoh::payload::StringOrBase64;
use zenoh::prelude::r#async::*;
use zenoh::sample::builder::SampleBuilder;
use zenoh::time::Timestamp;
use zenoh::Session;

Expand Down Expand Up @@ -108,9 +109,10 @@ impl Aligner {
let Value {
payload, encoding, ..
} = value;
let sample = Sample::new(key, payload)
.with_encoding(encoding)
.with_timestamp(ts);
let sample = SampleBuilder::put(key, payload)
.encoding(encoding)
.timestamp(ts)
.into();
log::debug!("[ALIGNER] Adding {:?} to storage", sample);
self.tx_sample.send_async(sample).await.unwrap_or_else(|e| {
log::error!("[ALIGNER] Error adding sample to storage: {}", e)
Expand Down
73 changes: 40 additions & 33 deletions plugins/zenoh-plugin-storage-manager/src/replica/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,14 @@ use futures::select;
use std::collections::{HashMap, HashSet};
use std::str::{self, FromStr};
use std::time::{SystemTime, UNIX_EPOCH};
use zenoh::buffers::buffer::SplitBuffer;
use zenoh::buffers::ZBuf;
use zenoh::prelude::r#async::*;
use zenoh::query::ConsolidationMode;
use zenoh::time::{Timestamp, NTP64};
use zenoh::query::{ConsolidationMode, QueryTarget};
use zenoh::sample::builder::SampleBuilder;
use zenoh::sample::{Sample, SampleKind};
use zenoh::time::{new_reception_timestamp, Timestamp, NTP64};
use zenoh::value::Value;
use zenoh::{Result as ZResult, Session};
use zenoh_backend_traits::config::{GarbageCollectionConfig, StorageConfig};
use zenoh_backend_traits::{Capability, History, Persistence, StorageInsertionResult, StoredData};
Expand Down Expand Up @@ -219,14 +223,15 @@ impl StorageService {
select!(
// on sample for key_expr
sample = storage_sub.recv_async() => {
let mut sample = match sample {
let sample = match sample {
Ok(sample) => sample,
Err(e) => {
log::error!("Error in sample: {}", e);
continue;
}
};
sample.ensure_timestamp();
let timestamp = sample.timestamp().cloned().unwrap_or(new_reception_timestamp());
let sample = SampleBuilder::from(sample).timestamp(timestamp).into();
self.process_sample(sample).await;
},
// on query on key_expr
Expand Down Expand Up @@ -290,23 +295,25 @@ impl StorageService {
);
// there might be the case that the actual update was outdated due to a wild card update, but not stored yet in the storage.
// get the relevant wild card entry and use that value and timestamp to update the storage
let sample_to_store = match self
let sample_to_store: Sample = if let Some(update) = self
.ovderriding_wild_update(&k, sample.timestamp().unwrap())
.await
{
Some(overriding_update) => {
let Value {
payload, encoding, ..
} = overriding_update.data.value;
Sample::new(KeyExpr::from(k.clone()), payload)
.with_encoding(encoding)
.with_timestamp(overriding_update.data.timestamp)
.with_kind(overriding_update.kind)
match update.kind {
SampleKind::Put => {
SampleBuilder::put(KeyExpr::from(k.clone()), update.data.value.payload)
.encoding(update.data.value.encoding)
.timestamp(update.data.timestamp)
.into()
}
SampleKind::Delete => SampleBuilder::delete(KeyExpr::from(k.clone()))
.timestamp(update.data.timestamp)
.into(),
}
None => Sample::new(KeyExpr::from(k.clone()), sample.payload().clone())
.with_encoding(sample.encoding().clone())
.with_timestamp(*sample.timestamp().unwrap())
.with_kind(sample.kind()),
} else {
SampleBuilder::from(sample.clone())
.keyexpr(k.clone())
.into()
};

let stripped_key = match self.strip_prefix(sample_to_store.key_expr()) {
Expand All @@ -323,7 +330,7 @@ impl StorageService {
.put(
stripped_key,
Value::new(sample_to_store.payload().clone())
.with_encoding(sample_to_store.encoding().clone()),
.encoding(sample_to_store.encoding().clone()),
*sample_to_store.timestamp().unwrap(),
)
.await
Expand Down Expand Up @@ -506,13 +513,13 @@ impl StorageService {
match storage.get(stripped_key, q.parameters()).await {
Ok(stored_data) => {
for entry in stored_data {
let Value {
payload, encoding, ..
} = entry.value;
let sample = Sample::new(key.clone(), payload)
.with_encoding(encoding)
.with_timestamp(entry.timestamp);
if let Err(e) = q.reply_sample(sample).res().await {
if let Err(e) = q
.reply(key.clone(), entry.value.payload)
.encoding(entry.value.encoding)
.timestamp(entry.timestamp)
.res()
.await
{
log::warn!(
"Storage '{}' raised an error replying a query: {}",
self.name,
Expand All @@ -538,13 +545,13 @@ impl StorageService {
match storage.get(stripped_key, q.parameters()).await {
Ok(stored_data) => {
for entry in stored_data {
let Value {
payload, encoding, ..
} = entry.value;
let sample = Sample::new(q.key_expr().clone(), payload)
.with_encoding(encoding)
.with_timestamp(entry.timestamp);
if let Err(e) = q.reply_sample(sample).res().await {
if let Err(e) = q
.reply(q.key_expr().clone(), entry.value.payload)
.encoding(entry.value.encoding)
.timestamp(entry.timestamp)
.res()
.await
{
log::warn!(
"Storage '{}' raised an error replying a query: {}",
self.name,
Expand Down Expand Up @@ -692,7 +699,7 @@ fn construct_update(data: String) -> Update {
for slice in result.3 {
payload.push_zslice(slice.to_vec().into());
}
let value = Value::new(payload).with_encoding(result.2);
let value = Value::new(payload).encoding(result.2);
let data = StoredData {
value,
timestamp: Timestamp::from_str(&result.1).unwrap(), // @TODO: remove the unwrap()
Expand Down
Loading
Loading