Skip to content

Commit

Permalink
Merge pull request #100 from basenana/feature/dynamicroot
Browse files Browse the repository at this point in the history
build-in ingest workflow
  • Loading branch information
hyponet authored May 3, 2024
2 parents 9b3dc96 + d869faf commit fcc780d
Show file tree
Hide file tree
Showing 17 changed files with 1,635 additions and 868 deletions.
1,904 changes: 1,155 additions & 749 deletions cmd/apps/apis/fsapi/v1/fsapi-v1.pb.go

Large diffs are not rendered by default.

30 changes: 30 additions & 0 deletions cmd/apps/apis/fsapi/v1/fsapi-v1.proto
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ message QuickInboxRequest {
FileType fileType = 2;
string filename = 3;
string url = 4;
bytes data = 5;

// url source
bool clutterFree = 10;
Expand All @@ -51,6 +52,7 @@ message QuickInboxResponse {
}

service Entries {
rpc FindEntryDetail (FindEntryDetailRequest) returns (GetEntryDetailResponse) {}
rpc GetEntryDetail (GetEntryDetailRequest) returns (GetEntryDetailResponse) {}
rpc CreateEntry (CreateEntryRequest) returns (CreateEntryResponse) {}
rpc UpdateEntry (UpdateEntryRequest) returns (UpdateEntryResponse) {}
Expand All @@ -62,6 +64,12 @@ service Entries {
rpc ReadFile (ReadFileRequest) returns (stream ReadFileResponse) {}
}

message FindEntryDetailRequest {
bool root = 1;
int64 parentID = 2;
string name = 3;
}

message GetEntryDetailRequest {
int64 entryID = 1;
}
Expand Down Expand Up @@ -321,6 +329,28 @@ message CommitSyncedEventRequest {

message CommitSyncedEventResponse {}

service Workflow {
rpc TriggerWorkflow (TriggerWorkflowRequest) returns (TriggerWorkflowResponse) {}
}

message TriggerWorkflowRequest {
message WorkflowTarget {
int64 entryID = 1;
int64 parentEntryID = 2;
}
message WorkflowJobAttr {
string reason = 1;
int64 timeout = 2;
}
string workflowID = 1;
WorkflowTarget target = 2;
WorkflowJobAttr attr = 3;
}

message TriggerWorkflowResponse {
string jobID = 1;
}

message DocumentInfo {
int64 id = 1;
string name = 2;
Expand Down
252 changes: 164 additions & 88 deletions cmd/apps/apis/fsapi/v1/fsapi-v1_grpc.pb.go

Large diffs are not rendered by default.

65 changes: 65 additions & 0 deletions cmd/apps/apis/fsapi/v1/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package v1
import (
"context"
"errors"
"github.com/basenana/nanafs/pkg/workflow"
"io"
"time"

Expand All @@ -44,6 +45,7 @@ type Services interface {
InboxServer
PropertiesServer
NotifyServer
WorkflowServer
}

func InitServices(server *grpc.Server, ctrl controller.Controller, pathEntryMgr *pathmgr.PathManager) (Services, error) {
Expand All @@ -61,6 +63,7 @@ func InitServices(server *grpc.Server, ctrl controller.Controller, pathEntryMgr
RegisterRoomServer(server, s)
RegisterPropertiesServer(server, s)
RegisterNotifyServer(server, s)
RegisterWorkflowServer(server, s)
return s, nil
}

Expand Down Expand Up @@ -299,6 +302,36 @@ func (s *services) GetDocumentDetail(ctx context.Context, request *GetDocumentDe
}, nil
}

func (s *services) FindEntryDetail(ctx context.Context, request *FindEntryDetailRequest) (*GetEntryDetailResponse, error) {
var (
en, par *types.Metadata
err error
)

if request.Root {
en, err = s.ctrl.LoadRootEntry(ctx)
if err != nil {
return nil, status.Error(common.FsApiError(err), "query root entry failed")
}
} else {
par, err = s.ctrl.GetEntry(ctx, request.ParentID)
if err != nil {
return nil, status.Error(common.FsApiError(err), "query parent entry failed")
}

en, err = s.ctrl.FindEntry(ctx, request.ParentID, request.Name)
if err != nil {
return nil, status.Error(common.FsApiError(err), "find child entry failed")
}
}

properties, err := s.queryEntryProperties(ctx, en.ID)
if err != nil {
return nil, status.Error(common.FsApiError(err), "query entry properties failed")
}
return &GetEntryDetailResponse{Entry: entryDetail(en, par), Properties: properties}, nil
}

func (s *services) GetEntryDetail(ctx context.Context, request *GetEntryDetailRequest) (*GetEntryDetailResponse, error) {
caller := s.callerAuthFn(ctx)
if !caller.Authenticated {
Expand Down Expand Up @@ -603,6 +636,7 @@ func (s *services) ReadFile(request *ReadFileRequest, writer Entries_ReadFileSer
func (s *services) QuickInbox(ctx context.Context, request *QuickInboxRequest) (*QuickInboxResponse, error) {
option := inbox.Option{
Url: request.Url,
Data: request.Data,
ClutterFree: request.ClutterFree,
}
switch request.FileType {
Expand Down Expand Up @@ -727,6 +761,37 @@ func (s *services) CommitSyncedEvent(ctx context.Context, request *CommitSyncedE
return &CommitSyncedEventResponse{}, nil
}

func (s *services) TriggerWorkflow(ctx context.Context, request *TriggerWorkflowRequest) (*TriggerWorkflowResponse, error) {
s.logger.Infow("trigger workflow", "workflow", request.WorkflowID)

if request.WorkflowID == "" {
return nil, status.Error(codes.InvalidArgument, "workflow id is empty")
}
if request.Target == nil {
return nil, status.Error(codes.InvalidArgument, "workflow target is empty")
}

if _, err := s.ctrl.GetWorkflow(ctx, request.WorkflowID); err != nil {
return nil, status.Error(common.FsApiError(err), "fetch workflow failed")
}

if request.Attr == nil {
request.Attr = &TriggerWorkflowRequest_WorkflowJobAttr{
Reason: "fsapi trigger",
Timeout: 60 * 10, // default 10min
}
}

job, err := s.ctrl.TriggerWorkflow(ctx, request.WorkflowID,
types.WorkflowTarget{EntryID: request.Target.EntryID, ParentEntryID: request.Target.ParentEntryID},
workflow.JobAttr{Reason: request.Attr.Reason, Timeout: time.Second * time.Duration(request.Attr.Timeout)},
)
if err != nil {
return nil, status.Error(common.FsApiError(err), "trigger workflow failed")
}
return &TriggerWorkflowResponse{JobID: job.Id}, nil
}

func (s *services) queryEntryProperties(ctx context.Context, entryID int64) ([]*Property, error) {
properties, err := s.ctrl.ListEntryExtendField(ctx, entryID)
if err != nil {
Expand Down
7 changes: 5 additions & 2 deletions cmd/apps/apis/fsapi/v1/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,11 +63,10 @@ func entryDetail(en, parent *types.Metadata) *EntryDetail {
access.Permissions = append(access.Permissions, string(perm))
}

return &EntryDetail{
ed := &EntryDetail{
Id: en.ID,
Name: en.Name,
Aliases: en.Aliases,
Parent: entryInfo(parent),
Kind: string(en.Kind),
IsGroup: en.IsGroup,
Size: en.Size,
Expand All @@ -80,6 +79,10 @@ func entryDetail(en, parent *types.Metadata) *EntryDetail {
ModifiedAt: timestamppb.New(en.ModifiedAt),
AccessAt: timestamppb.New(en.AccessAt),
}
if parent != nil {
ed.Parent = entryInfo(parent)
}
return ed
}

func eventInfo(evt *types.Event) *Event {
Expand Down
15 changes: 14 additions & 1 deletion pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,19 @@ type Controller interface {
WriteFile(ctx context.Context, file dentry.File, data []byte, offset int64) (n int64, err error)
CloseFile(ctx context.Context, file dentry.File) error

ListWorkflows(ctx context.Context) ([]*types.WorkflowSpec, error)
GetWorkflow(ctx context.Context, wfId string) (*types.WorkflowSpec, error)
CreateWorkflow(ctx context.Context, spec *types.WorkflowSpec) (*types.WorkflowSpec, error)
UpdateWorkflow(ctx context.Context, spec *types.WorkflowSpec) (*types.WorkflowSpec, error)
DeleteWorkflow(ctx context.Context, wfId string) error
ListJobs(ctx context.Context, wfId string) ([]*types.WorkflowJob, error)
GetJob(ctx context.Context, wfId string, jobID string) (*types.WorkflowJob, error)

TriggerWorkflow(ctx context.Context, wfId string, tgt types.WorkflowTarget, attr workflow.JobAttr) (*types.WorkflowJob, error)
PauseWorkflowJob(ctx context.Context, jobId string) error
ResumeWorkflowJob(ctx context.Context, jobId string) error
CancelWorkflowJob(ctx context.Context, jobId string) error

FsInfo(ctx context.Context) Info
StartBackendTask(stopCh chan struct{})
SetupShutdownHandler(stopCh chan struct{}) chan struct{}
Expand Down Expand Up @@ -371,7 +384,7 @@ func New(loader config.Loader, meta metastore.Meta) (Controller, error) {
return nil, err
}

if err = plugin.Init(buildin.Services{DocumentManager: ctl.document}, loader); err != nil {
if err = plugin.Init(buildin.Services{DocumentManager: ctl.document, ExtendFieldManager: ctl.entry}, loader); err != nil {
return nil, err
}
ctl.workflow, err = workflow.NewManager(ctl.entry, ctl.document, ctl.Notify, meta, loader)
Expand Down
64 changes: 63 additions & 1 deletion pkg/controller/workflow.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
Copyright 2023 NanaFS Authors.
Copyright 2024 NanaFS Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
Expand All @@ -15,3 +15,65 @@
*/

package controller

import (
"context"
"github.com/basenana/nanafs/pkg/types"
"github.com/basenana/nanafs/pkg/workflow"
"runtime/trace"
)

func (c *controller) ListWorkflows(ctx context.Context) ([]*types.WorkflowSpec, error) {
defer trace.StartRegion(ctx, "controller.ListWorkflows").End()
return c.workflow.ListWorkflows(ctx)
}

func (c *controller) GetWorkflow(ctx context.Context, wfId string) (*types.WorkflowSpec, error) {
defer trace.StartRegion(ctx, "controller.GetWorkflow").End()
return c.workflow.GetWorkflow(ctx, wfId)
}

func (c *controller) CreateWorkflow(ctx context.Context, spec *types.WorkflowSpec) (*types.WorkflowSpec, error) {
defer trace.StartRegion(ctx, "controller.CreateWorkflow").End()
return c.workflow.CreateWorkflow(ctx, spec)
}

func (c *controller) UpdateWorkflow(ctx context.Context, spec *types.WorkflowSpec) (*types.WorkflowSpec, error) {
defer trace.StartRegion(ctx, "controller.UpdateWorkflow").End()
return c.workflow.UpdateWorkflow(ctx, spec)
}

func (c *controller) DeleteWorkflow(ctx context.Context, wfId string) error {
defer trace.StartRegion(ctx, "controller.DeleteWorkflow").End()
return c.workflow.DeleteWorkflow(ctx, wfId)
}

func (c *controller) ListJobs(ctx context.Context, wfId string) ([]*types.WorkflowJob, error) {
defer trace.StartRegion(ctx, "controller.ListJobs").End()
return c.workflow.ListJobs(ctx, wfId)
}

func (c *controller) GetJob(ctx context.Context, wfId string, jobID string) (*types.WorkflowJob, error) {
defer trace.StartRegion(ctx, "controller.GetJob").End()
return c.workflow.GetJob(ctx, wfId, jobID)
}

func (c *controller) PauseWorkflowJob(ctx context.Context, jobId string) error {
defer trace.StartRegion(ctx, "controller.PauseWorkflowJob").End()
return c.workflow.PauseWorkflowJob(ctx, jobId)
}

func (c *controller) ResumeWorkflowJob(ctx context.Context, jobId string) error {
defer trace.StartRegion(ctx, "controller.ResumeWorkflowJob").End()
return c.workflow.ResumeWorkflowJob(ctx, jobId)
}

func (c *controller) CancelWorkflowJob(ctx context.Context, jobId string) error {
defer trace.StartRegion(ctx, "controller.CancelWorkflowJob").End()
return c.workflow.CancelWorkflowJob(ctx, jobId)
}

func (c *controller) TriggerWorkflow(ctx context.Context, wfId string, tgt types.WorkflowTarget, attr workflow.JobAttr) (*types.WorkflowJob, error) {
defer trace.StartRegion(ctx, "controller.TriggerWorkflow").End()
return c.workflow.TriggerWorkflow(ctx, wfId, tgt, attr)
}
4 changes: 4 additions & 0 deletions pkg/dentry/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@ func (m *manager) entryActionEventHandler() {
m.logger.Errorw("encounter error when handle entry event", "entry", evt.entryID, "action", evt.actionType, "err", err)
continue
}
// skip extgroup entries
if en.Storage == externalStorage {
continue
}
events.Publish(events.NamespacedTopic(evt.topicNS, evt.actionType), BuildEntryEvent(evt.actionType, en))
}
}
Expand Down
12 changes: 12 additions & 0 deletions pkg/inbox/inbox.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/basenana/nanafs/pkg/dentry"
"github.com/basenana/nanafs/pkg/types"
"github.com/basenana/nanafs/pkg/workflow"
"github.com/basenana/nanafs/utils"
"github.com/basenana/nanafs/utils/logger"
"go.uber.org/zap"
"strings"
Expand All @@ -35,6 +36,8 @@ type Inbox struct {
}

func (b *Inbox) QuickInbox(ctx context.Context, fileName string, option Option) (*types.Metadata, error) {
fileName = utils.SafetyFilePathJoin("", fileName)

if fileName == "" || option.FileType == "" {
return nil, fmt.Errorf("filename or file type not set")
}
Expand All @@ -54,6 +57,14 @@ func (b *Inbox) QuickInbox(ctx context.Context, fileName string, option Option)
}
defer file.Close(ctx)

if option.Data != nil {
b.logger.Infow("write data direct", "entry", fileEn.ID, "filename", fileName)
if _, err = file.WriteAt(ctx, option.Data, 0); err != nil {
return nil, err
}
return fileEn, nil
}

err = UrlFile{
Url: option.Url,
FileType: option.FileType,
Expand Down Expand Up @@ -82,4 +93,5 @@ type Option struct {
Url string
FileType string
ClutterFree bool
Data []byte
}
Loading

0 comments on commit fcc780d

Please sign in to comment.