Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

linux: use threads for console IO #6770

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/linux/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ dirs-y += src/linux src/generic
src-y += linux/main.c linux/timer.c linux/console.c linux/watchdog.c
src-y += linux/pca9685.c linux/spidev.c linux/analog.c linux/hard_pwm.c
src-y += linux/i2c.c linux/gpio.c generic/crc16_ccitt.c generic/alloc.c
src-y += linux/sensor_ds18b20.c
src-y += linux/sensor_ds18b20.c linux/ringbuf.c

CFLAGS_klipper.elf += -lutil -lrt -lpthread

Expand Down
121 changes: 97 additions & 24 deletions src/linux/console.c
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,22 @@
#include <sys/stat.h> // chmod
#include <time.h> // struct timespec
#include <unistd.h> // ttyname
#include <pthread.h> // pthread_create
#include <stdatomic.h> // atomic_store
#include "board/irq.h" // irq_wait
#include "board/misc.h" // console_sendf
#include "command.h" // command_find_block
#include "internal.h" // console_setup
#include "sched.h" // sched_wake_task
#include "ringbuf.h" // SPSC ring buf

static struct pollfd main_pfd[1];
static struct ring_buf outputq;
static struct ring_buf inputq;
static pthread_t main;
static _Atomic int main_is_sleeping;
static pthread_t reader;
static pthread_t writer;
#define MP_TTY_IDX 0

// Report 'errno' in a message written to stderr
Expand All @@ -31,6 +40,54 @@ report_errno(char *where, int rc)
fprintf(stderr, "Got error %d in %s: (%d)%s\n", rc, where, e, strerror(e));
}

/****************************************************************
* Threaded IO
****************************************************************/

static void *
tty_reader(void *_unused)
{
static uint8_t buf[128];
while (1) {
int ret = read(main_pfd[MP_TTY_IDX].fd, buf, sizeof(buf));
if (ret < 0) {
report_errno("read", ret);
continue;
}
while (ring_buffer_available_to_write(&inputq) < ret)
nanosleep(&(struct timespec){.tv_nsec = 10000}, NULL);
ring_buffer_write(&inputq, buf, ret);
if (atomic_load(&main_is_sleeping))
pthread_kill(main, SIGUSR1);
}

return NULL;
}

static void *
tty_writer(void *_unused)
{
static uint8_t buf[128];
// adjustable sleep
static uint32_t nsec = 1000000;
while (1) {
int len = ring_buffer_read(&outputq, buf, sizeof(buf));
if (len == 0) {
if (nsec < 1000000)
nsec += 1000;
nanosleep(&(struct timespec){.tv_nsec = nsec}, NULL);
continue;
}
if (nsec > 1000)
nsec = nsec >> 1;

int ret = write(main_pfd[MP_TTY_IDX].fd, buf, len);
if (ret < 0)
report_errno("write", ret);
}

return NULL;
}

