Skip to content

Commit

Permalink
Feat: Add pipeline support
Browse files Browse the repository at this point in the history
  • Loading branch information
dynastywind committed Oct 16, 2021
1 parent 6d970b8 commit 5e70e29
Show file tree
Hide file tree
Showing 18 changed files with 583 additions and 0 deletions.
13 changes: 13 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,19 @@ go get github.com/dynastywind/go-commons

This repository is composed of different utility function modules. You can pick any one of them to use at your own service.

### Pipeline

This module contains a powerful tool to execute a series of jobs either sequentially or concurrently.

#### Usage

```go
pipeline.NewSequentialJob("sum", 0, jobs, sumAggregator, pipeline.DefaultJobConfig().WithAllowError(false),pipeline.NewDefaultErrorHandler(), pipeline.NewDefaultSummary()).Do(context.Background())

pipeline.NewConcurrentJob("sum", 0, jobs, sumAggregator, pipeline.DefaultJobConfig().WithAllowError(false),pipeline.NewDefaultErrorHandler(), pipeline.NewDefaultSummary()).Do(context.Background())
```
By default, a job execution summary will be given once your job is finished, whatever its result.

### Either

This module provides an Either type containing either one type or another.
Expand Down
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ module github.com/dynastywind/go-commons
go 1.16

require (
github.com/google/uuid v1.3.0
github.com/onsi/ginkgo v1.16.4
github.com/onsi/gomega v1.15.0
github.com/sirupsen/logrus v1.8.1
)
9 changes: 9 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
github.com/fsnotify/fsnotify v1.4.9 h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWoS4=
Expand All @@ -18,6 +19,8 @@ github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMyw
github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I=
github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A=
github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE=
Expand All @@ -30,8 +33,13 @@ github.com/onsi/gomega v1.7.1/go.mod h1:XdKZgCCFLUoM/7CFJVPcG8C1xQ1AJ0vpAezJrB7J
github.com/onsi/gomega v1.10.1/go.mod h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1ybHNo=
github.com/onsi/gomega v1.15.0 h1:WjP/FQ/sk43MRmnEcT+MlDw2TFvkrXlprrPST/IudjU=
github.com/onsi/gomega v1.15.0/go.mod h1:cIuvLEne0aoVhAgh/O6ac0Op8WWw9H6eYCriF+tEHG0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/sirupsen/logrus v1.8.1 h1:dJKuHgqk1NNQlqoA6BTlM1Wf9DOH3NBjQyu0h9+AZZE=
github.com/sirupsen/logrus v1.8.1/go.mod h1:yWOB1SBYBC5VeMP7gHvWumXLIWorT60ONWic61uBYv0=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/stretchr/testify v1.5.1 h1:nOGnQDM7FYENwehXlg/kFVnos3rEvtKTjRvOWSzb6H4=
github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
Expand All @@ -53,6 +61,7 @@ golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5h
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190904154756-749cb33beabd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20191005200804-aed5e4c7ecf9/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20191026070338-33540a1f6037/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20191120155948-bd437916bb0e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
Expand Down
84 changes: 84 additions & 0 deletions pipeline/concurrent_job.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
package pipeline

import (
"context"
"fmt"
"time"

"github.com/google/uuid"
)

type ConcurrentJob struct {
id string
name string
defaultValue interface{}
jobs []Job
config JobConfig
aggregator Aggregator
errorHandler ErrorHandler
summary Summary
}

func (concurrent ConcurrentJob) Do(ctx context.Context) JobResult {
start := time.Now()
r := concurrent.do(ctx)
elapsed := time.Since(start).Milliseconds()
concurrent.summary.summary(concurrent.id, concurrent.name, len(concurrent.jobs), concurrent.config, elapsed)
return r
}

