Skip to content

Commit

Permalink
Merge branch 'master' into fix_mmap_error_handle
Browse files Browse the repository at this point in the history
  • Loading branch information
deanlee committed Jun 26, 2024
2 parents 303278e + 74074d6 commit 5362e49
Show file tree
Hide file tree
Showing 74 changed files with 509 additions and 5,620 deletions.
1 change: 0 additions & 1 deletion .github/workflows/repo.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ jobs:
name: pre-commit autoupdate
runs-on: ubuntu-latest
container:
image: ghcr.io/commaai/cereal:latest
steps:
- uses: actions/checkout@v3
- name: pre-commit autoupdate
Expand Down
22 changes: 11 additions & 11 deletions .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@ on: [push, pull_request]

env:
DOCKER_REGISTRY: ghcr.io/commaai
RUN: docker run -e PYTHONWARNINGS=error --shm-size 1G --name cereal cereal /bin/sh -c
RUN_NAMED: docker run -e PYTHONWARNINGS=error --shm-size 1G --rm cereal /bin/sh -c
CI_RUN: docker run -e GITHUB_ACTION -e GITHUB_REF -e GITHUB_HEAD_REF -e GITHUB_SHA -e GITHUB_REPOSITORY -e GITHUB_RUN_ID --rm cerealci /bin/bash -c
BUILD: docker buildx build --pull --load --cache-to type=inline --cache-from $DOCKER_REGISTRY/cereal:latest -t cereal -f Dockerfile .
RUN: docker run -e PYTHONWARNINGS=error --shm-size 1G --name msgq msgq /bin/sh -c
RUN_NAMED: docker run -e PYTHONWARNINGS=error --shm-size 1G --rm msgq /bin/sh -c
CI_RUN: docker run -e GITHUB_ACTION -e GITHUB_REF -e GITHUB_HEAD_REF -e GITHUB_SHA -e GITHUB_REPOSITORY -e GITHUB_RUN_ID --rm msgqci /bin/bash -c
BUILD: docker buildx build --pull --load --cache-to type=inline --cache-from $DOCKER_REGISTRY/msgq:latest -t msgq -f Dockerfile .
PYTHONWARNINGS: error

jobs:
Expand All @@ -19,11 +19,11 @@ jobs:
- name: Build docker image
run: eval "$BUILD"
- name: Push to dockerhub
if: github.ref == 'refs/heads/master' && github.event_name != 'pull_request' && github.repository == 'commaai/cereal'
if: github.ref == 'refs/heads/master' && github.event_name != 'pull_request' && github.repository == 'commaai/msgq'
run: |
docker login ghcr.io -u ${{ github.actor }} -p ${{ secrets.GITHUB_TOKEN }}
docker tag cereal $DOCKER_REGISTRY/cereal:latest
docker push $DOCKER_REGISTRY/cereal:latest
docker tag msgq $DOCKER_REGISTRY/msgq:latest
docker push $DOCKER_REGISTRY/msgq:latest
unit_tests:
name: unit tests
Expand All @@ -40,14 +40,14 @@ jobs:
run: |
$RUN "export ${{ matrix.backend }}=1 && \
scons ${{ matrix.flags }} -j$(nproc) && \
messaging/test_runner && \
visionipc/test_runner"
msgq/test_runner && \
msgq/visionipc/test_runner"
- name: python tests
run: $RUN_NAMED "${{ matrix.backend }}=1 coverage run -m unittest discover ."
- name: Upload coverage
run: |
docker commit cereal cerealci
$CI_RUN "cd /project/cereal && bash <(curl -s https://codecov.io/bash) -v -F unit_tests_${{ matrix.backend }}"
docker commit msgq msgqci
$CI_RUN "cd /project/msgq && bash <(curl -s https://codecov.io/bash) -v -F unit_tests_${{ matrix.backend }}"
static_analysis:
name: static analysis
Expand Down
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ __pycache__
.*.swp
.*.swo
*.os
*.so
*.o
*.a

