Skip to content

Commit

Permalink
feat(agent): start collector at startup (#3173)
Browse files Browse the repository at this point in the history
* feat(agent): start collector at startup

* fix: simplify condition

* fix: tests
  • Loading branch information
mathnogueira authored Sep 21, 2023
1 parent 96510f2 commit 7e15169
Show file tree
Hide file tree
Showing 7 changed files with 51 additions and 17 deletions.
19 changes: 6 additions & 13 deletions agent/collector/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package collector
import (
"context"
"fmt"
"log"
"os"
"os/signal"
"syscall"
Expand Down Expand Up @@ -41,19 +40,13 @@ func Start(ctx context.Context, config Config, tracer trace.Tracer) error {
}
})

go func() {
err := grpcServer.Start()
if err != nil {
log.Println("ERROR: could not start gRPC OTLP listener: %w", err)
}
}()
if err = grpcServer.Start(); err != nil {
return fmt.Errorf("could not start gRPC OTLP listener: %w", err)
}

go func() {
err := httpServer.Start()
if err != nil {
log.Println("ERROR: could not start HTTP OTLP listener: %w", err)
}
}()
if err = httpServer.Start(); err != nil {
return fmt.Errorf("could not start HTTP OTLP listener: %w", err)
}

return nil
}
Expand Down
3 changes: 2 additions & 1 deletion agent/collector/collector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ func TestCollector(t *testing.T) {

noopTracer := trace.NewNoopTracerProvider().Tracer("noop_tracer")

collector.Start(
err = collector.Start(
context.Background(),
collector.Config{
HTTPPort: 4318,
Expand All @@ -29,6 +29,7 @@ func TestCollector(t *testing.T) {
},
noopTracer,
)
require.NoError(t, err)

tracer, err := mocks.NewTracer(context.Background(), "localhost:4317")
require.NoError(t, err)
Expand Down
10 changes: 9 additions & 1 deletion agent/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,13 @@ type Config struct {
APIKey string `mapstructure:"api_key"`
Name string `mapstructure:"agent_name"`
ServerURL string `mapstructure:"server_url"`

OTLPServer otlpServer `mapstructure:"otlp_server"`
}

type otlpServer struct {
GRPCPort int `mapstructure:"grpc_port"`
HTTPPort int `mapstructure:"http_port"`
}

func LoadConfig() (Config, error) {
Expand All @@ -29,10 +36,11 @@ func LoadConfig() (Config, error) {
vp.SetConfigType("env")
vp.AutomaticEnv()

vp.SetDefault("DEV_MODE", false)
vp.SetDefault("AGENT_NAME", getHostname())
vp.SetDefault("API_KEY", "")
vp.SetDefault("SERVER_URL", "https://cloud.tracetest.io")
vp.SetDefault("OTLP_SERVER.GRPC_PORT", 4317)
vp.SetDefault("OTLP_SERVER.HTTP_PORT", 4318)

config := Config{}

Expand Down
8 changes: 8 additions & 0 deletions agent/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ func TestConfigDefaults(t *testing.T) {
assert.Equal(t, "", cfg.APIKey)
assert.Equal(t, hostname, cfg.Name)
assert.Equal(t, "https://cloud.tracetest.io", cfg.ServerURL)
assert.Equal(t, 4317, cfg.OTLPServer.GRPCPort)
assert.Equal(t, 4318, cfg.OTLPServer.HTTPPort)
}

func TestConfigWithEnvs(t *testing.T) {
Expand All @@ -27,12 +29,16 @@ func TestConfigWithEnvs(t *testing.T) {
os.Unsetenv("TRACETEST_API_KEY")
os.Unsetenv("TRACETEST_DEV_MODE")
os.Unsetenv("TRACETEST_SERVER_URL")
os.Unsetenv("TRACETEST_OTLP_SERVER_GRPC_PORT")
os.Unsetenv("TRACETEST_OTLP_SERVER_HTTP_PORT")
})

os.Setenv("TRACETEST_AGENT_NAME", "my-agent-name")
os.Setenv("TRACETEST_API_KEY", "my-agent-api-key")
os.Setenv("TRACETEST_DEV_MODE", "true")
os.Setenv("TRACETEST_SERVER_URL", "https://custom.server.com")
os.Setenv("TRACETEST_OTLP_SERVER_GRPC_PORT", "1234")
os.Setenv("TRACETEST_OTLP_SERVER_HTTP_PORT", "1235")

cfg, err := config.LoadConfig()

Expand All @@ -41,4 +47,6 @@ func TestConfigWithEnvs(t *testing.T) {
assert.Equal(t, "my-agent-api-key", cfg.APIKey)
assert.Equal(t, "my-agent-name", cfg.Name)
assert.Equal(t, "https://custom.server.com", cfg.ServerURL)
assert.Equal(t, 1234, cfg.OTLPServer.GRPCPort)
assert.Equal(t, 1235, cfg.OTLPServer.HTTPPort)
}
22 changes: 22 additions & 0 deletions agent/initialization/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,11 @@ import (
"fmt"

"github.com/kubeshop/tracetest/agent/client"
"github.com/kubeshop/tracetest/agent/collector"
"github.com/kubeshop/tracetest/agent/config"
"github.com/kubeshop/tracetest/agent/proto"
"github.com/kubeshop/tracetest/agent/workers"
"go.opentelemetry.io/otel/trace"
)

func NewClient(ctx context.Context, config config.Config) (*client.Client, error) {
Expand Down Expand Up @@ -46,6 +48,26 @@ func Start(ctx context.Context, config config.Config) error {
return err
}

err = startCollector(ctx, config)
if err != nil {
return err
}

client.WaitUntilDisconnected()
return nil
}

func startCollector(ctx context.Context, config config.Config) error {
noopTracer := trace.NewNoopTracerProvider().Tracer("noop")
collectorConfig := collector.Config{
HTTPPort: config.OTLPServer.HTTPPort,
GRPCPort: config.OTLPServer.GRPCPort,
}

err := collector.Start(ctx, collectorConfig, noopTracer)
if err != nil {
return err
}

return nil
}
3 changes: 2 additions & 1 deletion server/otlp/grpc_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ func (s *grpcServer) Start() error {
return fmt.Errorf("cannot listen on address %s: %w", s.addr, err)
}
pb.RegisterTraceServiceServer(s.gServer, s)
return s.gServer.Serve(listener)
go s.gServer.Serve(listener)
return nil
}

func (s *grpcServer) Stop() {
Expand Down
3 changes: 2 additions & 1 deletion server/otlp/http_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,8 @@ func (s *httpServer) Start() error {
return fmt.Errorf("cannot listen on address %s: %w", s.addr, err)
}

return s.hServer.Serve(listener)
go s.hServer.Serve(listener)
return nil
}

func (s *httpServer) Stop() {
Expand Down

0 comments on commit 7e15169

Please sign in to comment.