Skip to content

Commit

Permalink
fixed an issue where the tail end of the backup doesn't get written t…
Browse files Browse the repository at this point in the history
…o file as it sits in the scp buffer. This is due to streaming the data to scp without specifying the exact file size. Added an extra SQL comment at the end of the dump to flush the scp buffer.
  • Loading branch information
ctomkow committed Sep 26, 2019
1 parent 4f38bf5 commit f04cfdd
Show file tree
Hide file tree
Showing 5 changed files with 38 additions and 14 deletions.
2 changes: 1 addition & 1 deletion backup/remote.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ func ToRemote(sh *net.SSH, workingDir string, dumpName string, stdout *io.ReadCl
if err != nil {
return err
}
if err = netio.Copy(stdout, dumpName, workingDir, "0600", ex, sh); err != nil {
if err = netio.StreamMySqlDump(stdout, dumpName, workingDir, "0600", ex, sh); err != nil {
return err
}
_, err = ex.RemoteCmd(sh, "rm "+workingDir+"~"+dumpName+".lock")
Expand Down
4 changes: 2 additions & 2 deletions buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ type CircularQueue struct {
// circular queue is an array of structs queue{dbName string, timestamp time.Time}
// it has an artificial buffer limit size of 31, regardless of user specified max_backups
queue [31]struct {
name string
name string
}

// the start and end pointers of the queue
Expand Down Expand Up @@ -50,7 +50,7 @@ func (cq *CircularQueue) Enqueue(element string) string {

// add to queue
cq.queue[cq.head] = struct {
name string
name string
}{name: element}

// in this order!
Expand Down
2 changes: 1 addition & 1 deletion net/ssh.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ func (sh *SSH) TestConnection() error {
func (sh *SSH) Reconnect(tries int, delayInSec int) error {

for i := 1; i <= tries; i++ {
glog.Error("[" + strconv.Itoa(i) + "/" + strconv.Itoa(tries)+ "]" + " attempting to re-connect with remote")
glog.Error("[" + strconv.Itoa(i) + "/" + strconv.Itoa(tries) + "]" + " attempting to re-connect with remote")
if err := sh.Connect(); err != nil {
glog.Error("failed to re-establish connection with remote")
} else {
Expand Down
42 changes: 33 additions & 9 deletions netio/scp.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package netio

import (
"bytes"
"errors"
"fmt"
"github.com/ctomkow/tto/exec"
Expand All @@ -17,22 +18,45 @@ import (
"time"
)

func Copy(byteBuffer *io.ReadCloser, filename string, workingDir string, permissions string, ex *exec.Exec, sh *net.SSH) error {
func StreamMySqlDump(byteBuffer *io.ReadCloser, filename string, workingDir string, permissions string, ex *exec.Exec, sh *net.SSH) error {

// ensure a new session is created before acting!
if err := sh.NewSession(); err != nil {
return err
}

// add dashes (comment delimiter) to end of db dump to flush scp BUF because we are not specifying the exact file size
// https://salsa.debian.org/ssh-team/openssh/blob/master/scp.c
// https://github.com/openssh/openssh-portable/blob/master/scp.c
//
// #define COPY_BUFLEN 16384
//
// since this is a hack to make scp do our bidding, increase current COPY_BUFLEN by an order of magnitude
// this only adds 160kB of overhead, not an issue when most prod databases are hundreds of MB or GB's.
//
// the buffer is a valid SQL comment
// https://docs.oracle.com/cd/B12037_01/server.101/b10759/sql_elements006.htm
// -- zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzz[...]\n
COPY_BUFLEN := 16384 * 10
buf := make([]byte, COPY_BUFLEN)
buf[0] = '-'
buf[1] = '-'
buf[2] = ' '
for i := 3; i < COPY_BUFLEN-1; i++ {
buf[i] = 'z'
}
buf[COPY_BUFLEN-1] = '\n'
flush := bytes.NewReader(buf)

// since i don't know the size of the dump, set a static max to 100GB (107374182400 bytes)
if err := copy(*byteBuffer, workingDir+filename, permissions, 107374182400, ex, sh); err != nil {
if err := stream(*byteBuffer, workingDir+filename, permissions, 107374182400, ex, sh, flush); err != nil {
glog.Error(err)
}

return nil
}

func copy(r io.ReadCloser, absolutePath string, permissions string, size int64, ex *exec.Exec, sh *net.SSH) error {
func stream(r io.ReadCloser, absolutePath string, permissions string, size int64, ex *exec.Exec, sh *net.SSH, flush io.Reader) error {

filename := path.Base(absolutePath)
directory := path.Dir(absolutePath)
Expand All @@ -54,25 +78,26 @@ func copy(r io.ReadCloser, absolutePath string, permissions string, size int64,
glog.Exit(err)
}
}()

_, err = fmt.Fprintln(w, "C"+permissions, size, filename)
if err != nil {
errCh <- err
return
}

_, err = io.Copy(w, r)
if err != nil {
errCh <- err
return
}

_, err = io.Copy(w, flush)
if err != nil {
errCh <- err
return
}
_, err = fmt.Fprint(w, "\x00")
if err != nil {
errCh <- err
return
}

}()

go func() {
Expand All @@ -83,7 +108,6 @@ func copy(r io.ReadCloser, absolutePath string, permissions string, size int64,
// SCP would properly close if we specify a correct file size, but we don't know that because we are streaming mysqldump
// Therefore it is set to a max of 100GB
// Consequently, we cannot handle an error case here :/

}
}()

Expand All @@ -92,7 +116,7 @@ func copy(r io.ReadCloser, absolutePath string, permissions string, size int64,
return errors.New("timeout when upload files")
}

if err:= ex.Cmd.Wait(); err != nil {
if err := ex.Cmd.Wait(); err != nil {
return err
}

Expand Down
2 changes: 1 addition & 1 deletion util.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func SortBackups(filenames []string) []string {
if err != nil {
glog.Fatal(err)
}
dbName = slicedElements[0]
dbName = slicedElements[0]
afterDash := slicedElements[1]

// grab before dot but after dash
Expand Down

0 comments on commit f04cfdd

Please sign in to comment.