Skip to content

Commit

Permalink
add support for creating quorum queues
Browse files Browse the repository at this point in the history
  • Loading branch information
abhinav-verloop committed Nov 26, 2024
1 parent 448ad41 commit aacc876
Showing 1 changed file with 34 additions and 4 deletions.
38 changes: 34 additions & 4 deletions hedwig.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,19 @@ const (
SubscribeChannel = "subscribe"
)

const (
MinDeliveryLimit uint = 1
)

type Callback func(<-chan amqp.Delivery, *sync.WaitGroup)

type QueueType uint

const (
QueueType_Classic QueueType = 0
QueueType_Quorum QueueType = 1
)

func DefaultSettings() *Settings {
return &Settings{
Exchange: "hedwig",
Expand All @@ -35,9 +46,10 @@ func DefaultSettings() *Settings {

func DefaultQueueSetting(callback Callback, bindings ...string) *QueueSetting {
return &QueueSetting{
Bindings: bindings,
Durable: true,
Callback: callback,
Bindings: bindings,
Durable: true,
Callback: callback,
QueueType: QueueType_Classic,
}
}

Expand All @@ -55,6 +67,13 @@ type QueueSetting struct {
AutoDelete bool
Exclusive bool
NoAck bool
QueueType QueueType
// DeliveryLimit specifies the maximum number of times a message can be redelivered.
// It is useful for preventing message poisoning by limiting how many times a message
// can be retried before it is considered "poisoned" and either dead-lettered or discarded.
// Note: This setting has no effect on Classic queues, which do not support delivery limits.
// Caution: Once set, the DeliveryLimit cannot be updated during the lifetime of the queue.
DeliveryLimit uint
}
type ConsumerSetting struct {
tag string
Expand Down Expand Up @@ -191,7 +210,18 @@ func (h *Hedwig) setupListeners() (err error) {
qSetting.Durable = false
qSetting.Exclusive = true
}
q, err := c.QueueDeclare(qName, qSetting.Durable, qSetting.AutoDelete, qSetting.Exclusive, false, nil)
var queueArgs amqp.Table
if qSetting.QueueType == QueueType_Quorum {
queueArgs = amqp.Table{
"x-queue-type": "quorum",
}
deliveryLimit := MinDeliveryLimit
if qSetting.DeliveryLimit > deliveryLimit {
deliveryLimit = qSetting.DeliveryLimit
}
queueArgs["x-delivery-limit"] = int32(deliveryLimit)
}
q, err := c.QueueDeclare(qName, qSetting.Durable, qSetting.AutoDelete, qSetting.Exclusive, false, queueArgs)
if err != nil {
return err
}
Expand Down

0 comments on commit aacc876

Please sign in to comment.