Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improves deserialization efficiency for agents. #741

Merged
merged 13 commits into from
Dec 11, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,7 @@ impl LocationTracker {
/// it completes. Not that this is cannot be used as a stand-alone decoder as it has no concept of
/// a separator between frames. It needs to be incorporated into another decoder that can determine
/// where one record ends and another begins.
#[derive(Debug)]
pub struct RecognizerDecoder<R> {
parser: IncrementalReconParser,
recognizer: R,
Expand Down
37 changes: 30 additions & 7 deletions server/swimos_agent/src/agent_model/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -204,17 +204,28 @@ pub trait AgentDescription {
/// although it will not provided any lifecycle events for the agent or its lanes.
pub trait AgentSpec: AgentDescription + Sized + Send {
/// The type of handler to run when a command is received for a value lane.
type ValCommandHandler: HandlerAction<Self, Completion = ()> + Send + 'static;
type ValCommandHandler<'a>: HandlerAction<Self, Completion = ()> + Send + 'a
where
Self: 'a;

/// The type of handler to run when a command is received for a map lane.
type MapCommandHandler: HandlerAction<Self, Completion = ()> + Send + 'static;
type MapCommandHandler<'a>: HandlerAction<Self, Completion = ()> + Send + 'a
where
Self: 'a;

/// The type of handler to run when a request is received to sync with a lane.
type OnSyncHandler: HandlerAction<Self, Completion = ()> + Send + 'static;

/// The type of the handler to run when an HTTP request is received for a lane.
type HttpRequestHandler: HandlerAction<Self, Completion = ()> + Send + 'static;

/// A store of persistent deserializers that may be used by [`AgentSpec::on_value_command`]
/// and [`AgentSpec::on_map_command`].
type Deserializers: Send + 'static;

/// Create th store of deserializers that can be reused any number of times.
fn initialize_deserializers(&self) -> Self::Deserializers;

/// The names and flags of all items (lanes and stores) in the agent.
fn item_specs() -> HashMap<&'static str, ItemSpec>;

Expand All @@ -223,9 +234,15 @@ pub trait AgentSpec: AgentDescription + Sized + Send {
/// accept commands.
///
/// # Arguments
/// * `deserializers` - The store of persistent deserializers for the agent.
/// * `lane` - The name of the lane.
/// * `body` - The content of the command.
fn on_value_command(&self, lane: &str, body: BytesMut) -> Option<Self::ValCommandHandler>;
fn on_value_command<'a>(
&self,
deserializers: &'a mut Self::Deserializers,
lane: &str,
body: BytesMut,
) -> Option<Self::ValCommandHandler<'a>>;

/// Create an initializer that will consume the state of a value-like item, as reported by the runtime.
///
Expand All @@ -247,13 +264,15 @@ pub trait AgentSpec: AgentDescription + Sized + Send {
/// for a map lane. There will be no handler if the lane does not exist or does not
/// accept commands.
/// # Arguments
/// * `deserializers` - The store of persistent deserializers for the agent.
/// * `lane` - The name of the lane.
/// * `body` - The content of the command.
fn on_map_command(
fn on_map_command<'a>(
&self,
deserializers: &'a mut Self::Deserializers,
lane: &str,
body: MapMessage<BytesMut, BytesMut>,
) -> Option<Self::MapCommandHandler>;
) -> Option<Self::MapCommandHandler<'a>>;

/// Create a handler that will update the state of an agent when a request is made to
/// sync with a lane. There will be no handler if the lane does not exist.
Expand Down Expand Up @@ -1217,6 +1236,7 @@ where
let add_commander = |address: Address<Text>| cmd_ids.borrow_mut().get_request(&address);
let add_link = (add_downlink, add_commander);
let add_lane = NoDynLanes;
let mut deserializers = item_model.initialize_deserializers();

// Calling run_handler is very verbose so is pulled out into this macro to make the code easier to read.
macro_rules! exec_handler {
Expand Down Expand Up @@ -1399,7 +1419,8 @@ where
match request {
LaneRequest::Command(body) => {
trace!(name = %name, "Received a command for a value-like lane.");
if let Some(handler) = item_model.on_value_command(name.as_str(), body)
if let Some(handler) =
item_model.on_value_command(&mut deserializers, name.as_str(), body)
{
let result = run_handler(
&mut ActionContext::new(
Expand Down Expand Up @@ -1453,7 +1474,9 @@ where
match request {
LaneRequest::Command(body) => {
trace!(name = %name, "Received a command for a map-like lane.");
if let Some(handler) = item_model.on_map_command(name.as_str(), body) {
if let Some(handler) =
item_model.on_map_command(&mut deserializers, name.as_str(), body)
{
let result = run_handler(
&mut ActionContext::new(
&suspended,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,19 +37,32 @@ impl AgentDescription for EmptyAgent {
}

impl AgentSpec for EmptyAgent {
type ValCommandHandler = UnitHandler;
type ValCommandHandler<'a> = UnitHandler
where
Self: 'a;

type MapCommandHandler = UnitHandler;
type MapCommandHandler<'a> = UnitHandler
where
Self: 'a;

type OnSyncHandler = UnitHandler;

type HttpRequestHandler = UnitHandler;

type Deserializers = ();

fn initialize_deserializers(&self) -> Self::Deserializers {}

fn item_specs() -> HashMap<&'static str, ItemSpec> {
HashMap::new()
}

fn on_value_command(&self, _lane: &str, _body: BytesMut) -> Option<Self::ValCommandHandler> {
fn on_value_command(
&self,
_: &mut (),
_lane: &str,
_body: BytesMut,
) -> Option<Self::ValCommandHandler<'_>> {
None
}

Expand All @@ -67,11 +80,12 @@ impl AgentSpec for EmptyAgent {
None
}

fn on_map_command(
fn on_map_command<'a>(
&self,
_: &'a mut (),
_lane: &str,
_body: MapMessage<BytesMut, BytesMut>,
) -> Option<Self::MapCommandHandler> {
) -> Option<Self::MapCommandHandler<'a>> {
None
}

Expand Down
24 changes: 19 additions & 5 deletions server/swimos_agent/src/agent_model/tests/fake_agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -168,9 +168,13 @@ impl AgentDescription for TestAgent {
}

impl AgentSpec for TestAgent {
type ValCommandHandler = TestHandler;
type ValCommandHandler<'a> = TestHandler
where
Self: 'a;

type MapCommandHandler = TestHandler;
type MapCommandHandler<'a> = TestHandler
where
Self: 'a;

type OnSyncHandler = TestHandler;

Expand Down Expand Up @@ -219,7 +223,12 @@ impl AgentSpec for TestAgent {
lanes
}

fn on_value_command(&self, lane: &str, body: BytesMut) -> Option<Self::ValCommandHandler> {
fn on_value_command<'a>(
&self,
_: &'a mut (),
lane: &str,
body: BytesMut,
) -> Option<Self::ValCommandHandler<'a>> {
match lane {
VAL_LANE => Some(
TestEvent::Value {
Expand Down Expand Up @@ -264,11 +273,12 @@ impl AgentSpec for TestAgent {
None
}

fn on_map_command(
fn on_map_command<'a>(
&self,
_: &'a mut (),
lane: &str,
body: MapMessage<BytesMut, BytesMut>,
) -> Option<Self::MapCommandHandler> {
) -> Option<Self::MapCommandHandler<'a>> {
match lane {
MAP_LANE => Some(
TestEvent::Map {
Expand Down Expand Up @@ -426,6 +436,10 @@ impl AgentSpec for TestAgent {
Err(DynamicRegistrationError::DuplicateName(name.to_string()))
}
}

type Deserializers = ();

fn initialize_deserializers(&self) -> Self::Deserializers {}
}

impl HandlerAction<TestAgent> for TestHandler {
Expand Down
34 changes: 15 additions & 19 deletions server/swimos_agent/src/event_handler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ use swimos_api::{
};
use swimos_form::{read::RecognizerReadable, write::StructuralWritable};
use swimos_model::Text;
use swimos_recon::parser::{AsyncParseError, RecognizerDecoder};
use swimos_recon::parser::AsyncParseError;
use swimos_utilities::{never::Never, routing::RouteUri};
use thiserror::Error;
use tokio::time::Instant;
Expand All @@ -45,6 +45,7 @@ use crate::{
},
lanes::JoinLaneKind,
meta::AgentMetadata,
ReconDecoder,
};

use bitflags::bitflags;
Expand Down Expand Up @@ -1283,23 +1284,21 @@ impl<Context, S: AsRef<str>> HandlerAction<Context> for GetParameter<S> {

/// An event handler that will attempt to decode a [readable](`swimos_form::read::StructuralReadable`) type
/// from a buffer, immediately returning the result or an error.
pub struct Decode<T> {
_target_type: PhantomData<fn() -> T>,
pub struct Decode<'a, T: RecognizerReadable> {
decoder: Option<&'a mut ReconDecoder<T>>,
buffer: BytesMut,
complete: bool,
}

impl<T> Decode<T> {
pub fn new(buffer: BytesMut) -> Self {
impl<'a, T: RecognizerReadable> Decode<'a, T> {
pub fn new(decoder: &'a mut ReconDecoder<T>, buffer: BytesMut) -> Self {
Decode {
_target_type: PhantomData,
decoder: Some(decoder),
buffer,
complete: false,
}
}
}

impl<Context, T: RecognizerReadable> HandlerAction<Context> for Decode<T> {
impl<'a, T: RecognizerReadable, Context> HandlerAction<Context> for Decode<'a, T> {
type Completion = T;

fn step(
Expand All @@ -1308,27 +1307,24 @@ impl<Context, T: RecognizerReadable> HandlerAction<Context> for Decode<T> {
_meta: AgentMetadata,
_context: &Context,
) -> StepResult<Self::Completion> {
let Decode {
buffer, complete, ..
} = self;
if *complete {
StepResult::after_done()
} else {
let mut decoder = RecognizerDecoder::new(T::make_recognizer());
*complete = true;
let Decode { decoder, buffer } = self;
if let Some(decoder) = decoder.take() {
decoder.reset();
match decoder.decode_eof(buffer) {
Ok(Some(value)) => StepResult::done(value),
Ok(_) => StepResult::Fail(EventHandlerError::IncompleteCommand),
Err(e) => StepResult::Fail(EventHandlerError::BadCommand(e)),
}
} else {
StepResult::after_done()
}
}

fn describe(&self, _context: &Context, f: &mut Formatter<'_>) -> Result<(), std::fmt::Error> {
let Decode {
buffer, complete, ..
buffer, decoder, ..
} = self;
let content = if *complete {
let content = if decoder.is_none() {
CONSUMED
} else {
std::str::from_utf8(buffer.as_ref()).unwrap_or("<<BAD UTF8>>")
Expand Down
11 changes: 7 additions & 4 deletions server/swimos_agent/src/event_handler/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,10 @@ use tokio::time::Instant;

use crate::agent_model::AgentDescription;
use crate::event_handler::check_step::{check_is_complete, check_is_continue};
use crate::event_handler::{GetParameter, ModificationFlags, WithParameters};
use crate::event_handler::{Decode, GetParameter, ModificationFlags, WithParameters};

use crate::test_context::{NO_DOWNLINKS, NO_DYN_LANES};
use crate::ReconDecoder;
use crate::{
event_handler::{
ConstHandler, EventHandlerError, GetAgentUri, HandlerActionExt, Sequentially, SideEffects,
Expand All @@ -36,7 +37,7 @@ use crate::{
};

use super::{
join, ActionContext, Decode, HandlerAction, HandlerFuture, Modification, ScheduleTimerEvent,
join, ActionContext, HandlerAction, HandlerFuture, Modification, ScheduleTimerEvent,
SideEffect, Spawner, StepResult,
};

Expand Down Expand Up @@ -526,14 +527,15 @@ fn followed_by_handler() {

#[test]
fn decoding_handler_success() {
let mut decoder = ReconDecoder::<i32>::default();
let uri = make_uri();
let route_params = HashMap::new();
let meta = make_meta(&uri, &route_params);

let mut buffer = BytesMut::new();
write!(buffer, "56").expect("Write failed.");

let mut handler = Decode::<i32>::new(buffer);
let mut handler = Decode::<i32>::new(&mut decoder, buffer);

let result = handler.step(
&mut dummy_context(&mut HashMap::new(), &mut BytesMut::new()),
Expand Down Expand Up @@ -561,14 +563,15 @@ fn decoding_handler_success() {

#[test]
fn decoding_handler_failure() {
let mut decoder = ReconDecoder::<i32>::default();
let uri = make_uri();
let route_params = HashMap::new();
let meta = make_meta(&uri, &route_params);

let mut buffer = BytesMut::new();
write!(buffer, "boom").expect("Write failed.");

let mut handler = Decode::<i32>::new(buffer);
let mut handler = Decode::<i32>::new(&mut decoder, buffer);

let result = handler.step(
&mut dummy_context(&mut HashMap::new(), &mut BytesMut::new()),
Expand Down
8 changes: 5 additions & 3 deletions server/swimos_agent/src/lanes/command/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ use crate::{
},
item::AgentItem,
meta::AgentMetadata,
ReconDecoder,
};

use super::{LaneItem, ProjTransform};
Expand Down Expand Up @@ -157,15 +158,16 @@ impl<C, T> HandlerTrans<T> for ProjTransform<C, CommandLane<T>> {
}
}

pub type DecodeAndCommand<C, T> =
AndThen<Decode<T>, DoCommand<C, T>, ProjTransform<C, CommandLane<T>>>;
pub type DecodeAndCommand<'a, C, T> =
AndThen<Decode<'a, T>, DoCommand<C, T>, ProjTransform<C, CommandLane<T>>>;

/// Create an event handler that will decode an incoming command and apply it to a command lane.
pub fn decode_and_command<C: AgentDescription, T: RecognizerReadable>(
decoder: &mut ReconDecoder<T>,
buffer: BytesMut,
projection: fn(&C) -> &CommandLane<T>,
) -> DecodeAndCommand<C, T> {
let decode: Decode<T> = Decode::new(buffer);
let decode: Decode<T> = Decode::new(decoder, buffer);
decode.and_then(ProjTransform::new(projection))
}

Expand Down
Loading
Loading