Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: [TKC-2882] stream service and parallel step logs #6052

Merged
merged 47 commits into from
Dec 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
47 commits
Select commit Hold shift + click to select a range
bd39964
feat: api methods for service logs
vsukhin Nov 25, 2024
2c77962
fix: client for get service logs
vsukhin Nov 25, 2024
c71f5c8
fix: change log
vsukhin Nov 25, 2024
3e70a83
fix: disable hints
vsukhin Nov 25, 2024
aa9acb3
fix: routing
vsukhin Nov 26, 2024
68fe09f
fix: show service logs
vsukhin Nov 26, 2024
c55187f
fix: check service name
vsukhin Nov 27, 2024
93b7ac6
fix: check service name
vsukhin Nov 27, 2024
cf2dd76
fix: log comment
vsukhin Nov 27, 2024
fff63c0
fix: check for testworkflow service
vsukhin Nov 27, 2024
c7bed19
fix: friendly error
vsukhin Nov 27, 2024
f34e5d1
fix: add spinner
vsukhin Nov 27, 2024
07ce308
feat: proto for service notifications
vsukhin Nov 27, 2024
9c03fe4
feat: add cloud grpc method for server notifications
vsukhin Nov 27, 2024
9d85b35
fix: change timeeout
vsukhin Nov 28, 2024
57db7a0
fix: waiting for service pod
vsukhin Nov 28, 2024
cccfcb7
fix: typo
vsukhin Nov 28, 2024
c4658c5
fix: waiting for service pod
vsukhin Nov 28, 2024
3768550
fix: add service name check
vsukhin Nov 28, 2024
c2fd448
fix: adjust help
vsukhin Nov 29, 2024
b46bb41
fix: add method to parallel step
vsukhin Nov 29, 2024
59bdce3
fix: use retry library
vsukhin Nov 29, 2024
4224930
fix: rename const
vsukhin Nov 29, 2024
bcda47f
fix: 0 attempts
vsukhin Nov 29, 2024
97b5bd3
fix: use option
vsukhin Nov 29, 2024
3e2c45f
fix: remove ctx
vsukhin Nov 29, 2024
2819e43
feat: add cli support for parallel steps
vsukhin Dec 2, 2024
81b0295
fix: rename url
vsukhin Dec 2, 2024
c71fb91
feat: api methods for parallel steps
vsukhin Dec 2, 2024
6003cde
fix: tune parallel step detection
vsukhin Dec 2, 2024
5136c31
fix: typo
vsukhin Dec 2, 2024
6b89ab0
fix: cli
vsukhin Dec 2, 2024
bcffbd5
feat: add proto for parallel step logs
vsukhin Dec 2, 2024
bfdc72f
fix: lint
vsukhin Dec 3, 2024
ce5215f
add: grpc method for parallel steps
vsukhin Dec 3, 2024
fdffbc9
fix: comments
vsukhin Dec 3, 2024
38379b4
fix: comments
vsukhin Dec 3, 2024
9d4c5fa
Merge branch 'main' into vsukhin/feature/stream-service-logs
vsukhin Dec 3, 2024
8a7f8cb
fix: typo
vsukhin Dec 3, 2024
097fb7d
fix: change proto
vsukhin Dec 3, 2024
75d7f88
fix: rename fields
vsukhin Dec 3, 2024
f37671e
Merge branch 'main' into vsukhin/feature/stream-service-logs
vsukhin Dec 3, 2024
cd1be6e
fix: use const
vsukhin Dec 4, 2024
38f959c
fix: check for empty result
vsukhin Dec 4, 2024
288be72
fix: move methods to agent package
vsukhin Dec 9, 2024
e56e548
fix: send org and env ids
vsukhin Dec 9, 2024
e899798
fix: add org and env ids
vsukhin Dec 9, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 14 additions & 12 deletions cmd/api-server/commons/commons.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,18 +282,20 @@ func ReadDefaultExecutors(cfg *config.Config) (executors []testkube.ExecutorDeta

func ReadProContext(ctx context.Context, cfg *config.Config, grpcClient cloud.TestKubeCloudAPIClient) config.ProContext {
proContext := config.ProContext{
APIKey: cfg.TestkubeProAPIKey,
URL: cfg.TestkubeProURL,
TLSInsecure: cfg.TestkubeProTLSInsecure,
WorkerCount: cfg.TestkubeProWorkerCount,
LogStreamWorkerCount: cfg.TestkubeProLogStreamWorkerCount,
WorkflowNotificationsWorkerCount: cfg.TestkubeProWorkflowNotificationsWorkerCount,
SkipVerify: cfg.TestkubeProSkipVerify,
EnvID: cfg.TestkubeProEnvID,
OrgID: cfg.TestkubeProOrgID,
Migrate: cfg.TestkubeProMigrate,
ConnectionTimeout: cfg.TestkubeProConnectionTimeout,
DashboardURI: cfg.TestkubeDashboardURI,
APIKey: cfg.TestkubeProAPIKey,
URL: cfg.TestkubeProURL,
TLSInsecure: cfg.TestkubeProTLSInsecure,
WorkerCount: cfg.TestkubeProWorkerCount,
LogStreamWorkerCount: cfg.TestkubeProLogStreamWorkerCount,
WorkflowNotificationsWorkerCount: cfg.TestkubeProWorkflowNotificationsWorkerCount,
WorkflowServiceNotificationsWorkerCount: cfg.TestkubeProWorkflowServiceNotificationsWorkerCount,
WorkflowParallelStepNotificationsWorkerCount: cfg.TestkubeProWorkflowParallelStepNotificationsWorkerCount,
SkipVerify: cfg.TestkubeProSkipVerify,
EnvID: cfg.TestkubeProEnvID,
OrgID: cfg.TestkubeProOrgID,
Migrate: cfg.TestkubeProMigrate,
ConnectionTimeout: cfg.TestkubeProConnectionTimeout,
DashboardURI: cfg.TestkubeDashboardURI,
}

if cfg.TestkubeProAPIKey == "" || grpcClient == nil {
Expand Down
29 changes: 5 additions & 24 deletions cmd/api-server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package main

import (
"context"
"errors"
"flag"
"fmt"
"strings"
Expand All @@ -22,12 +21,10 @@ import (
"github.com/kubeshop/testkube/pkg/event/kind/k8sevent"
"github.com/kubeshop/testkube/pkg/event/kind/webhook"
ws "github.com/kubeshop/testkube/pkg/event/kind/websocket"
"github.com/kubeshop/testkube/pkg/executor/output"
"github.com/kubeshop/testkube/pkg/secretmanager"
"github.com/kubeshop/testkube/pkg/server"
"github.com/kubeshop/testkube/pkg/tcl/checktcl"
"github.com/kubeshop/testkube/pkg/tcl/schedulertcl"
"github.com/kubeshop/testkube/pkg/testworkflows/executionworker/executionworkertypes"
"github.com/kubeshop/testkube/pkg/testworkflows/testworkflowprocessor/presets"

"github.com/kubeshop/testkube/internal/common"
Expand Down Expand Up @@ -308,26 +305,8 @@ func main() {
api.Init(httpServer)

log.DefaultLogger.Info("starting agent service")
getTestWorkflowNotificationsStream := func(ctx context.Context, executionID string) (<-chan testkube.TestWorkflowExecutionNotification, error) {
execution, err := testWorkflowResultsRepository.Get(ctx, executionID)
if err != nil {
return nil, err
}
notifications := executionWorker.Notifications(ctx, execution.Id, executionworkertypes.NotificationsOptions{
Hints: executionworkertypes.Hints{
Namespace: execution.Namespace,
Signature: execution.Signature,
ScheduledAt: common.Ptr(execution.ScheduledAt),
},
})
if notifications.Err() != nil {
return nil, notifications.Err()
}
return notifications.Channel(), nil
}
getDeprecatedLogStream := func(ctx context.Context, executionID string) (chan output.Output, error) {
return nil, errors.New("deprecated features have been disabled")
}

getDeprecatedLogStream := agent.GetDeprecatedLogStream
if deprecatedSystem != nil && deprecatedSystem.StreamLogs != nil {
getDeprecatedLogStream = deprecatedSystem.StreamLogs
}
Expand All @@ -336,7 +315,9 @@ func main() {
httpServer.Mux.Handler(),
grpcClient,
getDeprecatedLogStream,
getTestWorkflowNotificationsStream,
agent.GetTestWorkflowNotificationsStream(testWorkflowResultsRepository, executionWorker),
agent.GetTestWorkflowServiceNotificationsStream(testWorkflowResultsRepository, executionWorker),
agent.GetTestWorkflowParallelStepNotificationsStream(testWorkflowResultsRepository, executionWorker),
clusterId,
cfg.TestkubeClusterName,
features,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func NewGetTestWorkflowExecutionsCmd() *cobra.Command {
ui.Info("Getting logs for test workflow execution", executionID)

logs, err := client.GetTestWorkflowExecutionLogs(executionID)
ui.ExitOnError("getting logs from executor", err)
ui.ExitOnError("getting logs from test workflow", err)

sigs := flattenSignatures(execution.Signature)

Expand Down
156 changes: 145 additions & 11 deletions cmd/kubectl-testkube/commands/testworkflows/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
const (
LogTimestampLength = 30 // time.RFC3339Nano without 00:00 timezone
apiErrorMessage = "processing error:"
logsCheckDelay = 100 * time.Millisecond
)

var (
Expand All @@ -47,6 +48,10 @@ func NewRunTestWorkflowCmd() *cobra.Command {
masks []string
tags map[string]string
selectors []string
serviceName string
parallelStepName string
serviceIndex int
parallelStepIndex int
)

cmd := &cobra.Command{
Expand Down Expand Up @@ -146,7 +151,15 @@ func NewRunTestWorkflowCmd() *cobra.Command {
ui.NL()
if !execution.FailedToInitialize() {
if watchEnabled && len(args) > 0 {
exitCode = uiWatch(execution, client)
var pServiceName, pParallelStepName *string
if cmd.Flag("service-name").Changed || cmd.Flag("service-index").Changed {
pServiceName = &serviceName
}
if cmd.Flag("parallel-step-name").Changed || cmd.Flag("parallel-step-index").Changed {
pParallelStepName = &parallelStepName
}

exitCode = uiWatch(execution, pServiceName, serviceIndex, pParallelStepName, parallelStepIndex, client)
ui.NL()
if downloadArtifactsEnabled {
tests.DownloadTestWorkflowArtifacts(execution.Id, downloadDir, format, masks, client, outputPretty)
Expand Down Expand Up @@ -181,12 +194,46 @@ func NewRunTestWorkflowCmd() *cobra.Command {
cmd.Flags().StringArrayVarP(&masks, "mask", "", []string{}, "regexp to filter downloaded files, single or comma separated, like report/.* or .*\\.json,.*\\.js$")
cmd.Flags().StringToStringVarP(&tags, "tag", "", map[string]string{}, "execution tags in a form of name1=val1 passed to executor")
cmd.Flags().StringSliceVarP(&selectors, "label", "l", nil, "label key value pair: --label key1=value1 or label expression")
cmd.Flags().StringVar(&serviceName, "service-name", "", "test workflow service name")
cmd.Flags().IntVar(&serviceIndex, "service-index", 0, "test workflow service index starting from 0")
cmd.Flags().StringVar(&parallelStepName, "parallel-step-name", "", "test workflow parallel step name or reference")
cmd.Flags().IntVar(&parallelStepIndex, "parallel-step-index", 0, "test workflow parallel step index starting from 0")

return cmd
}

func uiWatch(execution testkube.TestWorkflowExecution, client apiclientv1.Client) int {
result, err := watchTestWorkflowLogs(execution.Id, execution.Signature, client)
func uiWatch(execution testkube.TestWorkflowExecution, serviceName *string, serviceIndex int,
parallelStepName *string, parallelStepIndex int, client apiclientv1.Client) int {
var result *testkube.TestWorkflowResult
var err error

switch {
case serviceName != nil:
found := false
if execution.Workflow != nil {
found = execution.Workflow.HasService(*serviceName)
}

if !found {
ui.Failf("unknown service '%s' for test workflow execution %s", *serviceName, execution.Id)
}

result, err = watchTestWorkflowServiceLogs(execution.Id, *serviceName, serviceIndex, execution.Signature, client)
case parallelStepName != nil:
ref := execution.GetParallelStepReference(*parallelStepName)
if ref == "" {
ui.Failf("unknown parallel step '%s' for test workflow execution %s", *parallelStepName, execution.Id)
}

result, err = watchTestWorkflowParallelStepLogs(execution.Id, ref, parallelStepIndex, execution.Signature, client)
default:
result, err = watchTestWorkflowLogs(execution.Id, execution.Signature, client)
}

if result == nil && err == nil {
err = errors.New("no result found")
}

ui.ExitOnError("reading test workflow execution logs", err)

// Apply the result in the execution
Expand Down Expand Up @@ -283,15 +330,10 @@ func getTimestampLength(line string) int {
return 0
}

func watchTestWorkflowLogs(id string, signature []testkube.TestWorkflowSignature, client apiclientv1.Client) (*testkube.TestWorkflowResult, error) {
ui.Info("Getting logs from test workflow job", id)

notifications, err := client.GetTestWorkflowExecutionNotifications(id)
ui.ExitOnError("getting logs from executor", err)

func printTestWorkflowLogs(signature []testkube.TestWorkflowSignature,
notifications chan testkube.TestWorkflowExecutionNotification) (result *testkube.TestWorkflowResult) {
steps := flattenSignatures(signature)

var result *testkube.TestWorkflowResult
var isLineBeginning = true
for l := range notifications {
if l.Output != nil {
Expand All @@ -309,8 +351,100 @@ func watchTestWorkflowLogs(id string, signature []testkube.TestWorkflowSignature
}

ui.NL()
return result
}

func watchTestWorkflowLogs(id string, signature []testkube.TestWorkflowSignature, client apiclientv1.Client) (*testkube.TestWorkflowResult, error) {
ui.Info("Getting logs from test workflow job", id)

notifications, err := client.GetTestWorkflowExecutionNotifications(id)
if err != nil {
return nil, err
}

return printTestWorkflowLogs(signature, notifications), nil
}

func watchTestWorkflowServiceLogs(id, serviceName string, serviceIndex int,
signature []testkube.TestWorkflowSignature, client apiclientv1.Client) (*testkube.TestWorkflowResult, error) {
ui.Info("Getting logs from test workflow service job", fmt.Sprintf("%s-%s-%d", id, serviceName, serviceIndex))

var (
notifications chan testkube.TestWorkflowExecutionNotification
nErr error
)

spinner := ui.NewSpinner("Waiting for service logs")
for {
notifications, nErr = client.GetTestWorkflowExecutionServiceNotifications(id, serviceName, serviceIndex)
if nErr != nil {
execution, cErr := client.GetTestWorkflowExecution(id)
if cErr != nil {
spinner.Fail()
return nil, cErr
}

if execution.Result != nil {
if execution.Result.IsFinished() {
nErr = errors.New("test workflow execution is finished")
} else {
time.Sleep(logsCheckDelay)
continue
}
}
}

if nErr != nil {
spinner.Fail()
return nil, nErr
}

break
}

spinner.Success()
return printTestWorkflowLogs(signature, notifications), nil
}

func watchTestWorkflowParallelStepLogs(id, ref string, workerIndex int,
signature []testkube.TestWorkflowSignature, client apiclientv1.Client) (*testkube.TestWorkflowResult, error) {
ui.Info("Getting logs from test workflow parallel step job", fmt.Sprintf("%s-%s-%d", id, ref, workerIndex))

var (
notifications chan testkube.TestWorkflowExecutionNotification
nErr error
)

spinner := ui.NewSpinner("Waiting for parallel step logs")
for {
notifications, nErr = client.GetTestWorkflowExecutionParallelStepNotifications(id, ref, workerIndex)
if nErr != nil {
execution, cErr := client.GetTestWorkflowExecution(id)
if cErr != nil {
spinner.Fail()
return nil, cErr
}

if execution.Result != nil {
if execution.Result.IsFinished() {
nErr = errors.New("test workflow execution is finished")
} else {
time.Sleep(logsCheckDelay)
continue
}
}
}

if nErr != nil {
spinner.Fail()
return nil, nErr
}

break
}

return result, err
spinner.Success()
return printTestWorkflowLogs(signature, notifications), nil
}

func printStatusHeader(i, n int, name string) {
Expand Down
22 changes: 21 additions & 1 deletion cmd/kubectl-testkube/commands/testworkflows/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,13 @@ import (
)

func NewWatchTestWorkflowExecutionCmd() *cobra.Command {
var (
serviceName string
parallelStepName string
serviceIndex int
parallelStepIndex int
)

cmd := &cobra.Command{
Use: "testworkflowexecution <executionName>",
Aliases: []string{"testworkflowexecutions", "twe", "tw"},
Expand All @@ -31,7 +38,15 @@ func NewWatchTestWorkflowExecutionCmd() *cobra.Command {
ui.ExitOnError("render test workflow execution", err)

ui.NL()
exitCode := uiWatch(execution, client)
var pServiceName, pParallelStepName *string
if cmd.Flag("service-name").Changed || cmd.Flag("service-index").Changed {
pServiceName = &serviceName
}
if cmd.Flag("parallel-step-name").Changed || cmd.Flag("parallel-step-index").Changed {
pParallelStepName = &parallelStepName
}

exitCode := uiWatch(execution, pServiceName, serviceIndex, pParallelStepName, parallelStepIndex, client)
ui.NL()

execution, err = client.GetTestWorkflowExecution(execution.Id)
Expand All @@ -43,5 +58,10 @@ func NewWatchTestWorkflowExecutionCmd() *cobra.Command {
},
}

cmd.Flags().StringVar(&serviceName, "service-name", "", "test workflow service name")
cmd.Flags().IntVar(&serviceIndex, "service-index", 0, "test workflow service index starting from 0")
cmd.Flags().StringVar(&parallelStepName, "parallel-step-name", "", "test workflow parallel step name or reference")
cmd.Flags().IntVar(&parallelStepIndex, "parallel-step-index", 0, "test workflow parallel step index starting from 0")

return cmd
}
4 changes: 4 additions & 0 deletions internal/app/api/v1/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,11 @@ func (s *TestkubeAPI) Init(server server.HTTPServer) {
testWorkflowExecutions.Post("/", s.ExecuteTestWorkflowHandler())
testWorkflowExecutions.Get("/:executionID", s.GetTestWorkflowExecutionHandler())
testWorkflowExecutions.Get("/:executionID/notifications", s.StreamTestWorkflowExecutionNotificationsHandler())
testWorkflowExecutions.Get("/:executionID/notifications/services/:serviceName/:serviceIndex<int>", s.StreamTestWorkflowExecutionServiceNotificationsHandler())
testWorkflowExecutions.Get("/:executionID/notifications/parallel-steps/:ref/:workerIndex<int>", s.StreamTestWorkflowExecutionParallelStepNotificationsHandler())
testWorkflowExecutions.Get("/:executionID/notifications/stream", s.StreamTestWorkflowExecutionNotificationsWebSocketHandler())
testWorkflowExecutions.Get("/:executionID/notifications/stream/services/:serviceName/:serviceIndex<int>", s.StreamTestWorkflowExecutionServiceNotificationsWebSocketHandler())
testWorkflowExecutions.Get("/:executionID/notifications/stream/parallel-steps/:ref/:workerIndex<int>", s.StreamTestWorkflowExecutionParallelStepNotificationsWebSocketHandler())
testWorkflowExecutions.Post("/:executionID/abort", s.AbortTestWorkflowExecutionHandler())
testWorkflowExecutions.Post("/:executionID/pause", s.PauseTestWorkflowExecutionHandler())
testWorkflowExecutions.Post("/:executionID/resume", s.ResumeTestWorkflowExecutionHandler())
Expand Down
Loading
Loading