Skip to content

Commit

Permalink
core: Move storj workaround to uploadFile
Browse files Browse the repository at this point in the history
Primary and backup have different storage providers
so we need to move that lower layer.
  • Loading branch information
victorges committed Jun 19, 2024
1 parent f05b61e commit a858502
Showing 1 changed file with 31 additions and 20 deletions.
51 changes: 31 additions & 20 deletions core/uploader.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,36 +52,27 @@ type StorageBackupURLs []struct {
}

func Upload(input io.Reader, outputURI *url.URL, waitBetweenWrites, writeTimeout time.Duration, storageBackupURLs StorageBackupURLs) (*drivers.SaveDataOutput, error) {
output := outputURI.String()

// While we wait for storj to implement an easier method for global object deletion we are hacking something
// here to allow us to have recording objects deleted after 7 days.
fields := &drivers.FileProperties{}
if strings.Contains(output, "gateway.storjshare.io/catalyst-recordings-com") {
fields = &drivers.FileProperties{Metadata: expiryField}
}

if strings.HasSuffix(output, ".ts") || strings.HasSuffix(output, ".mp4") {
if strings.HasSuffix(outputURI.Path, ".ts") || strings.HasSuffix(outputURI.Path, ".mp4") {
// For segments we just write them in one go here and return early.
// (Otherwise the incremental write logic below caused issues with clipping since it results in partial segments being written.)
fileContents, err := io.ReadAll(input)
if err != nil {
return nil, fmt.Errorf("failed to read file")
}

out, bytesWritten, err := uploadFileWithBackup(outputURI, fileContents, fields, segmentWriteTimeout, true, storageBackupURLs)
out, bytesWritten, err := uploadFileWithBackup(outputURI, fileContents, nil, segmentWriteTimeout, true, storageBackupURLs)
if err != nil {
return nil, fmt.Errorf("failed to upload video %s: (%d bytes) %w", outputURI.Redacted(), bytesWritten, err)
}

if err = extractThumb(outputURI, output, fileContents, storageBackupURLs); err != nil {
if err = extractThumb(outputURI, fileContents, storageBackupURLs); err != nil {
glog.Errorf("extracting thumbnail failed for %s: %v", outputURI.Redacted(), err)
}
return out, nil
}

// For the manifest files we want a very short cache ttl as the files are updating every few seconds
fields.CacheControl = "max-age=1"
fields := &drivers.FileProperties{CacheControl: "max-age=1"}
var fileContents []byte
var lastWrite = time.Now()

Expand Down Expand Up @@ -155,7 +146,17 @@ func buildBackupURI(outputURI *url.URL, storageBackupURLs StorageBackupURLs) (*u
}

func uploadFile(outputURI *url.URL, fileContents []byte, fields *drivers.FileProperties, writeTimeout time.Duration, withRetries bool) (out *drivers.SaveDataOutput, bytesWritten int64, err error) {
driver, err := drivers.ParseOSURL(outputURI.String(), true)
outputStr := outputURI.String()
// While we wait for storj to implement an easier method for global object deletion we are hacking something
// here to allow us to have recording objects deleted after 7 days.
if strings.Contains(outputStr, "gateway.storjshare.io/catalyst-recordings-com") {
fields = newFileProperties(fields)
for k, v := range expiryField {
fields.Metadata[k] = v
}
}

driver, err := drivers.ParseOSURL(outputStr, true)
if err != nil {
return nil, 0, err
}
Expand All @@ -182,14 +183,14 @@ func uploadFile(outputURI *url.URL, fileContents []byte, fields *drivers.FilePro
return out, bytesWritten, err
}

func extractThumb(outputURI *url.URL, filename string, segment []byte, storageBackupURLs StorageBackupURLs) error {
func extractThumb(outputURI *url.URL, segment []byte, storageBackupURLs StorageBackupURLs) error {
tmpDir, err := os.MkdirTemp(os.TempDir(), "thumb-*")
if err != nil {
return fmt.Errorf("temp file creation failed: %w", err)
}
defer os.RemoveAll(tmpDir)
outFile := filepath.Join(tmpDir, "out.jpg")
inFile := filepath.Join(tmpDir, filepath.Base(filename))
inFile := filepath.Join(tmpDir, filepath.Base(outputURI.Path))
if err = os.WriteFile(inFile, segment, 0644); err != nil {
return fmt.Errorf("failed to write input file: %w", err)
}
Expand Down Expand Up @@ -228,12 +229,22 @@ func extractThumb(outputURI *url.URL, filename string, segment []byte, storageBa
}

thumbURL := outputURI.JoinPath("../latest.jpg")
_, _, err = uploadFileWithBackup(thumbURL, thumbData, &drivers.FileProperties{
CacheControl: "max-age=5",
Metadata: expiryField,
}, 10*time.Second, true, storageBackupURLs)
fields := &drivers.FileProperties{CacheControl: "max-age=5"}
_, _, err = uploadFileWithBackup(thumbURL, thumbData, fields, 10*time.Second, true, storageBackupURLs)
if err != nil {
return fmt.Errorf("saving thumbnail failed: %w", err)
}
return nil
}

func newFileProperties(base *drivers.FileProperties) *drivers.FileProperties {
if base == nil {
return &drivers.FileProperties{}
}
copy := *base
copy.Metadata = map[string]string{}
for k, v := range base.Metadata {
copy.Metadata[k] = v
}
return &copy
}

0 comments on commit a858502

Please sign in to comment.