Skip to content

Commit

Permalink
fix websocket support
Browse files Browse the repository at this point in the history
  • Loading branch information
SamuelBonilla committed Jan 14, 2023
1 parent 04cd99a commit 4d277b9
Show file tree
Hide file tree
Showing 8 changed files with 216 additions and 4 deletions.
6 changes: 3 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "graphul"
version = "0.4.5"
version = "0.4.6"
edition = "2021"
license = "MIT"
categories = ["asynchronous", "network-programming", "web-programming::http-server"]
Expand All @@ -13,9 +13,9 @@ readme = "README.md"

[dependencies]
hyper = { version = "0.14", features = ["full"] }
tokio = { version = "1", features = ["full"] }
tokio = { version = "1.24", features = ["full"] }
tokio-util = { version = "0.7", features = ["io"] }
axum = { version = "0.6", features = ["multipart"] }
axum = { version = "0.6", features = ["multipart", "ws", "headers"] }
askama = "0.11"
futures = "0.3.24"
tower = { version = "0.4", features = ["util"] }
Expand Down
10 changes: 10 additions & 0 deletions examples/chat-websocket/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
[package]
name = "example-chat"
version = "0.1.0"
edition = "2021"
publish = false

[dependencies]
graphul = { path = "../../." }
tokio = { version = "1", features = ["full"] }
futures = "0.3"
22 changes: 22 additions & 0 deletions examples/chat-websocket/src/chat/domain.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
use std::{
collections::HashSet,
sync::{Arc, Mutex},
};

use tokio::sync::broadcast;

// Our shared state
pub struct AppState {
// We require unique usernames. This tracks which usernames have been taken.
pub user_set: Mutex<HashSet<String>>,
// Channel used to send messages to all connected clients.
pub tx: broadcast::Sender<String>,
}

pub fn app_state() -> Arc<AppState> {
// Set up application state for use with share_state.
let user_set = Mutex::new(HashSet::new());
let (tx, _rx) = broadcast::channel(100);

Arc::new(AppState { user_set, tx })
}
105 changes: 105 additions & 0 deletions examples/chat-websocket/src/chat/handlers.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
use futures::{sink::SinkExt, stream::StreamExt};
use graphul::{
extract::{
ws::{Message, WebSocket, WebSocketUpgrade},
State,
},
IntoResponse,
};
use std::sync::Arc;

use crate::domain;

pub async fn websocket_handler(
ws: WebSocketUpgrade,
State(state): State<Arc<domain::AppState>>,
) -> impl IntoResponse {
ws.on_upgrade(|socket| websocket(socket, state))
}

// This function deals with a single websocket connection, i.e., a single
// connected client / user, for which we will spawn two independent tasks (for
// receiving / sending chat messages).
async fn websocket(stream: WebSocket, state: Arc<domain::AppState>) {
// By splitting, we can send and receive at the same time.
let (mut sender, mut receiver) = stream.split();

// Username gets set in the receive loop, if it's valid.
let mut username = String::new();
// Loop until a text message is found.
while let Some(Ok(message)) = receiver.next().await {
if let Message::Text(name) = message {
// If username that is sent by client is not taken, fill username string.
check_username(&state, &mut username, &name);

// If not empty we want to quit the loop else we want to quit function.
if !username.is_empty() {
break;
} else {
// Only send our client that username is taken.
let _ = sender
.send(Message::Text(String::from("Username already taken.")))
.await;

return;
}
}
}

// We subscribe *before* sending the "joined" message, so that we will also
// display it to our client.
let mut rx = state.tx.subscribe();

// Now send the "joined" message to all subscribers.
let msg = format!("{} joined.", username);
println!("{}", msg);
let _ = state.tx.send(msg);

// Spawn the first task that will receive broadcast messages and send text
// messages over the websocket to our client.
let mut send_task = tokio::spawn(async move {
while let Ok(msg) = rx.recv().await {
// In any websocket error, break loop.
if sender.send(Message::Text(msg)).await.is_err() {
break;
}
}
});

// Clone things we want to pass (move) to the receiving task.
let tx = state.tx.clone();
let name = username.clone();

// Spawn a task that takes messages from the websocket, prepends the user
// name, and sends them to all broadcast subscribers.
let mut recv_task = tokio::spawn(async move {
while let Some(Ok(Message::Text(text))) = receiver.next().await {
// Add username before message.
let _ = tx.send(format!("{}: {}", name, text));
}
});

// If any one of the tasks run to completion, we abort the other.
tokio::select! {
_ = (&mut send_task) => recv_task.abort(),
_ = (&mut recv_task) => send_task.abort(),
};

// Send "user left" message (similar to "joined" above).
let msg = format!("{} left.", username);
println!("{}", msg);
let _ = state.tx.send(msg);

// Remove username from map so new clients can take it again.
state.user_set.lock().unwrap().remove(&username);
}

fn check_username(state: &domain::AppState, string: &mut String, name: &str) {
let mut user_set = state.user_set.lock().unwrap();

if !user_set.contains(name) {
user_set.insert(name.to_owned());

string.push_str(name);
}
}
2 changes: 2 additions & 0 deletions examples/chat-websocket/src/chat/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
pub mod domain;
pub mod handlers;
21 changes: 21 additions & 0 deletions examples/chat-websocket/src/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
//! Example chat application.
//!
mod chat;

use chat::{domain, handlers};

use graphul::{http::Methods, FileConfig, Graphul};

#[tokio::main]
async fn main() {
let state = domain::app_state();

let mut app = Graphul::share_state(state);

app.static_file("/", "templates/index.html", FileConfig::default());

app.get("/websocket", handlers::websocket_handler);

app.run("127.0.0.1:3000").await;
}
52 changes: 52 additions & 0 deletions examples/chat-websocket/templates/index.html
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>WebSocket Chat</title>
</head>
<body>
<h1>WebSocket Chat Example With Graphul</h1>

<input id="username" style="display:block; width:100px; box-sizing: border-box" type="text" placeholder="username">
<button id="join-chat" type="button">Join Chat</button>
<textarea id="chat" style="display:block; width:600px; height:400px; box-sizing: border-box" cols="30" rows="10"></textarea>
<input id="input" style="display:block; width:600px; box-sizing: border-box" type="text" placeholder="chat">

<script>
const username = document.querySelector("#username");
const join_btn = document.querySelector("#join-chat");
const textarea = document.querySelector("#chat");
const input = document.querySelector("#input");

join_btn.addEventListener("click", function(e) {
this.disabled = true;

const websocket = new WebSocket("ws://localhost:3000/websocket");

websocket.onopen = function() {
console.log("connection opened");
websocket.send(username.value);
}

const btn = this;

websocket.onclose = function() {
console.log("connection closed");
btn.disabled = false;
}

websocket.onmessage = function(e) {
console.log("received message: "+e.data);
textarea.value += e.data+"\r\n";
}

input.onkeydown = function(e) {
if (e.key == "Enter") {
websocket.send(input.value);
input.value = "";
}
}
});
</script>
</body>
</html>
2 changes: 1 addition & 1 deletion src/app.rs
Original file line number Diff line number Diff line change
@@ -1 +1 @@
pub const VERSION: &str = "0.4.5";
pub const VERSION: &str = "0.4.6";

0 comments on commit 4d277b9

Please sign in to comment.