Skip to content

Commit

Permalink
Merge pull request #927 from quartiq/heapless-0.8
Browse files Browse the repository at this point in the history
heapless 0.8
  • Loading branch information
jordens authored Jul 11, 2024
2 parents 1406251 + 4841c52 commit ff2c2c1
Show file tree
Hide file tree
Showing 5 changed files with 37 additions and 34 deletions.
3 changes: 2 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ log = { version = "0.4", features = ["max_level_trace", "release_max_level_info"
rtt-target = "0.3"
serde = { version = "1.0", features = ["derive"], default-features = false }
serde-json-core = "0.5"
heapless = { version = "0.7.16", features = ["serde"] }
heapless = { version = "0.8", features = ["serde"] }
rtic = { version = "2.1", features = ["thumbv7-backend"] }
rtic-monotonics = { version = "2.0", features = ["cortex-m-systick"] }
embedded-hal = "0.2.7"
Expand Down
54 changes: 28 additions & 26 deletions src/net/data_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,13 @@
//! # Example
//! A sample Python script is available in `scripts/stream_throughput.py` to demonstrate reception
//! of livestreamed data.
#![allow(non_camel_case_types)] // https://github.com/rust-embedded/heapless/issues/411

use core::{fmt::Write, mem::MaybeUninit};
use heapless::{
pool::{Box, Init, Pool, Uninit},
box_pool,
pool::boxed::{Box, BoxBlock},
spsc::{Consumer, Producer, Queue},
String,
};
Expand Down Expand Up @@ -57,6 +61,8 @@ const FRAME_QUEUE_SIZE: usize = FRAME_COUNT * 2;

type Frame = [MaybeUninit<u8>; FRAME_SIZE];

box_pool!(FRAME_POOL: Frame);

/// Represents the destination for the UDP stream to send data to.
///
/// # Miniconf
Expand Down Expand Up @@ -142,35 +148,37 @@ pub fn setup_streaming(
.unwrap();
let (producer, consumer) = queue.split();

let frame_pool = cortex_m::singleton!(: Pool<Frame> = Pool::new()).unwrap();

let memory = cortex_m::singleton!(FRAME_DATA: [u8; core::mem::size_of::<u8>() * FRAME_SIZE * FRAME_COUNT] =
[0; core::mem::size_of::<u8>() * FRAME_SIZE * FRAME_COUNT]).unwrap();
#[allow(clippy::declare_interior_mutable_const)]
const FRAME: BoxBlock<Frame> = BoxBlock::new();
let memory =
cortex_m::singleton!(FRAME_DATA: [BoxBlock<Frame>; FRAME_COUNT] =
[FRAME; FRAME_COUNT])
.unwrap();

frame_pool.grow(memory);
for block in memory.iter_mut() {
FRAME_POOL.manage(block);
}

let generator = FrameGenerator::new(producer, frame_pool);
let generator = FrameGenerator::new(producer);

let stream = DataStream::new(stack, consumer, frame_pool);
let stream = DataStream::new(stack, consumer);

(generator, stream)
}

#[derive(Debug)]
struct StreamFrame {
buffer: Box<Frame, Init>,
buffer: Box<FRAME_POOL>,
offset: usize,
batches: u8,
}

impl StreamFrame {
pub fn new(
buffer: Box<Frame, Uninit>,
mut buffer: Box<FRAME_POOL>,
format_id: u8,
sequence_number: u32,
) -> Self {
let mut buffer = buffer.init([MaybeUninit::uninit(); FRAME_SIZE]);

for (byte, buf) in MAGIC
.to_le_bytes()
.iter()
Expand Down Expand Up @@ -211,20 +219,15 @@ impl StreamFrame {
/// The data generator for a stream.
pub struct FrameGenerator {
queue: Producer<'static, StreamFrame, FRAME_QUEUE_SIZE>,
pool: &'static Pool<Frame>,
current_frame: Option<StreamFrame>,
sequence_number: u32,
format: u8,
}

impl FrameGenerator {
fn new(
queue: Producer<'static, StreamFrame, FRAME_QUEUE_SIZE>,
pool: &'static Pool<Frame>,
) -> Self {
fn new(queue: Producer<'static, StreamFrame, FRAME_QUEUE_SIZE>) -> Self {
Self {
queue,
pool,
format: StreamFormat::Unknown.into(),
current_frame: None,
sequence_number: 0,
Expand All @@ -248,15 +251,17 @@ impl FrameGenerator {
/// # Args
/// * `f` - A closure that will be provided the buffer to write batch data into.
/// Returns the number of bytes written.
pub fn add<F>(&mut self, f: F)
pub fn add<F>(&mut self, func: F)
where
F: FnMut(&mut [MaybeUninit<u8>]) -> usize,
{
let sequence_number = self.sequence_number;
self.sequence_number = self.sequence_number.wrapping_add(1);

if self.current_frame.is_none() {
if let Some(buffer) = self.pool.alloc() {
if let Ok(buffer) =
FRAME_POOL.alloc([MaybeUninit::uninit(); FRAME_SIZE])
{
self.current_frame.replace(StreamFrame::new(
buffer,
self.format,
Expand All @@ -270,7 +275,7 @@ impl FrameGenerator {
// Note(unwrap): We ensure the frame is present above.
let current_frame = self.current_frame.as_mut().unwrap();

let len = current_frame.add_batch(f);
let len = current_frame.add_batch(func);

if current_frame.is_full(len) {
// Note(unwrap): The queue is designed to be at least as large as the frame buffer
Expand All @@ -290,7 +295,6 @@ pub struct DataStream {
stack: NetworkReference,
socket: Option<<NetworkReference as UdpClientStack>::UdpSocket>,
queue: Consumer<'static, StreamFrame, FRAME_QUEUE_SIZE>,
frame_pool: &'static Pool<Frame>,
remote: StreamTarget,
}

Expand All @@ -304,14 +308,12 @@ impl DataStream {
fn new(
stack: NetworkReference,
consumer: Consumer<'static, StreamFrame, FRAME_QUEUE_SIZE>,
frame_pool: &'static Pool<Frame>,
) -> Self {
Self {
stack,
socket: None,
remote: StreamTarget::default(),
queue: consumer,
frame_pool,
}
}

Expand Down Expand Up @@ -366,7 +368,7 @@ impl DataStream {
if self.open().is_ok() {
// If we just successfully opened the socket, flush old data from queue.
while let Some(frame) = self.queue.dequeue() {
self.frame_pool.free(frame.buffer);
drop(frame.buffer);
}
}
}
Expand Down Expand Up @@ -401,7 +403,7 @@ impl DataStream {
log::warn!("Unexpected UDP error during data stream: {other:?}");
}
}
self.frame_pool.free(frame.buffer)
drop(frame.buffer)
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions src/net/telemetry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ impl TelemetryClient {
/// # Args
/// * `telemetry` - The telemetry to report
pub fn publish<T: Serialize>(&mut self, telemetry: &T) {
let mut topic: String<128> = self.prefix.into();
let mut topic: String<128> = self.prefix.try_into().unwrap();
topic.push_str("/telemetry").unwrap();

self.mqtt
Expand Down Expand Up @@ -189,7 +189,7 @@ impl TelemetryClient {
..
} = self;

let mut topic: String<128> = self.prefix.into();
let mut topic: String<128> = self.prefix.try_into().unwrap();
topic.push_str("/meta").unwrap();

if mqtt
Expand Down
8 changes: 4 additions & 4 deletions src/settings.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,8 @@ impl NetSettings {
write!(&mut id, "{mac}").unwrap();

Self {
broker: "mqtt".into(),
ip: "0.0.0.0".into(),
broker: "mqtt".try_into().unwrap(),
ip: "0.0.0.0".try_into().unwrap(),
id,
mac,
}
Expand Down Expand Up @@ -262,7 +262,7 @@ where
};

if let Some(key) = key {
save_setting(String::from(key).into())?;
save_setting(String::try_from(key).unwrap().into())?;
} else {
for path in Self::Settings::nodes() {
save_setting(path.unwrap().0)?;
Expand Down Expand Up @@ -362,7 +362,7 @@ where
};

if let Some(key) = key {
erase_setting(String::from(key).into()).unwrap();
erase_setting(String::try_from(key).unwrap().into()).unwrap();
} else {
for path in Self::Settings::nodes() {
erase_setting(path.unwrap().0).unwrap();
Expand Down

0 comments on commit ff2c2c1

Please sign in to comment.