Skip to content

Commit

Permalink
Added support for Out Channel Buffer
Browse files Browse the repository at this point in the history
  • Loading branch information
tejzpr committed Mar 11, 2021
1 parent 638633e commit 6a66e08
Show file tree
Hide file tree
Showing 3 changed files with 7 additions and 4 deletions.
4 changes: 3 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,10 @@ func workFn(val interface{}) interface{} {
```go
func main() {
max := 10
// Can be a non blocking channel as well
inputChan := make(chan *concurrently.OrderedInput)
doneChan := make(chan bool)
outChan := concurrently.Process(inputChan, workFn, &concurrently.Options{PoolSize: 10})
outChan := concurrently.Process(inputChan, workFn, &concurrently.Options{PoolSize: 10, OutChannelBufferSize: 2})
go func() {
for {
select {
Expand Down Expand Up @@ -58,6 +59,7 @@ func main() {
```go
func main() {
max := 100
// Can be a non blocking channel as well
inputChan := make(chan *concurrently.OrderedInput)
wg := &sync.WaitGroup{}

Expand Down
5 changes: 3 additions & 2 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@ type OrderedOutput struct {

// Options options for Process
type Options struct {
PoolSize int
PoolSize int
OutChannelBuffer int
}

// WorkFunction the function which performs work
Expand All @@ -26,7 +27,7 @@ type WorkFunction func(interface{}) interface{}
// It Accepts an OrderedInput read channel, work function and concurrent go routine pool size.
// It Returns an OrderedOutput channel.
func Process(inputChan <-chan *OrderedInput, wf WorkFunction, options *Options) <-chan *OrderedOutput {
outputChan := make(chan *OrderedOutput)
outputChan := make(chan *OrderedOutput, options.OutChannelBuffer)
type processInput struct {
value interface{}
order uint64
Expand Down
2 changes: 1 addition & 1 deletion main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func Test(t *testing.T) {
max := 10
inputChan := make(chan *OrderedInput)
doneChan := make(chan bool)
outChan := Process(inputChan, workFn, &Options{})
outChan := Process(inputChan, workFn, &Options{OutChannelBuffer: 2})
go func(t *testing.T) {
counter := 0
for {
Expand Down

0 comments on commit 6a66e08

Please sign in to comment.