From e42f30b70853bf74eaf88cd2e328c675656bffde Mon Sep 17 00:00:00 2001 From: Artem Egorkine Date: Thu, 26 Sep 2024 10:58:25 +0300 Subject: [PATCH] USB: rewrite data transfer using libusb async API To get consistent USB data transfers the USB code has been rewritten to use libusb bulk transfer API. Code based on https://github.com/mxk/burble/blob/8289cf0fc44087d483af11e06f3e8ed4fab235df/src/host/usb.rs#L414-L764 The main difference, however, is that I don't try to implement `Future` trait instead using a callback, which can decide to either drop the bulk tranfer buffer or resubmit it --- .idea/dictionaries/arteme.xml | 1 + Cargo.lock | 10 + usb/Cargo.toml | 1 + usb/src/dev_handler.rs | 273 ++++++++++----------- usb/src/endpoint.rs | 16 +- usb/src/lib.rs | 49 ++-- usb/src/usb.rs | 450 ++++++++++++++++++++++++++++++++++ 7 files changed, 614 insertions(+), 186 deletions(-) create mode 100644 usb/src/usb.rs diff --git a/.idea/dictionaries/arteme.xml b/.idea/dictionaries/arteme.xml index 043eb78..9b03395 100644 --- a/.idea/dictionaries/arteme.xml +++ b/.idea/dictionaries/arteme.xml @@ -9,6 +9,7 @@ flextone hotplug iface + libusb midir objs pixbuf diff --git a/Cargo.lock b/Cargo.lock index 7f94973..e1c7804 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1494,6 +1494,7 @@ dependencies = [ "pod-core", "regex", "rusb", + "structbuf", "tokio", ] @@ -1990,6 +1991,15 @@ version = "0.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "73473c0e59e6d5812c5dfe2a064a6444949f089e20eec9a2e5506596494e4623" +[[package]] +name = "structbuf" +version = "0.3.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5ff2db0e7acf4963782f6f45b1fbac2230f5b90493ca67a9955e9c691a007ab6" +dependencies = [ + "smallvec", +] + [[package]] name = "syn" version = "1.0.107" diff --git a/usb/Cargo.toml b/usb/Cargo.toml index f6f85b7..ca33d63 100644 --- a/usb/Cargo.toml +++ b/usb/Cargo.toml @@ -7,6 +7,7 @@ rust-version.workspace = true [dependencies] rusb = "0.9.4" +structbuf = "0.3.4" async-trait = "*" # defined in pod-core log = "*" # defined in pod-core diff --git a/usb/src/dev_handler.rs b/usb/src/dev_handler.rs index 3fe7ddb..877c101 100644 --- a/usb/src/dev_handler.rs +++ b/usb/src/dev_handler.rs @@ -5,51 +5,53 @@ use std::sync::atomic::{AtomicBool, Ordering}; use std::time::Duration; use async_trait::async_trait; use log::{debug, error, info, trace}; -use rusb::{DeviceHandle, Direction, Error, TransferType, UsbContext}; +use rusb::{Direction, UsbContext}; use tokio::sync::mpsc; use pod_core::midi_io::{MidiIn, MidiOut}; use crate::devices::UsbDevice; use crate::endpoint::{configure_endpoint, Endpoint, find_endpoint}; use crate::line6::line6_read_serial; +use crate::usb::{DeviceHandle, SubmittedTransfer, Transfer, TransferCommand}; use crate::util::usb_address_string; -pub struct Device { +pub struct Device { pub name: String, - handle: Arc>, + handle: Arc, read_ep: Endpoint, write_ep: Endpoint, - inner: Weak> + inner: Weak } -pub struct DeviceInner { +pub struct DeviceInner { name: String, - handle: Arc>, + handle: Arc, write_ep: Endpoint, closed: Arc, + read: SubmittedTransfer } -pub struct DeviceInput { - inner: Arc>, +pub struct DeviceInput { + inner: Arc, rx: mpsc::UnboundedReceiver> } -pub struct DeviceOutput { - inner: Arc> +pub struct DeviceOutput { + inner: Arc } -pub struct DevHandler { - handle: Arc>, +pub struct DevHandler { + handle: Arc, read_ep: Endpoint, write_ep: Endpoint, tx: mpsc::UnboundedSender>, rx: mpsc::UnboundedReceiver> } -const READ_DURATION: Duration = Duration::from_millis(500); -const WRITE_DURATION: Duration = Duration::from_millis(1000); +const READ_DURATION: Duration = Duration::from_millis(10 * 1000); +const WRITE_DURATION: Duration = Duration::from_millis(10 * 1000); -impl Device { - pub fn new(handle: DeviceHandle, usb_dev: &UsbDevice) -> Result { +impl Device { + pub fn new(handle: DeviceHandle, usb_dev: &UsbDevice) -> Result { let serial = line6_read_serial(&handle).ok() .map(|s| format!(" {}", s)) .unwrap_or("".to_string()); @@ -77,7 +79,7 @@ impl Device { }) } - pub fn open(&mut self) -> Result<(DeviceInput, DeviceOutput)> { + pub fn open(&mut self) -> Result<(DeviceInput, DeviceOutput)> { if self.inner.upgrade().is_some() { bail!("Device already open") } @@ -105,132 +107,91 @@ impl Device { } } -impl DeviceInner { - fn new(name: String, handle: Arc>, +impl DeviceInner { + fn new(name: String, handle: Arc, read_ep: Endpoint, write_ep: Endpoint, tx: mpsc::UnboundedSender>) -> Self { - let has_kernel_driver = match handle.kernel_driver_active(read_ep.iface) { - Ok(true) => { - handle.detach_kernel_driver(read_ep.iface).ok(); - true - } - _ => false - }; - configure_endpoint(&handle, &read_ep).ok(); + //handle.detach_kernel_driver(read_ep.iface).ok(); + handle.detach_kernel_driver(read_ep.iface).ok(); + configure_endpoint(&handle, &read_ep).map_err(|e| { + error!("Failed to configure end-points: {}", e); + }).ok(); + configure_endpoint(&handle, &read_ep).map_err(|e| { + error!("Failed to configure end-points: {}", e); + }).ok(); let closed = Arc::new(AtomicBool::new(false)); - // libusb's reads DEFINITELY need to go on the blocking tasks queue - tokio::task::spawn_blocking({ - let name = name.clone(); - let handle = handle.clone(); - let closed = closed.clone(); - - move || { - debug!("USB read thread {:?} start", name); - - let mut buf = [0u8; 1024]; - let buf_ptr = buf.as_ptr(); - let mut read_ptr: &mut [u8] = &mut []; - let mut sysex = false; - - let mut reset_read_ptr = || unsafe { - std::slice::from_raw_parts_mut( - buf.as_mut_ptr(), - buf.len() - ) - }; - let mut advance_read_ptr = |len: usize| unsafe { - std::slice::from_raw_parts_mut( - read_ptr.as_mut_ptr().add(len), - read_ptr.len().checked_sub(len).unwrap_or(0) - ) - }; - read_ptr = reset_read_ptr(); - - while !closed.load(Ordering::Relaxed) { - let res = match read_ep.transfer_type { - TransferType::Bulk => { - handle.read_bulk(read_ep.address, &mut read_ptr, READ_DURATION) - } - TransferType::Interrupt => { - handle.read_interrupt(read_ep.address, &mut read_ptr, READ_DURATION) - } - tt => { - error!("Transfer type {:?} not supported!", tt); - break; - } - }; - match res { - Ok(len) => { - if len == 0 { continue; } // does this ever happen? - let start_read = read_ptr.as_ptr() == buf_ptr; - if start_read { - // correct PODxt lower nibble 0010 in command byte, see - // https://github.com/torvalds/linux/blob/8508fa2e7472f673edbeedf1b1d2b7a6bb898ecc/sound/usb/line6/midibuf.c#L148 - if read_ptr[0] == 0xb2 || read_ptr[0] == 0xc2 || read_ptr[0] == 0xf2 { - read_ptr[0] = read_ptr[0] & 0xf0; - } - - sysex = read_ptr[0] == 0xf0; - } - let mut b = read_ptr.chunks(len).next().unwrap(); - let sysex_done = sysex && b[b.len() - 1] == 0xf7; - let mark = match (start_read, sysex, sysex_done) { - (true, true, false) => &"<<-", - (false, true, false) => &"<--", - (false, true, true) => &"-<<", - _ => "<<" - }; - trace!("{} {:02x?} len={}", mark, &b, len); - - if sysex { - if !sysex_done { - // advance read_ptr - read_ptr = advance_read_ptr(len); - continue; - } - if !start_read { - // return full buffer - let len = read_ptr.as_ptr() as u64 - buf_ptr as u64 + len as u64; - b = buf.chunks(len as usize).next().unwrap(); - } - } - - match tx.send(b.to_vec()) { - Ok(_) => {} - Err(e) => { - error!("USB read thread tx failed: {}", e); - } - }; - } + const LEN: usize = 1024; + let mut read_buffer = [0u8; LEN]; + let mut read_offset = 0; + + let mut read_transfer = Transfer::new_bulk(&handle, read_ep.address, 1024); + read_transfer.set_timeout(READ_DURATION); + read_transfer.set_callback(move |buf| { + let Some(buf) = buf else { + // read transfer cancelled, nothing to do here + trace!("<< failed or cancelled"); + return TransferCommand::Drop // doesn't really matter what we return + }; + + if buf.len() == 0 { + // read timed out, continue + return TransferCommand::Resubmit + } + + // add received data to the read buffer at current read offset + let mut read_ptr = &mut read_buffer[read_offset .. read_offset + buf.len()]; + read_ptr.copy_from_slice(buf); + trace!("<< {:02x?} len={}", &read_ptr, read_ptr.len()); + + // go through the whole receive buffer from offset 0, check for + // for messages as send them to the MIDI thread + let process_len = read_offset + read_ptr.len(); + let mut process_buf = read_buffer[..process_len].as_mut(); + let mut process_offset = 0; + loop { + let process_buf = process_buf[process_offset .. process_len].as_mut(); + let buf = Self::find_message(process_buf); + if buf.len() > 0 { + // message found + trace!("<< msg {:02x?} len={}", &buf, buf.len()); + match tx.send(buf.to_vec()) { + Ok(_) => {} Err(e) => { - match e { - Error::Busy | Error::Timeout | Error::Overflow => { continue } - _ => { - error!("USB read failed: {}", e); - break - } - } + error!("USB read thread tx failed: {}", e); } - } + }; } - - handle.release_interface(read_ep.iface).ok(); - if has_kernel_driver { - handle.attach_kernel_driver(read_ep.iface).ok(); + process_offset += buf.len(); + if buf.len() == 0 || process_offset == process_len { break } + } + if process_offset > 0 { + // at least one message consumed + if process_buf.len() - process_offset > 0 { + // data left in the buffer, move it to the beginning of the read buffer + read_buffer.copy_within(process_offset .. process_len, 0); + read_offset = process_len - process_offset; + } else { + // all data consumed + read_offset = 0; } - - debug!("USB read thread {:?} finish", name); + } else { + // unfinished message, adjust read offset + read_offset = process_len; } + + TransferCommand::Resubmit }); + let read = read_transfer.submit().ok().unwrap(); DeviceInner { name, handle, closed, - write_ep + write_ep, + read } } @@ -239,40 +200,56 @@ impl DeviceInner { bail!("Device already closed"); } - trace!(">> {:02x?} len={}", bytes, bytes.len()); - // TODO: this write will stall the executioner for the max - // WRITE_DURATION if something goes wrong. Instead, - // this should go through a channel to a `tokio::task::spawn_blocking` - // TX thread similar to how the RX thread does libusb polling... - let res = match self.write_ep.transfer_type { - TransferType::Bulk => { - self.handle.write_bulk(self.write_ep.address, bytes, WRITE_DURATION) + let mut transfer = Transfer::new_bulk_with_data(&self.handle, self.write_ep.address, bytes); + transfer.set_timeout(WRITE_DURATION); + transfer.set_callback(|buf| { + if let Some(buf) = buf { + trace!(">> {:02x?} len={}", buf, buf.len()); + } else { + trace!(">> failed or cancelled"); } - TransferType::Interrupt => { - self.handle.write_interrupt(self.write_ep.address, bytes, WRITE_DURATION) - } - tt => { - bail!("Transfer type {:?} not supported!", tt); - } - }; - - res.map(|_| ()).map_err(|e| anyhow!("USB write failed: {}", e)) + TransferCommand::Drop + }); + transfer.submit() + .map(|_| ()) + .map_err(|e| anyhow!("USB write transfer failed: {}", e)) } - fn close(&self) { + fn close(&mut self) { self.closed.store(true, Ordering::Relaxed); + self.read.cancel().ok(); + } + + fn find_message(read_ptr: &mut [u8]) -> &[u8] { + // correct PODxt lower nibble 0010 in command byte, see + // https://github.com/torvalds/linux/blob/8508fa2e7472f673edbeedf1b1d2b7a6bb898ecc/sound/usb/line6/midibuf.c#L148 + if read_ptr[0] == 0xb2 || read_ptr[0] == 0xc2 || read_ptr[0] == 0xf2 { + read_ptr[0] = read_ptr[0] & 0xf0; + } + + let sysex = read_ptr[0] == 0xf0; + if sysex { + for i in 0 .. read_ptr.len() { + if read_ptr[i] == 0xf7 { + return &read_ptr[..i + 1]; + } + } + return &[]; + + } else { + return read_ptr; + } } } -impl Drop for DeviceInner { +impl Drop for DeviceInner { fn drop(&mut self) { - debug!("DeviceInner for {:?} dropped", &self.name); self.close(); } } #[async_trait] -impl MidiIn for DeviceInput { +impl MidiIn for DeviceInput { fn name(&self) -> String { self.inner.name.clone() } @@ -287,7 +264,7 @@ impl MidiIn for DeviceInput { } #[async_trait] -impl MidiOut for DeviceOutput { +impl MidiOut for DeviceOutput { fn name(&self) -> String { self.inner.name.clone() } diff --git a/usb/src/endpoint.rs b/usb/src/endpoint.rs index dc4dc3f..5b70793 100644 --- a/usb/src/endpoint.rs +++ b/usb/src/endpoint.rs @@ -2,6 +2,7 @@ use anyhow::*; use core::result::Result::Ok; +use log::error; use rusb::{Device, DeviceDescriptor, DeviceHandle, Direction, TransferType, UsbContext}; #[derive(Debug, Clone)] @@ -17,9 +18,18 @@ pub fn configure_endpoint( handle: &DeviceHandle, endpoint: &Endpoint, ) -> Result<()> { - handle.set_active_configuration(endpoint.config)?; - handle.claim_interface(endpoint.iface)?; - handle.set_alternate_setting(endpoint.iface, endpoint.setting)?; + handle.claim_interface(endpoint.iface).map_err(|e| { + error!("Claim interface 1 error: {}", e); + }).ok(); + handle.set_active_configuration(endpoint.config).map_err(|e| { + error!("Set active config error: {}", e); + }).ok(); + handle.claim_interface(endpoint.iface).map_err(|e| { + error!("Claim interface 2 error: {}", e); + }).ok(); + handle.set_alternate_setting(endpoint.iface, endpoint.setting).map_err(|e| { + error!("Set alt setting error: {}", e); + }).ok(); Ok(()) } diff --git a/usb/src/lib.rs b/usb/src/lib.rs index a1e3603..d3fba98 100644 --- a/usb/src/lib.rs +++ b/usb/src/lib.rs @@ -4,6 +4,7 @@ mod line6; mod dev_handler; mod endpoint; mod util; +mod usb; use log::{debug, error, info, trace}; use anyhow::*; @@ -15,7 +16,7 @@ use std::sync::{Arc, Mutex}; use std::sync::atomic::{AtomicBool, Ordering}; use once_cell::sync::Lazy; -use rusb::{Context, Device as UsbDevice, GlobalContext, Hotplug, HotplugBuilder, UsbContext}; +use rusb::{Context, Device as UsbDevice, Hotplug, UsbContext}; use tokio::sync::{broadcast, Notify}; use tokio::sync::broadcast::error::RecvError; use pod_core::midi_io::{MidiIn, MidiOut}; @@ -23,6 +24,7 @@ use regex::Regex; use crate::dev_handler::Device; use crate::devices::find_device; use crate::event::*; +use crate::usb::Usb; use crate::util::usb_address_string; struct HotplugHandler { @@ -86,7 +88,7 @@ static mut INIT_DONE: AtomicBool = AtomicBool::new(false); static INIT_DONE_NOTIFY: Lazy> = Lazy::new(|| { Arc::new(Notify::new()) }); -static DEVICES: Lazy>>>> = Lazy::new(|| { +static DEVICES: Lazy>>> = Lazy::new(|| { Arc::new(Mutex::new(HashMap::new())) }); @@ -104,28 +106,8 @@ pub fn usb_start() -> Result<()> { init_devices: Some(num_devices) }; hh.device_init_notify(0); - let hotplug = HotplugBuilder::new() - .enumerate(true) - .register(&ctx, Box::new(hh))?; - - // libusb's handle_events may need to go on the blocking tasks queue - tokio::task::spawn_blocking(move || { - info!("USB hotplug thread start"); - let mut reg = Some(hotplug); - loop { - match ctx.handle_events(None) { - Ok(_) => {} - Err(e) => { - error!("Error in USB hotplug thread: {}", e); - break; - } - } - } - if let Some(reg) = reg.take() { - ctx.unregister_callback(reg); - } - info!("USB hotplug thread finish"); - }); + + let usb = Usb::new(Box::new(hh))?; let devices = DEVICES.clone(); @@ -147,16 +129,13 @@ pub fn usb_start() -> Result<()> { match msg { UsbEvent::DeviceAdded(DeviceAddedEvent{ vid, pid, bus, address }) => { let usb_dev = find_device(vid, pid).unwrap(); - /* - let device_list = rusb::devices().unwrap(); - let dev = device_list.iter().find(|dev| { - let desc = dev.device_descriptor().unwrap(); - desc.vendor_id() == vid && desc.product_id() == pid - }).map(|dev| dev.open().unwrap()); - let Some(h) = dev; - */ - - let Some(h) = rusb::open_device_with_vid_pid(vid, pid) else { continue }; + let h = match usb.open(vid, pid) { + Ok(h) => { h } + Err(e) => { + error!("Failed to open device {:04x}{:04x}: {}", vid, pid, e); + continue + } + }; let handler = match Device::new(h, usb_dev) { Ok(h) => { h } Err(e) => { @@ -209,7 +188,7 @@ pub fn usb_list_devices() -> Vec { devices.values().map(|i| i.name.clone()).collect() } -fn usb_add_device(key: String, device: Device) { +fn usb_add_device(key: String, device: Device) { let mut devices = DEVICES.lock().unwrap(); devices.insert(key, device); } diff --git a/usb/src/usb.rs b/usb/src/usb.rs new file mode 100644 index 0000000..b61d240 --- /dev/null +++ b/usb/src/usb.rs @@ -0,0 +1,450 @@ +use anyhow::*; +use core::result::Result::Ok; +use std::ffi::{c_int, c_uint}; +use std::mem::align_of; +use std::ptr::{NonNull, null_mut}; +use std::sync::{Arc, Mutex}; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::{ptr, thread}; +use std::time::Duration; +use log::{debug, error, info}; +use rusb::{Context, Hotplug, HotplugBuilder, Registration, UsbContext}; +use rusb::constants::{LIBUSB_ENDPOINT_DIR_MASK, LIBUSB_ENDPOINT_IN, LIBUSB_ENDPOINT_OUT, LIBUSB_ERROR_INTERRUPTED, LIBUSB_TRANSFER_CANCELLED, LIBUSB_TRANSFER_TYPE_BULK}; +use rusb::ffi::{libusb_alloc_transfer, libusb_cancel_transfer, libusb_free_transfer, libusb_submit_transfer, libusb_transfer}; +use crate::check; + +pub type Device = rusb::Device; +pub type DeviceHandle = rusb::DeviceHandle; + +pub struct Usb { + ctx: Context, + running: Arc, + thread: Option> +} + +impl Usb { + pub fn new(hotplug: Box>) -> Result { + let ctx = libusb::new_ctx(rusb::constants::LIBUSB_LOG_LEVEL_INFO)?; + let running = Arc::new(AtomicBool::new(true)); + + let hotplug= HotplugBuilder::new() + .enumerate(true) + .register(&ctx, hotplug)?; + + let thread = { + let (ctx, run) = (ctx.clone(), Arc::clone(&running)); + Some(thread::spawn(move || Self::event_thread(ctx, run, hotplug))) + }; + Ok(Self { ctx, running, thread }) + } + + pub fn close(&mut self) { + self.running.store(false, Ordering::Release); + self.ctx.interrupt_handle_events(); + self.thread.take().map(thread::JoinHandle::join); + } + + /// Convenience function for opening the first device with the matching + /// Vendor/Product ID. + pub fn open(&self, vid: u16, pid: u16) -> Result { + info!("Opening {:04X}:{:04X}", vid, pid); + /* + let hdl = libusb::reset( + self.ctx.open_device_with_vid_pid(vid, pid) + .ok_or(rusb::Error::NotFound)?, + )?; + */ + let hdl = self.ctx.open_device_with_vid_pid(vid, pid).unwrap(); + Ok(hdl) + } + + /// Dedicated thread for async transfer and hotplug events. + fn event_thread(ctx: Context, run: Arc, hotplug: Registration) { + debug!("USB event thread start"); + while run.load(Ordering::Acquire) { + if let Err(e) = ctx.handle_events(None) { + // TODO: Stop all transfers? + error!("Event thread error: {e}"); + break; + } + } + + ctx.unregister_callback(hotplug); + debug!("USB event thread finish"); + } +} + +impl Drop for Usb { + fn drop(&mut self) { + self.close(); + } +} + +pub enum TransferCommand { + Resubmit, + Drop +} + +pub enum TransferStatus { + Ok, + Error(rusb::Error), + Cancel +} + +struct TransferInner(NonNull); + +impl TransferInner { + pub fn new() -> Option { + NonNull::new(unsafe { libusb_alloc_transfer(0) }) + //.map(|inner| {debug!("new inner={:?}", inner); inner}) + .map(|inner| Self(inner)) + } + + #[inline] + pub const fn as_ptr(&self) -> *mut libusb_transfer { + self.0.as_ptr() + } + + /// Returns a shared reference to the `libusb_transfer` struct. + #[inline] + pub const fn as_ref(&self) -> &libusb_transfer { + // SAFETY: new() ensures that inner can be converted to a reference + unsafe { self.0.as_ref() } + } + + /// Returns a mutable reference to the `libusb_transfer` struct. + #[inline] + pub fn as_mut(&mut self) -> &mut libusb_transfer { + // SAFETY: new() ensures that inner can be converted to a reference + unsafe { self.0.as_mut() } + } +} + +/// Async transfer +pub struct Transfer +{ + inner: TransferInner, + handle: Arc, + buf: Vec, + status: Arc>, + callback: Option) -> TransferCommand + Send>> +} + +/// A submitted transfer that will update the result of the +/// original [Transfer] when the submit callback is called +pub struct SubmittedTransfer { + status: Arc>, + inner: *mut libusb_transfer +} + +unsafe impl Sync for SubmittedTransfer {} +unsafe impl Send for SubmittedTransfer {} + +impl Transfer +{ + pub fn new_bulk(handle: &Arc, endpoint: u8, len: usize) -> Box { + Self::new(handle, LIBUSB_TRANSFER_TYPE_BULK, endpoint, len) + } + + pub fn new_bulk_with_data(handle: &Arc, endpoint: u8, data: &[u8]) -> Box { + let mut t = Self::new(handle, LIBUSB_TRANSFER_TYPE_BULK, endpoint, data.len()); + unsafe { t.buf.set_len(data.len()); } + t.buf.copy_from_slice(data); + + t + } + + fn new(handle: &Arc, transfer_type: u8, endpoint: u8, len: usize) -> Box { + let inner = TransferInner::new() + .expect("failed to allocate libusb_transfer struct"); + assert_eq!(inner.0.as_ptr() as usize % align_of::(), 0); + let buf = Vec::with_capacity(len); + let status = Arc::new(Mutex::new(TransferStatus::Ok)); + let mut transfer = Self { + inner, buf, handle: handle.clone(), + status, callback: None + }; + + let mut inner = transfer.inner.as_mut(); + inner.endpoint = endpoint; + inner.transfer_type = transfer_type; + inner.callback = Self::callback; + + Box::new(transfer) + } + + pub fn set_callback) -> TransferCommand + Send + 'static>(&mut self, cb: F) { + self.callback = Some(Box::new(cb)) + } + + pub fn set_timeout(&mut self, timeout: Duration) { + let inner = self.inner.as_mut(); + inner.timeout = c_uint::try_from(timeout.as_millis()).unwrap_or(c_uint::MAX); + } + + pub fn submit(mut self: Box) -> Result { + let buf_ptr = self.buf.as_mut_ptr(); + let buf_len = match self.inner.as_ref().endpoint & LIBUSB_ENDPOINT_DIR_MASK { + LIBUSB_ENDPOINT_OUT => self.buf.len(), + LIBUSB_ENDPOINT_IN => self.buf.capacity(), + _ => unreachable!(), + }; + let dev_handle = self.handle.as_raw(); + let inner = self.inner.as_mut(); + + inner.dev_handle = dev_handle; + inner.length = c_int::try_from(buf_len).unwrap(); + inner.buffer = buf_ptr; + + let inner = self.inner.as_ptr(); + let status = self.status.clone(); + let raw = Box::into_raw(self); + // SAFETY: inner is a valid pointer + unsafe { (*inner).user_data = raw.cast() }; + + { + //println!("submitting transfer={raw:?} inner={inner:?}"); + let mut status = status.lock().unwrap(); + Transfer::submit_inner(raw, inner, &mut status)?; + } + Ok(SubmittedTransfer { + status, + inner + }) + } + + fn submit_inner(raw: *mut Self, inner: *mut libusb_transfer, status: &mut TransferStatus) -> Result<()> { + if let Err(e) = check!(libusb_submit_transfer(inner)) { + *status = TransferStatus::Error(e); + Self::callback_inner(raw, inner, status); + return Err(e.into()); + } + Ok(()) + } + + fn callback_inner(raw: *mut Self, inner: *mut libusb_transfer, status: &mut TransferStatus) { + // SAFETY: raw is a valid reference and we have exclusive access + let t = unsafe { &mut *raw }; + let inner_ptr = inner; + let inner = t.inner.as_mut(); + + //println!("callback transfer={raw:?} inner={inner_ptr:?}"); + let command = match (&status, t.callback.as_mut()) { + (TransferStatus::Cancel, Some(cb)) => { + // Transfer was cancelled + cb(None); + TransferCommand::Drop + } + (TransferStatus::Error(_), _) | (TransferStatus::Cancel, _) => { + // Transfer submit failed or cancelled without callback + TransferCommand::Drop + } + (TransferStatus::Ok, Some(cb)) => { + // SAFETY: buffer allocated with capacity >= actual_length in submit() + let buf = unsafe { std::slice::from_raw_parts( + inner.buffer, + inner.actual_length as usize + ) }; + cb(Some(buf)) + } + (TransferStatus::Ok, None) => { + // Transfer successful, but no callback + TransferCommand::Drop + } + }; + match command { + TransferCommand::Resubmit => { + Transfer::submit_inner(raw, inner_ptr, status).ok(); + } + TransferCommand::Drop => { + inner.dev_handle = null_mut(); + inner.user_data = null_mut(); + *status = TransferStatus::Cancel; + // SAFETY: We have the only pointer to the original Transfer + drop(unsafe { Box::from_raw(t as _) }); + } + } + + } + + /// Handles transfer completion callback. + extern "system" fn callback(inner: *mut libusb_transfer) { + let r = std::panic::catch_unwind(|| { + + // SAFETY: user_data was set in submit() + let raw: *mut Transfer = unsafe { (*inner).user_data.cast() }; + let Some(t) = (unsafe { raw.as_ref() }) else { return }; + + let mut status = t.status.lock().unwrap(); + Transfer::callback_inner(raw, inner, &mut status); + }); + if let Err(e) = r { + eprintln!("libusb_transfer callback panic: {e:?}"); + std::process::abort(); + } + } +} + +impl SubmittedTransfer { + pub fn cancel(&mut self) -> Result<()> { + let mut status = self.status.lock().unwrap(); + match *status { + TransferStatus::Ok => { + *status = TransferStatus::Cancel; + check!(libusb_cancel_transfer(self.inner)).map_err(|e| e.into()) + } + _ => { Ok(()) } + } + } +} + +impl Drop for Transfer { + fn drop(&mut self) { + //println!("drop inner={:?}", self.inner.as_ptr()); + + // SAFETY: C API call, inner can be null + unsafe { libusb_free_transfer(self.inner.as_ptr()) } + } +} + +pub mod libusb { + use std::ffi::{c_char, c_int, c_void, CStr}; + use std::ptr::null_mut; + use std::sync::Once; + use log::{debug, error, info, trace, warn}; + use rusb::constants::*; + use rusb::{Context, DeviceHandle, Error, UsbContext}; + use rusb::ffi::{libusb_context, libusb_set_log_cb, libusb_set_option}; + + #[macro_export] + macro_rules! check { + ($x:expr) => { + // SAFETY: C API call + match unsafe { $x } { + LIBUSB_SUCCESS => Ok(()), + e => Err($crate::usb::libusb::from_libusb(e)), + } + }; + } + + /// Initializes libusb. + fn init_lib() { + static INIT: Once = Once::new(); + // SAFETY: C API calls + INIT.call_once(|| unsafe { + let v = rusb::version(); + info!( + "libusb version: {}.{}.{}.{}{}", + v.major(), + v.minor(), + v.micro(), + v.nano(), + v.rc().unwrap_or("") + ); + debug!("- LIBUSB_CAP_HAS_CAPABILITY = {}", rusb::has_capability()); + debug!("- LIBUSB_CAP_HAS_HOTPLUG = {}", rusb::has_hotplug()); + debug!( + "- LIBUSB_CAP_SUPPORTS_DETACH_KERNEL_DRIVER = {}", + rusb::supports_detach_kernel_driver() + ); + libusb_set_log_cb(null_mut(), Some(log_cb), LIBUSB_LOG_CB_GLOBAL); + let rc = libusb_set_option(null_mut(), LIBUSB_OPTION_LOG_LEVEL, LIBUSB_LOG_LEVEL_DEBUG); + if rc != LIBUSB_SUCCESS { + warn!("Failed to enable libusb logging"); + } + }); + } + + /// Creates a new libusb context. + pub(super) fn new_ctx(max_log_level: c_int) -> rusb::Result { + init_lib(); + let ctx = Context::new()?; + if cfg!(windows) { + match check!(libusb_set_option(ctx.as_raw(), LIBUSB_OPTION_USE_USBDK)) { + Ok(()) => info!("Using UsbDk backend"), + Err(Error::NotFound) => info!("Using WinUSB backend"), + Err(e) => return Err(e), + } + } + check!(libusb_set_option( + ctx.as_raw(), + LIBUSB_OPTION_LOG_LEVEL, + max_log_level, + ))?; + Ok(ctx) + } + + /// Resets the specified device handle. + pub(super) fn reset(mut hdl: DeviceHandle) -> rusb::Result> { + let dev = hdl.device(); + let port = dev.port_numbers()?; + // WinUSB API with libusbK driver requires interface 0 to be claimed in + // order to perform an actual device reset: + // https://github.com/libusb/libusb/issues/1261 + if let Err(e) = hdl.claim_interface(0) { + warn!("Failed to claim interface 0 before reset: {e}"); + } + info!("Resetting {dev:?}"); + let ctx = match hdl.reset() { + Ok(_) => return Ok(hdl), + Err(Error::NotFound) => { + let ctx = hdl.context().clone(); + drop(hdl); + ctx + } + Err(e) => return Err(e), + }; + info!("Attempting to re-open device"); + let all = ctx.devices()?; + for dev in all.iter() { + match dev.port_numbers() { + Ok(p) if p == port => return dev.open(), + _ => {} + } + } + error!("Failed to find device after reset"); + Err(Error::NoDevice) + } + + /// Converts libusb error code to [`Error`]. From `rusb-0.9.4/src/error.rs` + pub(crate) const fn from_libusb(rc: c_int) -> Error { + match rc { + LIBUSB_ERROR_IO => Error::Io, + LIBUSB_ERROR_INVALID_PARAM => Error::InvalidParam, + LIBUSB_ERROR_ACCESS => Error::Access, + LIBUSB_ERROR_NO_DEVICE => Error::NoDevice, + LIBUSB_ERROR_NOT_FOUND => Error::NotFound, + LIBUSB_ERROR_BUSY => Error::Busy, + LIBUSB_ERROR_TIMEOUT => Error::Timeout, + LIBUSB_ERROR_OVERFLOW => Error::Overflow, + LIBUSB_ERROR_PIPE => Error::Pipe, + LIBUSB_ERROR_INTERRUPTED => Error::Interrupted, + LIBUSB_ERROR_NO_MEM => Error::NoMem, + LIBUSB_ERROR_NOT_SUPPORTED => Error::NotSupported, + LIBUSB_ERROR_OTHER | _ => Error::Other, + } + } + + extern "system" fn log_cb(_: *mut libusb_context, lvl: c_int, msg: *mut c_void) { + let r = std::panic::catch_unwind(|| { + // SAFETY: msg is a valid C string + let orig = unsafe { CStr::from_ptr(msg as *const c_char) }.to_string_lossy(); + let msg = match orig.as_ref().split_once("libusb: ") { + Some((_, tail)) => tail.trim_end(), + _ => return, // Debug header (see log_v() in libusb/core.c) + }; + match lvl { + LIBUSB_LOG_LEVEL_ERROR => error!("{}", msg.trim_start_matches("error ")), + LIBUSB_LOG_LEVEL_WARNING => warn!("{}", msg.trim_start_matches("warning ")), + LIBUSB_LOG_LEVEL_INFO => debug!("{}", msg.trim_start_matches("info ")), + LIBUSB_LOG_LEVEL_DEBUG => trace!("{}", msg.trim_start_matches("debug ")), + _ => trace!("{}", msg.trim_start_matches("unknown ")), + } + }); + if let Err(e) = r { + eprintln!("libusb log callback panic: {e:?}"); + std::process::abort(); + } + } + +} \ No newline at end of file