func (concurrent ConcurrentJob) do(ctx context.Context) JobResult {
data := concurrent.defaultValue
ch := make(chan JobResult, concurrent.config.maxConcurrency)
length := len(concurrent.jobs)
for index, job := range concurrent.jobs {
go func(c context.Context, j Job) {
defer func() {
if re := recover(); re != nil {
ch <- FailureResult(fmt.Errorf("concurrent job %v unexpected error: %v", concurrent.id, re), "Unexpected failure")
}
}()
ch <- j.Do(c)
}(ctx, job)
if (index+1)%concurrent.config.maxConcurrency == 0 || index+1 == length {
for i := 0; i <= index%concurrent.config.maxConcurrency; i++ {
r := <-ch
if r.Success {
d, e := concurrent.aggregator(ctx, data, r.Data)
if e != nil {
if terminate := concurrent.errorHandler.handleError(concurrent.config, concurrent.name, concurrent.id,
fmt.Sprintf("Concurrent job %v aggregation error: %v", concurrent.id, e.Error()), e); terminate != nil {
return *terminate
}
} else {
data = d
}
} else {
if terminate := concurrent.errorHandler.handleError(concurrent.config, concurrent.name, concurrent.id,
fmt.Sprintf("Concurrent job %v error: %v", concurrent.id, r.Message), r.Error); terminate != nil {
return *terminate
}
}
}
}
}
return SuccessResultWithData(data)
}

func NewDefaultConcurrentJob(name string, defaultValue interface{}, jobs []Job, aggregator Aggregator) ConcurrentJob {
return NewConcurrentJob(name, defaultValue, jobs, aggregator, DefaultJobConfig(), NewDefaultErrorHandler(), NewDefaultSummary())
}

func NewConcurrentJob(name string, defaultValue interface{}, jobs []Job, aggregator Aggregator, config JobConfig,
errorHandler ErrorHandler, summary Summary) ConcurrentJob {
return ConcurrentJob{
id: uuid.New().String(),
name: name,
defaultValue: defaultValue,
jobs: jobs,
config: config,
aggregator: aggregator,
errorHandler: errorHandler,
summary: summary,
}
}
50 changes: 50 additions & 0 deletions pipeline/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package pipeline

import "fmt"

const (
DefaultMaxConcurrency = 40
)

type JobConfig struct {
allowError bool
logError bool
summary bool
maxConcurrency int
}

func (config JobConfig) WithAllowError(allowError bool) JobConfig {
config.allowError = allowError
return config
}

func (config JobConfig) WithLogError(logError bool) JobConfig {
config.logError = logError
return config
}

func (config JobConfig) WithMaxConcurrency(maxConcurrency int) JobConfig {
if maxConcurrency > 0 && maxConcurrency <= DefaultMaxConcurrency {
config.maxConcurrency = maxConcurrency
}
return config
}

func (config JobConfig) WithSummary(summary bool) JobConfig {
config.summary = summary
return config
}

func (config JobConfig) String() string {
return fmt.Sprintf("AllowError: %v\nLogError: %v\nSummary: %v\nMaxConcurrency: %v\n",
config.allowError, config.logError, config.summary, config.maxConcurrency)
}

func DefaultJobConfig() JobConfig {
return JobConfig{
allowError: true,
logError: true,
summary: true,
maxConcurrency: DefaultMaxConcurrency,
}
}
29 changes: 29 additions & 0 deletions pipeline/error_handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package pipeline

import "github.com/sirupsen/logrus"

type ErrorHandler interface {
name() string
handleError(config JobConfig, name, id, msg string, e error) *JobResult
}

type DefaultErrorHandler struct{}

func (handler DefaultErrorHandler) name() string {
return "DefaultErrorHandler"
}

func (handler DefaultErrorHandler) handleError(config JobConfig, name, id, msg string, e error) *JobResult {
if config.logError {
logrus.WithField("name", name).WithField("id", id).Error(msg)
}
if !config.allowError {
r := FailureResult(e, msg)
return &r
}
return nil
}

func NewDefaultErrorHandler() DefaultErrorHandler {
return DefaultErrorHandler{}
}
20 changes: 20 additions & 0 deletions pipeline/job.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package pipeline

import "context"

type Job interface {
// Do returns job execution result
//
// @params ctx Execution context
// @return Job execution result
Do(ctx context.Context) JobResult
}

