Skip to content

Commit

Permalink
linux: use threads for console IO
Browse files Browse the repository at this point in the history
Signed-off-by: Timofey Titovets <[email protected]>
  • Loading branch information
nefelim4ag committed Dec 26, 2024
1 parent 80d185c commit bc24e35
Show file tree
Hide file tree
Showing 6 changed files with 162 additions and 30 deletions.
2 changes: 1 addition & 1 deletion src/linux/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ dirs-y += src/linux src/generic
src-y += linux/main.c linux/timer.c linux/console.c linux/watchdog.c
src-y += linux/pca9685.c linux/spidev.c linux/analog.c linux/hard_pwm.c
src-y += linux/i2c.c linux/gpio.c generic/crc16_ccitt.c generic/alloc.c
src-y += linux/sensor_ds18b20.c
src-y += linux/sensor_ds18b20.c linux/ringbuf.c

CFLAGS_klipper.elf += -lutil -lrt -lpthread

Expand Down
91 changes: 67 additions & 24 deletions src/linux/console.c
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,20 @@
#include <sys/stat.h> // chmod
#include <time.h> // struct timespec
#include <unistd.h> // ttyname
#include <pthread.h> // pthread_create
#include "board/irq.h" // irq_wait
#include "board/misc.h" // console_sendf
#include "command.h" // command_find_block
#include "internal.h" // console_setup
#include "sched.h" // sched_wake_task
#include "ringbuf.h" // SPSC ring buf

static struct pollfd main_pfd[1];
static struct ring_buf outputq;
static struct ring_buf inputq;
static pthread_t main;
static pthread_t reader;
static pthread_t writer;
#define MP_TTY_IDX 0

// Report 'errno' in a message written to stderr
Expand All @@ -31,6 +38,47 @@ report_errno(char *where, int rc)
fprintf(stderr, "Got error %d in %s: (%d)%s\n", rc, where, e, strerror(e));
}

/****************************************************************
* Threaded IO
****************************************************************/

static void *
tty_reader(void *_unused)
{
static uint8_t buf[128];
while (1) {
int ret = read(main_pfd[MP_TTY_IDX].fd, buf, sizeof(buf));
if (ret < 0) {
report_errno("read", ret);
continue;
}
while (ring_buffer_available_to_write(&inputq) < ret)
nanosleep(&(struct timespec){.tv_nsec = 10000}, NULL);
ring_buffer_write(&inputq, buf, ret);
}

return NULL;
}

static void *
tty_writer(void *_unused)
{
static uint8_t buf[128];
while (1) {
int available = ring_buffer_available_to_read(&outputq);
if (available == 0) {
nanosleep(&(struct timespec){.tv_nsec = 100000}, NULL);
continue;
}

int len = ring_buffer_read(&outputq, buf, sizeof(buf));
int ret = write(main_pfd[MP_TTY_IDX].fd, buf, len);
if (ret < 0)
report_errno("write", ret);
}

return NULL;
}

