Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
  • Loading branch information
asladeofgreen committed Jan 16, 2025
2 parents f1cbaea + 391f2b6 commit 0e9a7b7
Show file tree
Hide file tree
Showing 11 changed files with 873 additions and 6 deletions.
24 changes: 24 additions & 0 deletions .github/workflows/cctl.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
name: Casper 2.0-RC3 CCTL
on: [push]
jobs:
build_and_test:
runs-on: ubuntu-22.04
services:
casper-cctl:
image: koxu1996/casper-cctl:2.0-rc3
ports:
- 14101:14101 # RPC
- 21101:21101 # SSE
steps:
- uses: actions/checkout@v3
- name: Test RPC - info_get_status call
run: >
curl --silent --location 'http://127.0.0.1:21101/rpc'
--header 'Content-Type: application/json'
--data '{"id": 1, "jsonrpc": "2.0", "method": "info_get_status", "params": []}'
| jq
- name: Test SSE - read stream for 5 seconds
continue-on-error: true
run: |
curl --silent --location http://127.0.0.1:14101/events --max-time 5
(($? != 28)) && { printf '%s\n' "Unexpected exit code"; exit 1; }
19 changes: 15 additions & 4 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,19 @@ license = "MIT OR Apache-2.0"
readme = "README.md"
repository = "https://github.com/casper-network/casper-sdk-rs"

# [dependencies]
# l1-types = { path = "../casper-node/types", package = "casper-types", features = [
# "std",
# ] }
# l1-binary-port = { path = "../casper-node/binary_port", package = "casper-binary-port" }


[dependencies]
l1-types = { path = "../casper-node/types", package = "casper-types", features = [
"std",
] }
l1-binary-port = { path = "../casper-node/binary_port", package = "casper-binary-port" }
eventsource-stream = "0.2.3"
reqwest = { version = "0.12.5", features = ["json", "stream"] }
serde = { version = "1.0.189", features = ["derive"] }
serde_json = "1.0.107"
tokio = { version = "1", features = ["full"] }
futures = "0.3.30"
thiserror = "1.0"
casper-types = "4.0.1"
2 changes: 1 addition & 1 deletion src/api/mod.rs
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
pub(super) mod node;
pub mod node;
pub(super) mod sidecar;
2 changes: 1 addition & 1 deletion src/api/node/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
pub(super) mod binary;
pub(super) mod rest;
pub(super) mod sse;
pub mod sse;
134 changes: 134 additions & 0 deletions src/api/node/sse/client.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
use super::{
error::ClientError,
types::{CoreCommand, EventType},
ClientCore, SseData,
};
use std::time::Duration;
use tokio::sync::{mpsc, oneshot};

pub struct Client {
command_sender: mpsc::Sender<CoreCommand>,
}

