Skip to content

Commit

Permalink
Merge pull request #106 from TheCacophonyProject/web-socket
Browse files Browse the repository at this point in the history
Web socket
  • Loading branch information
GP authored Nov 23, 2021
2 parents d228857 + 8e45b8b commit 68f297e
Show file tree
Hide file tree
Showing 25 changed files with 12,923 additions and 444 deletions.
4 changes: 4 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,7 @@ debug
/.vscode/
/dist/
/managementd
api/types.js
api/types.js.map
static/js/camera.js
static/js/camera.js.map
15 changes: 11 additions & 4 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,21 @@ install-packr:
go install github.com/gobuffalo/packr/[email protected]

.PHONY: build
build: install-packr
build: install-packr install-typescript
packr build -ldflags="-s -w" ./cmd/managementd

.PHONY: release
release: install-packr

.PHONY: install-typescript
install-typescript:
npm install -g typescript
npm install -g rollup
tsc

.PHONY: release
release: install-packr install-typescript
curl -sL https://git.io/goreleaser | bash

.PHONY: clean
clean:
packr clean
rm managementd
rm managementd
66 changes: 66 additions & 0 deletions api/types.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
export interface FrameInfo {
Telemetry: Telemetry;
AppVersion: string;
BinaryVersion: string;
Camera: CameraInfo;
Tracks: Track[];
}

export interface Track {
predictions: Prediction[];
positions: Region[];
}

export interface Prediction {
label: string;
confidence: number;
clairty: number;
}

export interface Region {
mass: number;
frame_number: number;
pixel_variance: number;
x: number;
y: number;
width: number;
height: number;
}

export interface Telemetry {
TimeOn: number;
FFCState: string;
FrameCount: number;
FrameMean: number;
TempC: number;
LastFFCTempC: number;
LastFFCTime: number;
}

export interface CameraInfo {
Brand: string;
Model: string;
FPS: number;
ResX: number;
ResY: number;
Firmware: string;
CameraSerial: number;
}

export interface Frame {
frameInfo: FrameInfo;
frame: Uint16Array;
}

interface CameraStats {
skippedFramesServer: number;
skippedFramesClient: number;
}

interface CameraState {
socket: WebSocket | null;
UUID: number;
stats: CameraStats;
prevFrameNum: number;
heartbeatInterval: number;
}
233 changes: 231 additions & 2 deletions cmd/managementd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,23 +19,40 @@ along with this program. If not, see <http://www.gnu.org/licenses/>.
package main

import (
"bytes"
"encoding/binary"
"encoding/json"
"fmt"
"log"
"net/http"
"sync"
"sync/atomic"
"time"

"github.com/gobuffalo/packr"
"github.com/godbus/dbus"
"github.com/gorilla/mux"
"golang.org/x/net/websocket"

goconfig "github.com/TheCacophonyProject/go-config"
"github.com/TheCacophonyProject/go-cptv/cptvframe"
managementinterface "github.com/TheCacophonyProject/management-interface"
"github.com/TheCacophonyProject/management-interface/api"
)

const (
configDir = goconfig.DefaultConfigDir
configDir = goconfig.DefaultConfigDir
socketTimeout = 7 * time.Second
)

var haveClients = make(chan bool)
var version = "<not set>"
var sockets = make(map[int64]*WebsocketRegistration)
var socketsLock sync.RWMutex
var cameraInfo map[string]interface{}
var lastFrame *FrameData
var currentFrame = -1
var managementAPI *api.ManagementAPI

