Skip to content

Commit

Permalink
[ws-daemon] use s5cmd when working with s3
Browse files Browse the repository at this point in the history
  • Loading branch information
kylos101 committed Oct 2, 2023
1 parent 4abea1c commit e3265c0
Show file tree
Hide file tree
Showing 2 changed files with 78 additions and 16 deletions.
2 changes: 1 addition & 1 deletion components/ws-daemon/leeway.Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ RUN curl -OsSL https://github.com/peak/s5cmd/releases/download/v2.2.2/s5cmd_2.2.
FROM ubuntu:22.04

# trigger manual rebuild increasing the value
ENV TRIGGER_REBUILD=1
ENV TRIGGER_REBUILD=2

## Installing coreutils is super important here as otherwise the loopback device creation fails!
ARG CLOUD_SDK_VERSION=437.0.1
Expand Down
92 changes: 77 additions & 15 deletions components/ws-daemon/pkg/content/initializer.go
Original file line number Diff line number Diff line change
Expand Up @@ -362,12 +362,15 @@ func RunInitializerChild() (err error) {

var _ storage.DirectAccess = &remoteContentStorage{}

// remoteContentStorage implements storage.DirectAccess; the interface is largely unimplemented.
// It is used by the content-init container.
type remoteContentStorage struct {
// RemoteContent holds the download information for each piece of remote content.
// NOTE(review): presumably keyed by the object name passed to Download — confirm against the caller.
RemoteContent map[string]storage.DownloadInfo
}

// Init performs no setup: it only logs that it was called and always returns nil.
// The ctx, owner, workspace and instance arguments are not used.
func (rs *remoteContentStorage) Init(ctx context.Context, owner, workspace, instance string) error {
log.Info("remoteContentStorage init")
return nil
}

Expand All @@ -376,6 +379,41 @@ func (rs *remoteContentStorage) EnsureExists(ctx context.Context) error {
return nil
}

// s5cmdDownload downloads the object at info.URL into a fresh temporary file by
// running the s5cmd binary, and returns that file opened for reading.
//
// On success the caller owns the returned file and must close and remove it.
// On any failure the temporary file is removed before returning, so a failed
// download does not leave a (possibly partial) file behind on disk.
func (rs *remoteContentStorage) s5cmdDownload(info storage.DownloadInfo) (_ *os.File, err error) {
	tempFile, err := os.CreateTemp("", "temporal-s3-file")
	if err != nil {
		return nil, xerrors.Errorf("creating temporal file: %w", err)
	}
	tempName := tempFile.Name()

	// The caller only learns the file name on success, so every error path
	// below has to clean up the temporary file itself.
	defer func() {
		if err != nil {
			// best effort: the original error is the one worth reporting
			_ = os.Remove(tempName)
		}
	}()

	// Only the path is needed; s5cmd writes to it by name.
	if err = tempFile.Close(); err != nil {
		return nil, xerrors.Errorf("closing temporal file: %w", err)
	}

	args := []string{
		"cp",
		// # of file parts to download at once
		"--concurrency", "20",
		// size in MB of each part
		"--part-size", "25",
		info.URL,
		tempName,
	}

	downloadStart := time.Now()
	out, err := exec.Command("s5cmd", args...).CombinedOutput()
	if err != nil {
		// The command output is logged here because it is not part of the returned error.
		log.WithError(err).WithField("out", string(out)).Error("unexpected error downloading file")
		return nil, xerrors.Errorf("unexpected error downloading file: %w", err)
	}
	log.WithField("downloadDuration", time.Since(downloadStart).String()).Info("S3 download duration")

	// Re-open the now populated file so the caller gets a readable handle.
	downloaded, err := os.Open(tempName)
	if err != nil {
		return nil, xerrors.Errorf("unexpected error opening downloaded file: %w", err)
	}

	return downloaded, nil
}

// Download fetches the requested content into a temporary file and extracts it into destination
func (rs *remoteContentStorage) Download(ctx context.Context, destination string, name string, mappings []archive.IDMapping) (exists bool, err error) {
span, ctx := opentracing.StartSpanFromContext(ctx, "remoteContentStorage.Download")
Expand All @@ -390,18 +428,49 @@ func (rs *remoteContentStorage) Download(ctx context.Context, destination string

span.SetTag("URL", info.URL)

// create a temporal file to download the content
var (
tempFile *os.File
)

// check the download URL to decide how to download
// this is run from the content-init container, and lacks access to the ws-daemon config
if strings.Contains(info.URL, "amazonaws.com") {
tempFile, err = rs.s5cmdDownload(info)
} else {
tempFile, err = rs.aria2cDownload(info)
}

if err != nil {
return true, err
}

defer os.Remove(tempFile.Name())
defer tempFile.Close()

err = archive.ExtractTarbal(ctx, tempFile, destination, archive.WithUIDMapping(mappings), archive.WithGIDMapping(mappings))
if err != nil {
return true, xerrors.Errorf("tar %s: %s", destination, err.Error())
}

return true, nil
}

// download content from object storage using aria2c
func (rs *remoteContentStorage) aria2cDownload(info storage.DownloadInfo) (*os.File, error) {
tempFile, err := os.CreateTemp("", "remote-content-*")
if err != nil {
return true, xerrors.Errorf("cannot create temporal file: %w", err)
return nil, xerrors.Errorf("cannot create temporal file: %w", err)
}
tempFile.Close()

log.Info("aria2c downloading")
downloadStart := time.Now()

args := []string{
"-s10", "-x16", "-j12",
"--retry-wait=5",
"--log-level=error",
"--allow-overwrite=true", // rewrite temporal empty file
"--allow-overwrite=true",
info.URL,
"-o", tempFile.Name(),
}
Expand All @@ -410,23 +479,16 @@ func (rs *remoteContentStorage) Download(ctx context.Context, destination string
out, err := cmd.CombinedOutput()
if err != nil {
log.WithError(err).WithField("out", string(out)).Error("unexpected error downloading file")
return true, xerrors.Errorf("unexpected error downloading file")
return nil, xerrors.Errorf("unexpected error downloading file")
}
downloadDuration := time.Since(downloadStart)
log.WithField("downloadDuration", downloadDuration.String()).Info("aria2c download duration")

tempFile, err = os.Open(tempFile.Name())
if err != nil {
return true, xerrors.Errorf("unexpected error downloading file")
}

defer os.Remove(tempFile.Name())
defer tempFile.Close()

err = archive.ExtractTarbal(ctx, tempFile, destination, archive.WithUIDMapping(mappings), archive.WithGIDMapping(mappings))
if err != nil {
return true, xerrors.Errorf("tar %s: %s", destination, err.Error())
return nil, xerrors.Errorf("unexpected error downloading file")
}

return true, nil
return tempFile, nil
}

// DownloadSnapshot always returns false and does nothing
Expand Down

0 comments on commit e3265c0

Please sign in to comment.