Skip to content

Commit

Permalink
database_observability: additional configuration and cleanup:
Browse files Browse the repository at this point in the history
- update CHANGELOG to mention new component
- add query_samples_enabled argument
- show only redacted samples
- improve logging
  • Loading branch information
cristiangreco committed Nov 27, 2024
1 parent cfad180 commit cf2e3fb
Show file tree
Hide file tree
Showing 8 changed files with 62 additions and 45 deletions.
14 changes: 8 additions & 6 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ Main (unreleased)

- Add `otelcol.exporter.syslog` component to export logs in syslog format (@dehaansa)

- (_Experimental_) Add a `database_observability.mysql` component to collect mysql performance data.

### Enhancements

- Add second metrics sample to the support bundle to provide delta information (@dehaansa)
Expand All @@ -39,7 +41,7 @@ Main (unreleased)

- Fixed an issue in the `otelcol.processor.attribute` component where the actions `delete` and `hash` could not be used with the `pattern` argument. (@wildum)

- Fixed a race condition that could lead to a deadlock when using `import` statements, which could lead to a memory leak on `/metrics` endpoint of an Alloy instance. (@thampiotr)
- Fixed a race condition that could lead to a deadlock when using `import` statements, which could lead to a memory leak on `/metrics` endpoint of an Alloy instance. (@thampiotr)

- Fix a race condition where the ui service was dependent on starting after the remotecfg service, which is not guaranteed. (@dehaansa & @erikbaranowski)

Expand Down Expand Up @@ -97,7 +99,7 @@ v1.5.0
- Add support for relative paths to `import.file`. This new functionality allows users to use `import.file` blocks in modules
imported via `import.git` and other `import.file`. (@wildum)

- `prometheus.exporter.cloudwatch`: The `discovery` block now has a `recently_active_only` configuration attribute
- `prometheus.exporter.cloudwatch`: The `discovery` block now has a `recently_active_only` configuration attribute
to return only metrics which have been active in the last 3 hours.

- Add Prometheus bearer authentication to a `prometheus.write.queue` component (@freak12techno)
Expand All @@ -110,9 +112,9 @@ v1.5.0

- Fixed a bug in `import.git` which caused a `"non-fast-forward update"` error message. (@ptodev)

- Do not log error on clean shutdown of `loki.source.journal`. (@thampiotr)
- Do not log error on clean shutdown of `loki.source.journal`. (@thampiotr)

- `prometheus.operator.*` components: Fixed a bug which would sometimes cause a
- `prometheus.operator.*` components: Fixed a bug which would sometimes cause a
"failed to create service discovery refresh metrics" error after a config reload. (@ptodev)

### Other changes
Expand Down Expand Up @@ -151,7 +153,7 @@ v1.4.3

- `pyroscope.scrape` no longer tries to scrape endpoints which are not active targets anymore. (@wildum @mattdurham @dehaansa @ptodev)

- Fixed a bug with `loki.source.podlogs` not starting in large clusters due to short informer sync timeout. (@elburnetto-intapp)
- Fixed a bug with `loki.source.podlogs` not starting in large clusters due to short informer sync timeout. (@elburnetto-intapp)

- `prometheus.exporter.windows`: Fixed bug with `exclude` regular expression config arguments which caused missing metrics. (@ptodev)

Expand All @@ -170,7 +172,7 @@ v1.4.2
- Fix parsing of the Level configuration attribute in debug_metrics config block
- Ensure "optional" debug_metrics config block really is optional

- Fixed an issue with `loki.process` where `stage.luhn` and `stage.timestamp` would not apply
- Fixed an issue with `loki.process` where `stage.luhn` and `stage.timestamp` would not apply
default configuration settings correctly (@thampiotr)

- Fixed an issue with `loki.process` where configuration could be reloaded even if there
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,10 @@ The following arguments are supported:

