Skip to content

Commit

Permalink
Merge pull request #1091 from eclipse-zenoh/new_reception_timestamp
Browse files Browse the repository at this point in the history
session id parameter added to new_timestamp, renaming
  • Loading branch information
milyin authored Jun 6, 2024
2 parents dc5d6b2 + 4eb4193 commit 04f05cb
Show file tree
Hide file tree
Showing 10 changed files with 43 additions and 19 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions commons/zenoh-config/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,3 +37,4 @@ zenoh-protocol = { workspace = true }
zenoh-result = { workspace = true }
zenoh-util = { workspace = true }
secrecy = { workspace = true }
uhlc = { workspace = true }
6 changes: 6 additions & 0 deletions commons/zenoh-config/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,12 @@ impl From<ZenohId> for zenoh_protocol::core::ZenohId {
}
}

impl From<ZenohId> for uhlc::ID {
fn from(zid: ZenohId) -> Self {
zid.0.into()
}
}

impl FromStr for ZenohId {
type Err = zenoh_result::Error;

Expand Down
6 changes: 6 additions & 0 deletions commons/zenoh-protocol/src/core/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,12 @@ impl From<&ZenohId> for uhlc::ID {
}
}

impl From<ZenohId> for uhlc::ID {
fn from(zid: ZenohId) -> Self {
zid.0
}
}

