Skip to content

Commit

Permalink
update tests
Browse files Browse the repository at this point in the history
  • Loading branch information
cristiangreco committed Dec 2, 2024
1 parent c280c81 commit 0a90435
Show file tree
Hide file tree
Showing 2 changed files with 327 additions and 49 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"database/sql"
"fmt"
"strings"
"sync/atomic"
"time"

"github.com/go-kit/log"
Expand Down Expand Up @@ -45,10 +46,10 @@ type QuerySample struct {
collectInterval time.Duration
entryHandler loki.EntryHandler

logger log.Logger

ctx context.Context
cancel context.CancelFunc
logger log.Logger
running *atomic.Bool
ctx context.Context
cancel context.CancelFunc
}

func NewQuerySample(args QuerySampleArguments) (*QuerySample, error) {
Expand All @@ -57,23 +58,29 @@ func NewQuerySample(args QuerySampleArguments) (*QuerySample, error) {
collectInterval: args.CollectInterval,
entryHandler: args.EntryHandler,
logger: args.Logger,
running: &atomic.Bool{},
}, nil
}

func (c *QuerySample) Start(ctx context.Context) error {
level.Debug(c.logger).Log("msg", "QuerySample collector started")

c.running.Store(true)
ctx, cancel := context.WithCancel(ctx)
c.ctx = ctx
c.cancel = cancel

go func() {
defer func() {
c.Stop()
c.running.Store(false)
}()

ticker := time.NewTicker(c.collectInterval)

for {
if err := c.fetchQuerySamples(c.ctx); err != nil {
level.Error(c.logger).Log("msg", "collector stopping due to error", "err", err)
c.Stop()
break
}

Expand All @@ -89,6 +96,10 @@ func (c *QuerySample) Start(ctx context.Context) error {
return nil
}

func (c *QuerySample) Stopped() bool {
return !c.running.Load()
}

// Stop should be kept idempotent
func (c *QuerySample) Stop() {
c.cancel()
Expand Down
Loading

0 comments on commit 0a90435

Please sign in to comment.