Expand Down
3 changes: 2 additions & 1 deletion .pre-commit-config.yaml
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
files: ^msgq/
repos:
- repo: https://github.com/pre-commit/pre-commit-hooks
rev: v4.6.0
Expand All @@ -21,7 +22,7 @@ repos:
entry: cppcheck
language: system
types: [c++]
exclude: '^(messaging/msgq_tests.cc|messaging/test_runner.cc)'
exclude: '^(msgq/msgq_tests.cc|msgq/test_runner.cc)'
args:
- --error-exitcode=1
- --inline-suppr
Expand Down
8 changes: 4 additions & 4 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -37,18 +37,18 @@ RUN apt-get update && apt-get install -y --no-install-recommends \

RUN pip3 install --break-system-packages --no-cache-dir pyyaml Cython scons pycapnp pre-commit ruff parameterized coverage numpy

WORKDIR /project/
WORKDIR /project/msgq/
RUN cd /tmp/ && \
git clone -b v2.x --depth 1 https://github.com/catchorg/Catch2.git && \
cd Catch2 && \
mv single_include/catch2/ /project/ && \
mv single_include/* /project/msgq/ && \
cd .. \
rm -rf Catch2

WORKDIR /project/cereal
WORKDIR /project/msgq

ENV PYTHONPATH=/project

COPY . .
RUN rm -rf .git && \
RUN ls && rm -rf .git && \
scons -c && scons -j$(nproc)
7 changes: 0 additions & 7 deletions LICENSE

This file was deleted.

78 changes: 36 additions & 42 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,60 +1,54 @@
# What is cereal? [![cereal tests](https://github.com/commaai/cereal/workflows/tests/badge.svg?event=push)](https://github.com/commaai/cereal/actions) [![codecov](https://codecov.io/gh/commaai/cereal/branch/master/graph/badge.svg)](https://codecov.io/gh/commaai/cereal)
# MSGQ: A lock free single producer multi consumer message queue

cereal is both a messaging spec for robotics systems as well as generic high performance IPC pub sub messaging with a single publisher and multiple subscribers.
## What is this library?
MSGQ is a generic high performance IPC pub sub system with a single publisher and multiple subscribers. MSGQ is designed to be a high performance replacement for ZMQ-like SUB/PUB patterns. It uses a ring buffer in shared memory to efficiently read and write data. Each read requires a copy. Writing can be done without a copy, as long as the size of the data is known in advance. While MSGQ is the core of this library, this library also allows replacing the MSGQ backend with ZMQ or a spoofed implementation that can be used for deterministic testing. This library also contains visionipc, an IPC system specifically for large contiguous buffers (like images/video).

Imagine this use case:
* A sensor process reads gyro measurements directly from an IMU and publishes a `sensorEvents` packet
* A calibration process subscribes to the `sensorEvents` packet to use the IMU
* A localization process subscribes to the `sensorEvents` packet to use the IMU also
## Storage
The storage for the queue consists of an area of metadata, and the actual buffer. The metadata contains:

1. A counter to the number of readers that are active
2. A pointer to the head of the queue for writing. From now on referred to as *write pointer*
3. A cycle counter for the writer. This counter is incremented when the writer wraps around
4. N pointers, pointing to the current read position for all the readers. From now on referred to as *read pointer*
5. N counters, counting the number of cycles for all the readers
6. N booleans, indicating validity for all the readers. From now on referred to as *validity flag*

## Messaging Spec
The counter and the pointer are both 32 bit values, packed into 64 bit so they can be read and written atomically.

You'll find the message types in [log.capnp](log.capnp). It uses [Cap'n proto](https://capnproto.org/capnp-tool.html) and defines one struct called `Event`.
The data buffer is a ring buffer. All messages are prefixed by an 8 byte size field, followed by the data. A size of -1 indicates a wrap-around, and means the next message is stored at the beginning of the buffer.

All `Events` have a `logMonoTime` and a `valid`. Then a big union defines the packet type.

### Best Practices
## Writing
Writing involves the following steps:

- **All fields must describe quantities in SI units**, unless otherwise specified in the field name.
- In the context of the message they are in, field names should be completely unambiguous.
- All values should be easy to plot and be human-readable with minimal parsing.
1. Check if the area that is to be written overlaps with any of the read pointers, mark those readers as invalid by clearing the validity flag.
2. Write the message
3. Increase the write pointer by the size of the message

### Maintaining backwards-compatibility
In case there is not enough space at the end of the buffer, a special empty message with a prefix of -1 is written. The cycle counter is incremented by one. In this case step 1 will check there are no read pointers pointing to the remainder of the buffer. Then another write cycle will start with the actual message.

When making changes to the messaging spec you want to maintain backwards-compatibility, such that old logs can
be parsed with a new version of cereal. Adding structs and adding members to structs is generally safe, most other
things are not. Read more details [here](https://capnproto.org/language.html).
There always needs to be 8 bytes of empty space at the end of the buffer. By doing this there is always space to write the -1.

### Custom forks
## Reset reader
When the reader is lagging too much behind the read pointer becomes invalid and no longer points to the beginning of a valid message. To reset a reader to the current write pointer, the following steps are performed:

Forks of [openpilot](https://github.com/commaai/openpilot) might want to add things to the messaging
spec, however this could conflict with future changes made in mainline cereal/openpilot. Rebasing against mainline openpilot
then means breaking backwards-compatibility with all old logs of your fork. So we added reserved events in
[custom.capnp](custom.capnp) that we will leave empty in mainline cereal/openpilot. **If you only modify those, you can ensure your
fork will remain backwards-compatible with all versions of mainline cereal/openpilot and your fork.**
1. Set valid flag
2. Set read cycle counter to that of the writer
3. Set read pointer to write pointer

## Pub Sub Backends
## Reading
Reading involves the following steps:

cereal supports two backends, one based on [zmq](https://zeromq.org/) and another called [msgq](messaging/msgq.cc), a custom pub sub based on shared memory that doesn't require the bytes to pass through the kernel.
1. Read the size field at the current read pointer
2. Read the validity flag
3. Copy the data out of the buffer
4. Increase the read pointer by the size of the message
5. Check the validity flag again

Example
---
```python
import cereal.messaging as messaging
Before starting the copy, the valid flag is checked. This is to prevent a race condition where the size prefix was invalid, and the read could read outside of the buffer. Make sure that step 1 and 2 are not reordered by your compiler or CPU.

# in subscriber
sm = messaging.SubMaster(['sensorEvents'])
while 1:
sm.update()
print(sm['sensorEvents'])
If a writer overwrites the data while it's being copied out, the data will be invalid. Therefore the validity flag is also checked after reading it. The order of step 4 and 5 does not matter.

```
If at steps 2 or 5 the validity flag is not set, the reader is reset. Any data that was already read is discarded. After the reader is reset, the reading starts from the beginning.

```python
# in publisher
pm = messaging.PubMaster(['sensorEvents'])
dat = messaging.new_message('sensorEvents', size=1)
dat.sensorEvents[0] = {"gyro": {"v": [0.1, -0.1, 0.1]}}
pm.send('sensorEvents', dat)
```
If a message with size -1 is encountered, step 3 and 4 are replaced by increasing the cycle counter and setting the read pointer to the beginning of the buffer. After that another read is performed.
72 changes: 21 additions & 51 deletions SConscript
Original file line number Diff line number Diff line change
@@ -1,78 +1,48 @@
Import('env', 'envCython', 'arch', 'common')

import shutil

cereal_dir = Dir('.')
visionipc_dir = Dir('msgq/visionipc')
gen_dir = Dir('gen')
messaging_dir = Dir('messaging')

# Build cereal

schema_files = ['log.capnp', 'car.capnp', 'legacy.capnp', 'custom.capnp']
env.Command(["gen/c/include/c++.capnp.h"], [], "mkdir -p " + gen_dir.path + "/c/include && touch $TARGETS")
env.Command([f'gen/cpp/{s}.c++' for s in schema_files] + [f'gen/cpp/{s}.h' for s in schema_files],
schema_files,
f"capnpc --src-prefix={cereal_dir.path} $SOURCES -o c++:{gen_dir.path}/cpp/")

# TODO: remove non shared cereal and messaging
cereal_objects = env.SharedObject([f'gen/cpp/{s}.c++' for s in schema_files])

cereal = env.Library('cereal', cereal_objects)
env.SharedLibrary('cereal_shared', cereal_objects)

# Build messaging

services_h = env.Command(['services.h'], ['services.py'], 'python3 ' + cereal_dir.path + '/services.py > $TARGET')

messaging_objects = env.SharedObject([
'messaging/messaging.cc',
'messaging/event.cc',
'messaging/impl_zmq.cc',
'messaging/impl_msgq.cc',
'messaging/impl_fake.cc',
'messaging/msgq.cc',
'messaging/socketmaster.cc',
# Build msgq
msgq_objects = env.SharedObject([
'msgq/ipc.cc',
'msgq/event.cc',
'msgq/impl_zmq.cc',
'msgq/impl_msgq.cc',
'msgq/impl_fake.cc',
'msgq/msgq.cc',
])

messaging = env.Library('messaging', messaging_objects)
Depends('messaging/impl_zmq.cc', services_h)

env.Program('messaging/bridge', ['messaging/bridge.cc'], LIBS=[messaging, 'zmq', common])
Depends('messaging/bridge.cc', services_h)

messaging_python = envCython.Program('messaging/messaging_pyx.so', 'messaging/messaging_pyx.pyx', LIBS=envCython["LIBS"]+[messaging, "zmq", common])
msgq = env.Library('msgq', msgq_objects)
msgq_python = envCython.Program('msgq/ipc_pyx.so', 'msgq/ipc_pyx.pyx', LIBS=envCython["LIBS"]+[msgq, "zmq", common])

# Build Vision IPC
vipc_sources = [
'visionipc/ipc.cc',
'visionipc/visionipc_server.cc',
'visionipc/visionipc_client.cc',
'visionipc/visionbuf.cc',
]
vipc_files = ['visionipc.cc', 'visionipc_server.cc', 'visionipc_client.cc', 'visionbuf.cc']
vipc_sources = [f'{visionipc_dir.abspath}/{f}' for f in vipc_files]

if arch == "larch64":
vipc_sources += ['visionipc/visionbuf_ion.cc']
vipc_sources += [f'{visionipc_dir.abspath}/visionbuf_ion.cc']
else:
vipc_sources += ['visionipc/visionbuf_cl.cc']
vipc_sources += [f'{visionipc_dir.abspath}/visionbuf_cl.cc']

vipc_objects = env.SharedObject(vipc_sources)
visionipc = env.Library('visionipc', vipc_objects)


vipc_frameworks = []
vipc_libs = envCython["LIBS"] + [visionipc, messaging, common, "zmq"]
vipc_libs = envCython["LIBS"] + [visionipc, msgq, common, "zmq"]
if arch == "Darwin":
vipc_frameworks.append('OpenCL')
else:
vipc_libs.append('OpenCL')
envCython.Program('visionipc/visionipc_pyx.so', 'visionipc/visionipc_pyx.pyx',
envCython.Program(f'{visionipc_dir.abspath}/visionipc_pyx.so', f'{visionipc_dir.abspath}/visionipc_pyx.pyx',
LIBS=vipc_libs, FRAMEWORKS=vipc_frameworks)

if GetOption('extras'):
env.Program('messaging/test_runner', ['messaging/test_runner.cc', 'messaging/msgq_tests.cc'], LIBS=[messaging, common])

env.Program('visionipc/test_runner', ['visionipc/test_runner.cc', 'visionipc/visionipc_tests.cc'],
env.Program('msgq/test_runner', ['msgq/test_runner.cc', 'msgq/msgq_tests.cc'], LIBS=[msgq, common])
env.Program(f'{visionipc_dir.abspath}/test_runner',
[f'{visionipc_dir.abspath}/test_runner.cc', f'{visionipc_dir.abspath}/visionipc_tests.cc'],
LIBS=['pthread'] + vipc_libs, FRAMEWORKS=vipc_frameworks)


Export('cereal', 'messaging', 'messaging_python', 'visionipc')
Export('visionipc', 'msgq', 'msgq_python')
3 changes: 2 additions & 1 deletion SConstruct
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ if platform.system() == "Darwin":
common = ''

cpppath = [
f"#/../",
f"#/",
'#msgq/',
'/usr/lib/include',
'/opt/homebrew/include',
sysconfig.get_paths()['include'],
Expand Down
9 changes: 0 additions & 9 deletions __init__.py

This file was deleted.

Loading

0 comments on commit 5362e49

Please sign in to comment.