// Set up and handle page requests.
func main() {
Expand All @@ -57,7 +74,8 @@ func main() {
// Serve up static content.
static := packr.NewBox("../../static")
router.PathPrefix("/static/").Handler(http.StripPrefix("/static/", http.FileServer(static)))

router.Handle("/ws", websocket.Handler(WebsocketServer))
go sendFrameToSockets()
// UI handlers.
router.HandleFunc("/", managementinterface.IndexHandler).Methods("GET")
router.HandleFunc("/wifi-networks", managementinterface.WifiNetworkHandler).Methods("GET", "POST")
Expand All @@ -76,6 +94,8 @@ func main() {

// API
apiObj, err := api.NewAPI(config.config, version)
managementAPI = apiObj

if err != nil {
log.Fatal(err)
return
Expand Down Expand Up @@ -127,3 +147,212 @@ func basicAuth(next http.Handler) http.Handler {
}
})
}

type WebsocketRegistration struct {
AtomicLock uint32
Socket *websocket.Conn
LastHeartbeatAt time.Time
}

func (socket *WebsocketRegistration) Inactive() bool {
return time.Since(socket.LastHeartbeatAt) >= socketTimeout
}

type message struct {
// the json tag means this will serialize as a lowercased field
Type string `json:"type"`
Data string `json:"data"`
Uuid int64 `json:"uuid"`
}

func WebsocketServer(ws *websocket.Conn) {
for {
// Receive any messages from the client
message := message{}
if err := websocket.JSON.Receive(ws, &message); err != nil {
// Probably EOF error, when there's no message. Maybe could sleep, so we're not thrashing this?
} else {
// When we first get a connection, register the websocket and push it onto an array of websockets.
// Occasionally go through the list and cull any that are no-longer sending heart-beats.
if message.Type == "Register" {
socketsLock.Lock()
firstSocket := len(sockets) == 0
sockets[message.Uuid] = &WebsocketRegistration{
Socket: ws,
LastHeartbeatAt: time.Now(),
AtomicLock: 0,
}
socketsLock.Unlock()
if firstSocket {
log.Print("Git new client register")
haveClients <- true
}
}
if message.Type == "Heartbeat" {
if socket, ok := sockets[message.Uuid]; ok {
socket.LastHeartbeatAt = time.Now()
}
}
}
// TODO(jon): This blocks, so lets avoid busy-waiting
time.Sleep(1 * time.Millisecond)
}
}

type FrameInfo struct {
Camera map[string]interface{}
Telemetry cptvframe.Telemetry
Calibration map[string]interface{}
BinaryVersion string
AppVersion string
Mode string
Tracks []map[string]interface{}
}

func sendFrameToSockets() {
frameNum := 0
var fps int32 = 9
sleepDuration := time.Duration(1000/fps) * time.Millisecond
for {
// NOTE: Only bother with this work if we have clients connected.
if len(sockets) != 0 {
if cameraInfo == nil {
cameraInfo = Headers()
// wairing for camera to connect
if cameraInfo == nil {
time.Sleep(5)
continue
}
fps = cameraInfo["FPS"].(int32)
sleepDuration = time.Duration(1000/fps) * time.Millisecond
}
time.Sleep(sleepDuration)
lastFrame = GetFrame()
if lastFrame == nil {
continue
}
// Make the frame info
buffer := bytes.NewBuffer(make([]byte, 0))
// lastFrameLock.RLock()
frameInfo := FrameInfo{
Camera: cameraInfo,
Telemetry: lastFrame.Frame.Status,
Tracks: lastFrame.Tracks,
}
frameInfoJson, _ := json.Marshal(frameInfo)
frameInfoLen := len(frameInfoJson)
// Write out the length of the frameInfo json as a u16
_ = binary.Write(buffer, binary.LittleEndian, uint16(frameInfoLen))
_ = binary.Write(buffer, binary.LittleEndian, frameInfoJson)
for _, row := range lastFrame.Frame.Pix {
_ = binary.Write(buffer, binary.LittleEndian, row)
}
// Send the buffer back to the client
frameBytes := buffer.Bytes()
socketsLock.RLock()
for uuid, socket := range sockets {
go func(socket *WebsocketRegistration, uuid int64, frameNum int) {
// If the socket is busy sending the previous frame,
// don't block, just move on to the next socket.
if atomic.CompareAndSwapUint32(&socket.AtomicLock, 0, 1) {
_ = websocket.Message.Send(socket.Socket, frameBytes)
atomic.StoreUint32(&socket.AtomicLock, 0)
} else {
// Locked, skip this frame to let client catch up.
log.Println("Skipping frame for", uuid, frameNum)
}
}(socket, uuid, frameNum)
}
socketsLock.RUnlock()
frameNum++

var socketsToRemove []int64
socketsLock.RLock()
for uuid, socket := range sockets {
if socket.Inactive() {
socketsToRemove = append(socketsToRemove, uuid)
}
}
socketsLock.RUnlock()
if len(socketsToRemove) != 0 {
socketsLock.Lock()
for _, socketUuid := range socketsToRemove {
socket := sockets[socketUuid]
delete(sockets, socketUuid)
go func(socket *WebsocketRegistration, uuid int64) {
log.Println("Dropping old socket", uuid)
_ = socket.Socket.Close()
log.Println("Dropped old socket", uuid)
}(socket, socketUuid)
}
socketsLock.Unlock()
}
} else {
log.Print("Wait for new client camera register")
<-haveClients
}
}
}

func Headers() map[string]interface{} {
conn, err := dbus.SystemBus()
if err != nil {
return nil
}
recorder := conn.Object("org.cacophony.thermalrecorder", "/org/cacophony/thermalrecorder")
specs := map[string]interface{}{}
err = recorder.Call("org.cacophony.thermalrecorder.CameraInfo", 0).Store(&specs)
if err != nil {
log.Printf("Error getting camera headers %v", err)
return nil
}
return specs
}

type FrameData struct {
Frame *cptvframe.Frame
Tracks []map[string]interface{}
}

func GetFrame() *FrameData {

conn, err := dbus.SystemBus()
if err != nil {
return nil
}

recorder := conn.Object("org.cacophony.thermalrecorder", "/org/cacophony/thermalrecorder")
f := &FrameData{&cptvframe.Frame{}, nil}
start := time.Now()

c := recorder.Call("org.cacophony.thermalrecorder.TakeSnapshot", 0, currentFrame)
if c.Err != nil {
log.Printf("Err taking snapshot %v", err)
return nil
}
val := c.Body[0].([]interface{})
elapsed := time.Since(start)
log.Printf("Snapshot took %s", elapsed)

tel := val[1].([]interface{})
f.Frame.Pix = val[0].([][]uint16)
f.Frame.Status.TimeOn = time.Duration(tel[0].(int64))
f.Frame.Status.FFCState = tel[1].(string)
f.Frame.Status.FrameCount = int(tel[2].(int32))
f.Frame.Status.FrameMean = tel[3].(uint16)
f.Frame.Status.TempC = tel[4].(float64)
f.Frame.Status.LastFFCTempC = tel[5].(float64)
f.Frame.Status.LastFFCTime = time.Duration(tel[6].(int64))
f.Frame.Status.BackgroundFrame = tel[7].(bool)
if len(val) == 3 {
jsonS := val[2].(string)
if jsonS != "" {
json.Unmarshal([]byte(jsonS), &f.Tracks)
}
}

if f != nil {
currentFrame = f.Frame.Status.FrameCount
}
return f
}
Loading

0 comments on commit 68f297e

Please sign in to comment.