Skip to content

Commit

Permalink
Fixes to database_observability samples collector (#2198)
Browse files Browse the repository at this point in the history
* Fixes to `database_observability` samples collector

- Skip queries with schema `mysql`, `performance_schema` and
`information_schema`, so we're on par with data from mysqld_exporter
- Skip truncated queries early before parsing
- Do not fail the entire processing loop if a query fails to parse
- Log query type
- Correctly extract tables from subqueries

* amend error msg

* update tests

* Update internal/component/database_observability/mysql/collector/query_sample_test.go

Co-authored-by: MattNolf <[email protected]>

* better naming

---------

Co-authored-by: MattNolf <[email protected]>
  • Loading branch information
cristiangreco and matthewnolf authored Dec 3, 2024
1 parent 608574c commit 1cc7861
Show file tree
Hide file tree
Showing 3 changed files with 380 additions and 71 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ Main (unreleased)

- Add `otelcol.exporter.syslog` component to export logs in syslog format (@dehaansa)

- (_Experimental_) Add a `database_observability.mysql` component to collect mysql performance data.
- (_Experimental_) Add a `database_observability.mysql` component to collect mysql performance data. (@cristiangreco & @matthewnolf)

- Add `otelcol.receiver.influxdb` to convert influx metric into OTEL. (@EHSchmitt4395)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"database/sql"
"fmt"
"strings"
"sync/atomic"
"time"

"github.com/go-kit/log"
Expand All @@ -29,7 +30,8 @@ const selectQuerySamples = `
query_sample_seen,
query_sample_timer_wait
FROM performance_schema.events_statements_summary_by_digest
WHERE last_seen > DATE_SUB(NOW(), INTERVAL 1 DAY)`
WHERE schema_name NOT IN ('mysql', 'performance_schema', 'information_schema')
AND last_seen > DATE_SUB(NOW(), INTERVAL 1 DAY)`

type QuerySampleArguments struct {
DB *sql.DB
Expand All @@ -44,10 +46,10 @@ type QuerySample struct {
collectInterval time.Duration
entryHandler loki.EntryHandler

logger log.Logger

ctx context.Context
cancel context.CancelFunc
logger log.Logger
running *atomic.Bool
ctx context.Context
cancel context.CancelFunc
}

func NewQuerySample(args QuerySampleArguments) (*QuerySample, error) {
Expand All @@ -56,23 +58,29 @@ func NewQuerySample(args QuerySampleArguments) (*QuerySample, error) {
collectInterval: args.CollectInterval,
entryHandler: args.EntryHandler,
logger: args.Logger,
running: &atomic.Bool{},
}, nil
}

func (c *QuerySample) Start(ctx context.Context) error {
level.Debug(c.logger).Log("msg", "QuerySample collector started")

c.running.Store(true)
ctx, cancel := context.WithCancel(ctx)
c.ctx = ctx
c.cancel = cancel

go func() {
defer func() {
c.Stop()
c.running.Store(false)
}()

ticker := time.NewTicker(c.collectInterval)

for {
if err := c.fetchQuerySamples(c.ctx); err != nil {
level.Error(c.logger).Log("msg", "collector stopping due to error", "err", err)
c.Stop()
break
}

Expand All @@ -88,6 +96,10 @@ func (c *QuerySample) Start(ctx context.Context) error {
return nil
}

// Stopped reports whether the collector's background loop has finished
// (i.e. the running flag was cleared when the goroutine exited).
func (c *QuerySample) Stopped() bool {
	isRunning := c.running.Load()
	return !isRunning
}

// Stop should be kept idempotent
func (c *QuerySample) Stop() {
c.cancel()
Expand All @@ -106,34 +118,52 @@ func (c *QuerySample) fetchQuerySamples(ctx context.Context) error {
level.Error(c.logger).Log("msg", "failed to iterate rs", "err", err)
break
}
var digest, query_sample_text, query_sample_seen, query_sample_timer_wait string
err := rs.Scan(&digest, &query_sample_text, &query_sample_seen, &query_sample_timer_wait)

var digest, sampleText, sampleSeen, sampleTimerWait string
err := rs.Scan(&digest, &sampleText, &sampleSeen, &sampleTimerWait)
if err != nil {
level.Error(c.logger).Log("msg", "failed to scan query samples", "digest", digest, "err", err)
break
level.Error(c.logger).Log("msg", "failed to scan result set for query samples", "err", err)
continue
}

if strings.HasSuffix(sampleText, "...") {
level.Info(c.logger).Log("msg", "skipping parsing truncated query", "digest", digest)
continue
}

stmt, err := sqlparser.Parse(sampleText)
if err != nil {
level.Error(c.logger).Log("msg", "failed to parse sql query", "digest", digest, "err", err)
continue
}

query_sample_redacted, err := sqlparser.RedactSQLQuery(query_sample_text)
sampleRedactedText, err := sqlparser.RedactSQLQuery(sampleText)
if err != nil {
level.Error(c.logger).Log("msg", "failed to redact sql query", "digest", digest, "err", err)
break
continue
}

c.entryHandler.Chan() <- loki.Entry{
Labels: model.LabelSet{"job": database_observability.JobName},
Entry: logproto.Entry{
Timestamp: time.Unix(0, time.Now().UnixNano()),
Line: fmt.Sprintf(`level=info msg="query samples fetched" op="%s" digest="%s" query_sample_seen="%s" query_sample_timer_wait="%s" query_sample_redacted="%s"`, OP_QUERY_SAMPLE, digest, query_sample_seen, query_sample_timer_wait, query_sample_redacted),
Line: fmt.Sprintf(
`level=info msg="query samples fetched" op="%s" digest="%s" query_type="%s" query_sample_seen="%s" query_sample_timer_wait="%s" query_sample_redacted="%s"`,
OP_QUERY_SAMPLE, digest, c.stmtType(stmt), sampleSeen, sampleTimerWait, sampleRedactedText,
),
},
}

tables := c.tablesFromQuery(digest, query_sample_text)
tables := c.tablesFromQuery(stmt)
for _, table := range tables {
c.entryHandler.Chan() <- loki.Entry{
Labels: model.LabelSet{"job": database_observability.JobName},
Entry: logproto.Entry{
Timestamp: time.Unix(0, time.Now().UnixNano()),
Line: fmt.Sprintf(`level=info msg="table name parsed" op="%s" digest="%s" table="%s"`, OP_QUERY_PARSED_TABLE_NAME, digest, table),
Line: fmt.Sprintf(
`level=info msg="table name parsed" op="%s" digest="%s" table="%s"`,
OP_QUERY_PARSED_TABLE_NAME, digest, table,
),
},
}
}
Expand All @@ -142,18 +172,22 @@ func (c *QuerySample) fetchQuerySamples(ctx context.Context) error {
return nil
}

func (c QuerySample) tablesFromQuery(digest, query string) []string {
if strings.HasSuffix(query, "...") {
level.Info(c.logger).Log("msg", "skipping parsing truncated query", "digest", digest)
return []string{}
}

stmt, err := sqlparser.Parse(query)
if err != nil {
level.Error(c.logger).Log("msg", "failed to parse sql query", "digest", digest, "err", err)
return []string{}
// stmtType maps a parsed SQL statement to a lowercase query-type label
// ("select", "insert", "update" or "delete"). Statement kinds outside
// these four yield the empty string.
func (c QuerySample) stmtType(stmt sqlparser.Statement) string {
	var queryType string
	switch stmt.(type) {
	case *sqlparser.Select:
		queryType = "select"
	case *sqlparser.Insert:
		queryType = "insert"
	case *sqlparser.Update:
		queryType = "update"
	case *sqlparser.Delete:
		queryType = "delete"
	}
	return queryType
}

func (c QuerySample) tablesFromQuery(stmt sqlparser.Statement) []string {
var parsedTables []string

switch stmt := stmt.(type) {
Expand All @@ -176,7 +210,15 @@ func (c QuerySample) parseTableExprs(tables sqlparser.TableExprs) []string {
t := tables[i]
switch tableExpr := t.(type) {
case *sqlparser.AliasedTableExpr:
parsedTables = append(parsedTables, c.parseTableName(tableExpr.Expr.(sqlparser.TableName)))
switch expr := tableExpr.Expr.(type) {
case sqlparser.TableName:
parsedTables = append(parsedTables, c.parseTableName(expr))
case *sqlparser.Subquery:
subquery := expr.Select.(*sqlparser.Select)
parsedTables = append(parsedTables, c.parseTableExprs(subquery.From)...)
default:
level.Error(c.logger).Log("msg", "unknown nested table expression", "table", tableExpr)
}
case *sqlparser.JoinTableExpr:
// continue parsing both sides of join
tables = append(tables, tableExpr.LeftExpr, tableExpr.RightExpr)
Expand Down
Loading

0 comments on commit 1cc7861

Please sign in to comment.