Skip to content

Collect events from a SQLite database, create message from a protobuf schema definition and publish events to a Kafka topic.

License

Notifications You must be signed in to change notification settings

fpaupier/alertDispatcher

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

24 Commits
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

Go Report Card

Alert Dispatcher

This code runs on your Raspberry Pi. It regularly polls the local SQLite storage to see if there are events to publish to the Kafka topic.

Credentials

Rename the credentials-template.go file into credentials.go and make sure your version control system ignores it. Then, add your API key and secret into the file.

Sending credentials to your Pi

On Mac and Linux, you can use the scp (secure copy) command to send files to your Raspberry Pi. Example: To send the credentials.go file from your development machine to your Pi:

scp path/to/credentials.go [email protected]:/path/to/credentials.go

Replace YOUR.PI.IP.ADDRESS by the IP address of your Pi. Run ifconfig -a to list your device's network information. If you're on a WiFi network, look for the wlan section and the inet line.

Note: Make sur ssh is enabled on your Raspberry Pi. Find out how to enable SSH on your Raspberry Pi by checking the official documentation.

Install the project

First, make sure your Pi has SQLite3 installed.

sudo apt-get install sqlite3

Packages

Run the following command to install the dependencies:

go get github.com/golang/protobuf/proto
go get github.com/mattn/go-sqlite3
  • We use protocol buffers to have a consistent schema definition for messages.
  • The go-sqlite3 package offers a driver for interacting with a SQLite3 database from our Go code.

Installing Kafka

The Confluent Kafka Go Client requires a manual installation, and a few tricks to run on a Raspberry Pi. First, install the package

go get github.com/confluentinc/confluent-kafka-go/kafka

It takes a few seconds, and it's likely you get the following error message:

# github.com/confluentinc/confluent-kafka-go/kafka
/usr/bin/ld: ../go/src/github.com/confluentinc/confluent-kafka-go/kafka/librdkafka/librdkafka_glibc_linux.a:
error adding symbols: file format not recognized collect2: error: ld
returned 1 exit status

If this happens, it's because there is no compatible librdkafka build on for your ARM based Raspberry PI. No worries! We can fix this by building librdkafka from source. On your Pi execute:

git clone https://github.com/edenhill/librdkafka.git
cd librdkafka
./configure
make
sudo make install

Now, cd to the alertDispatcher/ folder and run the go application with the -tags dynamic tag:

go run -tags dynamic .

You might get an additional error:

# github.com/confluentinc/confluent-kafka-go/kafka
../go/src/github.com/confluentinc/confluent-kafka-go/kafka/message.go:115:16:
type [1073741824]_Ctype_struct_tmphdr_s larger than address
space ../go/src/github.com/confluentinc/confluent-kafka-go/kafka/message.go:115:16:
type [1073741824]_Ctype_struct_tmphdr_s too
large ../go/src/github.com/confluentinc/confluent-kafka-go/kafka/message.go:190:30:
constant 2147483648 overflows
int ../go/src/github.com/confluentinc/confluent-kafka-go/kafka/message.go:190:30:
array bound is too large

Those errors are due to the fact that the code in confluentinc/confluent-kafka-go/kafka/message.go uses the unsafe Go library to perform type conversion by directly manipulating values at a given memory address instead of using Go type casting features. This makes the code non portable and risks of breaking when running on different platforms.

Again, we can easily fix this by modifying two lines in the message.go file. The error indicates that the lines 115 and 190 are responsible for the exception raised.
Line 115 reads:

tmphdr := (*[1 << 30]C.tmphdr_t)(unsafe.Pointer(fcMsg.tmphdrs))[n]

Let's understand why it fails. This line is part of a function named: newMessageFromFcMsg. It creates a message from a C typed message, and a conversion occurs for the header. It initializes an address by right shifting the bits 30 times ([1 << 30]) but this is not a valid memory address on our Pi leading to the exception. We also have an information that this value is too large in the logs, trying with a lower value, like 10 works on a Raspberry Pi 4 B.

Thus replace the line 14 by:

tmphdr := (*[1 << 10]C.tmphdr_t)(unsafe.Pointer(fcMsg.tmphdrs))[n]

The line 190 reads:

keyp = unsafe.Pointer(&((*[1 << 31]byte)(payload)[valueLen]))

Line 190 belong to the function messageToC whose signature indicates it sets the attribute of a C message based on the value from a Go message. Again, the logs tell us we overflow when copying the payload. Let's decrease the value of the right bit shift until we don't have overflow anymore. With 30 instead of 31 it works. Thus, replace line 190 with:

keyp = unsafe.Pointer(&((*[1 << 30]byte)(payload)[valueLen]))

Now, let's build again our app with the dynamic tag: To execute within the alertDispatcher/ folder.

go build -tags dynamic .

And Voilà, it should now work. Run the app with the command: ./alertDispatcher

Note: If experienced C programmer or Go programmers familiar with the unsafe package could explain what happens in the lines 115 and 190 in more details; I'd be happy to have your insights.

Related projects

This repository is part of a series of projects; this one focuses on message delivery. The others being:

  • pi-mask-detection focuses on the actual detection of whether someone is wearing their mask, as seen per the Pi.

  • alertIngress is a Go module designed to run on a server, consuming from a Kafka topic where edge devices pushes their events. Each event consumed by the alert Ingress are archived in PostgresSQL and pushed to a notification service.

  • notifyMask is a Go module designed to run on a server, sending email notification to a system administrator when an event occurs.