From 82f103d38eaf317a078a00f812e3e792683c097f Mon Sep 17 00:00:00 2001 From: Prathyush PV Date: Mon, 13 Nov 2023 18:23:37 -0800 Subject: [PATCH] Adding ListQueues for Cassandra and SQL (#5098) **What changed?** Adding ListQueues API to QueueV2 and implementing it for Cassandra and SQL **Why?** This API can be used by operators to list different types of queues including DLQs. **How did you test it?** Unit tests with 100% coverage. **Potential risks** **Is hotfix candidate?** --- api/persistence/v1/queues.pb.go | 294 +++++++++++++++--- .../persistence/cassandra/queue_v2_store.go | 42 ++- common/persistence/client/fault_injection.go | 10 + .../client/fault_injection_test.go | 9 + common/persistence/mock/store_mock.go | 15 + common/persistence/persistence_interface.go | 15 + common/persistence/queue_v2.go | 9 + common/persistence/queue_v2_util.go | 34 +- common/persistence/sql/queue_v2.go | 45 ++- .../sql/sqlplugin/mysql/queue_v2.go | 2 +- .../sql/sqlplugin/postgresql/queue_v2.go | 2 +- .../sql/sqlplugin/sqlite/queue_v2.go | 2 +- .../sql/sqlplugin/tests/queue_v2.go | 69 +++- .../history_task_queue_manager_test_suite.go | 10 + .../persistence/tests/queue_v2_test_suite.go | 126 +++++++- .../server/api/persistence/v1/queues.proto | 4 + 16 files changed, 614 insertions(+), 74 deletions(-) diff --git a/api/persistence/v1/queues.pb.go b/api/persistence/v1/queues.pb.go index 8a708ed9f22..e27216b35f7 100644 --- a/api/persistence/v1/queues.pb.go +++ b/api/persistence/v1/queues.pb.go @@ -290,6 +290,49 @@ func (m *ReadQueueMessagesNextPageToken) GetLastReadMessageId() int64 { return 0 } +type ListQueuesNextPageToken struct { + LastReadQueueNumber int64 `protobuf:"varint,1,opt,name=last_read_queue_number,json=lastReadQueueNumber,proto3" json:"last_read_queue_number,omitempty"` +} + +func (m *ListQueuesNextPageToken) Reset() { *m = ListQueuesNextPageToken{} } +func (*ListQueuesNextPageToken) ProtoMessage() {} +func (*ListQueuesNextPageToken) Descriptor() ([]byte, []int) { + return fileDescriptor_b7fa5f143ac80378, []int{5} +} +func (m *ListQueuesNextPageToken) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *ListQueuesNextPageToken) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_ListQueuesNextPageToken.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *ListQueuesNextPageToken) XXX_Merge(src proto.Message) { + xxx_messageInfo_ListQueuesNextPageToken.Merge(m, src) +} +func (m *ListQueuesNextPageToken) XXX_Size() int { + return m.Size() +} +func (m *ListQueuesNextPageToken) XXX_DiscardUnknown() { + xxx_messageInfo_ListQueuesNextPageToken.DiscardUnknown(m) +} + +var xxx_messageInfo_ListQueuesNextPageToken proto.InternalMessageInfo + +func (m *ListQueuesNextPageToken) GetLastReadQueueNumber() int64 { + if m != nil { + return m.LastReadQueueNumber + } + return 0 +} + // HistoryTask represents an internal history service task for a particular shard. We use a blob because there is no // common proto for all task proto types. type HistoryTask struct { @@ -307,7 +350,7 @@ type HistoryTask struct { func (m *HistoryTask) Reset() { *m = HistoryTask{} } func (*HistoryTask) ProtoMessage() {} func (*HistoryTask) Descriptor() ([]byte, []int) { - return fileDescriptor_b7fa5f143ac80378, []int{5} + return fileDescriptor_b7fa5f143ac80378, []int{6} } func (m *HistoryTask) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -366,7 +409,7 @@ type QueuePartition struct { func (m *QueuePartition) Reset() { *m = QueuePartition{} } func (*QueuePartition) ProtoMessage() {} func (*QueuePartition) Descriptor() ([]byte, []int) { - return fileDescriptor_b7fa5f143ac80378, []int{6} + return fileDescriptor_b7fa5f143ac80378, []int{7} } func (m *QueuePartition) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -410,7 +453,7 @@ type Queue struct { func (m *Queue) Reset() { *m = Queue{} } func (*Queue) ProtoMessage() {} func (*Queue) Descriptor() ([]byte, []int) { - return fileDescriptor_b7fa5f143ac80378, []int{7} + return fileDescriptor_b7fa5f143ac80378, []int{8} } func (m *Queue) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -453,6 +496,7 @@ func init() { proto.RegisterType((*QueueSliceScope)(nil), "temporal.server.api.persistence.v1.QueueSliceScope") proto.RegisterType((*QueueSliceRange)(nil), "temporal.server.api.persistence.v1.QueueSliceRange") proto.RegisterType((*ReadQueueMessagesNextPageToken)(nil), "temporal.server.api.persistence.v1.ReadQueueMessagesNextPageToken") + proto.RegisterType((*ListQueuesNextPageToken)(nil), "temporal.server.api.persistence.v1.ListQueuesNextPageToken") proto.RegisterType((*HistoryTask)(nil), "temporal.server.api.persistence.v1.HistoryTask") proto.RegisterType((*QueuePartition)(nil), "temporal.server.api.persistence.v1.QueuePartition") proto.RegisterType((*Queue)(nil), "temporal.server.api.persistence.v1.Queue") @@ -464,49 +508,51 @@ func init() { } var fileDescriptor_b7fa5f143ac80378 = []byte{ - // 657 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x9c, 0x94, 0xcb, 0x6e, 0xd3, 0x40, - 0x14, 0x86, 0x33, 0x2d, 0x29, 0x30, 0x4d, 0x6f, 0x23, 0x16, 0x25, 0x42, 0x43, 0x64, 0x75, 0x51, - 0x09, 0xe1, 0xa8, 0x17, 0x21, 0x2e, 0x1b, 0x54, 0x81, 0xd4, 0x52, 0x15, 0xa5, 0x6e, 0x25, 0x04, - 0x0b, 0xa2, 0x49, 0x72, 0x94, 0x0c, 0xb1, 0x3d, 0xee, 0xcc, 0x24, 0x24, 0x3b, 0x1e, 0x81, 0xc7, - 0x80, 0x07, 0xe0, 0x1d, 0x58, 0x76, 0x83, 0xd4, 0x15, 0xa2, 0xee, 0x86, 0x65, 0x1f, 0x01, 0x8d, - 0x6f, 0x71, 0x5b, 0x21, 0x5c, 0x76, 0x1e, 0x8f, 0xff, 0xef, 0xff, 0xcf, 0xf8, 0x9c, 0xc1, 0x75, - 0x0d, 0x5e, 0x20, 0x24, 0x73, 0xeb, 0x0a, 0xe4, 0x10, 0x64, 0x9d, 0x05, 0xbc, 0x1e, 0x80, 0x54, - 0x5c, 0x69, 0xf0, 0xdb, 0x50, 0x1f, 0xae, 0xd5, 0x8f, 0x06, 0x30, 0x00, 0x65, 0x07, 0x52, 0x68, - 0x41, 0xac, 0x54, 0x60, 0xc7, 0x02, 0x9b, 0x05, 0xdc, 0xce, 0x09, 0xec, 0xe1, 0x5a, 0x75, 0x25, - 0x83, 0x1a, 0x5a, 0x5b, 0x78, 0x9e, 0xf0, 0x0d, 0xc8, 0x03, 0xa5, 0x58, 0x17, 0x62, 0x52, 0x75, - 0xa3, 0x80, 0x75, 0x20, 0xa1, 0xc3, 0xdb, 0x4c, 0xa7, 0xf6, 0x55, 0xbb, 0x80, 0x48, 0x33, 0xd5, - 0x4f, 0xbe, 0xb7, 0x7e, 0x4e, 0x61, 0xbc, 0x6f, 0xf2, 0x1f, 0x68, 0xa6, 0x81, 0x00, 0x9e, 0x93, - 0xc0, 0x3a, 0x20, 0x9b, 0xca, 0xac, 0xd5, 0x32, 0xaa, 0x4d, 0xaf, 0xce, 0xae, 0x3f, 0xb7, 0xff, - 0x5d, 0x95, 0x3d, 0xc1, 0xd8, 0x4e, 0xc4, 0x88, 0x9e, 0xd5, 0x4b, 0x5f, 0xcb, 0xb1, 0x53, 0x91, - 0xb9, 0x57, 0x44, 0xe2, 0xfb, 0x30, 0x6a, 0xbb, 0x03, 0xc5, 0x87, 0xd0, 0x4c, 0x0c, 0x7b, 0xbc, - 0xdb, 0x6b, 0x7e, 0x64, 0x1a, 0xa4, 0xc7, 0x64, 0x7f, 0x79, 0xaa, 0x86, 0x56, 0x67, 0xd7, 0x1f, - 0x14, 0x31, 0x3e, 0x64, 0xaa, 0xbf, 0x0b, 0x63, 0xe7, 0x5e, 0xc6, 0x8c, 0xfd, 0xb7, 0x79, 0xb7, - 0xf7, 0x26, 0x05, 0x56, 0x07, 0x78, 0xe9, 0x4a, 0x2c, 0xb2, 0x88, 0xa7, 0xfb, 0x30, 0x5e, 0x46, - 0x35, 0xb4, 0x3a, 0xed, 0x98, 0x47, 0xf2, 0x0a, 0x97, 0x87, 0xcc, 0x1d, 0x40, 0x12, 0x60, 0xb3, - 0x70, 0xe5, 0x39, 0xb8, 0x13, 0x23, 0x9e, 0x4e, 0x3d, 0x46, 0x56, 0x13, 0x2f, 0x5e, 0xde, 0x26, - 0xbb, 0x78, 0x46, 0xb5, 0x45, 0x90, 0x1d, 0xef, 0x46, 0xf1, 0xe3, 0x75, 0x79, 0x1b, 0x0e, 0x8c, - 0xd6, 0x49, 0x10, 0xd6, 0x57, 0x84, 0x17, 0x2e, 0xed, 0x91, 0x1d, 0x5c, 0x96, 0xcc, 0xef, 0x42, - 0x54, 0xd8, 0xb5, 0xf9, 0x8e, 0x91, 0x3a, 0x31, 0x81, 0xec, 0xe2, 0xdb, 0x59, 0x93, 0x25, 0x67, - 0xf2, 0xb0, 0x08, 0xae, 0x91, 0x8a, 0x9c, 0x89, 0xde, 0xfa, 0x76, 0x21, 0x6b, 0xe4, 0x43, 0x1a, - 0x78, 0x8e, 0xfb, 0x69, 0x2f, 0x78, 0xdc, 0x4f, 0x32, 0x5f, 0xeb, 0xcf, 0x57, 0x32, 0xc2, 0x1e, - 0xf7, 0x0d, 0x71, 0xd2, 0x5d, 0x1e, 0x1b, 0xfd, 0x4f, 0x2f, 0x55, 0x32, 0xc2, 0x1e, 0x1b, 0x59, - 0xfb, 0x98, 0x9a, 0xff, 0x17, 0x45, 0xdf, 0x8b, 0x87, 0x54, 0xbd, 0x86, 0x91, 0x6e, 0xb0, 0x2e, - 0x1c, 0x8a, 0x3e, 0xf8, 0xa4, 0x8e, 0xef, 0xb8, 0x4c, 0xe9, 0xa8, 0x99, 0x9b, 0xc9, 0x1c, 0x37, - 0x79, 0x27, 0xe9, 0xac, 0x25, 0xb3, 0x67, 0x08, 0x89, 0x78, 0xa7, 0x63, 0xbd, 0xc7, 0xb3, 0xdb, - 0x5c, 0x69, 0x21, 0xc7, 0xc6, 0x92, 0xdc, 0xc5, 0xb7, 0x54, 0x8f, 0xc9, 0x4e, 0xaa, 0x29, 0x3b, - 0x37, 0xa3, 0xf5, 0x4e, 0x87, 0x6c, 0xe2, 0x1b, 0x2d, 0x57, 0xb4, 0x92, 0x2a, 0x6a, 0x93, 0x2a, - 0x4c, 0xfc, 0xf8, 0xf2, 0x30, 0xc9, 0x5f, 0x30, 0xcd, 0xb6, 0x5c, 0xd1, 0x72, 0xa2, 0xaf, 0xad, - 0x47, 0x78, 0x3e, 0x8a, 0xdb, 0x60, 0x52, 0x73, 0xcd, 0x85, 0x4f, 0x56, 0xf0, 0xbc, 0xc7, 0xfd, - 0xab, 0xe1, 0x2a, 0x1e, 0xf7, 0x27, 0xb9, 0x7e, 0x20, 0x5c, 0x8e, 0x84, 0xe4, 0x2d, 0xc6, 0x41, - 0x2a, 0x4e, 0x3b, 0xf5, 0x49, 0xe1, 0x4e, 0xb2, 0x33, 0xe3, 0xe4, 0x06, 0xc8, 0xc1, 0xaa, 0x47, - 0x78, 0xe1, 0xd2, 0x76, 0x7e, 0x12, 0xcb, 0xf1, 0x24, 0x6e, 0x5f, 0x9c, 0xc4, 0xf5, 0xc2, 0xd6, - 0x19, 0x3a, 0x37, 0x87, 0x5b, 0x1f, 0x8e, 0x4f, 0x69, 0xe9, 0xe4, 0x94, 0x96, 0xce, 0x4f, 0x29, - 0xfa, 0x14, 0x52, 0xf4, 0x25, 0xa4, 0xe8, 0x7b, 0x48, 0xd1, 0x71, 0x48, 0xd1, 0xaf, 0x90, 0xa2, - 0xdf, 0x21, 0x2d, 0x9d, 0x87, 0x14, 0x7d, 0x3e, 0xa3, 0xa5, 0xe3, 0x33, 0x5a, 0x3a, 0x39, 0xa3, - 0xa5, 0x77, 0x9b, 0x5d, 0x31, 0xb1, 0xe5, 0xe2, 0xef, 0x97, 0xea, 0xb3, 0xdc, 0xb2, 0x35, 0x13, - 0xdd, 0xad, 0x1b, 0x7f, 0x02, 0x00, 0x00, 0xff, 0xff, 0x74, 0x0d, 0x15, 0xa9, 0x3d, 0x06, 0x00, - 0x00, + // 690 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x9c, 0x94, 0xcb, 0x6e, 0x13, 0x3d, + 0x14, 0xc7, 0xe3, 0xf6, 0x4b, 0x3f, 0x70, 0xd3, 0x9b, 0x41, 0x50, 0x22, 0x64, 0xa2, 0x51, 0x17, + 0x95, 0x10, 0x13, 0xb5, 0xa9, 0x10, 0x97, 0x0d, 0xaa, 0x40, 0x6a, 0x29, 0xad, 0xd2, 0x69, 0x25, + 0x04, 0x0b, 0x46, 0x4e, 0x62, 0x25, 0x26, 0x73, 0xab, 0xed, 0x84, 0x64, 0xc7, 0x23, 0xf0, 0x18, + 0xf0, 0x00, 0xbc, 0x03, 0xcb, 0x6e, 0x90, 0xba, 0x42, 0x74, 0xba, 0x61, 0xd9, 0x47, 0x40, 0xf6, + 0x5c, 0x9b, 0x0a, 0x31, 0x65, 0x37, 0x1e, 0xcf, 0xff, 0xf7, 0x3f, 0xe7, 0xcc, 0x39, 0x07, 0xd6, + 0x25, 0x75, 0x03, 0x9f, 0x13, 0xa7, 0x2e, 0x28, 0x1f, 0x52, 0x5e, 0x27, 0x01, 0xab, 0x07, 0x94, + 0x0b, 0x26, 0x24, 0xf5, 0xda, 0xb4, 0x3e, 0x5c, 0xab, 0x1f, 0x0d, 0xe8, 0x80, 0x0a, 0x33, 0xe0, + 0xbe, 0xf4, 0x91, 0x91, 0x08, 0xcc, 0x48, 0x60, 0x92, 0x80, 0x99, 0x39, 0x81, 0x39, 0x5c, 0xab, + 0xae, 0xa4, 0x50, 0x45, 0x6b, 0xfb, 0xae, 0xeb, 0x7b, 0x0a, 0xe4, 0x52, 0x21, 0x48, 0x97, 0x46, + 0xa4, 0x6a, 0xa3, 0x80, 0x75, 0xc0, 0x69, 0x87, 0xb5, 0x89, 0x4c, 0xec, 0xab, 0x66, 0x01, 0x91, + 0x24, 0xa2, 0x1f, 0x7f, 0x6f, 0xfc, 0x98, 0x82, 0x70, 0x5f, 0xc5, 0x7f, 0x20, 0x89, 0xa4, 0x88, + 0xc2, 0x39, 0x4e, 0x49, 0x87, 0x72, 0x5b, 0xa8, 0xb3, 0x58, 0x06, 0xb5, 0xe9, 0xd5, 0xd9, 0xf5, + 0x67, 0xe6, 0xdf, 0xb3, 0x32, 0x33, 0x8c, 0x69, 0x69, 0x86, 0x7e, 0x16, 0x2f, 0x3c, 0xc9, 0xc7, + 0x56, 0x85, 0xe7, 0x5e, 0x21, 0x0e, 0xef, 0xd1, 0x51, 0xdb, 0x19, 0x08, 0x36, 0xa4, 0x76, 0x6c, + 0xd8, 0x63, 0xdd, 0x9e, 0xfd, 0x81, 0x48, 0xca, 0x5d, 0xc2, 0xfb, 0xcb, 0x53, 0x35, 0xb0, 0x3a, + 0xbb, 0x7e, 0xbf, 0x88, 0xf1, 0x21, 0x11, 0xfd, 0x1d, 0x3a, 0xb6, 0xee, 0xa6, 0xcc, 0xc8, 0x7f, + 0x8b, 0x75, 0x7b, 0xaf, 0x13, 0x60, 0x75, 0x00, 0x97, 0x2e, 0x85, 0x85, 0x16, 0xe1, 0x74, 0x9f, + 0x8e, 0x97, 0x41, 0x0d, 0xac, 0x4e, 0x5b, 0xea, 0x11, 0xbd, 0x84, 0xe5, 0x21, 0x71, 0x06, 0x34, + 0x0e, 0x60, 0xa3, 0x70, 0xe6, 0x39, 0xb8, 0x15, 0x21, 0x9e, 0x4c, 0x3d, 0x02, 0x86, 0x0d, 0x17, + 0x27, 0xaf, 0xd1, 0x0e, 0x9c, 0x11, 0x6d, 0x3f, 0x48, 0xcb, 0xdb, 0x28, 0x5e, 0x5e, 0x87, 0xb5, + 0xe9, 0x81, 0xd2, 0x5a, 0x31, 0xc2, 0xf8, 0x02, 0xe0, 0xc2, 0xc4, 0x1d, 0xda, 0x86, 0x65, 0x4e, + 0xbc, 0x2e, 0xd5, 0x89, 0x5d, 0x99, 0x6f, 0x29, 0xa9, 0x15, 0x11, 0xd0, 0x0e, 0xbc, 0x9e, 0x36, + 0x59, 0x5c, 0x93, 0x07, 0x45, 0x70, 0xcd, 0x44, 0x64, 0x65, 0x7a, 0xe3, 0xeb, 0x85, 0x58, 0xb5, + 0x0f, 0x6a, 0xc2, 0x39, 0xe6, 0x25, 0xbd, 0xe0, 0x32, 0x2f, 0x8e, 0xf9, 0x4a, 0x7f, 0xbe, 0x92, + 0x12, 0x76, 0x99, 0xa7, 0x88, 0x59, 0x77, 0xb9, 0x64, 0xf4, 0x2f, 0xbd, 0x54, 0x49, 0x09, 0xbb, + 0x64, 0x64, 0xec, 0x43, 0xac, 0xfe, 0x9f, 0x0e, 0x7d, 0x37, 0x1a, 0x52, 0xb1, 0x47, 0x47, 0xb2, + 0x49, 0xba, 0xf4, 0xd0, 0xef, 0x53, 0x0f, 0xd5, 0xe1, 0x4d, 0x87, 0x08, 0xa9, 0x9b, 0xd9, 0x8e, + 0xe7, 0xd8, 0x66, 0x9d, 0xb8, 0xb3, 0x96, 0xd4, 0x9d, 0x22, 0xc4, 0xe2, 0xed, 0x8e, 0xb1, 0x07, + 0x6f, 0xbf, 0x62, 0x42, 0x6a, 0xe4, 0x04, 0xab, 0x01, 0x6f, 0x65, 0x2c, 0xbd, 0x5c, 0x6c, 0x6f, + 0xe0, 0xb6, 0x28, 0x8f, 0x69, 0x37, 0x12, 0x9a, 0x16, 0xef, 0xe9, 0x2b, 0xe3, 0x1d, 0x9c, 0xdd, + 0x62, 0x42, 0xfa, 0x7c, 0xac, 0x52, 0x40, 0x77, 0xe0, 0x35, 0xd1, 0x23, 0xbc, 0x93, 0xc4, 0x50, + 0xb6, 0xfe, 0xd7, 0xe7, 0xed, 0x0e, 0xda, 0x80, 0xff, 0xb5, 0x1c, 0xbf, 0x15, 0x57, 0xa5, 0x96, + 0x55, 0x45, 0x95, 0x23, 0x5a, 0x46, 0xaa, 0x12, 0xcf, 0x89, 0x24, 0x9b, 0x8e, 0xdf, 0xb2, 0xf4, + 0xd7, 0xc6, 0x43, 0x38, 0xaf, 0xed, 0x9a, 0x84, 0x4b, 0x26, 0x99, 0xef, 0xa1, 0x15, 0x38, 0xef, + 0x32, 0xef, 0x72, 0xb2, 0x15, 0x97, 0x79, 0x59, 0x9e, 0xdf, 0x01, 0x2c, 0x6b, 0x21, 0x7a, 0x03, + 0x61, 0x90, 0x88, 0x93, 0xce, 0x7f, 0x5c, 0xb8, 0x33, 0xcd, 0xd4, 0x38, 0xde, 0x28, 0x39, 0x58, + 0xf5, 0x08, 0x2e, 0x4c, 0x5c, 0xe7, 0x27, 0xbb, 0x1c, 0x4d, 0xf6, 0xd6, 0xc5, 0xc9, 0x5e, 0x2f, + 0x6c, 0x9d, 0xa2, 0x73, 0x73, 0xbd, 0xf9, 0xfe, 0xf8, 0x14, 0x97, 0x4e, 0x4e, 0x71, 0xe9, 0xfc, + 0x14, 0x83, 0x8f, 0x21, 0x06, 0x9f, 0x43, 0x0c, 0xbe, 0x85, 0x18, 0x1c, 0x87, 0x18, 0xfc, 0x0c, + 0x31, 0xf8, 0x15, 0xe2, 0xd2, 0x79, 0x88, 0xc1, 0xa7, 0x33, 0x5c, 0x3a, 0x3e, 0xc3, 0xa5, 0x93, + 0x33, 0x5c, 0x7a, 0xbb, 0xd1, 0xf5, 0x33, 0x5b, 0xe6, 0xff, 0x79, 0x49, 0x3f, 0xcd, 0x1d, 0x5b, + 0x33, 0x7a, 0x57, 0x37, 0x7e, 0x07, 0x00, 0x00, 0xff, 0xff, 0xce, 0x36, 0x5c, 0x3b, 0x8d, 0x06, + 0x00, 0x00, } func (this *QueueState) Equal(that interface{}) bool { @@ -648,6 +694,30 @@ func (this *ReadQueueMessagesNextPageToken) Equal(that interface{}) bool { } return true } +func (this *ListQueuesNextPageToken) Equal(that interface{}) bool { + if that == nil { + return this == nil + } + + that1, ok := that.(*ListQueuesNextPageToken) + if !ok { + that2, ok := that.(ListQueuesNextPageToken) + if ok { + that1 = &that2 + } else { + return false + } + } + if that1 == nil { + return this == nil + } else if this == nil { + return false + } + if this.LastReadQueueNumber != that1.LastReadQueueNumber { + return false + } + return true +} func (this *HistoryTask) Equal(that interface{}) bool { if that == nil { return this == nil @@ -805,6 +875,16 @@ func (this *ReadQueueMessagesNextPageToken) GoString() string { s = append(s, "}") return strings.Join(s, "") } +func (this *ListQueuesNextPageToken) GoString() string { + if this == nil { + return "nil" + } + s := make([]string, 0, 5) + s = append(s, "&persistence.ListQueuesNextPageToken{") + s = append(s, "LastReadQueueNumber: "+fmt.Sprintf("%#v", this.LastReadQueueNumber)+",\n") + s = append(s, "}") + return strings.Join(s, "") +} func (this *HistoryTask) GoString() string { if this == nil { return "nil" @@ -1076,6 +1156,34 @@ func (m *ReadQueueMessagesNextPageToken) MarshalToSizedBuffer(dAtA []byte) (int, return len(dAtA) - i, nil } +func (m *ListQueuesNextPageToken) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *ListQueuesNextPageToken) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *ListQueuesNextPageToken) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + if m.LastReadQueueNumber != 0 { + i = encodeVarintQueues(dAtA, i, uint64(m.LastReadQueueNumber)) + i-- + dAtA[i] = 0x8 + } + return len(dAtA) - i, nil +} + func (m *HistoryTask) Marshal() (dAtA []byte, err error) { size := m.Size() dAtA = make([]byte, size) @@ -1289,6 +1397,18 @@ func (m *ReadQueueMessagesNextPageToken) Size() (n int) { return n } +func (m *ListQueuesNextPageToken) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + if m.LastReadQueueNumber != 0 { + n += 1 + sovQueues(uint64(m.LastReadQueueNumber)) + } + return n +} + func (m *HistoryTask) Size() (n int) { if m == nil { return 0 @@ -1413,6 +1533,16 @@ func (this *ReadQueueMessagesNextPageToken) String() string { }, "") return s } +func (this *ListQueuesNextPageToken) String() string { + if this == nil { + return "nil" + } + s := strings.Join([]string{`&ListQueuesNextPageToken{`, + `LastReadQueueNumber:` + fmt.Sprintf("%v", this.LastReadQueueNumber) + `,`, + `}`, + }, "") + return s +} func (this *HistoryTask) String() string { if this == nil { return "nil" @@ -2075,6 +2205,78 @@ func (m *ReadQueueMessagesNextPageToken) Unmarshal(dAtA []byte) error { } return nil } +func (m *ListQueuesNextPageToken) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowQueues + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: ListQueuesNextPageToken: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: ListQueuesNextPageToken: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field LastReadQueueNumber", wireType) + } + m.LastReadQueueNumber = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowQueues + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.LastReadQueueNumber |= int64(b&0x7F) << shift + if b < 0x80 { + break + } + } + default: + iNdEx = preIndex + skippy, err := skipQueues(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthQueues + } + if (iNdEx + skippy) < 0 { + return ErrInvalidLengthQueues + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} func (m *HistoryTask) Unmarshal(dAtA []byte) error { l := len(dAtA) iNdEx := 0 diff --git a/common/persistence/cassandra/queue_v2_store.go b/common/persistence/cassandra/queue_v2_store.go index 7a098d2d147..ee451934f9d 100644 --- a/common/persistence/cassandra/queue_v2_store.go +++ b/common/persistence/cassandra/queue_v2_store.go @@ -30,7 +30,6 @@ import ( commonpb "go.temporal.io/api/common/v1" "go.temporal.io/api/enums/v1" - persistencespb "go.temporal.io/server/api/persistence/v1" "go.temporal.io/server/common/log" "go.temporal.io/server/common/persistence" @@ -61,13 +60,8 @@ const ( TemplateGetQueueQuery = `SELECT metadata_payload, metadata_encoding, version FROM queues WHERE queue_type = ? AND queue_name = ?` TemplateRangeDeleteMessagesQuery = `DELETE FROM queue_messages WHERE queue_type = ? AND queue_name = ? AND queue_partition = ? AND message_id >= ? AND message_id <= ?` TemplateUpdateQueueMetadataQuery = `UPDATE queues SET metadata_payload = ?, metadata_encoding = ?, version = ? WHERE queue_type = ? AND queue_name = ? IF version = ?` - - // pageTokenPrefixByte is the first byte of the serialized page token. It's used to ensure that the page token is - // not empty. Without this, if the last_read_message_id is 0, the serialized page token would be empty, and clients - // could erroneously assume that there are no more messages beyond the first page. This is purely used to ensure - // that tokens are non-empty; it is not used to verify that the token is valid like the magic byte in some other - // protocols. - pageTokenPrefixByte = 0 + // We will have to ALLOW FILTERING for this query since partition key consists of both queue_type and queue_name. + templateGetQueueNamesQuery = `SELECT queue_name FROM queues WHERE queue_type =? ALLOW FILTERING` ) var ( @@ -221,8 +215,7 @@ func (s *queueV2Store) ReadMessages( return nil, gocql.ConvertError("QueueV2ReadMessages", err) } - nextPageToken := persistence.GetNextPageTokenForQueueV2(messages) - + nextPageToken := persistence.GetNextPageTokenForReadMessages(messages) return &persistence.InternalReadMessagesResponse{ Messages: messages, NextPageToken: nextPageToken, @@ -474,3 +467,32 @@ func (s *queueV2Store) getMaxMessageID(ctx context.Context, queueType persistenc } return maxMessageID, true, nil } + +func (s *queueV2Store) ListQueues( + ctx context.Context, + request *persistence.InternalListQueuesRequest, +) (*persistence.InternalListQueuesResponse, error) { + if request.PageSize <= 0 { + return nil, persistence.ErrNonPositiveListQueuesPageSize + } + iter := s.session.Query( + templateGetQueueNamesQuery, + request.QueueType, + ).PageSize(request.PageSize).PageState(request.NextPageToken).WithContext(ctx).Iter() + + var queues []string + for { + var queue string + if !iter.Scan(&queue) { + break + } + queues = append(queues, queue) + } + if err := iter.Close(); err != nil { + return nil, gocql.ConvertError("QueueV2ListQueues", err) + } + return &persistence.InternalListQueuesResponse{ + QueueNames: queues, + NextPageToken: iter.PageState(), + }, nil +} diff --git a/common/persistence/client/fault_injection.go b/common/persistence/client/fault_injection.go index 5105be904d6..56660c21918 100644 --- a/common/persistence/client/fault_injection.go +++ b/common/persistence/client/fault_injection.go @@ -496,6 +496,16 @@ func (f *FaultInjectionQueueV2) RangeDeleteMessages( return f.baseQueue.RangeDeleteMessages(ctx, request) } +func (f *FaultInjectionQueueV2) ListQueues( + ctx context.Context, + request *persistence.InternalListQueuesRequest, +) (*persistence.InternalListQueuesResponse, error) { + if err := f.ErrorGenerator.Generate(); err != nil { + return nil, err + } + return f.baseQueue.ListQueues(ctx, request) +} + func NewFaultInjectionExecutionStore( rate float64, executionStore persistence.ExecutionStore, diff --git a/common/persistence/client/fault_injection_test.go b/common/persistence/client/fault_injection_test.go index 64781cb8473..e7b52b048ea 100644 --- a/common/persistence/client/fault_injection_test.go +++ b/common/persistence/client/fault_injection_test.go @@ -51,6 +51,7 @@ type ( readRequests *int createRequests *int rangeDeleteRequests *int + listQueuesRequests *int } ) @@ -86,6 +87,14 @@ func (t *testQueueV2) RangeDeleteMessages( return nil, nil } +func (t *testQueueV2) ListQueues( + context.Context, + *persistence.InternalListQueuesRequest, +) (*persistence.InternalListQueuesResponse, error) { + *t.listQueuesRequests++ + return nil, nil +} + func (t *testDataStoreFactory) NewQueueV2() (persistence.QueueV2, error) { if t.err != nil { return nil, t.err diff --git a/common/persistence/mock/store_mock.go b/common/persistence/mock/store_mock.go index 5067fa2e74a..fe71e02ac19 100644 --- a/common/persistence/mock/store_mock.go +++ b/common/persistence/mock/store_mock.go @@ -1437,6 +1437,21 @@ func (mr *MockQueueV2MockRecorder) EnqueueMessage(ctx, request interface{}) *gom return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "EnqueueMessage", reflect.TypeOf((*MockQueueV2)(nil).EnqueueMessage), ctx, request) } +// ListQueues mocks base method. +func (m *MockQueueV2) ListQueues(ctx context.Context, request *persistence.InternalListQueuesRequest) (*persistence.InternalListQueuesResponse, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "ListQueues", ctx, request) + ret0, _ := ret[0].(*persistence.InternalListQueuesResponse) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// ListQueues indicates an expected call of ListQueues. +func (mr *MockQueueV2MockRecorder) ListQueues(ctx, request interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ListQueues", reflect.TypeOf((*MockQueueV2)(nil).ListQueues), ctx, request) +} + // RangeDeleteMessages mocks base method. func (m *MockQueueV2) RangeDeleteMessages(ctx context.Context, request *persistence.InternalRangeDeleteMessagesRequest) (*persistence.InternalRangeDeleteMessagesResponse, error) { m.ctrl.T.Helper() diff --git a/common/persistence/persistence_interface.go b/common/persistence/persistence_interface.go index 2d2d47ca822..5e9d1f54af7 100644 --- a/common/persistence/persistence_interface.go +++ b/common/persistence/persistence_interface.go @@ -760,6 +760,10 @@ type ( ctx context.Context, request *InternalRangeDeleteMessagesRequest, ) (*InternalRangeDeleteMessagesResponse, error) + ListQueues( + ctx context.Context, + request *InternalListQueuesRequest, + ) (*InternalListQueuesResponse, error) } QueueV2Type int @@ -814,4 +818,15 @@ type ( InternalRangeDeleteMessagesResponse struct { MessagesDeleted int64 } + + InternalListQueuesRequest struct { + QueueType QueueV2Type + PageSize int + NextPageToken []byte + } + + InternalListQueuesResponse struct { + QueueNames []string + NextPageToken []byte + } ) diff --git a/common/persistence/queue_v2.go b/common/persistence/queue_v2.go index 1ab4acacab7..ea851547a2b 100644 --- a/common/persistence/queue_v2.go +++ b/common/persistence/queue_v2.go @@ -43,12 +43,21 @@ var ( ErrInvalidReadQueueMessagesNextPageToken = &InvalidPersistenceRequestError{ Msg: "invalid next-page token for reading queue messages", } + ErrInvalidListQueuesNextPageToken = &InvalidPersistenceRequestError{ + Msg: "invalid next-page token for listing queues", + } ErrNonPositiveReadQueueMessagesPageSize = &InvalidPersistenceRequestError{ Msg: "non-positive page size for reading queue messages", } ErrInvalidQueueRangeDeleteMaxMessageID = &InvalidPersistenceRequestError{ Msg: "max message id for queue range delete is invalid", } + ErrNonPositiveListQueuesPageSize = &InvalidPersistenceRequestError{ + Msg: "non-positive page size for listing queues", + } + ErrNegativeListQueuesOffset = &InvalidPersistenceRequestError{ + Msg: "negative offset for listing queues", + } ) func NewQueueNotFoundError(queueType QueueV2Type, queueName string) error { diff --git a/common/persistence/queue_v2_util.go b/common/persistence/queue_v2_util.go index d81bce9d1a4..d82306d6ac1 100644 --- a/common/persistence/queue_v2_util.go +++ b/common/persistence/queue_v2_util.go @@ -41,7 +41,7 @@ const ( pageTokenPrefixByte = 0 ) -func GetNextPageTokenForQueueV2(result []QueueV2Message) []byte { +func GetNextPageTokenForReadMessages(result []QueueV2Message) []byte { if len(result) == 0 { return nil } @@ -84,6 +84,38 @@ func GetMinMessageIDToReadForQueueV2( return token.LastReadMessageId + 1, nil } +func GetNextPageTokenForListQueues(queueNumber int64) []byte { + token := &persistencespb.ListQueuesNextPageToken{ + LastReadQueueNumber: queueNumber, + } + // This can never fail if you inspect the implementation. + b, _ := token.Marshal() + + // See the comment above pageTokenPrefixByte for why we want to do this. + return append([]byte{pageTokenPrefixByte}, b...) +} + +func GetOffsetForListQueues( + nextPageToken []byte, +) (int64, error) { + if len(nextPageToken) == 0 { + return 0, nil + } + var token persistencespb.ListQueuesNextPageToken + + // Skip the first byte. See the comment on pageTokenPrefixByte for more details. + err := token.Unmarshal(nextPageToken[1:]) + if err != nil { + return 0, fmt.Errorf( + "%w: %q: %v", + ErrInvalidReadQueueMessagesNextPageToken, + nextPageToken, + err, + ) + } + return token.LastReadQueueNumber, nil +} + func GetPartitionForQueueV2( queueType QueueV2Type, queueName string, diff --git a/common/persistence/sql/queue_v2.go b/common/persistence/sql/queue_v2.go index 09a2746273f..947e455a5f4 100644 --- a/common/persistence/sql/queue_v2.go +++ b/common/persistence/sql/queue_v2.go @@ -29,6 +29,7 @@ import ( "database/sql" "errors" "fmt" + commonpb "go.temporal.io/api/common/v1" "go.temporal.io/api/enums/v1" "go.temporal.io/api/serviceerror" @@ -175,7 +176,7 @@ func (q *queueV2) ReadMessages( } messages = append(messages, message) } - nextPageToken := persistence.GetNextPageTokenForQueueV2(messages) + nextPageToken := persistence.GetNextPageTokenForReadMessages(messages) response := &persistence.InternalReadMessagesResponse{ Messages: messages, NextPageToken: nextPageToken, @@ -411,3 +412,45 @@ func (q *queueV2) getNextMessageID(ctx context.Context, queueType persistence.Qu } return maxMessageID + 1, nil } + +func (q *queueV2) ListQueues( + ctx context.Context, + request *persistence.InternalListQueuesRequest, +) (*persistence.InternalListQueuesResponse, error) { + if request.PageSize <= 0 { + return nil, persistence.ErrNonPositiveListQueuesPageSize + } + offset, err := persistence.GetOffsetForListQueues(request.NextPageToken) + if err != nil { + return nil, err + } + if offset < 0 { + return nil, persistence.ErrNegativeListQueuesOffset + } + rows, err := q.Db.SelectNameFromQueueV2Metadata(ctx, sqlplugin.QueueV2MetadataTypeFilter{ + QueueType: request.QueueType, + PageSize: request.PageSize, + PageOffset: offset, + }) + if err != nil && !errors.Is(err, sql.ErrNoRows) { + return nil, serviceerror.NewUnavailable(fmt.Sprintf( + "ListQueues failed for type: %v. SelectNameFromQueueV2Metadata operation failed. Error: %v", + request.QueueType, + err), + ) + } + var queues []string + for _, row := range rows { + queues = append(queues, row.QueueName) + } + lastReadQueueNumber := offset + int64(len(queues)) + var nextPageToken []byte + if len(queues) > 0 { + nextPageToken = persistence.GetNextPageTokenForListQueues(lastReadQueueNumber) + } + response := &persistence.InternalListQueuesResponse{ + QueueNames: queues, + NextPageToken: nextPageToken, + } + return response, nil +} diff --git a/common/persistence/sql/sqlplugin/mysql/queue_v2.go b/common/persistence/sql/sqlplugin/mysql/queue_v2.go index b547566e62f..07f2ac9f11b 100644 --- a/common/persistence/sql/sqlplugin/mysql/queue_v2.go +++ b/common/persistence/sql/sqlplugin/mysql/queue_v2.go @@ -89,7 +89,7 @@ func (mdb *db) SelectFromQueueV2MetadataForUpdate(ctx context.Context, filter sq func (mdb *db) SelectNameFromQueueV2Metadata(ctx context.Context, filter sqlplugin.QueueV2MetadataTypeFilter) ([]sqlplugin.QueueV2MetadataRow, error) { var rows []sqlplugin.QueueV2MetadataRow - err := mdb.conn.GetContext(ctx, + err := mdb.conn.SelectContext(ctx, &rows, templateGetNameFromQueueMetadataV2, filter.QueueType, diff --git a/common/persistence/sql/sqlplugin/postgresql/queue_v2.go b/common/persistence/sql/sqlplugin/postgresql/queue_v2.go index 727f1fa1880..d4a90e9831a 100644 --- a/common/persistence/sql/sqlplugin/postgresql/queue_v2.go +++ b/common/persistence/sql/sqlplugin/postgresql/queue_v2.go @@ -89,7 +89,7 @@ func (pdb *db) SelectFromQueueV2MetadataForUpdate(ctx context.Context, filter sq func (pdb *db) SelectNameFromQueueV2Metadata(ctx context.Context, filter sqlplugin.QueueV2MetadataTypeFilter) ([]sqlplugin.QueueV2MetadataRow, error) { var rows []sqlplugin.QueueV2MetadataRow - err := pdb.conn.GetContext(ctx, + err := pdb.conn.SelectContext(ctx, &rows, templateGetNameFromQueueMetadataV2, filter.QueueType, diff --git a/common/persistence/sql/sqlplugin/sqlite/queue_v2.go b/common/persistence/sql/sqlplugin/sqlite/queue_v2.go index 8639ec765e3..b68447f8782 100644 --- a/common/persistence/sql/sqlplugin/sqlite/queue_v2.go +++ b/common/persistence/sql/sqlplugin/sqlite/queue_v2.go @@ -75,7 +75,7 @@ func (sdb *db) SelectFromQueueV2MetadataForUpdate(ctx context.Context, filter sq } func (sdb *db) SelectNameFromQueueV2Metadata(ctx context.Context, filter sqlplugin.QueueV2MetadataTypeFilter) ([]sqlplugin.QueueV2MetadataRow, error) { var rows []sqlplugin.QueueV2MetadataRow - err := sdb.conn.GetContext(ctx, + err := sdb.conn.SelectContext(ctx, &rows, templateGetNameFromQueueMetadataV2, filter.QueueType, diff --git a/common/persistence/sql/sqlplugin/tests/queue_v2.go b/common/persistence/sql/sqlplugin/tests/queue_v2.go index 5b45eb851ed..13282b097f0 100644 --- a/common/persistence/sql/sqlplugin/tests/queue_v2.go +++ b/common/persistence/sql/sqlplugin/tests/queue_v2.go @@ -58,22 +58,24 @@ var ( ErrInsertMetadataFailed = errors.New("insertMetadataFailed") ErrRangeDeleteFailed = errors.New("rangeDeleteFailed") ErrUpdateMetadataFailed = errors.New("updateMetadataFailed") + ErrSelectQueueNames = errors.New("selectQueueNamesFailed") ) type ( faultyDB struct { sqlplugin.DB - getLastMessageIdErr error - txBeginErr error - txCommitErr error - insertErr error - txRollbackErr error - rangeSelectError error - selectMetadataError error - insertMetadataError error - rangeDeleteError error - updateMetadataError error - commitCalls int + getLastMessageIdErr error + txBeginErr error + txCommitErr error + insertErr error + txRollbackErr error + rangeSelectError error + selectMetadataError error + insertMetadataError error + rangeDeleteError error + updateMetadataError error + selectQueueNamesError error + commitCalls int } faultyTx struct { db *faultyDB @@ -130,6 +132,13 @@ func (db *faultyDB) SelectFromQueueV2Metadata(ctx context.Context, filter sqlplu return db.DB.SelectFromQueueV2Metadata(ctx, filter) } +func (db *faultyDB) SelectNameFromQueueV2Metadata(ctx context.Context, filter sqlplugin.QueueV2MetadataTypeFilter) ([]sqlplugin.QueueV2MetadataRow, error) { + if db.selectQueueNamesError != nil { + return nil, db.selectQueueNamesError + } + return db.DB.SelectNameFromQueueV2Metadata(ctx, filter) +} + func (db *faultyDB) SelectFromQueueV2MetadataForUpdate(ctx context.Context, filter sqlplugin.QueueV2MetadataFilter) (*sqlplugin.QueueV2MetadataRow, error) { if db.selectMetadataError != nil { return &sqlplugin.QueueV2MetadataRow{}, db.selectMetadataError @@ -254,6 +263,14 @@ func RunSQLQueueV2TestSuite(t *testing.T, baseDB sqlplugin.DB) { t.Parallel() testSelectMetadataFails(ctx, t, baseDB) }) + t.Run("SelectNameFromQueueV2MetadataFails", func(t *testing.T) { + t.Parallel() + testSelectNameFromQueueV2MetadataFails(ctx, t, baseDB) + }) + t.Run("SelectNameFromQueueV2NegativeToken", func(t *testing.T) { + t.Parallel() + testSelectNameFromQueueV2NegativeToken(ctx, t, baseDB) + }) } @@ -612,3 +629,33 @@ func testRangeDeleteActuallyDeletes(ctx context.Context, t *testing.T, db sqlplu require.NoError(t, err) assert.Equal(t, int64(persistence.FirstQueueMessageID+3), response.Metadata.ID) } + +func testSelectNameFromQueueV2MetadataFails(ctx context.Context, t *testing.T, baseDB sqlplugin.DB) { + queueType := persistence.QueueTypeHistoryDLQ + db := &faultyDB{ + DB: baseDB, + selectQueueNamesError: ErrSelectQueueNames, + } + logger := &logRecorder{Logger: log.NewTestLogger()} + q := persistencesql.NewQueueV2(db, logger) + _, err := q.ListQueues(ctx, &persistence.InternalListQueuesRequest{ + QueueType: queueType, + PageSize: 10, + NextPageToken: nil, + }) + assert.Error(t, err) + assert.ErrorContains(t, err, "SelectNameFromQueueV2Metadata operation failed") +} + +func testSelectNameFromQueueV2NegativeToken(ctx context.Context, t *testing.T, baseDB sqlplugin.DB) { + queueType := persistence.QueueTypeHistoryDLQ + logger := &logRecorder{Logger: log.NewTestLogger()} + q := persistencesql.NewQueueV2(baseDB, logger) + _, err := q.ListQueues(ctx, &persistence.InternalListQueuesRequest{ + QueueType: queueType, + PageSize: 1, + NextPageToken: persistence.GetNextPageTokenForListQueues(-1), + }) + require.Error(t, err) + require.ErrorIs(t, err, persistence.ErrNegativeListQueuesOffset) +} diff --git a/common/persistence/tests/history_task_queue_manager_test_suite.go b/common/persistence/tests/history_task_queue_manager_test_suite.go index a28669b11c7..396812bfcdc 100644 --- a/common/persistence/tests/history_task_queue_manager_test_suite.go +++ b/common/persistence/tests/history_task_queue_manager_test_suite.go @@ -95,6 +95,16 @@ func (q faultyQueue) RangeDeleteMessages( return q.base.RangeDeleteMessages(ctx, req) } +func (q faultyQueue) ListQueues( + ctx context.Context, + req *persistence.InternalListQueuesRequest, +) (*persistence.InternalListQueuesResponse, error) { + if q.rangeDeleteMessagesErr != nil { + return nil, q.rangeDeleteMessagesErr + } + return q.base.ListQueues(ctx, req) +} + // RunHistoryTaskQueueManagerTestSuite runs all tests for the history task queue manager against a given queue provided by a // particular database. This test suite should be re-used to test all queue implementations. func RunHistoryTaskQueueManagerTestSuite(t *testing.T, queue persistence.QueueV2) { diff --git a/common/persistence/tests/queue_v2_test_suite.go b/common/persistence/tests/queue_v2_test_suite.go index fb1ffc16901..8a82c087d4b 100644 --- a/common/persistence/tests/queue_v2_test_suite.go +++ b/common/persistence/tests/queue_v2_test_suite.go @@ -33,7 +33,6 @@ import ( "github.com/stretchr/testify/require" "go.temporal.io/api/enums/v1" "go.temporal.io/api/serviceerror" - "go.temporal.io/server/common/persistence/persistencetest" "go.temporal.io/server/common/persistence/serialization" "go.temporal.io/server/common/persistence/sql/sqlplugin/tests" @@ -55,7 +54,9 @@ func RunQueueV2TestSuite(t *testing.T, q persistence.QueueV2) { QueueName: queueName, }) require.NoError(t, err) - + t.Run("TestListQueues", func(t *testing.T) { + testListQueues(ctx, t, q) + }) t.Run("TestHappyPath", func(t *testing.T) { t.Parallel() @@ -412,3 +413,124 @@ func RunQueueV2TestSuiteForSQL(t *testing.T, factory *sql.Factory) { tests.RunSQLQueueV2TestSuite(t, db) }) } + +func testListQueues(ctx context.Context, t *testing.T, queue persistence.QueueV2) { + t.Run("HappyPath", func(t *testing.T) { + // ListQueues when empty + queueType := persistence.QueueTypeHistoryDLQ + response, err := queue.ListQueues(ctx, &persistence.InternalListQueuesRequest{ + QueueType: queueType, + PageSize: 10, + NextPageToken: nil, + }) + require.NoError(t, err) + require.Equal(t, 0, len(response.QueueNames)) + + // List of all created queues + var queueNames []string + + // List one queue. + queueName := "test-queue-" + t.Name() + "first" + queueNames = append(queueNames, queueName) + _, err = queue.CreateQueue(ctx, &persistence.InternalCreateQueueRequest{ + QueueType: queueType, + QueueName: queueName, + }) + require.NoError(t, err) + response, err = queue.ListQueues(ctx, &persistence.InternalListQueuesRequest{ + QueueType: queueType, + PageSize: 10, + NextPageToken: nil, + }) + require.NoError(t, err) + require.Equal(t, 1, len(response.QueueNames)) + require.Equal(t, queueName, response.QueueNames[0]) + + // List multiple queues. + queueName = "test-queue-" + t.Name() + "second" + queueNames = append(queueNames, queueName) + _, err = queue.CreateQueue(ctx, &persistence.InternalCreateQueueRequest{ + QueueType: queueType, + QueueName: queueName, + }) + require.NoError(t, err) + response, err = queue.ListQueues(ctx, &persistence.InternalListQueuesRequest{ + QueueType: queueType, + PageSize: 10, + NextPageToken: nil, + }) + require.NoError(t, err) + require.Equal(t, 2, len(response.QueueNames)) + require.Contains(t, response.QueueNames, queueName) + + // List multiple queues in pages. + for i := 0; i < 3; i++ { + queueNames = append(queueNames, "test-queue-"+t.Name()+strconv.Itoa(i)) + } + for _, queueName := range queueNames[2:] { + _, err := queue.CreateQueue(ctx, &persistence.InternalCreateQueueRequest{ + QueueType: queueType, + QueueName: queueName, + }) + require.NoError(t, err) + } + var listedQueueNames []string + response, err = queue.ListQueues(ctx, &persistence.InternalListQueuesRequest{ + QueueType: queueType, + PageSize: 1, + NextPageToken: nil, + }) + require.NoError(t, err) + require.Equal(t, 1, len(response.QueueNames)) + listedQueueNames = append(listedQueueNames, response.QueueNames...) + response, err = queue.ListQueues(ctx, &persistence.InternalListQueuesRequest{ + QueueType: queueType, + PageSize: 1, + NextPageToken: response.NextPageToken, + }) + require.NoError(t, err) + require.Equal(t, 1, len(response.QueueNames)) + listedQueueNames = append(listedQueueNames, response.QueueNames...) + response, err = queue.ListQueues(ctx, &persistence.InternalListQueuesRequest{ + QueueType: queueType, + PageSize: 3, + NextPageToken: response.NextPageToken, + }) + require.NoError(t, err) + require.Equal(t, 3, len(response.QueueNames)) + listedQueueNames = append(listedQueueNames, response.QueueNames...) + response, err = queue.ListQueues(ctx, &persistence.InternalListQueuesRequest{ + QueueType: queueType, + PageSize: 1, + NextPageToken: response.NextPageToken, + }) + require.NoError(t, err) + require.Equal(t, 0, len(response.QueueNames)) + require.Empty(t, response.NextPageToken) + for _, queueName := range queueNames { + require.Contains(t, listedQueueNames, queueName) + + } + }) + t.Run("NegativePageSize", func(t *testing.T) { + t.Parallel() + queueType := persistence.QueueTypeHistoryDLQ + _, err := queue.ListQueues(ctx, &persistence.InternalListQueuesRequest{ + QueueType: queueType, + PageSize: -1, + NextPageToken: nil, + }) + require.Error(t, err) + require.ErrorIs(t, err, persistence.ErrNonPositiveListQueuesPageSize) + }) + t.Run("InvalidPageToken", func(t *testing.T) { + t.Parallel() + queueType := persistence.QueueTypeHistoryDLQ + _, err := queue.ListQueues(ctx, &persistence.InternalListQueuesRequest{ + QueueType: queueType, + PageSize: 1, + NextPageToken: []byte("some invalid token"), + }) + assert.Error(t, err) + }) +} diff --git a/proto/internal/temporal/server/api/persistence/v1/queues.proto b/proto/internal/temporal/server/api/persistence/v1/queues.proto index 96ae75487fa..f5a1f2929ca 100644 --- a/proto/internal/temporal/server/api/persistence/v1/queues.proto +++ b/proto/internal/temporal/server/api/persistence/v1/queues.proto @@ -50,6 +50,10 @@ message ReadQueueMessagesNextPageToken { int64 last_read_message_id = 1; } +message ListQueuesNextPageToken { + int64 last_read_queue_number = 1; +} + // HistoryTask represents an internal history service task for a particular shard. We use a blob because there is no // common proto for all task proto types. message HistoryTask {