Skip to content

Commit

Permalink
Enhance event handling with NATS integration
Browse files Browse the repository at this point in the history
Updated the `handle_event` function to include NATS connection for routing events to NATS. Improved error handling and added `RouteEvent` support in event schema. Furthermore, cleaned up deprecated code formatting and streamlined field additions in FbsBuilder.
  • Loading branch information
arkavo-com committed Oct 20, 2024
1 parent c15dbe6 commit 45da2d1
Show file tree
Hide file tree
Showing 4 changed files with 515 additions and 312 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ redis = { version = "0.27.2", features = ["tokio-comp"] }
flatbuffers = "24.3.25"
scale = { package = "parity-scale-codec", version = "3.6.12", default-features = false, features = ["derive"] }
scale-info = { version = "2.11.3", default-features = false, features = ["derive"], optional = true }

bs58 = "0.5.1"

[dev-dependencies]
criterion = "0.5.1"
Expand Down
6 changes: 3 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -123,9 +123,9 @@ redis-server

#### Start backend

```shell
cargo run
```
```shell
cargo run
```

The server will start and listen on the configured port.

Expand Down
51 changes: 46 additions & 5 deletions src/bin/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -437,7 +437,7 @@ async fn handle_binary_message(
)
.await
} // internal
Some(MessageType::Event) => handle_event(server_state, payload).await, // embedded
Some(MessageType::Event) => handle_event(server_state, payload, nats_connection).await, // embedded
None => {
// println!("Unknown message type: {:?}", message_type);
None
Expand Down Expand Up @@ -822,7 +822,7 @@ async fn handle_nats_message(
Ok(())
}

async fn handle_event(server_state: &Arc<ServerState>, payload: &[u8]) -> Option<Message> {
async fn handle_event(server_state: &Arc<ServerState>, payload: &[u8], nats_connection: Arc<NatsConnection>,) -> Option<Message> {
let start_time = Instant::now();
let mut event_data: Option<Vec<u8>> = None;
if let Ok(event) = root::<Event>(payload) {
Expand Down Expand Up @@ -868,7 +868,7 @@ async fn handle_event(server_state: &Arc<ServerState>, payload: &[u8]) -> Option
return None;
}
};
// if cache miss then route to device
// TODO if cache miss then route to device
}
}
EventData::CacheEvent => {
Expand Down Expand Up @@ -923,11 +923,52 @@ async fn handle_event(server_state: &Arc<ServerState>, payload: &[u8]) -> Option
event_data = Some(cache_event.target_payload().unwrap().bytes().to_vec());
}
}
EventData::RouteEvent => {
if let Some(route_event) = event.data_as_route_event() {
if let Some(target_id) = route_event.target_id() {
println!("Route Event:");
println!(" Target ID: {:?}", target_id);
let public_id = bs58::encode(target_id.bytes()).into_string();
println!(" Public ID: {}", public_id);
let subject = format!("profile.{}", public_id);
println!(" subject: {}", subject);
// Create NATS message
// Create NATS message
let nats_message = match NATSMessage::new(payload) {
Ok(msg) => msg,
Err(e) => {
error!("Failed to create NATS message: {}", e);
return None;
}
};
// Get NATS client
if let Some(nats_client) = nats_connection.get_client().await {
// Send the event to NATS
match nats_message.send_to_nats(&nats_client, subject).await {
Ok(_) => {
println!("Successfully sent route event to NATS");
return None;
}
Err(e) => {
error!("Failed to send route event to NATS: {}", e);
return None;
}
}
} else {
error!("NATS client not available");
return None;
}
} else {
error!("Target ID is missing.");
return None;
}
}
}
EventData::NONE => {
println!("No event data");
error!("No event data");
}
_ => {
println!("Unknown event data type: {:?}", event.data_type());
error!("Unknown event data type: {:?}", event.data_type());
}
}
} else {
Expand Down
Loading

0 comments on commit 45da2d1

Please sign in to comment.