/****************************************************************
* Setup
Expand Down Expand Up @@ -63,6 +120,9 @@ set_close_on_exec(int fd)
return 0;
}

static void
reader_signal(int signal);

int
console_setup(char *name)
{
Expand All @@ -74,9 +134,6 @@ console_setup(char *name)
report_errno("openpty", ret);
return -1;
}
ret = set_non_blocking(mfd);
if (ret)
return -1;
ret = set_close_on_exec(mfd);
if (ret)
return -1;
Expand Down Expand Up @@ -109,6 +166,25 @@ console_setup(char *name)
if (ret)
return -1;

ring_buffer_init(&inputq);
ring_buffer_init(&outputq);

main = pthread_self();
pthread_create(&reader, NULL, tty_reader, NULL);
pthread_setschedparam(reader, SCHED_OTHER,
&(struct sched_param){.sched_priority = 0});
pthread_create(&writer, NULL, tty_writer, NULL);
pthread_setschedparam(writer, SCHED_OTHER,
&(struct sched_param){.sched_priority = 0});

struct sigaction act = {.sa_handler = reader_signal,
.sa_flags = SA_RESTART};
ret = sigaction(SIGUSR1, &act, NULL);
if (ret < 0){
report_errno("sigaction", ret);
return -1;
}

return 0;
}

Expand All @@ -121,6 +197,12 @@ static struct task_wake console_wake;
static uint8_t receive_buf[4096];
static int receive_pos;

static void
reader_signal(int signal)
{
sched_wake_task(&console_wake);
}

void *
console_receive_buffer(void)
{
Expand All @@ -135,16 +217,8 @@ console_task(void)
return;

// Read data
int ret = read(main_pfd[MP_TTY_IDX].fd, &receive_buf[receive_pos]
, sizeof(receive_buf) - receive_pos);
if (ret < 0) {
if (errno == EWOULDBLOCK) {
ret = 0;
} else {
report_errno("read", ret);
return;
}
}
int ret = ring_buffer_read(&inputq, &receive_buf[receive_pos],
sizeof(receive_buf) - receive_pos);
if (ret == 15 && receive_buf[receive_pos+14] == '\n'
&& memcmp(&receive_buf[receive_pos], "FORCE_SHUTDOWN\n", 15) == 0)
shutdown("Force shutdown command");
Expand All @@ -171,23 +245,22 @@ console_sendf(const struct command_encoder *ce, va_list args)
// Generate message
uint8_t buf[MESSAGE_MAX];
uint_fast8_t msglen = command_encode_and_frame(buf, ce, args);
while (ring_buffer_available_to_write(&outputq) < msglen)
nanosleep(&(struct timespec){.tv_nsec = 1000}, NULL);

// Transmit message
int ret = write(main_pfd[MP_TTY_IDX].fd, buf, msglen);
if (ret < 0)
report_errno("write", ret);
ring_buffer_write(&outputq, buf, msglen);
}

// Sleep until a signal received (waking early for console input if needed)
// Sleep for the specified time or until a signal interrupts
void
console_sleep(sigset_t *sigset)
console_sleep(void)
{
int ret = ppoll(main_pfd, ARRAY_SIZE(main_pfd), NULL, sigset);
if (ret <= 0) {
if (errno != EINTR)
report_errno("ppoll main_pfd", ret);
if (ring_buffer_available_to_read(&inputq) > 0) {
sched_wake_task(&console_wake);
return;
}
if (main_pfd[MP_TTY_IDX].revents)
sched_wake_task(&console_wake);
atomic_store(&main_is_sleeping, 1);
nanosleep(&(struct timespec){.tv_nsec = 1000000}, NULL);
atomic_store(&main_is_sleeping, 0);
}
2 changes: 1 addition & 1 deletion src/linux/internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ void report_errno(char *where, int rc);
int set_non_blocking(int fd);
int set_close_on_exec(int fd);
int console_setup(char *name);
void console_sleep(sigset_t *sigset);
void console_sleep(void);

// timer.c
int timer_check_periodic(uint32_t *ts);
Expand Down
75 changes: 75 additions & 0 deletions src/linux/ringbuf.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
#include <string.h>
#include <stdint.h>
#include <stddef.h>
#include <stdatomic.h>
#include "ringbuf.h"

// Simple Single Producer Single Consumer Ring Buffer

void ring_buffer_init(struct ring_buf *rb)
{
atomic_store(&rb->head, 0);
atomic_store(&rb->tail, 0);
}

int ring_buffer_available_to_read(const struct ring_buf *rb)
{
int head = atomic_load_explicit(&rb->head, memory_order_acquire);
int tail = atomic_load_explicit(&rb->tail, memory_order_acquire);
return (head >= tail) ? (head - tail) : (RING_BUFFER_SIZE + head - tail);
}

int ring_buffer_available_to_write(const struct ring_buf *rb)
{
return RING_BUFFER_SIZE - 1 - ring_buffer_available_to_read(rb);
}

int ring_buffer_write(struct ring_buf *rb, const uint8_t *data, int length)
{
int available = ring_buffer_available_to_write(rb);
int to_write = (length < available) ? length : available;

int head = atomic_load_explicit(&rb->head, memory_order_acquire);
int first_chunk = RING_BUFFER_SIZE - head;
if (first_chunk > to_write) {
first_chunk = to_write;
}
memcpy(&rb->buffer[head], data, first_chunk);
head = (head + first_chunk) % RING_BUFFER_SIZE;
atomic_store_explicit(&rb->head, head, memory_order_release);

int second_chunk = to_write - first_chunk;
if (second_chunk > 0) {
memcpy(&rb->buffer[0], data + first_chunk, second_chunk);
}
head = (head + second_chunk) % RING_BUFFER_SIZE;
atomic_store_explicit(&rb->head, head, memory_order_release);

return to_write;
}

int ring_buffer_read(struct ring_buf *rb, uint8_t *data, int length)
{
int available = ring_buffer_available_to_read(rb);
int to_read = (length < available) ? length : available;
if (to_read == 0)
return 0;

int tail = atomic_load_explicit(&rb->tail, memory_order_acquire);
int first_chunk = RING_BUFFER_SIZE - tail;
if (first_chunk > to_read) {
first_chunk = to_read;
}
memcpy(data, &rb->buffer[tail], first_chunk);
tail = (tail + first_chunk) % RING_BUFFER_SIZE;
atomic_store_explicit(&rb->tail, tail, memory_order_release);

int second_chunk = to_read - first_chunk;
if (second_chunk > 0) {
memcpy(data + first_chunk, &rb->buffer[0], second_chunk);
tail = (tail + second_chunk) % RING_BUFFER_SIZE;
}
atomic_store_explicit(&rb->tail, tail, memory_order_release);

return to_read;
}
15 changes: 15 additions & 0 deletions src/linux/ringbuf.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
#include <stdint.h>

#define RING_BUFFER_SIZE (2048 - sizeof(int) * 2)
struct ring_buf
{
uint8_t buffer[RING_BUFFER_SIZE];
_Atomic int head;
_Atomic int tail;
};

void ring_buffer_init(struct ring_buf *rb);
int ring_buffer_available_to_read(const struct ring_buf *rb);
int ring_buffer_available_to_write(const struct ring_buf *rb);
int ring_buffer_write(struct ring_buf *rb, const uint8_t *data, int length);
int ring_buffer_read(struct ring_buf *rb, uint8_t *data, int length);
5 changes: 1 addition & 4 deletions src/linux/timer.c
Original file line number Diff line number Diff line change
Expand Up @@ -269,10 +269,7 @@ irq_wait(void)
{
// Must atomically sleep until signaled
if (!readl(&TimerInfo.must_wake_timers)) {
timer_disable_signals();
if (!TimerInfo.must_wake_timers)
console_sleep(&TimerInfo.ss_sleep);
timer_enable_signals();
console_sleep();
}
irq_poll();
}
Expand Down