Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add an option to make a persistent interceptor queue #57

Merged
merged 5 commits into from
Sep 3, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ GLOBAL OPTIONS:
--insecure Skips certificate verification. (default: false)
--noprompt Disables password prompt. (default: false)
--silent Disables terminal print. (default: false)
--persistent Creates a persistent interceptor queue. (default: false)
ghokun marked this conversation as resolved.
Show resolved Hide resolved
--help, -h show help
--version, -v print the version
```
29 changes: 24 additions & 5 deletions coyote.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,10 @@ func main() {
Name: "silent",
Usage: "Disables terminal print.",
},
&cli.BoolFlag{
Name: "persistent",
Usage: "Creates a persistent interceptor queue.",
},
},
Action: func(ctx *cli.Context) error {
u, err := url.Parse(ctx.String("url"))
Expand Down Expand Up @@ -174,17 +178,32 @@ func main() {
log.Printf("💔 Terminating AMQP channel")
}()

persistent := ctx.Bool("persistent")
q, err := ch.QueueDeclare(
fmt.Sprintf("%s.%s", ctx.String("queue"), uuid.NewString()), // queue name
false, // is durable
true, // is auto delete
true, // is exclusive
false, // is no wait
nil, // args
false, // is durable
!persistent, // is auto delete
!persistent, // is exclusive
false, // is no wait
nil, // args
)
if err != nil {
return fmt.Errorf("%s %w", color.RedString("failed to declare a queue:"), err)
}
if persistent {
defer func() {
log.Printf("💔 Deleting queue %s", q.Name)
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will delete the queue even if it is persistent right? Can you help me understand what is the benefit here?

Copy link
Owner

@ghokun ghokun Sep 3, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

On a second look, I have an idea to use --queue argument to create persistent queue. Allow coyote to reconnect to an existing queue.

By default we generate interceptor queue names randomly with an UUID suffix:

<queue_name:interceptor>.<uuid>

We can make it persistent if queue argument is provided.

# queue name will not be suffixed with an UUID.
coyote --queue <queue_name>

In this case coyote will try to bind to the queue if it exists or create it does not.

So persistent flag is not needed anymore. What do you think?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You right! The logic here is coupled with a reconnection logic - if the queue is not persistent and we're disconnected from a cluster then RabbitMQ will automatically delete it.
Do you think it might be useful to have an extra flag to enable/disable the queue auto deletion?

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For now I tied it to queue argument being present or not. Let's revisit with auto reconnect functionality.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's a brilliant idea, I like it! And indeed it makes sense to keep the queue even between restarts.

_, err := ch.QueueDelete(
q.Name,
false, // IfUnused: delete only if the queue is unused
false, // IfEmpty: delete only if the queue is empty
false, // NoWait: wait for server confirmation
)
if err != nil {
log.Fatalf("Failed to delete the queue: %v", err)
}
}()
}

for _, c := range ctx.Generic("exchange").(*listen).c {
err = ch.ExchangeDeclarePassive(
Expand Down
1 change: 1 addition & 0 deletions coyote_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@ GLOBAL OPTIONS:
--insecure Skips certificate verification. (default: false)
--noprompt Disables password prompt. (default: false)
--silent Disables terminal print. (default: false)
--persistent Creates a persistent interceptor queue. (default: false)
--help, -h show help
--version, -v print the version`
)
Loading