diff --git a/.github/workflows/test-integration.yaml b/.github/workflows/test-integration.yaml index 7461ef80..57fa5a5c 100644 --- a/.github/workflows/test-integration.yaml +++ b/.github/workflows/test-integration.yaml @@ -66,8 +66,6 @@ jobs: run: | docker run --rm \ --network host \ - --security-opt seccomp=unconfined \ - --cap-add=SYS_ADMIN \ -e GITHUB_WORKFLOW=1 \ -e EGRESS_CONFIG_STRING="$(echo ${{ secrets.EGRESS_CONFIG_STRING }} | base64 -d)" \ -e S3_UPLOAD="$(echo ${{ secrets.S3_UPLOAD }} | base64 -d)" \ diff --git a/README.md b/README.md index a81c6aca..5628c70c 100644 --- a/README.md +++ b/README.md @@ -66,6 +66,7 @@ logging: json: true template_base: can be used to host custom templates (default https://egress-composite.livekit.io) backup_storage: files will be moved here when uploads fail. location must have write access granted for all users +enable_chrome_sandbox: if true, egress will run Chrome with sandboxing enabled. This requires a specific Docker setup, see below. cpu_cost: # optionally override cpu cost estimation, used when accepting or denying requests room_composite_cpu_cost: 3.0 web_cpu_cost: 3.0 @@ -172,13 +173,34 @@ Then to run the service: ```shell docker run --rm \ -e EGRESS_CONFIG_FILE=/out/config.yaml \ - --cap-add=SYS_ADMIN \ -v ~/egress-test:/out \ livekit/egress ``` You can then use our [cli](https://github.com/livekit/livekit-cli) to submit egress requests to your server. +### Chrome sandboxing + +By default, Room Composite and Web egresses run with Chrome sandboxing disabled. This is because the default docker security settings prevent Chrome from +switching to a different kernel namespace, which is needed by Chrome to setup its sandbox. + +Chrome sandboxing within Egress can be reenabled by setting the the `enable_chrome_sandbox` option to `true` in the egress configuration, and launching docker using the [provided +seccomp security profile](https://github.com/livekit/egress/blob/main/chrome-sandboxing-seccomp-profile.json): + +```shell +docker run --rm \ + -e EGRESS_CONFIG_FILE=/out/config.yaml \ + -v ~/egress-test:/out \ + --security-opt seccomp=chrome-sandboxing-seccomp-profile.json \ + livekit/egress +``` + +This profile is based on the [default docker seccomp security profile](https://github.com/moby/moby/blob/master/profiles/seccomp/default.json) and allows +the 2 extra system calls (`clone` and `unshare`) that Chrome needs to setup the sandbox. + +Note that kubernetes disables seccomp entirely by default, which means that running with Chrome sandboxing enabled is possible on a kubernetes with +the default security settings. + ## FAQ ### Can I store the files locally instead of uploading to cloud storage? diff --git a/chrome-sandboxing-seccomp-profile.json b/chrome-sandboxing-seccomp-profile.json new file mode 100644 index 00000000..8b8388db --- /dev/null +++ b/chrome-sandboxing-seccomp-profile.json @@ -0,0 +1,782 @@ +{ + "defaultAction": "SCMP_ACT_ERRNO", + "defaultErrnoRet": 1, + "archMap": [ + { + "architecture": "SCMP_ARCH_X86_64", + "subArchitectures": [ + "SCMP_ARCH_X86", + "SCMP_ARCH_X32" + ] + }, + { + "architecture": "SCMP_ARCH_AARCH64", + "subArchitectures": [ + "SCMP_ARCH_ARM" + ] + }, + { + "architecture": "SCMP_ARCH_MIPS64", + "subArchitectures": [ + "SCMP_ARCH_MIPS", + "SCMP_ARCH_MIPS64N32" + ] + }, + { + "architecture": "SCMP_ARCH_MIPS64N32", + "subArchitectures": [ + "SCMP_ARCH_MIPS", + "SCMP_ARCH_MIPS64" + ] + }, + { + "architecture": "SCMP_ARCH_MIPSEL64", + "subArchitectures": [ + "SCMP_ARCH_MIPSEL", + "SCMP_ARCH_MIPSEL64N32" + ] + }, + { + "architecture": "SCMP_ARCH_MIPSEL64N32", + "subArchitectures": [ + "SCMP_ARCH_MIPSEL", + "SCMP_ARCH_MIPSEL64" + ] + }, + { + "architecture": "SCMP_ARCH_S390X", + "subArchitectures": [ + "SCMP_ARCH_S390" + ] + }, + { + "architecture": "SCMP_ARCH_RISCV64", + "subArchitectures": null + } + ], + "syscalls": [ + { + "names": [ + "accept", + "accept4", + "access", + "adjtimex", + "alarm", + "bind", + "brk", + "capget", + "capset", + "chdir", + "chmod", + "chown", + "chown32", + "clock_adjtime", + "clock_adjtime64", + "clock_getres", + "clock_getres_time64", + "clock_gettime", + "clock_gettime64", + "clock_nanosleep", + "clock_nanosleep_time64", + "close", + "close_range", + "connect", + "copy_file_range", + "creat", + "dup", + "dup2", + "dup3", + "epoll_create", + "epoll_create1", + "epoll_ctl", + "epoll_ctl_old", + "epoll_pwait", + "epoll_pwait2", + "epoll_wait", + "epoll_wait_old", + "eventfd", + "eventfd2", + "execve", + "execveat", + "exit", + "exit_group", + "faccessat", + "faccessat2", + "fadvise64", + "fadvise64_64", + "fallocate", + "fanotify_mark", + "fchdir", + "fchmod", + "fchmodat", + "fchown", + "fchown32", + "fchownat", + "fcntl", + "fcntl64", + "fdatasync", + "fgetxattr", + "flistxattr", + "flock", + "fork", + "fremovexattr", + "fsetxattr", + "fstat", + "fstat64", + "fstatat64", + "fstatfs", + "fstatfs64", + "fsync", + "ftruncate", + "ftruncate64", + "futex", + "futex_time64", + "futex_waitv", + "futimesat", + "getcpu", + "getcwd", + "getdents", + "getdents64", + "getegid", + "getegid32", + "geteuid", + "geteuid32", + "getgid", + "getgid32", + "getgroups", + "getgroups32", + "getitimer", + "getpeername", + "getpgid", + "getpgrp", + "getpid", + "getppid", + "getpriority", + "getrandom", + "getresgid", + "getresgid32", + "getresuid", + "getresuid32", + "getrlimit", + "get_robust_list", + "getrusage", + "getsid", + "getsockname", + "getsockopt", + "get_thread_area", + "gettid", + "gettimeofday", + "getuid", + "getuid32", + "getxattr", + "inotify_add_watch", + "inotify_init", + "inotify_init1", + "inotify_rm_watch", + "io_cancel", + "ioctl", + "io_destroy", + "io_getevents", + "io_pgetevents", + "io_pgetevents_time64", + "ioprio_get", + "ioprio_set", + "io_setup", + "io_submit", + "io_uring_enter", + "io_uring_register", + "io_uring_setup", + "ipc", + "kill", + "landlock_add_rule", + "landlock_create_ruleset", + "landlock_restrict_self", + "lchown", + "lchown32", + "lgetxattr", + "link", + "linkat", + "listen", + "listxattr", + "llistxattr", + "_llseek", + "lremovexattr", + "lseek", + "lsetxattr", + "lstat", + "lstat64", + "madvise", + "membarrier", + "memfd_create", + "memfd_secret", + "mincore", + "mkdir", + "mkdirat", + "mknod", + "mknodat", + "mlock", + "mlock2", + "mlockall", + "mmap", + "mmap2", + "mprotect", + "mq_getsetattr", + "mq_notify", + "mq_open", + "mq_timedreceive", + "mq_timedreceive_time64", + "mq_timedsend", + "mq_timedsend_time64", + "mq_unlink", + "mremap", + "msgctl", + "msgget", + "msgrcv", + "msgsnd", + "msync", + "munlock", + "munlockall", + "munmap", + "name_to_handle_at", + "nanosleep", + "newfstatat", + "_newselect", + "open", + "openat", + "openat2", + "pause", + "pidfd_open", + "pidfd_send_signal", + "pipe", + "pipe2", + "pkey_alloc", + "pkey_free", + "pkey_mprotect", + "poll", + "ppoll", + "ppoll_time64", + "prctl", + "pread64", + "preadv", + "preadv2", + "prlimit64", + "process_mrelease", + "pselect6", + "pselect6_time64", + "pwrite64", + "pwritev", + "pwritev2", + "read", + "readahead", + "readlink", + "readlinkat", + "readv", + "recv", + "recvfrom", + "recvmmsg", + "recvmmsg_time64", + "recvmsg", + "remap_file_pages", + "removexattr", + "rename", + "renameat", + "renameat2", + "restart_syscall", + "rmdir", + "rseq", + "rt_sigaction", + "rt_sigpending", + "rt_sigprocmask", + "rt_sigqueueinfo", + "rt_sigreturn", + "rt_sigsuspend", + "rt_sigtimedwait", + "rt_sigtimedwait_time64", + "rt_tgsigqueueinfo", + "sched_getaffinity", + "sched_getattr", + "sched_getparam", + "sched_get_priority_max", + "sched_get_priority_min", + "sched_getscheduler", + "sched_rr_get_interval", + "sched_rr_get_interval_time64", + "sched_setaffinity", + "sched_setattr", + "sched_setparam", + "sched_setscheduler", + "sched_yield", + "seccomp", + "select", + "semctl", + "semget", + "semop", + "semtimedop", + "semtimedop_time64", + "send", + "sendfile", + "sendfile64", + "sendmmsg", + "sendmsg", + "sendto", + "setfsgid", + "setfsgid32", + "setfsuid", + "setfsuid32", + "setgid", + "setgid32", + "setgroups", + "setgroups32", + "setitimer", + "setpgid", + "setpriority", + "setregid", + "setregid32", + "setresgid", + "setresgid32", + "setresuid", + "setresuid32", + "setreuid", + "setreuid32", + "setrlimit", + "set_robust_list", + "setsid", + "setsockopt", + "set_thread_area", + "set_tid_address", + "setuid", + "setuid32", + "setxattr", + "shmat", + "shmctl", + "shmdt", + "shmget", + "shutdown", + "sigaltstack", + "signalfd", + "signalfd4", + "sigprocmask", + "sigreturn", + "socketcall", + "socketpair", + "splice", + "stat", + "stat64", + "statfs", + "statfs64", + "statx", + "symlink", + "symlinkat", + "sync", + "sync_file_range", + "syncfs", + "sysinfo", + "tee", + "tgkill", + "time", + "timer_create", + "timer_delete", + "timer_getoverrun", + "timer_gettime", + "timer_gettime64", + "timer_settime", + "timer_settime64", + "timerfd_create", + "timerfd_gettime", + "timerfd_gettime64", + "timerfd_settime", + "timerfd_settime64", + "times", + "tkill", + "truncate", + "truncate64", + "ugetrlimit", + "umask", + "uname", + "unlink", + "unlinkat", + "utime", + "utimensat", + "utimensat_time64", + "utimes", + "vfork", + "vmsplice", + "wait4", + "waitid", + "waitpid", + "write", + "writev", + "clone", + "unshare" + ], + "action": "SCMP_ACT_ALLOW" + }, + { + "names": [ + "process_vm_readv", + "process_vm_writev", + "ptrace" + ], + "action": "SCMP_ACT_ALLOW", + "includes": { + "minKernel": "4.8" + } + }, + { + "names": [ + "socket" + ], + "action": "SCMP_ACT_ALLOW", + "args": [ + { + "index": 0, + "value": 40, + "op": "SCMP_CMP_NE" + } + ] + }, + { + "names": [ + "personality" + ], + "action": "SCMP_ACT_ALLOW", + "args": [ + { + "index": 0, + "value": 0, + "op": "SCMP_CMP_EQ" + } + ] + }, + { + "names": [ + "personality" + ], + "action": "SCMP_ACT_ALLOW", + "args": [ + { + "index": 0, + "value": 8, + "op": "SCMP_CMP_EQ" + } + ] + }, + { + "names": [ + "personality" + ], + "action": "SCMP_ACT_ALLOW", + "args": [ + { + "index": 0, + "value": 131072, + "op": "SCMP_CMP_EQ" + } + ] + }, + { + "names": [ + "personality" + ], + "action": "SCMP_ACT_ALLOW", + "args": [ + { + "index": 0, + "value": 131080, + "op": "SCMP_CMP_EQ" + } + ] + }, + { + "names": [ + "personality" + ], + "action": "SCMP_ACT_ALLOW", + "args": [ + { + "index": 0, + "value": 4294967295, + "op": "SCMP_CMP_EQ" + } + ] + }, + { + "names": [ + "sync_file_range2", + "swapcontext" + ], + "action": "SCMP_ACT_ALLOW", + "includes": { + "arches": [ + "ppc64le" + ] + } + }, + { + "names": [ + "arm_fadvise64_64", + "arm_sync_file_range", + "sync_file_range2", + "breakpoint", + "cacheflush", + "set_tls" + ], + "action": "SCMP_ACT_ALLOW", + "includes": { + "arches": [ + "arm", + "arm64" + ] + } + }, + { + "names": [ + "arch_prctl" + ], + "action": "SCMP_ACT_ALLOW", + "includes": { + "arches": [ + "amd64", + "x32" + ] + } + }, + { + "names": [ + "modify_ldt" + ], + "action": "SCMP_ACT_ALLOW", + "includes": { + "arches": [ + "amd64", + "x32", + "x86" + ] + } + }, + { + "names": [ + "s390_pci_mmio_read", + "s390_pci_mmio_write", + "s390_runtime_instr" + ], + "action": "SCMP_ACT_ALLOW", + "includes": { + "arches": [ + "s390", + "s390x" + ] + } + }, + { + "names": [ + "riscv_flush_icache" + ], + "action": "SCMP_ACT_ALLOW", + "includes": { + "arches": [ + "riscv64" + ] + } + }, + { + "names": [ + "open_by_handle_at" + ], + "action": "SCMP_ACT_ALLOW", + "includes": { + "caps": [ + "CAP_DAC_READ_SEARCH" + ] + } + }, + { + "names": [ + "bpf", + "clone3", + "fanotify_init", + "fsconfig", + "fsmount", + "fsopen", + "fspick", + "lookup_dcookie", + "mount", + "mount_setattr", + "move_mount", + "open_tree", + "perf_event_open", + "quotactl", + "quotactl_fd", + "setdomainname", + "sethostname", + "setns", + "syslog", + "umount", + "umount2" + ], + "action": "SCMP_ACT_ALLOW", + "includes": { + "caps": [ + "CAP_SYS_ADMIN" + ] + } + }, + { + "names": [ + "clone3" + ], + "action": "SCMP_ACT_ERRNO", + "errnoRet": 38, + "excludes": { + "caps": [ + "CAP_SYS_ADMIN" + ] + } + }, + { + "names": [ + "reboot" + ], + "action": "SCMP_ACT_ALLOW", + "includes": { + "caps": [ + "CAP_SYS_BOOT" + ] + } + }, + { + "names": [ + "chroot" + ], + "action": "SCMP_ACT_ALLOW", + "includes": { + "caps": [ + "CAP_SYS_CHROOT" + ] + } + }, + { + "names": [ + "delete_module", + "init_module", + "finit_module" + ], + "action": "SCMP_ACT_ALLOW", + "includes": { + "caps": [ + "CAP_SYS_MODULE" + ] + } + }, + { + "names": [ + "acct" + ], + "action": "SCMP_ACT_ALLOW", + "includes": { + "caps": [ + "CAP_SYS_PACCT" + ] + } + }, + { + "names": [ + "kcmp", + "pidfd_getfd", + "process_madvise", + "process_vm_readv", + "process_vm_writev", + "ptrace" + ], + "action": "SCMP_ACT_ALLOW", + "includes": { + "caps": [ + "CAP_SYS_PTRACE" + ] + } + }, + { + "names": [ + "iopl", + "ioperm" + ], + "action": "SCMP_ACT_ALLOW", + "includes": { + "caps": [ + "CAP_SYS_RAWIO" + ] + } + }, + { + "names": [ + "settimeofday", + "stime", + "clock_settime", + "clock_settime64" + ], + "action": "SCMP_ACT_ALLOW", + "includes": { + "caps": [ + "CAP_SYS_TIME" + ] + } + }, + { + "names": [ + "vhangup" + ], + "action": "SCMP_ACT_ALLOW", + "includes": { + "caps": [ + "CAP_SYS_TTY_CONFIG" + ] + } + }, + { + "names": [ + "get_mempolicy", + "mbind", + "set_mempolicy" + ], + "action": "SCMP_ACT_ALLOW", + "includes": { + "caps": [ + "CAP_SYS_NICE" + ] + } + }, + { + "names": [ + "syslog" + ], + "action": "SCMP_ACT_ALLOW", + "includes": { + "caps": [ + "CAP_SYSLOG" + ] + } + }, + { + "names": [ + "bpf" + ], + "action": "SCMP_ACT_ALLOW", + "includes": { + "caps": [ + "CAP_BPF" + ] + } + }, + { + "names": [ + "perf_event_open" + ], + "action": "SCMP_ACT_ALLOW", + "includes": { + "caps": [ + "CAP_PERFMON" + ] + } + } + ] +} diff --git a/magefile.go b/magefile.go index 8137e446..aecdccc7 100644 --- a/magefile.go +++ b/magefile.go @@ -116,7 +116,7 @@ func Integration(configFile string) error { return mageutil.Run(context.Background(), "docker build -t egress-test -f build/test/Dockerfile .", - fmt.Sprintf("docker run --rm -e EGRESS_CONFIG_FILE=%s -v %s/test:/out --cap-add=SYS_ADMIN egress-test", configFile, dir), + fmt.Sprintf("docker run --rm -e EGRESS_CONFIG_FILE=%s -v %s/test:/out egress-test", configFile, dir), ) } diff --git a/pkg/config/base.go b/pkg/config/base.go index eaa1b64e..a8e1f232 100644 --- a/pkg/config/base.go +++ b/pkg/config/base.go @@ -33,12 +33,13 @@ type BaseConfig struct { WsUrl string `yaml:"ws_url"` // (env LIVEKIT_WS_URL) // optional - Logging *logger.Config `yaml:"logging"` // logging config - TemplateBase string `yaml:"template_base"` // custom template base url - BackupStorage string `yaml:"backup_storage"` // backup file location for failed uploads - ClusterID string `yaml:"cluster_id"` // cluster this instance belongs to - StorageConfig `yaml:",inline"` // upload config (S3, Azure, GCP, or AliOSS) - SessionLimits `yaml:"session_limits"` // session duration limits + Logging *logger.Config `yaml:"logging"` // logging config + TemplateBase string `yaml:"template_base"` // custom template base url + BackupStorage string `yaml:"backup_storage"` // backup file location for failed uploads + ClusterID string `yaml:"cluster_id"` // cluster this instance belongs to + EnableChromeSandbox bool `yaml:"enable_chrome_sandbox"` // enable Chrome sandbox, requires extra docker configuration + StorageConfig `yaml:",inline"` // upload config (S3, Azure, GCP, or AliOSS) + SessionLimits `yaml:"session_limits"` // session duration limits // dev/debugging Insecure bool `yaml:"insecure"` // allow chrome to connect to an insecure websocket diff --git a/pkg/gstreamer/bin.go b/pkg/gstreamer/bin.go index 3d84e940..dc26e48d 100644 --- a/pkg/gstreamer/bin.go +++ b/pkg/gstreamer/bin.go @@ -71,6 +71,14 @@ func (b *Bin) AddSinkBin(sink *Bin) error { } func (b *Bin) addBin(bin *Bin, direction gst.PadDirection) error { + bin.mu.Lock() + alreadyAdded := bin.added + bin.added = true + bin.mu.Unlock() + if alreadyAdded { + return errors.ErrBinAlreadyAdded + } + b.LockStateShared() defer b.UnlockStateShared() @@ -82,14 +90,6 @@ func (b *Bin) addBin(bin *Bin, direction gst.PadDirection) error { b.mu.Lock() defer b.mu.Unlock() - bin.mu.Lock() - alreadyAdded := bin.added - bin.added = true - bin.mu.Unlock() - if alreadyAdded { - return errors.ErrBinAlreadyAdded - } - if direction == gst.PadDirectionSource { b.srcs = append(b.srcs, bin) } else { @@ -149,21 +149,39 @@ func (b *Bin) AddElements(elements ...*gst.Element) error { } func (b *Bin) RemoveSourceBin(name string) (bool, error) { + return b.removeBin(name, gst.PadDirectionSource) +} + +func (b *Bin) RemoveSinkBin(name string) (bool, error) { + return b.removeBin(name, gst.PadDirectionSink) +} + +func (b *Bin) removeBin(name string, direction gst.PadDirection) (bool, error) { b.LockStateShared() defer b.UnlockStateShared() b.mu.Lock() defer b.mu.Unlock() - var src *Bin - for i, s := range b.srcs { - if s.bin.GetName() == name { - src = s - b.srcs = append(b.srcs[:i], b.srcs[i+1:]...) - break + var bin *Bin + if direction == gst.PadDirectionSource { + for i, s := range b.srcs { + if s.bin.GetName() == name { + bin = s + b.srcs = append(b.srcs[:i], b.srcs[i+1:]...) + break + } + } + } else { + for i, s := range b.sinks { + if s.bin.GetName() == name { + bin = s + b.sinks = append(b.sinks[:i], b.sinks[i+1:]...) + break + } } } - if src == nil { + if bin == nil { return false, nil } @@ -172,73 +190,58 @@ func (b *Bin) RemoveSourceBin(name string) (bool, error) { return true, nil } - if state != StateBuilding { - src.mu.Lock() - srcGhostPad, sinkGhostPad := getGhostPads(src, b) - src.mu.Unlock() - - srcGhostPad.AddProbe(gst.PadProbeTypeIdle, func(_ *gst.Pad, _ *gst.PadProbeInfo) gst.PadProbeReturn { - sinkPad := sinkGhostPad.GetTarget() - b.elements[0].ReleaseRequestPad(sinkPad) - - srcGhostPad.Unlink(sinkGhostPad.Pad) - b.bin.RemovePad(sinkGhostPad.Pad) - - if err := b.pipeline.Remove(src.bin.Element); err != nil { - b.OnError(err) - } + if state == StateBuilding { + if err := b.pipeline.Remove(bin.bin.Element); err != nil { + return false, errors.ErrGstPipelineError(err) + } + return true, nil + } - _ = src.bin.SetState(gst.StateNull) - return gst.PadProbeRemove - }) + if direction == gst.PadDirectionSource { + b.probeRemoveSource(bin) + } else { + b.probeRemoveSink(bin) } return true, nil } -func (b *Bin) RemoveSinkBin(name string) (bool, error) { - b.LockStateShared() - defer b.UnlockStateShared() +func (b *Bin) probeRemoveSource(src *Bin) { + src.mu.Lock() + srcGhostPad, sinkGhostPad := deleteGhostPadsLocked(src, b) + src.mu.Unlock() - b.mu.Lock() - defer b.mu.Unlock() + srcGhostPad.AddProbe(gst.PadProbeTypeIdle, func(_ *gst.Pad, _ *gst.PadProbeInfo) gst.PadProbeReturn { + sinkPad := sinkGhostPad.GetTarget() + b.elements[0].ReleaseRequestPad(sinkPad) - var sink *Bin - for i, s := range b.sinks { - if s.bin.GetName() == name { - sink = s - b.sinks = append(b.sinks[:i], b.sinks[i+1:]...) - break - } - } - if sink == nil { - return false, nil - } + srcGhostPad.Unlink(sinkGhostPad.Pad) + b.bin.RemovePad(sinkGhostPad.Pad) - state := b.GetStateLocked() - if state > StateRunning { - return true, nil - } + if err := b.pipeline.Remove(src.bin.Element); err != nil { + b.OnError(err) + } - defer logger.Debugw(fmt.Sprintf("%s removed", name)) - if state == StateBuilding { - if err := b.pipeline.Remove(sink.bin.Element); err != nil { - return false, errors.ErrGstPipelineError(err) + if err := src.bin.SetState(gst.StateNull); err != nil { + logger.Warnw(fmt.Sprintf("failed to change %s state", src.bin.GetName()), err) } - return true, nil - } + return gst.PadProbeRemove + }) +} +func (b *Bin) probeRemoveSink(sink *Bin) { sink.mu.Lock() - srcPad, sinkPad := getGhostPads(b, sink) + srcGhostPad, sinkGhostPad := deleteGhostPadsLocked(b, sink) sink.mu.Unlock() - srcPad.AddProbe(gst.PadProbeTypeBlockDownstream, func(_ *gst.Pad, _ *gst.PadProbeInfo) gst.PadProbeReturn { - srcPad.Unlink(sinkPad.Pad) - sinkPad.Pad.SendEvent(gst.NewEOSEvent()) + srcGhostPad.AddProbe(gst.PadProbeTypeBlockDownstream, func(_ *gst.Pad, _ *gst.PadProbeInfo) gst.PadProbeReturn { + srcGhostPad.Unlink(sinkGhostPad.Pad) + sinkGhostPad.Pad.SendEvent(gst.NewEOSEvent()) b.mu.Lock() err := b.pipeline.Remove(sink.bin.Element) b.mu.Unlock() + if err != nil { b.OnError(errors.ErrGstPipelineError(err)) return gst.PadProbeRemove @@ -248,12 +251,20 @@ func (b *Bin) RemoveSinkBin(name string) (bool, error) { logger.Warnw(fmt.Sprintf("failed to change %s state", sink.bin.GetName()), err) } - b.elements[len(b.elements)-1].ReleaseRequestPad(srcPad.GetTarget()) - b.bin.RemovePad(srcPad.Pad) + b.elements[len(b.elements)-1].ReleaseRequestPad(srcGhostPad.GetTarget()) + b.bin.RemovePad(srcGhostPad.Pad) return gst.PadProbeRemove }) +} - return true, nil +func deleteGhostPadsLocked(src, sink *Bin) (*gst.GhostPad, *gst.GhostPad) { + srcPad := src.pads[sink.bin.GetName()] + sinkPad := sink.pads[src.bin.GetName()] + + delete(src.pads, sink.bin.GetName()) + delete(sink.pads, src.bin.GetName()) + + return srcPad, sinkPad } func (b *Bin) SetState(state gst.State) error { @@ -304,6 +315,31 @@ func (b *Bin) SetEOSFunc(f func() bool) { b.eosFunc = f } +func (b *Bin) sendEOS() { + b.mu.Lock() + eosFunc := b.eosFunc + srcs := b.srcs + b.mu.Unlock() + + if eosFunc != nil && !eosFunc() { + return + } + + if len(srcs) > 0 { + var wg sync.WaitGroup + wg.Add(len(b.srcs)) + for _, src := range srcs { + go func(s *Bin) { + s.sendEOS() + wg.Done() + }(src) + } + wg.Wait() + } else if len(b.elements) > 0 { + b.bin.SendEvent(gst.NewEOSEvent()) + } +} + // ----- Internal ----- func (b *Bin) link() error { @@ -362,7 +398,7 @@ func (b *Bin) link() error { sink.mu.Lock() var err error if addQueues { - err = b.linkPeersWithQueueLocked(src, sink) + err = b.queueLinkPeersLocked(src, sink) } else { err = linkPeersLocked(src, sink) } @@ -380,7 +416,7 @@ func (b *Bin) link() error { } func linkPeersLocked(src, sink *Bin) error { - srcPad, sinkPad, err := createGhostPads(src, sink) + srcPad, sinkPad, err := createGhostPadsLocked(src, sink, nil) if err != nil { return err } @@ -418,7 +454,7 @@ func linkPeersLocked(src, sink *Bin) error { return nil } -func (b *Bin) linkPeersWithQueueLocked(src, sink *Bin) error { +func (b *Bin) queueLinkPeersLocked(src, sink *Bin) error { srcName := src.bin.GetName() sinkName := sink.bin.GetName() @@ -432,7 +468,7 @@ func (b *Bin) linkPeersWithQueueLocked(src, sink *Bin) error { return err } - srcPad, sinkPad, err := createGhostPadsWithQueue(src, sink, queue) + srcPad, sinkPad, err := createGhostPadsLocked(src, sink, queue) if err != nil { return err } @@ -443,31 +479,6 @@ func (b *Bin) linkPeersWithQueueLocked(src, sink *Bin) error { return nil } -func (b *Bin) sendEOS() { - b.mu.Lock() - eosFunc := b.eosFunc - srcs := b.srcs - b.mu.Unlock() - - if eosFunc != nil && !eosFunc() { - return - } - - if len(srcs) > 0 { - var wg sync.WaitGroup - wg.Add(len(b.srcs)) - for _, src := range srcs { - go func(s *Bin) { - s.sendEOS() - wg.Done() - }(src) - } - wg.Wait() - } else if len(b.elements) > 0 { - b.bin.SendEvent(gst.NewEOSEvent()) - } -} - func getPeerSrcs(srcs []*Bin) []*Bin { flattened := make([]*Bin, 0, len(srcs)) for _, src := range srcs { diff --git a/pkg/gstreamer/pads.go b/pkg/gstreamer/pads.go index 03fb8d40..3b202550 100644 --- a/pkg/gstreamer/pads.go +++ b/pkg/gstreamer/pads.go @@ -21,180 +21,199 @@ import ( "github.com/tinyzimmer/go-gst/gst" "github.com/livekit/egress/pkg/errors" + "github.com/livekit/protocol/logger" ) -func createGhostPads(src, sink *Bin) (*gst.GhostPad, *gst.GhostPad, error) { - srcName := src.bin.GetName() - sinkName := sink.bin.GetName() +type padTemplate struct { + element *gst.Element + template *gst.PadTemplate + capsNames map[string]struct{} + dataTypes map[string]struct{} +} - srcPad, sinkPad, err := getPads(src, sink) - if err != nil { - return nil, nil, err +func (p *padTemplate) toPad() *gst.Pad { + if p.template.Presence() == gst.PadPresenceAlways { + return p.element.GetStaticPad(p.template.Name()) + } else { + return p.element.GetRequestPad(p.template.Name()) } +} - srcGhostPad := gst.NewGhostPad(fmt.Sprintf("%s_%s_sink", srcName, sinkName), srcPad) - src.pads[sinkName] = srcGhostPad - src.bin.AddPad(srcGhostPad.Pad) - - sinkGhostPad := gst.NewGhostPad(fmt.Sprintf("%s_%s_src", srcName, sinkName), sinkPad) - sink.pads[srcName] = sinkGhostPad - sink.bin.AddPad(sinkGhostPad.Pad) +func (p *padTemplate) findDirectMatch(others []*padTemplate) *padTemplate { + for _, other := range others { + for capsName := range p.capsNames { + if _, ok := other.capsNames[capsName]; ok { + return other + } + } + for dataType := range p.dataTypes { + if _, ok := other.dataTypes[dataType]; ok { + return other + } + } + } + return nil +} - return srcGhostPad, sinkGhostPad, nil +func (p *padTemplate) findAnyMatch(others []*padTemplate) *padTemplate { + for _, other := range others { + if _, ok := p.dataTypes["ANY"]; ok { + return other + } + if _, ok := other.dataTypes["ANY"]; ok { + return other + } + } + return nil } -func createGhostPadsWithQueue(src, sink *Bin, queue *gst.Element) (*gst.GhostPad, *gst.GhostPad, error) { +func createGhostPadsLocked(src, sink *Bin, queue *gst.Element) (*gst.GhostPad, *gst.GhostPad, error) { srcName := src.bin.GetName() sinkName := sink.bin.GetName() - srcPad, sinkPad, err := getPads(src, sink) + srcPad, sinkPad, err := matchPadsLocked(src, sink) if err != nil { return nil, nil, err } - if padReturn := queue.GetStaticPad("src").Link(sinkPad); padReturn != gst.PadLinkOK { - return nil, nil, errors.ErrPadLinkFailed(queue.GetName(), sinkName, padReturn.String()) - } srcGhostPad := gst.NewGhostPad(fmt.Sprintf("%s_%s_sink", srcName, sinkName), srcPad) src.pads[sinkName] = srcGhostPad src.bin.AddPad(srcGhostPad.Pad) - sinkGhostPad := gst.NewGhostPad(fmt.Sprintf("%s_%s_src", srcName, sinkName), queue.GetStaticPad("sink")) + if queue != nil { + if padReturn := queue.GetStaticPad("src").Link(sinkPad); padReturn != gst.PadLinkOK { + return nil, nil, errors.ErrPadLinkFailed(queue.GetName(), sinkName, padReturn.String()) + } + + sinkGhostPad := gst.NewGhostPad(fmt.Sprintf("%s_%s_src", srcName, sinkName), queue.GetStaticPad("sink")) + sink.pads[srcName] = sinkGhostPad + sink.bin.AddPad(sinkGhostPad.Pad) + return srcGhostPad, sinkGhostPad, nil + } + + sinkGhostPad := gst.NewGhostPad(fmt.Sprintf("%s_%s_src", srcName, sinkName), sinkPad) sink.pads[srcName] = sinkGhostPad sink.bin.AddPad(sinkGhostPad.Pad) - return srcGhostPad, sinkGhostPad, nil } -func getGhostPads(src, sink *Bin) (*gst.GhostPad, *gst.GhostPad) { - srcPad := src.pads[sink.bin.GetName()] - sinkPad := sink.pads[src.bin.GetName()] - - delete(src.pads, sink.bin.GetName()) - delete(sink.pads, src.bin.GetName()) - - return srcPad, sinkPad -} - -func getPads(src, sink *Bin) (*gst.Pad, *gst.Pad, error) { +func matchPadsLocked(src, sink *Bin) (*gst.Pad, *gst.Pad, error) { var srcPad, sinkPad *gst.Pad - srcElement := src.elements[len(src.elements)-1] - sinkElement := sink.elements[0] - + var srcTemplates, sinkTemplates []*padTemplate if src.getSinkPad != nil { srcPad = src.getSinkPad(sink.bin.GetName()) - for _, padTemplate := range sinkElement.GetPadTemplates() { - if padTemplate.Direction() == gst.PadDirectionSink { - return srcPad, getPad(sinkElement, padTemplate), nil - } - } + } else { + srcTemplates = src.getPadTemplatesLocked(gst.PadDirectionSource) } if sink.getSrcPad != nil { sinkPad = sink.getSrcPad(src.bin.GetName()) - for _, padTemplate := range srcElement.GetPadTemplates() { - if padTemplate.Direction() == gst.PadDirectionSource { - return getPad(srcElement, padTemplate), sinkPad, nil - } - } + } else { + sinkTemplates = sink.getPadTemplatesLocked(gst.PadDirectionSink) } - srcPrimary, srcSecondary, srcAny := getPadTemplates(src.elements, gst.PadDirectionSource) - sinkPrimary, sinkSecondary, sinkAny := getPadTemplates(sink.elements, gst.PadDirectionSink) - - for srcCaps, srcTemplate := range srcPrimary { - if sinkTemplate, ok := sinkPrimary[srcCaps]; ok { - return getPad(srcElement, srcTemplate), getPad(sinkElement, sinkTemplate), nil + switch { + case srcPad != nil && sinkPad != nil: + return srcPad, sinkPad, nil + case srcPad != nil && len(sinkTemplates) == 1: + return srcPad, sinkTemplates[0].toPad(), nil + case sinkPad != nil && len(srcTemplates) == 1: + return srcTemplates[0].toPad(), sinkPad, nil + case len(srcTemplates) >= 1 && len(srcTemplates) >= 1: + for _, srcTemplate := range srcTemplates { + if sinkTemplate := srcTemplate.findDirectMatch(sinkTemplates); sinkTemplate != nil { + return srcTemplate.toPad(), sinkTemplate.toPad(), nil + } } - } - for dataType, srcTemplate := range srcSecondary { - if sinkTemplate, ok := sinkSecondary[dataType]; ok { - return getPad(srcElement, srcTemplate), getPad(sinkElement, sinkTemplate), nil + for _, srcTemplate := range srcTemplates { + if sinkTemplate := srcTemplate.findAnyMatch(sinkTemplates); sinkTemplate != nil { + return srcTemplate.toPad(), sinkTemplate.toPad(), nil + } } } - if srcAny != nil && sinkAny != nil { - return getPad(srcElement, srcAny), getPad(sinkElement, sinkAny), nil - } + logger.Warnw("could not match pads", nil, "srcTemplates", srcTemplates, "sinkTemplates", sinkTemplates) return nil, nil, errors.ErrGhostPadFailed } -func getPad(e *gst.Element, template *gst.PadTemplate) *gst.Pad { - if template.Presence() == gst.PadPresenceAlways { - return e.GetStaticPad(template.Name()) +func (b *Bin) getPadTemplatesLocked(direction gst.PadDirection) []*padTemplate { + var element *gst.Element + if direction == gst.PadDirectionSource { + element = b.elements[len(b.elements)-1] } else { - return e.GetRequestPad(template.Name()) + element = b.elements[0] } -} -func getPadTemplates(elements []*gst.Element, direction gst.PadDirection) ( - map[string]*gst.PadTemplate, - map[string]*gst.PadTemplate, - *gst.PadTemplate, -) { - primary := make(map[string]*gst.PadTemplate) - secondary := make(map[string]*gst.PadTemplate) - var anyTemplate *gst.PadTemplate + allTemplates := element.GetPadTemplates() + templates := make([]*padTemplate, 0) - var i int - if direction == gst.PadDirectionSource { - i = len(elements) - 1 - } + for _, template := range allTemplates { + if template.Direction() == direction { + t := &padTemplate{ + element: element, + template: template, + capsNames: make(map[string]struct{}), + dataTypes: make(map[string]struct{}), + } - for i >= 0 && i < len(elements) { - padTemplates := elements[i].GetPadTemplates() - for _, padTemplate := range padTemplates { - if padTemplate.Direction() == direction { - caps := padTemplate.Caps() - - if caps.IsAny() { - if strings.HasPrefix(padTemplate.Name(), direction.String()) { - // most generic pad - if anyTemplate == nil { - anyTemplate = padTemplate - } else { - continue - } + caps := template.Caps() + if caps.IsAny() { + if strings.HasPrefix(template.Name(), direction.String()) { + // src/src_%u/sink/sink_%u pad + capsNames, dataTypes, ok := b.getTypesLocked(direction) + if ok { + t.capsNames = capsNames + t.dataTypes = dataTypes } else { - // any caps but associated name - dataType := padTemplate.Name() - if strings.HasSuffix(dataType, "_%u") { - dataType = dataType[:len(dataType)-3] - } - if anyTemplate != nil { - secondary[dataType] = anyTemplate - return primary, secondary, nil - } - secondary[dataType] = padTemplate + t.dataTypes["ANY"] = struct{}{} } } else { - // specified caps + // audio/audio_%u/video/video_%u pad + dataType := template.Name() + if strings.HasSuffix(dataType, "_%u") { + dataType = dataType[:len(dataType)-3] + } + t.dataTypes[dataType] = struct{}{} + } + } else { + // pad has caps + splitCaps := strings.Split(caps.String(), "; ") + for _, c := range splitCaps { + capsName := strings.SplitN(c, ",", 2)[0] + t.capsNames[capsName] = struct{}{} + t.dataTypes[strings.Split(capsName, "/")[0]] = struct{}{} + } + } + + templates = append(templates, t) + } + } + + return templates +} + +func (b *Bin) getTypesLocked(direction gst.PadDirection) (map[string]struct{}, map[string]struct{}, bool) { + var i int + if direction == gst.PadDirectionSource { + i = len(b.elements) - 1 + } + + for i >= 0 && i < len(b.elements) { + allTemplates := b.elements[i].GetPadTemplates() + for _, template := range allTemplates { + if template.Direction() == gst.PadDirectionSource { + if caps := template.Caps(); !caps.IsAny() { + capsNames := make(map[string]struct{}) + dataTypes := make(map[string]struct{}) splitCaps := strings.Split(caps.String(), ";") for _, c := range splitCaps { capsName := strings.SplitN(c, ",", 2)[0] - dataType := strings.Split(capsName, "/")[0] - if anyTemplate != nil { - primary[capsName] = anyTemplate - secondary[dataType] = anyTemplate - } else { - primary[capsName] = padTemplate - secondary[dataType] = padTemplate - } - } - if anyTemplate != nil { - return primary, secondary, anyTemplate + capsNames[capsName] = struct{}{} + dataTypes[strings.Split(capsName, "/")[0]] = struct{}{} } + return capsNames, dataTypes, true } } } - if anyTemplate == nil { - for _, template := range primary { - return primary, secondary, template - } - for _, template := range secondary { - return primary, secondary, template - } - return primary, secondary, nil - } if direction == gst.PadDirectionSource { i-- @@ -203,5 +222,25 @@ func getPadTemplates(elements []*gst.Element, direction gst.PadDirection) ( } } - return primary, secondary, anyTemplate + if direction == gst.PadDirectionSource { + for _, src := range b.srcs { + src.mu.Lock() + capsNames, dataTypes, ok := src.getTypesLocked(direction) + src.mu.Unlock() + if ok { + return capsNames, dataTypes, true + } + } + } else { + for _, sink := range b.sinks { + sink.mu.Lock() + capsNames, dataTypes, ok := sink.getTypesLocked(direction) + sink.mu.Unlock() + if ok { + return capsNames, dataTypes, true + } + } + } + + return nil, nil, false } diff --git a/pkg/gstreamer/pipeline.go b/pkg/gstreamer/pipeline.go index 9d5f9a74..2060a2ab 100644 --- a/pkg/gstreamer/pipeline.go +++ b/pkg/gstreamer/pipeline.go @@ -27,7 +27,6 @@ import ( const ( stateChangeTimeout = time.Second * 15 - stopTimeout = time.Second * 30 ) type Pipeline struct { @@ -112,14 +111,6 @@ func (p *Pipeline) SetState(state gst.State) error { stateErr <- p.pipeline.SetState(state) }() - if state == gst.StateNull { - for _, src := range p.srcs { - if err := src.SetState(gst.StateNull); err != nil { - return err - } - } - } - select { case <-time.After(stateChangeTimeout): return errors.ErrPipelineFrozen @@ -172,7 +163,6 @@ func (p *Pipeline) Stop() { if old >= StateRunning { p.loop.Quit() - logger.Debugw("main loop closed") } p.UpgradeState(StateFinished) diff --git a/pkg/gstreamer/state.go b/pkg/gstreamer/state.go index e413e38d..b17654a9 100644 --- a/pkg/gstreamer/state.go +++ b/pkg/gstreamer/state.go @@ -32,25 +32,6 @@ const ( StateFinished ) -func (s State) String() string { - switch s { - case StateBuilding: - return "building" - case StateStarted: - return "starting" - case StateRunning: - return "running" - case StateEOS: - return "eos" - case StateStopping: - return "stopping" - case StateFinished: - return "finished" - default: - return "unknown" - } -} - type StateManager struct { lock sync.RWMutex state State @@ -96,3 +77,22 @@ func (s *StateManager) UpgradeState(state State) (State, bool) { return old, true } } + +func (s State) String() string { + switch s { + case StateBuilding: + return "building" + case StateStarted: + return "starting" + case StateRunning: + return "running" + case StateEOS: + return "eos" + case StateStopping: + return "stopping" + case StateFinished: + return "finished" + default: + return "unknown" + } +} diff --git a/pkg/pipeline/controller.go b/pkg/pipeline/controller.go index 81691599..a8cab099 100644 --- a/pkg/pipeline/controller.go +++ b/pkg/pipeline/controller.go @@ -55,7 +55,7 @@ type Controller struct { gstLogger *zap.SugaredLogger limitTimer *time.Timer playing core.Fuse - eosSent core.Fuse + eos core.Fuse stopped core.Fuse } @@ -71,7 +71,7 @@ func New(ctx context.Context, conf *config.PipelineConfig) (*Controller, error) }, gstLogger: logger.GetLogger().(*logger.ZapLogger).ToZap().WithOptions(zap.WithCaller(false)), playing: core.NewFuse(), - eosSent: core.NewFuse(), + eos: core.NewFuse(), stopped: core.NewFuse(), } c.callbacks.SetOnError(c.OnError) @@ -107,8 +107,6 @@ func New(ctx context.Context, conf *config.PipelineConfig) (*Controller, error) } func (c *Controller) BuildPipeline() error { - logger.Debugw("building pipeline") - p, err := gstreamer.NewPipeline(pipelineName, c.Latency, c.callbacks) if err != nil { return errors.ErrGstPipelineError(err) @@ -183,6 +181,9 @@ func (c *Controller) Run(ctx context.Context) *livekit.EgressInfo { c.Info.UpdatedAt = now c.Info.EndedAt = now + if c.SourceType == types.SourceTypeSDK { + c.updateDuration(c.src.GetEndedAt()) + } // update status if c.Info.Error != "" { @@ -443,13 +444,12 @@ func (c *Controller) SendEOS(ctx context.Context) { ctx, span := tracer.Start(ctx, "Pipeline.SendEOS") defer span.End() - c.eosSent.Once(func() { + c.eos.Once(func() { logger.Debugw("Sending EOS") if c.limitTimer != nil { c.limitTimer.Stop() } - switch c.Info.Status { case livekit.EgressStatus_EGRESS_STARTING: c.Info.Status = livekit.EgressStatus_EGRESS_ABORTED @@ -489,7 +489,7 @@ func (c *Controller) OnError(err error) { } } - if c.Info.Error == "" && (!c.eosSent.IsBroken() || c.FinalizationRequired) { + if c.Info.Error == "" && (!c.eos.IsBroken() || c.FinalizationRequired) { c.Info.Error = err.Error() } diff --git a/pkg/pipeline/source/web.go b/pkg/pipeline/source/web.go index 6385ca2a..5e868379 100644 --- a/pkg/pipeline/source/web.go +++ b/pkg/pipeline/source/web.go @@ -193,7 +193,7 @@ func (s *WebSource) launchChrome(ctx context.Context, p *config.PipelineConfig, webUrl = inputUrl.String() } - logger.Debugw("launching chrome", "url", webUrl) + logger.Debugw("launching chrome", "url", webUrl, "enableChomeSandbox", p.EnableChromeSandbox, "insecure", p.Insecure) opts := []chromedp.ExecAllocatorOption{ chromedp.NoFirstRun, @@ -235,6 +235,9 @@ func (s *WebSource) launchChrome(ctx context.Context, p *config.PipelineConfig, // output chromedp.Env(fmt.Sprintf("PULSE_SINK=%s", p.Info.EgressId)), chromedp.Flag("display", p.Display), + + // sandbox + chromedp.Flag("no-sandbox", !p.EnableChromeSandbox), } if insecure { diff --git a/pkg/pipeline/watch.go b/pkg/pipeline/watch.go index 2ac1950c..0e2166c8 100644 --- a/pkg/pipeline/watch.go +++ b/pkg/pipeline/watch.go @@ -122,7 +122,7 @@ func (c *Controller) messageWatch(msg *gst.Message) bool { var err error switch msg.Type() { case gst.MessageEOS: - logger.Infow("EOS received, stopping pipeline") + logger.Infow("EOS received") c.p.Stop() return false case gst.MessageWarning: @@ -164,7 +164,7 @@ func (c *Controller) handleMessageError(gErr *gst.GError) error { switch { case element == elementGstRtmp2Sink: - if strings.HasPrefix(gErr.Error(), "Connection error") && !c.eosSent.IsBroken() { + if strings.HasPrefix(gErr.Error(), "Connection error") && !c.eos.IsBroken() { // try reconnecting ok, err := c.streamBin.ResetStream(name, gErr) if err != nil { @@ -194,7 +194,7 @@ func (c *Controller) handleMessageError(gErr *gst.GError) error { case element == elementSplitMuxSink: // We sometimes get GstSplitMuxSink errors if send EOS before the first media was sent to the mux if message == msgMuxer { - if c.eosSent.IsBroken() { + if c.eos.IsBroken() { logger.Debugw("GstSplitMuxSink failure after sending EOS") return nil }