impl From<ZenohId> for OwnedKeyExpr {
fn from(zid: ZenohId) -> Self {
// SAFETY: zid.to_string() returns an stringified hexadecimal
Expand Down
6 changes: 4 additions & 2 deletions plugins/zenoh-plugin-storage-manager/src/replica/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,14 +114,16 @@ impl Replica {
}
};

// Zid of session for generating timestamps
let zid = session.zid();

let replica = Replica {
name: name.to_string(),
session,
key_expr: storage_config.key_expr.clone(),
replica_config: storage_config.replica_config.clone().unwrap(),
digests_published: RwLock::new(HashSet::new()),
};

// Create channels for communication between components
// channel to queue digests to be aligned
let (tx_digest, rx_digest) = flume::unbounded();
Expand All @@ -132,7 +134,7 @@ impl Replica {

let config = replica.replica_config.clone();
// snapshotter
let snapshotter = Arc::new(Snapshotter::new(rx_log, &startup_entries, &config).await);
let snapshotter = Arc::new(Snapshotter::new(zid, rx_log, &startup_entries, &config).await);
// digest sub
let digest_sub = replica.start_digest_sub(tx_digest).fuse();
// queryable for alignment
Expand Down
13 changes: 10 additions & 3 deletions plugins/zenoh-plugin-storage-manager/src/replica/snapshotter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,14 @@ use async_std::{
};
use flume::Receiver;
use futures::join;
use zenoh::{key_expr::OwnedKeyExpr, time::Timestamp};
use zenoh::{config::ZenohId, key_expr::OwnedKeyExpr, time::Timestamp};
use zenoh_backend_traits::config::ReplicaConfig;

use super::{Digest, DigestConfig, LogEntry};

pub struct Snapshotter {
// session id for timestamp generation
id: ZenohId,
// channel to get updates from the storage
storage_update: Receiver<(OwnedKeyExpr, Timestamp)>,
// configuration parameters of the replica
Expand All @@ -55,6 +57,7 @@ pub struct ReplicationInfo {
impl Snapshotter {
// Initialize the snapshot parameters, logs and digest
pub async fn new(
id: ZenohId,
rx_sample: Receiver<(OwnedKeyExpr, Timestamp)>,
initial_entries: &Vec<(OwnedKeyExpr, Timestamp)>,
replica_config: &ReplicaConfig,
Expand All @@ -63,10 +66,12 @@ impl Snapshotter {
// from initial entries, populate the log - stable and volatile
// compute digest
let (last_snapshot_time, last_interval) = Snapshotter::compute_snapshot_params(
id,
replica_config.propagation_delay,
replica_config.delta,
);
let snapshotter = Snapshotter {
id,
storage_update: rx_sample,
replica_config: replica_config.clone(),
content: ReplicationInfo {
Expand Down Expand Up @@ -126,6 +131,7 @@ impl Snapshotter {
let mut last_snapshot_time = self.content.last_snapshot_time.write().await;
let mut last_interval = self.content.last_interval.write().await;
let (time, interval) = Snapshotter::compute_snapshot_params(
self.id,
self.replica_config.propagation_delay,
self.replica_config.delta,
);
Expand All @@ -139,10 +145,11 @@ impl Snapshotter {

// Compute latest snapshot time and latest interval with respect to the current time
pub fn compute_snapshot_params(
id: ZenohId,
propagation_delay: Duration,
delta: Duration,
) -> (Timestamp, u64) {
let now = zenoh::time::new_reception_timestamp();
let now = zenoh::time::new_timestamp(id);
let latest_interval = (now
.get_time()
.to_system_time()
Expand Down Expand Up @@ -199,7 +206,7 @@ impl Snapshotter {

// Create digest from the stable log at startup
async fn initialize_digest(&self) {
let now = zenoh::time::new_reception_timestamp();
let now = zenoh::time::new_timestamp(self.id);
let replica_data = &self.content;
let log_locked = replica_data.stable_log.read().await;
let latest_interval = replica_data.last_interval.read().await;
Expand Down
7 changes: 5 additions & 2 deletions plugins/zenoh-plugin-storage-manager/src/replica/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ use zenoh::{
sample::{Sample, SampleBuilder, SampleKind, TimestampBuilderTrait, ValueBuilderTrait},
selector::Selector,
session::{Session, SessionDeclarations},
time::{new_reception_timestamp, Timestamp, NTP64},
time::{new_timestamp, Timestamp, NTP64},
value::Value,
};
use zenoh_backend_traits::{
Expand Down Expand Up @@ -149,6 +149,9 @@ impl StorageService {
);
t.add_async(gc).await;

// get session id for timestamp generation
let zid = self.session.info().zid().await;

// subscribe on key_expr
let storage_sub = match self.session.declare_subscriber(&self.key_expr).await {
Ok(storage_sub) => storage_sub,
Expand Down Expand Up @@ -238,7 +241,7 @@ impl StorageService {
continue;
}
};
let timestamp = sample.timestamp().cloned().unwrap_or(new_reception_timestamp());
let timestamp = sample.timestamp().cloned().unwrap_or(new_timestamp(zid));
let sample = SampleBuilder::from(sample).timestamp(timestamp).into();
self.process_sample(sample).await;
},
Expand Down
5 changes: 3 additions & 2 deletions zenoh-ext/src/querying_subscriber.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ use zenoh::{
selector::Selector,
session::{SessionDeclarations, SessionRef},
subscriber::{Reliability, Subscriber},
time::{new_reception_timestamp, Timestamp},
time::{new_timestamp, Timestamp},
};

use crate::ExtractSample;
Expand Down Expand Up @@ -655,6 +655,7 @@ impl<'a, Handler> FetchingSubscriber<'a, Handler> {
InputHandler: IntoHandler<'static, Sample, Handler = Handler> + Send,
TryIntoSample: ExtractSample + Send + Sync,
{
let zid = conf.session.zid();
let state = Arc::new(Mutex::new(InnerState {
pending_fetches: 0,
merge_queue: MergeQueue::new(),
Expand All @@ -674,7 +675,7 @@ impl<'a, Handler> FetchingSubscriber<'a, Handler> {
);
// ensure the sample has a timestamp, thus it will always be sorted into the MergeQueue
// after any timestamped Sample possibly coming from a fetch reply.
let timestamp = s.timestamp().cloned().unwrap_or(new_reception_timestamp());
let timestamp = s.timestamp().cloned().unwrap_or(new_timestamp(zid));
state
.merge_queue
.push(SampleBuilder::from(s).timestamp(timestamp).into());
Expand Down
15 changes: 6 additions & 9 deletions zenoh/src/api/time.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,13 @@
// Contributors:
// ZettaScale Zenoh Team, <[email protected]>
//
use std::convert::TryFrom;
use std::time::{SystemTime, UNIX_EPOCH};

use zenoh_protocol::core::{Timestamp, TimestampId};

/// Generates a reception [`Timestamp`] with id=0x01.
/// This operation should be called if a timestamp is required for an incoming [`zenoh::Sample`](crate::Sample)
/// that doesn't contain any timestamp.
pub fn new_reception_timestamp() -> Timestamp {
use std::time::{SystemTime, UNIX_EPOCH};

let now = SystemTime::now().duration_since(UNIX_EPOCH).unwrap();
Timestamp::new(now.into(), TimestampId::try_from([1]).unwrap())
/// Generates a [`Timestamp`] with [`TimestampId`] and current system time
/// The [`TimestampId`] can be taken from session id returned by [`SessionInfo::zid()`](crate::api::info::SessionInfo::zid).
pub fn new_timestamp<T: Into<TimestampId>>(id: T) -> Timestamp {
let now = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().into();
Timestamp::new(now, id.into())
}
2 changes: 1 addition & 1 deletion zenoh/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -343,7 +343,7 @@ pub mod liveliness {
pub mod time {
pub use zenoh_protocol::core::{Timestamp, TimestampId, NTP64};

pub use crate::api::time::new_reception_timestamp;
pub use crate::api::time::new_timestamp;
}

/// Configuration to pass to [`open`](crate::session::open) and [`scout`](crate::scouting::scout) functions and associated constants
Expand Down

0 comments on commit 04f05cb

Please sign in to comment.