Skip to content

Commit

Permalink
Merge pull request #6 from st-tech/feature/support_attributes
Browse files Browse the repository at this point in the history
Implement attributes option
  • Loading branch information
cisetn authored Jul 22, 2024
2 parents 66f93c9 + 05fe4c9 commit cfe4342
Show file tree
Hide file tree
Showing 5 changed files with 58 additions and 25 deletions.
4 changes: 3 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ $ bash make.sh build_linux
| ----------------|------------------------------------------------|----------------|
| Project | Google Cloud project ID | NONE(required) |
| Topic | Google Cloud Pub/Sub topic name | NONE(required) |
| Format | The format to encode the message. Supported formats are json and map value | map value(optional) |
| Format | The type of message to be sent to pubsub. Currently, only `json` is supported. | NONE(optional) |
| Attributes | JSON string specifying message attributes | NONE(optional) |
| Debug | Print debug log | false(optional) |
| Timeout | The maximum time that the client will attempt to publish a bundle of messages. (millsecond) | 60000 (optional)|
| DelayThreshold | Publish a non-empty batch after this delay has passed. (millsecond) | 1 |
Expand All @@ -47,6 +48,7 @@ $ bash make.sh build_linux
Project your-project(custom)
Topic your-topic-name(custom)
Format json
Attributes {"key1":"value1","key2":"value2"}
```

### Example exec
Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
module github.com/gjbae1212/fluent-bit-pubsub
module github.com/st-tech/fluent-bit-pubsub-custom

require (
cloud.google.com/go/pubsub v1.40.0
github.com/fluent/fluent-bit-go v0.0.0-20230731091245-a7a013e2473c
github.com/joho/godotenv v1.5.1
github.com/pkg/errors v0.9.1
github.com/stretchr/testify v1.9.0
)
Expand All @@ -23,7 +24,6 @@ require (
github.com/google/uuid v1.6.0 // indirect
github.com/googleapis/enterprise-certificate-proxy v0.3.2 // indirect
github.com/googleapis/gax-go/v2 v2.12.5 // indirect
github.com/joho/godotenv v1.5.1 // indirect
github.com/kr/pretty v0.3.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/rogpeppe/go-internal v1.12.0 // indirect
Expand Down
50 changes: 32 additions & 18 deletions output_pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,18 @@ import (

"github.com/fluent/fluent-bit-go/output"
)
import "os"
import (
"encoding/json"
"os"
)

var (
plugin Keeper
hostname string
format string
wrapper = OutputWrapper(&Output{})
plugin Keeper
hostname string
format string
attributes map[string]string

wrapper = OutputWrapper(&Output{})

timeout = pubsub.DefaultPublishSettings.Timeout
delayThreshold = pubsub.DefaultPublishSettings.DelayThreshold
Expand Down Expand Up @@ -69,23 +74,25 @@ func FLBPluginInit(ctx unsafe.Pointer) int {
ct := wrapper.GetConfigKey(ctx, "CountThreshold")
dt := wrapper.GetConfigKey(ctx, "DelayThreshold")
ft := wrapper.GetConfigKey(ctx, "Format")

fmt.Printf("[pubsub-go] plugin parameter project = '%s'\n", project)
fmt.Printf("[pubsub-go] plugin parameter topic = '%s'\n", topic)
fmt.Printf("[pubsub-go] plugin parameter debug = '%s'\n", dg)
fmt.Printf("[pubsub-go] plugin parameter timeout = '%s'\n", to)
fmt.Printf("[pubsub-go] plugin parameter byte threshold = '%s'\n", bt)
fmt.Printf("[pubsub-go] plugin parameter count threshold = '%s'\n", ct)
fmt.Printf("[pubsub-go] plugin parameter delay threshold = '%s'\n", dt)
fmt.Printf("[pubsub-go] plugin parameter format = '%s'\n", ft)
ab := wrapper.GetConfigKey(ctx, "Attributes")

// fmt.Printf("[pubsub-go] plugin parameter project = '%s'\n", project)
// fmt.Printf("[pubsub-go] plugin parameter topic = '%s'\n", topic)
// fmt.Printf("[pubsub-go] plugin parameter debug = '%s'\n", dg)
// fmt.Printf("[pubsub-go] plugin parameter timeout = '%s'\n", to)
// fmt.Printf("[pubsub-go] plugin parameter byte threshold = '%s'\n", bt)
// fmt.Printf("[pubsub-go] plugin parameter count threshold = '%s'\n", ct)
// fmt.Printf("[pubsub-go] plugin parameter delay threshold = '%s'\n", dt)
// fmt.Printf("[pubsub-go] plugin parameter format = '%s'\n", ft)
// fmt.Printf("[pubsub-go] plugin parameter attributes = '%s'\n", ab)

hostname, err = os.Hostname()
if err != nil {
fmt.Printf("[err][init] %+v\n", err)
return output.FLB_ERROR
}

fmt.Printf("[pubsub-go] plugin hostname = '%s'\n", hostname)
// fmt.Printf("[pubsub-go] plugin hostname = '%s'\n", hostname)

if dg != "" {
debug, err = strconv.ParseBool(dg)
Expand Down Expand Up @@ -126,6 +133,13 @@ func FLBPluginInit(ctx unsafe.Pointer) int {
}
delayThreshold = time.Duration(v) * time.Millisecond
}
if ab != "" {
err = json.Unmarshal([]byte(ab), &attributes)
if err != nil {
fmt.Printf("[err][init] %+v\n", err)
return output.FLB_ERROR
}
}
if _, ok := supportFormats[ft]; ok {
format = ft
} else {
Expand Down Expand Up @@ -169,18 +183,18 @@ func FLBPluginFlush(data unsafe.Pointer, length C.int, tag *C.char) int {
timestamp := ts.(output.FLBTime)

if formatter, ok := supportFormats[format]; ok {
fmt.Printf("[pubsub-go] format = '%s'\n", format)
// fmt.Printf("[pubsub-go] format = '%s'\n", format)
msg, err := formatter.Encode(record)
if err != nil {
fmt.Printf("[err][encode] %+v \n", err)
return output.FLB_ERROR
}
results = append(results, plugin.Send(ctx, msg))
results = append(results, plugin.Send(ctx, msg, attributes))
} else {
for k, v := range record {
//fmt.Printf("[%s] %s %s %v \n", tagname, timestamp.String(), k, v)
_, _, _ = k, timestamp, tagname
results = append(results, plugin.Send(ctx, interfaceToBytes(v)))
results = append(results, plugin.Send(ctx, interfaceToBytes(v), attributes))
}

}
Expand Down
10 changes: 7 additions & 3 deletions pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
)

type Keeper interface {
Send(ctx context.Context, data []byte) *pubsub.PublishResult
Send(ctx context.Context, data []byte, attributes map[string]string) *pubsub.PublishResult
Stop()
}

Expand Down Expand Up @@ -45,11 +45,15 @@ func NewKeeper(projectId, topicName string,
return Keeper(pubs), nil
}

func (gps *GooglePubSub) Send(ctx context.Context, data []byte) *pubsub.PublishResult {
func (gps *GooglePubSub) Send(ctx context.Context, data []byte, attributes map[string]string) *pubsub.PublishResult {
if len(data) == 0 {
return nil
}
return gps.topic.Publish(ctx, &pubsub.Message{Data: data})
msg := &pubsub.Message{Data: data}
if attributes != nil {
msg.Attributes = attributes
}
return gps.topic.Publish(ctx, msg)
}

func (gps *GooglePubSub) Stop() {
Expand Down
15 changes: 14 additions & 1 deletion pubsub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package main

import (
"context"
"encoding/json"
"log"
"os"
"testing"
Expand Down Expand Up @@ -65,7 +66,19 @@ func TestGooglePubSub_Send(t *testing.T) {
keeper, err := NewKeeper(projectId, topicName, nil)
assert.NoError(err)

result := keeper.Send(ctx, []byte("aaa"))
data := map[string]interface{}{
"key1": "value1",
"key2": "value2",
}

attributes := map[string]string{
"attr_key1": "attr_value1",
"attr_key2": "attr_value2",
}
msg, err := json.Marshal(data)
assert.NoError(err)
result := keeper.Send(ctx, msg, attributes)

_, err = result.Get(ctx)
assert.NoError(err)
sub := keeper.(*GooglePubSub).client.Subscription(topicName)
Expand Down

0 comments on commit cfe4342

Please sign in to comment.