Skip to content

Commit

Permalink
feat: reprocess endpoint (#262)
Browse files Browse the repository at this point in the history
* feat(api): endpoint to reprocess a file

* refactor(api): no naked return

* refactor(api): don't mark folders as rejected

* fix: set snapshot.task_id to NULL when deleting a task

* chore: format code

* chore: fix linter error
  • Loading branch information
bouassaba authored Aug 12, 2024
1 parent 35baa2a commit 46d8a54
Show file tree
Hide file tree
Showing 9 changed files with 285 additions and 83 deletions.
42 changes: 26 additions & 16 deletions api/docs/index.html

Large diffs are not rendered by default.

39 changes: 39 additions & 0 deletions api/docs/swagger.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -586,6 +586,17 @@ definitions:
totalPages:
type: integer
type: object
service.ReprocessResponse:
properties:
accepted:
items:
type: string
type: array
rejected:
items:
type: string
type: array
type: object
service.Snapshot:
properties:
createTime:
Expand Down Expand Up @@ -1427,6 +1438,34 @@ paths:
summary: Download Preview
tags:
- Files
/files/{id}/reprocess:
post:
description: Reprocess
operationId: files_reprocess
parameters:
- description: ID
in: path
name: id
required: true
type: string
produces:
- application/json
responses:
"200":
description: OK
schema:
$ref: '#/definitions/service.ReprocessResponse'
"404":
description: Not Found
schema:
$ref: '#/definitions/errorpkg.ErrorResponse'
"500":
description: Internal Server Error
schema:
$ref: '#/definitions/errorpkg.ErrorResponse'
summary: Reprocess
tags:
- Files
/files/{id}/revoke_group_permission:
post:
description: Revoke Group Permission
Expand Down
12 changes: 12 additions & 0 deletions api/repo/snapshot_repo.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ type SnapshotRepo interface {
Find(id string) (model.Snapshot, error)
FindByVersion(version int64) (model.Snapshot, error)
FindAllForFile(fileID string) ([]model.Snapshot, error)
FindAllForTask(taskID string) ([]*snapshotEntity, error)
FindAllDangling() ([]model.Snapshot, error)
FindAllPrevious(fileID string, version int64) ([]model.Snapshot, error)
GetIDsByFile(fileID string) ([]string, error)
Expand Down Expand Up @@ -600,6 +601,17 @@ func (repo *snapshotRepo) FindAllForFile(fileID string) ([]model.Snapshot, error
return res, nil
}

func (repo *snapshotRepo) FindAllForTask(taskID string) ([]*snapshotEntity, error) {
var res []*snapshotEntity
db := repo.db.
Raw(`SELECT * FROM snapshot WHERE task_id = ?`, taskID).
Scan(&res)
if db.Error != nil {
return nil, db.Error
}
return res, nil
}

func (repo *snapshotRepo) FindAllDangling() ([]model.Snapshot, error) {
var entities []*snapshotEntity
db := repo.db.
Expand Down
24 changes: 23 additions & 1 deletion api/router/file_router.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ func (r *FileRouter) AppendRoutes(g fiber.Router) {
g.Post("/:id/move/:targetId", r.MoveOne)
g.Post("/:id/copy/:targetId", r.CopyOne)
g.Patch("/:id/name", r.PatchName)
g.Post("/:id/reprocess", r.Reprocess)
g.Get("/:id/size", r.GetSize)
g.Post("/grant_user_permission", r.GrantUserPermission)
g.Post("/revoke_user_permission", r.RevokeUserPermission)
Expand Down Expand Up @@ -566,6 +567,27 @@ func (r *FileRouter) PatchName(c *fiber.Ctx) error {
return c.JSON(res)
}

// Reprocess godoc
//
// @Summary Reprocess
// @Description Reprocess
// @Tags Files
// @Id files_reprocess
// @Produce json
// @Param id path string true "ID"
// @Success 200 {object} service.ReprocessResponse
// @Failure 404 {object} errorpkg.ErrorResponse
// @Failure 500 {object} errorpkg.ErrorResponse
// @Router /files/{id}/reprocess [post]
func (r *FileRouter) Reprocess(c *fiber.Ctx) error {
userID := GetUserID(c)
res, err := r.fileSvc.Reprocess(c.Params("id"), userID)
if err != nil {
return err
}
return c.JSON(res)
}

type FileDeleteOptions struct {
IDs []string `json:"ids" validate:"required"`
}
Expand All @@ -587,7 +609,7 @@ func (r *FileRouter) DeleteOne(c *fiber.Ctx) error {
if err := r.fileSvc.DeleteOne(c.Params("id"), userID); err != nil {
return err
}
return c.JSON(http.StatusNoContent)
return c.SendStatus(http.StatusNoContent)
}

// DeleteMany godoc
Expand Down
126 changes: 126 additions & 0 deletions api/service/file_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"fmt"
"os"
"path/filepath"
"slices"
"sort"
"strings"
"time"
Expand Down Expand Up @@ -1170,6 +1171,131 @@ func (svc *FileService) PatchName(id string, name string, userID string) (*File,
return res, nil
}

type ReprocessResponse struct {
Accepted []string `json:"accepted"`
Rejected []string `json:"rejected"`
}

func (r *ReprocessResponse) AppendAccepted(id string) {
if !slices.Contains(r.Accepted, id) {
r.Accepted = append(r.Accepted, id)
}
}

func (r *ReprocessResponse) AppendRejected(id string) {
if !slices.Contains(r.Rejected, id) {
r.Rejected = append(r.Rejected, id)
}
}

func (svc *FileService) Reprocess(id string, userID string) (res *ReprocessResponse, err error) {
res = &ReprocessResponse{
// We intend to send an empty array to the caller, better than nil
Accepted: []string{},
Rejected: []string{},
}

var ancestor model.File
ancestor, err = svc.fileCache.Get(id)
if err != nil {
return nil, err
}

var tree []model.File
if ancestor.GetType() == model.FileTypeFolder {
if err = svc.fileGuard.Authorize(userID, ancestor, model.PermissionViewer); err != nil {
return nil, err
}
tree, err = svc.fileRepo.FindTree(ancestor.GetID())
if err != nil {
return nil, err
}
} else if ancestor.GetType() == model.FileTypeFile {
var file model.File
file, err = svc.fileCache.Get(id)
if err != nil {
return nil, err
}
tree = append(tree, file)
}

for _, file := range tree {
if file.GetType() != model.FileTypeFile {
continue
}
if err = svc.fileGuard.Authorize(userID, file, model.PermissionEditor); err != nil {
log.GetLogger().Error(err)
continue
}
if !svc.canReprocessFile(file) {
res.AppendRejected(file.GetID())
continue
}

var snapshot model.Snapshot
snapshot, err = svc.snapshotCache.Get(*file.GetSnapshotID())
if err != nil {
log.GetLogger().Error(err)
continue
}
if !svc.canReprocessSnapshot(snapshot) {
res.AppendRejected(file.GetID())
continue
}

// Create a task
var task model.Task
task, err = svc.taskSvc.insertAndSync(repo.TaskInsertOptions{
ID: helper.NewID(),
Name: "Waiting.",
UserID: userID,
IsIndeterminate: true,
Status: model.TaskStatusWaiting,
Payload: map[string]string{repo.TaskPayloadObjectKey: file.GetName()},
})
if err != nil {
log.GetLogger().Error(err)
continue
}
snapshot.SetTaskID(helper.ToPtr(task.GetID()))
if err := svc.snapshotSvc.SaveAndSync(snapshot); err != nil {
log.GetLogger().Error(err)
continue
}

// Forward to conversion microservice
if err = svc.pipelineClient.Run(&conversion_client.PipelineRunOptions{
TaskID: task.GetID(),
SnapshotID: snapshot.GetID(),
Bucket: snapshot.GetOriginal().Bucket,
Key: snapshot.GetOriginal().Key,
}); err != nil {
log.GetLogger().Error(err)
continue
} else {
res.AppendAccepted(file.GetID())
}
}
return res, nil
}

func (svc *FileService) canReprocessFile(file model.File) bool {
// We don't reprocess if there is no active snapshot
return file.GetSnapshotID() != nil
}

func (svc *FileService) canReprocessSnapshot(snapshot model.Snapshot) bool {
// We don't reprocess if there is an existing task
if snapshot.GetTaskID() != nil {
return false
}
// We don't reprocess without an "original" on the active snapshot
if !snapshot.HasOriginal() {
return false
}
return true
}

func (svc *FileService) DeleteOne(id string, userID string) error {
file, err := svc.fileCache.Get(id)
if err != nil {
Expand Down
57 changes: 46 additions & 11 deletions api/service/task_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,24 +19,33 @@ import (
"github.com/kouprlabs/voltaserve/api/errorpkg"
"github.com/kouprlabs/voltaserve/api/helper"
"github.com/kouprlabs/voltaserve/api/infra"
"github.com/kouprlabs/voltaserve/api/log"
"github.com/kouprlabs/voltaserve/api/model"
"github.com/kouprlabs/voltaserve/api/repo"
"github.com/kouprlabs/voltaserve/api/search"
)

type TaskService struct {
taskMapper *taskMapper
taskCache *cache.TaskCache
taskSearch *search.TaskSearch
taskRepo repo.TaskRepo
taskMapper *taskMapper
taskCache *cache.TaskCache
taskSearch *search.TaskSearch
taskRepo repo.TaskRepo
snapshotRepo repo.SnapshotRepo
snapshotCache *cache.SnapshotCache
fileRepo repo.FileRepo
fileCache *cache.FileCache
}

func NewTaskService() *TaskService {
return &TaskService{
taskMapper: newTaskMapper(),
taskCache: cache.NewTaskCache(),
taskSearch: search.NewTaskSearch(),
taskRepo: repo.NewTaskRepo(),
taskMapper: newTaskMapper(),
taskCache: cache.NewTaskCache(),
taskSearch: search.NewTaskSearch(),
taskRepo: repo.NewTaskRepo(),
snapshotRepo: repo.NewSnapshotRepo(),
snapshotCache: cache.NewSnapshotCache(),
fileRepo: repo.NewFileRepo(),
fileCache: cache.NewFileCache(),
}
}

Expand Down Expand Up @@ -370,13 +379,39 @@ func (svc *TaskService) saveAndSync(task model.Task) error {
}

func (svc *TaskService) deleteAndSync(id string) error {
if err := svc.taskRepo.Delete(id); err != nil {
snapshots, err := svc.snapshotRepo.FindAllForTask(id)
if err != nil {
return err
}
/* Clear task ID field from all snapshots and files in both repo and cache */
for _, snapshot := range snapshots {
snapshot.SetTaskID(nil)
if err = svc.snapshotRepo.Save(snapshot); err != nil {
log.GetLogger().Error(err)
}
if _, err = svc.snapshotCache.Refresh(snapshot.GetID()); err != nil {
log.GetLogger().Error(err)
}
var filesIDs []string
filesIDs, err = svc.fileRepo.GetIDsBySnapshot(snapshot.ID)
if err == nil {
for _, fileID := range filesIDs {
if _, err = svc.fileCache.Refresh(fileID); err != nil {
log.GetLogger().Error(err)
}
}
} else {
log.GetLogger().Error(err)
}
}
/* Proceed with deleting the task */
if err = svc.taskRepo.Delete(id); err != nil {
return err
}
if err := svc.taskCache.Delete(id); err != nil {
if err = svc.taskCache.Delete(id); err != nil {
return err
}
if err := svc.taskSearch.Delete([]string{id}); err != nil {
if err = svc.taskSearch.Delete([]string{id}); err != nil {
return err
}
return nil
Expand Down
Loading

0 comments on commit 46d8a54

Please sign in to comment.