Skip to content

Commit

Permalink
Adding ListQueues for Cassandra and SQL (temporalio#5098)
Browse files Browse the repository at this point in the history
<!-- Describe what has changed in this PR -->
**What changed?**
Adding ListQueues API to QueueV2 and implementing it for Cassandra and
SQL

<!-- Tell your future self why have you made these changes -->
**Why?**
This API can be used by operators to list different types of queues
including DLQs.

<!-- How have you verified this change? Tested locally? Added a unit
test? Checked in staging env? -->
**How did you test it?**
Unit tests with 100% coverage.

<!-- Assuming the worst case, what can be broken when deploying this
change to production? -->
**Potential risks**


<!-- Is this PR a hotfix candidate or require that a notification be
sent to the broader community? (Yes/No) -->
**Is hotfix candidate?**
  • Loading branch information
prathyushpv authored Nov 14, 2023
1 parent 1f49713 commit 82f103d
Show file tree
Hide file tree
Showing 16 changed files with 614 additions and 74 deletions.
294 changes: 248 additions & 46 deletions api/persistence/v1/queues.pb.go

Large diffs are not rendered by default.

42 changes: 32 additions & 10 deletions common/persistence/cassandra/queue_v2_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ import (

commonpb "go.temporal.io/api/common/v1"
"go.temporal.io/api/enums/v1"

persistencespb "go.temporal.io/server/api/persistence/v1"
"go.temporal.io/server/common/log"
"go.temporal.io/server/common/persistence"
Expand Down Expand Up @@ -61,13 +60,8 @@ const (
TemplateGetQueueQuery = `SELECT metadata_payload, metadata_encoding, version FROM queues WHERE queue_type = ? AND queue_name = ?`
TemplateRangeDeleteMessagesQuery = `DELETE FROM queue_messages WHERE queue_type = ? AND queue_name = ? AND queue_partition = ? AND message_id >= ? AND message_id <= ?`
TemplateUpdateQueueMetadataQuery = `UPDATE queues SET metadata_payload = ?, metadata_encoding = ?, version = ? WHERE queue_type = ? AND queue_name = ? IF version = ?`

// pageTokenPrefixByte is the first byte of the serialized page token. It's used to ensure that the page token is
// not empty. Without this, if the last_read_message_id is 0, the serialized page token would be empty, and clients
// could erroneously assume that there are no more messages beyond the first page. This is purely used to ensure
// that tokens are non-empty; it is not used to verify that the token is valid like the magic byte in some other
// protocols.
pageTokenPrefixByte = 0
// We will have to ALLOW FILTERING for this query since partition key consists of both queue_type and queue_name.
templateGetQueueNamesQuery = `SELECT queue_name FROM queues WHERE queue_type =? ALLOW FILTERING`
)

var (
Expand Down Expand Up @@ -221,8 +215,7 @@ func (s *queueV2Store) ReadMessages(
return nil, gocql.ConvertError("QueueV2ReadMessages", err)
}

nextPageToken := persistence.GetNextPageTokenForQueueV2(messages)

nextPageToken := persistence.GetNextPageTokenForReadMessages(messages)
return &persistence.InternalReadMessagesResponse{
Messages: messages,
NextPageToken: nextPageToken,
Expand Down Expand Up @@ -474,3 +467,32 @@ func (s *queueV2Store) getMaxMessageID(ctx context.Context, queueType persistenc
}
return maxMessageID, true, nil
}

func (s *queueV2Store) ListQueues(
ctx context.Context,
request *persistence.InternalListQueuesRequest,
) (*persistence.InternalListQueuesResponse, error) {
if request.PageSize <= 0 {
return nil, persistence.ErrNonPositiveListQueuesPageSize
}
iter := s.session.Query(
templateGetQueueNamesQuery,
request.QueueType,
).PageSize(request.PageSize).PageState(request.NextPageToken).WithContext(ctx).Iter()

var queues []string
for {
var queue string
if !iter.Scan(&queue) {
break
}
queues = append(queues, queue)
}
if err := iter.Close(); err != nil {
return nil, gocql.ConvertError("QueueV2ListQueues", err)
}
return &persistence.InternalListQueuesResponse{
QueueNames: queues,
NextPageToken: iter.PageState(),
}, nil
}
10 changes: 10 additions & 0 deletions common/persistence/client/fault_injection.go
Original file line number Diff line number Diff line change
Expand Up @@ -496,6 +496,16 @@ func (f *FaultInjectionQueueV2) RangeDeleteMessages(
return f.baseQueue.RangeDeleteMessages(ctx, request)
}

func (f *FaultInjectionQueueV2) ListQueues(
ctx context.Context,
request *persistence.InternalListQueuesRequest,
) (*persistence.InternalListQueuesResponse, error) {
if err := f.ErrorGenerator.Generate(); err != nil {
return nil, err
}
return f.baseQueue.ListQueues(ctx, request)
}

func NewFaultInjectionExecutionStore(
rate float64,
executionStore persistence.ExecutionStore,
Expand Down
9 changes: 9 additions & 0 deletions common/persistence/client/fault_injection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ type (
readRequests *int
createRequests *int
rangeDeleteRequests *int
listQueuesRequests *int
}
)

Expand Down Expand Up @@ -86,6 +87,14 @@ func (t *testQueueV2) RangeDeleteMessages(
return nil, nil
}

func (t *testQueueV2) ListQueues(
context.Context,
*persistence.InternalListQueuesRequest,
) (*persistence.InternalListQueuesResponse, error) {
*t.listQueuesRequests++
return nil, nil
}

func (t *testDataStoreFactory) NewQueueV2() (persistence.QueueV2, error) {
if t.err != nil {
return nil, t.err
Expand Down
15 changes: 15 additions & 0 deletions common/persistence/mock/store_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

15 changes: 15 additions & 0 deletions common/persistence/persistence_interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -760,6 +760,10 @@ type (
ctx context.Context,
request *InternalRangeDeleteMessagesRequest,
) (*InternalRangeDeleteMessagesResponse, error)
ListQueues(
ctx context.Context,
request *InternalListQueuesRequest,
) (*InternalListQueuesResponse, error)
}

QueueV2Type int
Expand Down Expand Up @@ -814,4 +818,15 @@ type (
InternalRangeDeleteMessagesResponse struct {
MessagesDeleted int64
}

InternalListQueuesRequest struct {
QueueType QueueV2Type
PageSize int
NextPageToken []byte
}

InternalListQueuesResponse struct {
QueueNames []string
NextPageToken []byte
}
)
9 changes: 9 additions & 0 deletions common/persistence/queue_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,21 @@ var (
ErrInvalidReadQueueMessagesNextPageToken = &InvalidPersistenceRequestError{
Msg: "invalid next-page token for reading queue messages",
}
ErrInvalidListQueuesNextPageToken = &InvalidPersistenceRequestError{
Msg: "invalid next-page token for listing queues",
}
ErrNonPositiveReadQueueMessagesPageSize = &InvalidPersistenceRequestError{
Msg: "non-positive page size for reading queue messages",
}
ErrInvalidQueueRangeDeleteMaxMessageID = &InvalidPersistenceRequestError{
Msg: "max message id for queue range delete is invalid",
}
ErrNonPositiveListQueuesPageSize = &InvalidPersistenceRequestError{
Msg: "non-positive page size for listing queues",
}
ErrNegativeListQueuesOffset = &InvalidPersistenceRequestError{
Msg: "negative offset for listing queues",
}
)

func NewQueueNotFoundError(queueType QueueV2Type, queueName string) error {
Expand Down
34 changes: 33 additions & 1 deletion common/persistence/queue_v2_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ const (
pageTokenPrefixByte = 0
)

func GetNextPageTokenForQueueV2(result []QueueV2Message) []byte {
func GetNextPageTokenForReadMessages(result []QueueV2Message) []byte {
if len(result) == 0 {
return nil
}
Expand Down Expand Up @@ -84,6 +84,38 @@ func GetMinMessageIDToReadForQueueV2(
return token.LastReadMessageId + 1, nil
}

func GetNextPageTokenForListQueues(queueNumber int64) []byte {
token := &persistencespb.ListQueuesNextPageToken{
LastReadQueueNumber: queueNumber,
}
// This can never fail if you inspect the implementation.
b, _ := token.Marshal()

// See the comment above pageTokenPrefixByte for why we want to do this.
return append([]byte{pageTokenPrefixByte}, b...)
}

func GetOffsetForListQueues(
nextPageToken []byte,
) (int64, error) {
if len(nextPageToken) == 0 {
return 0, nil
}
var token persistencespb.ListQueuesNextPageToken

// Skip the first byte. See the comment on pageTokenPrefixByte for more details.
err := token.Unmarshal(nextPageToken[1:])
if err != nil {
return 0, fmt.Errorf(
"%w: %q: %v",
ErrInvalidReadQueueMessagesNextPageToken,
nextPageToken,
err,
)
}
return token.LastReadQueueNumber, nil
}

func GetPartitionForQueueV2(
queueType QueueV2Type,
queueName string,
Expand Down
45 changes: 44 additions & 1 deletion common/persistence/sql/queue_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"database/sql"
"errors"
"fmt"

commonpb "go.temporal.io/api/common/v1"
"go.temporal.io/api/enums/v1"
"go.temporal.io/api/serviceerror"
Expand Down Expand Up @@ -175,7 +176,7 @@ func (q *queueV2) ReadMessages(
}
messages = append(messages, message)
}
nextPageToken := persistence.GetNextPageTokenForQueueV2(messages)
nextPageToken := persistence.GetNextPageTokenForReadMessages(messages)
response := &persistence.InternalReadMessagesResponse{
Messages: messages,
NextPageToken: nextPageToken,
Expand Down Expand Up @@ -411,3 +412,45 @@ func (q *queueV2) getNextMessageID(ctx context.Context, queueType persistence.Qu
}
return maxMessageID + 1, nil
}

func (q *queueV2) ListQueues(
ctx context.Context,
request *persistence.InternalListQueuesRequest,
) (*persistence.InternalListQueuesResponse, error) {
if request.PageSize <= 0 {
return nil, persistence.ErrNonPositiveListQueuesPageSize
}
offset, err := persistence.GetOffsetForListQueues(request.NextPageToken)
if err != nil {
return nil, err
}
if offset < 0 {
return nil, persistence.ErrNegativeListQueuesOffset
}
rows, err := q.Db.SelectNameFromQueueV2Metadata(ctx, sqlplugin.QueueV2MetadataTypeFilter{
QueueType: request.QueueType,
PageSize: request.PageSize,
PageOffset: offset,
})
if err != nil && !errors.Is(err, sql.ErrNoRows) {
return nil, serviceerror.NewUnavailable(fmt.Sprintf(
"ListQueues failed for type: %v. SelectNameFromQueueV2Metadata operation failed. Error: %v",
request.QueueType,
err),
)
}
var queues []string
for _, row := range rows {
queues = append(queues, row.QueueName)
}
lastReadQueueNumber := offset + int64(len(queues))
var nextPageToken []byte
if len(queues) > 0 {
nextPageToken = persistence.GetNextPageTokenForListQueues(lastReadQueueNumber)
}
response := &persistence.InternalListQueuesResponse{
QueueNames: queues,
NextPageToken: nextPageToken,
}
return response, nil
}
2 changes: 1 addition & 1 deletion common/persistence/sql/sqlplugin/mysql/queue_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ func (mdb *db) SelectFromQueueV2MetadataForUpdate(ctx context.Context, filter sq

func (mdb *db) SelectNameFromQueueV2Metadata(ctx context.Context, filter sqlplugin.QueueV2MetadataTypeFilter) ([]sqlplugin.QueueV2MetadataRow, error) {
var rows []sqlplugin.QueueV2MetadataRow
err := mdb.conn.GetContext(ctx,
err := mdb.conn.SelectContext(ctx,
&rows,
templateGetNameFromQueueMetadataV2,
filter.QueueType,
Expand Down
2 changes: 1 addition & 1 deletion common/persistence/sql/sqlplugin/postgresql/queue_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ func (pdb *db) SelectFromQueueV2MetadataForUpdate(ctx context.Context, filter sq

func (pdb *db) SelectNameFromQueueV2Metadata(ctx context.Context, filter sqlplugin.QueueV2MetadataTypeFilter) ([]sqlplugin.QueueV2MetadataRow, error) {
var rows []sqlplugin.QueueV2MetadataRow
err := pdb.conn.GetContext(ctx,
err := pdb.conn.SelectContext(ctx,
&rows,
templateGetNameFromQueueMetadataV2,
filter.QueueType,
Expand Down
2 changes: 1 addition & 1 deletion common/persistence/sql/sqlplugin/sqlite/queue_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func (sdb *db) SelectFromQueueV2MetadataForUpdate(ctx context.Context, filter sq
}
func (sdb *db) SelectNameFromQueueV2Metadata(ctx context.Context, filter sqlplugin.QueueV2MetadataTypeFilter) ([]sqlplugin.QueueV2MetadataRow, error) {
var rows []sqlplugin.QueueV2MetadataRow
err := sdb.conn.GetContext(ctx,
err := sdb.conn.SelectContext(ctx,
&rows,
templateGetNameFromQueueMetadataV2,
filter.QueueType,
Expand Down
Loading

0 comments on commit 82f103d

Please sign in to comment.