Skip to content

Commit

Permalink
Merge branch 'vod' into vod-auto-remove-node
Browse files Browse the repository at this point in the history
  • Loading branch information
ioppermann committed Oct 31, 2024
2 parents fa19973 + abc821f commit 317b30c
Show file tree
Hide file tree
Showing 596 changed files with 42,190 additions and 8,629 deletions.
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
ARG GOLANG_IMAGE=golang:1.22-alpine3.20
ARG GOLANG_IMAGE=golang:1.23-alpine3.20
ARG BUILD_IMAGE=alpine:3.20

# Cross-Compilation
Expand Down
47 changes: 27 additions & 20 deletions app/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,9 @@ import (
"github.com/datarhei/core/v16/monitor"
"github.com/datarhei/core/v16/net"
"github.com/datarhei/core/v16/prometheus"
"github.com/datarhei/core/v16/psutil"
"github.com/datarhei/core/v16/resources"
"github.com/datarhei/core/v16/resources/psutil"
"github.com/datarhei/core/v16/resources/psutil/gpu/nvidia"
"github.com/datarhei/core/v16/restream"
restreamapp "github.com/datarhei/core/v16/restream/app"
"github.com/datarhei/core/v16/restream/replace"
Expand Down Expand Up @@ -127,8 +128,6 @@ type api struct {
state string

undoMaxprocs func()

process psutil.Process
}

// ErrConfigReload is an error returned to indicate that a reload of
Expand Down Expand Up @@ -370,17 +369,23 @@ func (a *api) start(ctx context.Context) error {
debug.SetMemoryLimit(math.MaxInt64)
}

psutil, err := psutil.New("", nvidia.New(""))
if err != nil {
return fmt.Errorf("failed to initialize psutils: %w", err)
}

resources, err := resources.New(resources.Config{
MaxCPU: cfg.Resources.MaxCPUUsage,
MaxMemory: cfg.Resources.MaxMemoryUsage,
Logger: a.log.logger.core.WithComponent("Resources"),
MaxCPU: cfg.Resources.MaxCPUUsage,
MaxMemory: cfg.Resources.MaxMemoryUsage,
MaxGPU: cfg.Resources.MaxGPUUsage,
MaxGPUMemory: cfg.Resources.MaxGPUMemoryUsage,
Logger: a.log.logger.core.WithComponent("Resources"),
PSUtil: psutil,
})
if err != nil {
return fmt.Errorf("failed to initialize resource manager: %w", err)
}

resources.Start()

a.resources = resources

if cfg.Sessions.Enable {
Expand Down Expand Up @@ -507,6 +512,7 @@ func (a *api) start(ctx context.Context) error {
ValidatorOutput: validatorOut,
Portrange: portrange,
Collector: a.sessions.Collector("ffmpeg"),
Resource: a.resources,
})
if err != nil {
return fmt.Errorf("unable to create ffmpeg: %w", err)
Expand Down Expand Up @@ -848,6 +854,7 @@ func (a *api) start(ctx context.Context) error {
"type": "mem",
"name": "mem",
}),
Storage: "swiss",
}
var memfs fs.Filesystem = nil
if len(cfg.Storage.Memory.Backup.Dir) != 0 {
Expand Down Expand Up @@ -1228,13 +1235,15 @@ func (a *api) start(ctx context.Context) error {
metrics.Register(monitor.NewUptimeCollector())
metrics.Register(monitor.NewCPUCollector(a.resources))
metrics.Register(monitor.NewMemCollector(a.resources))
metrics.Register(monitor.NewNetCollector())
metrics.Register(monitor.NewDiskCollector(a.diskfs.Metadata("base")))
metrics.Register(monitor.NewGPUCollector(a.resources))
metrics.Register(monitor.NewNetCollector(a.resources))
metrics.Register(monitor.NewDiskCollector(a.diskfs.Metadata("base"), a.resources))
metrics.Register(monitor.NewFilesystemCollector("diskfs", a.diskfs))
metrics.Register(monitor.NewFilesystemCollector("memfs", a.memfs))
for name, fs := range a.s3fs {
metrics.Register(monitor.NewFilesystemCollector(name, fs))
}
metrics.Register(monitor.NewSelfCollector())
metrics.Register(monitor.NewRestreamCollector(a.restream))
metrics.Register(monitor.NewFFmpegCollector(a.ffmpeg))
metrics.Register(monitor.NewSessionCollector(a.sessions, []string{}))
Expand Down Expand Up @@ -1428,7 +1437,6 @@ func (a *api) start(ctx context.Context) error {
Password: "",
DefaultFile: "index.html",
DefaultContentType: "text/html",
Gzip: true,
Filesystem: a.diskfs,
Cache: a.cache,
},
Expand All @@ -1441,7 +1449,6 @@ func (a *api) start(ctx context.Context) error {
Password: cfg.Storage.Memory.Auth.Password,
DefaultFile: "",
DefaultContentType: "application/data",
Gzip: true,
Filesystem: a.memfs,
Cache: nil,
},
Expand All @@ -1457,7 +1464,6 @@ func (a *api) start(ctx context.Context) error {
Password: s3.Auth.Password,
DefaultFile: "",
DefaultContentType: "application/data",
Gzip: true,
Filesystem: a.s3fs[s3.Name],
Cache: a.cache,
})
Expand All @@ -1470,7 +1476,7 @@ func (a *api) start(ctx context.Context) error {
Restream: a.restream,
Metrics: a.metrics,
Prometheus: a.prom,
MimeTypesFile: cfg.Storage.MimeTypes,
MimeTypesFile: cfg.Storage.MimeTypesFile,
Filesystems: httpfilesystems,
IPLimiter: iplimiter,
Profiling: cfg.Debug.Profiling,
Expand Down Expand Up @@ -1501,6 +1507,12 @@ func (a *api) start(ctx context.Context) error {

return false
},
Resources: a.resources,
Compress: http.CompressConfig{
Encoding: cfg.Compress.Encoding,
MimeTypes: cfg.Compress.MimeTypes,
MinLength: cfg.Compress.MinLength,
},
}

mainserverhandler, err := http.NewServer(serverConfig)
Expand Down Expand Up @@ -1882,11 +1894,6 @@ func (a *api) stop() {
a.service = nil
}

if a.process != nil {
a.process.Stop()
a.process = nil
}

// Unregister all collectors
if a.metrics != nil {
a.metrics.UnregisterAll()
Expand All @@ -1909,7 +1916,7 @@ func (a *api) stop() {

// Stop resource observer
if a.resources != nil {
a.resources.Stop()
a.resources.Cancel()
}

// Stop the session tracker
Expand Down
42 changes: 33 additions & 9 deletions cluster/about.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,29 @@ type ClusterRaft struct {
}

type ClusterNodeResources struct {
IsThrottling bool // Whether this core is currently throttling
NCPU float64 // Number of CPU on this node
CPU float64 // Current CPU load, 0-100*ncpu
CPULimit float64 // Defined CPU load limit, 0-100*ncpu
CPUCore float64 // Current CPU load of the core itself, 0-100*ncpu
Mem uint64 // Currently used memory in bytes
MemLimit uint64 // Defined memory limit in bytes
MemTotal uint64 // Total available memory in bytes
MemCore uint64 // Current used memory of the core itself in bytes
IsThrottling bool // Whether this core is currently throttling
NCPU float64 // Number of CPU on this node
CPU float64 // Current CPU load, 0-100*ncpu
CPULimit float64 // Defined CPU load limit, 0-100*ncpu
CPUCore float64 // Current CPU load of the core itself, 0-100*ncpu
Mem uint64 // Currently used memory in bytes
MemLimit uint64 // Defined memory limit in bytes
MemTotal uint64 // Total available memory in bytes
MemCore uint64 // Current used memory of the core itself in bytes
GPU []ClusterNodeGPUResources // GPU resources
Error error
}

type ClusterNodeGPUResources struct {
Mem uint64 // Currently used memory in bytes
MemLimit uint64 // Defined memory limit in bytes
MemTotal uint64 // Total available memory in bytes
Usage float64 // Current general usage, 0-100
UsageLimit float64 // Defined general usage limit, 0-100
Encoder float64 // Current encoder usage, 0-100
Decoder float64 // Current decoder usage, 0-100
}

type ClusterNode struct {
ID string
Name string
Expand Down Expand Up @@ -157,6 +168,19 @@ func (c *cluster) About() (ClusterAbout, error) {
},
}

if len(nodeAbout.Resources.GPU) != 0 {
node.Resources.GPU = make([]ClusterNodeGPUResources, len(nodeAbout.Resources.GPU))
for i, gpu := range nodeAbout.Resources.GPU {
node.Resources.GPU[i].Mem = gpu.Mem
node.Resources.GPU[i].MemLimit = gpu.MemLimit
node.Resources.GPU[i].MemTotal = gpu.MemTotal
node.Resources.GPU[i].Usage = gpu.Usage
node.Resources.GPU[i].UsageLimit = gpu.UsageLimit
node.Resources.GPU[i].Encoder = gpu.Encoder
node.Resources.GPU[i].Decoder = gpu.Decoder
}
}

if s, ok := serversMap[nodeAbout.ID]; ok {
node.Voter = s.Voter
node.Leader = s.Leader
Expand Down
17 changes: 15 additions & 2 deletions cluster/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ func (a *api) Version(c echo.Context) error {
// @Tags v1.0.0
// @ID cluster-1-about
// @Produce json
// @Success 200 {string} About
// @Success 200 {object} client.AboutResponse
// @Success 500 {object} Error
// @Router /v1/about [get]
func (a *api) About(c echo.Context) error {
Expand All @@ -195,6 +195,19 @@ func (a *api) About(c echo.Context) error {
},
}

if len(resources.GPU.GPU) != 0 {
about.Resources.GPU = make([]client.AboutResponseGPUResources, len(resources.GPU.GPU))
for i, gpu := range resources.GPU.GPU {
about.Resources.GPU[i].Mem = gpu.MemoryUsed
about.Resources.GPU[i].MemLimit = gpu.MemoryLimit
about.Resources.GPU[i].MemTotal = gpu.MemoryTotal
about.Resources.GPU[i].Usage = gpu.Usage
about.Resources.GPU[i].UsageLimit = gpu.UsageLimit
about.Resources.GPU[i].Encoder = gpu.Encoder
about.Resources.GPU[i].Decoder = gpu.Decoder
}
}

if err != nil {
about.Resources.Error = err.Error()
}
Expand Down Expand Up @@ -400,7 +413,7 @@ func (a *api) ProcessAdd(c echo.Context) error {
// @Param id path string true "Process ID"
// @Param domain query string false "Domain to act on"
// @Param X-Cluster-Origin header string false "Origin ID of request"
// @Success 200 {string} string
// @Success 200 {object} client.GetProcessResponse
// @Failure 404 {object} Error
// @Failure 500 {object} Error
// @Failure 508 {object} Error
Expand Down
31 changes: 21 additions & 10 deletions cluster/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,17 +83,28 @@ type AboutResponse struct {
Resources AboutResponseResources `json:"resources"`
}

type AboutResponseGPUResources struct {
Mem uint64 `json:"memory_bytes"` // Currently used memory in bytes
MemLimit uint64 `json:"memory_limit_bytes"` // Defined memory limit in bytes
MemTotal uint64 `json:"memory_total_bytes"` // Total available memory in bytes
Usage float64 `json:"usage"` // Current general usage, 0-100
Encoder float64 `json:"encoder"` // Current encoder usage, 0-100
Decoder float64 `json:"decoder"` // Current decoder usage, 0-100
UsageLimit float64 `json:"usage_limit"` // Defined general usage limit, 0-100
}

type AboutResponseResources struct {
IsThrottling bool `json:"is_throttling"` // Whether this core is currently throttling
NCPU float64 `json:"ncpu"` // Number of CPU on this node
CPU float64 `json:"cpu"` // Current CPU load, 0-100*ncpu
CPULimit float64 `json:"cpu_limit"` // Defined CPU load limit, 0-100*ncpu
CPUCore float64 `json:"cpu_core"` // Current CPU load of the core itself, 0-100*ncpu
Mem uint64 `json:"memory_bytes"` // Currently used memory in bytes
MemLimit uint64 `json:"memory_limit_bytes"` // Defined memory limit in bytes
MemTotal uint64 `json:"memory_total_bytes"` // Total available memory in bytes
MemCore uint64 `json:"memory_core_bytes"` // Current used memory of the core itself in bytes
Error string `json:"error"` // Last error
IsThrottling bool `json:"is_throttling"` // Whether this core is currently throttling
NCPU float64 `json:"ncpu"` // Number of CPU on this node
CPU float64 `json:"cpu"` // Current CPU load, 0-100*ncpu
CPULimit float64 `json:"cpu_limit"` // Defined CPU load limit, 0-100*ncpu
CPUCore float64 `json:"cpu_core"` // Current CPU load of the core itself, 0-100*ncpu
Mem uint64 `json:"memory_bytes"` // Currently used memory in bytes
MemLimit uint64 `json:"memory_limit_bytes"` // Defined memory limit in bytes
MemTotal uint64 `json:"memory_total_bytes"` // Total available memory in bytes
MemCore uint64 `json:"memory_core_bytes"` // Current used memory of the core itself in bytes
GPU []AboutResponseGPUResources `json:"gpu"` // Currently used GPU resources
Error string `json:"error"` // Last error
}

type SetNodeStateRequest struct {
Expand Down
16 changes: 11 additions & 5 deletions cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -641,7 +641,7 @@ func (c *cluster) CertManager() autocert.Manager {
}

func (c *cluster) Shutdown() error {
c.logger.Info().Log("Shutting down cluster")
c.logger.Info().Log("Shutting down cluster ...")
c.shutdownLock.Lock()
defer c.shutdownLock.Unlock()

Expand All @@ -652,26 +652,32 @@ func (c *cluster) Shutdown() error {
c.shutdown = true
close(c.shutdownCh)

c.logger.Info().Log("Waiting for all routines to exit ...")

c.shutdownWg.Wait()

c.logger.Info().Log("All routines exited ...")

if c.manager != nil {
c.logger.Info().Log("Shutting down node manager ...")
c.manager.NodesClear()
}

if c.api != nil {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

c.logger.Info().Log("Shutting down API ...")

c.api.Shutdown(ctx)
}

if c.raft != nil {
c.logger.Info().Log("Shutting down raft ...")
c.raft.Shutdown()
}

// TODO: here might some situations, where the manager is still need from the synchronize loop and will run into a panic
c.manager = nil
c.raft = nil
c.logger.Info().Log("Cluster stopped")

return nil
}
Expand Down Expand Up @@ -1055,7 +1061,7 @@ func (c *cluster) trackLeaderChanges() {
if !isNodeInCluster {
// We're not anymore part of the cluster, shutdown
c.logger.Warn().WithField("id", c.nodeID).Log("This node left the cluster. Shutting down.")
c.Shutdown()
go c.Shutdown()
}

case <-c.shutdownCh:
Expand Down
Loading

0 comments on commit 317b30c

Please sign in to comment.