// Aggregator aggregates a job's result with an accumulated result, and returns this final result
//
// @param ctx Execution context
// @param prior Accumulated result
// @param current Current job's result, which can be diffrent from prior
// @return The first value is the job result to be returned, which should be the same type as prior.
// @return The second value is a potential error
type Aggregator func(ctx context.Context, prior, current interface{}) (interface{}, error)
27 changes: 27 additions & 0 deletions pipeline/result.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package pipeline

type JobResult struct {
Success bool `json:"success"`
Message string `json:"message"`
Error error `json:"-"`
Data interface{} `json:"data"`
}

func SuccessResultWithData(data interface{}) JobResult {
return JobResult{
Success: true,
Data: data,
}
}

func SuccessResult() JobResult {
return SuccessResultWithData(nil)
}

func FailureResult(e error, msg string) JobResult {
return JobResult{
Success: false,
Message: msg,
Error: e,
}
}
86 changes: 86 additions & 0 deletions pipeline/sequential_job.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
package pipeline

import (
"context"
"fmt"
"time"

"github.com/google/uuid"
)

type SequentialJob struct {
id string
name string
jobs []Job
config JobConfig
defaultValue interface{}
aggregator Aggregator
errorHandler ErrorHandler
summary Summary
}

func (sequential SequentialJob) Do(ctx context.Context) JobResult {
start := time.Now()
r := sequential.do(ctx)
elapsed := time.Since(start).Milliseconds()
sequential.summary.summary(sequential.id, sequential.name, len(sequential.jobs), sequential.config, elapsed)
return r
}

func (sequential SequentialJob) do(ctx context.Context) JobResult {
ch := make(chan JobResult)
go func(c context.Context, jobs []Job) {
var data = sequential.defaultValue
defer func() {
if re := recover(); re != nil {
if r := sequential.errorHandler.handleError(sequential.config, sequential.name, sequential.id,
"Unexpected failure", fmt.Errorf("sequential job %v unexpected error: %v", sequential.id, re)); r != nil {
ch <- *r
} else {
ch <- SuccessResultWithData(data)
}
}
}()
for _, job := range sequential.jobs {
r := job.Do(ctx)
if r.Success {
d, e := sequential.aggregator(ctx, data, r.Data)
if e != nil {
if terminate := sequential.errorHandler.handleError(sequential.config, sequential.name, sequential.id,
fmt.Sprintf("Sequential job %v aggregation error: %v", sequential.id, e.Error()), e); terminate != nil {
ch <- *terminate
return
}
} else {
data = d
}
} else {
if terminate := sequential.errorHandler.handleError(sequential.config, sequential.name, sequential.id,
fmt.Sprintf("Sequential job %v error: %v", sequential.id, r.Message), r.Error); terminate != nil {
ch <- *terminate
return
}
}
}
ch <- SuccessResultWithData(data)
}(ctx, sequential.jobs)
return <-ch
}

func NewDefaultSequentialJob(name string, defaultValue interface{}, jobs []Job, aggregator Aggregator) SequentialJob {
return NewSequentialJob(name, defaultValue, jobs, aggregator, DefaultJobConfig(), NewDefaultErrorHandler(), NewDefaultSummary())
}

func NewSequentialJob(name string, defaultValue interface{}, jobs []Job, aggregator Aggregator, config JobConfig,
errorHandler ErrorHandler, summary Summary) SequentialJob {
return SequentialJob{
id: uuid.New().String(),
name: name,
jobs: jobs,
config: config,
defaultValue: defaultValue,
aggregator: aggregator,
errorHandler: errorHandler,
summary: summary,
}
}
23 changes: 23 additions & 0 deletions pipeline/summary.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package pipeline

import (
"github.com/sirupsen/logrus"
)

type Summary interface {
summary(id, name string, jobs int, config JobConfig, elapsed int64)
}

type DefaultSummary struct{}

func (summary DefaultSummary) summary(id, name string, jobs int, config JobConfig, elapsed int64) {
if config.summary {
logrus.WithField("id", id).WithField("name", name).WithField("jobs count", jobs).
WithField("config", config.String()).WithField("elapsed", elapsed).
Info("Job summary details")
}
}

func NewDefaultSummary() DefaultSummary {
return DefaultSummary{}
}
File renamed without changes.
File renamed without changes.
Loading

0 comments on commit 5e70e29

Please sign in to comment.