-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathbroadcast.rs
57 lines (54 loc) · 1.83 KB
/
broadcast.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
//! Example pub/sub server with broadcast.
use std::{sync::Arc, time::Duration};
use futures_util::StreamExt;
use jsonrpc_core::{MetaIoHandler, Params};
use jsonrpc_utils::{
axum_utils::jsonrpc_router,
pub_sub::{add_pub_sub, PublishMsg},
stream::StreamServerConfig,
};
use tokio::sync::broadcast;
use tokio_stream::wrappers::BroadcastStream;
#[tokio::main]
async fn main() {
let mut rpc = MetaIoHandler::with_compatibility(jsonrpc_core::Compatibility::V2);
let (tx, _) = broadcast::channel(8);
tokio::spawn({
let tx = tx.clone();
async move {
for i in 0u64.. {
// Error can be ignored.
//
// It is recommended to broadcast already serialized
// `PublishMsg`. This way it only need to serialized once.
drop(tx.send(PublishMsg::result(&i)));
tokio::time::sleep(Duration::from_secs(1)).await;
}
}
});
add_pub_sub(
&mut rpc,
"subscribe",
"subscription",
"unsubscribe",
move |_params: Params| {
Ok(BroadcastStream::new(tx.subscribe()).map(|result| {
result.unwrap_or_else(|_| {
PublishMsg::error(&jsonrpc_core::Error {
code: jsonrpc_core::ErrorCode::ServerError(-32000),
message: "lagged".into(),
data: None,
})
})
}))
},
);
let rpc = Arc::new(rpc);
let config = StreamServerConfig::default()
.with_channel_size(4)
.with_pipeline_size(4)
.with_keep_alive(true);
let app = jsonrpc_router("/rpc", rpc.clone(), config);
let listener = tokio::net::TcpListener::bind("0.0.0.0:3000").await.unwrap();
axum::serve(listener, app).await.unwrap();
}