From 0b26ef093b35d1c48f37b61c353ac113ebc72d35 Mon Sep 17 00:00:00 2001 From: Serov Date: Thu, 21 Sep 2023 11:25:21 +0300 Subject: [PATCH 1/2] add wp --- .gitignore | 1 + example/main.go | 19 +++++++++------- read_queue.go | 58 ++++++++++++++++++++++++------------------------- types.go | 14 ++++++++++++ wp.go | 42 +++++++++++++++++++++++++++++++++++ 5 files changed, 97 insertions(+), 37 deletions(-) create mode 100644 .gitignore create mode 100644 wp.go diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..723ef36 --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +.idea \ No newline at end of file diff --git a/example/main.go b/example/main.go index 77ca1b5..6bda764 100644 --- a/example/main.go +++ b/example/main.go @@ -1,18 +1,17 @@ package main import ( - "fmt" "piper" "github.com/streadway/amqp" ) func main() { - connect, err := amqp.Dial("amqp://user:pass@localhost:5672") + connect, err := amqp.Dial("amqp://root:pass@localhost:5672") if err != nil { panic(err) } - c, err := piper.NewReadQueue(connect, "test.exchange", "test", 2, "test") + c, err := piper.NewReadQueue(connect, "test.exchange", "test", 40, "test") if err != nil { panic(err) } @@ -32,14 +31,18 @@ func main() { } }() - c.WithReport("test.report", "test.report") - go c.Run() + rq, err := c.WithReport("test.report", "test.report") if err != nil { panic(err) } - for message := range c.Read() { - fmt.Println(message) - c.Report() <- piper.Report{ + go func() { + err := rq.Run() + if err != nil { + panic(err) + } + }() + for message := range rq.Read() { + rq.Report() <- piper.Report{ Done: &piper.DoneReport{ Status: 1, }, diff --git a/read_queue.go b/read_queue.go index 0ea2d7e..20a9864 100644 --- a/read_queue.go +++ b/read_queue.go @@ -4,9 +4,8 @@ import ( "bytes" "encoding/json" "fmt" - "sync" - "github.com/streadway/amqp" + "sync" ) type ReadQueueReport struct { @@ -129,37 +128,38 @@ func (rq *ReadQueue) Run() error { if err != nil { return fmt.Errorf("[Rq][%s][Run]: %s", rq.queue, err) } + wg.Add(1) + pool := NewWorkerPool(rq.routines, deliveries) + go pool.RunWorkerPool() - wg.Add(rq.routines) go func() { - for i := 0; i < rq.routines; i++ { - go func(idx int) { - defer wg.Done() - for { - select { - case delivery, ok := <-deliveries: - if !ok { - fmt.Printf("[Rq][%s][%d][Run][channel closed]\n", rq.queue, idx) - return - } - var message *Message - if err := json.NewDecoder(bytes.NewReader(delivery.Body)).Decode(&message); err != nil { - fmt.Printf("[Rq][%s][%d][Run][failed decode]: %s\n", rq.queue, idx, err) - if err := delivery.Ack(false); err != nil { - fmt.Printf("[Rq][%s][%d][Run][failed ack]: %s", rq.queue, idx, err) - continue - } - continue - } - if err := delivery.Ack(false); err != nil { - fmt.Printf("[Rq][%s][%d][Run][failed ack]: %s", rq.queue, idx, err) - continue - } - fmt.Println(idx) - rq.read <- *message + defer wg.Done() + for { + select { + case result, ok := <-pool.Results(): + if !ok { + continue + } + if !result.Success { + fmt.Printf("[Rq][%s][%d][Run][channel closed]\n", rq.queue, result.WorkerId) + return + } + var message *Message + if err := json.NewDecoder(bytes.NewReader(result.Delivery.Body)).Decode(&message); err != nil { + fmt.Printf("[Rq][%s][%d][Run][failed decode]: %s\n", rq.queue, result.WorkerId, err) + if err := result.Delivery.Ack(false); err != nil { + fmt.Printf("[Rq][%s][%d][Run][failed ack]: %s", rq.queue, result.WorkerId, err) + continue } + continue } - }(i) + if err := result.Delivery.Ack(false); err != nil { + fmt.Printf("[Rq][%s][%d][Run][failed ack]: %s", rq.queue, result.WorkerId, err) + continue + } + fmt.Println(*message) + rq.read <- *message + } } }() diff --git a/types.go b/types.go index 9548ad0..65b9610 100644 --- a/types.go +++ b/types.go @@ -1,5 +1,7 @@ package piper +import "github.com/streadway/amqp" + type Message struct { UID string `json:"uid"` Payload any `json:"payload"` @@ -25,3 +27,15 @@ type Report struct { Reject *RejectReport `json:"reject,omitempty"` Fail *FailReport `json:"fail,omitempty"` } + +type WorkerPool struct { + workersCount int + deliveries <-chan amqp.Delivery + results chan ResultDelivery +} + +type ResultDelivery struct { + Success bool + WorkerId int + Delivery amqp.Delivery +} diff --git a/wp.go b/wp.go new file mode 100644 index 0000000..4ebfbc9 --- /dev/null +++ b/wp.go @@ -0,0 +1,42 @@ +package piper + +import ( + "github.com/streadway/amqp" + "sync" +) + +func NewWorkerPool(count int, deliveries <-chan amqp.Delivery) WorkerPool { + return WorkerPool{ + workersCount: count, + deliveries: deliveries, + results: make(chan ResultDelivery), + } +} + +func (wp WorkerPool) RunWorkerPool() { + var wg sync.WaitGroup + for i := 0; i < wp.workersCount; i++ { + wg.Add(1) + go workerProcessing(i, &wg, wp.deliveries, wp.results) + } + wg.Wait() + close(wp.results) +} + +func (wp WorkerPool) Results() <-chan ResultDelivery { + return wp.results +} + +func workerProcessing(numWorker int, wg *sync.WaitGroup, deliveries <-chan amqp.Delivery, results chan<- ResultDelivery) { + defer wg.Done() + for { + select { + case delivery, ok := <-deliveries: + results <- ResultDelivery{ + Success: ok, + WorkerId: numWorker, + Delivery: delivery, + } + } + } +} From 5108d12f0e2f695e02591c70c3c080cc29acb62c Mon Sep 17 00:00:00 2001 From: Serov Date: Fri, 22 Sep 2023 09:49:39 +0300 Subject: [PATCH 2/2] fixed for review --- read_queue.go | 8 +++----- types.go | 14 -------------- wp.go | 42 ------------------------------------------ wpqueue/type.go | 15 +++++++++++++++ wpqueue/wp.go | 47 +++++++++++++++++++++++++++++++++++++++++++++++ 5 files changed, 65 insertions(+), 61 deletions(-) delete mode 100644 wp.go create mode 100644 wpqueue/type.go create mode 100644 wpqueue/wp.go diff --git a/read_queue.go b/read_queue.go index 20a9864..41291b4 100644 --- a/read_queue.go +++ b/read_queue.go @@ -5,6 +5,7 @@ import ( "encoding/json" "fmt" "github.com/streadway/amqp" + "piper/wpqueue" "sync" ) @@ -129,7 +130,7 @@ func (rq *ReadQueue) Run() error { return fmt.Errorf("[Rq][%s][Run]: %s", rq.queue, err) } wg.Add(1) - pool := NewWorkerPool(rq.routines, deliveries) + pool := wpqueue.NewWorkerPool(rq.routines, rq.queue, deliveries) go pool.RunWorkerPool() go func() { @@ -138,10 +139,7 @@ func (rq *ReadQueue) Run() error { select { case result, ok := <-pool.Results(): if !ok { - continue - } - if !result.Success { - fmt.Printf("[Rq][%s][%d][Run][channel closed]\n", rq.queue, result.WorkerId) + fmt.Printf("[Rq][%s][Run][queue result channel closed]\n", rq.queue) return } var message *Message diff --git a/types.go b/types.go index 65b9610..9548ad0 100644 --- a/types.go +++ b/types.go @@ -1,7 +1,5 @@ package piper -import "github.com/streadway/amqp" - type Message struct { UID string `json:"uid"` Payload any `json:"payload"` @@ -27,15 +25,3 @@ type Report struct { Reject *RejectReport `json:"reject,omitempty"` Fail *FailReport `json:"fail,omitempty"` } - -type WorkerPool struct { - workersCount int - deliveries <-chan amqp.Delivery - results chan ResultDelivery -} - -type ResultDelivery struct { - Success bool - WorkerId int - Delivery amqp.Delivery -} diff --git a/wp.go b/wp.go deleted file mode 100644 index 4ebfbc9..0000000 --- a/wp.go +++ /dev/null @@ -1,42 +0,0 @@ -package piper - -import ( - "github.com/streadway/amqp" - "sync" -) - -func NewWorkerPool(count int, deliveries <-chan amqp.Delivery) WorkerPool { - return WorkerPool{ - workersCount: count, - deliveries: deliveries, - results: make(chan ResultDelivery), - } -} - -func (wp WorkerPool) RunWorkerPool() { - var wg sync.WaitGroup - for i := 0; i < wp.workersCount; i++ { - wg.Add(1) - go workerProcessing(i, &wg, wp.deliveries, wp.results) - } - wg.Wait() - close(wp.results) -} - -func (wp WorkerPool) Results() <-chan ResultDelivery { - return wp.results -} - -func workerProcessing(numWorker int, wg *sync.WaitGroup, deliveries <-chan amqp.Delivery, results chan<- ResultDelivery) { - defer wg.Done() - for { - select { - case delivery, ok := <-deliveries: - results <- ResultDelivery{ - Success: ok, - WorkerId: numWorker, - Delivery: delivery, - } - } - } -} diff --git a/wpqueue/type.go b/wpqueue/type.go new file mode 100644 index 0000000..3f1893d --- /dev/null +++ b/wpqueue/type.go @@ -0,0 +1,15 @@ +package wpqueue + +import "github.com/streadway/amqp" + +type QueueWorkerPool struct { + workersCount int + queue string + deliveries <-chan amqp.Delivery + results chan ResultDelivery +} + +type ResultDelivery struct { + WorkerId int + Delivery amqp.Delivery +} diff --git a/wpqueue/wp.go b/wpqueue/wp.go new file mode 100644 index 0000000..6d39630 --- /dev/null +++ b/wpqueue/wp.go @@ -0,0 +1,47 @@ +package wpqueue + +import ( + "fmt" + "github.com/streadway/amqp" + "sync" +) + +func NewWorkerPool(count int, queue string, deliveries <-chan amqp.Delivery) QueueWorkerPool { + return QueueWorkerPool{ + workersCount: count, + queue: queue, + deliveries: deliveries, + results: make(chan ResultDelivery), + } +} + +func (wp QueueWorkerPool) RunWorkerPool() { + var wg sync.WaitGroup + for i := 0; i < wp.workersCount; i++ { + wg.Add(1) + go wp.workerProcessing(i, &wg) + } + wg.Wait() + close(wp.results) +} + +func (wp QueueWorkerPool) Results() <-chan ResultDelivery { + return wp.results +} + +func (wp QueueWorkerPool) workerProcessing(numWorker int, wg *sync.WaitGroup) { + defer wg.Done() + for { + select { + case delivery, ok := <-wp.deliveries: + if !ok { + fmt.Printf("[Rq][%s][%d][Run][channel closed]\n", wp.queue, numWorker) + return + } + wp.results <- ResultDelivery{ + WorkerId: numWorker, + Delivery: delivery, + } + } + } +}