Skip to content

Commit

Permalink
lint
Browse files Browse the repository at this point in the history
  • Loading branch information
mmetc committed Dec 18, 2024
1 parent 64db785 commit f565efa
Showing 1 changed file with 50 additions and 4 deletions.
54 changes: 50 additions & 4 deletions pkg/acquisition/modules/docker/docker.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,33 +288,42 @@ func (d *DockerSource) SupportedModes() []string {
// OneShotAcquisition reads a set of file and returns when done
func (d *DockerSource) OneShotAcquisition(ctx context.Context, out chan types.Event, t *tomb.Tomb) error {
d.logger.Debug("In oneshot")
runningContainer, err := d.Client.ContainerList(ctx, dockerContainer.ListOptions{})

runningContainers, err := d.Client.ContainerList(ctx, dockerContainer.ListOptions{})
if err != nil {
return err
}

foundOne := false
for _, container := range runningContainer {

for _, container := range runningContainers {
if _, ok := d.runningContainerState[container.ID]; ok {
d.logger.Debugf("container with id %s is already being read from", container.ID)
continue
}

if containerConfig := d.EvalContainer(ctx, container); containerConfig != nil {
d.logger.Infof("reading logs from container %s", containerConfig.Name)
d.logger.Debugf("logs options: %+v", *d.containerLogsOptions)

dockerReader, err := d.Client.ContainerLogs(ctx, containerConfig.ID, *d.containerLogsOptions)
if err != nil {
d.logger.Errorf("unable to read logs from container: %+v", err)
return err
}

// we use this library to normalize docker API logs (cf. https://ahmet.im/blog/docker-logs-api-binary-format-explained/)
foundOne = true

var scanner *bufio.Scanner

if containerConfig.Tty {
scanner = bufio.NewScanner(dockerReader)
} else {
reader := dlog.NewReader(dockerReader)
scanner = bufio.NewScanner(reader)
}

for scanner.Scan() {
select {
case <-t.Dying():
Expand All @@ -324,16 +333,19 @@ func (d *DockerSource) OneShotAcquisition(ctx context.Context, out chan types.Ev
if line == "" {
continue
}

l := types.Line{}
l.Raw = line
l.Labels = d.Config.Labels
l.Time = time.Now().UTC()
l.Src = containerConfig.Name
l.Process = true
l.Module = d.GetName()

if d.metricsLevel != configuration.METRICS_NONE {
linesRead.With(prometheus.Labels{"source": containerConfig.Name}).Inc()
}

evt := types.MakeEvent(true, types.LOG, true)
evt.Line = l
evt.Process = true
Expand All @@ -342,10 +354,12 @@ func (d *DockerSource) OneShotAcquisition(ctx context.Context, out chan types.Ev
d.logger.Debugf("Sent line to parsing: %+v", evt.Line.Raw)
}
}

err = scanner.Err()
if err != nil {
d.logger.Errorf("Got error from docker read: %s", err)
}

d.runningContainerState[container.ID] = containerConfig
}
}
Expand Down Expand Up @@ -380,6 +394,7 @@ func (d *DockerSource) getContainerTTY(ctx context.Context, containerId string)
if err != nil {
return false
}

return containerDetails.Config.Tty
}

Expand All @@ -388,6 +403,7 @@ func (d *DockerSource) getContainerLabels(ctx context.Context, containerId strin
if err != nil {
return map[string]interface{}{}
}

return parseLabels(containerDetails.Config.Labels)
}

Expand All @@ -403,6 +419,7 @@ func (d *DockerSource) EvalContainer(ctx context.Context, container dockerTypes.
if strings.HasPrefix(name, "/") && name != "" {
name = name[1:]
}

if name == containerName {
return &ContainerConfig{ID: container.ID, Name: name, Labels: d.Config.Labels, Tty: d.getContainerTTY(ctx, container.ID)}
}
Expand All @@ -429,38 +446,49 @@ func (d *DockerSource) EvalContainer(ctx context.Context, container dockerTypes.
d.logger.Tracef("container has no 'crowdsec' labels set, ignoring container: %s", container.ID)
return nil
}

if _, ok := parsedLabels["enable"]; !ok {
d.logger.Errorf("container has 'crowdsec' labels set but no 'crowdsec.enable' key found")
return nil
}

enable, ok := parsedLabels["enable"].(string)
if !ok {
d.logger.Error("container has 'crowdsec.enable' label set but it's not a string")
return nil
}

if strings.ToLower(enable) != "true" {
d.logger.Debugf("container has 'crowdsec.enable' label not set to true ignoring container: %s", container.ID)
return nil
}

if _, ok = parsedLabels["labels"]; !ok {
d.logger.Error("container has 'crowdsec.enable' label set to true but no 'labels' keys found")
return nil
}

labelsTypeCast, ok := parsedLabels["labels"].(map[string]interface{})
if !ok {
d.logger.Error("container has 'crowdsec.enable' label set to true but 'labels' is not a map")
return nil
}

d.logger.Debugf("container labels %+v", labelsTypeCast)

Check warning on line 478 in pkg/acquisition/modules/docker/docker.go

View check run for this annotation

Codecov / codecov/patch

pkg/acquisition/modules/docker/docker.go#L478

Added line #L478 was not covered by tests
labels := make(map[string]string)

Check warning on line 480 in pkg/acquisition/modules/docker/docker.go

View check run for this annotation

Codecov / codecov/patch

pkg/acquisition/modules/docker/docker.go#L480

Added line #L480 was not covered by tests
for k, v := range labelsTypeCast {
if v, ok := v.(string); ok {
log.Debugf("label %s is a string with value %s", k, v)
labels[k] = v

Check warning on line 485 in pkg/acquisition/modules/docker/docker.go

View check run for this annotation

Codecov / codecov/patch

pkg/acquisition/modules/docker/docker.go#L485

Added line #L485 was not covered by tests
continue
}

d.logger.Errorf("label %s is not a string", k)
}

return &ContainerConfig{ID: container.ID, Name: container.Names[0], Labels: labels, Tty: d.getContainerTTY(ctx, container.ID)}
}

Expand All @@ -470,6 +498,7 @@ func (d *DockerSource) EvalContainer(ctx context.Context, container dockerTypes.
func (d *DockerSource) WatchContainer(ctx context.Context, monitChan chan *ContainerConfig, deleteChan chan *ContainerConfig) error {
ticker := time.NewTicker(d.CheckIntervalDuration)
d.logger.Infof("Container watcher started, interval: %s", d.CheckIntervalDuration.String())

for {
select {
case <-d.t.Dying():
Expand All @@ -478,32 +507,37 @@ func (d *DockerSource) WatchContainer(ctx context.Context, monitChan chan *Conta
case <-ticker.C:
// to track for garbage collection
runningContainersID := make(map[string]bool)
runningContainer, err := d.Client.ContainerList(ctx, dockerContainer.ListOptions{})

runningContainers, err := d.Client.ContainerList(ctx, dockerContainer.ListOptions{})
if err != nil {
if strings.Contains(strings.ToLower(err.Error()), "cannot connect to the docker daemon at") {
for idx, container := range d.runningContainerState {
if d.runningContainerState[idx].t.Alive() {
d.logger.Infof("killing tail for container %s", container.Name)
d.runningContainerState[idx].t.Kill(nil)

Check warning on line 518 in pkg/acquisition/modules/docker/docker.go

View check run for this annotation

Codecov / codecov/patch

pkg/acquisition/modules/docker/docker.go#L518

Added line #L518 was not covered by tests
if err := d.runningContainerState[idx].t.Wait(); err != nil {
d.logger.Infof("error while waiting for death of %s : %s", container.Name, err)
}
}

delete(d.runningContainerState, idx)
}
} else {
log.Errorf("container list err: %s", err)
}

continue
}

for _, container := range runningContainer {
for _, container := range runningContainers {
runningContainersID[container.ID] = true

// don't need to re eval an already monitored container
if _, ok := d.runningContainerState[container.ID]; ok {
continue
}

if containerConfig := d.EvalContainer(ctx, container); containerConfig != nil {
monitChan <- containerConfig
}
Expand All @@ -514,6 +548,7 @@ func (d *DockerSource) WatchContainer(ctx context.Context, monitChan chan *Conta
deleteChan <- containerConfig
}
}

d.logger.Tracef("Reading logs from %d containers", len(d.runningContainerState))

ticker.Reset(d.CheckIntervalDuration)
Expand All @@ -525,7 +560,9 @@ func (d *DockerSource) StreamingAcquisition(ctx context.Context, out chan types.
d.t = t
monitChan := make(chan *ContainerConfig)
deleteChan := make(chan *ContainerConfig)

d.logger.Infof("Starting docker acquisition")

t.Go(func() error {
return d.DockerManager(ctx, monitChan, deleteChan, out)
})
Expand All @@ -546,6 +583,7 @@ func ReadTailScanner(scanner *bufio.Scanner, out chan string, t *tomb.Tomb) erro

func (d *DockerSource) TailDocker(ctx context.Context, container *ContainerConfig, outChan chan types.Event, deleteChan chan *ContainerConfig) error {
container.logger.Infof("start tail for container %s", container.Name)

dockerReader, err := d.Client.ContainerLogs(ctx, container.ID, *d.containerLogsOptions)
if err != nil {
container.logger.Errorf("unable to read logs from container: %+v", err)
Expand All @@ -560,11 +598,13 @@ func (d *DockerSource) TailDocker(ctx context.Context, container *ContainerConfi
reader := dlog.NewReader(dockerReader)
scanner = bufio.NewScanner(reader)
}

readerChan := make(chan string)
readerTomb := &tomb.Tomb{}
readerTomb.Go(func() error {
return ReadTailScanner(scanner, readerChan, readerTomb)
})

for {
select {
case <-container.t.Dying():
Expand Down Expand Up @@ -595,13 +635,15 @@ func (d *DockerSource) TailDocker(ctx context.Context, container *ContainerConfi
// Also reset the Since to avoid re-reading logs
d.Config.Since = time.Now().UTC().Format(time.RFC3339)
d.containerLogsOptions.Since = d.Config.Since

return nil
}
}
}

func (d *DockerSource) DockerManager(ctx context.Context, in chan *ContainerConfig, deleteChan chan *ContainerConfig, outChan chan types.Event) error {
d.logger.Info("DockerSource Manager started")

for {
select {
case newContainer := <-in:
Expand All @@ -611,6 +653,7 @@ func (d *DockerSource) DockerManager(ctx context.Context, in chan *ContainerConf
newContainer.t.Go(func() error {
return d.TailDocker(ctx, newContainer, outChan, deleteChan)
})

d.runningContainerState[newContainer.ID] = newContainer
}
case containerToDelete := <-deleteChan:
Expand All @@ -624,13 +667,16 @@ func (d *DockerSource) DockerManager(ctx context.Context, in chan *ContainerConf
if d.runningContainerState[idx].t.Alive() {
d.logger.Infof("killing tail for container %s", container.Name)
d.runningContainerState[idx].t.Kill(nil)

Check warning on line 670 in pkg/acquisition/modules/docker/docker.go

View check run for this annotation

Codecov / codecov/patch

pkg/acquisition/modules/docker/docker.go#L670

Added line #L670 was not covered by tests
if err := d.runningContainerState[idx].t.Wait(); err != nil {
d.logger.Infof("error while waiting for death of %s : %s", container.Name, err)
}
}
}

d.runningContainerState = nil
d.logger.Debugf("routine cleanup done, return")

return nil
}
}
Expand Down

0 comments on commit f565efa

Please sign in to comment.