Skip to content

Commit

Permalink
[chore][exporter/googlecloudpubsub] improve unit testing (#37347)
Browse files Browse the repository at this point in the history
#### Description
Improves unit testing without having to rely on an emulated server.

#### Link to tracking issue
Part of the preparation for #32850, as mentioned in
#36591 (comment)
  • Loading branch information
kevinnoel-be authored Jan 21, 2025
1 parent 30a1676 commit a7e3857
Show file tree
Hide file tree
Showing 5 changed files with 227 additions and 144 deletions.
16 changes: 9 additions & 7 deletions exporter/googlecloudpubsubexporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@ import (
"go.uber.org/zap"
)

const name = "googlecloudpubsub"

type pubsubExporter struct {
logger *zap.Logger
client publisherClient
Expand All @@ -35,10 +33,10 @@ type pubsubExporter struct {
metricsWatermarkFunc metricsWatermarkFunc
logsMarshaler plog.Marshaler
logsWatermarkFunc logsWatermarkFunc
}

func (*pubsubExporter) Name() string {
return name
// To be overridden in tests
makeUUID func() (uuid.UUID, error)
makeClient func(ctx context.Context, cfg *Config, userAgent string) (publisherClient, error)
}

type encoding int
Expand Down Expand Up @@ -67,7 +65,7 @@ func (ex *pubsubExporter) start(ctx context.Context, _ component.Host) error {
ctx, ex.cancel = context.WithCancel(ctx)

if ex.client == nil {
client, err := newPublisherClient(ctx, ex.config, ex.userAgent)
client, err := ex.makeClient(ctx, ex.config, ex.userAgent)
if err != nil {
return fmt.Errorf("failed creating the gRPC client to Pubsub: %w", err)
}
Expand All @@ -88,7 +86,11 @@ func (ex *pubsubExporter) shutdown(_ context.Context) error {
}

func (ex *pubsubExporter) publishMessage(ctx context.Context, encoding encoding, data []byte, watermark time.Time) error {
id, err := uuid.NewRandom()
if len(data) == 0 {
return nil
}

id, err := ex.makeUUID()
if err != nil {
return err
}
Expand Down
268 changes: 214 additions & 54 deletions exporter/googlecloudpubsubexporter/exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,78 +5,238 @@ package googlecloudpubsubexporter

import (
"context"
"fmt"
"testing"
"time"

pb "cloud.google.com/go/pubsub/apiv1/pubsubpb"
"cloud.google.com/go/pubsub/pstest"
"github.com/google/uuid"
"github.com/googleapis/gax-go/v2"
"github.com/stretchr/testify/assert"
"go.opentelemetry.io/collector/exporter/exporterhelper"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/exporter/exportertest"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/pdata/ptrace"
)

func TestName(t *testing.T) {
exporter := &pubsubExporter{}
assert.Equal(t, "googlecloudpubsub", exporter.Name())
}
const (
defaultUUID = "00000000-0000-0000-0000-000000000000"
defaultProjectID = "my-project"
defaultTopic = "projects/my-project/topics/otlp"
)

func TestExporterDefaultSettings(t *testing.T) {
ctx := context.Background()
// Start a fake server running locally.
srv := pstest.NewServer()
defer srv.Close()
_, err := srv.GServer.CreateTopic(ctx, &pb.Topic{
Name: "projects/my-project/topics/otlp",
func TestExporterNoData(t *testing.T) {
exporter, publisher := newTestExporter(t, func(config *Config) {
config.Watermark.Behavior = "earliest"
})
assert.NoError(t, err)

factory := NewFactory()
cfg := factory.CreateDefaultConfig()
exporterConfig := cfg.(*Config)
exporterConfig.Endpoint = srv.Addr
exporterConfig.Insecure = true
exporterConfig.ProjectID = "my-project"
exporterConfig.Topic = "projects/my-project/topics/otlp"
exporterConfig.TimeoutSettings = exporterhelper.TimeoutConfig{
Timeout: 12 * time.Second,
}
exporter := ensureExporter(exportertest.NewNopSettings(), exporterConfig)
assert.NoError(t, exporter.start(ctx, nil))
assert.NoError(t, exporter.consumeTraces(ctx, ptrace.NewTraces()))
assert.NoError(t, exporter.consumeMetrics(ctx, pmetric.NewMetrics()))
ctx := context.Background()
assert.NoError(t, exporter.consumeLogs(ctx, plog.NewLogs()))
assert.NoError(t, exporter.shutdown(ctx))
assert.NoError(t, exporter.consumeMetrics(ctx, pmetric.NewMetrics()))
assert.NoError(t, exporter.consumeTraces(ctx, ptrace.NewTraces()))

assert.Zero(t, publisher.requests)
}

func TestExporterCompression(t *testing.T) {
ctx := context.Background()
// Start a fake server running locally.
srv := pstest.NewServer()
defer srv.Close()
_, err := srv.GServer.CreateTopic(ctx, &pb.Topic{
Name: "projects/my-project/topics/otlp",
func TestExporterClientError(t *testing.T) {
cfg := NewFactory().CreateDefaultConfig().(*Config)
cfg.ProjectID = defaultProjectID
cfg.Topic = defaultTopic
require.NoError(t, cfg.Validate())

exporter := ensureExporter(exportertest.NewNopSettings(), cfg)
exporter.makeClient = func(context.Context, *Config, string) (publisherClient, error) {
return nil, fmt.Errorf("something went wrong")
}

require.Error(t, exporter.start(context.Background(), componenttest.NewNopHost()))
}

func TestExporterSimpleData(t *testing.T) {
t.Run("logs", func(t *testing.T) {
exporter, publisher := newTestExporter(t)

logs := plog.NewLogs()
logs.ResourceLogs().AppendEmpty().ScopeLogs().AppendEmpty().LogRecords().AppendEmpty().Body().SetStr("some log message")

require.NoError(t, exporter.consumeLogs(context.Background(), logs))
require.Len(t, publisher.requests, 1)

request := publisher.requests[0]
assert.Equal(t, defaultTopic, request.Topic)
assert.Len(t, request.Messages, 1)

message := request.Messages[0]
assert.NotEmpty(t, message.Data)
assert.Subset(t, message.Attributes, map[string]string{
"ce-type": "org.opentelemetry.otlp.logs.v1",
"content-type": "application/protobuf",
})
})

t.Run("metrics", func(t *testing.T) {
exporter, publisher := newTestExporter(t)

metrics := pmetric.NewMetrics()
metric := metrics.ResourceMetrics().AppendEmpty().ScopeMetrics().AppendEmpty().Metrics().AppendEmpty()
metric.SetName("some.metric")
metric.SetEmptyGauge().DataPoints().AppendEmpty().SetIntValue(42)

require.NoError(t, exporter.consumeMetrics(context.Background(), metrics))
require.Len(t, publisher.requests, 1)

request := publisher.requests[0]
assert.Equal(t, defaultTopic, request.Topic)
assert.Len(t, request.Messages, 1)

message := request.Messages[0]
assert.NotEmpty(t, message.Data)
assert.Subset(t, message.Attributes, map[string]string{
"ce-type": "org.opentelemetry.otlp.metrics.v1",
"content-type": "application/protobuf",
})
})
assert.NoError(t, err)

t.Run("traces", func(t *testing.T) {
exporter, publisher := newTestExporter(t)

traces := ptrace.NewTraces()
span := traces.ResourceSpans().AppendEmpty().ScopeSpans().AppendEmpty().Spans().AppendEmpty()
span.SetName("some span")

require.NoError(t, exporter.consumeTraces(context.Background(), traces))
require.Len(t, publisher.requests, 1)

request := publisher.requests[0]
assert.Equal(t, defaultTopic, request.Topic)
assert.Len(t, request.Messages, 1)

message := request.Messages[0]
assert.NotEmpty(t, message.Data)
assert.Subset(t, message.Attributes, map[string]string{
"ce-type": "org.opentelemetry.otlp.traces.v1",
"content-type": "application/protobuf",
})
})
}

func TestExporterSimpleDataWithCompression(t *testing.T) {
withCompression := func(config *Config) {
config.Compression = "gzip"
}

t.Run("logs", func(t *testing.T) {
exporter, publisher := newTestExporter(t, withCompression)

logs := plog.NewLogs()
logs.ResourceLogs().AppendEmpty().ScopeLogs().AppendEmpty().LogRecords().AppendEmpty().Body().SetStr("some log message")

require.NoError(t, exporter.consumeLogs(context.Background(), logs))
require.Len(t, publisher.requests, 1)

request := publisher.requests[0]
assert.Equal(t, defaultTopic, request.Topic)
assert.Len(t, request.Messages, 1)

message := request.Messages[0]
assert.NotEmpty(t, message.Data)
assert.Subset(t, message.Attributes, map[string]string{
"ce-id": "00000000-0000-0000-0000-000000000000",
"ce-source": "/opentelemetry/collector/googlecloudpubsub/latest",
"ce-specversion": "1.0",
"ce-type": "org.opentelemetry.otlp.logs.v1",
"content-type": "application/protobuf",
"content-encoding": "gzip",
})
})

t.Run("metrics", func(t *testing.T) {
exporter, publisher := newTestExporter(t, withCompression)

metrics := pmetric.NewMetrics()
metric := metrics.ResourceMetrics().AppendEmpty().ScopeMetrics().AppendEmpty().Metrics().AppendEmpty()
metric.SetName("some.metric")
metric.SetEmptyGauge().DataPoints().AppendEmpty().SetIntValue(42)

require.NoError(t, exporter.consumeMetrics(context.Background(), metrics))
require.Len(t, publisher.requests, 1)

request := publisher.requests[0]
assert.Equal(t, defaultTopic, request.Topic)
assert.Len(t, request.Messages, 1)

message := request.Messages[0]
assert.NotEmpty(t, message.Data)
assert.Subset(t, message.Attributes, map[string]string{
"ce-type": "org.opentelemetry.otlp.metrics.v1",
"content-type": "application/protobuf",
"content-encoding": "gzip",
})
})

t.Run("traces", func(t *testing.T) {
exporter, publisher := newTestExporter(t, withCompression)

traces := ptrace.NewTraces()
span := traces.ResourceSpans().AppendEmpty().ScopeSpans().AppendEmpty().Spans().AppendEmpty()
span.SetName("some span")

require.NoError(t, exporter.consumeTraces(context.Background(), traces))
require.Len(t, publisher.requests, 1)

request := publisher.requests[0]
assert.Equal(t, defaultTopic, request.Topic)
assert.Len(t, request.Messages, 1)

message := request.Messages[0]
assert.NotEmpty(t, message.Data)
assert.Subset(t, message.Attributes, map[string]string{
"ce-type": "org.opentelemetry.otlp.traces.v1",
"content-type": "application/protobuf",
"content-encoding": "gzip",
})
})
}

// Helpers

func newTestExporter(t *testing.T, options ...func(*Config)) (*pubsubExporter, *mockPublisher) {
t.Helper()

factory := NewFactory()
cfg := factory.CreateDefaultConfig()
exporterConfig := cfg.(*Config)
exporterConfig.Endpoint = srv.Addr
exporterConfig.UserAgent = "test-user-agent"
exporterConfig.Insecure = true
exporterConfig.ProjectID = "my-project"
exporterConfig.Topic = "projects/my-project/topics/otlp"
exporterConfig.TimeoutSettings = exporterhelper.TimeoutConfig{
Timeout: 12 * time.Second,
cfg := factory.CreateDefaultConfig().(*Config)
cfg.ProjectID = defaultProjectID
cfg.Topic = defaultTopic
for _, option := range options {
option(cfg)
}
exporterConfig.Compression = "gzip"
exporter := ensureExporter(exportertest.NewNopSettings(), exporterConfig)
assert.NoError(t, exporter.start(ctx, nil))
assert.NoError(t, exporter.consumeTraces(ctx, ptrace.NewTraces()))
assert.NoError(t, exporter.consumeMetrics(ctx, pmetric.NewMetrics()))
assert.NoError(t, exporter.consumeLogs(ctx, plog.NewLogs()))
assert.NoError(t, exporter.shutdown(ctx))
require.NoError(t, cfg.Validate())

exporter := ensureExporter(exportertest.NewNopSettings(), cfg)
publisher := &mockPublisher{}
exporter.makeClient = func(context.Context, *Config, string) (publisherClient, error) {
return publisher, nil
}
exporter.makeUUID = func() (uuid.UUID, error) {
return uuid.Parse(defaultUUID)
}

require.NoError(t, exporter.start(context.Background(), componenttest.NewNopHost()))
t.Cleanup(func() { assert.NoError(t, exporter.shutdown(context.Background())) })

return exporter, publisher
}

type mockPublisher struct {
requests []*pb.PublishRequest
}

func (m *mockPublisher) Publish(_ context.Context, request *pb.PublishRequest, _ ...gax.CallOption) (*pb.PublishResponse, error) {
m.requests = append(m.requests, request)
return &pb.PublishResponse{}, nil
}

func (m *mockPublisher) Close() error {
return nil
}
5 changes: 4 additions & 1 deletion exporter/googlecloudpubsubexporter/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"strings"
"time"

"github.com/google/uuid"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/exporter"
Expand Down Expand Up @@ -46,11 +47,13 @@ func ensureExporter(params exporter.Settings, pCfg *Config) *pubsubExporter {
receiver = &pubsubExporter{
logger: params.Logger,
userAgent: strings.ReplaceAll(pCfg.UserAgent, "{{version}}", params.BuildInfo.Version),
ceSource: fmt.Sprintf("/opentelemetry/collector/%s/%s", name, params.BuildInfo.Version),
ceSource: fmt.Sprintf("/opentelemetry/collector/%s/%s", metadata.Type.String(), params.BuildInfo.Version),
config: pCfg,
tracesMarshaler: &ptrace.ProtoMarshaler{},
metricsMarshaler: &pmetric.ProtoMarshaler{},
logsMarshaler: &plog.ProtoMarshaler{},
makeUUID: uuid.NewRandom,
makeClient: newPublisherClient,
}
// we ignore the error here as the config is already validated with the same method
receiver.ceCompression, _ = pCfg.parseCompression()
Expand Down
5 changes: 0 additions & 5 deletions exporter/googlecloudpubsubexporter/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ require (
)

require (
cloud.google.com/go v0.116.0 // indirect
cloud.google.com/go/auth v0.13.0 // indirect
cloud.google.com/go/auth/oauth2adapt v0.2.6 // indirect
cloud.google.com/go/compute/metadata v0.6.0 // indirect
Expand All @@ -34,8 +33,6 @@ require (
github.com/go-logr/stdr v1.2.2 // indirect
github.com/go-viper/mapstructure/v2 v2.2.1 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
github.com/google/go-cmp v0.6.0 // indirect
github.com/google/s2a-go v0.1.8 // indirect
github.com/googleapis/enterprise-certificate-proxy v0.3.4 // indirect
github.com/hashicorp/go-version v1.7.0 // indirect
Expand All @@ -49,8 +46,6 @@ require (
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/rogpeppe/go-internal v1.12.0 // indirect
go.einride.tech/aip v0.68.0 // indirect
go.opencensus.io v0.24.0 // indirect
go.opentelemetry.io/collector/config/configtelemetry v0.117.1-0.20250119231113-f07ebc3afb51 // indirect
go.opentelemetry.io/collector/consumer/consumererror v0.117.1-0.20250119231113-f07ebc3afb51 // indirect
go.opentelemetry.io/collector/consumer/consumertest v0.117.1-0.20250119231113-f07ebc3afb51 // indirect
Expand Down
Loading

0 comments on commit a7e3857

Please sign in to comment.