impl Client {
pub async fn new(url: &str) -> Self {
let client_core = ClientCore::new(url).await;

let (tx, rx) = mpsc::channel(32);
let _handle = tokio::spawn(async move {
if let Err(e) = run_client_core(rx, client_core).await {
panic!("Unrecoverable client error: {}", e);
}
});

Client { command_sender: tx }
}

pub async fn connect(&self) -> Result<(), ClientError> {
let (tx, rx) = oneshot::channel();
self.command_sender
.send(CoreCommand::Connect(tx))
.await
.map_err(|err| ClientError::CommandSendError(err))?;
rx.await.map_err(|err| ClientError::CommandRecvError(err))
}

pub async fn on_event<F>(
&mut self,
event_type: EventType,
handler: F,
) -> Result<u64, ClientError>
where
F: Fn(SseData) + 'static + Send + Sync,
{
let (tx, rx) = oneshot::channel();
self.command_sender
.send(CoreCommand::AddOnEventHandler(
event_type,
Box::new(handler),
tx,
))
.await
.map_err(|err| ClientError::CommandSendError(err))?;
rx.await.map_err(|err| ClientError::CommandRecvError(err))
}

pub async fn wait_for_event<F>(
&mut self,
event_type: EventType,
predicate: F,
timeout: Duration,
) -> Result<Option<SseData>, ClientError>
where
F: Fn(SseData) -> bool + Send + Sync + 'static,
{
let (tx, mut rx) = mpsc::channel(1);

// Register the event handler
let handler_id = self
.on_event(event_type, move |event_info: SseData| {
if predicate(event_info.clone()) {
// Send the matching event to the channel
let _ = tx
.try_send(event_info)
.map_err(|err| ClientError::ChannelInternalError(err));
}
})
.await?;

// Wait for the event or timeout
let result = if timeout.is_zero() {
rx.recv().await
} else {
tokio::time::timeout(timeout, rx.recv())
.await
.ok()
.flatten()
};

// Remove the event handler after the event is received or timeout occurs
self.remove_handler(handler_id).await?;

match result {
Some(event_info) => Ok(Some(event_info)),
None => {
eprintln!("Timed out or stream exhausted while waiting for event");
Ok(None)
}
}
}

pub async fn remove_handler(&mut self, id: u64) -> Result<bool, ClientError> {
let (tx, rx) = oneshot::channel();
self.command_sender
.send(CoreCommand::RemoveEventHandler(id, tx))
.await
.map_err(|err| ClientError::CommandSendError(err))?;
rx.await.map_err(|err| ClientError::CommandRecvError(err))
}
}

/// Handles incoming commands and delegates tasks to ClientCore.
async fn run_client_core(
mut rx: mpsc::Receiver<CoreCommand>,
mut client_core: ClientCore,
) -> Result<(), ClientError> {
loop {
if !client_core.is_connected() {
// Not connected yet, so only process Connect commands.
if let Some(command) = rx.recv().await {
client_core.handle_command(command).await?
}
} else {
tokio::select! {
Ok(Some(event)) = client_core.run_once() => {
client_core.handle_event(event)?;
},
Some(command) = rx.recv() => {
client_core.handle_command(command)
.await?
},
}
}
}
}
139 changes: 139 additions & 0 deletions src/api/node/sse/client_core.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
use super::{
error::ClientError,
types::{BoxedEventStream, CoreCommand, EventType, Handler},
SseData,
};
use eventsource_stream::{Event, Eventsource};
use futures::stream::TryStreamExt;
use std::collections::HashMap;

pub struct ClientCore {
url: String,
event_stream: Option<BoxedEventStream>,
next_handler_id: u64,
event_handlers: HashMap<EventType, HashMap<u64, Box<dyn Fn(SseData) + Send + Sync + 'static>>>,
id_types: HashMap<u64, EventType>,
is_connected: bool,
}

