Skip to content

Commit

Permalink
fix: subscription message decode (#3553)
Browse files Browse the repository at this point in the history
* fix: subscription message decode

* fix type check

* fix panics
  • Loading branch information
mathnogueira authored Jan 23, 2024
1 parent c57a99b commit c53915f
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 1 deletion.
9 changes: 9 additions & 0 deletions server/subscription/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,15 @@ func (m Message) DecodeContent(output interface{}) error {
}
m.Content = base64Decoded
}

if _, isBytes := m.Content.([]byte); !isBytes {
bytes, err := json.Marshal(m.Content)
if err != nil {
return fmt.Errorf("could not marshal json: %w", err)
}
m.Content = bytes
}

return json.Unmarshal(m.Content.([]byte), output)
}

Expand Down
41 changes: 41 additions & 0 deletions server/subscription/message_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package subscription_test

import (
"encoding/json"
"testing"

"github.com/kubeshop/tracetest/server/subscription"
"github.com/stretchr/testify/require"
"gotest.tools/v3/assert"
)

func TestDecode(t *testing.T) {
type msg struct {
Value int
AnotherValue string
}

realMessage := msg{
Value: 2,
AnotherValue: "cat",
}

message := subscription.Message{
ResourceID: "xxx",
Content: realMessage,
}

msgBytes, err := json.Marshal(message)
require.NoError(t, err)

var target subscription.Message
err = json.Unmarshal(msgBytes, &target)
require.NoError(t, err)

var targetMsg msg
err = target.DecodeContent(&targetMsg)
require.NoError(t, err)

assert.Equal(t, 2, targetMsg.Value)
assert.Equal(t, "cat", targetMsg.AnotherValue)
}
4 changes: 3 additions & 1 deletion server/testconnection/otlp.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,12 +100,14 @@ func (t *OTLPConnectionTester) GetSpanCount(ctx context.Context, opts ...GetSpan
topicName := PostSpanCountTopicName(WithTenantID(tenantID))
subscriber := subscription.NewSubscriberFunction(func(m subscription.Message) error {
m.DecodeContent(&response)
semaphore.TryAcquire(1)
semaphore.Release(1)
return nil
})

t.subscriptionManager.Subscribe(topicName, subscriber)
defer t.subscriptionManager.Unsubscribe(topicName, subscriber.ID())
// TODO: implement subscription
// defer t.subscriptionManager.Unsubscribe(topicName, subscriber.ID())

t.subscriptionManager.Publish(GetSpanCountTopicName(WithTenantID(tenantID)), OTLPConnectionTestRequest{})

Expand Down

0 comments on commit c53915f

Please sign in to comment.