diff --git a/flow/connectors/mysql/qrep.go b/flow/connectors/mysql/qrep.go index fb4bceb14..83c35b176 100644 --- a/flow/connectors/mysql/qrep.go +++ b/flow/connectors/mysql/qrep.go @@ -6,6 +6,7 @@ import ( "errors" "fmt" "log/slog" + "strings" "text/template" "github.com/go-mysql-org/go-mysql/mysql" @@ -150,7 +151,6 @@ func (c *MySqlConnector) PullQRepRecords( if err != nil { return 0, err } - query = fmt.Sprintf("select * from %s", config.WatermarkTable) totalRecords := 0 onResult := func(rs *mysql.Result) error { @@ -179,18 +179,17 @@ func (c *MySqlConnector) PullQRepRecords( return nil } - /* - // testing - rs, err := c.Execute(ctx, "show databases") - if err != nil { - return 0, fmt.Errorf("mymymy err %w", err) - } - for rowIdx, row := range rs.Values { - for idx, val := range row { - c.logger.Info("mymymy show", slog.Int("rowIdx", rowIdx), slog.Int("idx", idx), slog.Any("field", string(val.AsString()))) - } + // testing + schema, _, _ := strings.Cut(config.WatermarkTable, ".") + rs, err := c.Execute(ctx, fmt.Sprintf("show tables from %s", schema)) + if err != nil { + return 0, fmt.Errorf("mymymy err %w", err) + } + for rowIdx, row := range rs.Values { + for idx, val := range row { + c.logger.Info("mymymy show", slog.Int("rowIdx", rowIdx), slog.Int("idx", idx), slog.Any("field", string(val.AsString()))) } - */ + } if last.FullTablePartition { // this is a full table partition, so just run the query