From 29c2abbdac3c29145080a7e3940b0cf942961dae Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Robert=20J=C3=B6rdens?= Date: Thu, 11 Jul 2024 11:12:54 +0200 Subject: [PATCH 1/4] heapless 0.8 --- Cargo.lock | 3 ++- Cargo.toml | 2 +- src/net/data_stream.rs | 51 ++++++++++++++++++++++++------------------ src/net/telemetry.rs | 4 ++-- src/settings.rs | 8 +++---- 5 files changed, 38 insertions(+), 30 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 3c311c117..a7e0657fc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -419,6 +419,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0bfb9eb618601c89945a70e254898da93b13be0388091d42117462b265bb3fad" dependencies = [ "hash32 0.3.1", + "serde", "stable_deref_trait", ] @@ -1320,7 +1321,7 @@ dependencies = [ "embedded-storage-async", "enum-iterator", "fugit", - "heapless 0.7.17", + "heapless 0.8.0", "idsp", "lm75", "log", diff --git a/Cargo.toml b/Cargo.toml index a45c63d58..510341041 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -43,7 +43,7 @@ log = { version = "0.4", features = ["max_level_trace", "release_max_level_info" rtt-target = "0.3" serde = { version = "1.0", features = ["derive"], default-features = false } serde-json-core = "0.5" -heapless = { version = "0.7.16", features = ["serde"] } +heapless = { version = "0.8", features = ["serde"] } rtic = { version = "2.1", features = ["thumbv7-backend"] } rtic-monotonics = { version = "2.0", features = ["cortex-m-systick"] } embedded-hal = "0.2.7" diff --git a/src/net/data_stream.rs b/src/net/data_stream.rs index 9f1d4dd12..03ac15146 100644 --- a/src/net/data_stream.rs +++ b/src/net/data_stream.rs @@ -22,9 +22,13 @@ //! # Example //! A sample Python script is available in `scripts/stream_throughput.py` to demonstrate reception //! of livestreamed data. + +#![allow(non_camel_case_types)] // https://github.com/rust-embedded/heapless/issues/411 + use core::{fmt::Write, mem::MaybeUninit}; use heapless::{ - pool::{Box, Init, Pool, Uninit}, + box_pool, + pool::boxed::{Box, BoxBlock}, spsc::{Consumer, Producer, Queue}, String, }; @@ -57,6 +61,8 @@ const FRAME_QUEUE_SIZE: usize = FRAME_COUNT * 2; type Frame = [MaybeUninit; FRAME_SIZE]; +box_pool!(FRAME_POOL: Frame); + /// Represents the destination for the UDP stream to send data to. /// /// # Miniconf @@ -142,35 +148,37 @@ pub fn setup_streaming( .unwrap(); let (producer, consumer) = queue.split(); - let frame_pool = cortex_m::singleton!(: Pool = Pool::new()).unwrap(); + #[allow(clippy::declare_interior_mutable_const)] + const FRAME: BoxBlock = BoxBlock::new(); + let memory = + cortex_m::singleton!(FRAME_DATA: [BoxBlock; FRAME_COUNT] = + [FRAME; FRAME_COUNT]) + .unwrap(); - let memory = cortex_m::singleton!(FRAME_DATA: [u8; core::mem::size_of::() * FRAME_SIZE * FRAME_COUNT] = - [0; core::mem::size_of::() * FRAME_SIZE * FRAME_COUNT]).unwrap(); - - frame_pool.grow(memory); + for block in memory.iter_mut() { + FRAME_POOL.manage(block); + } - let generator = FrameGenerator::new(producer, frame_pool); + let generator = FrameGenerator::new(producer, &FRAME_POOL); - let stream = DataStream::new(stack, consumer, frame_pool); + let stream = DataStream::new(stack, consumer); (generator, stream) } #[derive(Debug)] struct StreamFrame { - buffer: Box, + buffer: Box, offset: usize, batches: u8, } impl StreamFrame { pub fn new( - buffer: Box, + mut buffer: Box, format_id: u8, sequence_number: u32, ) -> Self { - let mut buffer = buffer.init([MaybeUninit::uninit(); FRAME_SIZE]); - for (byte, buf) in MAGIC .to_le_bytes() .iter() @@ -211,7 +219,7 @@ impl StreamFrame { /// The data generator for a stream. pub struct FrameGenerator { queue: Producer<'static, StreamFrame, FRAME_QUEUE_SIZE>, - pool: &'static Pool, + pool: &'static FRAME_POOL, current_frame: Option, sequence_number: u32, format: u8, @@ -220,7 +228,7 @@ pub struct FrameGenerator { impl FrameGenerator { fn new( queue: Producer<'static, StreamFrame, FRAME_QUEUE_SIZE>, - pool: &'static Pool, + pool: &'static FRAME_POOL, ) -> Self { Self { queue, @@ -248,7 +256,7 @@ impl FrameGenerator { /// # Args /// * `f` - A closure that will be provided the buffer to write batch data into. /// Returns the number of bytes written. - pub fn add(&mut self, f: F) + pub fn add(&mut self, func: F) where F: FnMut(&mut [MaybeUninit]) -> usize, { @@ -256,7 +264,9 @@ impl FrameGenerator { self.sequence_number = self.sequence_number.wrapping_add(1); if self.current_frame.is_none() { - if let Some(buffer) = self.pool.alloc() { + if let Ok(buffer) = + self.pool.alloc([MaybeUninit::uninit(); FRAME_SIZE]) + { self.current_frame.replace(StreamFrame::new( buffer, self.format, @@ -270,7 +280,7 @@ impl FrameGenerator { // Note(unwrap): We ensure the frame is present above. let current_frame = self.current_frame.as_mut().unwrap(); - let len = current_frame.add_batch(f); + let len = current_frame.add_batch(func); if current_frame.is_full(len) { // Note(unwrap): The queue is designed to be at least as large as the frame buffer @@ -290,7 +300,6 @@ pub struct DataStream { stack: NetworkReference, socket: Option<::UdpSocket>, queue: Consumer<'static, StreamFrame, FRAME_QUEUE_SIZE>, - frame_pool: &'static Pool, remote: StreamTarget, } @@ -304,14 +313,12 @@ impl DataStream { fn new( stack: NetworkReference, consumer: Consumer<'static, StreamFrame, FRAME_QUEUE_SIZE>, - frame_pool: &'static Pool, ) -> Self { Self { stack, socket: None, remote: StreamTarget::default(), queue: consumer, - frame_pool, } } @@ -366,7 +373,7 @@ impl DataStream { if self.open().is_ok() { // If we just successfully opened the socket, flush old data from queue. while let Some(frame) = self.queue.dequeue() { - self.frame_pool.free(frame.buffer); + drop(frame.buffer); } } } @@ -401,7 +408,7 @@ impl DataStream { log::warn!("Unexpected UDP error during data stream: {other:?}"); } } - self.frame_pool.free(frame.buffer) + drop(frame.buffer) } } } diff --git a/src/net/telemetry.rs b/src/net/telemetry.rs index 1b8999070..c8eb536b4 100644 --- a/src/net/telemetry.rs +++ b/src/net/telemetry.rs @@ -139,7 +139,7 @@ impl TelemetryClient { /// # Args /// * `telemetry` - The telemetry to report pub fn publish(&mut self, telemetry: &T) { - let mut topic: String<128> = self.prefix.into(); + let mut topic: String<128> = self.prefix.try_into().unwrap(); topic.push_str("/telemetry").unwrap(); self.mqtt @@ -189,7 +189,7 @@ impl TelemetryClient { .. } = self; - let mut topic: String<128> = self.prefix.into(); + let mut topic: String<128> = self.prefix.try_into().unwrap(); topic.push_str("/meta").unwrap(); if mqtt diff --git a/src/settings.rs b/src/settings.rs index 466673b44..6e48cbacd 100644 --- a/src/settings.rs +++ b/src/settings.rs @@ -59,8 +59,8 @@ impl NetSettings { write!(&mut id, "{mac}").unwrap(); Self { - broker: "mqtt".into(), - ip: "0.0.0.0".into(), + broker: "mqtt".try_into().unwrap(), + ip: "0.0.0.0".try_into().unwrap(), id, mac, } @@ -262,7 +262,7 @@ where }; if let Some(key) = key { - save_setting(String::from(key).into())?; + save_setting(String::try_from(key).unwrap().into())?; } else { for path in Self::Settings::nodes() { save_setting(path.unwrap().0)?; @@ -362,7 +362,7 @@ where }; if let Some(key) = key { - erase_setting(String::from(key).into()).unwrap(); + erase_setting(String::try_from(key).unwrap().into()).unwrap(); } else { for path in Self::Settings::nodes() { erase_setting(path.unwrap().0).unwrap(); From 0ee84bf9dd32f350c1cc02649cdb113abb711f19 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Robert=20J=C3=B6rdens?= Date: Thu, 11 Jul 2024 11:21:30 +0200 Subject: [PATCH 2/4] simplify pool --- src/net/data_stream.rs | 11 +++-------- 1 file changed, 3 insertions(+), 8 deletions(-) diff --git a/src/net/data_stream.rs b/src/net/data_stream.rs index 03ac15146..5ebe6fb1d 100644 --- a/src/net/data_stream.rs +++ b/src/net/data_stream.rs @@ -159,7 +159,7 @@ pub fn setup_streaming( FRAME_POOL.manage(block); } - let generator = FrameGenerator::new(producer, &FRAME_POOL); + let generator = FrameGenerator::new(producer); let stream = DataStream::new(stack, consumer); @@ -219,20 +219,15 @@ impl StreamFrame { /// The data generator for a stream. pub struct FrameGenerator { queue: Producer<'static, StreamFrame, FRAME_QUEUE_SIZE>, - pool: &'static FRAME_POOL, current_frame: Option, sequence_number: u32, format: u8, } impl FrameGenerator { - fn new( - queue: Producer<'static, StreamFrame, FRAME_QUEUE_SIZE>, - pool: &'static FRAME_POOL, - ) -> Self { + fn new(queue: Producer<'static, StreamFrame, FRAME_QUEUE_SIZE>) -> Self { Self { queue, - pool, format: StreamFormat::Unknown.into(), current_frame: None, sequence_number: 0, @@ -265,7 +260,7 @@ impl FrameGenerator { if self.current_frame.is_none() { if let Ok(buffer) = - self.pool.alloc([MaybeUninit::uninit(); FRAME_SIZE]) + FRAME_POOL.alloc([MaybeUninit::uninit(); FRAME_SIZE]) { self.current_frame.replace(StreamFrame::new( buffer, From 0e9599c1a8c0de3a52fae87f7f77338ca579acc5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Robert=20J=C3=B6rdens?= Date: Thu, 11 Jul 2024 11:22:49 +0200 Subject: [PATCH 3/4] shrink frame queue --- src/net/data_stream.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/net/data_stream.rs b/src/net/data_stream.rs index 5ebe6fb1d..5a462d018 100644 --- a/src/net/data_stream.rs +++ b/src/net/data_stream.rs @@ -57,7 +57,7 @@ const FRAME_SIZE: usize = 1500 - 40 - 8; // The size of the frame queue must be at least as large as the number of frame buffers. Every // allocated frame buffer should fit in the queue. -const FRAME_QUEUE_SIZE: usize = FRAME_COUNT * 2; +const FRAME_QUEUE_SIZE: usize = FRAME_COUNT; type Frame = [MaybeUninit; FRAME_SIZE]; From 4841c52abba91ef7188f18ee49ff471e02c4119d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Robert=20J=C3=B6rdens?= Date: Thu, 11 Jul 2024 13:19:31 +0000 Subject: [PATCH 4/4] Revert "shrink frame queue" This reverts commit 0e9599c1a8c0de3a52fae87f7f77338ca579acc5. --- src/net/data_stream.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/net/data_stream.rs b/src/net/data_stream.rs index 5a462d018..5ebe6fb1d 100644 --- a/src/net/data_stream.rs +++ b/src/net/data_stream.rs @@ -57,7 +57,7 @@ const FRAME_SIZE: usize = 1500 - 40 - 8; // The size of the frame queue must be at least as large as the number of frame buffers. Every // allocated frame buffer should fit in the queue. -const FRAME_QUEUE_SIZE: usize = FRAME_COUNT; +const FRAME_QUEUE_SIZE: usize = FRAME_COUNT * 2; type Frame = [MaybeUninit; FRAME_SIZE];