diff --git a/gorm_mq/consumer.go b/gorm_mq/consumer.go index 33d3753..1049494 100644 --- a/gorm_mq/consumer.go +++ b/gorm_mq/consumer.go @@ -37,7 +37,6 @@ type MqConsumer struct { name string groupId string locker sync.RWMutex - corsor int64 // 每次至多消费多少 limit int // 抢占超时时间 diff --git a/gorm_mq/mq.go b/gorm_mq/mq.go index 412dfcf..682a16d 100644 --- a/gorm_mq/mq.go +++ b/gorm_mq/mq.go @@ -18,11 +18,12 @@ import ( "context" "errors" "fmt" - "github.com/ecodeclub/mq-sql/gorm_mq/balancer/equal_divide" "log" "sync" "time" + "github.com/ecodeclub/mq-sql/gorm_mq/balancer/equal_divide" + "github.com/ecodeclub/ekit/syncx" "github.com/ecodeclub/mq-api" "github.com/ecodeclub/mq-sql/gorm_mq/domain" @@ -121,7 +122,6 @@ func (m *Mq) Topic(name string, partition int) error { func (tp *Topic) Close() error { tp.lock.Lock() - defer tp.lock.Unlock() tp.once.Do(func() { for _, ch := range tp.msgCh { close(ch) @@ -130,7 +130,7 @@ func (tp *Topic) Close() error { close(ch) } }) - + tp.lock.Unlock() return nil }