Skip to content

Commit

Permalink
Merge branch 'main' into docs9
Browse files Browse the repository at this point in the history
  • Loading branch information
horned-sphere authored Jun 21, 2024
2 parents 07707e0 + c2e5776 commit 7c8cca7
Show file tree
Hide file tree
Showing 37 changed files with 485 additions and 175 deletions.
3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ members = [
"example_apps/join_map",
"example_apps/join_value",
"example_apps/aggregations",
"example_apps/time_series",
]

exclude = [
Expand Down Expand Up @@ -93,7 +94,7 @@ ratchet_fixture = "0.4"
flate2 = "1.0.22"
bitflags = "2.5"
rocksdb = "0.22"
integer-encoding = "3.0.4"
integer-encoding = "4.0.0"
rustls = "0.20"
webpki = "0.22"
webpki-roots = "0.22"
Expand Down
8 changes: 5 additions & 3 deletions example_apps/aggregations/src/area.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ impl AreaLifecycle {
impl AreaLifecycle {
#[on_start]
pub fn on_start(&self, context: HandlerContext<AreaAgent>) -> impl EventHandler<AreaAgent> {
context.send_command(None, "/aggregate", "register", self.area.to_string())
context.send_command(None, "/city", "register", self.area.to_string())
}

#[on_command(registrations)]
Expand All @@ -140,8 +140,10 @@ impl AreaLifecycle {
format!("/cars/{car_id}").as_str(),
"speed",
)
.boxed(),
Action::Deregister(car_id) => context.remove_downlink(AreaAgent::CARS, *car_id).boxed(),
.boxed_local(),
Action::Deregister(car_id) => context
.remove_downlink(AreaAgent::CARS, *car_id)
.boxed_local(),
}
}
#[on_update(cars)]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,38 +24,38 @@ use swimos::{

#[derive(AgentLaneModel)]
#[projections]
pub struct AggregateAgent {
pub struct CityAgent {
aggregated: JoinValueLane<String, f64>,
average_speed: ValueLane<f64>,
register: CommandLane<String>,
}

#[derive(Clone, Default)]
pub struct AggregateLifecycle;
pub struct CityLifecycle;

#[lifecycle(AggregateAgent)]
impl AggregateLifecycle {
#[lifecycle(CityAgent)]
impl CityLifecycle {
#[on_update(aggregated)]
fn aggregated(
&self,
context: HandlerContext<AggregateAgent>,
context: HandlerContext<CityAgent>,
averages: &HashMap<String, f64>,
_key: String,
_prev: Option<f64>,
_new_value: &f64,
) -> impl EventHandler<AggregateAgent> {
) -> impl EventHandler<CityAgent> {
let average = averages.values().sum::<f64>() / averages.len() as f64;
context.set_value(AggregateAgent::AVERAGE_SPEED, average)
context.set_value(CityAgent::AVERAGE_SPEED, average)
}

#[on_command(register)]
pub fn register(
&self,
context: HandlerContext<AggregateAgent>,
context: HandlerContext<CityAgent>,
area_id: &String,
) -> impl EventHandler<AggregateAgent> {
) -> impl EventHandler<CityAgent> {
context.add_downlink(
AggregateAgent::AGGREGATED,
CityAgent::AGGREGATED,
area_id.clone(),
None,
format!("/area/{}", area_id).as_str(),
Expand Down
9 changes: 4 additions & 5 deletions example_apps/aggregations/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@ use std::time::Duration;

use crate::area::Area;
use crate::{
aggregate::{AggregateAgent, AggregateLifecycle},
area::{AreaAgent, AreaLifecycle},
car::CarAgent,
car::CarLifecycle,
city::{CityAgent, CityLifecycle},
};
use example_util::{example_logging, manage_handle};
use swimos::route::RouteUri;
Expand All @@ -31,9 +31,9 @@ use swimos::{
server::{Server, ServerBuilder},
};

mod aggregate;
mod area;
mod car;
mod city;

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
Expand All @@ -46,12 +46,11 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
AreaLifecycle::new(name).into_lifecycle(),
)
};
let aggregate_agent =
AgentModel::new(AggregateAgent::default, AggregateLifecycle.into_lifecycle());
let aggregate_agent = AgentModel::new(CityAgent::default, CityLifecycle.into_lifecycle());

let mut builder = ServerBuilder::with_plane_name("Example Plane")
.add_route(RoutePattern::parse_str("/cars/:car_id")?, car_agent)
.add_route(RoutePattern::parse_str("/aggregate")?, aggregate_agent)
.add_route(RoutePattern::parse_str("/city")?, aggregate_agent)
.update_config(|config| {
config.agent_runtime.inactive_timeout = Duration::from_secs(5 * 60);
});
Expand Down
6 changes: 3 additions & 3 deletions example_apps/command_lane/src/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,12 +76,12 @@ impl ExampleLifecycle {
cmd: &Instruction,
) -> impl EventHandler<ExampleAgent> {
match *cmd {
Instruction::Zero => context.set_value(ExampleAgent::LANE, 0).boxed(),
Instruction::Zero => context.set_value(ExampleAgent::LANE, 0).boxed_local(),
Instruction::Add(n) => context
.get_value(ExampleAgent::LANE)
.and_then(move |v| context.set_value(ExampleAgent::LANE, v + n))
.boxed(),
Instruction::Stop => context.stop().boxed(),
.boxed_local(),
Instruction::Stop => context.stop().boxed_local(),
}
}
}
6 changes: 3 additions & 3 deletions example_apps/event_downlink/src/consumer/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,15 +101,15 @@ impl ConsumerLifecycle {
),
})
.discard()
.boxed(),
.boxed_local(),
Instruction::CloseLink => handle
.with_mut(|h| {
if let Some(h) = h.as_mut() {
h.stop();
}
})
.boxed(),
Instruction::Stop => context.stop().boxed(),
.boxed_local(),
Instruction::Stop => context.stop().boxed_local(),
};
context
.effect(move || println!("{}", msg))
Expand Down
8 changes: 4 additions & 4 deletions example_apps/local_downlink/src/consumer/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,14 +98,14 @@ impl ConsumerLifecycle {
),
})
.discard()
.boxed(),
.boxed_local(),
Instruction::CloseLink => handle
.with_mut(|h| {
if let Some(h) = h.as_mut() {
h.stop();
}
})
.boxed(),
.boxed_local(),
Instruction::Send(n) => handle
.with_mut(move |maybe| {
if let Some(handle) = maybe.as_mut() {
Expand All @@ -114,8 +114,8 @@ impl ConsumerLifecycle {
}
}
})
.boxed(),
Instruction::Stop => context.stop().boxed(),
.boxed_local(),
Instruction::Stop => context.stop().boxed_local(),
};
context
.effect(move || println!("{}", msg))
Expand Down
8 changes: 4 additions & 4 deletions example_apps/map_downlink/src/consumer/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,14 +105,14 @@ impl ConsumerLifecycle {
),
})
.discard()
.boxed(),
.boxed_local(),
Instruction::CloseLink => handle
.with_mut(|h| {
if let Some(h) = h.as_mut() {
h.stop();
}
})
.boxed(),
.boxed_local(),
Instruction::Send(n) => handle
.with_mut(move |h| {
if let Some(handle) = h.as_mut() {
Expand All @@ -121,8 +121,8 @@ impl ConsumerLifecycle {
}
}
})
.boxed(),
Instruction::Stop => context.stop().boxed(),
.boxed_local(),
Instruction::Stop => context.stop().boxed_local(),
};
context
.effect(move || println!("{}", msg))
Expand Down
4 changes: 2 additions & 2 deletions example_apps/map_store/src/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,15 +85,15 @@ impl ExampleLifecycle {
context
.get_value(ExampleAgent::LANE)
.and_then(move |v| context.update(ExampleAgent::SAVED, key, v))
.boxed()
.boxed_local()
}
Instruction::Restore(name) => {
let key = name.clone();
context
.get_entry(ExampleAgent::SAVED, key)
.map(|maybe: Option<i32>| maybe.unwrap_or_default())
.and_then(move |v| context.set_value(ExampleAgent::LANE, v))
.boxed()
.boxed_local()
}
}
}
Expand Down
8 changes: 4 additions & 4 deletions example_apps/map_store_persistence/src/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,14 +89,14 @@ impl ExampleLifecycle {
cmd: &Instruction,
) -> impl EventHandler<ExampleAgent> {
match cmd {
Instruction::Wake => UnitHandler::default().boxed(),
Instruction::Wake => UnitHandler::default().boxed_local(),
Instruction::SetValue { key, value } => context
.update(ExampleAgent::VALUE, key.clone(), *value)
.boxed(),
.boxed_local(),
Instruction::SetTemp { key, value } => context
.update(ExampleAgent::TEMPORARY, key.clone(), *value)
.boxed(),
Instruction::Stop => context.stop().boxed(),
.boxed_local(),
Instruction::Stop => context.stop().boxed_local(),
}
}
}
11 changes: 11 additions & 0 deletions example_apps/time_series/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
[package]
name = "time_series"
version = "0.1.0"
edition = "2021"