impl ClientCore {
pub async fn new(url: &str) -> Self {
ClientCore {
url: url.to_string(),
event_stream: None,
next_handler_id: 0,
event_handlers: HashMap::new(),
id_types: HashMap::new(),
is_connected: false,
}
}

pub async fn connect(&mut self) -> Result<(), ClientError> {
// Connect to SSE endpoint.
let client = reqwest::Client::new();
let response = client.get(&self.url).send().await?;

let stream = response.bytes_stream();
let mut event_stream = stream.eventsource();

// Handle the handshake with API version.
let handshake_event = event_stream
.try_next()
.await?
.ok_or(ClientError::StreamExhausted)?;
let handshake_data: SseData = serde_json::from_str(&handshake_event.data)?;
let _api_version = match handshake_data {
SseData::ApiVersion(v) => Ok(v),
_ => Err(ClientError::InvalidHandshake),
}?;

// Wrap stream with box and store it.
let boxed_event_stream = Box::pin(event_stream);
self.event_stream = Some(boxed_event_stream);
self.is_connected = true;

Ok(())
}

pub fn remove_handler(&mut self, id: u64) -> bool {
if let Some(event_type) = self.id_types.get(&id) {
match self.event_handlers.get_mut(&event_type) {
Some(handlers_for_type) => {
self.id_types.remove(&id);
handlers_for_type.remove(&id).is_some()
}
None => false,
}
} else {
false //not found
}
}

pub fn is_connected(&self) -> bool {
self.is_connected
}

pub fn handle_event(&mut self, event: Event) -> Result<(), ClientError> {
let data: SseData = serde_json::from_str(&event.data)?;

match data {
SseData::ApiVersion(_) => return Err(ClientError::UnexpectedHandshake), // Should only happen once at connection
SseData::Shutdown => return Err(ClientError::NodeShutdown),

// For each type, find and invoke registered handlers
event => {
if let Some(handlers) = self.event_handlers.get_mut(&event.event_type()) {
for handler in handlers.values() {
handler(event.clone()); // Invoke each handler for the event
}
}
}
}
Ok(())
}

pub async fn run_once(&mut self) -> Result<Option<Event>, ClientError> {
if let Some(stream) = self.event_stream.as_mut() {
match stream.try_next().await {
Ok(Some(event)) => Ok(Some(event)),
Ok(None) => Err(ClientError::StreamExhausted),
Err(err) => Err(ClientError::EventStreamError(err)),
}
} else {
Err(ClientError::NoEventStreamAvailable)
}
}

pub fn add_on_event_handler(&mut self, event_type: EventType, handler: Box<Handler>) -> u64 {
let handlers = self.event_handlers.entry(event_type).or_default();
let handler_id = self.next_handler_id;
handlers.insert(handler_id, handler);
self.id_types.insert(handler_id, event_type);
self.next_handler_id += 1;
handler_id
}

pub async fn handle_command(&mut self, command: CoreCommand) -> Result<(), ClientError> {
match command {
CoreCommand::AddOnEventHandler(event_type, callback, completion_ack) => {
let event_id = self.add_on_event_handler(event_type, callback);
completion_ack
.send(event_id)
.map_err(|_| ClientError::ReciverDroppedError())?;
}
CoreCommand::Connect(completion_ack) => {
self.connect().await.map_err(ClientError::from)?;
completion_ack
.send(())
.map_err(|_| ClientError::ReciverDroppedError())?;
}
CoreCommand::RemoveEventHandler(id, completion_ack) => {
let removed = self.remove_handler(id);
completion_ack
.send(removed)
.map_err(|_| ClientError::ReciverDroppedError())?;
}
}
Ok(())
}
}
42 changes: 42 additions & 0 deletions src/api/node/sse/error.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
use super::{types::CoreCommand, SseData};
use eventsource_stream::EventStreamError;
use thiserror::Error;

#[derive(Error, Debug)]
pub enum ClientError {
#[error("Failed to connect to SSE endpoint: {0}")]
ConnectionError(#[from] reqwest::Error),

#[error("SSE stream exhausted unexpectedly")]
StreamExhausted,

#[error("Invalid handshake event")]
InvalidHandshake,

#[error("Unexpected handshake event")]
UnexpectedHandshake,

#[error("Deserialization error: {0}")]
DeserializationError(#[from] serde_json::Error),

#[error("Node shutdown")]
NodeShutdown,

#[error("Failed to send command to core: {0}")]
CommandSendError(#[from] tokio::sync::mpsc::error::SendError<CoreCommand>),

#[error("Failed to send ack to client")]
ReciverDroppedError(),

#[error("Failed to recive command from core: {0}")]
CommandRecvError(#[from] tokio::sync::oneshot::error::RecvError),

#[error("Failed to send Event into the channel: {0}")]
ChannelInternalError(#[from] tokio::sync::mpsc::error::TrySendError<SseData>),

#[error("Error reading from event stream:{0}")]
EventStreamError(#[from] EventStreamError<reqwest::Error>),

#[error("No event stream available")]
NoEventStreamAvailable,
}
6 changes: 6 additions & 0 deletions src/api/node/sse/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pub mod client_core;
pub mod error;
pub mod types;
pub use client_core::ClientCore;
pub use types::SseData;
pub mod client;
Loading

0 comments on commit 0e9a7b7

Please sign in to comment.