Skip to content

Commit

Permalink
feat: add wasm-bindgen-futures runtime
Browse files Browse the repository at this point in the history
Add the `wasm-bindgen-futures` webtime, with support for `wasm32-unknown-known`.

Tests can be run with `wasm-pack test --node -- --no-default-features --features asyncdb-wasm-bindgen-futures`
  • Loading branch information
enmand committed Sep 1, 2024
1 parent 812d49c commit b783c4c
Show file tree
Hide file tree
Showing 5 changed files with 251 additions and 5 deletions.
10 changes: 9 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,17 +26,25 @@ fs2 = { optional = true, version = "0.4.3" }

tokio = { optional = true, features = ["rt", "sync"], version = "1.39.3" }
async-std = { optional = true, version = "1.12.0" }
wasm-bindgen-futures = { optional = true, version = "0.4.24" }
getrandom = { optional = true, version = "0.2.15", features = ["js"] }

[features]
default = ["fs"]
default = ["fs", "asyncdb-wasm-bindgen-futures"]
async = ["asyncdb-tokio"]
asyncdb-tokio = ["tokio"]
asyncdb-async-std = ["async-std"]
asyncdb-wasm-bindgen-futures = [
"wasm-bindgen-futures",
"async-std/async-channel",
"getrandom",
]
fs = ["errno", "fs2"]

[dev-dependencies]
time-test = "0.2"
bencher = "0.1"
wasm-bindgen-test = "0.3.0"

[[bench]]
name = "maps_bench"
Expand Down
81 changes: 78 additions & 3 deletions src/asyncdb.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
use std::collections::hash_map::HashMap;

use crate::{
send_response, send_response_result, AsyncDB, Message, Result, Status, StatusCode, WriteBatch,
DB,
send_response, send_response_result, snapshot::Snapshot, AsyncDB, Message, Result, Status,
StatusCode, WriteBatch, DB,
};

pub(crate) const CHANNEL_BUFFER_SIZE: usize = 32;

#[derive(Clone, Copy)]
pub struct SnapshotRef(usize);
pub struct SnapshotRef(pub(crate) usize);

/// A request sent to the database thread.
pub(crate) enum Request {
Expand Down Expand Up @@ -151,6 +151,78 @@ impl AsyncDB {
}
}

pub(crate) fn match_message(
db: &mut DB,
mut recv: impl ReceiverExt<Message>,
snapshots: &mut HashMap<usize, Snapshot>,
snapshot_counter: &mut usize,
message: Message,
) {
match message.req {
Request::Close => {
send_response(message.resp_channel, Response::OK);
recv.close();
return;
}
Request::Put { key, val } => {
let ok = db.put(&key, &val);
send_response_result(message.resp_channel, ok);
}
Request::Delete { key } => {
let ok = db.delete(&key);
send_response_result(message.resp_channel, ok);
}
Request::Write { batch, sync } => {
let ok = db.write(batch, sync);
send_response_result(message.resp_channel, ok);
}
Request::Flush => {
let ok = db.flush();
send_response_result(message.resp_channel, ok);
}
Request::GetAt { snapshot, key } => {
let snapshot_id = snapshot.0;
if let Some(snapshot) = snapshots.get(&snapshot_id) {
let ok = db.get_at(snapshot, &key);
match ok {
Err(e) => {
send_response(message.resp_channel, Response::Error(e));
}
Ok(v) => {
send_response(message.resp_channel, Response::Value(v));
}
};
} else {
send_response(
message.resp_channel,
Response::Error(Status {
code: StatusCode::AsyncError,
err: "Unknown snapshot reference: this is a bug".to_string(),
}),
);
}
}
Request::Get { key } => {
let r = db.get(&key);
send_response(message.resp_channel, Response::Value(r));
}
Request::GetSnapshot => {
snapshots.insert(*snapshot_counter, db.get_snapshot());
let sref = SnapshotRef(*snapshot_counter);
*snapshot_counter += 1;
send_response(message.resp_channel, Response::Snapshot(sref));
}
Request::DropSnapshot { snapshot } => {
snapshots.remove(&snapshot.0);
send_response_result(message.resp_channel, Ok(()));
}
Request::CompactRange { from, to } => {
let ok = db.compact_range(&from, &to);
send_response_result(message.resp_channel, ok);
}
}
}