[dependencies]
swimos = { path = "../../swimos", features = ["server", "agent"] }
tokio = { workspace = true, features = ["rt-multi-thread", "macros"] }
example-util = { path = "../example_util" }
futures-util = { workspace = true }
swimos_form = { path = "../../api/swimos_form" }
95 changes: 95 additions & 0 deletions example_apps/time_series/src/count.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
// Copyright 2015-2023 Swim Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::cell::RefCell;
use std::collections::{HashMap, VecDeque};
use std::time::{SystemTime, UNIX_EPOCH};

use swimos::{
agent::event_handler::Sequentially,
agent::lanes::CommandLane,
agent::lanes::MapLane,
agent::{
agent_lifecycle::HandlerContext,
event_handler::{EventHandler, HandlerActionExt},
lifecycle, projections, AgentLaneModel,
},
};

#[derive(AgentLaneModel)]
#[projections]
pub struct CountAgent {
history: MapLane<u64, String>,
add: CommandLane<String>,
}

pub struct CountLifecycle {
max: usize,
keys: RefCell<VecDeque<u64>>,
}

impl CountLifecycle {
pub fn new(max: usize) -> CountLifecycle {
CountLifecycle {
max,
keys: Default::default(),
}
}
}

#[lifecycle(CountAgent, no_clone)]
impl CountLifecycle {
#[on_command(add)]
pub fn add(
&self,
context: HandlerContext<CountAgent>,
cmd: &str,
) -> impl EventHandler<CountAgent> {
let now = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_millis() as u64;
context.update(CountAgent::HISTORY, now, cmd.to_string())
}

#[on_update(history)]
pub fn on_update(
&self,
context: HandlerContext<CountAgent>,
_map: &HashMap<u64, String>,
key: u64,
_prev: Option<String>,
_new_value: &str,
) -> impl EventHandler<CountAgent> {
let CountLifecycle { max, keys } = self;
let timestamps = &mut *keys.borrow_mut();
timestamps.push_front(key);

let len = timestamps.len();
let to_drop = if len > *max { len - *max } else { 0 };

let handler = if to_drop > 0 {
let keys = timestamps
.split_off(to_drop)
.into_iter()
.take(to_drop)
.map(move |key| context.remove(CountAgent::HISTORY, key));
Some(Sequentially::new(keys))
} else {
None
};

handler.discard()
}
}
Loading

0 comments on commit 7c8cca7

Please sign in to comment.