Skip to content

Commit

Permalink
Merge pull request #2 from ignavan39/v2
Browse files Browse the repository at this point in the history
add worker pool
  • Loading branch information
ignavan39 authored Sep 22, 2023
2 parents 7ea0d6f + 5108d12 commit 8221be8
Show file tree
Hide file tree
Showing 5 changed files with 101 additions and 37 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
.idea
19 changes: 11 additions & 8 deletions example/main.go
Original file line number Diff line number Diff line change
@@ -1,18 +1,17 @@
package main

import (
"fmt"
"piper"

"github.com/streadway/amqp"
)

func main() {
connect, err := amqp.Dial("amqp://user:pass@localhost:5672")
connect, err := amqp.Dial("amqp://root:pass@localhost:5672")
if err != nil {
panic(err)
}
c, err := piper.NewReadQueue(connect, "test.exchange", "test", 2, "test")
c, err := piper.NewReadQueue(connect, "test.exchange", "test", 40, "test")
if err != nil {
panic(err)
}
Expand All @@ -32,14 +31,18 @@ func main() {
}
}()

c.WithReport("test.report", "test.report")
go c.Run()
rq, err := c.WithReport("test.report", "test.report")
if err != nil {
panic(err)
}
for message := range c.Read() {
fmt.Println(message)
c.Report() <- piper.Report{
go func() {
err := rq.Run()
if err != nil {
panic(err)
}
}()
for message := range rq.Read() {
rq.Report() <- piper.Report{
Done: &piper.DoneReport{
Status: 1,
},
Expand Down
56 changes: 27 additions & 29 deletions read_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@ import (
"bytes"
"encoding/json"
"fmt"
"sync"

"github.com/streadway/amqp"
"piper/wpqueue"
"sync"
)

type ReadQueueReport struct {
Expand Down Expand Up @@ -129,37 +129,35 @@ func (rq *ReadQueue) Run() error {
if err != nil {
return fmt.Errorf("[Rq][%s][Run]: %s", rq.queue, err)
}
wg.Add(1)
pool := wpqueue.NewWorkerPool(rq.routines, rq.queue, deliveries)
go pool.RunWorkerPool()

wg.Add(rq.routines)
go func() {
for i := 0; i < rq.routines; i++ {
go func(idx int) {
defer wg.Done()
for {
select {
case delivery, ok := <-deliveries:
if !ok {
fmt.Printf("[Rq][%s][%d][Run][channel closed]\n", rq.queue, idx)
return
}
var message *Message
if err := json.NewDecoder(bytes.NewReader(delivery.Body)).Decode(&message); err != nil {
fmt.Printf("[Rq][%s][%d][Run][failed decode]: %s\n", rq.queue, idx, err)
if err := delivery.Ack(false); err != nil {
fmt.Printf("[Rq][%s][%d][Run][failed ack]: %s", rq.queue, idx, err)
continue
}
continue
}
if err := delivery.Ack(false); err != nil {
fmt.Printf("[Rq][%s][%d][Run][failed ack]: %s", rq.queue, idx, err)
continue
}
fmt.Println(idx)
rq.read <- *message
defer wg.Done()
for {
select {
case result, ok := <-pool.Results():
if !ok {
fmt.Printf("[Rq][%s][Run][queue result channel closed]\n", rq.queue)
return
}
var message *Message
if err := json.NewDecoder(bytes.NewReader(result.Delivery.Body)).Decode(&message); err != nil {
fmt.Printf("[Rq][%s][%d][Run][failed decode]: %s\n", rq.queue, result.WorkerId, err)
if err := result.Delivery.Ack(false); err != nil {
fmt.Printf("[Rq][%s][%d][Run][failed ack]: %s", rq.queue, result.WorkerId, err)
continue
}
continue
}
if err := result.Delivery.Ack(false); err != nil {
fmt.Printf("[Rq][%s][%d][Run][failed ack]: %s", rq.queue, result.WorkerId, err)
continue
}
}(i)
fmt.Println(*message)
rq.read <- *message
}
}
}()

Expand Down
15 changes: 15 additions & 0 deletions wpqueue/type.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package wpqueue

import "github.com/streadway/amqp"

type QueueWorkerPool struct {
workersCount int
queue string
deliveries <-chan amqp.Delivery
results chan ResultDelivery
}

type ResultDelivery struct {
WorkerId int
Delivery amqp.Delivery
}
47 changes: 47 additions & 0 deletions wpqueue/wp.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package wpqueue

import (
"fmt"
"github.com/streadway/amqp"
"sync"
)

func NewWorkerPool(count int, queue string, deliveries <-chan amqp.Delivery) QueueWorkerPool {
return QueueWorkerPool{
workersCount: count,
queue: queue,
deliveries: deliveries,
results: make(chan ResultDelivery),
}
}

func (wp QueueWorkerPool) RunWorkerPool() {
var wg sync.WaitGroup
for i := 0; i < wp.workersCount; i++ {
wg.Add(1)
go wp.workerProcessing(i, &wg)
}
wg.Wait()
close(wp.results)
}

func (wp QueueWorkerPool) Results() <-chan ResultDelivery {
return wp.results
}

func (wp QueueWorkerPool) workerProcessing(numWorker int, wg *sync.WaitGroup) {
defer wg.Done()
for {
select {
case delivery, ok := <-wp.deliveries:
if !ok {
fmt.Printf("[Rq][%s][%d][Run][channel closed]\n", wp.queue, numWorker)
return
}
wp.results <- ResultDelivery{
WorkerId: numWorker,
Delivery: delivery,
}
}
}
}

0 comments on commit 8221be8

Please sign in to comment.