Skip to content

Commit

Permalink
fix paths
Browse files Browse the repository at this point in the history
  • Loading branch information
frostbyte73 committed Oct 15, 2024
1 parent 4786130 commit 347e5f5
Show file tree
Hide file tree
Showing 8 changed files with 46 additions and 43 deletions.
11 changes: 1 addition & 10 deletions pkg/config/output_file.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ package config

import (
"fmt"
"os"
"path"
"strings"
"time"
Expand Down Expand Up @@ -139,16 +138,8 @@ func (o *FileConfig) updateFilepath(p *PipelineConfig, identifier string, replac
// get local filepath
_, filename := path.Split(o.StorageFilepath)

// prepend the configuration base directory and the egress Id
local := path.Join(p.TmpDir, p.Info.EgressId)

// create temporary directory
if err := os.MkdirAll(local, 0755); err != nil {
return err
}

// write to tmp dir
o.LocalFilepath = path.Join(local, filename)
o.LocalFilepath = path.Join(p.TmpDir, filename)

return nil
}
Expand Down
4 changes: 1 addition & 3 deletions pkg/config/output_image.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,9 +132,7 @@ func (o *ImageConfig) updatePrefix(p *PipelineConfig) error {
// there is more than one image output
// os.ModeDir creates a directory with mode 000 when mapping the directory outside the container
// Append a "/" to the path for consistency with the "UploadConfig == nil" case
o.LocalDir = path.Join(p.TmpDir, p.Info.EgressId, o.Id) + "/"

// create local directories
o.LocalDir = path.Join(p.TmpDir, o.Id) + "/"
if err := os.MkdirAll(o.LocalDir, 0755); err != nil {
return err
}
Expand Down
32 changes: 14 additions & 18 deletions pkg/config/output_segment.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ func (o *SegmentConfig) updatePrefixAndPlaylist(p *PipelineConfig) error {

playlistDir, playlistName := path.Split(o.PlaylistFilename)
livePlaylistDir, livePlaylistName := path.Split(o.LivePlaylistFilename)
fileDir, filePrefix := path.Split(o.SegmentPrefix)
segmentDir, segmentPrefix := path.Split(o.SegmentPrefix)

// force live playlist to be in the same directory as the main playlist
if livePlaylistDir != "" && livePlaylistDir != playlistDir {
Expand All @@ -117,29 +117,29 @@ func (o *SegmentConfig) updatePrefixAndPlaylist(p *PipelineConfig) error {
livePlaylistName = removeKnownExtension(livePlaylistName)

// only keep fileDir if it is a subdirectory of playlistDir
if fileDir != "" {
if playlistDir == fileDir {
fileDir = ""
if segmentDir != "" {
if playlistDir == segmentDir {
segmentDir = ""
} else if playlistDir == "" {
playlistDir = fileDir
fileDir = ""
playlistDir = segmentDir
segmentDir = ""
}
}
o.StorageDir = playlistDir

// ensure playlistName
if playlistName == "" {
if filePrefix != "" {
playlistName = filePrefix
if segmentPrefix != "" {
playlistName = segmentPrefix
} else {
playlistName = fmt.Sprintf("%s-%s", identifier, time.Now().Format("2006-01-02T150405"))
}
}
// live playlist disabled by default

// ensure filePrefix
if filePrefix == "" {
filePrefix = playlistName
if segmentPrefix == "" {
segmentPrefix = playlistName
}

// update config
Expand All @@ -148,7 +148,7 @@ func (o *SegmentConfig) updatePrefixAndPlaylist(p *PipelineConfig) error {
if livePlaylistName != "" {
o.LivePlaylistFilename = fmt.Sprintf("%s%s", livePlaylistName, ext)
}
o.SegmentPrefix = fmt.Sprintf("%s%s", fileDir, filePrefix)
o.SegmentPrefix = fmt.Sprintf("%s%s", segmentDir, segmentPrefix)

if o.PlaylistFilename == o.LivePlaylistFilename {
return errors.ErrInvalidInput("live_playlist_name cannot be identical to playlist_name")
Expand All @@ -157,15 +157,11 @@ func (o *SegmentConfig) updatePrefixAndPlaylist(p *PipelineConfig) error {
// Prepend the configuration base directory and the egress Id
// os.ModeDir creates a directory with mode 000 when mapping the directory outside the container
// Append a "/" to the path for consistency with the "UploadConfig == nil" case
o.LocalDir = path.Join(p.TmpDir, p.Info.EgressId) + "/"

// create local directories
if fileDir != "" {
if err := os.MkdirAll(path.Join(o.LocalDir, fileDir), 0755); err != nil {
o.LocalDir = p.TmpDir
if segmentDir != "" {
if err := os.MkdirAll(path.Join(o.LocalDir, segmentDir), 0755); err != nil {
return err
}
} else if err := os.MkdirAll(o.LocalDir, 0755); err != nil {
return err
}

o.SegmentsInfo.PlaylistName = path.Join(o.StorageDir, o.PlaylistFilename)
Expand Down
1 change: 1 addition & 0 deletions pkg/config/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (

const (
Latency = uint64(3e9)
TmpDir = "/home/egress/tmp"
)

type PipelineConfig struct {
Expand Down
17 changes: 16 additions & 1 deletion pkg/pipeline/sink/uploader/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package uploader

import (
"io"
"os"
"path"

Expand Down Expand Up @@ -42,7 +43,21 @@ func (u *localUploader) upload(localFilepath, storageFilepath string, _ types.Ou
return "", 0, err
}

if err = os.Rename(localFilepath, storageFilepath); err != nil {
tmp, err := os.Open(localFilepath)
if err != nil {
return "", 0, err
}

f, err := os.Create(storageFilepath)
if err != nil {
_ = tmp.Close()
return "", 0, err
}

_, err = io.Copy(f, tmp)
_ = f.Close()
_ = tmp.Close()
if err != nil {
return "", 0, err
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ func NewServer(conf *config.ServiceConfig, bus psrpc.MessageBus, ioClient info.I
}()
}

tmpDir := path.Join(os.TempDir(), s.conf.NodeID)
tmpDir := path.Join(config.TmpDir, s.conf.NodeID)
if err := os.MkdirAll(tmpDir, 0755); err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/server/server_rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ func (s *Server) launchProcess(req *rpc.StartEgressRequest, info *livekit.Egress
p := &config.PipelineConfig{
BaseConfig: s.conf.BaseConfig,
HandlerID: handlerID,
TmpDir: path.Join(os.TempDir(), handlerID),
TmpDir: path.Join(config.TmpDir, handlerID),
}

confString, err := yaml.Marshal(p)
Expand Down
20 changes: 11 additions & 9 deletions test/download.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,15 +40,17 @@ import (
)

func download(t *testing.T, c *config.StorageConfig, localFilepath, storageFilepath string) {
if c.S3 != nil {
logger.Debugw("s3 download", "localFilepath", localFilepath, "storageFilepath", storageFilepath)
downloadS3(t, c.S3, localFilepath, storageFilepath)
} else if c.GCP != nil {
logger.Debugw("gcp download", "localFilepath", localFilepath, "storageFilepath", storageFilepath)
downloadGCP(t, c.GCP, localFilepath, storageFilepath)
} else if c.Azure != nil {
logger.Debugw("azure download", "localFilepath", localFilepath, "storageFilepath", storageFilepath)
downloadAzure(t, c.Azure, localFilepath, storageFilepath)
if c != nil {
if c.S3 != nil {
logger.Debugw("s3 download", "localFilepath", localFilepath, "storageFilepath", storageFilepath)
downloadS3(t, c.S3, localFilepath, storageFilepath)
} else if c.GCP != nil {
logger.Debugw("gcp download", "localFilepath", localFilepath, "storageFilepath", storageFilepath)
downloadGCP(t, c.GCP, localFilepath, storageFilepath)
} else if c.Azure != nil {
logger.Debugw("azure download", "localFilepath", localFilepath, "storageFilepath", storageFilepath)
downloadAzure(t, c.Azure, localFilepath, storageFilepath)
}
}
}

Expand Down

0 comments on commit 347e5f5

Please sign in to comment.