Skip to content

Commit

Permalink
Merge pull request #139 from jacobweinstock/image2disk-update
Browse files Browse the repository at this point in the history
Update image2disk:

## Description

<!--- Please describe what this PR is going to change -->
This updates image2disk to improve EOF handling. This also adds retry logic to the whole image2disk process. 

## Why is this needed

<!--- Link to issue you have raised -->

Fixes: #

## How Has This Been Tested?
<!--- Please describe in detail how you tested your changes. -->
<!--- Include details of your testing environment, and the tests you ran to -->
<!--- see how your change affects other areas of the code, etc. -->


## How are existing users impacted? What migration steps/scripts do we need?

<!--- Fixes a bug, unblocks installation, removes a component of the stack etc -->
<!--- Requires a DB migration script, etc. -->


## Checklist:

I have:

- [ ] updated the documentation and/or roadmap (if required)
- [ ] added unit or e2e tests
- [ ] provided instructions on how to upgrade
  • Loading branch information
jacobweinstock authored Aug 23, 2024
2 parents 142e403 + dbeb92e commit 12cf23f
Show file tree
Hide file tree
Showing 7 changed files with 253 additions and 84 deletions.
24 changes: 15 additions & 9 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,35 +3,41 @@ module github.com/tinkerbell/actions
go 1.21

require (
github.com/cenkalti/backoff v2.2.1+incompatible
github.com/containerd/containerd v1.4.4
github.com/deislabs/oras v0.11.1
github.com/diskfs/go-diskfs v1.1.1
github.com/dustin/go-humanize v1.0.0
github.com/klauspost/compress v1.17.7
github.com/diskfs/go-diskfs v1.4.1
github.com/dustin/go-humanize v1.0.1
github.com/klauspost/compress v1.17.9
github.com/lmittmann/tint v1.0.5
github.com/mattn/go-isatty v0.0.3
github.com/opencontainers/go-digest v1.0.0
github.com/peterbourgon/ff/v3 v3.4.0
github.com/pkg/errors v0.9.1
github.com/sirupsen/logrus v1.9.0
github.com/spf13/cobra v1.1.3
github.com/ulikunitz/xz v0.5.11
golang.org/x/sys v0.5.0
github.com/sirupsen/logrus v1.9.4-0.20230606125235-dd1b4c2e81af
github.com/spf13/cobra v1.8.1
github.com/ulikunitz/xz v0.5.12
golang.org/x/sys v0.24.0
)

require (
github.com/Microsoft/go-winio v0.4.16 // indirect
github.com/Microsoft/hcsshim v0.8.14 // indirect
github.com/containerd/cgroups v0.0.0-20200531161412-0dbf7f05ba59 // indirect
github.com/djherbis/times v1.6.0 // indirect
github.com/elliotwutingfeng/asciiset v0.0.0-20230602022725-51bbb787efab // indirect
github.com/gogo/protobuf v1.3.1 // indirect
github.com/golang/protobuf v1.3.2 // indirect
github.com/google/uuid v1.3.0 // indirect
github.com/hashicorp/golang-lru v0.5.1 // indirect
github.com/inconshreveable/mousetrap v1.0.0 // indirect
github.com/inconshreveable/mousetrap v1.1.0 // indirect
github.com/opencontainers/image-spec v1.0.1 // indirect
github.com/pierrec/lz4/v4 v4.1.17 // indirect
github.com/pkg/xattr v0.4.9 // indirect
github.com/spf13/pflag v1.0.5 // indirect
go.opencensus.io v0.22.0 // indirect
golang.org/x/net v0.0.0-20191004110552-13f9640d40b9 // indirect
golang.org/x/sync v0.0.0-20201207232520-09787c993a3a // indirect
google.golang.org/genproto v0.0.0-20191108220845-16a3f7862a1a // indirect
google.golang.org/grpc v1.23.1 // indirect
gopkg.in/djherbis/times.v1 v1.3.0 // indirect
)
56 changes: 37 additions & 19 deletions go.sum

Large diffs are not rendered by default.

12 changes: 5 additions & 7 deletions image2disk/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,12 +1,10 @@
# syntax=docker/dockerfile:1

FROM golang:1.21-alpine AS image2disk
FROM golang:1.23-alpine AS image2disk
RUN apk add --no-cache git ca-certificates gcc linux-headers musl-dev
COPY . /src
WORKDIR /src/image2disk
RUN --mount=type=cache,sharing=locked,id=gomod,target=/go/pkg/mod/cache \
--mount=type=cache,sharing=locked,id=goroot,target=/root/.cache/go-build \
CGO_ENABLED=1 GOOS=linux go build -a -ldflags "-linkmode external -extldflags '-static' -s -w" -o image2disk
COPY go.mod go.sum /src/
RUN go mod download
COPY . /src
RUN CGO_ENABLED=1 GOOS=linux go build -a -ldflags "-linkmode external -extldflags '-static' -s -w" -o image2disk

