Skip to content

Commit

Permalink
add test for SchemaTable collector
Browse files Browse the repository at this point in the history
  • Loading branch information
cristiangreco committed Nov 21, 2024
1 parent b3ec207 commit 71331b1
Show file tree
Hide file tree
Showing 4 changed files with 94 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package collector

import (
"context"
"os"
"testing"
"time"

Expand All @@ -22,9 +23,9 @@ func TestQuerySampleRun(t *testing.T) {

collector, err := NewQuerySample(QuerySampleArguments{
DB: db,
ScrapeInterval: time.Second,
ScrapeInterval: time.Minute,
EntryHandler: lokiClient,
Logger: log.NewNopLogger(),
Logger: log.NewLogfmtLogger(os.Stderr),
})
require.NoError(t, err)
require.NotNil(t, collector)
Expand All @@ -37,7 +38,7 @@ func TestQuerySampleRun(t *testing.T) {
"query_sample_timer_wait",
}).AddRow(
"abc123",
"select * from table_name where id = 1",
"select * from some_table where id = 1",
"2024-01-01T00:00:00.000Z",
"1000",
),
Expand All @@ -57,8 +58,8 @@ func TestQuerySampleRun(t *testing.T) {
for _, entry := range lokiEntries {
require.Equal(t, model.LabelSet{"job": "integrations/db-o11y"}, entry.Labels)
}
require.Equal(t, `level=info msg="query samples fetched" op="query_sample" digest="abc123" query_sample_text="select * from table_name where id = 1" query_sample_seen="2024-01-01T00:00:00.000Z" query_sample_timer_wait="1000" query_redacted="select * from table_name where id = :redacted1"`, lokiEntries[0].Line)
require.Equal(t, `level=info msg="table name parsed" op="query_parsed_table_name" digest="abc123" table="table_name"`, lokiEntries[1].Line)
require.Equal(t, `level=info msg="query samples fetched" op="query_sample" digest="abc123" query_sample_text="select * from some_table where id = 1" query_sample_seen="2024-01-01T00:00:00.000Z" query_sample_timer_wait="1000" query_redacted="select * from some_table where id = :redacted1"`, lokiEntries[0].Line)
require.Equal(t, `level=info msg="table name parsed" op="query_parsed_table_name" digest="abc123" table="some_table"`, lokiEntries[1].Line)

err = mock.ExpectationsWereMet()
require.NoError(t, err)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package collector
import (
"context"
"database/sql"
"errors"
"fmt"
"time"

Expand Down Expand Up @@ -47,7 +46,7 @@ const (
)

type SchemaTableArguments struct {
DSN string
DB *sql.DB
ScrapeInterval time.Duration
EntryHandler loki.EntryHandler
CacheTTL time.Duration
Expand Down Expand Up @@ -81,20 +80,8 @@ type tableInfo struct {
}

func NewSchemaTable(args SchemaTableArguments) (*SchemaTable, error) {
dbConnection, err := sql.Open("mysql", args.DSN+"?parseTime=true")
if err != nil {
return nil, err
}
if dbConnection == nil {
return nil, errors.New("nil DB connection")
}

if err = dbConnection.Ping(); err != nil {
return nil, err
}

return &SchemaTable{
dbConnection: dbConnection,
dbConnection: args.DB,
scrapeInterval: args.ScrapeInterval,
entryHandler: args.EntryHandler,
cache: expirable.NewLRU[string, tableInfo](0, nil, args.CacheTTL),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
package collector

import (
"context"
"os"
"testing"
"time"

"github.com/DATA-DOG/go-sqlmock"
"github.com/go-kit/log"
loki_fake "github.com/grafana/alloy/internal/component/common/loki/client/fake"
"github.com/prometheus/common/model"
"github.com/stretchr/testify/require"
)

func TestSchemaTableRun(t *testing.T) {
db, mock, err := sqlmock.New(sqlmock.QueryMatcherOption(sqlmock.QueryMatcherEqual))
require.NoError(t, err)
defer db.Close()

lokiClient := loki_fake.NewClient(func() {})

collector, err := NewSchemaTable(SchemaTableArguments{
DB: db,
ScrapeInterval: time.Second,
EntryHandler: lokiClient,
CacheTTL: time.Minute,
Logger: log.NewLogfmtLogger(os.Stderr),
})

require.NoError(t, err)
require.NotNil(t, collector)

mock.ExpectQuery(selectSchemaName).WithoutArgs().WillReturnRows(
sqlmock.NewRows([]string{
"schema_name",
}).AddRow(
"some_schema",
),
)
mock.ExpectQuery(selectTableName).WithArgs("some_schema").WillReturnRows(
sqlmock.NewRows([]string{
"table_name",
"create_time",
"update_time",
}).AddRow(
"some_table",
time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC),
time.Date(2024, 2, 2, 0, 0, 0, 0, time.UTC),
),
)
mock.ExpectQuery("SHOW CREATE TABLE some_schema.some_table").WithoutArgs().WillReturnRows(
sqlmock.NewRows([]string{
"table_name",
"create_statement",
}).AddRow(
"some_schema.some_table",
"CREATE TABLE some_table (id INT)",
),
)

err = collector.Run(context.Background())
require.NoError(t, err)

require.Eventually(t, func() bool {
return len(lokiClient.Received()) == 3
}, 5*time.Second, 100*time.Millisecond)

collector.Stop()
lokiClient.Stop()

lokiEntries := lokiClient.Received()
for _, entry := range lokiEntries {
require.Equal(t, model.LabelSet{"job": "integrations/db-o11y"}, entry.Labels)
}
require.Equal(t, `level=info msg="schema detected" op="schema_detection" schema="some_schema"`, lokiEntries[0].Line)
require.Equal(t, `level=info msg="table detected" op="table_detection" schema="some_schema" table="some_table"`, lokiEntries[1].Line)
require.Equal(t, `level=info msg="create table" op="create_statement" schema="some_schema" table="some_table" create_statement="CREATE TABLE some_table (id INT)"`, lokiEntries[2].Line)

err = mock.ExpectationsWereMet()
require.NoError(t, err)
}
6 changes: 4 additions & 2 deletions internal/component/database_observability/mysql/component.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,8 @@ func (c *Component) Update(args component.Arguments) error {
newArgs := args.(Arguments)
c.args = newArgs

dbConnection, err := sql.Open("mysql", string(newArgs.DataSourceName))
// TODO(cristian): verify before appending parameter
dbConnection, err := sql.Open("mysql", string(newArgs.DataSourceName)+"?parseTime=true")
if err != nil {
return err
}
Expand All @@ -186,6 +187,7 @@ func (c *Component) Update(args component.Arguments) error {

entryHandler := loki.NewEntryHandler(c.handler.Chan(), func() {})

// TODO(cristian)
// dbConnection.Close()
// entryHandler.Stop()

Expand All @@ -206,7 +208,7 @@ func (c *Component) Update(args component.Arguments) error {
c.collectors = append(c.collectors, qsCollector)

stCollector, err := collector.NewSchemaTable(collector.SchemaTableArguments{
DSN: string(newArgs.DataSourceName),
DB: dbConnection,
ScrapeInterval: newArgs.ScrapeInterval,
EntryHandler: entryHandler,
Logger: c.opts.Logger,
Expand Down

0 comments on commit 71331b1

Please sign in to comment.