Skip to content

Commit

Permalink
Adding DescribeDLQJob and CancelDLQJob commands to tdbg (#5086)
Browse files Browse the repository at this point in the history
<!-- Describe what has changed in this PR -->
**What changed?**
Adding commands to tdbg to call DescribeDLQJob and CancelDLQJob APIs

<!-- Tell your future self why have you made these changes -->
**Why?**
Operators will be using these commands to query the status of ongoing
DLQ jobs and
cancel the jobs if needed.

<!-- How have you verified this change? Tested locally? Added a unit
test? Checked in staging env? -->
**How did you test it?**
Unit tests

<!-- Assuming the worst case, what can be broken when deploying this
change to production? -->
**Potential risks**


<!-- Is this PR a hotfix candidate or require that a notification be
sent to the broader community? (Yes/No) -->
**Is hotfix candidate?**

---------

Co-authored-by: Tim Deeb-Swihart <[email protected]>
  • Loading branch information
prathyushpv and tdeebswihart authored Nov 15, 2023
1 parent 82f103d commit ebc3116
Show file tree
Hide file tree
Showing 8 changed files with 561 additions and 253 deletions.
516 changes: 279 additions & 237 deletions api/adminservice/v1/request_response.pb.go

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -486,6 +486,8 @@ message DescribeDLQJobResponse {
// For PurgeDLQTasks job, it the ID of the last message that was purged.
// For MergeDLQTasks job, it is the ID of the last message that was re-enqueued and removed from the DLQ.
int64 last_processed_message_id = 7;
// messages_processed is the total number of messages that are re-enqueued and deleted from the DLQ so far by the DLQ job.
int64 messages_processed = 8;
}

message CancelDLQJobRequest {
Expand Down
1 change: 1 addition & 0 deletions service/frontend/admin_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -1907,6 +1907,7 @@ func (adh *AdminHandler) DescribeDLQJob(ctx context.Context, request *adminservi
OperationState: state,
MaxMessageId: queryResponse.MaxMessageIDToProcess,
LastProcessedMessageId: queryResponse.LastProcessedMessageID,
MessagesProcessed: queryResponse.NumberOfMessagesProcessed,
StartTime: execution.WorkflowExecutionInfo.StartTime,
EndTime: execution.WorkflowExecutionInfo.CloseTime,
}, nil
Expand Down
146 changes: 130 additions & 16 deletions tests/dlq_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,6 @@ import (
sdkclient "go.temporal.io/sdk/client"
sdkworker "go.temporal.io/sdk/worker"
"go.temporal.io/sdk/workflow"
"go.uber.org/fx"

"go.temporal.io/server/api/adminservice/v1"
"go.temporal.io/server/api/enums/v1"
persistencespb "go.temporal.io/server/api/persistence/v1"
Expand All @@ -63,6 +61,7 @@ import (
"go.temporal.io/server/tests/testutils"
"go.temporal.io/server/tools/tdbg"
"go.temporal.io/server/tools/tdbg/tdbgtest"
"go.uber.org/fx"
)

type (
Expand All @@ -76,6 +75,7 @@ type (
sdkClientFactory sdk.ClientFactory
tdbgApp *cli.App
worker sdkworker.Worker
deleteBlockCh chan interface{}
}
dlqTestCase struct {
name string
Expand All @@ -99,6 +99,10 @@ type (
suite *dlqSuite
queues.QueueWriter
}
testTaskQueueManager struct {
suite *dlqSuite
persistence.HistoryTaskQueueManager
}
)

const (
Expand All @@ -112,6 +116,7 @@ func (s *dlqSuite) SetupSuite() {
dynamicconfig.HistoryTaskDLQEnabled: true,
}
s.dlqTasks = make(chan tasks.Task)
s.failingWorkflowIDPrefix = "dlq-test-terminal-wfts-"
s.setupSuite(
"testdata/cluster.yaml",
WithFxOptionsForService(primitives.HistoryService,
Expand All @@ -131,6 +136,14 @@ func (s *dlqSuite) SetupSuite() {
}
},
),
fx.Decorate(
func(m persistence.HistoryTaskQueueManager) persistence.HistoryTaskQueueManager {
return &testTaskQueueManager{
suite: s,
HistoryTaskQueueManager: m,
}
},
),
),
WithFxOptionsForService(primitives.FrontendService,
fx.Populate(&s.sdkClientFactory),
Expand Down Expand Up @@ -163,7 +176,8 @@ func myWorkflow(workflow.Context) (string, error) {

func (s *dlqSuite) SetupTest() {
s.setAssertions()
s.failingWorkflowIDPrefix = "dlq-test-terminal-wfts-"
s.deleteBlockCh = make(chan interface{})
close(s.deleteBlockCh)
}

func (s *dlqSuite) setAssertions() {
Expand Down Expand Up @@ -292,6 +306,7 @@ func (s *dlqSuite) TestReadArtificialDLQTasks() {
// causes the workflow task to be added to the DLQ. This tests the end-to-end functionality of the DLQ, whereas the
// above test is more for testing specific CLI flags when reading from the DLQ. After the workflow task is added to the
// DLQ, this test then purges the DLQ and verifies that the task was deleted.
// This test will then call DescribeDLQJob and CancelDLQJob api to verify.
func (s *dlqSuite) TestPurgeRealWorkflow() {
ctx := context.Background()
ctx, cancel := context.WithTimeout(ctx, testTimeout)
Expand All @@ -300,16 +315,29 @@ func (s *dlqSuite) TestPurgeRealWorkflow() {
_, dlqMessageID := s.executeDoomedWorkflow(ctx)

// Delete the workflow task from the DLQ.
s.purgeMessages(ctx, dlqMessageID)
token := s.purgeMessages(ctx, dlqMessageID)

// Verify that the workflow task is no longer in the DLQ.
dlqTasks := s.readDLQTasks()
s.Empty(dlqTasks, "expected DLQ to be empty after purge")

// Run DescribeJob and validate
response := s.describeJob(token)
s.Equal(enums.DLQ_OPERATION_TYPE_PURGE, response.OperationType)
s.Equal(enums.DLQ_OPERATION_STATE_COMPLETED, response.OperationState)
s.Equal(dlqMessageID, response.MaxMessageId)
s.Equal(dlqMessageID, response.LastProcessedMessageId)
s.Equal(int64(1), response.MessagesProcessed)

// Try to cancel completed workflow
cancelResponse := s.cancelJob(token)
s.Equal(false, cancelResponse.Canceled)
}

// This test executes actual workflows for which we've set up an executor wrapper to return a terminal error. This
// causes the workflow tasks to be added to the DLQ. This tests the end-to-end functionality of the DLQ, whereas the
// above test is more for testing specific CLI flags when reading from the DLQ.
// This test will then call DescribeDLQJob and CancelDLQJob api to verify.
func (s *dlqSuite) TestMergeRealWorkflow() {
ctx := context.Background()
ctx, cancel := context.WithTimeout(ctx, testTimeout)
Expand All @@ -321,16 +349,16 @@ func (s *dlqSuite) TestMergeRealWorkflow() {

// Execute several doomed workflows.
numWorkflows := 3
var dlqMessageID int64
var runs []sdkclient.WorkflowRun
for i := 0; i < numWorkflows; i++ {
run, dlqMessageID := s.executeDoomedWorkflow(ctx)
s.Equal(int64(i), dlqMessageID)
run, dlqMessageID = s.executeDoomedWorkflow(ctx)
runs = append(runs, run)
}

// Re-enqueue the workflow tasks from the DLQ, but don't fail its WFTs this time.
s.failingWorkflowIDPrefix = "some-workflow-id-that-wont-exist"
s.mergeMessages(ctx, int64(numWorkflows-1))
token := s.mergeMessages(ctx, dlqMessageID)

// Verify that the workflow task was deleted from the DLQ after merging.
dlqTasks := s.readDLQTasks()
Expand All @@ -340,6 +368,38 @@ func (s *dlqSuite) TestMergeRealWorkflow() {
for i := 0; i < numWorkflows; i++ {
s.validateWorkflowRun(ctx, runs[i])
}

// Run DescribeJob and validate
response := s.describeJob(token)
s.Equal(enums.DLQ_OPERATION_TYPE_MERGE, response.OperationType)
s.Equal(enums.DLQ_OPERATION_STATE_COMPLETED, response.OperationState)
s.Equal(dlqMessageID, response.MaxMessageId)
s.Equal(dlqMessageID, response.LastProcessedMessageId)
s.Equal(int64(numWorkflows), response.MessagesProcessed)

// Try to cancel completed workflow
cancelResponse := s.cancelJob(token)
s.Equal(false, cancelResponse.Canceled)
}

func (s *dlqSuite) TestCancelRunningMerge() {
s.deleteBlockCh = make(chan interface{})
ctx := context.Background()
ctx, cancel := context.WithTimeout(ctx, testTimeout)
defer cancel()

// Execute several doomed workflows.
_, dlqMessageID := s.executeDoomedWorkflow(ctx)

token := s.mergeMessagesWithoutBlocking(ctx, dlqMessageID)

// Try to cancel running workflow
cancelResponse := s.cancelJob(token)
s.Equal(true, cancelResponse.Canceled)
// Unblock waiting tests on Delete
close(s.deleteBlockCh)
// Delete the workflow task from the DLQ.
s.purgeMessages(ctx, dlqMessageID)
}

func (s *dlqSuite) validateWorkflowRun(ctx context.Context, run sdkclient.WorkflowRun) {
Expand Down Expand Up @@ -398,7 +458,7 @@ func (s *dlqSuite) executeWorkflow(ctx context.Context, workflowID string) sdkcl
}

// purgeMessages from the DLQ up to and including the specified message ID, blocking until the purge workflow completes.
func (s *dlqSuite) purgeMessages(ctx context.Context, maxMessageIDToDelete int64) {
func (s *dlqSuite) purgeMessages(ctx context.Context, maxMessageIDToDelete int64) []byte {
args := []string{
"tdbg",
"--" + tdbg.FlagYes,
Expand All @@ -421,10 +481,23 @@ func (s *dlqSuite) purgeMessages(ctx context.Context, maxMessageIDToDelete int64
systemSDKClient := s.sdkClientFactory.GetSystemClient()
run := systemSDKClient.GetWorkflow(ctx, token.WorkflowId, token.RunId)
s.NoError(run.Get(ctx, nil))
return response.GetJobToken()
}

// mergeMessages from the DLQ up to and including the specified message ID, blocking until the merge workflow completes.
func (s *dlqSuite) mergeMessages(ctx context.Context, maxMessageID int64) {
func (s *dlqSuite) mergeMessages(ctx context.Context, maxMessageID int64) []byte {
tokenBytes := s.mergeMessagesWithoutBlocking(ctx, maxMessageID)
var token adminservice.DLQJobToken
s.NoError(token.Unmarshal(tokenBytes))

systemSDKClient := s.sdkClientFactory.GetSystemClient()
run := systemSDKClient.GetWorkflow(ctx, token.WorkflowId, token.RunId)
s.NoError(run.Get(ctx, nil))
return tokenBytes
}

// mergeMessages from the DLQ up to and including the specified message ID, returns immediately after running tdbg command.
func (s *dlqSuite) mergeMessagesWithoutBlocking(ctx context.Context, maxMessageID int64) []byte {
args := []string{
"tdbg",
"--" + tdbg.FlagYes,
Expand All @@ -441,13 +514,7 @@ func (s *dlqSuite) mergeMessages(ctx context.Context, maxMessageID int64) {
s.writer.Truncate(0)
var response adminservice.MergeDLQTasksResponse
s.NoError(jsonpb.Unmarshal(bytes.NewReader(output), &response))

var token adminservice.DLQJobToken
s.NoError(token.Unmarshal(response.GetJobToken()))

systemSDKClient := s.sdkClientFactory.GetSystemClient()
run := systemSDKClient.GetWorkflow(ctx, token.WorkflowId, token.RunId)
s.NoError(run.Get(ctx, nil))
return response.GetJobToken()
}

// readDLQTasks from the transfer task DLQ for this cluster and return them.
Expand All @@ -467,6 +534,47 @@ func (s *dlqSuite) readDLQTasks() []tdbgtest.DLQMessage[*persistencespb.Transfer
return dlqTasks
}

// Calls describe dlq job and verify the output
func (s *dlqSuite) describeJob(token []byte) adminservice.DescribeDLQJobResponse {
args := []string{
"tdbg",
"dlq",
"--" + tdbg.FlagDLQVersion, "v2",
"job",
"describe",
"--" + tdbg.FlagJobToken, string(token),
}
err := s.tdbgApp.Run(args)
s.NoError(err)
output := s.writer.Bytes()
fmt.Println(string(output))
s.writer.Truncate(0)
var response adminservice.DescribeDLQJobResponse
s.NoError(jsonpb.Unmarshal(bytes.NewReader(output), &response))
return response
}

// Calls delete dlq job and verify the output
func (s *dlqSuite) cancelJob(token []byte) adminservice.CancelDLQJobResponse {
args := []string{
"tdbg",
"dlq",
"--" + tdbg.FlagDLQVersion, "v2",
"job",
"cancel",
"--" + tdbg.FlagJobToken, string(token),
"--" + tdbg.FlagReason, "testing cancel",
}
err := s.tdbgApp.Run(args)
s.NoError(err)
output := s.writer.Bytes()
fmt.Println(string(output))
s.writer.Truncate(0)
var response adminservice.CancelDLQJobResponse
s.NoError(jsonpb.Unmarshal(bytes.NewReader(output), &response))
return response
}

// verifyNumTasks verifies that the specified file contains the expected number of DLQ tasks, and that each task has the
// expected metadata and payload.
func (s *dlqSuite) verifyNumTasks(file *os.File, expectedNumTasks int) {
Expand Down Expand Up @@ -525,3 +633,9 @@ func (t testExecutor) Execute(ctx context.Context, e queues.Executable) queues.E
}
return t.base.Execute(ctx, e)
}

// ReadTasks is used to block the dlq job workflow until one of them is cancelled in TestCancelRunningMerge.
func (m *testTaskQueueManager) DeleteTasks(ctx context.Context, request *persistence.DeleteTasksRequest) (*persistence.DeleteTasksResponse, error) {
<-m.suite.deleteBlockCh
return m.HistoryTaskQueueManager.DeleteTasks(ctx, request)
}
88 changes: 88 additions & 0 deletions tools/tdbg/dlq_job_service.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
// The MIT License
//
// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved.
//
// Copyright (c) 2020 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package tdbg

import (
"fmt"
"io"

"github.com/urfave/cli/v2"
"go.temporal.io/server/api/adminservice/v1"
)

type (
DLQJobService struct {
clientFactory ClientFactory
writer io.Writer
}
)

func NewDLQJobService(
clientFactory ClientFactory,
writer io.Writer,
) *DLQV2Service {
return &DLQV2Service{
clientFactory: clientFactory,
writer: writer,
}
}

func (ac *DLQJobService) DescribeJob(c *cli.Context) error {
adminClient := ac.clientFactory.AdminClient(c)
jobToken := c.String(FlagJobToken)
ctx, cancel := newContext(c)
defer cancel()
response, err := adminClient.DescribeDLQJob(ctx, &adminservice.DescribeDLQJobRequest{
JobToken: jobToken,
})
if err != nil {
return fmt.Errorf("call to DescribeDLQJob failed: %w", err)
}
err = newEncoder(ac.writer).Encode(response)
if err != nil {
return fmt.Errorf("unable to encode DescribeDLQJob response: %w", err)
}
return nil
}

func (ac *DLQJobService) CancelJob(c *cli.Context) error {
adminClient := ac.clientFactory.AdminClient(c)
jobToken := c.String(FlagJobToken)
reason := c.String(FlagReason)
ctx, cancel := newContext(c)
defer cancel()
response, err := adminClient.CancelDLQJob(ctx, &adminservice.CancelDLQJobRequest{
JobToken: jobToken,
Reason: reason,
})
if err != nil {
return fmt.Errorf("call to CancelDLQJob failed: %w", err)
}
err = newEncoder(ac.writer).Encode(response)
if err != nil {
return fmt.Errorf("unable to encode CancelDLQJob response: %w", err)
}
return nil
}
8 changes: 8 additions & 0 deletions tools/tdbg/dlq_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,3 +179,11 @@ func getOutputFile(outputFile string, writer io.Writer) (io.WriteCloser, error)
func (n noCloseWriter) Close() error {
return nil
}

// GetDLQJobService returns a DLQJobService.
func (p *DLQServiceProvider) GetDLQJobService() DLQJobService {
return DLQJobService{
clientFactory: p.clientFactory,
writer: p.writer,
}
}
Loading

0 comments on commit ebc3116

Please sign in to comment.