FROM scratch
# Add Certificates into the image, for anything that does HTTPS calls
Expand Down
30 changes: 23 additions & 7 deletions image2disk/README.md
Original file line number Diff line number Diff line change
@@ -1,15 +1,31 @@
```
# image2disk

```bash
quay.io/tinkerbell/actions/image2disk:latest
```

This action will stream a remote disk image (raw) to a block device, and
This Action will stream a remote disk image (raw) to a block device, and
is mainly used to write cloud images to a disk. It is recommended to use the `qemu-img`
tool to convert disk images into raw, it is also possible to compress the raw images
with tar+gzip to prevent wasted disk space
with tar+gzip to prevent wasted disk space.

| env var | data type | default value | required | description |
|---------|-----------|---------------|----------|-------------|
| IMG_URL | string | "" | yes | URL of the image to be streamed |
| DEST_DISK | string | "" | yes | Block device to which to write the image |
| COMPRESSED | bool | false | no | Decompress the image before writing it to the disk |
| RETRY_ENABLED | bool | false | no | Retry the Action, using exponential backoff, for the duration specified in `RETRY_DURATION_MINUTES` before failing |
| RETRY_DURATION_MINUTES | int | 10 | no | Duration for which the Action will retry before failing |
| PROGRESS_INTERVAL_SECONDS | int | 3 | no | Interval at which the progress of the image transfer will be logged |
| TEXT_LOGGING | bool | false | no | Output from the Action will be logged in a more human friendly text format, JSON format is used by default |

The below example will stream a raw ubuntu cloud image (converted by qemu-img) and write
it to the block storage disk `/dev/sda`. The raw image is uncompressed in this example.

```bash
qemu-img convert ubuntu.img ubuntu.raw
```

```yaml
actions:
- name: "stream ubuntu"
Expand All @@ -21,10 +37,10 @@ actions:
COMPRESSED: false
```
The below example will stream a compressed raw ubuntu cloud image (converted by qemu-img)
and then compressed with gzip to reduce local space.
The below example will stream a compressed raw ubuntu cloud image (converted by qemu-img) and write
it to the block storage disk `/dev/sda`. The raw image is compressed with gzip in this example.

```
```bash
qemu-img convert ubuntu.img ubuntu.raw
gzip ubuntu.raw
```
Expand All @@ -40,7 +56,7 @@ actions:
COMPRESSED: true
```

## Compression format supported:
## Supported Compression formats

- bzip2 (`.bzip2`)
- gzip (`.gz`)
Expand Down
114 changes: 80 additions & 34 deletions image2disk/image/image.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,21 +6,72 @@ import (
"compress/bzip2"
"compress/gzip"
"context"
"errors"
"fmt"
"io"
"log/slog"
"math"
"net/http"
"os"
"path/filepath"
"strings"
"sync/atomic"
"time"

"github.com/dustin/go-humanize"
"github.com/klauspost/compress/zstd"
log "github.com/sirupsen/logrus"
"github.com/ulikunitz/xz"
"golang.org/x/sys/unix"
)

type Progress struct {
w io.Writer
r io.Reader
wBytes atomic.Int64
rBytes atomic.Int64
}

func NewProgress(w io.Writer, r io.Reader) *Progress {
return &Progress{w: w, r: r}
}

func (p *Progress) Write(b []byte) (n int, err error) {
nu, err := p.w.Write(b)
if err != nil {
p.wBytes.Add(int64(nu))
return nu, fmt.Errorf("error with write: %w", err)
}
p.wBytes.Add(int64(nu))
return nu, nil
}

func (p *Progress) Read(b []byte) (n int, err error) {
nu, err := p.r.Read(b)
if err != nil {
p.rBytes.Add(int64(nu))
return nu, fmt.Errorf("error with read: %w", err)
}
p.rBytes.Add(int64(nu))
return nu, nil
}

func (p *Progress) readBytes() int64 {
return p.rBytes.Load()
}

func (p *Progress) writeBytes() int64 {
return p.wBytes.Load()
}

func prettyByteSize(b int64) string {
bf := float64(b)
for _, unit := range []string{"", "Ki", "Mi", "Gi", "Ti", "Pi", "Ei", "Zi"} {
if math.Abs(bf) < 1024.0 {
return fmt.Sprintf("%3.6f%sB", bf, unit)
}
bf /= 1024.0
}
return fmt.Sprintf("%.6fYiB", bf)
}

// WriteCounter counts the number of bytes written to it. It implements to the io.Writer interface
// and we can pass this into io.TeeReader() which will report progress on each write cycle.
type WriteCounter struct {
Expand All @@ -33,21 +84,11 @@ func (wc *WriteCounter) Write(p []byte) (int, error) {
return n, nil
}

func tickerProgress(byteCounter uint64) {
// Clear the line by using a character return to go back to the start and remove
// the remaining characters by filling it with spaces
fmt.Printf("\r%s", strings.Repeat(" ", 35))

// Return again and print current status of download
// We use the humanize package to print the bytes in a meaningful way (e.g. 10 MB)
fmt.Printf("\rDownloading... %s complete", humanize.Bytes(byteCounter))
}

// Write will pull an image and write it to local storage device
// with compress set to true it will use gzip compression to expand the data before
// writing to an underlying device.
func Write(sourceImage, destinationDevice string, compressed bool) error {
req, err := http.NewRequestWithContext(context.TODO(), "GET", sourceImage, nil)
func Write(ctx context.Context, log *slog.Logger, sourceImage, destinationDevice string, compressed bool, progressInterval time.Duration) error {
req, err := http.NewRequestWithContext(ctx, "GET", sourceImage, nil)
if err != nil {
return err
}
Expand All @@ -59,7 +100,7 @@ func Write(sourceImage, destinationDevice string, compressed bool) error {
defer resp.Body.Close()

if resp.StatusCode > 300 {
// Customise response for the 404 to make degugging simpler
// Customize response for the 404 to make debugging simpler
if resp.StatusCode == 404 {
return fmt.Errorf("%s not found", sourceImage)
}
Expand All @@ -74,42 +115,47 @@ func Write(sourceImage, destinationDevice string, compressed bool) error {
}
defer fileOut.Close()

progressRW := NewProgress(fileOut, resp.Body)

if !compressed {
// Without compression send raw output
out = resp.Body
out = progressRW
} else {
// Find compression algorithm based upon extension
decompressor, err := findDecompressor(sourceImage, resp.Body)
decompressor, err := findDecompressor(sourceImage, progressRW)
if err != nil {
return err
}
defer decompressor.Close()
out = decompressor
}

log.Infof("Beginning write of image [%s] to disk [%s]", filepath.Base(sourceImage), destinationDevice)
// Create our progress reporter and pass it to be used alongside our writer
ticker := time.NewTicker(500 * time.Millisecond)
counter := &WriteCounter{}

log.Info(fmt.Sprintf("Beginning write of image [%s] to disk [%s]", filepath.Base(sourceImage), destinationDevice))
ticker := time.NewTicker(progressInterval)
done := make(chan bool)
go func() {
for ; true; <-ticker.C {
tickerProgress(counter.Total)
totalSize := resp.ContentLength
for {
select {
case <-done:
log.Info("read and write progress", "written", prettyByteSize(progressRW.writeBytes()), "compressedSize", prettyByteSize(totalSize), "read", prettyByteSize(progressRW.readBytes()))
return
case <-ticker.C:
log.Info("read and write progress", "written", prettyByteSize(progressRW.writeBytes()), "compressedSize", prettyByteSize(totalSize), "read", prettyByteSize(progressRW.readBytes()))
}
}
}()
if _, err = io.Copy(fileOut, io.TeeReader(out, counter)); err != nil {
ticker.Stop()
return err
}

count, err := io.Copy(fileOut, out)
if err != nil {
count, err := io.Copy(progressRW, out)
// EOF and ErrUnexpectedEOF can be ignored.
if err != nil && !errors.Is(err, io.EOF) && !errors.Is(err, io.ErrUnexpectedEOF) {
ticker.Stop()
return fmt.Errorf("error writing %d bytes to disk [%s] -> %w", count, destinationDevice, err)
done <- true
return fmt.Errorf("error writing %s bytes to disk [%s] -> %w", prettyByteSize(count), destinationDevice, err)
}
fmt.Printf("\n")

ticker.Stop()
done <- true

// Do the equivalent of partprobe on the device
if err := fileOut.Sync(); err != nil {
Expand All @@ -118,7 +164,7 @@ func Write(sourceImage, destinationDevice string, compressed bool) error {

if err := unix.IoctlSetInt(int(fileOut.Fd()), unix.BLKRRPART, 0); err != nil {
// Ignore errors since it may be a partition, but log in case it's helpful
log.Errorf("error re-probing the partitions for the specified device: %v", err)
log.Info("error re-probing the partitions for the specified device", "err", err)
}

return nil
Expand Down
Loading

0 comments on commit 12cf23f

Please sign in to comment.