/****************************************************************
* Setup
Expand Down Expand Up @@ -74,9 +122,6 @@ console_setup(char *name)
report_errno("openpty", ret);
return -1;
}
ret = set_non_blocking(mfd);
if (ret)
return -1;
ret = set_close_on_exec(mfd);
if (ret)
return -1;
Expand Down Expand Up @@ -109,6 +154,15 @@ console_setup(char *name)
if (ret)
return -1;

ring_buffer_init(&inputq);
ring_buffer_init(&outputq);

main = pthread_self();
pthread_create(&reader, NULL, tty_reader, NULL);
pthread_setschedparam(reader, SCHED_OTHER, &(struct sched_param){.sched_priority = 0});
pthread_create(&writer, NULL, tty_writer, NULL);
pthread_setschedparam(writer, SCHED_OTHER, &(struct sched_param){.sched_priority = 0});

return 0;
}

Expand All @@ -135,16 +189,8 @@ console_task(void)
return;

// Read data
int ret = read(main_pfd[MP_TTY_IDX].fd, &receive_buf[receive_pos]
, sizeof(receive_buf) - receive_pos);
if (ret < 0) {
if (errno == EWOULDBLOCK) {
ret = 0;
} else {
report_errno("read", ret);
return;
}
}
int ret = ring_buffer_read(&inputq, &receive_buf[receive_pos],
sizeof(receive_buf) - receive_pos);
if (ret == 15 && receive_buf[receive_pos+14] == '\n'
&& memcmp(&receive_buf[receive_pos], "FORCE_SHUTDOWN\n", 15) == 0)
shutdown("Force shutdown command");
Expand All @@ -171,23 +217,20 @@ console_sendf(const struct command_encoder *ce, va_list args)
// Generate message
uint8_t buf[MESSAGE_MAX];
uint_fast8_t msglen = command_encode_and_frame(buf, ce, args);
while (ring_buffer_available_to_write(&outputq) < msglen)
nanosleep(&(struct timespec){.tv_nsec = 1000}, NULL);

// Transmit message
int ret = write(main_pfd[MP_TTY_IDX].fd, buf, msglen);
if (ret < 0)
report_errno("write", ret);
ring_buffer_write(&outputq, buf, msglen);
}

// Sleep until a signal received (waking early for console input if needed)
// Sleep for the specified time or until a signal interrupts
void
console_sleep(sigset_t *sigset)
console_sleep(void)
{
int ret = ppoll(main_pfd, ARRAY_SIZE(main_pfd), NULL, sigset);
if (ret <= 0) {
if (errno != EINTR)
report_errno("ppoll main_pfd", ret);
if (ring_buffer_available_to_read(&inputq) > 0) {
sched_wake_task(&console_wake);
return;
}
if (main_pfd[MP_TTY_IDX].revents)
sched_wake_task(&console_wake);
nanosleep(&(struct timespec){.tv_nsec = 500000}, NULL);
}
2 changes: 1 addition & 1 deletion src/linux/internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ void report_errno(char *where, int rc);
int set_non_blocking(int fd);
int set_close_on_exec(int fd);
int console_setup(char *name);
void console_sleep(sigset_t *sigset);
void console_sleep(void);

// timer.c
int timer_check_periodic(uint32_t *ts);
Expand Down
76 changes: 76 additions & 0 deletions src/linux/ringbuf.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
#include <string.h>
#include <stdint.h>
#include <stddef.h>
#include <stdatomic.h>
#include "ringbuf.h"

// Simple Single Producer Single Consumer Ring Buffer

void ring_buffer_init(struct ring_buf *rb)
{
atomic_store(&rb->head, 0);
atomic_store(&rb->tail, 0);
atomic_store(&rb->size, 0);
}

int ring_buffer_available_to_read(const struct ring_buf *rb)
{
return atomic_load(&rb->size);
}

int ring_buffer_available_to_write(const struct ring_buf *rb)
{
return RING_BUFFER_SIZE - atomic_load(&rb->size);
}

int ring_buffer_write(struct ring_buf *rb, const uint8_t *data, int length)
{
int available = ring_buffer_available_to_write(rb);
int to_write = (length < available) ? length : available;

int head = atomic_load_explicit(&rb->head, memory_order_acquire);
int first_chunk = RING_BUFFER_SIZE - head;
if (first_chunk > to_write) {
first_chunk = to_write;
}
memcpy(&rb->buffer[head], data, first_chunk);
head = (head + first_chunk) % RING_BUFFER_SIZE;
atomic_store_explicit(&rb->head, head, memory_order_release);
atomic_fetch_add(&rb->size, first_chunk);

int second_chunk = to_write - first_chunk;
if (second_chunk > 0) {
memcpy(&rb->buffer[0], data + first_chunk, second_chunk);
}
head = (head + second_chunk) % RING_BUFFER_SIZE;
atomic_store_explicit(&rb->head, head, memory_order_release);
atomic_fetch_add(&rb->size, second_chunk);

return to_write;
}

int ring_buffer_read(struct ring_buf *rb, uint8_t *data, int length)
{
int available = ring_buffer_available_to_read(rb);
int to_read = (length < available) ? length : available;

int tail = atomic_load_explicit(&rb->tail, memory_order_acquire);
int first_chunk = RING_BUFFER_SIZE - tail;
if (first_chunk > to_read) {
first_chunk = to_read;
}
memcpy(data, &rb->buffer[tail], first_chunk);
tail = (tail + first_chunk) % RING_BUFFER_SIZE;
atomic_store_explicit(&rb->tail, tail, memory_order_release);
atomic_fetch_sub(&rb->size, first_chunk);

int second_chunk = to_read - first_chunk;
if (second_chunk > 0) {
memcpy(data + first_chunk, &rb->buffer[0], second_chunk);
tail = second_chunk % RING_BUFFER_SIZE;
}
atomic_store_explicit(&rb->tail, tail, memory_order_release);
atomic_fetch_sub(&rb->size, second_chunk);

return to_read;
}
16 changes: 16 additions & 0 deletions src/linux/ringbuf.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
#include <stdint.h>

#define RING_BUFFER_SIZE 8192
struct ring_buf
{
uint8_t buffer[RING_BUFFER_SIZE];
_Atomic int head;
_Atomic int tail;
_Atomic int size;
};

void ring_buffer_init(struct ring_buf *rb);
int ring_buffer_available_to_read(const struct ring_buf *rb);
int ring_buffer_available_to_write(const struct ring_buf *rb);
int ring_buffer_write(struct ring_buf *rb, const uint8_t *data, int length);
int ring_buffer_read(struct ring_buf *rb, uint8_t *data, int length);
5 changes: 1 addition & 4 deletions src/linux/timer.c
Original file line number Diff line number Diff line change
Expand Up @@ -269,10 +269,7 @@ irq_wait(void)
{
// Must atomically sleep until signaled
if (!readl(&TimerInfo.must_wake_timers)) {
timer_disable_signals();
if (!TimerInfo.must_wake_timers)
console_sleep(&TimerInfo.ss_sleep);
timer_enable_signals();
console_sleep();
}
irq_poll();
}
Expand Down

0 comments on commit bc24e35

Please sign in to comment.