| Name | Type | Description | Default | Required |
| -------------------- | -------------- | ------------------------------------------------------------------------------------------------------------------- | ------- | -------- |
| `data_source_name` | `secret` | [Data Source Name](https://github.com/go-sql-driver/mysql#dsn-data-source-name) for the MySQL server to connect to. | | yes |
| `forward_to` | `list(LogsReceiver)` | Where to forward log entries after processing. | | yes |
| `collect_interval` | `duration` | How frequently to collect query samples from database | `"10s"` | no |
| `data_source_name` | `secret` | [Data Source Name](https://github.com/go-sql-driver/mysql#dsn-data-source-name) for the MySQL server to connect to. | | yes |
| `forward_to` | `list(LogsReceiver)` | Where to forward log entries after processing. | | yes |
| `collect_interval` | `duration` | How frequently to collect information from database | `"10s"` | no |
| `query_samples_enabled` | `bool` | Whether to enable collection of query samples | `true` | no |

## Blocks

Expand Down Expand Up @@ -67,7 +68,6 @@ loki.write "logs_service" {
}
}
```

<!-- START GENERATED COMPATIBLE COMPONENTS -->

## Compatible components
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
package database_observability

const JobName = "integrations/db-o11y"
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/xwb1989/sqlparser"

"github.com/grafana/alloy/internal/component/common/loki"
"github.com/grafana/alloy/internal/component/database_observability"
"github.com/grafana/alloy/internal/runtime/logging/level"
"github.com/grafana/loki/v3/pkg/logproto"
)
Expand Down Expand Up @@ -108,27 +109,28 @@ func (c *QuerySample) fetchQuerySamples(ctx context.Context) error {
var digest, query_sample_text, query_sample_seen, query_sample_timer_wait string
err := rs.Scan(&digest, &query_sample_text, &query_sample_seen, &query_sample_timer_wait)
if err != nil {
level.Error(c.logger).Log("msg", "failed to scan query samples", "err", err)
level.Error(c.logger).Log("msg", "failed to scan query samples", "digest", digest, "err", err)
break
}

redacted, err := sqlparser.RedactSQLQuery(query_sample_text)
query_sample_redacted, err := sqlparser.RedactSQLQuery(query_sample_text)
if err != nil {
level.Error(c.logger).Log("msg", "failed to redact sql query", "err", err)
level.Error(c.logger).Log("msg", "failed to redact sql query", "digest", digest, "err", err)
break
}

c.entryHandler.Chan() <- loki.Entry{
Labels: model.LabelSet{"job": "integrations/db-o11y"},
Labels: model.LabelSet{"job": database_observability.JobName},
Entry: logproto.Entry{
Timestamp: time.Unix(0, time.Now().UnixNano()),
Line: fmt.Sprintf(`level=info msg="query samples fetched" op="%s" digest="%s" query_sample_text="%s" query_sample_seen="%s" query_sample_timer_wait="%s" query_redacted="%s"`, OP_QUERY_SAMPLE, digest, query_sample_text, query_sample_seen, query_sample_timer_wait, redacted),
Line: fmt.Sprintf(`level=info msg="query samples fetched" op="%s" digest="%s" query_sample_seen="%s" query_sample_timer_wait="%s" query_sample_redacted="%s"`, OP_QUERY_SAMPLE, digest, query_sample_seen, query_sample_timer_wait, query_sample_redacted),
},
}

tables := c.tablesFromQuery(query_sample_text)
tables := c.tablesFromQuery(digest, query_sample_text)
for _, table := range tables {
c.entryHandler.Chan() <- loki.Entry{
Labels: model.LabelSet{"job": "integrations/db-o11y"},
Labels: model.LabelSet{"job": database_observability.JobName},
Entry: logproto.Entry{
Timestamp: time.Unix(0, time.Now().UnixNano()),
Line: fmt.Sprintf(`level=info msg="table name parsed" op="%s" digest="%s" table="%s"`, OP_QUERY_PARSED_TABLE_NAME, digest, table),
Expand All @@ -140,15 +142,15 @@ func (c *QuerySample) fetchQuerySamples(ctx context.Context) error {
return nil
}

func (c QuerySample) tablesFromQuery(query string) []string {
func (c QuerySample) tablesFromQuery(digest, query string) []string {
if strings.HasSuffix(query, "...") {
level.Info(c.logger).Log("msg", "skipping parsing truncated query")
level.Info(c.logger).Log("msg", "skipping parsing truncated query", "digest", digest)
return []string{}
}

stmt, err := sqlparser.Parse(query)
if err != nil {
level.Error(c.logger).Log("msg", "failed to parse sql query", "err", err)
level.Error(c.logger).Log("msg", "failed to parse sql query", "digest", digest, "err", err)
return []string{}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"time"

loki_fake "github.com/grafana/alloy/internal/component/common/loki/client/fake"
"github.com/grafana/alloy/internal/component/database_observability"
"github.com/prometheus/common/model"
"go.uber.org/goleak"

Expand Down Expand Up @@ -59,9 +60,9 @@ func TestQuerySample(t *testing.T) {

lokiEntries := lokiClient.Received()
for _, entry := range lokiEntries {
require.Equal(t, model.LabelSet{"job": "integrations/db-o11y"}, entry.Labels)
require.Equal(t, model.LabelSet{"job": database_observability.JobName}, entry.Labels)
}
require.Equal(t, `level=info msg="query samples fetched" op="query_sample" digest="abc123" query_sample_text="select * from some_table where id = 1" query_sample_seen="2024-01-01T00:00:00.000Z" query_sample_timer_wait="1000" query_redacted="select * from some_table where id = :redacted1"`, lokiEntries[0].Line)
require.Equal(t, `level=info msg="query samples fetched" op="query_sample" digest="abc123" query_sample_seen="2024-01-01T00:00:00.000Z" query_sample_timer_wait="1000" query_sample_redacted="select * from some_table where id = :redacted1"`, lokiEntries[0].Line)
require.Equal(t, `level=info msg="table name parsed" op="query_parsed_table_name" digest="abc123" table="some_table"`, lokiEntries[1].Line)

err = mock.ExpectationsWereMet()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/prometheus/common/model"

"github.com/grafana/alloy/internal/component/common/loki"
"github.com/grafana/alloy/internal/component/database_observability"
"github.com/grafana/alloy/internal/runtime/logging/level"
)

Expand Down Expand Up @@ -146,7 +147,7 @@ func (c *SchemaTable) extractSchema(ctx context.Context) error {
schemas = append(schemas, schema)

c.entryHandler.Chan() <- loki.Entry{
Labels: model.LabelSet{"job": "integrations/db-o11y"},
Labels: model.LabelSet{"job": database_observability.JobName},
Entry: logproto.Entry{
Timestamp: time.Unix(0, time.Now().UnixNano()),
Line: fmt.Sprintf(`level=info msg="schema detected" op="%s" schema="%s"`, OP_SCHEMA_DETECTION, schema),
Expand Down Expand Up @@ -179,7 +180,7 @@ func (c *SchemaTable) extractSchema(ctx context.Context) error {
tables = append(tables, tableInfo{schema: schema, tableName: table, createTime: createTime, updateTime: updateTime})

c.entryHandler.Chan() <- loki.Entry{
Labels: model.LabelSet{"job": "integrations/db-o11y"},
Labels: model.LabelSet{"job": database_observability.JobName},
Entry: logproto.Entry{
Timestamp: time.Unix(0, time.Now().UnixNano()),
Line: fmt.Sprintf(`level=info msg="table detected" op="%s" schema="%s" table="%s"`, OP_TABLE_DETECTION, schema, table),
Expand Down Expand Up @@ -215,7 +216,7 @@ func (c *SchemaTable) extractSchema(ctx context.Context) error {
c.cache.Add(cacheKey, table)

c.entryHandler.Chan() <- loki.Entry{
Labels: model.LabelSet{"job": "integrations/db-o11y"},
Labels: model.LabelSet{"job": database_observability.JobName},
Entry: logproto.Entry{
Timestamp: time.Unix(0, time.Now().UnixNano()),
Line: fmt.Sprintf(`level=info msg="create table" op="%s" schema="%s" table="%s" create_statement="%s"`, OP_CREATE_STATEMENT, table.schema, table.tableName, createStmt),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/DATA-DOG/go-sqlmock"
"github.com/go-kit/log"
loki_fake "github.com/grafana/alloy/internal/component/common/loki/client/fake"
"github.com/grafana/alloy/internal/component/database_observability"
"github.com/prometheus/common/model"
"github.com/stretchr/testify/require"
"go.uber.org/goleak"
Expand Down Expand Up @@ -76,7 +77,7 @@ func TestSchemaTable(t *testing.T) {

lokiEntries := lokiClient.Received()
for _, entry := range lokiEntries {
require.Equal(t, model.LabelSet{"job": "integrations/db-o11y"}, entry.Labels)
require.Equal(t, model.LabelSet{"job": database_observability.JobName}, entry.Labels)
}
require.Equal(t, `level=info msg="schema detected" op="schema_detection" schema="some_schema"`, lokiEntries[0].Line)
require.Equal(t, `level=info msg="table detected" op="table_detection" schema="some_schema" table="some_table"`, lokiEntries[1].Line)
Expand Down
45 changes: 26 additions & 19 deletions internal/component/database_observability/mysql/component.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (

"github.com/grafana/alloy/internal/component"
"github.com/grafana/alloy/internal/component/common/loki"
"github.com/grafana/alloy/internal/component/database_observability"
"github.com/grafana/alloy/internal/component/database_observability/mysql/collector"
"github.com/grafana/alloy/internal/component/discovery"
"github.com/grafana/alloy/internal/featuregate"
Expand Down Expand Up @@ -46,14 +47,18 @@ var (
_ syntax.Validator = (*Arguments)(nil)
)

// TODO(cristian) consider using something like "enabled_collectors"
// to allow users to enable/disable collectors.
type Arguments struct {
DataSourceName alloytypes.Secret `alloy:"data_source_name,attr"`
CollectInterval time.Duration `alloy:"collect_interval,attr,optional"`
ForwardTo []loki.LogsReceiver `alloy:"forward_to,attr"`
DataSourceName alloytypes.Secret `alloy:"data_source_name,attr"`
CollectInterval time.Duration `alloy:"collect_interval,attr,optional"`
QuerySamplesEnabled bool `alloy:"query_samples_enabled,attr,optional"`
ForwardTo []loki.LogsReceiver `alloy:"forward_to,attr"`
}

var DefaultArguments = Arguments{
CollectInterval: 10 * time.Second,
CollectInterval: 10 * time.Second,
QuerySamplesEnabled: true,
}

func (a *Arguments) SetToDefault() {
Expand Down Expand Up @@ -155,7 +160,7 @@ func (c *Component) getBaseTarget() (discovery.Target, error) {
model.SchemeLabel: "http",
model.MetricsPathLabel: path.Join(httpData.HTTPPathForComponent(c.opts.ID), "metrics"),
"instance": c.instanceKey(),
"job": "integrations/db-o11y",
"job": database_observability.JobName,
}, nil
}

Expand Down Expand Up @@ -194,21 +199,23 @@ func (c *Component) Update(args component.Arguments) error {

entryHandler := loki.NewEntryHandler(c.handler.Chan(), func() {})

qsCollector, err := collector.NewQuerySample(collector.QuerySampleArguments{
DB: dbConnection,
CollectInterval: c.args.CollectInterval,
EntryHandler: entryHandler,
Logger: c.opts.Logger,
})
if err != nil {
level.Error(c.opts.Logger).Log("msg", "failed to create QuerySample collector", "err", err)
return err
}
if err := qsCollector.Start(context.Background()); err != nil {
level.Error(c.opts.Logger).Log("msg", "failed to start QuerySample collector", "err", err)
return err
if c.args.QuerySamplesEnabled {
qsCollector, err := collector.NewQuerySample(collector.QuerySampleArguments{
DB: dbConnection,
CollectInterval: c.args.CollectInterval,
EntryHandler: entryHandler,
Logger: c.opts.Logger,
})
if err != nil {
level.Error(c.opts.Logger).Log("msg", "failed to create QuerySample collector", "err", err)
return err
}
if err := qsCollector.Start(context.Background()); err != nil {
level.Error(c.opts.Logger).Log("msg", "failed to start QuerySample collector", "err", err)
return err
}
c.collectors = append(c.collectors, qsCollector)
}
c.collectors = append(c.collectors, qsCollector)

stCollector, err := collector.NewSchemaTable(collector.SchemaTableArguments{
DB: dbConnection,
Expand Down

0 comments on commit cf2e3fb

Please sign in to comment.