Skip to content

Commit

Permalink
Merge pull request #720 from swimos/connector-lanes
Browse files Browse the repository at this point in the history
Moves lane opening into swimos_connector.
  • Loading branch information
horned-sphere authored Oct 7, 2024
2 parents 8a5ed6c + a1e64c7 commit e0e0885
Show file tree
Hide file tree
Showing 24 changed files with 662 additions and 387 deletions.
11 changes: 11 additions & 0 deletions api/swimos_api/src/address.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,17 @@ impl<T> Address<T> {
}
}

impl Address<&str> {
pub fn owned(&self) -> Address<String> {
let Address { host, node, lane } = self;
Address {
host: host.as_ref().map(|s| s.to_string()),
node: node.to_string(),
lane: lane.to_string(),
}
}
}

impl<T: Display + Debug> Display for Address<T> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
Expand Down
2 changes: 1 addition & 1 deletion server/swimos_agent/src/lanes/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ impl<OnDone> OpenLane<OnDone> {
/// * `name` - The name of the new lane.
/// * `kind` - The kind of the new lane.
/// * `on_done` - A callback tht produces an event handler that will be executed after the request completes.
pub(crate) fn new(name: String, kind: WarpLaneKind, on_done: OnDone) -> Self {
pub fn new(name: String, kind: WarpLaneKind, on_done: OnDone) -> Self {
OpenLane {
name,
kind,
Expand Down
47 changes: 27 additions & 20 deletions server/swimos_connector/src/connector/egress/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

use std::{collections::HashMap, time::Duration};

use swimos_api::address::Address;
use swimos_api::{address::Address, agent::WarpLaneKind};
use swimos_model::Value;

use super::{BaseConnector, ConnectorFuture};
Expand Down Expand Up @@ -42,12 +42,12 @@ pub trait EgressConnector: BaseConnector {
/// The type of the sender created by this connector.
type Sender: EgressConnectorSender<Self::Error> + 'static;

/// Open the downlinks required by the connector. This is called during the agent's `on_start`
/// Open the lanes and downlinks required by the connector. This is called during the agent's `on_start`
/// event.
///
/// # Arguments
/// * `context` - The connector makes calls to the context to request the downlinks.
fn open_downlinks(&self, context: &mut dyn EgressContext);
/// * `context` - The connector makes calls to the context to request the lanes and downlinks.
fn initialize(&self, context: &mut dyn EgressContext) -> Result<(), Self::Error>;

/// Create sender for the connector which is used to send messages to the external data sink. This is called
/// exactly ones during the agent's `on_start` event but must implement [`Clone`] so that copies can be passed
Expand All @@ -61,22 +61,6 @@ pub trait EgressConnector: BaseConnector {
) -> Result<Self::Sender, Self::Error>;
}

/// A reference to an egress context is passed to an [`EgressConnector`] when it starts allowing it
/// to request that downlinks be opened to remote lanes.
pub trait EgressContext {
/// Request an event downlink to a remote lane.
///
/// # Arguments
/// * `address` - The address of the remote lane.
fn open_event_downlink(&mut self, address: Address<String>);

/// Request a map-event downlink to a remote lane.
///
/// # Arguments
/// * `address` - The address of the remote lane.
fn open_map_downlink(&mut self, address: Address<String>);
}

/// Possible results of sending a message to the external sink.
pub enum SendResult<F, E> {
/// The process of attempting to send the message can begin. When the provided future completes,
Expand Down Expand Up @@ -154,3 +138,26 @@ pub trait EgressConnectorSender<SendError>: Send + Clone {
timer_id: u64,
) -> Option<SendResult<impl ConnectorFuture<SendError>, SendError>>;
}

/// A reference to an egress context is passed to an [egress connector](`EgressConnector`) when it starts
/// allowing it to request that lanes or downlinks to remote lanes be opened.
pub trait EgressContext {
/// Request a new, dynamic WARP lane be opened on the agent.
///
/// # Arguments
/// * `name` - The name of the lane.
/// * `kind` - The kind of the lane.
fn open_lane(&mut self, name: &str, kind: WarpLaneKind);

/// Request an event downlink to a remote lane.
///
/// # Arguments
/// * `address` - The address of the remote lane.
fn open_event_downlink(&mut self, address: Address<&str>);

/// Request a map-event downlink to a remote lane.
///
/// # Arguments
/// * `address` - The address of the remote lane.
fn open_map_downlink(&mut self, address: Address<&str>);
}
19 changes: 19 additions & 0 deletions server/swimos_connector/src/connector/ingress/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use swimos_agent::{
agent_lifecycle::HandlerContext,
event_handler::{HandlerAction, HandlerActionExt, TryHandlerActionExt},
};
use swimos_api::agent::WarpLaneKind;

use crate::generic::ConnectorAgent;

Expand All @@ -46,6 +47,13 @@ pub trait IngressConnector: BaseConnector {
/// Create an asynchronous stream that consumes events from the external data source and produces
/// [event handlers](swimos_agent::event_handler::EventHandler) from them which modify the state of the agent.
fn create_stream(&self) -> Result<impl ConnectorStream<Self::Error>, Self::Error>;

/// Open the lanes required by the connector. This is called during the agent's `on_start`
/// event.
///
/// # Arguments
/// * `context` - The connector makes calls to the context to request the lanes.
fn initialize(&self, context: &mut dyn IngressContext) -> Result<(), Self::Error>;
}

/// A trait for fallible streams of event handlers that are returned by a [`IngressConnector`].
Expand Down Expand Up @@ -86,3 +94,14 @@ where
};
context.suspend(fut)
}

/// A reference to an ingress context is passed to an [ingress connector](`IngressConnector`) when it starts
/// allowing it to request that lanes be opened.
pub trait IngressContext {
/// Request a new, dynamic WARP lane be opened on the agent.
///
/// # Arguments
/// * `name` - The name of the lane.
/// * `kind` - The kind of the lane.
fn open_lane(&mut self, name: &str, kind: WarpLaneKind);
}
2 changes: 1 addition & 1 deletion server/swimos_connector/src/connector/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ pub use egress::{
EgressConnector, EgressConnectorSender, EgressContext, MessageSource, SendResult,
};
use futures::TryFuture;
pub use ingress::{suspend_connector, ConnectorStream, IngressConnector};
pub use ingress::{suspend_connector, ConnectorStream, IngressConnector, IngressContext};
use swimos_agent::event_handler::EventHandler;
use swimos_utilities::trigger;

Expand Down
3 changes: 2 additions & 1 deletion server/swimos_connector/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ mod route;
mod test_support;
pub use connector::{
BaseConnector, ConnectorFuture, ConnectorHandler, ConnectorStream, EgressConnector,
EgressConnectorSender, EgressContext, IngressConnector, MessageSource, SendResult,
EgressConnectorSender, EgressContext, IngressConnector, IngressContext, MessageSource,
SendResult,
};
pub use error::ConnectorInitError;
pub use generic::{ConnectorAgent, MapLaneSelectorFn, ValueLaneSelectorFn};
Expand Down
59 changes: 37 additions & 22 deletions server/swimos_connector/src/lifecycle/egress/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
use std::{cell::OnceCell, collections::HashMap};

use bitflags::bitflags;
use futures::{FutureExt, TryFutureExt};
use futures::{future::join, FutureExt, TryFutureExt};
use swimos_agent::{
agent_lifecycle::{
item_event::{dynamic_handler, DynamicHandler, DynamicLifecycle, ItemEvent},
Expand All @@ -34,16 +34,18 @@ use swimos_agent::{
AgentMetadata,
};
use swimos_agent_protocol::MapMessage;
use swimos_api::address::Address;
use swimos_api::{address::Address, agent::WarpLaneKind};
use swimos_model::Value;
use swimos_utilities::trigger;

use crate::{
connector::{EgressConnector, EgressConnectorSender},
connector::{EgressConnector, EgressConnectorSender, EgressContext},
error::ConnectorInitError,
ConnectorAgent, EgressContext, MessageSource, SendResult,
ConnectorAgent, MessageSource, SendResult,
};

use super::open_lanes;

#[cfg(test)]
mod tests;

Expand Down Expand Up @@ -96,18 +98,23 @@ bitflags! {
}

#[derive(Default, Debug)]
struct DownlinkCollector {
struct RequestCollector {
lanes: Vec<(String, WarpLaneKind)>,
value_downlinks: Vec<Address<String>>,
map_downlinks: Vec<Address<String>>,
}

impl EgressContext for DownlinkCollector {
fn open_event_downlink(&mut self, address: Address<String>) {
self.value_downlinks.push(address);
impl EgressContext for RequestCollector {
fn open_event_downlink(&mut self, address: Address<&str>) {
self.value_downlinks.push(address.owned());
}

fn open_map_downlink(&mut self, address: Address<&str>) {
self.map_downlinks.push(address.owned());
}

fn open_map_downlink(&mut self, address: Address<String>) {
self.map_downlinks.push(address);
fn open_lane(&mut self, name: &str, kind: WarpLaneKind) {
self.lanes.push((name.to_string(), kind));
}
}

Expand All @@ -117,11 +124,12 @@ where
{
fn on_start(&self) -> impl EventHandler<ConnectorAgent> + '_ {
let EgressConnectorLifecycle { lifecycle, sender } = self;
let (tx, rx) = trigger::trigger();
let (on_start_tx, on_start_rx) = trigger::trigger();
let (lanes_tx, lanes_rx) = trigger::trigger();
let context: HandlerContext<ConnectorAgent> = Default::default();
let mut collector = DownlinkCollector::default();
let on_start = lifecycle.on_start(tx);
lifecycle.open_downlinks(&mut collector);
let mut collector = RequestCollector::default();
let init_result = lifecycle.initialize(&mut collector);
let on_start = lifecycle.on_start(on_start_tx);
let create_sender = context
.with_parameters(|params| lifecycle.make_sender(params))
.try_handler()
Expand All @@ -131,14 +139,24 @@ where
})
});
let await_init = context.suspend(async move {
let (r1, r2) = join(on_start_rx, lanes_rx).await;
context
.value(rx.await)
.value(r1.and(r2))
.try_handler()
.followed_by(ConnectorAgent::set_flags(EgressFlags::INITIALIZED.bits()))
});
let open_downlinks = self.open_downlinks(collector);
await_init
let RequestCollector {
lanes,
value_downlinks,
map_downlinks,
} = collector;
let open_lanes = open_lanes(lanes, lanes_tx);
let open_downlinks = self.open_downlinks(value_downlinks, map_downlinks);
let check_init = context.value(init_result).try_handler();
check_init
.followed_by(await_init)
.followed_by(on_start)
.followed_by(open_lanes)
.followed_by(create_sender)
.followed_by(open_downlinks)
}
Expand Down Expand Up @@ -252,18 +270,15 @@ where
{
fn open_downlinks(
&self,
collector: DownlinkCollector,
value_downlinks: Vec<Address<String>>,
map_downlinks: Vec<Address<String>>,
) -> impl EventHandler<ConnectorAgent> + '_ {
let EgressConnectorLifecycle { sender, .. } = self;
let context: HandlerContext<ConnectorAgent> = Default::default();
context
.effect(|| sender.get().ok_or(ConnectorInitError))
.try_handler()
.and_then(move |sender: &C::Sender| {
let DownlinkCollector {
value_downlinks,
map_downlinks,
} = collector;
let mut open_value_dls = vec![];
let mut open_map_dls = vec![];
for address in value_downlinks {
Expand Down
37 changes: 27 additions & 10 deletions server/swimos_connector/src/lifecycle/egress/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,13 @@ use swimos_model::{Text, Value};
use swimos_utilities::trigger;
use thiserror::Error;

use crate::connector::EgressContext;
use crate::lifecycle::fixture::{
drive_downlink, run_handle_with_futs, DownlinkRecord, RequestsRecord, TimerRecord,
};
use crate::{
BaseConnector, ConnectorAgent, ConnectorFuture, EgressConnector, EgressConnectorLifecycle,
EgressConnectorSender, EgressContext, MapLaneSelectorFn, MessageSource, SendResult,
ValueLaneSelectorFn,
EgressConnectorSender, MapLaneSelectorFn, MessageSource, SendResult, ValueLaneSelectorFn,
};

#[derive(Default)]
Expand Down Expand Up @@ -167,22 +167,23 @@ const NODE2: &str = "/node2";
const LANE1: &str = "value_lane";
const LANE2: &str = "map_lane";

fn value_lane_addr() -> Address<String> {
Address::new(Some(HOST.to_string()), NODE1.to_string(), LANE1.to_string())
fn value_lane_addr() -> Address<&'static str> {
Address::new(Some(HOST), NODE1, LANE1)
}

fn map_lane_addr() -> Address<String> {
Address::new(None, NODE2.to_string(), LANE2.to_string())
fn map_lane_addr() -> Address<&'static str> {
Address::new(None, NODE2, LANE2)
}

impl EgressConnector for TestConnector {
type Error = TestError;

type Sender = TestSender;

fn open_downlinks(&self, context: &mut dyn EgressContext) {
fn initialize(&self, context: &mut dyn EgressContext) -> Result<(), Self::Error> {
context.open_event_downlink(value_lane_addr());
context.open_map_downlink(map_lane_addr());
Ok(())
}

fn make_sender(
Expand Down Expand Up @@ -265,6 +266,7 @@ async fn init_connector(
let RequestsRecord {
mut downlinks,
timers,
..
} = run_handle_with_futs(agent, handler)
.await
.expect("Handler failed.");
Expand Down Expand Up @@ -415,11 +417,16 @@ async fn connector_lifecycle_value_lane_event_busy() {
let handler = lifecycle
.item_event(&agent, VALUE_LANE)
.expect("No pending event.");
let RequestsRecord { downlinks, timers } = run_handle_with_futs(&agent, handler)
let RequestsRecord {
downlinks,
timers,
lanes,
} = run_handle_with_futs(&agent, handler)
.await
.expect("Handler failed.");

assert!(downlinks.is_empty());
assert!(lanes.is_empty());

let id = match timers.as_slice() {
&[TimerRecord { id, .. }] => id,
Expand Down Expand Up @@ -461,11 +468,16 @@ async fn connector_lifecycle_value_lane_event_busy_twice() {
let handler = lifecycle
.item_event(&agent, VALUE_LANE)
.expect("No pending event.");
let RequestsRecord { downlinks, timers } = run_handle_with_futs(&agent, handler)
let RequestsRecord {
downlinks,
timers,
lanes,
} = run_handle_with_futs(&agent, handler)
.await
.expect("Handler failed.");

assert!(downlinks.is_empty());
assert!(lanes.is_empty());

let id = match timers.as_slice() {
&[TimerRecord { id, .. }] => id,
Expand All @@ -478,10 +490,15 @@ async fn connector_lifecycle_value_lane_event_busy_twice() {

let handler = lifecycle.on_timer(id);

let RequestsRecord { downlinks, timers } = run_handle_with_futs(&agent, handler)
let RequestsRecord {
downlinks,
timers,
lanes,
} = run_handle_with_futs(&agent, handler)
.await
.expect("Handler failed.");
assert!(downlinks.is_empty());
assert!(lanes.is_empty());

let id = match timers.as_slice() {
&[TimerRecord { id, .. }] => id,
Expand Down
Loading

0 comments on commit e0e0885

Please sign in to comment.