Skip to content

Commit

Permalink
fixed for review
Browse files Browse the repository at this point in the history
  • Loading branch information
Serov committed Sep 22, 2023
1 parent 0b26ef0 commit 5108d12
Show file tree
Hide file tree
Showing 5 changed files with 65 additions and 61 deletions.
8 changes: 3 additions & 5 deletions read_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"encoding/json"
"fmt"
"github.com/streadway/amqp"
"piper/wpqueue"
"sync"
)

Expand Down Expand Up @@ -129,7 +130,7 @@ func (rq *ReadQueue) Run() error {
return fmt.Errorf("[Rq][%s][Run]: %s", rq.queue, err)
}
wg.Add(1)
pool := NewWorkerPool(rq.routines, deliveries)
pool := wpqueue.NewWorkerPool(rq.routines, rq.queue, deliveries)
go pool.RunWorkerPool()

go func() {
Expand All @@ -138,10 +139,7 @@ func (rq *ReadQueue) Run() error {
select {
case result, ok := <-pool.Results():
if !ok {
continue
}
if !result.Success {
fmt.Printf("[Rq][%s][%d][Run][channel closed]\n", rq.queue, result.WorkerId)
fmt.Printf("[Rq][%s][Run][queue result channel closed]\n", rq.queue)
return
}
var message *Message
Expand Down
14 changes: 0 additions & 14 deletions types.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
package piper

import "github.com/streadway/amqp"

type Message struct {
UID string `json:"uid"`
Payload any `json:"payload"`
Expand All @@ -27,15 +25,3 @@ type Report struct {
Reject *RejectReport `json:"reject,omitempty"`
Fail *FailReport `json:"fail,omitempty"`
}

type WorkerPool struct {
workersCount int
deliveries <-chan amqp.Delivery
results chan ResultDelivery
}

type ResultDelivery struct {
Success bool
WorkerId int
Delivery amqp.Delivery
}
42 changes: 0 additions & 42 deletions wp.go

This file was deleted.

15 changes: 15 additions & 0 deletions wpqueue/type.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package wpqueue

import "github.com/streadway/amqp"

type QueueWorkerPool struct {
workersCount int
queue string
deliveries <-chan amqp.Delivery
results chan ResultDelivery
}

type ResultDelivery struct {
WorkerId int
Delivery amqp.Delivery
}
47 changes: 47 additions & 0 deletions wpqueue/wp.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package wpqueue

import (
"fmt"
"github.com/streadway/amqp"
"sync"
)

func NewWorkerPool(count int, queue string, deliveries <-chan amqp.Delivery) QueueWorkerPool {
return QueueWorkerPool{
workersCount: count,
queue: queue,
deliveries: deliveries,
results: make(chan ResultDelivery),
}
}

func (wp QueueWorkerPool) RunWorkerPool() {
var wg sync.WaitGroup
for i := 0; i < wp.workersCount; i++ {
wg.Add(1)
go wp.workerProcessing(i, &wg)
}
wg.Wait()
close(wp.results)
}

func (wp QueueWorkerPool) Results() <-chan ResultDelivery {
return wp.results
}

func (wp QueueWorkerPool) workerProcessing(numWorker int, wg *sync.WaitGroup) {
defer wg.Done()
for {
select {
case delivery, ok := <-wp.deliveries:
if !ok {
fmt.Printf("[Rq][%s][%d][Run][channel closed]\n", wp.queue, numWorker)
return
}
wp.results <- ResultDelivery{
WorkerId: numWorker,
Delivery: delivery,
}
}
}
}

0 comments on commit 5108d12

Please sign in to comment.