diff --git a/bootstrap.sh b/bootstrap.sh index 3c58a603efc..5a75cb4115e 100755 --- a/bootstrap.sh +++ b/bootstrap.sh @@ -290,7 +290,6 @@ gotools=" \ golang.org/x/tools/cmd/cover \ golang.org/x/tools/cmd/goimports \ golang.org/x/tools/cmd/goyacc \ - honnef.co/go/tools/cmd/unused \ " echo "Installing dev tools with 'go get'..." # shellcheck disable=SC2086 diff --git a/go/streamlog/streamlog.go b/go/streamlog/streamlog.go index 6b9559694be..8717898df8b 100644 --- a/go/streamlog/streamlog.go +++ b/go/streamlog/streamlog.go @@ -25,6 +25,7 @@ import ( "net/url" "os" "os/signal" + "strings" "sync" "syscall" @@ -40,6 +41,9 @@ var ( // QueryLogFormat controls the format of the query log (either text or json) QueryLogFormat = flag.String("querylog-format", "text", "format for query logs (\"text\" or \"json\")") + // QueryLogFilterTag contains an optional string that must be present in the query for it to be logged + QueryLogFilterTag = flag.String("querylog-filter-tag", "", "string that must be present in the query for it to be logged") + sendCount = stats.NewCountersWithSingleLabel("StreamlogSend", "stream log send count", "logger_names") deliveredCount = stats.NewCountersWithMultiLabels( "StreamlogDelivered", @@ -201,3 +205,12 @@ func GetFormatter(logger *StreamLogger) LogFormatter { return fmter.Logf(w, params) } } + +// ShouldEmitLog returns whether the log with the given SQL query +// should be emitted or filtered +func ShouldEmitLog(sql string) bool { + if *QueryLogFilterTag == "" { + return true + } + return strings.Contains(sql, *QueryLogFilterTag) +} diff --git a/go/vt/vtgate/engine/delete.go b/go/vt/vtgate/engine/delete.go index 4930d0933fc..53041050d4e 100644 --- a/go/vt/vtgate/engine/delete.go +++ b/go/vt/vtgate/engine/delete.go @@ -19,6 +19,7 @@ package engine import ( "encoding/json" "fmt" + "time" "vitess.io/vitess/go/jsonutil" "vitess.io/vitess/go/sqltypes" @@ -63,6 +64,9 @@ type Delete struct { // Option to override the standard behavior and allow a multi-shard delete // to use single round trip autocommit. MultiShardAutocommit bool + + // QueryTimeout contains the optional timeout (in milliseconds) to apply to this query + QueryTimeout int } // MarshalJSON serializes the Delete into a JSON representation. @@ -84,6 +88,7 @@ func (del *Delete) MarshalJSON() ([]byte, error) { Table string `json:",omitempty"` OwnedVindexQuery string `json:",omitempty"` MultiShardAutocommit bool `json:",omitempty"` + QueryTimeout int `json:",omitempty"` }{ Opcode: del.Opcode, Keyspace: del.Keyspace, @@ -93,6 +98,7 @@ func (del *Delete) MarshalJSON() ([]byte, error) { Table: tname, OwnedVindexQuery: del.OwnedVindexQuery, MultiShardAutocommit: del.MultiShardAutocommit, + QueryTimeout: del.QueryTimeout, } return jsonutil.MarshalNoEscape(marshalDelete) } @@ -141,6 +147,11 @@ func (del *Delete) RouteType() string { // Execute performs a non-streaming exec. func (del *Delete) Execute(vcursor VCursor, bindVars map[string]*querypb.BindVariable, wantfields bool) (*sqltypes.Result, error) { + if del.QueryTimeout != 0 { + cancel := vcursor.SetContextTimeout(time.Duration(del.QueryTimeout) * time.Millisecond) + defer cancel() + } + switch del.Opcode { case DeleteUnsharded: return del.execDeleteUnsharded(vcursor, bindVars) diff --git a/go/vt/vtgate/engine/insert.go b/go/vt/vtgate/engine/insert.go index a1ead406736..a91a8a5ccdf 100644 --- a/go/vt/vtgate/engine/insert.go +++ b/go/vt/vtgate/engine/insert.go @@ -21,6 +21,7 @@ import ( "fmt" "strconv" "strings" + "time" "vitess.io/vitess/go/jsonutil" "vitess.io/vitess/go/sqltypes" @@ -76,6 +77,9 @@ type Insert struct { // However some application use cases would prefer that the statement partially // succeed in order to get the performance benefits of autocommit. MultiShardAutocommit bool + + // QueryTimeout contains the optional timeout (in milliseconds) to apply to this query + QueryTimeout int } // NewQueryInsert creates an Insert with a query string. @@ -127,6 +131,7 @@ func (ins *Insert) MarshalJSON() ([]byte, error) { Mid []string `json:",omitempty"` Suffix string `json:",omitempty"` MultiShardAutocommit bool `json:",omitempty"` + QueryTimeout int `json:",omitempty"` }{ Opcode: ins.Opcode, Keyspace: ins.Keyspace, @@ -138,6 +143,7 @@ func (ins *Insert) MarshalJSON() ([]byte, error) { Mid: ins.Mid, Suffix: ins.Suffix, MultiShardAutocommit: ins.MultiShardAutocommit, + QueryTimeout: ins.QueryTimeout, } return jsonutil.MarshalNoEscape(marshalInsert) } @@ -191,6 +197,11 @@ func (ins *Insert) RouteType() string { // Execute performs a non-streaming exec. func (ins *Insert) Execute(vcursor VCursor, bindVars map[string]*querypb.BindVariable, wantfields bool) (*sqltypes.Result, error) { + if ins.QueryTimeout != 0 { + cancel := vcursor.SetContextTimeout(time.Duration(ins.QueryTimeout) * time.Millisecond) + defer cancel() + } + switch ins.Opcode { case InsertUnsharded: return ins.execInsertUnsharded(vcursor, bindVars) diff --git a/go/vt/vtgate/engine/update.go b/go/vt/vtgate/engine/update.go index 83a48c32f31..81857997947 100644 --- a/go/vt/vtgate/engine/update.go +++ b/go/vt/vtgate/engine/update.go @@ -19,6 +19,7 @@ package engine import ( "encoding/json" "fmt" + "time" "vitess.io/vitess/go/jsonutil" "vitess.io/vitess/go/sqltypes" @@ -66,6 +67,9 @@ type Update struct { // Option to override the standard behavior and allow a multi-shard update // to use single round trip autocommit. MultiShardAutocommit bool + + // QueryTimeout contains the optional timeout (in milliseconds) to apply to this query + QueryTimeout int } // MarshalJSON serializes the Update into a JSON representation. @@ -88,6 +92,7 @@ func (upd *Update) MarshalJSON() ([]byte, error) { Table string `json:",omitempty"` OwnedVindexQuery string `json:",omitempty"` MultiShardAutocommit bool `json:",omitempty"` + QueryTimeout int `json:",omitempty"` }{ Opcode: upd.Opcode, Keyspace: upd.Keyspace, @@ -98,6 +103,7 @@ func (upd *Update) MarshalJSON() ([]byte, error) { Table: tname, OwnedVindexQuery: upd.OwnedVindexQuery, MultiShardAutocommit: upd.MultiShardAutocommit, + QueryTimeout: upd.QueryTimeout, } return jsonutil.MarshalNoEscape(marshalUpdate) } @@ -145,6 +151,11 @@ func (upd *Update) RouteType() string { // Execute performs a non-streaming exec. func (upd *Update) Execute(vcursor VCursor, bindVars map[string]*querypb.BindVariable, wantfields bool) (*sqltypes.Result, error) { + if upd.QueryTimeout != 0 { + cancel := vcursor.SetContextTimeout(time.Duration(upd.QueryTimeout) * time.Millisecond) + defer cancel() + } + switch upd.Opcode { case UpdateUnsharded: return upd.execUpdateUnsharded(vcursor, bindVars) diff --git a/go/vt/vtgate/logstats.go b/go/vt/vtgate/logstats.go index 84042878be8..02b4d46b5b0 100644 --- a/go/vt/vtgate/logstats.go +++ b/go/vt/vtgate/logstats.go @@ -121,6 +121,10 @@ func (stats *LogStats) RemoteAddrUsername() (string, string) { // Logf formats the log record to the given writer, either as // tab-separated list of logged fields or as JSON. func (stats *LogStats) Logf(w io.Writer, params url.Values) error { + if !streamlog.ShouldEmitLog(stats.SQL) { + return nil + } + formattedBindVars := "\"[REDACTED]\"" if !*streamlog.RedactDebugUIQueries { _, fullBindParams := params["full"] diff --git a/go/vt/vtgate/logstats_test.go b/go/vt/vtgate/logstats_test.go index 01f5430e6d8..4720d5c7cfd 100644 --- a/go/vt/vtgate/logstats_test.go +++ b/go/vt/vtgate/logstats_test.go @@ -126,6 +126,35 @@ func TestLogStatsFormat(t *testing.T) { *streamlog.QueryLogFormat = "text" } +func TestLogStatsFilter(t *testing.T) { + defer func() { *streamlog.QueryLogFilterTag = "" }() + + logStats := NewLogStats(context.Background(), "test", "sql1 /* LOG_THIS_QUERY */", map[string]*querypb.BindVariable{"intVal": sqltypes.Int64BindVariable(1)}) + logStats.StartTime = time.Date(2017, time.January, 1, 1, 2, 3, 0, time.UTC) + logStats.EndTime = time.Date(2017, time.January, 1, 1, 2, 4, 1234, time.UTC) + params := map[string][]string{"full": {}} + + got := testFormat(logStats, url.Values(params)) + want := "test\t\t\t''\t''\t2017-01-01 01:02:03.000000\t2017-01-01 01:02:04.000001\t1.000001\t0.000000\t0.000000\t0.000000\t\t\"sql1 /* LOG_THIS_QUERY */\"\tmap[intVal:type:INT64 value:\"1\" ]\t0\t0\t\"\"\t\n" + if got != want { + t.Errorf("logstats format: got:\n%q\nwant:\n%q\n", got, want) + } + + *streamlog.QueryLogFilterTag = "LOG_THIS_QUERY" + got = testFormat(logStats, url.Values(params)) + want = "test\t\t\t''\t''\t2017-01-01 01:02:03.000000\t2017-01-01 01:02:04.000001\t1.000001\t0.000000\t0.000000\t0.000000\t\t\"sql1 /* LOG_THIS_QUERY */\"\tmap[intVal:type:INT64 value:\"1\" ]\t0\t0\t\"\"\t\n" + if got != want { + t.Errorf("logstats format: got:\n%q\nwant:\n%q\n", got, want) + } + + *streamlog.QueryLogFilterTag = "NOT_THIS_QUERY" + got = testFormat(logStats, url.Values(params)) + want = "" + if got != want { + t.Errorf("logstats format: got:\n%q\nwant:\n%q\n", got, want) + } +} + func TestLogStatsContextHTML(t *testing.T) { html := "HtmlContext" callInfo := &fakecallinfo.FakeCallInfo{ diff --git a/go/vt/vtgate/planbuilder/delete.go b/go/vt/vtgate/planbuilder/delete.go index 0d20ae4556c..b0c4b46ecc8 100644 --- a/go/vt/vtgate/planbuilder/delete.go +++ b/go/vt/vtgate/planbuilder/delete.go @@ -71,6 +71,8 @@ func buildDeletePlan(del *sqlparser.Delete, vschema ContextVSchema) (*engine.Del edel.MultiShardAutocommit = true } + edel.QueryTimeout = queryTimeout(directives) + if rb.ERoute.TargetDestination != nil { if rb.ERoute.TargetTabletType != topodatapb.TabletType_MASTER { return nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "unsupported: DELETE statement with a replica target") diff --git a/go/vt/vtgate/planbuilder/insert.go b/go/vt/vtgate/planbuilder/insert.go index 7bacfac0aee..bc85b18fed0 100644 --- a/go/vt/vtgate/planbuilder/insert.go +++ b/go/vt/vtgate/planbuilder/insert.go @@ -123,6 +123,8 @@ func buildInsertShardedPlan(ins *sqlparser.Insert, table *vindexes.Table) (*engi eins.MultiShardAutocommit = true } + eins.QueryTimeout = queryTimeout(directives) + var rows sqlparser.Values switch insertValues := ins.Rows.(type) { case *sqlparser.Select, *sqlparser.Union: diff --git a/go/vt/vtgate/planbuilder/route.go b/go/vt/vtgate/planbuilder/route.go index 74084b8d028..321abb74591 100644 --- a/go/vt/vtgate/planbuilder/route.go +++ b/go/vt/vtgate/planbuilder/route.go @@ -680,3 +680,21 @@ func (rb *route) SetOpcode(code engine.RouteOpcode) error { rb.ERoute.Opcode = code return nil } + +// queryTimeout returns DirectiveQueryTimeout value if set, otherwise returns 0. +func queryTimeout(d sqlparser.CommentDirectives) int { + if d == nil { + return 0 + } + + val, ok := d[sqlparser.DirectiveQueryTimeout] + if !ok { + return 0 + } + + intVal, ok := val.(int) + if ok { + return intVal + } + return 0 +} diff --git a/go/vt/vtgate/planbuilder/select.go b/go/vt/vtgate/planbuilder/select.go index 8814f328b1b..50da199cd21 100644 --- a/go/vt/vtgate/planbuilder/select.go +++ b/go/vt/vtgate/planbuilder/select.go @@ -321,21 +321,3 @@ func (pb *primitiveBuilder) expandStar(inrcs []*resultColumn, expr *sqlparser.St } return inrcs, true, nil } - -// queryTimeout returns DirectiveQueryTimeout value if set, otherwise returns 0. -func queryTimeout(d sqlparser.CommentDirectives) int { - if d == nil { - return 0 - } - - val, ok := d[sqlparser.DirectiveQueryTimeout] - if !ok { - return 0 - } - - intVal, ok := val.(int) - if ok { - return intVal - } - return 0 -} diff --git a/go/vt/vtgate/planbuilder/testdata/dml_cases.txt b/go/vt/vtgate/planbuilder/testdata/dml_cases.txt index 8de0a71d9c2..a90b6c6d480 100644 --- a/go/vt/vtgate/planbuilder/testdata/dml_cases.txt +++ b/go/vt/vtgate/planbuilder/testdata/dml_cases.txt @@ -988,6 +988,33 @@ } } +# insert with query timeout +"insert /*vt+ QUERY_TIMEOUT_MS=1 */ into user(id) values (1), (2)" +{ + "Original": "insert /*vt+ QUERY_TIMEOUT_MS=1 */ into user(id) values (1), (2)", + "Instructions": { + "Opcode": "InsertSharded", + "Keyspace": { + "Name": "user", + "Sharded": true + }, + "Query": "insert /*vt+ QUERY_TIMEOUT_MS=1 */ into user(id, Name, Costly) values (:_Id0, :_Name0, :_Costly0), (:_Id1, :_Name1, :_Costly1)", + "Values": [[[":__seq0",":__seq1"]],[[null,null]],[[null,null]]], + "Table": "user", + "Generate": { + "Keyspace": { + "Name": "main", + "Sharded": false + }, + "Query": "select next :n values from seq", + "Values": [1,2] + }, + "Prefix": "insert /*vt+ QUERY_TIMEOUT_MS=1 */ into user(id, Name, Costly) values ", + "Mid": ["(:_Id0, :_Name0, :_Costly0)","(:_Id1, :_Name1, :_Costly1)"], + "QueryTimeout": 1 + } +} + # insert with multiple rows - multi-shard autocommit "insert /*vt+ MULTI_SHARD_AUTOCOMMIT=1 */ into user(id) values (1), (2)" { @@ -1337,6 +1364,22 @@ } } +# update with no primary vindex on where clause (scatter update) - query timeout +"update /*vt+ QUERY_TIMEOUT_MS=1 */ user_extra set val = 1" +{ + "Original": "update /*vt+ QUERY_TIMEOUT_MS=1 */ user_extra set val = 1", + "Instructions": { + "Opcode": "UpdateScatter", + "Keyspace": { + "Name": "user", + "Sharded": true + }, + "Query": "update /*vt+ QUERY_TIMEOUT_MS=1 */ user_extra set val = 1", + "Table": "user_extra", + "QueryTimeout": 1 + } +} + # update with non-comparison expr "update user_extra set val = 1 where id between 1 and 2" { @@ -1489,6 +1532,22 @@ } } +# delete from with no index match - query timeout +"delete /*vt+ QUERY_TIMEOUT_MS=1 */ from user_extra where name = 'jose'" +{ + "Original": "delete /*vt+ QUERY_TIMEOUT_MS=1 */ from user_extra where name = 'jose'", + "Instructions": { + "Opcode": "DeleteScatter", + "Keyspace": { + "Name": "user", + "Sharded": true + }, + "Query": "delete /*vt+ QUERY_TIMEOUT_MS=1 */ from user_extra where name = 'jose'", + "Table": "user_extra", + "QueryTimeout": 1 + } +} + # delete from with primary id in through IN clause "delete from user_extra where user_id in (1, 2)" { diff --git a/go/vt/vtgate/planbuilder/update.go b/go/vt/vtgate/planbuilder/update.go index 85a9c174307..f4c2c6d7fe5 100644 --- a/go/vt/vtgate/planbuilder/update.go +++ b/go/vt/vtgate/planbuilder/update.go @@ -65,6 +65,8 @@ func buildUpdatePlan(upd *sqlparser.Update, vschema ContextVSchema) (*engine.Upd eupd.MultiShardAutocommit = true } + eupd.QueryTimeout = queryTimeout(directives) + var vindexTable *vindexes.Table for _, tval := range pb.st.tables { vindexTable = tval.vindexTable diff --git a/go/vt/vtgate/querylogz_test.go b/go/vt/vtgate/querylogz_test.go index 1f0ede98bd6..65abb9b8642 100644 --- a/go/vt/vtgate/querylogz_test.go +++ b/go/vt/vtgate/querylogz_test.go @@ -26,6 +26,7 @@ import ( "time" "golang.org/x/net/context" + "vitess.io/vitess/go/streamlog" "vitess.io/vitess/go/vt/callerid" ) @@ -142,6 +143,17 @@ func TestQuerylogzHandlerFormatting(t *testing.T) { close(ch) body, _ = ioutil.ReadAll(response.Body) checkQuerylogzHasStats(t, slowQueryPattern, logStats, body) + + // ensure querylogz is not affected by the filter tag + *streamlog.QueryLogFilterTag = "XXX_SKIP_ME" + defer func() { *streamlog.QueryLogFilterTag = "" }() + ch = make(chan interface{}, 1) + ch <- logStats + querylogzHandler(ch, response, req) + close(ch) + body, _ = ioutil.ReadAll(response.Body) + checkQuerylogzHasStats(t, slowQueryPattern, logStats, body) + } func checkQuerylogzHasStats(t *testing.T, pattern []string, logStats *LogStats, page []byte) { diff --git a/go/vt/vttablet/tabletserver/query_executor.go b/go/vt/vttablet/tabletserver/query_executor.go index 69d0079d90f..730f4b0963e 100644 --- a/go/vt/vttablet/tabletserver/query_executor.go +++ b/go/vt/vttablet/tabletserver/query_executor.go @@ -296,11 +296,14 @@ func (qre *QueryExecutor) execDmlAutoCommit() (reply *sqltypes.Result, err error } func (qre *QueryExecutor) execAsTransaction(f func(conn *TxConnection) (*sqltypes.Result, error)) (reply *sqltypes.Result, err error) { - conn, err := qre.tsv.te.txPool.LocalBegin(qre.ctx, qre.options) + conn, beginSQL, err := qre.tsv.te.txPool.LocalBegin(qre.ctx, qre.options) if err != nil { return nil, err } defer qre.tsv.te.txPool.LocalConclude(qre.ctx, conn) + if beginSQL != "" { + qre.logStats.AddRewrittenSQL(beginSQL, time.Now()) + } qre.logStats.AddRewrittenSQL("begin", time.Now()) reply, err = f(conn) @@ -311,8 +314,12 @@ func (qre *QueryExecutor) execAsTransaction(f func(conn *TxConnection) (*sqltype qre.logStats.AddRewrittenSQL("rollback", start) return nil, err } - err = qre.tsv.te.txPool.LocalCommit(qre.ctx, conn, qre.tsv.messager) - qre.logStats.AddRewrittenSQL("commit", start) + + commitSQL, err := qre.tsv.te.txPool.LocalCommit(qre.ctx, conn, qre.tsv.messager) + // As above LocalCommit is a no-op for autocommmit so don't log anything. + if commitSQL != "" { + qre.logStats.AddRewrittenSQL("commit", start) + } if err != nil { return nil, err } diff --git a/go/vt/vttablet/tabletserver/querylogz_test.go b/go/vt/vttablet/tabletserver/querylogz_test.go index 199a4c969b7..d94f355eaa3 100644 --- a/go/vt/vttablet/tabletserver/querylogz_test.go +++ b/go/vt/vttablet/tabletserver/querylogz_test.go @@ -26,6 +26,7 @@ import ( "time" "golang.org/x/net/context" + "vitess.io/vitess/go/streamlog" "vitess.io/vitess/go/vt/callerid" "vitess.io/vitess/go/vt/vttablet/tabletserver/planbuilder" "vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv" @@ -148,6 +149,16 @@ func TestQuerylogzHandler(t *testing.T) { close(ch) body, _ = ioutil.ReadAll(response.Body) checkQuerylogzHasStats(t, slowQueryPattern, logStats, body) + + // ensure querylogz is not affected by the filter tag + *streamlog.QueryLogFilterTag = "XXX_SKIP_ME" + defer func() { *streamlog.QueryLogFilterTag = "" }() + ch = make(chan interface{}, 1) + ch <- logStats + querylogzHandler(ch, response, req) + close(ch) + body, _ = ioutil.ReadAll(response.Body) + checkQuerylogzHasStats(t, slowQueryPattern, logStats, body) } func checkQuerylogzHasStats(t *testing.T, pattern []string, logStats *tabletenv.LogStats, page []byte) { diff --git a/go/vt/vttablet/tabletserver/tabletenv/logstats.go b/go/vt/vttablet/tabletserver/tabletenv/logstats.go index 6e8323400c3..fdf6d99d352 100644 --- a/go/vt/vttablet/tabletserver/tabletenv/logstats.go +++ b/go/vt/vttablet/tabletserver/tabletenv/logstats.go @@ -180,6 +180,10 @@ func (stats *LogStats) CallInfo() (string, string) { // Logf formats the log record to the given writer, either as // tab-separated list of logged fields or as JSON. func (stats *LogStats) Logf(w io.Writer, params url.Values) error { + if !streamlog.ShouldEmitLog(stats.OriginalSQL) { + return nil + } + rewrittenSQL := "[REDACTED]" formattedBindVars := "\"[REDACTED]\"" diff --git a/go/vt/vttablet/tabletserver/tabletenv/logstats_test.go b/go/vt/vttablet/tabletserver/tabletenv/logstats_test.go index 2bd62184a0d..9f5dbe05047 100644 --- a/go/vt/vttablet/tabletserver/tabletenv/logstats_test.go +++ b/go/vt/vttablet/tabletserver/tabletenv/logstats_test.go @@ -149,6 +149,41 @@ func TestLogStatsFormat(t *testing.T) { *streamlog.QueryLogFormat = "text" } +func TestLogStatsFilter(t *testing.T) { + defer func() { *streamlog.QueryLogFilterTag = "" }() + + logStats := NewLogStats(context.Background(), "test") + logStats.StartTime = time.Date(2017, time.January, 1, 1, 2, 3, 0, time.UTC) + logStats.EndTime = time.Date(2017, time.January, 1, 1, 2, 4, 1234, time.UTC) + logStats.OriginalSQL = "sql /* LOG_THIS_QUERY */" + logStats.BindVariables = map[string]*querypb.BindVariable{"intVal": sqltypes.Int64BindVariable(1)} + logStats.AddRewrittenSQL("sql with pii", time.Now()) + logStats.MysqlResponseTime = 0 + logStats.Rows = [][]sqltypes.Value{{sqltypes.NewVarBinary("a")}} + params := map[string][]string{"full": {}} + + got := testFormat(logStats, url.Values(params)) + want := "test\t\t\t''\t''\t2017-01-01 01:02:03.000000\t2017-01-01 01:02:04.000001\t1.000001\t\t\"sql /* LOG_THIS_QUERY */\"\tmap[intVal:type:INT64 value:\"1\" ]\t1\t\"sql with pii\"\tmysql\t0.000000\t0.000000\t0\t1\t\"\"\t\n" + if got != want { + t.Errorf("logstats format: got:\n%q\nwant:\n%q\n", got, want) + } + + *streamlog.QueryLogFilterTag = "LOG_THIS_QUERY" + got = testFormat(logStats, url.Values(params)) + want = "test\t\t\t''\t''\t2017-01-01 01:02:03.000000\t2017-01-01 01:02:04.000001\t1.000001\t\t\"sql /* LOG_THIS_QUERY */\"\tmap[intVal:type:INT64 value:\"1\" ]\t1\t\"sql with pii\"\tmysql\t0.000000\t0.000000\t0\t1\t\"\"\t\n" + if got != want { + t.Errorf("logstats format: got:\n%q\nwant:\n%q\n", got, want) + } + + *streamlog.QueryLogFilterTag = "NOT_THIS_QUERY" + got = testFormat(logStats, url.Values(params)) + want = "" + if got != want { + t.Errorf("logstats format: got:\n%q\nwant:\n%q\n", got, want) + } + +} + func TestLogStatsFormatQuerySources(t *testing.T) { logStats := NewLogStats(context.Background(), "test") if logStats.FmtQuerySources() != "none" { diff --git a/go/vt/vttablet/tabletserver/tabletserver.go b/go/vt/vttablet/tabletserver/tabletserver.go index 003b27708a0..8866b6e456f 100644 --- a/go/vt/vttablet/tabletserver/tabletserver.go +++ b/go/vt/vttablet/tabletserver/tabletserver.go @@ -237,12 +237,15 @@ type TxPoolController interface { // StopGently will change the state to NotServing but first wait for transactions to wrap up StopGently() - // Begin begins a transaction, and returns the associated transaction id. + // Begin begins a transaction, and returns the associated transaction id and the + // statement(s) used to execute the begin (if any). + // // Subsequent statements can access the connection through the transaction id. - Begin(ctx context.Context, options *querypb.ExecuteOptions) (int64, error) + Begin(ctx context.Context, options *querypb.ExecuteOptions) (int64, string, error) - // Commit commits the specified transaction. - Commit(ctx context.Context, transactionID int64, mc messageCommitter) error + // Commit commits the specified transaction, returning the statement used to execute + // the commit or "" in autocommit settings. + Commit(ctx context.Context, transactionID int64, mc messageCommitter) (string, error) // Rollback rolls back the specified transaction. Rollback(ctx context.Context, transactionID int64) error @@ -773,13 +776,25 @@ func (tsv *TabletServer) Begin(ctx context.Context, target *querypb.Target, opti "Begin", "begin", nil, target, options, true /* isBegin */, false, /* allowOnShutdown */ func(ctx context.Context, logStats *tabletenv.LogStats) error { - defer tabletenv.QueryStats.Record("BEGIN", time.Now()) + startTime := time.Now() if tsv.txThrottler.Throttle() { // TODO(erez): I think this should be RESOURCE_EXHAUSTED. return vterrors.Errorf(vtrpcpb.Code_UNAVAILABLE, "Transaction throttled") } - transactionID, err = tsv.teCtrl.Begin(ctx, options) + var beginSQL string + transactionID, beginSQL, err = tsv.teCtrl.Begin(ctx, options) logStats.TransactionID = transactionID + + // Record the actual statements that were executed in the logStats. + // If nothing was actually executed, don't count the operation in + // the tablet metrics, and clear out the logStats Method so that + // handlePanicAndSendLogStats doesn't log the no-op. + logStats.OriginalSQL = beginSQL + if beginSQL != "" { + tabletenv.QueryStats.Record("BEGIN", startTime) + } else { + logStats.Method = "" + } return err }, ) @@ -793,9 +808,21 @@ func (tsv *TabletServer) Commit(ctx context.Context, target *querypb.Target, tra "Commit", "commit", nil, target, nil, false /* isBegin */, true, /* allowOnShutdown */ func(ctx context.Context, logStats *tabletenv.LogStats) error { - defer tabletenv.QueryStats.Record("COMMIT", time.Now()) + startTime := time.Now() logStats.TransactionID = transactionID - return tsv.teCtrl.Commit(ctx, transactionID, tsv.messager) + + var commitSQL string + commitSQL, err = tsv.teCtrl.Commit(ctx, transactionID, tsv.messager) + + // If nothing was actually executed, don't count the operation in + // the tablet metrics, and clear out the logStats Method so that + // handlePanicAndSendLogStats doesn't log the no-op. + if commitSQL != "" { + tabletenv.QueryStats.Record("COMMIT", startTime) + } else { + logStats.Method = "" + } + return err }, ) } @@ -1471,6 +1498,7 @@ func (tsv *TabletServer) handlePanicAndSendLogStats( // Examples where we don't send the log stats: // - ExecuteBatch() (logStats == nil) // - beginWaitForSameRangeTransactions() (Method == "") + // - Begin / Commit in autocommit mode if logStats != nil && logStats.Method != "" { logStats.Send() } @@ -1860,9 +1888,9 @@ func (tsv *TabletServer) BroadcastHealth(terTimestamp int64, stats *querypb.Real target := tsv.target tsv.mu.Unlock() shr := &querypb.StreamHealthResponse{ - Target: &target, - TabletAlias: &tsv.alias, - Serving: tsv.IsServing(), + Target: &target, + TabletAlias: &tsv.alias, + Serving: tsv.IsServing(), TabletExternallyReparentedTimestamp: terTimestamp, RealtimeStats: stats, } diff --git a/go/vt/vttablet/tabletserver/tx_engine.go b/go/vt/vttablet/tabletserver/tx_engine.go index cb928f9a934..9db9a56ef5e 100644 --- a/go/vt/vttablet/tabletserver/tx_engine.go +++ b/go/vt/vttablet/tabletserver/tx_engine.go @@ -264,22 +264,24 @@ func (te *TxEngine) AcceptReadOnly() error { } } -// Begin begins a transaction, and returns the associated transaction id. +// Begin begins a transaction, and returns the associated transaction id and the +// statement(s) used to execute the begin (if any). +// // Subsequent statements can access the connection through the transaction id. -func (te *TxEngine) Begin(ctx context.Context, options *querypb.ExecuteOptions) (int64, error) { +func (te *TxEngine) Begin(ctx context.Context, options *querypb.ExecuteOptions) (int64, string, error) { te.stateLock.Lock() canOpenTransactions := te.state == AcceptingReadOnly || te.state == AcceptingReadAndWrite if !canOpenTransactions { // We are not in a state where we can start new transactions. Abort. te.stateLock.Unlock() - return 0, vterrors.Errorf(vtrpc.Code_UNAVAILABLE, "tx engine can't accept new transactions in state %v", te.state) + return 0, "", vterrors.Errorf(vtrpc.Code_UNAVAILABLE, "tx engine can't accept new transactions in state %v", te.state) } isWriteTransaction := options == nil || options.TransactionIsolation != querypb.ExecuteOptions_CONSISTENT_SNAPSHOT_READ_ONLY if te.state == AcceptingReadOnly && isWriteTransaction { te.stateLock.Unlock() - return 0, vterrors.Errorf(vtrpc.Code_UNAVAILABLE, "tx engine can only accept read-only transactions in current state") + return 0, "", vterrors.Errorf(vtrpc.Code_UNAVAILABLE, "tx engine can only accept read-only transactions in current state") } // By Add() to beginRequests, we block others from initiating state @@ -292,7 +294,7 @@ func (te *TxEngine) Begin(ctx context.Context, options *querypb.ExecuteOptions) } // Commit commits the specified transaction. -func (te *TxEngine) Commit(ctx context.Context, transactionID int64, mc messageCommitter) error { +func (te *TxEngine) Commit(ctx context.Context, transactionID int64, mc messageCommitter) (string, error) { return te.txPool.Commit(ctx, transactionID, mc) } @@ -466,7 +468,7 @@ outer: if txid > maxid { maxid = txid } - conn, err := te.txPool.LocalBegin(ctx, &querypb.ExecuteOptions{}) + conn, _, err := te.txPool.LocalBegin(ctx, &querypb.ExecuteOptions{}) if err != nil { allErr.RecordError(err) continue diff --git a/go/vt/vttablet/tabletserver/tx_engine_test.go b/go/vt/vttablet/tabletserver/tx_engine_test.go index cb7c235e41c..3c209a49bf4 100644 --- a/go/vt/vttablet/tabletserver/tx_engine_test.go +++ b/go/vt/vttablet/tabletserver/tx_engine_test.go @@ -55,10 +55,13 @@ func TestTxEngineClose(t *testing.T) { // Normal close with timeout wait. te.open() - c, err := te.txPool.LocalBegin(ctx, &querypb.ExecuteOptions{}) + c, beginSQL, err := te.txPool.LocalBegin(ctx, &querypb.ExecuteOptions{}) if err != nil { t.Fatal(err) } + if beginSQL != "begin" { + t.Errorf("beginSQL: %q, want 'begin'", beginSQL) + } c.Recycle() start = time.Now() te.close(false) @@ -68,7 +71,7 @@ func TestTxEngineClose(t *testing.T) { // Immediate close. te.open() - c, err = te.txPool.LocalBegin(ctx, &querypb.ExecuteOptions{}) + c, beginSQL, err = te.txPool.LocalBegin(ctx, &querypb.ExecuteOptions{}) if err != nil { t.Fatal(err) } @@ -82,7 +85,7 @@ func TestTxEngineClose(t *testing.T) { // Normal close with short grace period. te.shutdownGracePeriod = 250 * time.Millisecond te.open() - c, err = te.txPool.LocalBegin(ctx, &querypb.ExecuteOptions{}) + c, beginSQL, err = te.txPool.LocalBegin(ctx, &querypb.ExecuteOptions{}) if err != nil { t.Fatal(err) } @@ -99,7 +102,7 @@ func TestTxEngineClose(t *testing.T) { // Normal close with short grace period, but pool gets empty early. te.shutdownGracePeriod = 250 * time.Millisecond te.open() - c, err = te.txPool.LocalBegin(ctx, &querypb.ExecuteOptions{}) + c, beginSQL, err = te.txPool.LocalBegin(ctx, &querypb.ExecuteOptions{}) if err != nil { t.Fatal(err) } @@ -123,7 +126,7 @@ func TestTxEngineClose(t *testing.T) { // Immediate close, but connection is in use. te.open() - c, err = te.txPool.LocalBegin(ctx, &querypb.ExecuteOptions{}) + c, beginSQL, err = te.txPool.LocalBegin(ctx, &querypb.ExecuteOptions{}) if err != nil { t.Fatal(err) } @@ -492,6 +495,6 @@ func startTransaction(te *TxEngine, writeTransaction bool) error { } else { options.TransactionIsolation = querypb.ExecuteOptions_CONSISTENT_SNAPSHOT_READ_ONLY } - _, err := te.Begin(context.Background(), options) + _, _, err := te.Begin(context.Background(), options) return err } diff --git a/go/vt/vttablet/tabletserver/tx_executor.go b/go/vt/vttablet/tabletserver/tx_executor.go index f9d00174c40..e485174745f 100644 --- a/go/vt/vttablet/tabletserver/tx_executor.go +++ b/go/vt/vttablet/tabletserver/tx_executor.go @@ -68,7 +68,7 @@ func (txe *TxExecutor) Prepare(transactionID int64, dtid string) error { return vterrors.Errorf(vtrpcpb.Code_RESOURCE_EXHAUSTED, "prepare failed for transaction %d: %v", transactionID, err) } - localConn, err := txe.te.txPool.LocalBegin(txe.ctx, &querypb.ExecuteOptions{}) + localConn, _, err := txe.te.txPool.LocalBegin(txe.ctx, &querypb.ExecuteOptions{}) if err != nil { return err } @@ -79,7 +79,7 @@ func (txe *TxExecutor) Prepare(transactionID int64, dtid string) error { return err } - err = txe.te.txPool.LocalCommit(txe.ctx, localConn, txe.messager) + _, err = txe.te.txPool.LocalCommit(txe.ctx, localConn, txe.messager) if err != nil { return err } @@ -111,7 +111,7 @@ func (txe *TxExecutor) CommitPrepared(dtid string) error { txe.markFailed(ctx, dtid) return err } - err = txe.te.txPool.LocalCommit(ctx, conn, txe.messager) + _, err = txe.te.txPool.LocalCommit(ctx, conn, txe.messager) if err != nil { txe.markFailed(ctx, dtid) return err @@ -130,7 +130,7 @@ func (txe *TxExecutor) CommitPrepared(dtid string) error { func (txe *TxExecutor) markFailed(ctx context.Context, dtid string) { tabletenv.InternalErrors.Add("TwopcCommit", 1) txe.te.preparedPool.SetFailed(dtid) - conn, err := txe.te.txPool.LocalBegin(ctx, &querypb.ExecuteOptions{}) + conn, _, err := txe.te.txPool.LocalBegin(ctx, &querypb.ExecuteOptions{}) if err != nil { log.Errorf("markFailed: Begin failed for dtid %s: %v", dtid, err) return @@ -142,7 +142,7 @@ func (txe *TxExecutor) markFailed(ctx context.Context, dtid string) { return } - if err = txe.te.txPool.LocalCommit(ctx, conn, txe.messager); err != nil { + if _, err = txe.te.txPool.LocalCommit(ctx, conn, txe.messager); err != nil { log.Errorf("markFailed: Commit failed for dtid %s: %v", dtid, err) } } @@ -170,7 +170,7 @@ func (txe *TxExecutor) RollbackPrepared(dtid string, originalID int64) error { return vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "2pc is not enabled") } defer tabletenv.QueryStats.Record("ROLLBACK_PREPARED", time.Now()) - conn, err := txe.te.txPool.LocalBegin(txe.ctx, &querypb.ExecuteOptions{}) + conn, _, err := txe.te.txPool.LocalBegin(txe.ctx, &querypb.ExecuteOptions{}) if err != nil { goto returnConn } @@ -181,7 +181,7 @@ func (txe *TxExecutor) RollbackPrepared(dtid string, originalID int64) error { goto returnConn } - err = txe.te.txPool.LocalCommit(txe.ctx, conn, txe.messager) + _, err = txe.te.txPool.LocalCommit(txe.ctx, conn, txe.messager) returnConn: if preparedConn := txe.te.preparedPool.FetchForRollback(dtid); preparedConn != nil { @@ -200,7 +200,7 @@ func (txe *TxExecutor) CreateTransaction(dtid string, participants []*querypb.Ta return vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "2pc is not enabled") } defer tabletenv.QueryStats.Record("CREATE_TRANSACTION", time.Now()) - conn, err := txe.te.txPool.LocalBegin(txe.ctx, &querypb.ExecuteOptions{}) + conn, _, err := txe.te.txPool.LocalBegin(txe.ctx, &querypb.ExecuteOptions{}) if err != nil { return err } @@ -210,7 +210,8 @@ func (txe *TxExecutor) CreateTransaction(dtid string, participants []*querypb.Ta if err != nil { return err } - return txe.te.txPool.LocalCommit(txe.ctx, conn, txe.messager) + _, err = txe.te.txPool.LocalCommit(txe.ctx, conn, txe.messager) + return err } // StartCommit atomically commits the transaction along with the @@ -232,7 +233,8 @@ func (txe *TxExecutor) StartCommit(transactionID int64, dtid string) error { if err != nil { return err } - return txe.te.txPool.LocalCommit(txe.ctx, conn, txe.messager) + _, err = txe.te.txPool.LocalCommit(txe.ctx, conn, txe.messager) + return err } // SetRollback transitions the 2pc transaction to the Rollback state. @@ -248,7 +250,7 @@ func (txe *TxExecutor) SetRollback(dtid string, transactionID int64) error { txe.te.txPool.Rollback(txe.ctx, transactionID) } - conn, err := txe.te.txPool.LocalBegin(txe.ctx, &querypb.ExecuteOptions{}) + conn, _, err := txe.te.txPool.LocalBegin(txe.ctx, &querypb.ExecuteOptions{}) if err != nil { return err } @@ -259,7 +261,7 @@ func (txe *TxExecutor) SetRollback(dtid string, transactionID int64) error { return err } - err = txe.te.txPool.LocalCommit(txe.ctx, conn, txe.messager) + _, err = txe.te.txPool.LocalCommit(txe.ctx, conn, txe.messager) if err != nil { return err } @@ -275,7 +277,7 @@ func (txe *TxExecutor) ConcludeTransaction(dtid string) error { } defer tabletenv.QueryStats.Record("RESOLVE", time.Now()) - conn, err := txe.te.txPool.LocalBegin(txe.ctx, &querypb.ExecuteOptions{}) + conn, _, err := txe.te.txPool.LocalBegin(txe.ctx, &querypb.ExecuteOptions{}) if err != nil { return err } @@ -285,7 +287,8 @@ func (txe *TxExecutor) ConcludeTransaction(dtid string) error { if err != nil { return err } - return txe.te.txPool.LocalCommit(txe.ctx, conn, txe.messager) + _, err = txe.te.txPool.LocalCommit(txe.ctx, conn, txe.messager) + return err } // ReadTransaction returns the metadata for the sepcified dtid. diff --git a/go/vt/vttablet/tabletserver/tx_pool.go b/go/vt/vttablet/tabletserver/tx_pool.go index 4d766643792..5f068fba724 100644 --- a/go/vt/vttablet/tabletserver/tx_pool.go +++ b/go/vt/vttablet/tabletserver/tx_pool.go @@ -192,23 +192,26 @@ func (axp *TxPool) WaitForEmpty() { axp.activePool.WaitForEmpty() } -// Begin begins a transaction, and returns the associated transaction id. +// Begin begins a transaction, and returns the associated transaction id and +// the statements (if any) executed to initiate the transaction. In autocommit +// mode the statement will be "". +// // Subsequent statements can access the connection through the transaction id. -func (axp *TxPool) Begin(ctx context.Context, options *querypb.ExecuteOptions) (int64, error) { +func (axp *TxPool) Begin(ctx context.Context, options *querypb.ExecuteOptions) (int64, string, error) { var conn *connpool.DBConn var err error immediateCaller := callerid.ImmediateCallerIDFromContext(ctx) effectiveCaller := callerid.EffectiveCallerIDFromContext(ctx) if !axp.limiter.Get(immediateCaller, effectiveCaller) { - return 0, vterrors.Errorf(vtrpcpb.Code_RESOURCE_EXHAUSTED, "per-user transaction pool connection limit exceeded") + return 0, "", vterrors.Errorf(vtrpcpb.Code_RESOURCE_EXHAUSTED, "per-user transaction pool connection limit exceeded") } waiterCount := axp.waiters.Add(1) defer axp.waiters.Add(-1) if waiterCount > axp.waiterCap.Get() { - return 0, vterrors.New(vtrpcpb.Code_RESOURCE_EXHAUSTED, "transaction pool waiter count exceeded") + return 0, "", vterrors.New(vtrpcpb.Code_RESOURCE_EXHAUSTED, "transaction pool waiter count exceeded") } var beginSucceeded bool @@ -231,30 +234,33 @@ func (axp *TxPool) Begin(ctx context.Context, options *querypb.ExecuteOptions) ( if err != nil { switch err { case connpool.ErrConnPoolClosed: - return 0, err + return 0, "", err case pools.ErrTimeout: axp.LogActive() - return 0, vterrors.Errorf(vtrpcpb.Code_RESOURCE_EXHAUSTED, "transaction pool connection limit exceeded") + return 0, "", vterrors.Errorf(vtrpcpb.Code_RESOURCE_EXHAUSTED, "transaction pool connection limit exceeded") } - return 0, err + return 0, "", err } autocommitTransaction := false - + beginQueries := "" if queries, ok := txIsolations[options.GetTransactionIsolation()]; ok { if queries.setIsolationLevel != "" { if _, err := conn.Exec(ctx, "set transaction isolation level "+queries.setIsolationLevel, 1, false); err != nil { - return 0, err + return 0, "", err } + + beginQueries = queries.setIsolationLevel + "; " } if _, err := conn.Exec(ctx, queries.openTransaction, 1, false); err != nil { - return 0, err + return 0, "", err } + beginQueries = beginQueries + queries.openTransaction } else if options.GetTransactionIsolation() == querypb.ExecuteOptions_AUTOCOMMIT { autocommitTransaction = true } else { - return 0, fmt.Errorf("don't know how to open a transaction of this type: %v", options.GetTransactionIsolation()) + return 0, "", fmt.Errorf("don't know how to open a transaction of this type: %v", options.GetTransactionIsolation()) } beginSucceeded = true @@ -271,14 +277,14 @@ func (axp *TxPool) Begin(ctx context.Context, options *querypb.ExecuteOptions) ( ), options.GetWorkload() != querypb.ExecuteOptions_DBA, ) - return transactionID, nil + return transactionID, beginQueries, nil } // Commit commits the specified transaction. -func (axp *TxPool) Commit(ctx context.Context, transactionID int64, mc messageCommitter) error { +func (axp *TxPool) Commit(ctx context.Context, transactionID int64, mc messageCommitter) (string, error) { conn, err := axp.Get(transactionID, "for commit") if err != nil { - return err + return "", err } return axp.LocalCommit(ctx, conn, mc) } @@ -305,30 +311,31 @@ func (axp *TxPool) Get(transactionID int64, reason string) (*TxConnection, error // LocalBegin is equivalent to Begin->Get. // It's used for executing transactions within a request. It's safe // to always call LocalConclude at the end. -func (axp *TxPool) LocalBegin(ctx context.Context, options *querypb.ExecuteOptions) (*TxConnection, error) { - transactionID, err := axp.Begin(ctx, options) +func (axp *TxPool) LocalBegin(ctx context.Context, options *querypb.ExecuteOptions) (*TxConnection, string, error) { + transactionID, beginSQL, err := axp.Begin(ctx, options) if err != nil { - return nil, err + return nil, "", err } - return axp.Get(transactionID, "for local query") + conn, err := axp.Get(transactionID, "for local query") + return conn, beginSQL, err } // LocalCommit is the commit function for LocalBegin. -func (axp *TxPool) LocalCommit(ctx context.Context, conn *TxConnection, mc messageCommitter) error { +func (axp *TxPool) LocalCommit(ctx context.Context, conn *TxConnection, mc messageCommitter) (string, error) { defer conn.conclude(TxCommit, "transaction committed") defer mc.LockDB(conn.NewMessages, conn.ChangedMessages)() if conn.Autocommit { mc.UpdateCaches(conn.NewMessages, conn.ChangedMessages) - return nil + return "", nil } if _, err := conn.Exec(ctx, "commit", 1, false); err != nil { conn.Close() - return err + return "", err } mc.UpdateCaches(conn.NewMessages, conn.ChangedMessages) - return nil + return "commit", nil } // LocalConclude concludes a transaction started by LocalBegin. diff --git a/go/vt/vttablet/tabletserver/tx_pool_test.go b/go/vt/vttablet/tabletserver/tx_pool_test.go index d58b0357319..b1d54686b1d 100644 --- a/go/vt/vttablet/tabletserver/tx_pool_test.go +++ b/go/vt/vttablet/tabletserver/tx_pool_test.go @@ -40,6 +40,42 @@ import ( "vitess.io/vitess/go/vt/vttablet/tabletserver/messager" ) +func TestTxPoolExecuteCommit(t *testing.T) { + sql := "update test_column set x=1 where 1!=1" + db := fakesqldb.New(t) + defer db.Close() + db.AddQuery(sql, &sqltypes.Result{}) + db.AddQuery("begin", &sqltypes.Result{}) + db.AddQuery("commit", &sqltypes.Result{}) + + txPool := newTxPool() + txPool.Open(db.ConnParams(), db.ConnParams(), db.ConnParams()) + defer txPool.Close() + ctx := context.Background() + transactionID, beginSQL, err := txPool.Begin(ctx, &querypb.ExecuteOptions{}) + if err != nil { + t.Fatal(err) + } + if beginSQL != "begin" { + t.Errorf("beginSQL got %q want 'begin'", beginSQL) + } + txConn, err := txPool.Get(transactionID, "for query") + if err != nil { + t.Fatal(err) + } + txConn.RecordQuery(sql) + _, err = txConn.Exec(ctx, sql, 1, true) + txConn.Recycle() + + commitSQL, err := txPool.Commit(ctx, transactionID, &fakeMessageCommitter{}) + if err != nil { + t.Fatal(err) + } + if commitSQL != "commit" { + t.Errorf("commitSQL got %q want 'commit'", commitSQL) + } +} + func TestTxPoolExecuteRollback(t *testing.T) { sql := "alter table test_table add test_column int" db := fakesqldb.New(t) @@ -52,10 +88,13 @@ func TestTxPoolExecuteRollback(t *testing.T) { txPool.Open(db.ConnParams(), db.ConnParams(), db.ConnParams()) defer txPool.Close() ctx := context.Background() - transactionID, err := txPool.Begin(ctx, &querypb.ExecuteOptions{}) + transactionID, beginSQL, err := txPool.Begin(ctx, &querypb.ExecuteOptions{}) if err != nil { t.Fatal(err) } + if beginSQL != "begin" { + t.Errorf("beginSQL got %q want 'begin'", beginSQL) + } txConn, err := txPool.Get(transactionID, "for query") if err != nil { t.Fatal(err) @@ -79,11 +118,11 @@ func TestTxPoolRollbackNonBusy(t *testing.T) { txPool.Open(db.ConnParams(), db.ConnParams(), db.ConnParams()) defer txPool.Close() ctx := context.Background() - txid1, err := txPool.Begin(ctx, &querypb.ExecuteOptions{}) + txid1, _, err := txPool.Begin(ctx, &querypb.ExecuteOptions{}) if err != nil { t.Fatal(err) } - _, err = txPool.Begin(ctx, &querypb.ExecuteOptions{}) + _, _, err = txPool.Begin(ctx, &querypb.ExecuteOptions{}) if err != nil { t.Fatal(err) } @@ -173,7 +212,7 @@ func TestTxPoolTransactionKillerEnforceTimeoutEnabled(t *testing.T) { } func addQuery(ctx context.Context, sql string, txPool *TxPool, workload querypb.ExecuteOptions_Workload) (int64, error) { - transactionID, err := txPool.Begin(ctx, &querypb.ExecuteOptions{Workload: workload}) + transactionID, _, err := txPool.Begin(ctx, &querypb.ExecuteOptions{Workload: workload}) if err != nil { return 0, err } @@ -199,10 +238,13 @@ func TestTxPoolClientRowsFound(t *testing.T) { // Start a 'normal' transaction. It should take a connection // for the normal 'conns' pool. - id1, err := txPool.Begin(ctx, &querypb.ExecuteOptions{}) + id1, beginSQL, err := txPool.Begin(ctx, &querypb.ExecuteOptions{}) if err != nil { t.Fatal(err) } + if beginSQL != "begin" { + t.Errorf("beginSQL got %q want 'begin'", beginSQL) + } if got, want := txPool.conns.Available(), startNormalSize-1; got != want { t.Errorf("Normal pool size: %d, want %d", got, want) } @@ -212,10 +254,13 @@ func TestTxPoolClientRowsFound(t *testing.T) { // Start a 'foundRows' transaction. It should take a connection // from the foundRows pool. - id2, err := txPool.Begin(ctx, &querypb.ExecuteOptions{ClientFoundRows: true}) + id2, beginSQL, err := txPool.Begin(ctx, &querypb.ExecuteOptions{ClientFoundRows: true}) if err != nil { t.Fatal(err) } + if beginSQL != "begin" { + t.Errorf("beginSQL got %q want 'begin'", beginSQL) + } if got, want := txPool.conns.Available(), startNormalSize-1; got != want { t.Errorf("Normal pool size: %d, want %d", got, want) } @@ -253,16 +298,23 @@ func TestTxPoolTransactionIsolation(t *testing.T) { ctx := context.Background() // Start a transaction with default. It should not change isolation. - _, err := txPool.Begin(ctx, &querypb.ExecuteOptions{}) + _, beginSQL, err := txPool.Begin(ctx, &querypb.ExecuteOptions{}) if err != nil { t.Fatal(err) } + if beginSQL != "begin" { + t.Errorf("beginSQL got %q want 'begin'", beginSQL) + } db.AddQuery("set transaction isolation level READ COMMITTED", &sqltypes.Result{}) - _, err = txPool.Begin(ctx, &querypb.ExecuteOptions{TransactionIsolation: querypb.ExecuteOptions_READ_COMMITTED}) + _, beginSQL, err = txPool.Begin(ctx, &querypb.ExecuteOptions{TransactionIsolation: querypb.ExecuteOptions_READ_COMMITTED}) if err != nil { t.Fatal(err) } + wantBeginSQL := "READ COMMITTED; begin" + if beginSQL != wantBeginSQL { + t.Errorf("beginSQL got %q want %q", beginSQL, wantBeginSQL) + } } func TestTxPoolAutocommit(t *testing.T) { @@ -276,14 +328,20 @@ func TestTxPoolAutocommit(t *testing.T) { // to mysql. // This test is meaningful because if txPool.Begin were to send a BEGIN statement to the connection, it will fatal // because is not in the list of expected queries (i.e db.AddQuery hasn't been called). - txid, err := txPool.Begin(ctx, &querypb.ExecuteOptions{TransactionIsolation: querypb.ExecuteOptions_AUTOCOMMIT}) + txid, beginSQL, err := txPool.Begin(ctx, &querypb.ExecuteOptions{TransactionIsolation: querypb.ExecuteOptions_AUTOCOMMIT}) if err != nil { t.Fatal(err) } - err = txPool.Commit(ctx, txid, &fakeMessageCommitter{}) + if beginSQL != "" { + t.Errorf("beginSQL got %q want ''", beginSQL) + } + commitSQL, err := txPool.Commit(ctx, txid, &fakeMessageCommitter{}) if err != nil { t.Fatal(err) } + if commitSQL != "" { + t.Errorf("commitSQL got %q want ''", commitSQL) + } } // TestTxPoolBeginWithPoolConnectionError_TransientErrno2006 tests the case @@ -305,7 +363,7 @@ func TestTxPoolBeginWithPoolConnectionError_Errno2006_Transient(t *testing.T) { } ctx := context.Background() - txConn, err := txPool.LocalBegin(ctx, &querypb.ExecuteOptions{}) + txConn, _, err := txPool.LocalBegin(ctx, &querypb.ExecuteOptions{}) if err != nil { t.Fatalf("Begin should have succeeded after the retry in DBConn.Exec(): %v", err) } @@ -336,7 +394,7 @@ func TestTxPoolBeginWithPoolConnectionError_Errno2006_Permanent(t *testing.T) { // After that, vttablet will automatically try to reconnect and this fail. // DBConn.Exec() will return the reconnect error as final error and not the // initial connection error. - _, err = txPool.LocalBegin(context.Background(), &querypb.ExecuteOptions{}) + _, _, err = txPool.LocalBegin(context.Background(), &querypb.ExecuteOptions{}) if err == nil || !strings.Contains(err.Error(), "(errno 2013)") { t.Fatalf("Begin did not return the reconnect error: %v", err) } @@ -362,7 +420,7 @@ func TestTxPoolBeginWithPoolConnectionError_Errno2013(t *testing.T) { db.EnableShouldClose() // 2013 is not retryable. DBConn.Exec() fails after the first attempt. - _, err = txPool.Begin(context.Background(), &querypb.ExecuteOptions{}) + _, _, err = txPool.Begin(context.Background(), &querypb.ExecuteOptions{}) if err == nil || !strings.Contains(err.Error(), "(errno 2013)") { t.Fatalf("Begin must return connection error with MySQL errno 2013: %v", err) } @@ -385,7 +443,7 @@ func primeTxPoolWithConnection(t *testing.T) (*fakesqldb.DB, *TxPool, error) { db.AddQuery("begin", &sqltypes.Result{}) db.AddQuery("rollback", &sqltypes.Result{}) ctx := context.Background() - txConn, err := txPool.LocalBegin(ctx, &querypb.ExecuteOptions{}) + txConn, _, err := txPool.LocalBegin(ctx, &querypb.ExecuteOptions{}) if err != nil { return nil, nil, err } @@ -402,7 +460,7 @@ func TestTxPoolBeginWithError(t *testing.T) { txPool.Open(db.ConnParams(), db.ConnParams(), db.ConnParams()) defer txPool.Close() ctx := context.Background() - _, err := txPool.Begin(ctx, &querypb.ExecuteOptions{}) + _, _, err := txPool.Begin(ctx, &querypb.ExecuteOptions{}) want := "error: rejected" if err == nil || !strings.Contains(err.Error(), want) { t.Errorf("Begin: %v, want %s", err, want) @@ -424,7 +482,7 @@ func TestTxPoolRollbackFail(t *testing.T) { txPool.Open(db.ConnParams(), db.ConnParams(), db.ConnParams()) defer txPool.Close() ctx := context.Background() - transactionID, err := txPool.Begin(ctx, &querypb.ExecuteOptions{}) + transactionID, _, err := txPool.Begin(ctx, &querypb.ExecuteOptions{}) if err != nil { t.Fatal(err) } @@ -467,7 +525,7 @@ func TestTxPoolGetConnRecentlyRemovedTransaction(t *testing.T) { db.AddQuery("rollback", &sqltypes.Result{}) txPool := newTxPool() txPool.Open(db.ConnParams(), db.ConnParams(), db.ConnParams()) - id, err := txPool.Begin(ctx, &querypb.ExecuteOptions{}) + id, _, err := txPool.Begin(ctx, &querypb.ExecuteOptions{}) txPool.Close() assertErrorMatch := func(id int64, reason string) { @@ -486,14 +544,14 @@ func TestTxPoolGetConnRecentlyRemovedTransaction(t *testing.T) { txPool = newTxPool() txPool.Open(db.ConnParams(), db.ConnParams(), db.ConnParams()) - id, err = txPool.Begin(ctx, &querypb.ExecuteOptions{}) - if err := txPool.Commit(ctx, id, &fakeMessageCommitter{}); err != nil { + id, _, err = txPool.Begin(ctx, &querypb.ExecuteOptions{}) + if _, err := txPool.Commit(ctx, id, &fakeMessageCommitter{}); err != nil { t.Fatalf("got error: %v", err) } assertErrorMatch(id, "transaction committed") - id, err = txPool.Begin(ctx, &querypb.ExecuteOptions{}) + id, _, err = txPool.Begin(ctx, &querypb.ExecuteOptions{}) if err := txPool.Rollback(ctx, id); err != nil { t.Fatalf("got error: %v", err) } @@ -506,13 +564,13 @@ func TestTxPoolGetConnRecentlyRemovedTransaction(t *testing.T) { txPool.Open(db.ConnParams(), db.ConnParams(), db.ConnParams()) defer txPool.Close() - id, err = txPool.Begin(ctx, &querypb.ExecuteOptions{}) + id, _, err = txPool.Begin(ctx, &querypb.ExecuteOptions{}) time.Sleep(5 * time.Millisecond) assertErrorMatch(id, "exceeded timeout: 1ms") txPool.SetTimeout(1 * time.Hour) - id, err = txPool.Begin(ctx, &querypb.ExecuteOptions{}) + id, _, err = txPool.Begin(ctx, &querypb.ExecuteOptions{}) txc, err := txPool.Get(id, "for close") if err != nil { t.Fatalf("got error: %v", err) @@ -545,7 +603,7 @@ func TestTxPoolExecFailDueToConnFail_Errno2006(t *testing.T) { ctx := context.Background() // Start the transaction. - txConn, err := txPool.LocalBegin(ctx, &querypb.ExecuteOptions{}) + txConn, _, err := txPool.LocalBegin(ctx, &querypb.ExecuteOptions{}) if err != nil { t.Fatal(err) } @@ -583,7 +641,7 @@ func TestTxPoolExecFailDueToConnFail_Errno2013(t *testing.T) { ctx := context.Background() // Start the transaction. - txConn, err := txPool.LocalBegin(ctx, &querypb.ExecuteOptions{}) + txConn, _, err := txPool.LocalBegin(ctx, &querypb.ExecuteOptions{}) if err != nil { t.Fatal(err) } @@ -615,7 +673,7 @@ func TestTxPoolCloseKillsStrayTransactions(t *testing.T) { txPool.Open(db.ConnParams(), db.ConnParams(), db.ConnParams()) // Start stray transaction. - _, err := txPool.Begin(context.Background(), &querypb.ExecuteOptions{}) + _, _, err := txPool.Begin(context.Background(), &querypb.ExecuteOptions{}) if err != nil { t.Fatal(err) }