Skip to content

Commit

Permalink
Merge branch 'vod' into vod-auto-remove-node
Browse files Browse the repository at this point in the history
  • Loading branch information
ioppermann committed Aug 19, 2024
2 parents 7879480 + d6d39f1 commit fa19973
Show file tree
Hide file tree
Showing 456 changed files with 10,404 additions and 19,476 deletions.
2 changes: 2 additions & 0 deletions .dockerignore
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,5 @@ README.md
*.md
.github/*
.github_build/*
core
core-*
6 changes: 3 additions & 3 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
ARG GOLANG_IMAGE=golang:1.22-alpine3.19
ARG BUILD_IMAGE=alpine:3.19
ARG GOLANG_IMAGE=golang:1.22-alpine3.20
ARG BUILD_IMAGE=alpine:3.20

# Cross-Compilation
# https://www.docker.com/blog/faster-multi-platform-builds-dockerfile-cross-compilation-guide/
FROM --platform=$BUILDPLATFORM $GOLANG_IMAGE as builder
FROM --platform=$BUILDPLATFORM $GOLANG_IMAGE AS builder

ARG TARGETOS TARGETARCH TARGETVARIANT
ENV GOOS=$TARGETOS GOARCH=$TARGETARCH GOARM=$TARGETVARIANT
Expand Down
2 changes: 1 addition & 1 deletion Dockerfile.bundle
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ ARG CORE_IMAGE=core:dev

ARG FFMPEG_IMAGE=datarhei/base:alpine-ffmpeg-latest

FROM $CORE_IMAGE as core
FROM $CORE_IMAGE AS core

FROM $FFMPEG_IMAGE

Expand Down
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ init:

## build: Build core (default)
build:
CGO_ENABLED=0 GOOS=${GOOS} GOARCH=${GOARCH} GOARM=${GOARM} go build -o core$(BINSUFFIX) -trimpath
CGO_ENABLED=0 GOOS=${GOOS} GOARCH=${GOARCH} GOARM=${GOARM} go build -o core$(BINSUFFIX)

## swagger: Update swagger API documentation (requires github.com/swaggo/swag)
swagger:
Expand Down
18 changes: 9 additions & 9 deletions app/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ import (
httpfs "github.com/datarhei/core/v16/http/fs"
"github.com/datarhei/core/v16/http/router"
"github.com/datarhei/core/v16/iam"
iamaccess "github.com/datarhei/core/v16/iam/access"
iamidentity "github.com/datarhei/core/v16/iam/identity"
iampolicy "github.com/datarhei/core/v16/iam/policy"
"github.com/datarhei/core/v16/io/fs"
"github.com/datarhei/core/v16/log"
"github.com/datarhei/core/v16/math/rand"
Expand Down Expand Up @@ -670,7 +670,7 @@ func (a *api) start(ctx context.Context) error {
return err
}

policyAdapter, err := iamaccess.NewJSONAdapter(rfs, "./policy.json", nil)
policyAdapter, err := iampolicy.NewJSONAdapter(rfs, "./policy.json", nil)
if err != nil {
return err
}
Expand All @@ -695,7 +695,7 @@ func (a *api) start(ctx context.Context) error {
// Check if there are already file created by IAM. If not, create policies
// and users based on the config in order to mimic the behaviour before IAM.
if len(rfs.List("/", fs.ListOptions{Pattern: "/*.json"})) == 0 {
policies := []iamaccess.Policy{
policies := []iampolicy.Policy{
{
Name: "$anon",
Domain: "$none",
Expand Down Expand Up @@ -731,7 +731,7 @@ func (a *api) start(ctx context.Context) error {
},
}

policies = append(policies, iamaccess.Policy{
policies = append(policies, iampolicy.Policy{
Name: cfg.Storage.Memory.Auth.Username,
Domain: "$none",
Types: []string{"fs"},
Expand All @@ -757,7 +757,7 @@ func (a *api) start(ctx context.Context) error {
users[s.Auth.Username] = user
}

policies = append(policies, iamaccess.Policy{
policies = append(policies, iampolicy.Policy{
Name: s.Auth.Username,
Domain: "$none",
Types: []string{"fs"},
Expand All @@ -768,7 +768,7 @@ func (a *api) start(ctx context.Context) error {
}

if cfg.RTMP.Enable && len(cfg.RTMP.Token) == 0 {
policies = append(policies, iamaccess.Policy{
policies = append(policies, iampolicy.Policy{
Name: "$anon",
Domain: "$none",
Types: []string{"rtmp"},
Expand All @@ -778,7 +778,7 @@ func (a *api) start(ctx context.Context) error {
}

if cfg.SRT.Enable && len(cfg.SRT.Token) == 0 {
policies = append(policies, iamaccess.Policy{
policies = append(policies, iampolicy.Policy{
Name: "$anon",
Domain: "$none",
Types: []string{"srt"},
Expand Down Expand Up @@ -1180,7 +1180,7 @@ func (a *api) start(ctx context.Context) error {

var store restreamstore.Store = nil

{
if !cfg.Cluster.Enable {
fs, err := fs.NewRootedDiskFilesystem(fs.RootedDiskConfig{
Root: cfg.DB.Dir,
})
Expand Down Expand Up @@ -1989,7 +1989,7 @@ func backupMemFS(target, source fs.Filesystem, patterns []string) error {
continue
}

target.WriteFileReader(name, file)
target.WriteFileReader(name, file, -1)

file.Close()
}
Expand Down
12 changes: 6 additions & 6 deletions app/import/import.go
Original file line number Diff line number Diff line change
Expand Up @@ -575,11 +575,11 @@ func importV1(fs fs.Filesystem, path string, cfg importConfig) (store.Data, erro
ID: "restreamer-ui:ingest:" + cfg.id,
Reference: cfg.id,
CreatedAt: time.Now().Unix(),
Order: "stop",
Order: app.NewOrder("stop"),
}

if v1data.Actions.Ingest == "start" {
process.Order = "start"
process.Order = app.NewOrder("start")
}

config := &app.Config{
Expand Down Expand Up @@ -1211,11 +1211,11 @@ func importV1(fs fs.Filesystem, path string, cfg importConfig) (store.Data, erro
ID: "restreamer-ui:ingest:" + cfg.id + "_snapshot",
Reference: cfg.id,
CreatedAt: time.Now().Unix(),
Order: "stop",
Order: app.NewOrder("stop"),
}

if v1data.Actions.Ingest == "start" {
process.Order = "start"
process.Order = app.NewOrder("start")
}

snapshotConfig := &app.Config{
Expand Down Expand Up @@ -1292,11 +1292,11 @@ func importV1(fs fs.Filesystem, path string, cfg importConfig) (store.Data, erro
ID: egressId,
Reference: cfg.id,
CreatedAt: time.Now().Unix(),
Order: "stop",
Order: app.NewOrder("stop"),
}

if v1data.Actions.Egress == "start" {
process.Order = "start"
process.Order = app.NewOrder("start")
}

egress := restreamerUIEgress{
Expand Down
4 changes: 2 additions & 2 deletions app/import/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ func TestImport(t *testing.T) {
memfs, err := fs.NewMemFilesystem(fs.MemConfig{})
require.NoError(t, err)

memfs.WriteFileReader("/mime.types", strings.NewReader("foobar"))
memfs.WriteFileReader("/bin/ffmpeg", strings.NewReader("foobar"))
memfs.WriteFileReader("/mime.types", strings.NewReader("foobar"), -1)
memfs.WriteFileReader("/bin/ffmpeg", strings.NewReader("foobar"), -1)

configstore, err := store.NewJSON(memfs, "/config.json", nil)
require.NoError(t, err)
Expand Down
4 changes: 2 additions & 2 deletions app/version.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ func (v versionInfo) MinorString() string {
// Version of the app
var Version = versionInfo{
Major: 16,
Minor: 19,
Patch: 1,
Minor: 20,
Patch: 0,
}

// Commit is the git commit the app is build from. It should be filled in during compilation
Expand Down
6 changes: 6 additions & 0 deletions cluster/about.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,11 @@ type ClusterNodeResources struct {
NCPU float64 // Number of CPU on this node
CPU float64 // Current CPU load, 0-100*ncpu
CPULimit float64 // Defined CPU load limit, 0-100*ncpu
CPUCore float64 // Current CPU load of the core itself, 0-100*ncpu
Mem uint64 // Currently used memory in bytes
MemLimit uint64 // Defined memory limit in bytes
MemTotal uint64 // Total available memory in bytes
MemCore uint64 // Current used memory of the core itself in bytes
Error error
}

Expand Down Expand Up @@ -145,8 +148,11 @@ func (c *cluster) About() (ClusterAbout, error) {
NCPU: nodeAbout.Resources.NCPU,
CPU: nodeAbout.Resources.CPU,
CPULimit: nodeAbout.Resources.CPULimit,
CPUCore: nodeAbout.Resources.CPUCore,
Mem: nodeAbout.Resources.Mem,
MemLimit: nodeAbout.Resources.MemLimit,
MemTotal: nodeAbout.Resources.MemTotal,
MemCore: nodeAbout.Resources.MemCore,
Error: nodeAbout.Resources.Error,
},
}
Expand Down
45 changes: 44 additions & 1 deletion cluster/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ func NewAPI(config APIConfig) (API, error) {
a.router.GET("/v1/snaphot", a.Snapshot)

a.router.POST("/v1/process", a.ProcessAdd)
a.router.GET("/v1/process/:id", a.ProcessGet)
a.router.DELETE("/v1/process/:id", a.ProcessRemove)
a.router.PUT("/v1/process/:id", a.ProcessUpdate)
a.router.PUT("/v1/process/:id/command", a.ProcessSetCommand)
Expand Down Expand Up @@ -186,8 +187,11 @@ func (a *api) About(c echo.Context) error {
NCPU: resources.CPU.NCPU,
CPU: (100 - resources.CPU.Idle) * resources.CPU.NCPU,
CPULimit: resources.CPU.Limit * resources.CPU.NCPU,
CPUCore: resources.CPU.Core * resources.CPU.NCPU,
Mem: resources.Mem.Total - resources.Mem.Available,
MemLimit: resources.Mem.Total,
MemLimit: resources.Mem.Limit,
MemTotal: resources.Mem.Total,
MemCore: resources.Mem.Core,
},
}

Expand Down Expand Up @@ -387,6 +391,45 @@ func (a *api) ProcessAdd(c echo.Context) error {
return c.JSON(http.StatusOK, "OK")
}

// ProcessGet gets a process from the cluster DB
// @Summary Get a process
// @Description Get a process from the cluster DB
// @Tags v1.0.0
// @ID cluster-1-get-process
// @Produce json
// @Param id path string true "Process ID"
// @Param domain query string false "Domain to act on"
// @Param X-Cluster-Origin header string false "Origin ID of request"
// @Success 200 {string} string
// @Failure 404 {object} Error
// @Failure 500 {object} Error
// @Failure 508 {object} Error
// @Router /v1/process/{id} [get]
func (a *api) ProcessGet(c echo.Context) error {
id := util.PathParam(c, "id")
domain := util.DefaultQuery(c, "domain", "")

origin := c.Request().Header.Get("X-Cluster-Origin")

if origin == a.id {
return Err(http.StatusLoopDetected, "", "breaking circuit")
}

pid := app.ProcessID{ID: id, Domain: domain}

process, nodeid, err := a.cluster.Store().ProcessGet(pid)
if err != nil {
return ErrFromClusterError(err)
}

res := client.GetProcessResponse{
Process: process,
NodeID: nodeid,
}

return c.JSON(http.StatusOK, res)
}

// ProcessRemove removes a process from the cluster DB
// @Summary Remove a process
// @Description Remove a process from the cluster DB
Expand Down
13 changes: 11 additions & 2 deletions cluster/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,12 @@ import (
"strings"
"time"

"github.com/datarhei/core/v16/cluster/store"
"github.com/datarhei/core/v16/config"
"github.com/datarhei/core/v16/encoding/json"
"github.com/datarhei/core/v16/ffmpeg/skills"
iamaccess "github.com/datarhei/core/v16/iam/access"
iamidentity "github.com/datarhei/core/v16/iam/identity"
iampolicy "github.com/datarhei/core/v16/iam/policy"
"github.com/datarhei/core/v16/restream/app"
)

Expand All @@ -26,6 +27,11 @@ type AddProcessRequest struct {
Config app.Config `json:"config"`
}

type GetProcessResponse struct {
Process store.Process `json:"process"`
NodeID string `json:"nodeid"`
}

type UpdateProcessRequest struct {
Config app.Config `json:"config"`
}
Expand All @@ -51,7 +57,7 @@ type UpdateIdentityRequest struct {
}

type SetPoliciesRequest struct {
Policies []iamaccess.Policy `json:"policies"`
Policies []iampolicy.Policy `json:"policies"`
}

type LockRequest struct {
Expand Down Expand Up @@ -82,8 +88,11 @@ type AboutResponseResources struct {
NCPU float64 `json:"ncpu"` // Number of CPU on this node
CPU float64 `json:"cpu"` // Current CPU load, 0-100*ncpu
CPULimit float64 `json:"cpu_limit"` // Defined CPU load limit, 0-100*ncpu
CPUCore float64 `json:"cpu_core"` // Current CPU load of the core itself, 0-100*ncpu
Mem uint64 `json:"memory_bytes"` // Currently used memory in bytes
MemLimit uint64 `json:"memory_limit_bytes"` // Defined memory limit in bytes
MemTotal uint64 `json:"memory_total_bytes"` // Total available memory in bytes
MemCore uint64 `json:"memory_core_bytes"` // Current used memory of the core itself in bytes
Error string `json:"error"` // Last error
}

Expand Down
17 changes: 17 additions & 0 deletions cluster/client/proces.go → cluster/client/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"net/http"
"net/url"

"github.com/datarhei/core/v16/cluster/store"
"github.com/datarhei/core/v16/encoding/json"
"github.com/datarhei/core/v16/restream/app"
)
Expand All @@ -20,6 +21,22 @@ func (c *APIClient) ProcessAdd(origin string, r AddProcessRequest) error {
return err
}

func (c APIClient) ProcessGet(origin string, id app.ProcessID) (store.Process, string, error) {
res := GetProcessResponse{}

data, err := c.call(http.MethodGet, "/v1/process/"+url.PathEscape(id.ID)+"?domain="+url.QueryEscape(id.Domain), "application/json", nil, origin)
if err != nil {
return store.Process{}, "", err
}

err = json.Unmarshal(data, &res)
if err != nil {
return store.Process{}, "", err
}

return res.Process, res.NodeID, nil
}

func (c *APIClient) ProcessRemove(origin string, id app.ProcessID) error {
_, err := c.call(http.MethodDelete, "/v1/process/"+url.PathEscape(id.ID)+"?domain="+url.QueryEscape(id.Domain), "application/json", nil, origin)

Expand Down
Loading

0 comments on commit fa19973

Please sign in to comment.