diff --git a/pkg/config/output_file.go b/pkg/config/output_file.go index e4636896..d09bc0c0 100644 --- a/pkg/config/output_file.go +++ b/pkg/config/output_file.go @@ -16,7 +16,6 @@ package config import ( "fmt" - "os" "path" "strings" "time" @@ -139,16 +138,8 @@ func (o *FileConfig) updateFilepath(p *PipelineConfig, identifier string, replac // get local filepath _, filename := path.Split(o.StorageFilepath) - // prepend the configuration base directory and the egress Id - local := path.Join(p.TmpDir, p.Info.EgressId) - - // create temporary directory - if err := os.MkdirAll(local, 0755); err != nil { - return err - } - // write to tmp dir - o.LocalFilepath = path.Join(local, filename) + o.LocalFilepath = path.Join(p.TmpDir, filename) return nil } diff --git a/pkg/config/output_image.go b/pkg/config/output_image.go index de6b9edf..2a35a73e 100644 --- a/pkg/config/output_image.go +++ b/pkg/config/output_image.go @@ -132,9 +132,7 @@ func (o *ImageConfig) updatePrefix(p *PipelineConfig) error { // there is more than one image output // os.ModeDir creates a directory with mode 000 when mapping the directory outside the container // Append a "/" to the path for consistency with the "UploadConfig == nil" case - o.LocalDir = path.Join(p.TmpDir, p.Info.EgressId, o.Id) + "/" - - // create local directories + o.LocalDir = path.Join(p.TmpDir, o.Id) + "/" if err := os.MkdirAll(o.LocalDir, 0755); err != nil { return err } diff --git a/pkg/config/output_segment.go b/pkg/config/output_segment.go index 1229ba0f..5b28b17c 100644 --- a/pkg/config/output_segment.go +++ b/pkg/config/output_segment.go @@ -105,7 +105,7 @@ func (o *SegmentConfig) updatePrefixAndPlaylist(p *PipelineConfig) error { playlistDir, playlistName := path.Split(o.PlaylistFilename) livePlaylistDir, livePlaylistName := path.Split(o.LivePlaylistFilename) - fileDir, filePrefix := path.Split(o.SegmentPrefix) + segmentDir, segmentPrefix := path.Split(o.SegmentPrefix) // force live playlist to be in the same directory as the main playlist if livePlaylistDir != "" && livePlaylistDir != playlistDir { @@ -117,20 +117,20 @@ func (o *SegmentConfig) updatePrefixAndPlaylist(p *PipelineConfig) error { livePlaylistName = removeKnownExtension(livePlaylistName) // only keep fileDir if it is a subdirectory of playlistDir - if fileDir != "" { - if playlistDir == fileDir { - fileDir = "" + if segmentDir != "" { + if playlistDir == segmentDir { + segmentDir = "" } else if playlistDir == "" { - playlistDir = fileDir - fileDir = "" + playlistDir = segmentDir + segmentDir = "" } } o.StorageDir = playlistDir // ensure playlistName if playlistName == "" { - if filePrefix != "" { - playlistName = filePrefix + if segmentPrefix != "" { + playlistName = segmentPrefix } else { playlistName = fmt.Sprintf("%s-%s", identifier, time.Now().Format("2006-01-02T150405")) } @@ -138,8 +138,8 @@ func (o *SegmentConfig) updatePrefixAndPlaylist(p *PipelineConfig) error { // live playlist disabled by default // ensure filePrefix - if filePrefix == "" { - filePrefix = playlistName + if segmentPrefix == "" { + segmentPrefix = playlistName } // update config @@ -148,7 +148,7 @@ func (o *SegmentConfig) updatePrefixAndPlaylist(p *PipelineConfig) error { if livePlaylistName != "" { o.LivePlaylistFilename = fmt.Sprintf("%s%s", livePlaylistName, ext) } - o.SegmentPrefix = fmt.Sprintf("%s%s", fileDir, filePrefix) + o.SegmentPrefix = fmt.Sprintf("%s%s", segmentDir, segmentPrefix) if o.PlaylistFilename == o.LivePlaylistFilename { return errors.ErrInvalidInput("live_playlist_name cannot be identical to playlist_name") @@ -157,15 +157,11 @@ func (o *SegmentConfig) updatePrefixAndPlaylist(p *PipelineConfig) error { // Prepend the configuration base directory and the egress Id // os.ModeDir creates a directory with mode 000 when mapping the directory outside the container // Append a "/" to the path for consistency with the "UploadConfig == nil" case - o.LocalDir = path.Join(p.TmpDir, p.Info.EgressId) + "/" - - // create local directories - if fileDir != "" { - if err := os.MkdirAll(path.Join(o.LocalDir, fileDir), 0755); err != nil { + o.LocalDir = p.TmpDir + if segmentDir != "" { + if err := os.MkdirAll(path.Join(o.LocalDir, segmentDir), 0755); err != nil { return err } - } else if err := os.MkdirAll(o.LocalDir, 0755); err != nil { - return err } o.SegmentsInfo.PlaylistName = path.Join(o.StorageDir, o.PlaylistFilename) diff --git a/pkg/config/pipeline.go b/pkg/config/pipeline.go index ffa1462b..a107b43a 100644 --- a/pkg/config/pipeline.go +++ b/pkg/config/pipeline.go @@ -39,6 +39,7 @@ import ( const ( Latency = uint64(3e9) + TmpDir = "/home/egress/tmp" ) type PipelineConfig struct { diff --git a/pkg/pipeline/sink/uploader/local.go b/pkg/pipeline/sink/uploader/local.go index 7c131275..fee30434 100644 --- a/pkg/pipeline/sink/uploader/local.go +++ b/pkg/pipeline/sink/uploader/local.go @@ -15,6 +15,7 @@ package uploader import ( + "io" "os" "path" @@ -42,7 +43,21 @@ func (u *localUploader) upload(localFilepath, storageFilepath string, _ types.Ou return "", 0, err } - if err = os.Rename(localFilepath, storageFilepath); err != nil { + tmp, err := os.Open(localFilepath) + if err != nil { + return "", 0, err + } + + f, err := os.Create(storageFilepath) + if err != nil { + _ = tmp.Close() + return "", 0, err + } + + _, err = io.Copy(f, tmp) + _ = f.Close() + _ = tmp.Close() + if err != nil { return "", 0, err } diff --git a/pkg/server/server.go b/pkg/server/server.go index 9f9035a1..1d16a803 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -96,7 +96,7 @@ func NewServer(conf *config.ServiceConfig, bus psrpc.MessageBus, ioClient info.I }() } - tmpDir := path.Join(os.TempDir(), s.conf.NodeID) + tmpDir := path.Join(config.TmpDir, s.conf.NodeID) if err := os.MkdirAll(tmpDir, 0755); err != nil { return nil, err } diff --git a/pkg/server/server_rpc.go b/pkg/server/server_rpc.go index 6c91009d..0cb5f8bb 100644 --- a/pkg/server/server_rpc.go +++ b/pkg/server/server_rpc.go @@ -95,7 +95,7 @@ func (s *Server) launchProcess(req *rpc.StartEgressRequest, info *livekit.Egress p := &config.PipelineConfig{ BaseConfig: s.conf.BaseConfig, HandlerID: handlerID, - TmpDir: path.Join(os.TempDir(), handlerID), + TmpDir: path.Join(config.TmpDir, handlerID), } confString, err := yaml.Marshal(p) diff --git a/test/download.go b/test/download.go index e1926f98..0d82b927 100644 --- a/test/download.go +++ b/test/download.go @@ -40,15 +40,17 @@ import ( ) func download(t *testing.T, c *config.StorageConfig, localFilepath, storageFilepath string) { - if c.S3 != nil { - logger.Debugw("s3 download", "localFilepath", localFilepath, "storageFilepath", storageFilepath) - downloadS3(t, c.S3, localFilepath, storageFilepath) - } else if c.GCP != nil { - logger.Debugw("gcp download", "localFilepath", localFilepath, "storageFilepath", storageFilepath) - downloadGCP(t, c.GCP, localFilepath, storageFilepath) - } else if c.Azure != nil { - logger.Debugw("azure download", "localFilepath", localFilepath, "storageFilepath", storageFilepath) - downloadAzure(t, c.Azure, localFilepath, storageFilepath) + if c != nil { + if c.S3 != nil { + logger.Debugw("s3 download", "localFilepath", localFilepath, "storageFilepath", storageFilepath) + downloadS3(t, c.S3, localFilepath, storageFilepath) + } else if c.GCP != nil { + logger.Debugw("gcp download", "localFilepath", localFilepath, "storageFilepath", storageFilepath) + downloadGCP(t, c.GCP, localFilepath, storageFilepath) + } else if c.Azure != nil { + logger.Debugw("azure download", "localFilepath", localFilepath, "storageFilepath", storageFilepath) + downloadAzure(t, c.Azure, localFilepath, storageFilepath) + } } }