Skip to content

Commit

Permalink
workflow: make retentionPeriod optional
Browse files Browse the repository at this point in the history
With this commit, we support three different scenarios:

* `retentionPeriod` is undefined - original never removed,
* `retentionPeriod` is zero - original is removed immediately,
* `retentionPeriod` is >0 (e.g. "1s") - timer kicks in.

This fixes #10.
  • Loading branch information
sevein committed Nov 13, 2019
1 parent 416f3fe commit 64e36c9
Show file tree
Hide file tree
Showing 4 changed files with 8 additions and 8 deletions.
4 changes: 2 additions & 2 deletions internal/watcher/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ type FilesystemConfig struct {
Inotify bool

Pipeline string
RetentionPeriod time.Duration
RetentionPeriod *time.Duration
}

// See minio.go for more.
Expand All @@ -31,5 +31,5 @@ type MinioConfig struct {
Token string

Pipeline string
RetentionPeriod time.Duration
RetentionPeriod *time.Duration
}
2 changes: 1 addition & 1 deletion internal/watcher/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ type BlobEvent struct {
PipelineName string

// Retention period for this blob.
RetentionPeriod time.Duration
RetentionPeriod *time.Duration

// Key of the blob.
Key string
Expand Down
6 changes: 3 additions & 3 deletions internal/watcher/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,15 @@ type Watcher interface {
// Every watcher targets a pipeline.
Pipeline() string

RetentionPeriod() time.Duration
RetentionPeriod() *time.Duration

fmt.Stringer // It should return the name of the watcher.
}

type commonWatcherImpl struct {
name string
pipeline string
retentionPeriod time.Duration
retentionPeriod *time.Duration
}

func (w *commonWatcherImpl) String() string {
Expand All @@ -42,7 +42,7 @@ func (w *commonWatcherImpl) Pipeline() string {
return w.pipeline
}

func (w *commonWatcherImpl) RetentionPeriod() time.Duration {
func (w *commonWatcherImpl) RetentionPeriod() *time.Duration {
return w.retentionPeriod
}

Expand Down
4 changes: 2 additions & 2 deletions internal/workflow/processing.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,8 +176,8 @@ func (w *ProcessingWorkflow) Execute(ctx workflow.Context, req *collection.Proce

// Schedule deletion of the original in the watched data source.
var deletionTimer workflow.Future
if tinfo.Status == collection.StatusDone {
deletionTimer = workflow.NewTimer(ctx, tinfo.Event.RetentionPeriod)
if tinfo.Status == collection.StatusDone && tinfo.Event.RetentionPeriod != nil {
deletionTimer = workflow.NewTimer(ctx, *tinfo.Event.RetentionPeriod)
}

// Activities that we want to run within the session regardless the
Expand Down

0 comments on commit 64e36c9

Please sign in to comment.