From 5108d12f0e2f695e02591c70c3c080cc29acb62c Mon Sep 17 00:00:00 2001 From: Serov Date: Fri, 22 Sep 2023 09:49:39 +0300 Subject: [PATCH] fixed for review --- read_queue.go | 8 +++----- types.go | 14 -------------- wp.go | 42 ------------------------------------------ wpqueue/type.go | 15 +++++++++++++++ wpqueue/wp.go | 47 +++++++++++++++++++++++++++++++++++++++++++++++ 5 files changed, 65 insertions(+), 61 deletions(-) delete mode 100644 wp.go create mode 100644 wpqueue/type.go create mode 100644 wpqueue/wp.go diff --git a/read_queue.go b/read_queue.go index 20a9864..41291b4 100644 --- a/read_queue.go +++ b/read_queue.go @@ -5,6 +5,7 @@ import ( "encoding/json" "fmt" "github.com/streadway/amqp" + "piper/wpqueue" "sync" ) @@ -129,7 +130,7 @@ func (rq *ReadQueue) Run() error { return fmt.Errorf("[Rq][%s][Run]: %s", rq.queue, err) } wg.Add(1) - pool := NewWorkerPool(rq.routines, deliveries) + pool := wpqueue.NewWorkerPool(rq.routines, rq.queue, deliveries) go pool.RunWorkerPool() go func() { @@ -138,10 +139,7 @@ func (rq *ReadQueue) Run() error { select { case result, ok := <-pool.Results(): if !ok { - continue - } - if !result.Success { - fmt.Printf("[Rq][%s][%d][Run][channel closed]\n", rq.queue, result.WorkerId) + fmt.Printf("[Rq][%s][Run][queue result channel closed]\n", rq.queue) return } var message *Message diff --git a/types.go b/types.go index 65b9610..9548ad0 100644 --- a/types.go +++ b/types.go @@ -1,7 +1,5 @@ package piper -import "github.com/streadway/amqp" - type Message struct { UID string `json:"uid"` Payload any `json:"payload"` @@ -27,15 +25,3 @@ type Report struct { Reject *RejectReport `json:"reject,omitempty"` Fail *FailReport `json:"fail,omitempty"` } - -type WorkerPool struct { - workersCount int - deliveries <-chan amqp.Delivery - results chan ResultDelivery -} - -type ResultDelivery struct { - Success bool - WorkerId int - Delivery amqp.Delivery -} diff --git a/wp.go b/wp.go deleted file mode 100644 index 4ebfbc9..0000000 --- a/wp.go +++ /dev/null @@ -1,42 +0,0 @@ -package piper - -import ( - "github.com/streadway/amqp" - "sync" -) - -func NewWorkerPool(count int, deliveries <-chan amqp.Delivery) WorkerPool { - return WorkerPool{ - workersCount: count, - deliveries: deliveries, - results: make(chan ResultDelivery), - } -} - -func (wp WorkerPool) RunWorkerPool() { - var wg sync.WaitGroup - for i := 0; i < wp.workersCount; i++ { - wg.Add(1) - go workerProcessing(i, &wg, wp.deliveries, wp.results) - } - wg.Wait() - close(wp.results) -} - -func (wp WorkerPool) Results() <-chan ResultDelivery { - return wp.results -} - -func workerProcessing(numWorker int, wg *sync.WaitGroup, deliveries <-chan amqp.Delivery, results chan<- ResultDelivery) { - defer wg.Done() - for { - select { - case delivery, ok := <-deliveries: - results <- ResultDelivery{ - Success: ok, - WorkerId: numWorker, - Delivery: delivery, - } - } - } -} diff --git a/wpqueue/type.go b/wpqueue/type.go new file mode 100644 index 0000000..3f1893d --- /dev/null +++ b/wpqueue/type.go @@ -0,0 +1,15 @@ +package wpqueue + +import "github.com/streadway/amqp" + +type QueueWorkerPool struct { + workersCount int + queue string + deliveries <-chan amqp.Delivery + results chan ResultDelivery +} + +type ResultDelivery struct { + WorkerId int + Delivery amqp.Delivery +} diff --git a/wpqueue/wp.go b/wpqueue/wp.go new file mode 100644 index 0000000..6d39630 --- /dev/null +++ b/wpqueue/wp.go @@ -0,0 +1,47 @@ +package wpqueue + +import ( + "fmt" + "github.com/streadway/amqp" + "sync" +) + +func NewWorkerPool(count int, queue string, deliveries <-chan amqp.Delivery) QueueWorkerPool { + return QueueWorkerPool{ + workersCount: count, + queue: queue, + deliveries: deliveries, + results: make(chan ResultDelivery), + } +} + +func (wp QueueWorkerPool) RunWorkerPool() { + var wg sync.WaitGroup + for i := 0; i < wp.workersCount; i++ { + wg.Add(1) + go wp.workerProcessing(i, &wg) + } + wg.Wait() + close(wp.results) +} + +func (wp QueueWorkerPool) Results() <-chan ResultDelivery { + return wp.results +} + +func (wp QueueWorkerPool) workerProcessing(numWorker int, wg *sync.WaitGroup) { + defer wg.Done() + for { + select { + case delivery, ok := <-wp.deliveries: + if !ok { + fmt.Printf("[Rq][%s][%d][Run][channel closed]\n", wp.queue, numWorker) + return + } + wp.results <- ResultDelivery{ + WorkerId: numWorker, + Delivery: delivery, + } + } + } +}