Skip to content

Commit

Permalink
Changes to the output event log format and the poll interval type
Browse files Browse the repository at this point in the history
  • Loading branch information
rnishtala-sumo committed Oct 26, 2023
1 parent 57acf39 commit 71990c2
Show file tree
Hide file tree
Showing 9 changed files with 66 additions and 62 deletions.
1 change: 1 addition & 0 deletions .changelog/1273.added.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Adding the Windows AD Inventory Receiver to the collector
11 changes: 0 additions & 11 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,6 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

<!-- towncrier release notes -->

## [Unreleased]

### Released TBD

### Added

- feat(receiver/activedirectoryinvreceiver): Add the Windows Active Directory inventory receiver [#1273]

[#1273]: https://github.com/SumoLogic/sumologic-otel-collector/pull/1273
[Unreleased]: https://github.com/SumoLogic/sumologic-otel-collector/compare/v0.88.0-sumo-0...main

## [v0.88.0-sumo-0]

### Released 2023-10-24
Expand Down
8 changes: 4 additions & 4 deletions pkg/receiver/activedirectoryinvreceiver/README.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Windows Active Directory inventory Receiver
# Windows Active Directory Inventory Receiver

**Stability level**: Alpha

Expand All @@ -10,7 +10,7 @@ Supported pipeline types: logs

```yaml
receivers:
activedirectoryinv:
active_directory_inv:
# Base DN
# default = ""
base_dn: "CN=Users,DC=exampledomain,DC=com"
Expand All @@ -32,7 +32,7 @@ Example configuration:
```yaml
receivers:
## All my example logs
activedirectoryinv:
active_directory_inv:
base_dn: "CN=Users,DC=exampledomain,DC=com"
attributes: [name, mail, department, manager, memberOf]
poll_interval: 24h
Expand All @@ -48,7 +48,7 @@ service:
pipelines:
logs/syslog source:
receivers:
- activedirectoryinvreceiver
- active_directory_inv
exporters:
- logging
```
29 changes: 17 additions & 12 deletions pkg/receiver/activedirectoryinvreceiver/adinvreceiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package activedirectoryinvreceiver

import (
"context"
"encoding/json"
"fmt"
"sync"
"time"
Expand Down Expand Up @@ -101,13 +102,8 @@ func (l *ADReceiver) Shutdown(_ context.Context) error {
// Start polling for Active Directory inventory records
func (l *ADReceiver) startPolling(ctx context.Context) {
defer l.wg.Done()
duration, err := time.ParseDuration(l.config.PollInterval)
if err != nil {
l.logger.Error("Failed to parse poll interval", zap.Error(err))
return
}
l.logger.Info("Polling interval: ", zap.Duration("interval", duration))
t := time.NewTicker(duration)
l.logger.Info("Polling interval: ", zap.Duration("interval", l.config.PollInterval))
t := time.NewTicker(l.config.PollInterval)
for {
select {
case <-ctx.Done():
Expand All @@ -130,7 +126,11 @@ func (r *ADReceiver) traverse(node Container, attrs []string, resourceLogs *plog
r.logger.Error("Failed to convert container to object", zap.Error(err))
return
}
setUserAttributes(nodeObject, attrs, resourceLogs)
err = setUserAttributes(nodeObject, attrs, resourceLogs)
if err != nil {
r.logger.Error("Failed to set user attributes", zap.Error(err))
return
}
children, err := node.Children()
if err != nil {
r.logger.Error("Failed to retrieve children", zap.Error(err))
Expand Down Expand Up @@ -168,17 +168,22 @@ func (r *ADReceiver) poll(ctx context.Context) error {
}

// Set user attributes to a log record body
func setUserAttributes(user Object, attrs []string, resourceLogs *plog.ResourceLogs) {
func setUserAttributes(user Object, attrs []string, resourceLogs *plog.ResourceLogs) error {
observedTime := pcommon.NewTimestampFromTime(time.Now())
attributes := ""
attributes := make(map[string]interface{})
for _, attr := range attrs {
values, err := user.Attrs(attr)
if err == nil && len(values) > 0 {
attributes += fmt.Sprintf("%s: %v\n", attr, values)
attributes[attr] = values
}
}
attributesJSON, err := json.Marshal(attributes)
if err != nil {
return err
}
logRecord := resourceLogs.ScopeLogs().At(0).LogRecords().AppendEmpty()
logRecord.SetObservedTimestamp(observedTime)
logRecord.SetTimestamp(observedTime)
logRecord.Body().SetStr(attributes)
logRecord.Body().SetStr(string(attributesJSON))
return nil
}
35 changes: 19 additions & 16 deletions pkg/receiver/activedirectoryinvreceiver/adinvreceiver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,13 @@ package activedirectoryinvreceiver

import (
"context"
"encoding/json"
"fmt"
"testing"
"time"

"github.com/go-adsi/adsi"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component/componenttest"
Expand Down Expand Up @@ -100,60 +102,61 @@ func TestStart(t *testing.T) {
mockClient := &MockClient{}
mockRuntime := &MockRuntime{}
mockRuntime.On("SupportedOS").Return(true)

logsRcvr := newLogsReceiver(cfg, zap.NewNop(), mockClient, mockRuntime, sink)

// Start the receiver
err := logsRcvr.Start(context.Background(), componenttest.NewNopHost())
require.NoError(t, err)

// Shutdown the receiver
err = logsRcvr.Shutdown(context.Background())
require.NoError(t, err)
}

func TestStartUnsupportedOS(t *testing.T) {
cfg := CreateDefaultConfig().(*ADConfig)
cfg.DN = "CN=Guest,CN=Users,DC=exampledomain,DC=com"

sink := &consumertest.LogsSink{}
mockClient := &MockClient{}
mockRuntime := &MockRuntime{}
mockRuntime.On("SupportedOS").Return(false)

logsRcvr := newLogsReceiver(cfg, zap.NewNop(), mockClient, mockRuntime, sink)

// Start the receiver
err := logsRcvr.Start(context.Background(), componenttest.NewNopHost())
require.Error(t, err)
require.Contains(t, err.Error(), "activedirectoryinv is only supported on Windows")
require.Contains(t, err.Error(), "active_directory_inv is only supported on Windows")
}

func TestPoll(t *testing.T) {
func TestLogRecord(t *testing.T) {
expectedBody := `{"name":["test"],"mail":["test"],"department":["test"],"manager":["test"],"memberOf":["test"]}`
var expectedResult, actualResult map[string]interface{}
cfg := CreateDefaultConfig().(*ADConfig)
cfg.DN = "CN=Guest,CN=Users,DC=exampledomain,DC=com"
cfg.PollInterval = "1s"
cfg.Attributes = []string{"name"}
cfg.PollInterval = 1 * time.Second // Set poll interval to 1s to speed up test
sink := &consumertest.LogsSink{}
mockClient := defaultMockClient()
mockRuntime := &MockRuntime{}
mockRuntime.On("SupportedOS").Return(true)
logsRcvr := newLogsReceiver(cfg, zap.NewNop(), mockClient, mockRuntime, sink)

// Start the receiver
err := logsRcvr.Start(context.Background(), componenttest.NewNopHost())
require.NoError(t, err)

require.Eventually(t, func() bool {
return sink.LogRecordCount() > 0
}, 2*time.Second, 10*time.Millisecond)

result := sink.AllLogs()[0].ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0).Body().AsRaw()
err = json.Unmarshal([]byte(expectedBody), &expectedResult)
require.NoError(t, err)
err = json.Unmarshal([]byte(result.(string)), &actualResult)
require.NoError(t, err)
// Shutdown the receiver
err = logsRcvr.Shutdown(context.Background())
require.NoError(t, err)
assert.Equal(t, expectedResult, actualResult)
}

func defaultMockClient() Client {
mockClient := &MockClient{}
mockContainer := &MockContainer{}
mockObject := &MockObject{}
mockObjectIter := &MockObjectIter{}
attrs := []interface{}{"Guest", "test"}
attrs := []interface{}{"test"}
mockContainer.On("ToObject").Return(mockObject, nil)
mockContainer.On("Children").Return(mockObjectIter, fmt.Errorf("no children"))
mockContainer.On("Close").Return(nil)
Expand Down
20 changes: 12 additions & 8 deletions pkg/receiver/activedirectoryinvreceiver/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@ import (
// ADConfig defines configuration for Active Directory Inventory receiver.

type ADConfig struct {
DN string `mapstructure:"base_dn"` // DN is the base distinguished name to search from
Attributes []string `mapstructure:"attributes"`
PollInterval string `mapstructure:"poll_interval"`
DN string `mapstructure:"base_dn"` // DN is the base distinguished name to search from
Attributes []string `mapstructure:"attributes"`
PollInterval time.Duration `mapstructure:"poll_interval"`
}

var (
Expand All @@ -34,16 +34,20 @@ var (
errSupportedOS = errors.New(typeStr + " is only supported on Windows.")
)

func isValidDuration(durationStr string) bool {
_, err := time.ParseDuration(durationStr)
return err == nil
func isValidDuration(duration time.Duration) bool {
return duration > 0
}

// Validate validates all portions of the relevant config
func (c *ADConfig) Validate() error {

// Define the regular expression pattern for a valid Base DN
pattern := `^((CN|OU)=[^,]+(,|$))*((DC=[^,]+),?)*$`
// Regular expression pattern for a valid DN
// CN=Guest,CN=Users,DC=exampledomain,DC=com
// CN=Guest,OU=Users,DC=exampledomain,DC=com
// DC=exampledomain,DC=com
// CN=Guest,DC=exampledomain,DC=com
// OU=Users,DC=exampledomain,DC=com
pattern := `^((CN|OU)=[^,]+(,|$))*((DC=[^,]+),?)+$`

// Compile the regular expression pattern
regex := regexp.MustCompile(pattern)
Expand Down
16 changes: 9 additions & 7 deletions pkg/receiver/activedirectoryinvreceiver/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package activedirectoryinvreceiver

import (
"testing"
"time"

"github.com/stretchr/testify/require"
)
Expand All @@ -31,48 +32,49 @@ func TestValidate(t *testing.T) {
config: ADConfig{
DN: "CN=Guest,CN=Users,DC=exampledomain,DC=com",
Attributes: []string{"name"},
PollInterval: "60s",
PollInterval: 60 * time.Second,
},
},
{
name: "Valid Config DC",
config: ADConfig{
DN: "DC=exampledomain,DC=com",
Attributes: []string{"name"},
PollInterval: "60s",
PollInterval: 60 * time.Second,
},
},
{
name: "Valid Config OU",
config: ADConfig{
DN: "CN=Guest,OU=Users,DC=exampledomain,DC=com",
Attributes: []string{"name"},
PollInterval: "24h",
PollInterval: 24 * time.Hour,
},
},
{
name: "Invalid DN",
config: ADConfig{
DN: "NA",
Attributes: []string{"name"},
PollInterval: "24h",
PollInterval: 24 * time.Hour,
},
expectedErr: errInvalidDN,
},
{
name: "Empty DN",
name: "Invalid Empty DN",
config: ADConfig{
DN: "",
Attributes: []string{"name"},
PollInterval: "24h",
PollInterval: 24 * time.Hour,
},
expectedErr: errInvalidDN,
},
{
name: "Invalid Poll Interval",
config: ADConfig{
DN: "CN=Users,DC=exampledomain,DC=com",
Attributes: []string{"name"},
PollInterval: "test",
PollInterval: 0,
},
expectedErr: errInvalidPollInterval,
},
Expand Down
5 changes: 3 additions & 2 deletions pkg/receiver/activedirectoryinvreceiver/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package activedirectoryinvreceiver

import (
"context"
"time"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer"
Expand All @@ -24,7 +25,7 @@ import (

const (
// The value of "type" key in configuration.
typeStr = "activedirectoryinv"
typeStr = "active_directory_inv"
)

// NewFactory creates a factory for Active Directory Inventory receiver
Expand All @@ -41,7 +42,7 @@ func CreateDefaultConfig() component.Config {
return &ADConfig{
DN: "",
Attributes: []string{"name", "mail", "department", "manager", "memberOf"},
PollInterval: "24h",
PollInterval: 24 * time.Hour,
}
}

Expand Down
3 changes: 1 addition & 2 deletions pkg/receiver/activedirectoryinvreceiver/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,11 @@ import (
func TestType(t *testing.T) {
factory := NewFactory()
ft := factory.Type()
require.EqualValues(t, "activedirectoryinv", ft)
require.EqualValues(t, "active_directory_inv", ft)
}

func TestCreateLogsReceiver(t *testing.T) {
cfg := CreateDefaultConfig().(*ADConfig)
cfg.DN = "CN=Guest,CN=Users,DC=exampledomain,DC=com" // valid DN
_, err := NewFactory().CreateLogsReceiver(
context.Background(),
receivertest.NewNopCreateSettings(),
Expand Down

0 comments on commit 71990c2

Please sign in to comment.