Skip to content
This repository has been archived by the owner on May 10, 2024. It is now read-only.

Commit

Permalink
docs: #164 add support for request cancellation and for fixed request…
Browse files Browse the repository at this point in the history
… timeout
  • Loading branch information
blackandred committed Feb 21, 2022
1 parent 9312587 commit c460f35
Show file tree
Hide file tree
Showing 6 changed files with 138 additions and 101 deletions.
1 change: 1 addition & 0 deletions server-go/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/evanphx/json-patch v4.12.0+incompatible // indirect
github.com/gin-contrib/sse v0.1.0 // indirect
github.com/gin-contrib/timeout v0.0.3 // indirect
github.com/go-logr/logr v1.2.0 // indirect
github.com/go-playground/locales v0.13.0 // indirect
github.com/go-playground/universal-translator v0.17.0 // indirect
Expand Down
202 changes: 104 additions & 98 deletions server-go/http/collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package http
import (
"errors"
"fmt"
"github.com/gin-contrib/timeout"
"github.com/gin-gonic/gin"
"github.com/riotkit-org/backup-repository/core"
"github.com/riotkit-org/backup-repository/security"
Expand All @@ -11,107 +12,112 @@ import (
"time"
)

func addUploadRoute(r *gin.RouterGroup, ctx *core.ApplicationContainer) {
r.POST("/repository/collection/:collectionId/version", func(c *gin.Context) {
// todo: deactivate token if temporary token is used
// todo: handle upload interruptions
// todo: locking support! There should be no concurrent uploads to the same collection

ctxUser, _ := GetContextUser(ctx, c)

// Check if Collection exists
collection, findError := ctx.Collections.GetCollectionById(c.Param("collectionId"))
if findError != nil {
NotFoundResponse(c, errors.New("cannot find specified collection"))
return
}

// [SECURITY] Check permissions
if !collection.CanUploadToMe(ctxUser) {
UnauthorizedResponse(c, errors.New("not authorized to upload versions to this collection"))
}

// [SECURITY] Backup Windows support
if !ctx.Collections.ValidateIsBackupWindowAllowingToUpload(collection, time.Now()) &&
!ctxUser.Spec.Roles.HasRole(security.RoleUploadsAnytime) {

UnauthorizedResponse(c, errors.New("backup window does not allow you to send a backup at this time. "+
"You need a token from a user that has a special permission 'uploadsAnytime'"))
return
}

// [ROTATION STRATEGY][VERSIONING] Increment a version, generate target file path name that will be used on storage
sessionId := GetCurrentSessionId(c)
version, factoryError := ctx.Storage.CreateNewVersionFromCollection(collection, ctxUser.Metadata.Name, sessionId, 0)
if factoryError != nil {
ServerErrorResponse(c, errors.New(fmt.Sprintf("cannot increment version. %v", factoryError)))
return
}

// [ROTATION STRATEGY] Is it allowed to upload? Is there enough space?
rotationStrategyCase, strategyFactorialError := ctx.Storage.CreateRotationStrategyCase(collection)
if strategyFactorialError != nil {
logrus.Errorf(fmt.Sprintf("Cannot create collection strategy for collectionId=%v, error: %v", collection.Metadata.Name, strategyFactorialError))
ServerErrorResponse(c, errors.New("internal error while trying to create rotation strategy. Check server logs"))
return
}
if err := rotationStrategyCase.CanUpload(version); err != nil {
UnauthorizedResponse(c, errors.New(fmt.Sprintf("backup collection strategy declined a possibility to upload, %v", err)))
return
}

var stream io.ReadCloser

// [HTTP] Support form data
if c.ContentType() == "application/x-www-form-urlencoded" || c.ContentType() == "multipart/form-data" {
var openErr error
fh, ffErr := c.FormFile("file")
if ffErr != nil {
ServerErrorResponse(c, errors.New(fmt.Sprintf("cannot read file from multipart/urlencoded form: %v", ffErr)))
func addUploadRoute(r *gin.RouterGroup, ctx *core.ApplicationContainer, requestTimeout time.Duration) {
timeoutMiddleware := timeout.New(
timeout.WithTimeout(requestTimeout),
timeout.WithHandler(func(c *gin.Context) {
// todo: deactivate token if temporary token is used
// todo: locking support! There should be no concurrent uploads to the same collection

ctxUser, _ := GetContextUser(ctx, c)

// Check if Collection exists
collection, findError := ctx.Collections.GetCollectionById(c.Param("collectionId"))
if findError != nil {
NotFoundResponse(c, errors.New("cannot find specified collection"))
return
}

// [SECURITY] Check permissions
if !collection.CanUploadToMe(ctxUser) {
UnauthorizedResponse(c, errors.New("not authorized to upload versions to this collection"))
}

// [SECURITY] Backup Windows support
if !ctx.Collections.ValidateIsBackupWindowAllowingToUpload(collection, time.Now()) &&
!ctxUser.Spec.Roles.HasRole(security.RoleUploadsAnytime) {

UnauthorizedResponse(c, errors.New("backup window does not allow you to send a backup at this time. "+
"You need a token from a user that has a special permission 'uploadsAnytime'"))
return
}
stream, openErr = fh.Open()
if openErr != nil {
ServerErrorResponse(c, errors.New(fmt.Sprintf("cannot open file from multipart/urlencoded form: %v", openErr)))

// [ROTATION STRATEGY][VERSIONING] Increment a version, generate target file path name that will be used on storage
sessionId := GetCurrentSessionId(c)
version, factoryError := ctx.Storage.CreateNewVersionFromCollection(collection, ctxUser.Metadata.Name, sessionId, 0)
if factoryError != nil {
ServerErrorResponse(c, errors.New(fmt.Sprintf("cannot increment version. %v", factoryError)))
return
}

// [ROTATION STRATEGY] Is it allowed to upload? Is there enough space?
rotationStrategyCase, strategyFactorialError := ctx.Storage.CreateRotationStrategyCase(collection)
if strategyFactorialError != nil {
logrus.Errorf(fmt.Sprintf("Cannot create collection strategy for collectionId=%v, error: %v", collection.Metadata.Name, strategyFactorialError))
ServerErrorResponse(c, errors.New("internal error while trying to create rotation strategy. Check server logs"))
return
}
if err := rotationStrategyCase.CanUpload(version); err != nil {
UnauthorizedResponse(c, errors.New(fmt.Sprintf("backup collection strategy declined a possibility to upload, %v", err)))
return
}
defer stream.Close()

} else {
// [HTTP] Support RAW sent data via body
stream = c.Request.Body
}

// [VALIDATION] Middlewares
versionsToDelete := rotationStrategyCase.GetVersionsThatShouldBeDeletedIfThisVersionUploaded(version)
middlewares, err := ctx.Storage.CreateStandardMiddleWares(versionsToDelete, collection)
if err != nil {
ServerErrorResponse(c, errors.New(fmt.Sprintf("cannot construct validators %v", err)))
return
}

// [HTTP] Upload a file from selected source, then handle errors - delete file from storage if not uploaded successfully
wroteLen, uploadError := ctx.Storage.UploadFile(stream, &version, &middlewares)
if uploadError != nil {
_ = ctx.Storage.Delete(&version)

ServerErrorResponse(c, errors.New(fmt.Sprintf("cannot upload version. %v", uploadError)))
return
}

// Set a valid filesize that is known after receiving the file
version.Filesize = wroteLen

// Append version to the registry
if err := ctx.Storage.RegisterVersion(&version); err != nil {
_ = ctx.Storage.Delete(&version)
}
ctx.Storage.CleanUpOlderVersions(versionsToDelete)
logrus.Infof("Uploaded v%v for collectionId=%v, size=%v", version.VersionNumber, version.CollectionId, version.Filesize)

OKResponse(c, gin.H{
"version": version,
})
})

var stream io.ReadCloser

// [HTTP] Support form data
if c.ContentType() == "application/x-www-form-urlencoded" || c.ContentType() == "multipart/form-data" {
var openErr error
fh, ffErr := c.FormFile("file")
if ffErr != nil {
ServerErrorResponse(c, errors.New(fmt.Sprintf("cannot read file from multipart/urlencoded form: %v", ffErr)))
return
}
stream, openErr = fh.Open()
if openErr != nil {
ServerErrorResponse(c, errors.New(fmt.Sprintf("cannot open file from multipart/urlencoded form: %v", openErr)))
}
defer stream.Close()

} else {
// [HTTP] Support RAW sent data via body
stream = c.Request.Body
}

// [VALIDATION] Middlewares
versionsToDelete := rotationStrategyCase.GetVersionsThatShouldBeDeletedIfThisVersionUploaded(version)
middlewares, err := ctx.Storage.CreateStandardMiddleWares(c.Request.Context(), versionsToDelete, collection)
if err != nil {
ServerErrorResponse(c, errors.New(fmt.Sprintf("cannot construct validators %v", err)))
return
}

// [HTTP] Upload a file from selected source, then handle errors - delete file from storage if not uploaded successfully
wroteLen, uploadError := ctx.Storage.UploadFile(stream, &version, &middlewares)
if uploadError != nil {
_ = ctx.Storage.Delete(&version)

ServerErrorResponse(c, errors.New(fmt.Sprintf("cannot upload version. %v", uploadError)))
return
}

// Set a valid filesize that is known after receiving the file
version.Filesize = wroteLen

// Append version to the registry
if err := ctx.Storage.RegisterVersion(&version); err != nil {
_ = ctx.Storage.Delete(&version)
}
ctx.Storage.CleanUpOlderVersions(versionsToDelete)
logrus.Infof("Uploaded v%v for collectionId=%v, size=%v", version.VersionNumber, version.CollectionId, version.Filesize)

OKResponse(c, gin.H{
"version": version,
})
}),
timeout.WithResponse(RequestTimeoutResponse),
)

r.POST("/repository/collection/:collectionId/version", timeoutMiddleware)
}

// todo: healthcheck route
Expand Down
2 changes: 1 addition & 1 deletion server-go/http/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func SpawnHttpApplication(ctx *core.ApplicationContainer) {
addWhoamiRoute(router, ctx)
addLogoutRoute(router, ctx)
addGrantedAccessSearchRoute(router, ctx)
addUploadRoute(router, ctx)
addUploadRoute(router, ctx, 180*time.Minute)
}

_ = r.Run()
Expand Down
13 changes: 12 additions & 1 deletion server-go/http/responses.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package http

import "github.com/gin-gonic/gin"
import (
"github.com/gin-gonic/gin"
"net/http"
)

func NotFoundResponse(c *gin.Context, err error) {
c.IndentedJSON(404, gin.H{
Expand Down Expand Up @@ -32,3 +35,11 @@ func ServerErrorResponse(c *gin.Context, err error) {
"data": gin.H{},
})
}

func RequestTimeoutResponse(c *gin.Context) {
c.IndentedJSON(http.StatusRequestTimeout, gin.H{
"status": false,
"error": "Request took too long",
"data": gin.H{},
})
}
3 changes: 2 additions & 1 deletion server-go/storage/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ func (s *Service) CalculateAllocatedSpaceAboveSingleVersionLimit(collection *col
return allocatedSpaceAboveLimit, nil
}

func (s *Service) CreateStandardMiddleWares(versionsToDelete []UploadedVersion, collection *collections.Collection) (NestedStreamMiddlewares, error) {
func (s *Service) CreateStandardMiddleWares(context context.Context, versionsToDelete []UploadedVersion, collection *collections.Collection) (NestedStreamMiddlewares, error) {
maxAllowedFilesize, err := s.CalculateMaximumAllowedUploadFilesize(collection, versionsToDelete)
logrus.Debugf("CalculateMaximumAllowedUploadFilesize(%v) = %v", collection.GetId(), maxAllowedFilesize)

Expand All @@ -169,6 +169,7 @@ func (s *Service) CreateStandardMiddleWares(versionsToDelete []UploadedVersion,
}

return NestedStreamMiddlewares{
s.createRequestCancelledMiddleware(context),
s.createQuotaMaxFileSizeMiddleware(maxAllowedFilesize),
s.createNonEmptyMiddleware(),
s.createGPGStreamMiddleware(),
Expand Down
18 changes: 18 additions & 0 deletions server-go/storage/validation.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@ package storage

import (
"bytes"
"context"
"errors"
"fmt"
"github.com/sirupsen/logrus"
)

//
Expand Down Expand Up @@ -106,3 +108,19 @@ func (s *Service) createQuotaMaxFileSizeMiddleware(maxFileSize int64) streamMidd
return nil
}}
}

// createRequestCancelledMiddleware handles the request cancellation
func (s *Service) createRequestCancelledMiddleware(context context.Context) streamMiddleware {
return streamMiddleware{
processor: func(i []byte, i2 int64, i3 []byte, i4 int) error {
if context.Err() != nil {
logrus.Warning(fmt.Sprintf("Upload was cancelled: %v", context.Err()))
return errors.New("upload was cancelled")
}
return nil
},
resultReporter: func() error {
return nil
},
}
}

0 comments on commit c460f35

Please sign in to comment.