From 1051b6c78c858ba2d13b835a7b0627bbdd6d8a5c Mon Sep 17 00:00:00 2001 From: Matt Mastracci Date: Wed, 24 Jan 2024 09:49:07 -0500 Subject: [PATCH] refactor(core): split up resource code and add a resource test (#472) Partially extracted from #440 Also pulls in `BufMutViewWhole` from that PR that will be required for buffer soundness in a later PR -- you should not split a buffer that is owned by JavaScript. --- core/{ => io}/buffer_strategy.rs | 0 core/{io.rs => io/buffers.rs} | 139 ++++++++++++++- core/io/mod.rs | 45 +++++ core/io/resource.rs | 286 +++++++++++++++++++++++++++++++ core/io/resource_handle.rs | 110 ++++++++++++ core/io/resource_table.rs | 223 ++++++++++++++++++++++++ core/lib.rs | 17 +- core/ops.rs | 2 +- core/ops_builtin.rs | 4 +- testing/checkin/runner/mod.rs | 2 + testing/checkin/runner/ops_io.rs | 67 ++++++++ testing/lib.rs | 1 + testing/unit/resource_test.ts | 22 +++ 13 files changed, 897 insertions(+), 21 deletions(-) rename core/{ => io}/buffer_strategy.rs (100%) rename core/{io.rs => io/buffers.rs} (80%) create mode 100644 core/io/mod.rs create mode 100644 core/io/resource.rs create mode 100644 core/io/resource_handle.rs create mode 100644 core/io/resource_table.rs create mode 100644 testing/checkin/runner/ops_io.rs create mode 100644 testing/unit/resource_test.ts diff --git a/core/buffer_strategy.rs b/core/io/buffer_strategy.rs similarity index 100% rename from core/buffer_strategy.rs rename to core/io/buffer_strategy.rs diff --git a/core/io.rs b/core/io/buffers.rs similarity index 80% rename from core/io.rs rename to core/io/buffers.rs index e2ab5e607..9ed372807 100644 --- a/core/io.rs +++ b/core/io/buffers.rs @@ -174,6 +174,14 @@ impl From for bytes::Bytes { } } +/// BufMutViewWhole is equivalent to `BufMutView`, but cannot be split, preventing +/// someone from accidentally holding a `BufView` down the road that is being actively +/// mutated from JavaScript. +pub struct BufMutViewWhole { + inner: BufMutViewInner, + cursor: usize, +} + /// BufMutView is a wrapper around an underlying contiguous chunk of writable /// bytes. It can be created from a `JsBuffer` or a `Vec` and implements /// `DerefMut<[u8]>` and `AsMut<[u8]>`. @@ -196,6 +204,15 @@ enum BufMutViewInner { Bytes(BytesMut), } +impl Default for BufMutView { + fn default() -> Self { + BufMutView { + inner: BufMutViewInner::Bytes(BytesMut::default()), + cursor: 0, + } + } +} + impl BufMutView { fn from_inner(inner: BufMutViewInner) -> Self { Self { inner, cursor: 0 } @@ -412,18 +429,122 @@ impl From for BufMutView { } } -pub enum WriteOutcome { - Partial { nwritten: usize, view: BufView }, - Full { nwritten: usize }, -} +impl BufMutViewWhole { + fn from_inner(inner: BufMutViewInner) -> Self { + Self { inner, cursor: 0 } + } + + pub fn new(len: usize) -> Self { + let bytes = BytesMut::zeroed(len); + Self::from_inner(BufMutViewInner::Bytes(bytes)) + } -impl WriteOutcome { - pub fn nwritten(&self) -> usize { - match self { - WriteOutcome::Partial { nwritten, .. } => *nwritten, - WriteOutcome::Full { nwritten } => *nwritten, + /// Get the length of the buffer view. This is the length of the underlying + /// buffer minus the cursor position. + pub fn len(&self) -> usize { + match &self.inner { + BufMutViewInner::JsBuffer(js_buf) => js_buf.len() - self.cursor, + BufMutViewInner::Bytes(bytes) => bytes.len() - self.cursor, } } + + /// Is the buffer view empty? + pub fn is_empty(&self) -> bool { + self.len() == 0 + } + + /// Advance the internal cursor of the buffer view by `n` bytes. + pub fn advance_cursor(&mut self, n: usize) { + assert!(self.len() >= n); + self.cursor += n; + } + + /// Reset the internal cursor of the buffer view to the beginning of the + /// buffer. Returns the old cursor position. + pub fn reset_cursor(&mut self) -> usize { + let old = self.cursor; + self.cursor = 0; + old + } + + /// Turn this `BufMutView` into a `BufView`. + pub fn into_view(self) -> BufView { + let inner = match self.inner { + BufMutViewInner::JsBuffer(js_buf) => BufViewInner::JsBuffer(js_buf), + BufMutViewInner::Bytes(bytes) => BufViewInner::Bytes(bytes.into()), + }; + BufView { + inner, + cursor: self.cursor, + } + } + + /// Attempts to unwrap the underlying buffer into a [`BytesMut`], consuming the `BufMutView`. If + /// this buffer does not have a [`BytesMut`], returns `Self`. + pub fn maybe_unwrap_bytes(self) -> Result { + match self.inner { + BufMutViewInner::JsBuffer(_) => Err(self), + BufMutViewInner::Bytes(bytes) => Ok(bytes), + } + } + + /// Adjust the length of the remaining buffer and ensure that the cursor continues to + /// stay in-bounds. + pub fn truncate(&mut self, size: usize) { + match &mut self.inner { + BufMutViewInner::Bytes(bytes) => bytes.truncate(size + self.cursor), + BufMutViewInner::JsBuffer(buffer) => buffer.truncate(size + self.cursor), + } + self.cursor = std::cmp::min(self.cursor, self.len()); + } +} + +impl Buf for BufMutViewWhole { + fn remaining(&self) -> usize { + self.len() + } + + fn chunk(&self) -> &[u8] { + self.deref() + } + + fn advance(&mut self, cnt: usize) { + self.advance_cursor(cnt) + } +} + +impl Deref for BufMutViewWhole { + type Target = [u8]; + + fn deref(&self) -> &[u8] { + let buf = match &self.inner { + BufMutViewInner::JsBuffer(js_buf) => js_buf.deref(), + BufMutViewInner::Bytes(vec) => vec.deref(), + }; + &buf[self.cursor..] + } +} + +impl DerefMut for BufMutViewWhole { + fn deref_mut(&mut self) -> &mut [u8] { + let buf = match &mut self.inner { + BufMutViewInner::JsBuffer(js_buf) => js_buf.deref_mut(), + BufMutViewInner::Bytes(vec) => vec.deref_mut(), + }; + &mut buf[self.cursor..] + } +} + +impl From for BufMutViewWhole { + fn from(buf: JsBuffer) -> Self { + Self::from_inner(BufMutViewInner::JsBuffer(buf.into_parts())) + } +} + +impl From for BufMutViewWhole { + fn from(buf: BytesMut) -> Self { + Self::from_inner(BufMutViewInner::Bytes(buf)) + } } #[cfg(test)] diff --git a/core/io/mod.rs b/core/io/mod.rs new file mode 100644 index 000000000..c1317558a --- /dev/null +++ b/core/io/mod.rs @@ -0,0 +1,45 @@ +// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license. + +// Think of Resources as File Descriptors. They are integers that are allocated +// by the privileged side of Deno which refer to various rust objects that need +// to be persisted between various ops. For example, network sockets are +// resources. Resources may or may not correspond to a real operating system +// file descriptor (hence the different name). + +use anyhow::Error; +use futures::Future; +use std::pin::Pin; + +mod buffer_strategy; +mod buffers; +mod resource; +mod resource_handle; +mod resource_table; + +pub use buffer_strategy::AdaptiveBufferStrategy; +pub use buffers::BufMutView; +pub use buffers::BufMutViewWhole; +pub use buffers::BufView; +pub use resource::Resource; +pub use resource_handle::ResourceHandle; +pub use resource_handle::ResourceHandleFd; +pub use resource_handle::ResourceHandleSocket; +pub use resource_table::ResourceId; +pub use resource_table::ResourceTable; + +/// Returned by resource shutdown methods +pub type AsyncResult = Pin>>>; + +pub enum WriteOutcome { + Partial { nwritten: usize, view: BufView }, + Full { nwritten: usize }, +} + +impl WriteOutcome { + pub fn nwritten(&self) -> usize { + match self { + WriteOutcome::Partial { nwritten, .. } => *nwritten, + WriteOutcome::Full { nwritten } => *nwritten, + } + } +} diff --git a/core/io/resource.rs b/core/io/resource.rs new file mode 100644 index 000000000..752fd5c57 --- /dev/null +++ b/core/io/resource.rs @@ -0,0 +1,286 @@ +// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license. + +// Think of Resources as File Descriptors. They are integers that are allocated +// by the privileged side of Deno which refer to various rust objects that need +// to be persisted between various ops. For example, network sockets are +// resources. Resources may or may not correspond to a real operating system +// file descriptor (hence the different name). + +use crate::error::not_supported; +use crate::io::AsyncResult; +use crate::io::BufMutView; +use crate::io::BufView; +use crate::io::WriteOutcome; +use crate::ResourceHandle; +use crate::ResourceHandleFd; +use anyhow::Error; +use std::any::type_name; +use std::any::Any; +use std::any::TypeId; +use std::borrow::Cow; +use std::rc::Rc; + +/// Resources are Rust objects that are attached to a [deno_core::JsRuntime]. +/// They are identified in JS by a numeric ID (the resource ID, or rid). +/// Resources can be created in ops. Resources can also be retrieved in ops by +/// their rid. Resources are not thread-safe - they can only be accessed from +/// the thread that the JsRuntime lives on. +/// +/// Resources are reference counted in Rust. This means that they can be +/// cloned and passed around. When the last reference is dropped, the resource +/// is automatically closed. As long as the resource exists in the resource +/// table, the reference count is at least 1. +/// +/// ### Readable +/// +/// Readable resources are resources that can have data read from. Examples of +/// this are files, sockets, or HTTP streams. +/// +/// Readables can be read from from either JS or Rust. In JS one can use +/// `Deno.core.read()` to read from a single chunk of data from a readable. In +/// Rust one can directly call `read()` or `read_byob()`. The Rust side code is +/// used to implement ops like `op_slice`. +/// +/// A distinction can be made between readables that produce chunks of data +/// themselves (they allocate the chunks), and readables that fill up +/// bring-your-own-buffers (BYOBs). The former is often the case for framed +/// protocols like HTTP, while the latter is often the case for kernel backed +/// resources like files and sockets. +/// +/// All readables must implement `read()`. If resources can support an optimized +/// path for BYOBs, they should also implement `read_byob()`. For kernel backed +/// resources it often makes sense to implement `read_byob()` first, and then +/// implement `read()` as an operation that allocates a new chunk with +/// `len == limit`, then calls `read_byob()`, and then returns a chunk sliced to +/// the number of bytes read. Kernel backed resources can use the +/// [deno_core::impl_readable_byob] macro to implement optimized `read_byob()` +/// and `read()` implementations from a single `Self::read()` method. +/// +/// ### Writable +/// +/// Writable resources are resources that can have data written to. Examples of +/// this are files, sockets, or HTTP streams. +/// +/// Writables can be written to from either JS or Rust. In JS one can use +/// `Deno.core.write()` to write to a single chunk of data to a writable. In +/// Rust one can directly call `write()`. The latter is used to implement ops +/// like `op_slice`. +pub trait Resource: Any + 'static { + /// Returns a string representation of the resource which is made available + /// to JavaScript code through `op_resources`. The default implementation + /// returns the Rust type name, but specific resource types may override this + /// trait method. + fn name(&self) -> Cow { + type_name::().into() + } + + /// Read a single chunk of data from the resource. This operation returns a + /// `BufView` that represents the data that was read. If a zero length buffer + /// is returned, it indicates that the resource has reached EOF. + /// + /// If this method is not implemented, the default implementation will error + /// with a "not supported" error. + /// + /// If a readable can provide an optimized path for BYOBs, it should also + /// implement `read_byob()`. + fn read(self: Rc, limit: usize) -> AsyncResult { + _ = limit; + Box::pin(futures::future::err(not_supported())) + } + + /// Read a single chunk of data from the resource into the provided `BufMutView`. + /// + /// This operation returns the number of bytes read. If zero bytes are read, + /// it indicates that the resource has reached EOF. + /// + /// If this method is not implemented explicitly, the default implementation + /// will call `read()` and then copy the data into the provided buffer. For + /// readable resources that can provide an optimized path for BYOBs, it is + /// strongly recommended to override this method. + fn read_byob( + self: Rc, + mut buf: BufMutView, + ) -> AsyncResult<(usize, BufMutView)> { + Box::pin(async move { + let read = self.read(buf.len()).await?; + let nread = read.len(); + buf[..nread].copy_from_slice(&read); + Ok((nread, buf)) + }) + } + + /// Write an error state to this resource, if the resource supports it. + fn write_error(self: Rc, _error: Error) -> AsyncResult<()> { + Box::pin(futures::future::err(not_supported())) + } + + /// Write a single chunk of data to the resource. The operation may not be + /// able to write the entire chunk, in which case it should return the number + /// of bytes written. Additionally it should return the `BufView` that was + /// passed in. + /// + /// If this method is not implemented, the default implementation will error + /// with a "not supported" error. + fn write(self: Rc, buf: BufView) -> AsyncResult { + _ = buf; + Box::pin(futures::future::err(not_supported())) + } + + /// Write an entire chunk of data to the resource. Unlike `write()`, this will + /// ensure the entire chunk is written. If the operation is not able to write + /// the entire chunk, an error is to be returned. + /// + /// By default this method will call `write()` repeatedly until the entire + /// chunk is written. Resources that can write the entire chunk in a single + /// operation using an optimized path should override this method. + fn write_all(self: Rc, view: BufView) -> AsyncResult<()> { + Box::pin(async move { + let mut view = view; + let this = self; + while !view.is_empty() { + let resp = this.clone().write(view).await?; + match resp { + WriteOutcome::Partial { + nwritten, + view: new_view, + } => { + view = new_view; + view.advance_cursor(nwritten); + } + WriteOutcome::Full { .. } => break, + } + } + Ok(()) + }) + } + + /// The same as [`read_byob()`][Resource::read_byob], but synchronous. + fn read_byob_sync(self: Rc, data: &mut [u8]) -> Result { + _ = data; + Err(not_supported()) + } + + /// The same as [`write()`][Resource::write], but synchronous. + fn write_sync(self: Rc, data: &[u8]) -> Result { + _ = data; + Err(not_supported()) + } + + /// The shutdown method can be used to asynchronously close the resource. It + /// is not automatically called when the resource is dropped or closed. + /// + /// If this method is not implemented, the default implementation will error + /// with a "not supported" error. + fn shutdown(self: Rc) -> AsyncResult<()> { + Box::pin(futures::future::err(not_supported())) + } + + /// Resources may implement the `close()` trait method if they need to do + /// resource specific clean-ups, such as cancelling pending futures, after a + /// resource has been removed from the resource table. + fn close(self: Rc) {} + + /// Resources backed by a file descriptor or socket handle can let ops know + /// to allow for low-level optimizations. + fn backing_handle(self: Rc) -> Option { + #[allow(deprecated)] + self.backing_fd().map(ResourceHandle::Fd) + } + + /// Resources backed by a file descriptor can let ops know to allow for + /// low-level optimizations. + #[deprecated = "Use backing_handle"] + fn backing_fd(self: Rc) -> Option { + None + } + + fn size_hint(&self) -> (u64, Option) { + (0, None) + } +} + +impl dyn Resource { + #[inline(always)] + fn is(&self) -> bool { + self.type_id() == TypeId::of::() + } + + #[inline(always)] + #[allow(clippy::needless_lifetimes)] + pub fn downcast_rc<'a, T: Resource>(self: &'a Rc) -> Option<&'a Rc> { + if self.is::() { + let ptr = self as *const Rc<_> as *const Rc; + // TODO(piscisaureus): safety comment + #[allow(clippy::undocumented_unsafe_blocks)] + Some(unsafe { &*ptr }) + } else { + None + } + } +} + +#[macro_export] +macro_rules! impl_readable_byob { + () => { + fn read( + self: ::std::rc::Rc, + limit: ::core::primitive::usize, + ) -> AsyncResult<$crate::BufView> { + ::std::boxed::Box::pin(async move { + let mut vec = ::std::vec![0; limit]; + let nread = self.read(&mut vec).await?; + if nread != vec.len() { + vec.truncate(nread); + } + let view = $crate::BufView::from(vec); + ::std::result::Result::Ok(view) + }) + } + + fn read_byob( + self: ::std::rc::Rc, + mut buf: $crate::BufMutView, + ) -> AsyncResult<(::core::primitive::usize, $crate::BufMutView)> { + ::std::boxed::Box::pin(async move { + let nread = self.read(buf.as_mut()).await?; + ::std::result::Result::Ok((nread, buf)) + }) + } + }; +} + +#[macro_export] +macro_rules! impl_writable { + (__write) => { + fn write( + self: ::std::rc::Rc, + view: $crate::BufView, + ) -> $crate::AsyncResult<$crate::WriteOutcome> { + ::std::boxed::Box::pin(async move { + let nwritten = self.write(&view).await?; + ::std::result::Result::Ok($crate::WriteOutcome::Partial { + nwritten, + view, + }) + }) + } + }; + (__write_all) => { + fn write_all( + self: ::std::rc::Rc, + view: $crate::BufView, + ) -> $crate::AsyncResult<()> { + ::std::boxed::Box::pin(async move { + self.write_all(&view).await?; + ::std::result::Result::Ok(()) + }) + } + }; + () => { + $crate::impl_writable!(__write); + }; + (with_all) => { + $crate::impl_writable!(__write); + $crate::impl_writable!(__write_all); + }; +} diff --git a/core/io/resource_handle.rs b/core/io/resource_handle.rs new file mode 100644 index 000000000..65b7a937a --- /dev/null +++ b/core/io/resource_handle.rs @@ -0,0 +1,110 @@ +// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license. +use std::io::IsTerminal; + +/// Represents an underlying handle for a platform. On unix, everything is an `fd`. On Windows, everything +/// is a Windows handle except for sockets (which are `SOCKET`s). +#[derive(Copy, Clone, Debug, Eq, PartialEq, Hash)] +#[allow(unused)] +pub enum ResourceHandle { + /// A file handle/descriptor. + Fd(ResourceHandleFd), + /// A socket handle/file descriptor. + Socket(ResourceHandleSocket), +} + +#[cfg(unix)] +pub type ResourceHandleFd = std::os::fd::RawFd; +#[cfg(unix)] +pub type ResourceHandleSocket = std::os::fd::RawFd; +#[cfg(windows)] +pub type ResourceHandleFd = std::os::windows::io::RawHandle; +#[cfg(windows)] +pub type ResourceHandleSocket = std::os::windows::io::RawSocket; + +impl ResourceHandle { + /// Converts a file-like thing to a [`ResourceHandle`]. + #[cfg(windows)] + pub fn from_fd_like(io: &impl std::os::windows::io::AsRawHandle) -> Self { + Self::Fd(io.as_raw_handle()) + } + + /// Converts a file-like thing to a [`ResourceHandle`]. + #[cfg(unix)] + pub fn from_fd_like(io: &impl std::os::unix::io::AsRawFd) -> Self { + Self::Fd(io.as_raw_fd()) + } + + /// Converts a socket-like thing to a [`ResourceHandle`]. + #[cfg(windows)] + pub fn from_socket_like(io: &impl std::os::windows::io::AsRawSocket) -> Self { + Self::Socket(io.as_raw_socket()) + } + + /// Converts a socket-like thing to a [`ResourceHandle`]. + #[cfg(unix)] + pub fn from_socket_like(io: &impl std::os::unix::io::AsRawFd) -> Self { + Self::Socket(io.as_raw_fd()) + } + + /// Runs a basic validity check on the handle, but cannot fully determine if the handle is valid for use. + pub fn is_valid(&self) -> bool { + #[cfg(windows)] + { + match self { + // NULL or INVALID_HANDLE_VALUE + Self::Fd(handle) => { + !handle.is_null() + && *handle != -1_isize as std::os::windows::io::RawHandle + } + // INVALID_SOCKET + Self::Socket(socket) => { + *socket != -1_i64 as std::os::windows::io::RawSocket + } + } + } + #[cfg(unix)] + { + match self { + Self::Fd(fd) => *fd >= 0, + Self::Socket(fd) => *fd >= 0, + } + } + } + + /// Returns this as a file-descriptor-like handle. + pub fn as_fd_like(&self) -> Option { + match self { + Self::Fd(fd) => Some(*fd), + _ => None, + } + } + + /// Returns this as a socket-like handle. + pub fn as_socket_like(&self) -> Option { + match self { + Self::Socket(socket) => Some(*socket), + _ => None, + } + } + + /// Determines if this handle is a terminal. Analagous to [`std::io::IsTerminal`]. + pub fn is_terminal(&self) -> bool { + match self { + Self::Fd(fd) if self.is_valid() => { + #[cfg(windows)] + { + // SAFETY: The resource remains open for the for the duration of borrow_raw + unsafe { + std::os::windows::io::BorrowedHandle::borrow_raw(*fd).is_terminal() + } + } + #[cfg(unix)] + { + // SAFETY: The resource remains open for the for the duration of borrow_raw + unsafe { std::os::fd::BorrowedFd::borrow_raw(*fd).is_terminal() } + } + } + _ => false, + } + } +} diff --git a/core/io/resource_table.rs b/core/io/resource_table.rs new file mode 100644 index 000000000..747c0e8bc --- /dev/null +++ b/core/io/resource_table.rs @@ -0,0 +1,223 @@ +// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license. +use super::Resource; +use super::ResourceHandle; +use super::ResourceHandleFd; +use super::ResourceHandleSocket; +use crate::error::bad_resource_id; +use crate::error::custom_error; +use anyhow::Error; +use std::borrow::Cow; +use std::collections::BTreeMap; +use std::iter::Iterator; +use std::rc::Rc; + +/// A `ResourceId` is an integer value referencing a resource. It could be +/// considered to be the Deno equivalent of a `file descriptor` in POSIX like +/// operating systems. Elsewhere in the code base it is commonly abbreviated +/// to `rid`. +// TODO: use `u64` instead? +pub type ResourceId = u32; +/// Map-like data structure storing Deno's resources (equivalent to file +/// descriptors). +/// +/// Provides basic methods for element access. A resource can be of any type. +/// Different types of resources can be stored in the same map, and provided +/// with a name for description. +/// +/// Each resource is identified through a _resource ID (rid)_, which acts as +/// the key in the map. +#[derive(Default)] +pub struct ResourceTable { + index: BTreeMap>, + next_rid: ResourceId, +} + +impl ResourceTable { + /// Returns the number of resources currently active in the resource table. + /// Resources taken from the table do not contribute to this count. + pub fn len(&self) -> usize { + self.index.len() + } + + /// Returns whether this table is empty. + pub fn is_empty(&self) -> bool { + self.index.is_empty() + } + + /// Inserts resource into the resource table, which takes ownership of it. + /// + /// The resource type is erased at runtime and must be statically known + /// when retrieving it through `get()`. + /// + /// Returns a unique resource ID, which acts as a key for this resource. + pub fn add(&mut self, resource: T) -> ResourceId { + self.add_rc(Rc::new(resource)) + } + + /// Inserts a `Rc`-wrapped resource into the resource table. + /// + /// The resource type is erased at runtime and must be statically known + /// when retrieving it through `get()`. + /// + /// Returns a unique resource ID, which acts as a key for this resource. + pub fn add_rc(&mut self, resource: Rc) -> ResourceId { + let resource = resource as Rc; + self.add_rc_dyn(resource) + } + + pub fn add_rc_dyn(&mut self, resource: Rc) -> ResourceId { + let rid = self.next_rid; + let removed_resource = self.index.insert(rid, resource); + assert!(removed_resource.is_none()); + self.next_rid += 1; + rid + } + + /// Returns true if any resource with the given `rid` exists. + pub fn has(&self, rid: ResourceId) -> bool { + self.index.contains_key(&rid) + } + + /// Returns a reference counted pointer to the resource of type `T` with the + /// given `rid`. If `rid` is not present or has a type different than `T`, + /// this function returns `None`. + pub fn get(&self, rid: ResourceId) -> Result, Error> { + self + .index + .get(&rid) + .and_then(|rc| rc.downcast_rc::()) + .map(Clone::clone) + .ok_or_else(bad_resource_id) + } + + pub fn get_any(&self, rid: ResourceId) -> Result, Error> { + self + .index + .get(&rid) + .map(Clone::clone) + .ok_or_else(bad_resource_id) + } + + /// Replaces a resource with a new resource. + /// + /// Panics if the resource does not exist. + pub fn replace(&mut self, rid: ResourceId, resource: T) { + let result = self + .index + .insert(rid, Rc::new(resource) as Rc); + assert!(result.is_some()); + } + + /// Removes a resource of type `T` from the resource table and returns it. + /// If a resource with the given `rid` exists but its type does not match `T`, + /// it is not removed from the resource table. Note that the resource's + /// `close()` method is *not* called. + /// + /// Also note that there might be a case where + /// the returned `Rc` is referenced by other variables. That is, we cannot + /// assume that `Rc::strong_count(&returned_rc)` is always equal to 1 on success. + /// In particular, be really careful when you want to extract the inner value of + /// type `T` from `Rc`. + pub fn take(&mut self, rid: ResourceId) -> Result, Error> { + let resource = self.get::(rid)?; + self.index.remove(&rid); + Ok(resource) + } + + /// Removes a resource from the resource table and returns it. Note that the + /// resource's `close()` method is *not* called. + /// + /// Also note that there might be a + /// case where the returned `Rc` is referenced by other variables. That is, + /// we cannot assume that `Rc::strong_count(&returned_rc)` is always equal to 1 + /// on success. In particular, be really careful when you want to extract the + /// inner value of type `T` from `Rc`. + pub fn take_any( + &mut self, + rid: ResourceId, + ) -> Result, Error> { + self.index.remove(&rid).ok_or_else(bad_resource_id) + } + + /// Removes the resource with the given `rid` from the resource table. If the + /// only reference to this resource existed in the resource table, this will + /// cause the resource to be dropped. However, since resources are reference + /// counted, therefore pending ops are not automatically cancelled. A resource + /// may implement the `close()` method to perform clean-ups such as canceling + /// ops. + #[deprecated = "This method may deadlock. Use take() and close() instead."] + pub fn close(&mut self, rid: ResourceId) -> Result<(), Error> { + self + .index + .remove(&rid) + .ok_or_else(bad_resource_id) + .map(|resource| resource.close()) + } + + /// Returns an iterator that yields a `(id, name)` pair for every resource + /// that's currently in the resource table. This can be used for debugging + /// purposes or to implement the `op_resources` op. Note that the order in + /// which items appear is not specified. + /// + /// # Example + /// + /// ``` + /// # use deno_core::ResourceTable; + /// # let resource_table = ResourceTable::default(); + /// let resource_names = resource_table.names().collect::>(); + /// ``` + pub fn names(&self) -> impl Iterator)> { + self + .index + .iter() + .map(|(&id, resource)| (id, resource.name())) + } + + /// Retrieves the [`ResourceHandleFd`] for a given resource, for potential optimization + /// purposes within ops. + pub fn get_fd(&self, rid: ResourceId) -> Result { + let Some(handle) = self.get_any(rid)?.backing_handle() else { + return Err(bad_resource_id()); + }; + let Some(fd) = handle.as_fd_like() else { + return Err(bad_resource_id()); + }; + if !handle.is_valid() { + return Err(custom_error("ReferenceError", "null or invalid handle")); + } + Ok(fd) + } + + /// Retrieves the [`ResourceHandleSocket`] for a given resource, for potential optimization + /// purposes within ops. + pub fn get_socket( + &self, + rid: ResourceId, + ) -> Result { + let Some(handle) = self.get_any(rid)?.backing_handle() else { + return Err(bad_resource_id()); + }; + let Some(socket) = handle.as_socket_like() else { + return Err(bad_resource_id()); + }; + if !handle.is_valid() { + return Err(custom_error("ReferenceError", "null or invalid handle")); + } + Ok(socket) + } + + /// Retrieves the [`ResourceHandle`] for a given resource, for potential optimization + /// purposes within ops. + pub fn get_handle( + &self, + rid: ResourceId, + ) -> ::std::result::Result { + let Some(handle) = self.get_any(rid)?.backing_handle() else { + return Err(bad_resource_id()); + }; + if !handle.is_valid() { + return Err(custom_error("ReferenceError", "null or invalid handle")); + } + Ok(handle) + } +} diff --git a/core/lib.rs b/core/lib.rs index 7dfa21f68..db9eeea00 100644 --- a/core/lib.rs +++ b/core/lib.rs @@ -2,7 +2,6 @@ pub mod arena; mod async_cancel; mod async_cell; -mod buffer_strategy; pub mod cppgc; pub mod error; mod error_codes; @@ -23,7 +22,6 @@ mod ops_builtin_types; mod ops_builtin_v8; mod ops_metrics; mod path; -mod resources; mod runtime; mod source_map; mod tasks; @@ -82,8 +80,16 @@ pub use crate::inspector::InspectorMsgKind; pub use crate::inspector::InspectorSessionProxy; pub use crate::inspector::JsRuntimeInspector; pub use crate::inspector::LocalInspectorSession; +pub use crate::io::AsyncResult; pub use crate::io::BufMutView; +pub use crate::io::BufMutViewWhole; pub use crate::io::BufView; +pub use crate::io::Resource; +pub use crate::io::ResourceHandle; +pub use crate::io::ResourceHandleFd; +pub use crate::io::ResourceHandleSocket; +pub use crate::io::ResourceId; +pub use crate::io::ResourceTable; pub use crate::io::WriteOutcome; pub use crate::module_specifier::resolve_import; pub use crate::module_specifier::resolve_path; @@ -124,13 +130,6 @@ pub use crate::ops_metrics::OpMetricsSource; pub use crate::ops_metrics::OpMetricsSummary; pub use crate::ops_metrics::OpMetricsSummaryTracker; pub use crate::path::strip_unc_prefix; -pub use crate::resources::AsyncResult; -pub use crate::resources::Resource; -pub use crate::resources::ResourceHandle; -pub use crate::resources::ResourceHandleFd; -pub use crate::resources::ResourceHandleSocket; -pub use crate::resources::ResourceId; -pub use crate::resources::ResourceTable; pub use crate::runtime::stats; pub use crate::runtime::CompiledWasmModuleStore; pub use crate::runtime::CreateRealmOptions; diff --git a/core/ops.rs b/core/ops.rs index 81f5b14b6..49fe3348a 100644 --- a/core/ops.rs +++ b/core/ops.rs @@ -3,8 +3,8 @@ use crate::error::AnyError; use crate::error::GetErrorClassFn; use crate::gotham_state::GothamState; +use crate::io::ResourceTable; use crate::ops_metrics::OpMetricsFn; -use crate::resources::ResourceTable; use crate::runtime::ContextState; use crate::runtime::JsRuntimeState; use crate::FeatureChecker; diff --git a/core/ops_builtin.rs b/core/ops_builtin.rs index 913287d9e..293ca784d 100644 --- a/core/ops_builtin.rs +++ b/core/ops_builtin.rs @@ -1,13 +1,13 @@ // Copyright 2018-2023 the Deno authors. All rights reserved. MIT license. -use crate::buffer_strategy::AdaptiveBufferStrategy; use crate::error::format_file_name; use crate::error::type_error; +use crate::io::AdaptiveBufferStrategy; use crate::io::BufMutView; use crate::io::BufView; +use crate::io::ResourceId; use crate::op2; use crate::ops_builtin_types; use crate::ops_builtin_v8; -use crate::resources::ResourceId; use crate::JsBuffer; use crate::OpState; use crate::Resource; diff --git a/testing/checkin/runner/mod.rs b/testing/checkin/runner/mod.rs index aafb13bf5..dc1eab467 100644 --- a/testing/checkin/runner/mod.rs +++ b/testing/checkin/runner/mod.rs @@ -21,6 +21,7 @@ use self::testing::TestFunctions; mod ops; mod ops_async; mod ops_buffer; +mod ops_io; mod testing; mod ts_module_loader; @@ -34,6 +35,7 @@ deno_core::extension!( ops::op_stats_diff, ops::op_stats_dump, ops::op_stats_delete, + ops_io::op_pipe_create, ops_async::op_async_yield, ops_async::op_async_barrier_create, ops_async::op_async_barrier_await, diff --git a/testing/checkin/runner/ops_io.rs b/testing/checkin/runner/ops_io.rs new file mode 100644 index 000000000..33100b5bd --- /dev/null +++ b/testing/checkin/runner/ops_io.rs @@ -0,0 +1,67 @@ +// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license. +use deno_core::op2; +use deno_core::AsyncRefCell; +use deno_core::BufView; +use deno_core::OpState; +use deno_core::RcRef; +use deno_core::Resource; +use deno_core::ResourceId; +use deno_core::WriteOutcome; +use futures::FutureExt; +use tokio::io::AsyncReadExt; +use tokio::io::AsyncWriteExt; +use tokio::io::DuplexStream; +use tokio::io::ReadHalf; +use tokio::io::WriteHalf; + +struct PipeResource { + tx: AsyncRefCell>, + rx: AsyncRefCell>, +} + +impl Resource for PipeResource { + fn read_byob( + self: std::rc::Rc, + mut buf: deno_core::BufMutView, + ) -> deno_core::AsyncResult<(usize, deno_core::BufMutView)> { + async { + let mut lock = RcRef::map(self, |this| &this.rx).borrow_mut().await; + // Note that we're holding a slice across an await point, so this code is very much not safe + let res = lock.read(&mut buf).await?; + Ok((res, buf)) + } + .boxed_local() + } + + fn write( + self: std::rc::Rc, + buf: BufView, + ) -> deno_core::AsyncResult { + async { + let mut lock = RcRef::map(self, |this| &this.tx).borrow_mut().await; + let nwritten = lock.write(&buf).await?; + Ok(WriteOutcome::Partial { + nwritten, + view: buf, + }) + } + .boxed_local() + } +} + +#[op2] +#[serde] +pub fn op_pipe_create(op_state: &mut OpState) -> (ResourceId, ResourceId) { + let (s1, s2) = tokio::io::duplex(1024); + let (rx1, tx1) = tokio::io::split(s1); + let (rx2, tx2) = tokio::io::split(s2); + let rid1 = op_state.resource_table.add(PipeResource { + rx: AsyncRefCell::new(rx1), + tx: AsyncRefCell::new(tx1), + }); + let rid2 = op_state.resource_table.add(PipeResource { + rx: AsyncRefCell::new(rx2), + tx: AsyncRefCell::new(tx2), + }); + (rid1, rid2) +} diff --git a/testing/lib.rs b/testing/lib.rs index 69c0873f1..9a149a01d 100644 --- a/testing/lib.rs +++ b/testing/lib.rs @@ -37,6 +37,7 @@ unit_test!( microtask_test, ops_async_test, ops_buffer_test, + resource_test, serialize_deserialize_test, stats_test, tc39_test, diff --git a/testing/unit/resource_test.ts b/testing/unit/resource_test.ts new file mode 100644 index 000000000..37f69c03c --- /dev/null +++ b/testing/unit/resource_test.ts @@ -0,0 +1,22 @@ +// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license. +import { assertArrayEquals, assertEquals, test } from "checkin:testing"; + +const { op_pipe_create } = Deno.core.ensureFastOps(); + +test(async function testPipe() { + const [p1, p2] = op_pipe_create(); + assertEquals(3, await Deno.core.write(p1, new Uint8Array([1, 2, 3]))); + const buf = new Uint8Array(10); + assertEquals(3, await Deno.core.read(p2, buf)); + assertArrayEquals(buf.subarray(0, 3), [1, 2, 3]); +}); + +test(async function testPipeSmallRead() { + const [p1, p2] = op_pipe_create(); + assertEquals(6, await Deno.core.write(p1, new Uint8Array([1, 2, 3, 4, 5, 6]))); + const buf = new Uint8Array(1); + for (let i = 1; i <= 6; i++) { + assertEquals(1, await Deno.core.read(p2, buf)); + assertArrayEquals(buf.subarray(0), [i]); + } +});