pub(crate) fn run_server(mut db: DB, mut recv: impl ReceiverExt<Message>) {
let mut snapshots = HashMap::new();
let mut snapshot_counter: usize = 0;
Expand Down Expand Up @@ -225,5 +297,8 @@ impl AsyncDB {

pub(crate) trait ReceiverExt<T> {
fn blocking_recv(&mut self) -> Option<T>;
async fn recv(&mut self) -> Option<T> {
self.blocking_recv()
}
fn close(&mut self);
}
1 change: 1 addition & 0 deletions src/asyncdb_async_std.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ pub(crate) struct Message {
pub(crate) req: Request,
pub(crate) resp_channel: channel::Sender<Response>,
}

/// `AsyncDB` makes it easy to use LevelDB in a async-std runtime.
/// The methods follow very closely the main API (see `DB` type). Iteration is not yet implemented.
#[derive(Clone)]
Expand Down
151 changes: 151 additions & 0 deletions src/asyncdb_wasm_bindgen_futures.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
use std::collections::HashMap;
use std::path::Path;

use async_std::channel::{self, TryRecvError};
use wasm_bindgen_futures::spawn_local;

use crate::asyncdb::{ReceiverExt, Request, Response, CHANNEL_BUFFER_SIZE};
use crate::snapshot::Snapshot;
use crate::{Options, Result, Status, StatusCode, DB};

pub(crate) struct Message {
pub(crate) req: Request,
pub(crate) resp_channel: channel::Sender<Response>,
}

/// `AsyncDB` makes it easy to use LevelDB in a async-std runtime.
/// The methods follow very closely the main API (see `DB` type). Iteration is not yet implemented.
#[derive(Clone)]
pub struct AsyncDB {
shutdown: channel::Sender<()>,
send: channel::Sender<Message>,
}

impl AsyncDB {
/// Create a new or open an existing database.
pub fn new<P: AsRef<Path>>(name: P, opts: Options) -> Result<AsyncDB> {
let db = DB::open(name, opts)?;

let (send, recv) = channel::bounded(CHANNEL_BUFFER_SIZE);
let (shutdown, shutdown_recv) = channel::bounded(1);

spawn_local(async move {
AsyncDB::run_server_async(db, recv, shutdown_recv, HashMap::new(), 0).await;
});

Ok(AsyncDB { shutdown, send })
}

pub(crate) async fn process_request(&self, req: Request) -> Result<Response> {
let (tx, rx) = channel::bounded(1);

let m = Message {
req,
resp_channel: tx,
};
if let Err(e) = self.send.send(m).await {
return Err(Status {
code: StatusCode::AsyncError,
err: e.to_string(),
});
}
let resp = rx.recv().await;
match resp {
Err(e) => Err(Status {
code: StatusCode::AsyncError,
err: e.to_string(),
}),
Ok(r) => Ok(r),
}
}

pub(crate) async fn run_server_async(
mut db: DB,
mut recv: impl ReceiverExt<Message> + Clone + 'static,
mut shutdown: impl ReceiverExt<()> + Clone + 'static,
mut snapshots: HashMap<usize, Snapshot>,
mut snapshot_counter: usize,
) {
if let Some(message) = recv.recv().await {
Self::match_message(
&mut db,
recv.clone(),
&mut snapshots,
&mut snapshot_counter,
message,
);
}

spawn_local(async move {
// check shutdown
if let Some(()) = shutdown.recv().await {
return;
} else {
AsyncDB::run_server_async(db, recv, shutdown, snapshots, snapshot_counter).await
};
});
}

pub(crate) async fn stop_server_async(&self) {
self.shutdown.close();
}
}

pub(crate) fn send_response_result(ch: channel::Sender<Response>, result: Result<()>) {
if let Err(e) = result {
ch.try_send(Response::Error(e)).ok();
} else {
ch.try_send(Response::OK).ok();
}
}

pub(crate) fn send_response(ch: channel::Sender<Response>, res: Response) {
ch.send_blocking(res).ok();
}

impl ReceiverExt<Message> for channel::Receiver<Message> {
fn blocking_recv(&mut self) -> Option<Message> {
self.recv_blocking().ok()
}

fn close(&mut self) {
channel::Receiver::close(self);
}

async fn recv(&mut self) -> Option<Message> {
channel::Receiver::recv(&self).await.ok()
}
}

impl ReceiverExt<()> for channel::Receiver<()> {
fn blocking_recv(&mut self) -> Option<()> {
self.recv_blocking().ok()
}

fn close(&mut self) {
channel::Receiver::close(self);
}

async fn recv(&mut self) -> Option<()> {
match channel::Receiver::try_recv(&self) {
Ok(_) => Some(()),
Err(TryRecvError::Empty) => None,
Err(TryRecvError::Closed) => Some(()),
}
}
}

#[cfg(test)]
pub mod tests {
use crate::{in_memory, AsyncDB};
use wasm_bindgen_test::wasm_bindgen_test;

#[wasm_bindgen_test]
async fn test_asyncdb() {
let db = AsyncDB::new("test.db", in_memory()).unwrap();
db.put(b"key".to_vec(), b"value".to_vec()).await.unwrap();
let val = db.get(b"key".to_vec()).await.unwrap();
assert_eq!(val, Some(b"value".to_vec()));
db.stop_server_async().await;
}
}
13 changes: 12 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,11 @@ extern crate time_test;
#[macro_use]
mod infolog;

#[cfg(any(feature = "asyncdb-tokio", feature = "asyncdb-async-std"))]
#[cfg(any(
feature = "asyncdb-tokio",
feature = "asyncdb-async-std",
feature = "asyncdb-wasm-bindgen-futures"
))]
mod asyncdb;

#[cfg(feature = "asyncdb-tokio")]
Expand All @@ -53,6 +57,11 @@ mod asyncdb_async_std;
#[cfg(feature = "asyncdb-async-std")]
use asyncdb_async_std::{send_response, send_response_result, Message};

#[cfg(feature = "asyncdb-wasm-bindgen-futures")]
mod asyncdb_wasm_bindgen_futures;
#[cfg(feature = "asyncdb-wasm-bindgen-futures")]
use self::asyncdb_wasm_bindgen_futures::{send_response, send_response_result, Message};

mod block;
mod block_builder;
mod blockhandle;
Expand Down Expand Up @@ -96,6 +105,8 @@ pub mod env;
pub use asyncdb_async_std::AsyncDB;
#[cfg(feature = "asyncdb-tokio")]
pub use asyncdb_tokio::AsyncDB;
#[cfg(feature = "asyncdb-wasm-bindgen-futures")]
pub use asyncdb_wasm_bindgen_futures::AsyncDB;
pub use cmp::{Cmp, DefaultCmp};
pub use compressor::{Compressor, CompressorId};
pub use db_impl::DB;
Expand Down

0 comments on commit b783c4c

Please sign in to comment.