Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(cli): allow users to configure agents to connect to insecure control plane or skip ssl cert verification #3941

Merged
merged 6 commits into from
Jul 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions agent/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ type SessionConfig struct {
type Client struct {
mutex sync.Mutex
endpoint string
insecure bool
skipVerify bool
conn *grpc.ClientConn
config Config
sessionConfig *SessionConfig
Expand Down
48 changes: 14 additions & 34 deletions agent/client/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@ import (
"context"
"crypto/tls"
"fmt"
"net"
"os"
"time"

"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
Expand Down Expand Up @@ -66,14 +64,7 @@ func (c *Client) connect(ctx context.Context) error {
ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()

transportCredentials, err := getTransportCredentialsForEndpoint(c.endpoint)
if err != nil {
return fmt.Errorf("could not get transport credentials: %w", err)
}

conn, err := grpc.DialContext(
ctx, c.endpoint,
grpc.WithTransportCredentials(transportCredentials),
opts := []grpc.DialOption{
grpc.WithDefaultServiceConfig(retryPolicy),
grpc.WithIdleTimeout(0), // disable grpc idle timeout
grpc.WithStatsHandler(otelgrpc.NewClientHandler()),
Expand All @@ -82,6 +73,19 @@ func (c *Client) connect(ctx context.Context) error {
propagation.NewCompositeTextMapPropagator(propagation.TraceContext{}, propagation.Baggage{}),
),
)),
}

if c.insecure {
opts = append(opts, grpc.WithTransportCredentials(insecure.NewCredentials()))
} else {
opts = append(opts, grpc.WithTransportCredentials(credentials.NewTLS(&tls.Config{
InsecureSkipVerify: c.skipVerify,
})))
}

conn, err := grpc.DialContext(
ctx, c.endpoint,
opts...,
)
if err != nil {
return fmt.Errorf("could not connect to server: %w", err)
Expand All @@ -90,27 +94,3 @@ func (c *Client) connect(ctx context.Context) error {
c.conn = conn
return nil
}

func getTransportCredentialsForEndpoint(endpoint string) (credentials.TransportCredentials, error) {
_, port, err := net.SplitHostPort(endpoint)
if err != nil {
return nil, fmt.Errorf("cannot parse endpoint: %w", err)
}

tlsCreds := credentials.NewTLS(&tls.Config{
InsecureSkipVerify: true,
})

if os.Getenv("TRACETEST_DEV_FORCE_URL") == "true" {
return tlsCreds, nil
}

switch port {
case "443":
return tlsCreds, nil

default:
return insecure.NewCredentials(), nil
}

}
12 changes: 12 additions & 0 deletions agent/client/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,3 +44,15 @@ func WithTracer(tracer trace.Tracer) Option {
c.tracer = tracer
}
}

func WithInsecure() Option {
return func(c *Client) {
c.insecure = true
}
}

func WithSkipVerify() Option {
return func(c *Client) {
c.skipVerify = true
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ func TestDataStoreConnectionTestWorkflow(t *testing.T) {
server := mocks.NewGrpcServer()
defer server.Stop()

client, err := client.Connect(ctx, server.Addr())
client, err := client.Connect(ctx, server.Addr(), client.WithInsecure())
require.NoError(t, err)

var receivedConnectionTestRequest *proto.DataStoreConnectionTestRequest
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ func TestOtlpConnectionTestWorkflow(t *testing.T) {
server := mocks.NewGrpcServer()
defer server.Stop()

client, err := client.Connect(ctx, server.Addr())
client, err := client.Connect(ctx, server.Addr(), client.WithInsecure())
require.NoError(t, err)

var receivedConnectionTestRequest *proto.OTLPConnectionTestRequest
Expand Down
2 changes: 1 addition & 1 deletion agent/client/workflow_listen_for_poll_requests_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ func TestPollWorkflow(t *testing.T) {
server := mocks.NewGrpcServer()
defer server.Stop()

client, err := client.Connect(ctx, server.Addr())
client, err := client.Connect(ctx, server.Addr(), client.WithInsecure())
require.NoError(t, err)

var receivedPollingRequest *proto.PollingRequest
Expand Down
2 changes: 1 addition & 1 deletion agent/client/workflow_listen_for_trigger_requests_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ func TestTriggerWorkflow(t *testing.T) {
server := mocks.NewGrpcServer()
defer server.Stop()

client, err := client.Connect(ctx, server.Addr())
client, err := client.Connect(ctx, server.Addr(), client.WithInsecure())
require.NoError(t, err)

var receivedTrigger *proto.TriggerRequest
Expand Down
2 changes: 1 addition & 1 deletion agent/client/workflow_send_ds_connection_result_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ func TestDataStoreConnectionResult(t *testing.T) {
server := mocks.NewGrpcServer()
defer server.Stop()

client, err := client.Connect(context.Background(), server.Addr())
client, err := client.Connect(context.Background(), server.Addr(), client.WithInsecure())
require.NoError(t, err)

err = client.Start(context.Background())
Expand Down
2 changes: 1 addition & 1 deletion agent/client/workflow_send_otlp_connection_result_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ func TestOTLPConnectionResultTrace(t *testing.T) {
server := mocks.NewGrpcServer()
defer server.Stop()

client, err := client.Connect(context.Background(), server.Addr())
client, err := client.Connect(context.Background(), server.Addr(), client.WithInsecure())
require.NoError(t, err)

err = client.Start(context.Background())
Expand Down
2 changes: 1 addition & 1 deletion agent/client/workflow_send_trace_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ func TestSendTrace(t *testing.T) {
server := mocks.NewGrpcServer()
defer server.Stop()

client, err := client.Connect(context.Background(), server.Addr())
client, err := client.Connect(context.Background(), server.Addr(), client.WithInsecure())
require.NoError(t, err)

err = client.Start(context.Background())
Expand Down
2 changes: 1 addition & 1 deletion agent/client/workflow_send_trigger_response_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ func TestSendTriggerResult(t *testing.T) {
server := mocks.NewGrpcServer()
defer server.Stop()

client, err := client.Connect(context.Background(), server.Addr())
client, err := client.Connect(context.Background(), server.Addr(), client.WithInsecure())
require.NoError(t, err)

err = client.Start(context.Background())
Expand Down
2 changes: 1 addition & 1 deletion agent/client/workflow_shutdown_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ func TestShutdownFlow(t *testing.T) {
server := mocks.NewGrpcServer()
defer server.Stop()

client, err := client.Connect(ctx, server.Addr())
client, err := client.Connect(ctx, server.Addr(), client.WithInsecure())
require.NoError(t, err)

var called bool = false
Expand Down
2 changes: 1 addition & 1 deletion agent/client/workflow_startup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ func TestStartupFlow(t *testing.T) {
server := mocks.NewGrpcServer()
defer server.Stop()

client, err := client.Connect(context.Background(), server.Addr())
client, err := client.Connect(context.Background(), server.Addr(), client.WithInsecure())
require.NoError(t, err)

err = client.Start(context.Background())
Expand Down
2 changes: 2 additions & 0 deletions agent/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ type Config struct {
ServerURL string `mapstructure:"server_url"`
CollectorEndpoint string `mapstructure:"collector_endpoint"`
Mode string `mapstructure:"mode"`
Insecure bool `mapstructure:"insecure"`
SkipVerify bool `mapstructure:"skip_verify"`

OTLPServer OtlpServer `mapstructure:"otlp_server"`
}
Expand Down
4 changes: 2 additions & 2 deletions agent/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func TestConfigWithEnvs(t *testing.T) {
t.Cleanup(func() {
os.Unsetenv("TRACETEST_AGENT_NAME")
os.Unsetenv("TRACETEST_API_KEY")
os.Unsetenv("TRACETEST_DEV_MODE")
os.Unsetenv("TRACETEST_DEV")
os.Unsetenv("TRACETEST_SERVER_URL")
os.Unsetenv("TRACETEST_OTLP_SERVER_GRPC_PORT")
os.Unsetenv("TRACETEST_OTLP_SERVER_HTTP_PORT")
Expand All @@ -38,7 +38,7 @@ func TestConfigWithEnvs(t *testing.T) {
os.Setenv("TRACETEST_AGENT_NAME", "my-agent-name")
os.Setenv("TRACETEST_API_KEY", "my-agent-api-key")
os.Setenv("TRACETEST_ENVIRONMENT_ID", "123456")
os.Setenv("TRACETEST_DEV_MODE", "true")
os.Setenv("TRACETEST_DEV", "true")
os.Setenv("TRACETEST_SERVER_URL", "https://custom.server.com")
os.Setenv("TRACETEST_OTLP_SERVER_GRPC_PORT", "1234")
os.Setenv("TRACETEST_OTLP_SERVER_HTTP_PORT", "1235")
Expand Down
2 changes: 2 additions & 0 deletions agent/config/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ type Flags struct {
Mode Mode
LogLevel string
CollectorEndpoint string
Insecure bool
SkipVerify bool
}

func (f Flags) AutomatedEnvironmentCanBeInferred() bool {
Expand Down
18 changes: 11 additions & 7 deletions agent/runner/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ Once started, Tracetest Agent exposes OTLP ports 4317 and 4318 to ingest traces

func (s *Runner) onStartAgent(ctx context.Context, cfg config.Config) {
if cfg.AgentApiKey != "" {
err := s.StartAgent(ctx, cfg.AgentEndpoint, cfg.AgentApiKey, cfg.UIEndpoint, cfg.EnvironmentID)
err := s.StartAgent(ctx, cfg, cfg.AgentApiKey, cfg.EnvironmentID)
if err != nil {
s.ui.Error(err.Error())
}
Expand All @@ -106,23 +106,27 @@ func (s *Runner) onStartAgent(ctx context.Context, cfg config.Config) {
return
}

err = s.StartAgent(ctx, cfg.AgentEndpoint, agentToken, cfg.UIEndpoint, "")
err = s.StartAgent(ctx, cfg, agentToken, "")
if err != nil {
s.ui.Error(err.Error())
}
}

func (s *Runner) StartAgent(ctx context.Context, endpoint, agentApiKey, uiEndpoint, environmentID string) error {
func (s *Runner) StartAgent(ctx context.Context, cliConfig config.Config, agentApiKey, environmentID string) error {
cfg, err := agentConfig.LoadConfig()

cfg.Insecure = cliConfig.AllowInsecure
cfg.SkipVerify = cliConfig.SkipVerify

s.logger.Debug("Loaded agent config", zap.Any("config", cfg))
if err != nil {
s.logger.Error("Could not load agent config", zap.Error(err))
return err
}

if endpoint != "" {
s.logger.Debug("Overriding agent endpoint", zap.String("endpoint", endpoint))
cfg.ServerURL = endpoint
if cliConfig.AgentEndpoint != "" {
s.logger.Debug("Overriding agent endpoint", zap.String("endpoint", cliConfig.AgentEndpoint))
cfg.ServerURL = cliConfig.AgentEndpoint
}
s.logger.Debug("Agent endpoint", zap.String("endpoint", cfg.ServerURL))

Expand All @@ -140,7 +144,7 @@ func (s *Runner) StartAgent(ctx context.Context, endpoint, agentApiKey, uiEndpoi

if s.mode == agentConfig.Mode_Desktop {
s.logger.Debug("Starting agent in desktop mode")
return s.RunDesktopStrategy(ctx, cfg, uiEndpoint)
return s.RunDesktopStrategy(ctx, cfg, cliConfig.UIEndpoint)
}

s.logger.Debug("Starting agent in verbose mode")
Expand Down
12 changes: 11 additions & 1 deletion agent/runner/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,11 +117,21 @@ func StartCollector(ctx context.Context, config config.Config, traceCache collec
}

func newControlPlaneClient(ctx context.Context, config config.Config, traceCache collector.TraceCache, observer event.Observer, logger *zap.Logger, tracer trace.Tracer, meter metric.Meter) (*client.Client, error) {
controlPlaneClient, err := client.Connect(ctx, config.ServerURL,
opts := []client.Option{
client.WithAPIKey(config.APIKey),
client.WithAgentName(config.Name),
client.WithEnvironmentID(config.EnvironmentID),
client.WithLogger(logger),
}
if config.Insecure {
opts = append(opts, client.WithInsecure())
}

if config.SkipVerify {
opts = append(opts, client.WithSkipVerify())
}
controlPlaneClient, err := client.Connect(ctx, config.ServerURL,
opts...,
)
if err != nil {
observer.Error(err)
Expand Down
6 changes: 3 additions & 3 deletions agent/workers/poller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ func TestPollerWorker(t *testing.T) {
ctx := ContextWithTracingEnabled()
controlPlane := mocks.NewGrpcServer()

client, err := client.Connect(ctx, controlPlane.Addr())
client, err := client.Connect(ctx, controlPlane.Addr(), client.WithInsecure())
require.NoError(t, err)

pollerWorker := workers.NewPollerWorker(client, workers.WithPollerStoppableProcessRunner(workers.NewProcessStopper().RunStoppableProcess))
Expand Down Expand Up @@ -123,7 +123,7 @@ func TestPollerWorkerWithInmemoryDatastore(t *testing.T) {
ctx := context.Background()
controlPlane := mocks.NewGrpcServer()

client, err := client.Connect(ctx, controlPlane.Addr())
client, err := client.Connect(ctx, controlPlane.Addr(), client.WithInsecure())
require.NoError(t, err)

cache := collector.NewTraceCache()
Expand Down Expand Up @@ -186,7 +186,7 @@ func TestPollerWithInvalidDataStore(t *testing.T) {
ctx := context.Background()
controlPlane := mocks.NewGrpcServer()

client, err := client.Connect(ctx, controlPlane.Addr())
client, err := client.Connect(ctx, controlPlane.Addr(), client.WithInsecure())
require.NoError(t, err)

pollerWorker := workers.NewPollerWorker(client, workers.WithPollerStoppableProcessRunner(workers.NewProcessStopper().RunStoppableProcess))
Expand Down
2 changes: 1 addition & 1 deletion agent/workers/trigger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ func setupTriggerWorker(t *testing.T) (*mocks.GrpcServerMock, collector.TraceCac
controlPlane := mocks.NewGrpcServer()
cache := collector.NewTraceCache()

client, err := client.Connect(context.Background(), controlPlane.Addr())
client, err := client.Connect(context.Background(), controlPlane.Addr(), client.WithInsecure())
require.NoError(t, err)

triggerWorker := workers.NewTriggerWorker(
Expand Down
3 changes: 2 additions & 1 deletion cli/cmd/configure_cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ var configureCmd = &cobra.Command{
PreRun: setupLogger,
Run: WithResultHandler(WithParamsHandler(configParams)(func(ctx context.Context, cmd *cobra.Command, _ []string) (string, error) {
flags := agentConfig.Flags{
CI: configParams.CI,
CI: configParams.CI,
SkipVerify: skipVerify,
}

config, err := config.LoadConfig("")
Expand Down
9 changes: 8 additions & 1 deletion cli/cmd/resources.go
Original file line number Diff line number Diff line change
Expand Up @@ -403,13 +403,20 @@ func setupResources() {
extraHeaders.Set("x-environment-id", cliConfig.EnvironmentID)
extraHeaders.Set("Authorization", fmt.Sprintf("Bearer %s", cliConfig.Jwt))

// if cliConfig has SkipVerify set to true, use that value.
// otherwise use the value from the flag
if cliConfig.SkipVerify {
skipVerify = true
}

// To avoid a ciruclar reference initialization when setting up the registry and its resources,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// To avoid a ciruclar reference initialization when setting up the registry and its resources,
// To avoid a circular reference initialization when setting up the registry and its resources,

// we create the resources with a pointer to an unconfigured HTTPClient.
// When each command is run, this function is run in the PreRun stage, before any of the actual `Run` code is executed
// We take this chance to configure the HTTPClient with the correct URL and headers.
// To make this configuration propagate to all the resources, we need to replace the pointer to the HTTPClient.
// For more details, see https://github.com/kubeshop/tracetest/pull/2832#discussion_r1245616804
hc := resourcemanager.NewHTTPClient(fmt.Sprintf("%s%s", cliConfig.URL(), cliConfig.Path()), extraHeaders)

hc := resourcemanager.NewHTTPClient(fmt.Sprintf("%s%s", cliConfig.URL(), cliConfig.Path()), extraHeaders, skipVerify)
*httpClient = *hc
}

Expand Down
3 changes: 3 additions & 0 deletions cli/cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,11 +73,14 @@ var (
}
)

var skipVerify bool

func init() {
rootCmd.PersistentFlags().StringVarP(&output, "output", "o", "", fmt.Sprintf("output format [%s]", outputFormatsString))
rootCmd.PersistentFlags().StringVarP(&configFile, "config", "c", "config.yml", "config file will be used by the CLI")
rootCmd.PersistentFlags().BoolVarP(&verbose, "verbose", "v", false, "display debug information")
rootCmd.PersistentFlags().StringVarP(&overrideEndpoint, "server-url", "s", "", "server url")
rootCmd.PersistentFlags().BoolVarP(&skipVerify, "skip-verify", "", false, "skip verification of the server certificate (allows self signed, for example)")

groups := []*cobra.Group{cmdGroupConfig, cmdGroupResources, cmdGroupMisc}

Expand Down
Loading
Loading