From 626ad5a69fffc99e9bc7d12172af6727ad1899b8 Mon Sep 17 00:00:00 2001 From: blackandred Date: Tue, 15 Feb 2022 22:59:23 +0100 Subject: [PATCH] feat: #164 WIP add upload --- server-go/Makefile | 5 +++ server-go/collections/entity.go | 7 +++- server-go/core/ctx.go | 2 + server-go/go.mod | 11 ++++- server-go/http/collection.go | 4 ++ server-go/main.go | 8 ++++ server-go/storage/entity.go | 44 ++++++++++++++++++++ server-go/storage/repository.go | 8 ++++ server-go/storage/service.go | 27 ++++++++++++ server-go/storage/upload.go | 74 +++++++++++++++++++++++++++++++++ 10 files changed, 188 insertions(+), 2 deletions(-) create mode 100644 server-go/storage/entity.go create mode 100644 server-go/storage/repository.go create mode 100644 server-go/storage/service.go create mode 100644 server-go/storage/upload.go diff --git a/server-go/Makefile b/server-go/Makefile index 581debc8..bd6eddd9 100644 --- a/server-go/Makefile +++ b/server-go/Makefile @@ -40,3 +40,8 @@ postgres_refresh: docker rm -f br_postgres || true sudo rm -rf /tmp/br_postgres make postgres + +minio: + docker run -d \ + --name br_minio \ + minio diff --git a/server-go/collections/entity.go b/server-go/collections/entity.go index 5e6e53e9..fec3c814 100644 --- a/server-go/collections/entity.go +++ b/server-go/collections/entity.go @@ -7,7 +7,8 @@ import ( "github.com/riotkit-org/backup-repository/config" "github.com/riotkit-org/backup-repository/security" "github.com/riotkit-org/backup-repository/users" - cron "github.com/robfig/cron/v3" + "github.com/robfig/cron/v3" + "strings" "time" ) @@ -105,3 +106,7 @@ func (c Collection) CanUploadToMe(user *users.User) bool { return false } + +func (c *Collection) GenerateNextVersionFilename(version int) string { + return strings.Replace(c.Spec.FilenameTemplate, "${version}", string(rune(version)), 1) +} diff --git a/server-go/core/ctx.go b/server-go/core/ctx.go index 62cb7f54..3a0c56fb 100644 --- a/server-go/core/ctx.go +++ b/server-go/core/ctx.go @@ -4,6 +4,7 @@ import ( "github.com/riotkit-org/backup-repository/collections" "github.com/riotkit-org/backup-repository/config" "github.com/riotkit-org/backup-repository/security" + "github.com/riotkit-org/backup-repository/storage" "github.com/riotkit-org/backup-repository/users" ) @@ -12,5 +13,6 @@ type ApplicationContainer struct { Users *users.Service GrantedAccesses *security.Service Collections *collections.Service + Storage *storage.Service JwtSecretKey string } diff --git a/server-go/go.mod b/server-go/go.mod index 10d97746..ec35cd9b 100644 --- a/server-go/go.mod +++ b/server-go/go.mod @@ -28,9 +28,12 @@ require ( github.com/go-playground/validator/v10 v10.4.1 // indirect github.com/gogo/protobuf v1.3.2 // indirect github.com/golang-jwt/jwt/v4 v4.2.0 // indirect + github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect github.com/golang/protobuf v1.5.2 // indirect - github.com/google/go-cmp v0.5.5 // indirect + github.com/google/go-cmp v0.5.6 // indirect github.com/google/gofuzz v1.1.0 // indirect + github.com/google/uuid v1.3.0 // indirect + github.com/googleapis/gax-go/v2 v2.1.0 // indirect github.com/googleapis/gnostic v0.5.5 // indirect github.com/imdario/mergo v0.3.12 // indirect github.com/jackc/chunkreader/v2 v2.0.1 // indirect @@ -58,13 +61,19 @@ require ( github.com/tidwall/match v1.1.1 // indirect github.com/tidwall/pretty v1.2.0 // indirect github.com/ugorji/go/codec v1.1.7 // indirect + go.opencensus.io v0.23.0 // indirect + gocloud.dev v0.24.0 // indirect golang.org/x/net v0.0.0-20211209124913-491a49abca63 // indirect golang.org/x/oauth2 v0.0.0-20210819190943-2bc19b11175f // indirect golang.org/x/sys v0.0.0-20211029165221-6e7872819dc8 // indirect golang.org/x/term v0.0.0-20210615171337-6886f2dfbf5b // indirect golang.org/x/text v0.3.7 // indirect golang.org/x/time v0.0.0-20210723032227-1f47c861a9ac // indirect + golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 // indirect + google.golang.org/api v0.56.0 // indirect google.golang.org/appengine v1.6.7 // indirect + google.golang.org/genproto v0.0.0-20210831024726-fe130286e0e2 // indirect + google.golang.org/grpc v1.40.0 // indirect google.golang.org/protobuf v1.27.1 // indirect gopkg.in/inf.v0 v0.9.1 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect diff --git a/server-go/http/collection.go b/server-go/http/collection.go index 4dafcd5a..3b6861f8 100644 --- a/server-go/http/collection.go +++ b/server-go/http/collection.go @@ -40,6 +40,10 @@ func addUploadRoute(r *gin.RouterGroup, ctx *core.ApplicationContainer) { return } + // todo: support Url encoded and raw body + // c.Request.Body + // ctx.Storage + println(collection) }) } diff --git a/server-go/main.go b/server-go/main.go index 9480d576..26f97cf2 100644 --- a/server-go/main.go +++ b/server-go/main.go @@ -8,6 +8,7 @@ import ( "github.com/riotkit-org/backup-repository/db" "github.com/riotkit-org/backup-repository/http" "github.com/riotkit-org/backup-repository/security" + "github.com/riotkit-org/backup-repository/storage" "github.com/riotkit-org/backup-repository/users" log "github.com/sirupsen/logrus" "os" @@ -25,6 +26,7 @@ type options struct { DbPort int `long:"db-port" description:"Database name inside a database" default:"5432"` JwtSecretKey string `long:"jwt-secret-key" short:"s" description:"Secret used for generating JSON Web Tokens for authentication"` Level string `long:"log-level" description:"Log level" default:"debug"` + StorageDriverUrl string `long:"--storage-url" description:"Storage driver url compatible with GO Cloud (https://gocloud.dev/howto/blob/)"` } func main() { @@ -68,6 +70,11 @@ func main() { usersService := users.NewUsersService(configProvider) gaService := security.NewService(dbDriver) collectionsService := collections.NewService(configProvider) + storageService, storageError := storage.NewService(opts.StorageDriverUrl) + if err != nil { + log.Errorln("Cannot initialize storage driver") + log.Fatal(storageError) + } ctx := core.ApplicationContainer{ Config: &configProvider, @@ -75,6 +82,7 @@ func main() { GrantedAccesses: &gaService, JwtSecretKey: opts.JwtSecretKey, Collections: &collectionsService, + Storage: &storageService, } // todo: First thread - HTTP diff --git a/server-go/storage/entity.go b/server-go/storage/entity.go new file mode 100644 index 00000000..9b3d4847 --- /dev/null +++ b/server-go/storage/entity.go @@ -0,0 +1,44 @@ +package storage + +import ( + "github.com/google/uuid" + "github.com/riotkit-org/backup-repository/collections" + "gorm.io/gorm" + "time" +) + +type UploadedVersion struct { + Id string `json:"id" structs:"id" sql:"type:string;primary_key;default:uuid_generate_v4()` + CollectionId string `json:"collectionId"` + VersionNumber int `json:"versionNumber"` + Filename string `json:"filename"` // full filename e.g. iwa-ait-v1-db.tar.gz + Filesize int `json:"filesize"` // in bytes + + // auditing + UploadedBySessionId string `json:"uploadedBySessionId"` + Uploader string `json:"user" structs:"user"` + CreatedAt time.Time + UpdatedAt time.Time + DeletedAt gorm.DeletedAt `gorm:"index"` +} + +func (u *UploadedVersion) GetTargetPath() string { + return u.CollectionId + "/" + u.Filename +} + +func CreateNewVersionFromCollection(c collections.Collection, svc Service, uploader string, uploaderSessionId string, filesize int) UploadedVersion { + nextVersion := svc.FindNextVersionForCollectionId(c.Metadata.Name) + + return UploadedVersion{ + Id: uuid.New().String(), + CollectionId: c.Metadata.Name, + VersionNumber: nextVersion, + Filename: c.GenerateNextVersionFilename(nextVersion), + Filesize: filesize, + UploadedBySessionId: uploaderSessionId, + Uploader: uploader, + CreatedAt: time.Time{}, + UpdatedAt: time.Time{}, + DeletedAt: gorm.DeletedAt{}, + } +} diff --git a/server-go/storage/repository.go b/server-go/storage/repository.go new file mode 100644 index 00000000..8f8e185e --- /dev/null +++ b/server-go/storage/repository.go @@ -0,0 +1,8 @@ +package storage + +type VersionsRepository struct { +} + +func (r VersionsRepository) findLastHighestVersionNumber(name string) interface{} { + +} diff --git a/server-go/storage/service.go b/server-go/storage/service.go new file mode 100644 index 00000000..560336b1 --- /dev/null +++ b/server-go/storage/service.go @@ -0,0 +1,27 @@ +package storage + +import ( + "context" + "github.com/sirupsen/logrus" + "gocloud.dev/blob" +) + +type Service struct { + storage *blob.Bucket + repository *VersionsRepository +} + +func (s *Service) FindNextVersionForCollectionId(name string) int { + lastHigherVersion := s.repository.findLastHighestVersionNumber(name) + return lastHigherVersion + 1 +} + +func NewService(driverUrl string) (Service, error) { + driver, err := blob.OpenBucket(context.Background(), driverUrl) + if err != nil { + logrus.Errorf("Cannot construct storage driver: %v", err) + return Service{}, err + } + + return Service{storage: driver}, nil +} diff --git a/server-go/storage/upload.go b/server-go/storage/upload.go new file mode 100644 index 00000000..a89bce2b --- /dev/null +++ b/server-go/storage/upload.go @@ -0,0 +1,74 @@ +package storage + +import ( + "bytes" + "context" + "errors" + "fmt" + "gocloud.dev/blob" + "io" +) + +func (s *Service) createGPGValidator() func([]byte) error { + startFound := false + endingFound := false + + return func(buff []byte) error { + if bytes.Contains(buff, []byte("-----BEGIN PGP MESSAGE")) { + startFound = true + } + if bytes.Contains(buff, []byte("-----END PGP MESSAGE")) { + endingFound = true + } + return nil + } +} + +func (s *Service) UploadFile(inputStream io.ReadCloser, version UploadedVersion) (bool, error) { + writeStream, err := s.storage.NewWriter(context.Background(), version.GetTargetPath(), &blob.WriterOptions{}) + if err != nil { + return false, errors.New(fmt.Sprintf("cannot upload file, attempted to open a writable stream, error: %v", err)) + } + + if writeErr := s.CopyStream(inputStream, writeStream, 1024, s.createGPGValidator()); writeErr != nil { + return false, errors.New(fmt.Sprintf("cannot upload file, cannot copy stream, error: %v", writeErr)) + } + + return true, nil +} + +// CopyStream copies a readable stream to writable stream, while providing a possibility to use a validation callback on-the-fly +func (s *Service) CopyStream(inputStream io.ReadCloser, writeStream io.WriteCloser, bufferLen int, processor func([]byte) error) error { + p := make([]byte, bufferLen) + + for { + n, err := inputStream.Read(p) + + if err != nil { + if err == io.EOF { + // validation callback + if processingError := processor(p[:n]); processingError != nil { + return processingError + } + // write to second stream + _, writeErr := writeStream.Write(p[:n]) + if writeErr != nil { + return writeErr + } + } + + break + } + // validation callback + if processingError := processor(p[:n]); processingError != nil { + return processingError + } + // write to second stream + _, writeErr := writeStream.Write(p[:n]) + if writeErr != nil { + return writeErr + } + } + + return nil +}