From 71507a2b640ad071d88ee894e80235f93be73c3d Mon Sep 17 00:00:00 2001 From: Kaviraj Kanagaraj Date: Mon, 3 Jun 2024 18:18:39 +0200 Subject: [PATCH] feat(canary): Add test to check query results with and without cache. (#13104) Signed-off-by: Kaviraj --- .../cmd/promtail/promtail-local-config.yaml | 1 + cmd/loki-canary/main.go | 6 +- pkg/canary/comparator/comparator.go | 99 +++++++++++++++++-- pkg/canary/comparator/comparator_test.go | 99 +++++++++++++++++-- pkg/canary/reader/reader.go | 11 ++- 5 files changed, 192 insertions(+), 24 deletions(-) diff --git a/clients/cmd/promtail/promtail-local-config.yaml b/clients/cmd/promtail/promtail-local-config.yaml index 3b9256537ec80..fed2e1256a055 100644 --- a/clients/cmd/promtail/promtail-local-config.yaml +++ b/clients/cmd/promtail/promtail-local-config.yaml @@ -16,3 +16,4 @@ scrape_configs: labels: job: varlogs __path__: /var/log/*log + stream: stdout diff --git a/cmd/loki-canary/main.go b/cmd/loki-canary/main.go index 061b98321047e..304bb1b1c3e91 100644 --- a/cmd/loki-canary/main.go +++ b/cmd/loki-canary/main.go @@ -79,6 +79,10 @@ func main() { metricTestQueryRange := flag.Duration("metric-test-range", 24*time.Hour, "The range value [24h] used in the metric test instant-query."+ " Note: this value is truncated to the running time of the canary until this value is reached") + cacheTestInterval := flag.Duration("cache-test-interval", 15*time.Minute, "The interval the cache test query should be run") + cacheTestQueryRange := flag.Duration("cache-test-range", 24*time.Hour, "The range value [24h] used in the cache test instant-query.") + cacheTestQueryNow := flag.Duration("cache-test-now", 1*time.Hour, "duration how far back from current time the execution time (--now) should be set for running this query in the cache test instant-query.") + spotCheckInterval := flag.Duration("spot-check-interval", 15*time.Minute, "Interval that a single result will be kept from sent entries and spot-checked against Loki, "+ "e.g. 
15min default one entry every 15 min will be saved and then queried again every 15min until spot-check-max is reached") spotCheckMax := flag.Duration("spot-check-max", 4*time.Hour, "How far back to check a spot check entry before dropping it") @@ -189,7 +193,7 @@ func main() { _, _ = fmt.Fprintf(os.Stderr, "Unable to create reader for Loki querier, check config: %s", err) os.Exit(1) } - c.comparator = comparator.NewComparator(os.Stderr, *wait, *maxWait, *pruneInterval, *spotCheckInterval, *spotCheckMax, *spotCheckQueryRate, *spotCheckWait, *metricTestInterval, *metricTestQueryRange, *interval, *buckets, sentChan, receivedChan, c.reader, true) + c.comparator = comparator.NewComparator(os.Stderr, *wait, *maxWait, *pruneInterval, *spotCheckInterval, *spotCheckMax, *spotCheckQueryRate, *spotCheckWait, *metricTestInterval, *metricTestQueryRange, *cacheTestInterval, *cacheTestQueryRange, *cacheTestQueryNow, *interval, *buckets, sentChan, receivedChan, c.reader, true) } startCanary() diff --git a/pkg/canary/comparator/comparator.go b/pkg/canary/comparator/comparator.go index 8f57af09ba783..a575c74eccd81 100644 --- a/pkg/canary/comparator/comparator.go +++ b/pkg/canary/comparator/comparator.go @@ -3,6 +3,7 @@ package comparator import ( "fmt" "io" + "math" "math/rand" "sync" "time" @@ -24,6 +25,8 @@ const ( DebugWebsocketMissingEntry = "websocket missing entry: %v\n" DebugQueryResult = "confirmation query result: %v\n" DebugEntryFound = "missing websocket entry %v was found %v seconds after it was originally sent\n" + + floatDiffTolerance = 1e-6 ) var ( @@ -90,6 +93,16 @@ var ( Help: "how long the spot check test query execution took in seconds.", Buckets: instrument.DefBuckets, }) + queryResultsDiff = promauto.NewCounter(prometheus.CounterOpts{ + Namespace: "loki_canary", + Name: "cache_test_query_results_diff_total", + Help: "counts number of times the query results was different with and without cache ", + }) + queryResultsTotal = 
promauto.NewCounterVec(prometheus.CounterOpts{ + Namespace: "loki_canary", + Name: "cache_test_query_results_total", + Help: "counts number of times the query results test requests are done ", + }, []string{"status"}) // status=success/failure ) type Comparator struct { @@ -98,6 +111,7 @@ type Comparator struct { spotEntMtx sync.Mutex // Locks access to []spotCheck spotMtx sync.Mutex // Locks spotcheckRunning for single threaded but async spotCheck() metTestMtx sync.Mutex // Locks metricTestRunning for single threaded but async metricTest() + cacheTestMtx sync.Mutex // Locks cacheTestRunning for single threaded but async cacheTest() pruneMtx sync.Mutex // Locks pruneEntriesRunning for single threaded but async pruneEntries() w io.Writer entries []*time.Time @@ -116,14 +130,19 @@ type Comparator struct { metricTestInterval time.Duration metricTestRange time.Duration metricTestRunning bool - writeInterval time.Duration - confirmAsync bool - startTime time.Time - sent chan time.Time - recv chan time.Time - rdr reader.LokiReader - quit chan struct{} - done chan struct{} + cacheTestInterval time.Duration + cacheTestRange time.Duration + // how far back from current time the execution time (--now) should be set for running this query. 
+ cacheTestNow time.Duration + cacheTestRunning bool + writeInterval time.Duration + confirmAsync bool + startTime time.Time + sent chan time.Time + recv chan time.Time + rdr reader.LokiReader + quit chan struct{} + done chan struct{} } func NewComparator(writer io.Writer, @@ -133,6 +152,9 @@ func NewComparator(writer io.Writer, spotCheckInterval, spotCheckMax, spotCheckQueryRate, spotCheckWait time.Duration, metricTestInterval time.Duration, metricTestRange time.Duration, + cacheTestInterval time.Duration, + cacheTestRange time.Duration, + cacheTestNow time.Duration, writeInterval time.Duration, buckets int, sentChan chan time.Time, @@ -155,6 +177,10 @@ func NewComparator(writer io.Writer, metricTestInterval: metricTestInterval, metricTestRange: metricTestRange, metricTestRunning: false, + cacheTestInterval: cacheTestInterval, + cacheTestRange: cacheTestRange, + cacheTestNow: cacheTestNow, + cacheTestRunning: false, writeInterval: writeInterval, confirmAsync: confirmAsync, startTime: time.Now(), @@ -252,10 +278,12 @@ func (c *Comparator) run() { randomGenerator := rand.New(rand.NewSource(time.Now().UnixNano())) mt := time.NewTicker(time.Duration(randomGenerator.Int63n(c.metricTestInterval.Nanoseconds()))) sc := time.NewTicker(c.spotCheckQueryRate) + ct := time.NewTicker(c.cacheTestInterval) defer func() { t.Stop() mt.Stop() sc.Stop() + ct.Stop() close(c.done) }() @@ -294,12 +322,65 @@ func (c *Comparator) run() { firstMt = false mt.Reset(c.metricTestInterval) } + case <-ct.C: + // Only run one instance of cache tests at a time. + c.cacheTestMtx.Lock() + if !c.cacheTestRunning { + c.cacheTestRunning = true + go c.cacheTest(time.Now()) + } + c.cacheTestMtx.Unlock() + case <-c.quit: return } } } +func (c *Comparator) cacheTest(currTime time.Time) { + defer func() { + c.cacheTestMtx.Lock() + c.cacheTestRunning = false + c.cacheTestMtx.Unlock() + }() + + // cacheTest is currently run using `reader.CountOverTime()` which is an instant query. 
+ // We make the query with and without cache over the data that is not changing (e.g: --now="1hr ago") instead of on latest data that is a moving target. + queryStartTime := currTime.Add(-c.cacheTestNow) + + // We cannot query for range before the pod even started. + if queryStartTime.Before(c.startTime) { + // we wait. + fmt.Fprintf(c.w, "cacheTest not run. still waiting for query start range(%s) to past the process start time(%s).\n", queryStartTime, c.startTime) + return + } + + rangeDuration := c.cacheTestRange + rng := fmt.Sprintf("%.0fs", rangeDuration.Seconds()) + + // with cache + countCache, err := c.rdr.QueryCountOverTime(rng, queryStartTime, true) + if err != nil { + fmt.Fprintf(c.w, "error running cache query test with cache: %s\n", err.Error()) + queryResultsTotal.WithLabelValues("failure").Inc() + return + } + + // without cache + countNocache, err := c.rdr.QueryCountOverTime(rng, queryStartTime, false) + if err != nil { + fmt.Fprintf(c.w, "error running cache query test without cache: %s\n", err.Error()) + queryResultsTotal.WithLabelValues("failure").Inc() + return + } + + queryResultsTotal.WithLabelValues("success").Inc() + if math.Abs(countNocache-countCache) > floatDiffTolerance { + queryResultsDiff.Inc() + fmt.Fprintf(c.w, "found a diff in instant query results time: %s, result_with_cache: %v, result_without_cache: %v\n", queryStartTime, countCache, countNocache) + } +} + // check that the expected # of log lines have been written to Loki func (c *Comparator) metricTest(currTime time.Time) { // Always make sure to set the running state back to false @@ -317,7 +398,7 @@ func (c *Comparator) metricTest(currTime time.Time) { adjustedRange = currTime.Sub(c.startTime) } begin := time.Now() - actualCount, err := c.rdr.QueryCountOverTime(fmt.Sprintf("%.0fs", adjustedRange.Seconds())) + actualCount, err := c.rdr.QueryCountOverTime(fmt.Sprintf("%.0fs", adjustedRange.Seconds()), begin, true) metricTestLatency.Observe(time.Since(begin).Seconds()) if err != 
nil { fmt.Fprintf(c.w, "error running metric query test: %s\n", err.Error()) diff --git a/pkg/canary/comparator/comparator_test.go b/pkg/canary/comparator/comparator_test.go index 39c868fb0dd36..db28545397e3c 100644 --- a/pkg/canary/comparator/comparator_test.go +++ b/pkg/canary/comparator/comparator_test.go @@ -19,7 +19,7 @@ func TestComparatorEntryReceivedOutOfOrder(t *testing.T) { duplicateEntries = &mockCounter{} actual := &bytes.Buffer{} - c := NewComparator(actual, 1*time.Hour, 1*time.Hour, 1*time.Hour, 15*time.Minute, 4*time.Hour, 4*time.Hour, 0, 1*time.Minute, 0, 0, 1, make(chan time.Time), make(chan time.Time), nil, false) + c := NewComparator(actual, 1*time.Hour, 1*time.Hour, 1*time.Hour, 15*time.Minute, 4*time.Hour, 4*time.Hour, 0, 1*time.Minute, 0, 1*time.Hour, 3*time.Hour, 30*time.Minute, 0, 1, make(chan time.Time), make(chan time.Time), nil, false) t1 := time.Now() t2 := t1.Add(1 * time.Second) @@ -60,7 +60,7 @@ func TestComparatorEntryReceivedNotExpected(t *testing.T) { duplicateEntries = &mockCounter{} actual := &bytes.Buffer{} - c := NewComparator(actual, 1*time.Hour, 1*time.Hour, 1*time.Hour, 15*time.Minute, 4*time.Hour, 4*time.Hour, 0, 1*time.Minute, 0, 0, 1, make(chan time.Time), make(chan time.Time), nil, false) + c := NewComparator(actual, 1*time.Hour, 1*time.Hour, 1*time.Hour, 15*time.Minute, 4*time.Hour, 4*time.Hour, 0, 1*time.Minute, 0, 1*time.Hour, 3*time.Hour, 30*time.Minute, 0, 1, make(chan time.Time), make(chan time.Time), nil, false) t1 := time.Now() t2 := t1.Add(1 * time.Second) @@ -101,7 +101,7 @@ func TestComparatorEntryReceivedDuplicate(t *testing.T) { duplicateEntries = &mockCounter{} actual := &bytes.Buffer{} - c := NewComparator(actual, 1*time.Hour, 1*time.Hour, 1*time.Hour, 15*time.Minute, 4*time.Hour, 4*time.Hour, 0, 1*time.Minute, 0, 0, 1, make(chan time.Time), make(chan time.Time), nil, false) + c := NewComparator(actual, 1*time.Hour, 1*time.Hour, 1*time.Hour, 15*time.Minute, 4*time.Hour, 4*time.Hour, 0, 1*time.Minute, 0, 
1*time.Hour, 3*time.Hour, 30*time.Minute, 0, 1, make(chan time.Time), make(chan time.Time), nil, false) t1 := time.Unix(0, 0) t2 := t1.Add(1 * time.Second) @@ -159,7 +159,7 @@ func TestEntryNeverReceived(t *testing.T) { wait := 60 * time.Second maxWait := 300 * time.Second //We set the prune interval timer to a huge value here so that it never runs, instead we call pruneEntries manually below - c := NewComparator(actual, wait, maxWait, 50*time.Hour, 15*time.Minute, 4*time.Hour, 4*time.Hour, 0, 1*time.Minute, 0, 0, 1, make(chan time.Time), make(chan time.Time), mr, false) + c := NewComparator(actual, wait, maxWait, 50*time.Hour, 15*time.Minute, 4*time.Hour, 4*time.Hour, 0, 1*time.Minute, 0, 1*time.Hour, 3*time.Hour, 30*time.Minute, 0, 1, make(chan time.Time), make(chan time.Time), mr, false) c.entrySent(t1) c.entrySent(t2) @@ -232,7 +232,7 @@ func TestConcurrentConfirmMissing(t *testing.T) { wait := 30 * time.Millisecond maxWait := 30 * time.Millisecond - c := NewComparator(output, wait, maxWait, 50*time.Hour, 15*time.Minute, 4*time.Hour, 4*time.Hour, 0, 1*time.Minute, 0, 0, 1, make(chan time.Time), make(chan time.Time), mr, false) + c := NewComparator(output, wait, maxWait, 50*time.Hour, 15*time.Minute, 4*time.Hour, 4*time.Hour, 0, 1*time.Minute, 0, 1*time.Hour, 3*time.Hour, 30*time.Minute, 0, 1, make(chan time.Time), make(chan time.Time), mr, false) for _, t := range found { tCopy := t @@ -263,7 +263,7 @@ func TestPruneAckdEntires(t *testing.T) { wait := 30 * time.Millisecond maxWait := 30 * time.Millisecond //We set the prune interval timer to a huge value here so that it never runs, instead we call pruneEntries manually below - c := NewComparator(actual, wait, maxWait, 50*time.Hour, 15*time.Minute, 4*time.Hour, 4*time.Hour, 0, 1*time.Minute, 0, 0, 1, make(chan time.Time), make(chan time.Time), nil, false) + c := NewComparator(actual, wait, maxWait, 50*time.Hour, 15*time.Minute, 4*time.Hour, 4*time.Hour, 0, 1*time.Minute, 0, 1*time.Hour, 3*time.Hour, 
30*time.Minute, 0, 1, make(chan time.Time), make(chan time.Time), nil, false) t1 := time.Unix(0, 0) t2 := t1.Add(1 * time.Millisecond) @@ -320,7 +320,7 @@ func TestSpotCheck(t *testing.T) { spotCheck := 10 * time.Millisecond spotCheckMax := 20 * time.Millisecond //We set the prune interval timer to a huge value here so that it never runs, instead we call spotCheckEntries manually below - c := NewComparator(actual, 1*time.Hour, 1*time.Hour, 50*time.Hour, spotCheck, spotCheckMax, 4*time.Hour, 3*time.Millisecond, 1*time.Minute, 0, 0, 1, make(chan time.Time), make(chan time.Time), mr, false) + c := NewComparator(actual, 1*time.Hour, 1*time.Hour, 50*time.Hour, spotCheck, spotCheckMax, 4*time.Hour, 3*time.Millisecond, 1*time.Minute, 0, 1*time.Hour, 3*time.Hour, 30*time.Minute, 0, 1, make(chan time.Time), make(chan time.Time), mr, false) // Send all the entries for i := range entries { @@ -360,6 +360,42 @@ func TestSpotCheck(t *testing.T) { prometheus.Unregister(responseLatency) } +func TestCacheTest(t *testing.T) { + actual := &bytes.Buffer{} + mr := &mockReader{} + now := time.Now() + cacheTestInterval := 500 * time.Millisecond + cacheTestRange := 30 * time.Second + cacheTestNow := 2 * time.Second + + c := NewComparator(actual, 1*time.Hour, 1*time.Hour, 50*time.Hour, 0, 0, 4*time.Hour, 0, 10*time.Minute, 0, cacheTestInterval, cacheTestRange, cacheTestNow, 1*time.Hour, 1, make(chan time.Time), make(chan time.Time), mr, false) + // Force the start time to a known value + c.startTime = time.Unix(10, 0) + + queryResultsDiff = &mockCounter{} + mr.countOverTime = 2.3 + mr.noCacheCountOvertime = mr.countOverTime // same value for both with and without cache + c.cacheTest(now) + assert.Equal(t, 0, queryResultsDiff.(*mockCounter).count) + + queryResultsDiff = &mockCounter{} // reset counter + mr.countOverTime = 2.3 // value not important + mr.noCacheCountOvertime = 2.5 // different than `countOverTime` value. 
+ c.cacheTest(now) + assert.Equal(t, 1, queryResultsDiff.(*mockCounter).count) + + queryResultsDiff = &mockCounter{} // reset counter + mr.countOverTime = 2.3 // value not important + mr.noCacheCountOvertime = 2.30000005 // different than `countOverTime` value but within tolerance + c.cacheTest(now) + assert.Equal(t, 0, queryResultsDiff.(*mockCounter).count) + + // This avoids a panic on subsequent test execution, + // seems ugly but was easy, and multiple instantiations + // of the comparator should be an error + prometheus.Unregister(responseLatency) +} + func TestMetricTest(t *testing.T) { metricTestActual = &mockGauge{} metricTestExpected = &mockGauge{} @@ -371,7 +407,7 @@ func TestMetricTest(t *testing.T) { mr := &mockReader{} metricTestRange := 30 * time.Second //We set the prune interval timer to a huge value here so that it never runs, instead we call spotCheckEntries manually below - c := NewComparator(actual, 1*time.Hour, 1*time.Hour, 50*time.Hour, 0, 0, 4*time.Hour, 0, 10*time.Minute, metricTestRange, writeInterval, 1, make(chan time.Time), make(chan time.Time), mr, false) + c := NewComparator(actual, 1*time.Hour, 1*time.Hour, 50*time.Hour, 0, 0, 4*time.Hour, 0, 10*time.Minute, metricTestRange, 1*time.Hour, 3*time.Hour, 30*time.Minute, writeInterval, 1, make(chan time.Time), make(chan time.Time), mr, false) // Force the start time to a known value c.startTime = time.Unix(10, 0) @@ -456,6 +492,42 @@ func (m *mockCounter) Inc() { m.count++ } +type mockCounterVec struct { + mockCounter + labels []string +} + +func (m *mockCounterVec) WithLabelValues(lvs ...string) prometheus.Counter { + m.labels = lvs + return &m.mockCounter +} + +func (m *mockCounterVec) Desc() *prometheus.Desc { + panic("implement me") +} + +func (m *mockCounterVec) Write(*io_prometheus_client.Metric) error { + panic("implement me") +} + +func (m *mockCounterVec) Describe(chan<- *prometheus.Desc) { + panic("implement me") +} + +func (m *mockCounterVec) Collect(chan<- prometheus.Metric) { 
+ panic("implement me") +} + +func (m *mockCounterVec) Add(float64) { + panic("implement me") +} + +func (m *mockCounterVec) Inc() { + m.cLck.Lock() + defer m.cLck.Unlock() + m.count++ +} + type mockGauge struct { cLck sync.Mutex val float64 @@ -507,13 +579,20 @@ type mockReader struct { resp []time.Time countOverTime float64 queryRange string + + // return this value if called without cache. + noCacheCountOvertime float64 } func (r *mockReader) Query(_ time.Time, _ time.Time) ([]time.Time, error) { return r.resp, nil } -func (r *mockReader) QueryCountOverTime(queryRange string) (float64, error) { +func (r *mockReader) QueryCountOverTime(queryRange string, _ time.Time, cache bool) (float64, error) { r.queryRange = queryRange - return r.countOverTime, nil + res := r.countOverTime + if !cache { + res = r.noCacheCountOvertime + } + return res, nil } diff --git a/pkg/canary/reader/reader.go b/pkg/canary/reader/reader.go index 4576ca7a70ed8..c323fc999dd01 100644 --- a/pkg/canary/reader/reader.go +++ b/pkg/canary/reader/reader.go @@ -45,7 +45,7 @@ var ( type LokiReader interface { Query(start time.Time, end time.Time) ([]time.Time, error) - QueryCountOverTime(queryRange string) (float64, error) + QueryCountOverTime(queryRange string, now time.Time, cache bool) (float64, error) } type Reader struct { @@ -180,7 +180,7 @@ func (r *Reader) Stop() { // QueryCountOverTime will ask Loki for a count of logs over the provided range e.g. 5m // QueryCountOverTime blocks if a previous query has failed until the appropriate backoff time has been reached. 
-func (r *Reader) QueryCountOverTime(queryRange string) (float64, error) { +func (r *Reader) QueryCountOverTime(queryRange string, now time.Time, cache bool) (float64, error) { r.backoffMtx.RLock() next := r.nextQuery r.backoffMtx.RUnlock() @@ -201,10 +201,10 @@ func (r *Reader) QueryCountOverTime(queryRange string) (float64, error) { Host: r.addr, Path: "/loki/api/v1/query", RawQuery: "query=" + url.QueryEscape(fmt.Sprintf("count_over_time({%v=\"%v\",%v=\"%v\"}[%s])", r.sName, r.sValue, r.lName, r.lVal, queryRange)) + - fmt.Sprintf("&time=%d", time.Now().UnixNano()) + + fmt.Sprintf("&time=%d", now.UnixNano()) + "&limit=1000", } - fmt.Fprintf(r.w, "Querying loki for metric count with query: %v\n", u.String()) + fmt.Fprintf(r.w, "Querying loki for metric count with query: %v, cache: %v\n", u.String(), cache) ctx, cancel := context.WithTimeout(context.Background(), r.queryTimeout) defer cancel() @@ -221,6 +221,9 @@ func (r *Reader) QueryCountOverTime(queryRange string) (float64, error) { req.Header.Set("X-Scope-OrgID", r.tenantID) } req.Header.Set("User-Agent", userAgent) + if !cache { + req.Header.Set("Cache-Control", "no-cache") + } resp, err := r.httpClient.Do(req) if err != nil {