diff --git a/flow/connectors/mysql/qrep.go b/flow/connectors/mysql/qrep.go index 94e17c42f..fb4bceb14 100644 --- a/flow/connectors/mysql/qrep.go +++ b/flow/connectors/mysql/qrep.go @@ -150,6 +150,7 @@ func (c *MySqlConnector) PullQRepRecords( if err != nil { return 0, err } + query = fmt.Sprintf("select * from %s", config.WatermarkTable) totalRecords := 0 onResult := func(rs *mysql.Result) error { @@ -178,16 +179,18 @@ func (c *MySqlConnector) PullQRepRecords( return nil } - // testing - rs, err := c.Execute(ctx, "show databases") - if err != nil { - return 0, fmt.Errorf("mymymy err %w", err) - } - for rowIdx, row := range rs.Values { - for idx, val := range row { - c.logger.Info("mymymy show", slog.Int("rowIdx", rowIdx), slog.Int("idx", idx), slog.Any("field", string(val.AsString()))) + /* + // testing + rs, err := c.Execute(ctx, "show databases") + if err != nil { + return 0, fmt.Errorf("mymymy err %w", err) } - } + for rowIdx, row := range rs.Values { + for idx, val := range row { + c.logger.Info("mymymy show", slog.Int("rowIdx", rowIdx), slog.Int("idx", idx), slog.Any("field", string(val.AsString()))) + } + } + */ if last.FullTablePartition { // this is a full table partition, so just run the query