Skip to content

Commit

Permalink
Change field key, add converter
Browse files Browse the repository at this point in the history
  • Loading branch information
dehaansa committed Dec 11, 2024
1 parent ad69096 commit cefb4c4
Show file tree
Hide file tree
Showing 5 changed files with 248 additions and 6 deletions.
4 changes: 2 additions & 2 deletions internal/component/otelcol/receiver/syslog/syslog.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,8 +112,8 @@ type UDP struct {
}

type TrimConfig struct {
PreserveLeadingWhitespace bool `alloy:"preserve_leading_whitespace,attr,optional"`
PreserveTrailingWhitespace bool `alloy:"preserve_trailing_whitespace,attr,optional"`
PreserveLeadingWhitespace bool `alloy:"preserve_leading_whitespaces,attr,optional"`
PreserveTrailingWhitespace bool `alloy:"preserve_trailing_whitespaces,attr,optional"`
}

func (c *TrimConfig) Convert() *trim.Config {
Expand Down
8 changes: 4 additions & 4 deletions internal/component/otelcol/receiver/syslog/syslog_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,8 +136,8 @@ func TestUnmarshal(t *testing.T) {
one_log_per_packet = true
add_attributes = true
encoding = "utf-16be"
preserve_leading_whitespace = true
preserve_trailing_whitespace = true
preserve_leading_whitespaces = true
preserve_trailing_whitespaces = true
tls {
include_system_ca_certs_pool = true
reload_interval = "1m"
Expand All @@ -149,8 +149,8 @@ func TestUnmarshal(t *testing.T) {
one_log_per_packet = false
add_attributes = false
encoding = "utf-16le"
preserve_leading_whitespace = false
preserve_trailing_whitespace = false
preserve_leading_whitespaces = false
preserve_trailing_whitespaces = false
async {
readers = 2
processors = 4
Expand Down
130 changes: 130 additions & 0 deletions internal/converter/internal/otelcolconvert/converter_syslogreceiver.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
package otelcolconvert

import (
"fmt"

"github.com/alecthomas/units"
"github.com/grafana/alloy/internal/component/common/config"
"github.com/grafana/alloy/internal/component/otelcol"
"github.com/grafana/alloy/internal/component/otelcol/receiver/syslog"
"github.com/grafana/alloy/internal/converter/diag"
"github.com/grafana/alloy/internal/converter/internal/common"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/input/udp"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/split"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/trim"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/syslogreceiver"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componentstatus"
)

func init() {
converters = append(converters, syslogReceiverConverter{})
}

type syslogReceiverConverter struct{}

func (syslogReceiverConverter) Factory() component.Factory {
return syslogreceiver.NewFactory()
}

func (syslogReceiverConverter) InputComponentName() string {
return "otelcol.receiver.syslog"
}

func (syslogReceiverConverter) ConvertAndAppend(state *State, id componentstatus.InstanceID, cfg component.Config) diag.Diagnostics {
var diags diag.Diagnostics

label := state.AlloyComponentLabel()

args := toOtelcolReceiversyslog(cfg.(*syslogreceiver.SysLogConfig))
block := common.NewBlockWithOverride([]string{"otelcol", "receiver", "syslog"}, label, args)

diags.Add(
diag.SeverityLevelInfo,
fmt.Sprintf("Converted %s into %s", StringifyInstanceID(id), StringifyBlock(block)),
)

state.Body().AppendBlock(block)
return diags
}

func toOtelcolReceiversyslog(cfg *syslogreceiver.SysLogConfig) *syslog.Arguments {
// Protocol string `mapstructure:"protocol,omitempty"`
// Location string `mapstructure:"location,omitempty"`
// EnableOctetCounting bool `mapstructure:"enable_octet_counting,omitempty"`
// AllowSkipPriHeader bool `mapstructure:"allow_skip_pri_header,omitempty"`
// NonTransparentFramingTrailer *string `mapstructure:"non_transparent_framing_trailer,omitempty"`
// MaxOctets int `mapstructure:"max_octets,omitempty"`
args := &syslog.Arguments{
Protocol: config.SysLogFormat(cfg.InputConfig.Protocol),
Location: cfg.InputConfig.Location,
EnableOctetCounting: cfg.InputConfig.EnableOctetCounting,
AllowSkipPriHeader: cfg.InputConfig.AllowSkipPriHeader,
NonTransparentFramingTrailer: syslog.FramingTrailer(*cfg.InputConfig.NonTransparentFramingTrailer),
MaxOctets: cfg.InputConfig.MaxOctets,
DebugMetrics: common.DefaultValue[syslog.Arguments]().DebugMetrics,
}

if cfg.InputConfig.TCP != nil {
args.TCP = &syslog.TCP{
MaxLogSize: units.Base2Bytes(cfg.InputConfig.TCP.MaxLogSize),
ListenAddress: cfg.InputConfig.TCP.ListenAddress,
TLS: toTLSServerArguments(cfg.InputConfig.TCP.TLS),
AddAttributes: cfg.InputConfig.TCP.AddAttributes,
OneLogPerPacket: cfg.InputConfig.TCP.OneLogPerPacket,
Encoding: cfg.InputConfig.TCP.Encoding,
MultilineConfig: toSyslogMultilineConfig(cfg.InputConfig.TCP.SplitConfig),
TrimConfig: toSyslogTrimConfig(cfg.InputConfig.TCP.TrimConfig),
}
}

if cfg.InputConfig.UDP != nil {
args.UDP = &syslog.UDP{
ListenAddress: cfg.InputConfig.UDP.ListenAddress,
OneLogPerPacket: cfg.InputConfig.UDP.OneLogPerPacket,
AddAttributes: cfg.InputConfig.UDP.AddAttributes,
Encoding: cfg.InputConfig.UDP.Encoding,
MultilineConfig: toSyslogMultilineConfig(cfg.InputConfig.UDP.SplitConfig),
TrimConfig: toSyslogTrimConfig(cfg.InputConfig.UDP.TrimConfig),
Async: toSyslogAsyncConfig(cfg.InputConfig.UDP.AsyncConfig),
}
}

// This isn't done in a function because the type is not exported
args.ConsumerRetry = otelcol.ConsumerRetryArguments{
Enabled: cfg.RetryOnFailure.Enabled,
InitialInterval: cfg.RetryOnFailure.InitialInterval,
MaxInterval: cfg.RetryOnFailure.MaxInterval,
MaxElapsedTime: cfg.RetryOnFailure.MaxElapsedTime,
}

return args

}

func toSyslogMultilineConfig(cfg split.Config) *syslog.MultilineConfig {
return &syslog.MultilineConfig{
LineStartPattern: cfg.LineStartPattern,
LineEndPattern: cfg.LineEndPattern,
OmitPattern: cfg.OmitPattern,
}
}

func toSyslogTrimConfig(cfg trim.Config) *syslog.TrimConfig {
return &syslog.TrimConfig{
PreserveLeadingWhitespace: cfg.PreserveLeading,
PreserveTrailingWhitespace: cfg.PreserveTrailing,
}
}

func toSyslogAsyncConfig(cfg *udp.AsyncConfig) *syslog.AsyncConfig {
if cfg == nil {
return nil
}

return &syslog.AsyncConfig{
Readers: cfg.Readers,
Processors: cfg.Processors,
MaxQueueLength: cfg.MaxQueueLength,
}
}
55 changes: 55 additions & 0 deletions internal/converter/internal/otelcolconvert/testdata/syslog.alloy
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
otelcol.receiver.syslog "default" {
enable_octet_counting = true
max_octets = 16000
allow_skip_pri_header = true
non_transparent_framing_trailer = "NUL"

retry_on_failure {
enabled = true
initial_interval = "10s"
max_interval = "1m0s"
max_elapsed_time = "10m0s"
}

tcp {
max_log_size = "2MiB"
listen_address = "localhost:1514"

tls {
reload_interval = "1m0s"
include_system_ca_certs_pool = true
}
add_attributes = true
one_log_per_packet = true
encoding = "utf-16be"

multiline { }
preserve_leading_whitespaces = true
preserve_trailing_whitespaces = true
}

udp {
listen_address = "localhost:1515"
encoding = "utf-16le"

multiline {
line_end_pattern = "logend"
omit_pattern = true
}

async {
readers = 2
processors = 4
max_queue_length = 1000
}
}
}

otelcol.exporter.syslog "default" {
tls {
insecure_skip_verify = true
}
endpoint = "localhost"
port = 1514
enable_octet_counting = true
}
57 changes: 57 additions & 0 deletions internal/converter/internal/otelcolconvert/testdata/syslog.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
receivers:
syslog:
location: "UTC"
protocol: "rfc5424"
enable_octet_counting: true
max_octets: 16000
allow_skip_pri_header: true
non_transparent_framing_trailer: "NUL"
tcp:
listen_address: "localhost:1514"
max_log_size: "2MiB"
one_log_per_packet: true
add_attributes: true
encoding: "utf-16be"
preserve_leading_whitespaces: true
preserve_trailing_whitespaces: true
tls:
include_system_ca_certs_pool: true
reload_interval: "1m"
udp:
listen_address: "localhost:1515"
one_log_per_packet: false
add_attributes: false
encoding: "utf-16le"
preserve_leading_whitespaces: false
preserve_trailing_whitespaces: false
async:
readers: 2
processors: 4
max_queue_length: 1000
multiline:
line_end_pattern: "logend"
omit_pattern: true
retry_on_failure:
enabled: true
initial_interval: "10s"
max_interval: "1m"
max_elapsed_time: "10m"


exporters:
syslog:
endpoint: localhost
port: 1514
protocol: "rfc5424"
network: "tcp"
enable_octet_counting: true
tls:
insecure: false
insecure_skip_verify: true

service:
pipelines:
logs:
receivers: [syslog]
processors: []
exporters: [syslog]

0 comments on commit cefb4c4

Please sign in to comment.