From f5a0b7a1622dc476e28dcd360e81a300e86a9336 Mon Sep 17 00:00:00 2001 From: blackandred Date: Tue, 22 Feb 2022 08:45:16 +0100 Subject: [PATCH] feat: #164 implement locking for upload endpoint, so there will be no parallel uploads to the same collection --- server-go/.gitignore | 1 + server-go/Makefile | 10 ++-- server-go/collections/entity.go | 4 ++ server-go/concurrency/locking.go | 78 ++++++++++++++++++++++++++++++++ server-go/core/ctx.go | 2 + server-go/db/main.go | 5 ++ server-go/http/collection.go | 9 +++- server-go/main.go | 3 ++ 8 files changed, 106 insertions(+), 6 deletions(-) create mode 100644 server-go/.gitignore create mode 100644 server-go/concurrency/locking.go diff --git a/server-go/.gitignore b/server-go/.gitignore new file mode 100644 index 00000000..6ec76edf --- /dev/null +++ b/server-go/.gitignore @@ -0,0 +1 @@ +/.build diff --git a/server-go/Makefile b/server-go/Makefile index 5290588e..bc076745 100644 --- a/server-go/Makefile +++ b/server-go/Makefile @@ -33,13 +33,13 @@ test_upload_by_form_1mb: @echo "-----BEGIN PGP MESSAGE-----" > /tmp/1mb.gpg @openssl rand -base64 $$((735*1024*1)) >> /tmp/1mb.gpg @echo "-----END PGP MESSAGE-----" >> /tmp/1mb.gpg - curl -s -X POST -H 'Authorization: Bearer ${TOKEN}' -F "file=@/tmp/1mb.gpg" 'http://localhost:8080/api/stable/repository/collection/iwa-ait/version' + curl -vvv -X POST -H 'Authorization: Bearer ${TOKEN}' -F "file=@/tmp/1mb.gpg" 'http://localhost:8080/api/stable/repository/collection/iwa-ait/version' --limit-rate 100K test_upload_by_form_5mb: @echo "-----BEGIN PGP MESSAGE-----" > /tmp/5mb.gpg @openssl rand -base64 $$((735*1024*5)) >> /tmp/5mb.gpg @echo "-----END PGP MESSAGE-----" >> /tmp/5mb.gpg - curl -s -X POST -H 'Authorization: Bearer ${TOKEN}' -F "file=@/tmp/5mb.gpg" 'http://localhost:8080/api/stable/repository/collection/iwa-ait/version' + curl -vvv -X POST -H 'Authorization: Bearer ${TOKEN}' -F "file=@/tmp/5mb.gpg" 'http://localhost:8080/api/stable/repository/collection/iwa-ait/version' --limit-rate 1000K postgres: docker run -d \ @@ -48,13 +48,13 @@ postgres: -e POSTGRES_USER=postgres \ -e POSTGRES_DB=postgres \ -e PGDATA=/var/lib/postgresql/data/pgdata \ - -v /tmp/br_postgres:/var/lib/postgresql/data \ + -v $$(pwd)/.build/postgres:/var/lib/postgresql \ -p 5432:5432 \ postgres:14.1-alpine postgres_refresh: docker rm -f br_postgres || true - sudo rm -rf /tmp/br_postgres + sudo rm -rf $$(pwd)/.build/postgres make postgres minio: @@ -62,7 +62,7 @@ minio: --name br_minio \ -p 9000:9000 \ -p 9001:9001 \ - -v /tmp/br_minio:/data \ + -v $$(pwd)/.build/minio:/data \ -e "MINIO_ROOT_USER=AKIAIOSFODNN7EXAMPLE" \ -e "MINIO_ROOT_PASSWORD=wJaFuCKtnFEMI/CApItaliSM/bPxRfiCYEXAMPLEKEY" \ quay.io/minio/minio:RELEASE.2022-02-16T00-35-27Z server /data --console-address 0.0.0.0:9001 diff --git a/server-go/collections/entity.go b/server-go/collections/entity.go index e462cb81..549bf5a3 100644 --- a/server-go/collections/entity.go +++ b/server-go/collections/entity.go @@ -146,3 +146,7 @@ func (c *Collection) getMaxCollectionSizeInBytes() (int64, error) { func (c *Collection) GetId() string { return c.Metadata.Name } + +func (c Collection) GetGlobalIdentifier() string { + return "collection:" + c.GetId() +} diff --git a/server-go/concurrency/locking.go b/server-go/concurrency/locking.go new file mode 100644 index 00000000..3e45f2db --- /dev/null +++ b/server-go/concurrency/locking.go @@ -0,0 +1,78 @@ +package concurrency + +import ( + "database/sql" + "errors" + "fmt" + "gorm.io/gorm" + "math/rand" + "time" +) + +type LocksService struct { + db *gorm.DB +} + +func (ls *LocksService) Lock(id string, howLong time.Duration) (Lock, error) { + if ls.isLockedAlready(id) { + return Lock{}, errors.New("already locked") + } + if err := ls.addLock(id, howLong); err != nil { + return Lock{}, errors.New(fmt.Sprintf("cannot lock transaction, %v", err)) + } + return Lock{ + Id: id, + unlock: func() { + ls.unlock(id) + }, + }, nil +} + +func (ls *LocksService) addLock(id string, howLong time.Duration) error { + expiration := time.Now().Add(howLong) + return ls.db.Exec("INSERT INTO locks (id, expires) VALUES (@id, @expires);", sql.Named("id", id), sql.Named("expires", expiration)).Error +} + +func (ls *LocksService) unlock(id string) { + ls.db.Exec("DELETE FROM locks WHERE locks.id = @id", sql.Named("id", id)) +} + +func (ls *LocksService) isLockedAlready(id string) bool { + var result int + ls.db.Raw("SELECT count(*) FROM locks WHERE locks.id = @id AND locks.expires > @now", sql.Named("id", id), sql.Named("now", time.Now())).Scan(&result) + + if ls.shouldPerformCleanUpNow() { + ls.cleanUp() + } + + return result > 0 +} + +func (ls *LocksService) cleanUp() { + ls.db.Exec("DELETE FROM locks WHERE locks.expires < @now", sql.Named("now", time.Now())) +} + +func (ls *LocksService) shouldPerformCleanUpNow() bool { + s1 := rand.NewSource(time.Now().UnixNano()) + r1 := rand.New(s1) + + return r1.Intn(5) == 2 // PN-VI +} + +func InitializeModel(db *gorm.DB) error { + return db.AutoMigrate(&Lock{}) +} + +func NewService(db *gorm.DB) LocksService { + return LocksService{db} +} + +type Lock struct { + Id string + Expires time.Time + unlock func() +} + +func (l *Lock) Unlock() { + l.unlock() +} diff --git a/server-go/core/ctx.go b/server-go/core/ctx.go index 3a0c56fb..d7c64a80 100644 --- a/server-go/core/ctx.go +++ b/server-go/core/ctx.go @@ -2,6 +2,7 @@ package core import ( "github.com/riotkit-org/backup-repository/collections" + "github.com/riotkit-org/backup-repository/concurrency" "github.com/riotkit-org/backup-repository/config" "github.com/riotkit-org/backup-repository/security" "github.com/riotkit-org/backup-repository/storage" @@ -15,4 +16,5 @@ type ApplicationContainer struct { Collections *collections.Service Storage *storage.Service JwtSecretKey string + Locks *concurrency.LocksService } diff --git a/server-go/db/main.go b/server-go/db/main.go index b74c5aaa..60b82636 100644 --- a/server-go/db/main.go +++ b/server-go/db/main.go @@ -2,6 +2,7 @@ package db import ( "fmt" + "github.com/riotkit-org/backup-repository/concurrency" "github.com/riotkit-org/backup-repository/security" "github.com/riotkit-org/backup-repository/storage" "github.com/sirupsen/logrus" @@ -25,6 +26,10 @@ func InitializeDatabase(db *gorm.DB) bool { logrus.Errorf("Cannot initialize UploadedVersion model: %v", err) return false } + if err := concurrency.InitializeModel(db); err != nil { + logrus.Errorf("Cannot initialize Locks model: %v", err) + return false + } return true } diff --git a/server-go/http/collection.go b/server-go/http/collection.go index 3cddc4a9..9135b3aa 100644 --- a/server-go/http/collection.go +++ b/server-go/http/collection.go @@ -17,7 +17,6 @@ func addUploadRoute(r *gin.RouterGroup, ctx *core.ApplicationContainer, requestT timeout.WithTimeout(requestTimeout), timeout.WithHandler(func(c *gin.Context) { // todo: deactivate token if temporary token is used - // todo: locking support! There should be no concurrent uploads to the same collection ctxUser, _ := GetContextUser(ctx, c) @@ -42,6 +41,14 @@ func addUploadRoute(r *gin.RouterGroup, ctx *core.ApplicationContainer, requestT return } + // [SECURITY] Do not allow parallel uploads to the same collection + lock, lockErr := ctx.Locks.Lock(collection.GetGlobalIdentifier(), requestTimeout) + if lockErr != nil { + ServerErrorResponse(c, errors.New("cannot upload to same collection in parallel")) + return + } + defer lock.Unlock() + // [ROTATION STRATEGY][VERSIONING] Increment a version, generate target file path name that will be used on storage sessionId := GetCurrentSessionId(c) version, factoryError := ctx.Storage.CreateNewVersionFromCollection(collection, ctxUser.Metadata.Name, sessionId, 0) diff --git a/server-go/main.go b/server-go/main.go index e1f6cac2..90567f87 100644 --- a/server-go/main.go +++ b/server-go/main.go @@ -3,6 +3,7 @@ package main import ( "github.com/jessevdk/go-flags" "github.com/riotkit-org/backup-repository/collections" + "github.com/riotkit-org/backup-repository/concurrency" "github.com/riotkit-org/backup-repository/config" "github.com/riotkit-org/backup-repository/core" "github.com/riotkit-org/backup-repository/db" @@ -66,6 +67,7 @@ func main() { log.Errorln("Cannot initialize database connection") log.Fatal(err) } + locksService := concurrency.NewService(dbDriver) db.InitializeDatabase(dbDriver) usersService := users.NewUsersService(configProvider) @@ -84,6 +86,7 @@ func main() { JwtSecretKey: opts.JwtSecretKey, Collections: &collectionsService, Storage: &storageService, + Locks: &locksService, } // todo: First thread - HTTP