diff --git a/src/cmd/linuxkit/push_aws.go b/src/cmd/linuxkit/push_aws.go
index 9eda3093f4..4f4dfb1996 100644
--- a/src/cmd/linuxkit/push_aws.go
+++ b/src/cmd/linuxkit/push_aws.go
@@ -5,6 +5,8 @@ import (
 	"fmt"
+	"io"
 	"os"
 	"path/filepath"
+	"strconv"
 	"strings"
 	"time"
 
@@ -18,6 +19,19 @@ import (
 
 const timeoutVar = "LINUXKIT_UPLOAD_TIMEOUT"
 
+//size of each uploaded part; every part except the last must be at least 5 MiB
+const maxPartSize = 10 * 1024 * 1024
+
+//helper function to build the byte range string for one part (used in log messages)
+func buildCopySourceRange(start int64, objectSize int64) string {
+	end := start + maxPartSize - 1
+	if end >= objectSize {
+		end = objectSize - 1
+	}
+	startRange := strconv.FormatInt(start, 10)
+	stopRange := strconv.FormatInt(end, 10)
+	return "bytes=" + startRange + "-" + stopRange
+}
+
 func pushAWSCmd() *cobra.Command {
 	var (
 		timeoutFlag int
@@ -78,20 +92,96 @@ func pushAWSCmd() *cobra.Command {
 		}
 		dst := name + filepath.Ext(path)
-		putParams := &s3.PutObjectInput{
-			Bucket:        aws.String(bucket),
-			Key:           aws.String(dst),
-			Body:          f,
-			ContentLength: aws.Int64(fi.Size()),
-			ContentType:   aws.String("application/octet-stream"),
+
+		//struct for starting a multipart upload
+		startInput := s3.CreateMultipartUploadInput{
+			Bucket:      aws.String(bucket),
+			Key:         aws.String(dst),
+			ContentType: aws.String("application/octet-stream"),
 		}
-		log.Debugf("PutObject:\n%v", putParams)
-		_, err = storage.PutObjectWithContext(ctx, putParams)
+
+		var uploadId string
+		createOutput, err := storage.CreateMultipartUploadWithContext(ctx, &startInput)
 		if err != nil {
-			return fmt.Errorf("Error uploading to S3: %v", err)
+			return err
+		}
+		if createOutput != nil && createOutput.UploadId != nil {
+			uploadId = *createOutput.UploadId
+		}
+		if uploadId == "" {
+			return fmt.Errorf("No upload id found in start upload request")
 		}
+
+		var partNumber int64 = 1
+		parts := make([]*s3.CompletedPart, 0)
+		//the last part may be smaller than maxPartSize, so round the part count up
+		numUploads := (fi.Size() + maxPartSize - 1) / maxPartSize
+
+		log.Infof("Will attempt upload in %d parts to %s", numUploads, dst)
+
+		for i := int64(0); i < fi.Size(); i += maxPartSize {
+			//the final part only covers what is left of the file
+			partLength := int64(maxPartSize)
+			if i+partLength > fi.Size() {
+				partLength = fi.Size() - i
+			}
+			copyRange := buildCopySourceRange(i, fi.Size())
+			partInput := s3.UploadPartInput{
+				Bucket:        aws.String(bucket),
+				Key:           aws.String(dst),
+				Body:          io.NewSectionReader(f, i, partLength),
+				ContentLength: aws.Int64(partLength),
+				PartNumber:    aws.Int64(partNumber),
+				UploadId:      &uploadId,
+			}
+			log.Debugf("Attempting to upload part %d range: %s", partNumber, copyRange)
+			partResp, err := storage.UploadPartWithContext(ctx, &partInput)
+
+			if err != nil {
+				log.Error("Attempting to abort upload")
+				abortIn := s3.AbortMultipartUploadInput{
+					Bucket:   aws.String(bucket),
+					Key:      aws.String(dst),
+					UploadId: &uploadId,
+				}
+				//ignoring any errors with aborting the upload
+				storage.AbortMultipartUploadWithContext(ctx, &abortIn)
+				return fmt.Errorf("Error uploading part %d: %w", partNumber, err)
+			}
+
+			//copy the etag and part number from the response as they are needed for completion
+			if partResp != nil {
+				partNum := partNumber
+				etag := strings.Trim(*partResp.ETag, "\"")
+				cPart := s3.CompletedPart{
+					ETag:       &etag,
+					PartNumber: &partNum,
+				}
+				parts = append(parts, &cPart)
+				log.Debugf("Successfully uploaded part %d of %s", partNumber, uploadId)
+			}
+			if partNumber%50 == 0 {
+				log.Infof("Completed part %d of %d to %s", partNumber, numUploads, dst)
+			}
+			partNumber++
+		}
+
+		//struct holding the completed parts for the final assembly request
+		mpu := s3.CompletedMultipartUpload{
+			Parts: parts,
+		}
+
+		//complete the actual upload
+		//S3 does not assemble the object until the complete request is accepted
+		complete := s3.CompleteMultipartUploadInput{
+			Bucket:          aws.String(bucket),
+			Key:             aws.String(dst),
+			UploadId:        &uploadId,
+			MultipartUpload: &mpu,
+		}
+
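+		//the complete request must list every part number with its ETag in
+		//ascending order; the loop above appends parts in upload order, so the
+		//slice can be passed through as-is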
+		compOutput, err := storage.CompleteMultipartUploadWithContext(ctx, &complete)
+		if err != nil {
+			return fmt.Errorf("Error completing upload: %w", err)
+		}
+		if compOutput != nil {
+			log.Infof("Successfully uploaded key %s to bucket %s", dst, bucket)
+		}
 
 		compute := ec2.New(sess)
 		importParams := &ec2.ImportSnapshotInput{