From e67801015ae417e69c96d5c7c8ed68395aa0e922 Mon Sep 17 00:00:00 2001 From: Richard Date: Wed, 5 Jun 2019 10:24:32 -0700 Subject: [PATCH] Cherry pick changes into our build (#133) * Adds timeouts for all kinds of statements. Power to the clients! Signed-off-by: Rafael Chacon * Cherry-pick: suppress begin...commit in autocommit logs Original Message: The previous (untested) implementation turned out to be in the wrong place in the TabletServer execution tier and did not properly log the actual statements being executed. Implement this the right way by returning the statements that were really executed out from the TxPool, then using those to determine whether or not to log the statement. Original PR: vitessio#4827 * Cherry pick: add optional tag-based filtering for query logging Original PR: vitessio#4895 * bootstrap.sh: Remove unused unused command. The deprecated `unused` command has been removed from the source repository, so this now fails. We no longer use `unused` so we can just drop the line that tries to install it. Signed-off-by: Anthony Yeh --- bootstrap.sh | 1 - go/streamlog/streamlog.go | 13 +++ go/vt/vtgate/engine/delete.go | 11 ++ go/vt/vtgate/engine/insert.go | 11 ++ go/vt/vtgate/engine/update.go | 11 ++ go/vt/vtgate/logstats.go | 4 + go/vt/vtgate/logstats_test.go | 29 +++++ go/vt/vtgate/planbuilder/delete.go | 2 + go/vt/vtgate/planbuilder/insert.go | 2 + go/vt/vtgate/planbuilder/route.go | 18 +++ go/vt/vtgate/planbuilder/select.go | 18 --- .../vtgate/planbuilder/testdata/dml_cases.txt | 59 ++++++++++ go/vt/vtgate/planbuilder/update.go | 2 + go/vt/vtgate/querylogz_test.go | 12 ++ go/vt/vttablet/tabletserver/query_executor.go | 13 ++- go/vt/vttablet/tabletserver/querylogz_test.go | 11 ++ .../tabletserver/tabletenv/logstats.go | 4 + .../tabletserver/tabletenv/logstats_test.go | 35 ++++++ go/vt/vttablet/tabletserver/tabletserver.go | 50 ++++++-- go/vt/vttablet/tabletserver/tx_engine.go | 14 ++- go/vt/vttablet/tabletserver/tx_engine_test.go | 15 ++- go/vt/vttablet/tabletserver/tx_executor.go | 31 ++--- go/vt/vttablet/tabletserver/tx_pool.go | 51 +++++---- go/vt/vttablet/tabletserver/tx_pool_test.go | 108 ++++++++++++++---- 24 files changed, 419 insertions(+), 106 deletions(-) diff --git a/bootstrap.sh b/bootstrap.sh index 3c58a603efc..5a75cb4115e 100755 --- a/bootstrap.sh +++ b/bootstrap.sh @@ -290,7 +290,6 @@ gotools=" \ golang.org/x/tools/cmd/cover \ golang.org/x/tools/cmd/goimports \ golang.org/x/tools/cmd/goyacc \ - honnef.co/go/tools/cmd/unused \ " echo "Installing dev tools with 'go get'..." # shellcheck disable=SC2086 diff --git a/go/streamlog/streamlog.go b/go/streamlog/streamlog.go index 6b9559694be..8717898df8b 100644 --- a/go/streamlog/streamlog.go +++ b/go/streamlog/streamlog.go @@ -25,6 +25,7 @@ import ( "net/url" "os" "os/signal" + "strings" "sync" "syscall" @@ -40,6 +41,9 @@ var ( // QueryLogFormat controls the format of the query log (either text or json) QueryLogFormat = flag.String("querylog-format", "text", "format for query logs (\"text\" or \"json\")") + // QueryLogFilterTag contains an optional string that must be present in the query for it to be logged + QueryLogFilterTag = flag.String("querylog-filter-tag", "", "string that must be present in the query for it to be logged") + sendCount = stats.NewCountersWithSingleLabel("StreamlogSend", "stream log send count", "logger_names") deliveredCount = stats.NewCountersWithMultiLabels( "StreamlogDelivered", @@ -201,3 +205,12 @@ func GetFormatter(logger *StreamLogger) LogFormatter { return fmter.Logf(w, params) } } + +// ShouldEmitLog returns whether the log with the given SQL query +// should be emitted or filtered +func ShouldEmitLog(sql string) bool { + if *QueryLogFilterTag == "" { + return true + } + return strings.Contains(sql, *QueryLogFilterTag) +} diff --git a/go/vt/vtgate/engine/delete.go b/go/vt/vtgate/engine/delete.go index 4930d0933fc..53041050d4e 100644 --- a/go/vt/vtgate/engine/delete.go +++ b/go/vt/vtgate/engine/delete.go @@ -19,6 +19,7 @@ package engine import ( "encoding/json" "fmt" + "time" "vitess.io/vitess/go/jsonutil" "vitess.io/vitess/go/sqltypes" @@ -63,6 +64,9 @@ type Delete struct { // Option to override the standard behavior and allow a multi-shard delete // to use single round trip autocommit. MultiShardAutocommit bool + + // QueryTimeout contains the optional timeout (in milliseconds) to apply to this query + QueryTimeout int } // MarshalJSON serializes the Delete into a JSON representation. @@ -84,6 +88,7 @@ func (del *Delete) MarshalJSON() ([]byte, error) { Table string `json:",omitempty"` OwnedVindexQuery string `json:",omitempty"` MultiShardAutocommit bool `json:",omitempty"` + QueryTimeout int `json:",omitempty"` }{ Opcode: del.Opcode, Keyspace: del.Keyspace, @@ -93,6 +98,7 @@ func (del *Delete) MarshalJSON() ([]byte, error) { Table: tname, OwnedVindexQuery: del.OwnedVindexQuery, MultiShardAutocommit: del.MultiShardAutocommit, + QueryTimeout: del.QueryTimeout, } return jsonutil.MarshalNoEscape(marshalDelete) } @@ -141,6 +147,11 @@ func (del *Delete) RouteType() string { // Execute performs a non-streaming exec. func (del *Delete) Execute(vcursor VCursor, bindVars map[string]*querypb.BindVariable, wantfields bool) (*sqltypes.Result, error) { + if del.QueryTimeout != 0 { + cancel := vcursor.SetContextTimeout(time.Duration(del.QueryTimeout) * time.Millisecond) + defer cancel() + } + switch del.Opcode { case DeleteUnsharded: return del.execDeleteUnsharded(vcursor, bindVars) diff --git a/go/vt/vtgate/engine/insert.go b/go/vt/vtgate/engine/insert.go index a1ead406736..a91a8a5ccdf 100644 --- a/go/vt/vtgate/engine/insert.go +++ b/go/vt/vtgate/engine/insert.go @@ -21,6 +21,7 @@ import ( "fmt" "strconv" "strings" + "time" "vitess.io/vitess/go/jsonutil" "vitess.io/vitess/go/sqltypes" @@ -76,6 +77,9 @@ type Insert struct { // However some application use cases would prefer that the statement partially // succeed in order to get the performance benefits of autocommit. MultiShardAutocommit bool + + // QueryTimeout contains the optional timeout (in milliseconds) to apply to this query + QueryTimeout int } // NewQueryInsert creates an Insert with a query string. @@ -127,6 +131,7 @@ func (ins *Insert) MarshalJSON() ([]byte, error) { Mid []string `json:",omitempty"` Suffix string `json:",omitempty"` MultiShardAutocommit bool `json:",omitempty"` + QueryTimeout int `json:",omitempty"` }{ Opcode: ins.Opcode, Keyspace: ins.Keyspace, @@ -138,6 +143,7 @@ func (ins *Insert) MarshalJSON() ([]byte, error) { Mid: ins.Mid, Suffix: ins.Suffix, MultiShardAutocommit: ins.MultiShardAutocommit, + QueryTimeout: ins.QueryTimeout, } return jsonutil.MarshalNoEscape(marshalInsert) } @@ -191,6 +197,11 @@ func (ins *Insert) RouteType() string { // Execute performs a non-streaming exec. func (ins *Insert) Execute(vcursor VCursor, bindVars map[string]*querypb.BindVariable, wantfields bool) (*sqltypes.Result, error) { + if ins.QueryTimeout != 0 { + cancel := vcursor.SetContextTimeout(time.Duration(ins.QueryTimeout) * time.Millisecond) + defer cancel() + } + switch ins.Opcode { case InsertUnsharded: return ins.execInsertUnsharded(vcursor, bindVars) diff --git a/go/vt/vtgate/engine/update.go b/go/vt/vtgate/engine/update.go index 83a48c32f31..81857997947 100644 --- a/go/vt/vtgate/engine/update.go +++ b/go/vt/vtgate/engine/update.go @@ -19,6 +19,7 @@ package engine import ( "encoding/json" "fmt" + "time" "vitess.io/vitess/go/jsonutil" "vitess.io/vitess/go/sqltypes" @@ -66,6 +67,9 @@ type Update struct { // Option to override the standard behavior and allow a multi-shard update // to use single round trip autocommit. MultiShardAutocommit bool + + // QueryTimeout contains the optional timeout (in milliseconds) to apply to this query + QueryTimeout int } // MarshalJSON serializes the Update into a JSON representation. @@ -88,6 +92,7 @@ func (upd *Update) MarshalJSON() ([]byte, error) { Table string `json:",omitempty"` OwnedVindexQuery string `json:",omitempty"` MultiShardAutocommit bool `json:",omitempty"` + QueryTimeout int `json:",omitempty"` }{ Opcode: upd.Opcode, Keyspace: upd.Keyspace, @@ -98,6 +103,7 @@ func (upd *Update) MarshalJSON() ([]byte, error) { Table: tname, OwnedVindexQuery: upd.OwnedVindexQuery, MultiShardAutocommit: upd.MultiShardAutocommit, + QueryTimeout: upd.QueryTimeout, } return jsonutil.MarshalNoEscape(marshalUpdate) } @@ -145,6 +151,11 @@ func (upd *Update) RouteType() string { // Execute performs a non-streaming exec. func (upd *Update) Execute(vcursor VCursor, bindVars map[string]*querypb.BindVariable, wantfields bool) (*sqltypes.Result, error) { + if upd.QueryTimeout != 0 { + cancel := vcursor.SetContextTimeout(time.Duration(upd.QueryTimeout) * time.Millisecond) + defer cancel() + } + switch upd.Opcode { case UpdateUnsharded: return upd.execUpdateUnsharded(vcursor, bindVars) diff --git a/go/vt/vtgate/logstats.go b/go/vt/vtgate/logstats.go index 84042878be8..02b4d46b5b0 100644 --- a/go/vt/vtgate/logstats.go +++ b/go/vt/vtgate/logstats.go @@ -121,6 +121,10 @@ func (stats *LogStats) RemoteAddrUsername() (string, string) { // Logf formats the log record to the given writer, either as // tab-separated list of logged fields or as JSON. func (stats *LogStats) Logf(w io.Writer, params url.Values) error { + if !streamlog.ShouldEmitLog(stats.SQL) { + return nil + } + formattedBindVars := "\"[REDACTED]\"" if !*streamlog.RedactDebugUIQueries { _, fullBindParams := params["full"] diff --git a/go/vt/vtgate/logstats_test.go b/go/vt/vtgate/logstats_test.go index 01f5430e6d8..4720d5c7cfd 100644 --- a/go/vt/vtgate/logstats_test.go +++ b/go/vt/vtgate/logstats_test.go @@ -126,6 +126,35 @@ func TestLogStatsFormat(t *testing.T) { *streamlog.QueryLogFormat = "text" } +func TestLogStatsFilter(t *testing.T) { + defer func() { *streamlog.QueryLogFilterTag = "" }() + + logStats := NewLogStats(context.Background(), "test", "sql1 /* LOG_THIS_QUERY */", map[string]*querypb.BindVariable{"intVal": sqltypes.Int64BindVariable(1)}) + logStats.StartTime = time.Date(2017, time.January, 1, 1, 2, 3, 0, time.UTC) + logStats.EndTime = time.Date(2017, time.January, 1, 1, 2, 4, 1234, time.UTC) + params := map[string][]string{"full": {}} + + got := testFormat(logStats, url.Values(params)) + want := "test\t\t\t''\t''\t2017-01-01 01:02:03.000000\t2017-01-01 01:02:04.000001\t1.000001\t0.000000\t0.000000\t0.000000\t\t\"sql1 /* LOG_THIS_QUERY */\"\tmap[intVal:type:INT64 value:\"1\" ]\t0\t0\t\"\"\t\n" + if got != want { + t.Errorf("logstats format: got:\n%q\nwant:\n%q\n", got, want) + } + + *streamlog.QueryLogFilterTag = "LOG_THIS_QUERY" + got = testFormat(logStats, url.Values(params)) + want = "test\t\t\t''\t''\t2017-01-01 01:02:03.000000\t2017-01-01 01:02:04.000001\t1.000001\t0.000000\t0.000000\t0.000000\t\t\"sql1 /* LOG_THIS_QUERY */\"\tmap[intVal:type:INT64 value:\"1\" ]\t0\t0\t\"\"\t\n" + if got != want { + t.Errorf("logstats format: got:\n%q\nwant:\n%q\n", got, want) + } + + *streamlog.QueryLogFilterTag = "NOT_THIS_QUERY" + got = testFormat(logStats, url.Values(params)) + want = "" + if got != want { + t.Errorf("logstats format: got:\n%q\nwant:\n%q\n", got, want) + } +} + func TestLogStatsContextHTML(t *testing.T) { html := "HtmlContext" callInfo := &fakecallinfo.FakeCallInfo{ diff --git a/go/vt/vtgate/planbuilder/delete.go b/go/vt/vtgate/planbuilder/delete.go index 0d20ae4556c..b0c4b46ecc8 100644 --- a/go/vt/vtgate/planbuilder/delete.go +++ b/go/vt/vtgate/planbuilder/delete.go @@ -71,6 +71,8 @@ func buildDeletePlan(del *sqlparser.Delete, vschema ContextVSchema) (*engine.Del edel.MultiShardAutocommit = true } + edel.QueryTimeout = queryTimeout(directives) + if rb.ERoute.TargetDestination != nil { if rb.ERoute.TargetTabletType != topodatapb.TabletType_MASTER { return nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "unsupported: DELETE statement with a replica target") diff --git a/go/vt/vtgate/planbuilder/insert.go b/go/vt/vtgate/planbuilder/insert.go index 7bacfac0aee..bc85b18fed0 100644 --- a/go/vt/vtgate/planbuilder/insert.go +++ b/go/vt/vtgate/planbuilder/insert.go @@ -123,6 +123,8 @@ func buildInsertShardedPlan(ins *sqlparser.Insert, table *vindexes.Table) (*engi eins.MultiShardAutocommit = true } + eins.QueryTimeout = queryTimeout(directives) + var rows sqlparser.Values switch insertValues := ins.Rows.(type) { case *sqlparser.Select, *sqlparser.Union: diff --git a/go/vt/vtgate/planbuilder/route.go b/go/vt/vtgate/planbuilder/route.go index 74084b8d028..321abb74591 100644 --- a/go/vt/vtgate/planbuilder/route.go +++ b/go/vt/vtgate/planbuilder/route.go @@ -680,3 +680,21 @@ func (rb *route) SetOpcode(code engine.RouteOpcode) error { rb.ERoute.Opcode = code return nil } + +// queryTimeout returns DirectiveQueryTimeout value if set, otherwise returns 0. +func queryTimeout(d sqlparser.CommentDirectives) int { + if d == nil { + return 0 + } + + val, ok := d[sqlparser.DirectiveQueryTimeout] + if !ok { + return 0 + } + + intVal, ok := val.(int) + if ok { + return intVal + } + return 0 +} diff --git a/go/vt/vtgate/planbuilder/select.go b/go/vt/vtgate/planbuilder/select.go index 8814f328b1b..50da199cd21 100644 --- a/go/vt/vtgate/planbuilder/select.go +++ b/go/vt/vtgate/planbuilder/select.go @@ -321,21 +321,3 @@ func (pb *primitiveBuilder) expandStar(inrcs []*resultColumn, expr *sqlparser.St } return inrcs, true, nil } - -// queryTimeout returns DirectiveQueryTimeout value if set, otherwise returns 0. -func queryTimeout(d sqlparser.CommentDirectives) int { - if d == nil { - return 0 - } - - val, ok := d[sqlparser.DirectiveQueryTimeout] - if !ok { - return 0 - } - - intVal, ok := val.(int) - if ok { - return intVal - } - return 0 -} diff --git a/go/vt/vtgate/planbuilder/testdata/dml_cases.txt b/go/vt/vtgate/planbuilder/testdata/dml_cases.txt index 8de0a71d9c2..a90b6c6d480 100644 --- a/go/vt/vtgate/planbuilder/testdata/dml_cases.txt +++ b/go/vt/vtgate/planbuilder/testdata/dml_cases.txt @@ -988,6 +988,33 @@ } } +# insert with query timeout +"insert /*vt+ QUERY_TIMEOUT_MS=1 */ into user(id) values (1), (2)" +{ + "Original": "insert /*vt+ QUERY_TIMEOUT_MS=1 */ into user(id) values (1), (2)", + "Instructions": { + "Opcode": "InsertSharded", + "Keyspace": { + "Name": "user", + "Sharded": true + }, + "Query": "insert /*vt+ QUERY_TIMEOUT_MS=1 */ into user(id, Name, Costly) values (:_Id0, :_Name0, :_Costly0), (:_Id1, :_Name1, :_Costly1)", + "Values": [[[":__seq0",":__seq1"]],[[null,null]],[[null,null]]], + "Table": "user", + "Generate": { + "Keyspace": { + "Name": "main", + "Sharded": false + }, + "Query": "select next :n values from seq", + "Values": [1,2] + }, + "Prefix": "insert /*vt+ QUERY_TIMEOUT_MS=1 */ into user(id, Name, Costly) values ", + "Mid": ["(:_Id0, :_Name0, :_Costly0)","(:_Id1, :_Name1, :_Costly1)"], + "QueryTimeout": 1 + } +} + # insert with multiple rows - multi-shard autocommit "insert /*vt+ MULTI_SHARD_AUTOCOMMIT=1 */ into user(id) values (1), (2)" { @@ -1337,6 +1364,22 @@ } } +# update with no primary vindex on where clause (scatter update) - query timeout +"update /*vt+ QUERY_TIMEOUT_MS=1 */ user_extra set val = 1" +{ + "Original": "update /*vt+ QUERY_TIMEOUT_MS=1 */ user_extra set val = 1", + "Instructions": { + "Opcode": "UpdateScatter", + "Keyspace": { + "Name": "user", + "Sharded": true + }, + "Query": "update /*vt+ QUERY_TIMEOUT_MS=1 */ user_extra set val = 1", + "Table": "user_extra", + "QueryTimeout": 1 + } +} + # update with non-comparison expr "update user_extra set val = 1 where id between 1 and 2" { @@ -1489,6 +1532,22 @@ } } +# delete from with no index match - query timeout +"delete /*vt+ QUERY_TIMEOUT_MS=1 */ from user_extra where name = 'jose'" +{ + "Original": "delete /*vt+ QUERY_TIMEOUT_MS=1 */ from user_extra where name = 'jose'", + "Instructions": { + "Opcode": "DeleteScatter", + "Keyspace": { + "Name": "user", + "Sharded": true + }, + "Query": "delete /*vt+ QUERY_TIMEOUT_MS=1 */ from user_extra where name = 'jose'", + "Table": "user_extra", + "QueryTimeout": 1 + } +} + # delete from with primary id in through IN clause "delete from user_extra where user_id in (1, 2)" { diff --git a/go/vt/vtgate/planbuilder/update.go b/go/vt/vtgate/planbuilder/update.go index 85a9c174307..f4c2c6d7fe5 100644 --- a/go/vt/vtgate/planbuilder/update.go +++ b/go/vt/vtgate/planbuilder/update.go @@ -65,6 +65,8 @@ func buildUpdatePlan(upd *sqlparser.Update, vschema ContextVSchema) (*engine.Upd eupd.MultiShardAutocommit = true } + eupd.QueryTimeout = queryTimeout(directives) + var vindexTable *vindexes.Table for _, tval := range pb.st.tables { vindexTable = tval.vindexTable diff --git a/go/vt/vtgate/querylogz_test.go b/go/vt/vtgate/querylogz_test.go index 1f0ede98bd6..65abb9b8642 100644 --- a/go/vt/vtgate/querylogz_test.go +++ b/go/vt/vtgate/querylogz_test.go @@ -26,6 +26,7 @@ import ( "time" "golang.org/x/net/context" + "vitess.io/vitess/go/streamlog" "vitess.io/vitess/go/vt/callerid" ) @@ -142,6 +143,17 @@ func TestQuerylogzHandlerFormatting(t *testing.T) { close(ch) body, _ = ioutil.ReadAll(response.Body) checkQuerylogzHasStats(t, slowQueryPattern, logStats, body) + + // ensure querylogz is not affected by the filter tag + *streamlog.QueryLogFilterTag = "XXX_SKIP_ME" + defer func() { *streamlog.QueryLogFilterTag = "" }() + ch = make(chan interface{}, 1) + ch <- logStats + querylogzHandler(ch, response, req) + close(ch) + body, _ = ioutil.ReadAll(response.Body) + checkQuerylogzHasStats(t, slowQueryPattern, logStats, body) + } func checkQuerylogzHasStats(t *testing.T, pattern []string, logStats *LogStats, page []byte) { diff --git a/go/vt/vttablet/tabletserver/query_executor.go b/go/vt/vttablet/tabletserver/query_executor.go index 69d0079d90f..730f4b0963e 100644 --- a/go/vt/vttablet/tabletserver/query_executor.go +++ b/go/vt/vttablet/tabletserver/query_executor.go @@ -296,11 +296,14 @@ func (qre *QueryExecutor) execDmlAutoCommit() (reply *sqltypes.Result, err error } func (qre *QueryExecutor) execAsTransaction(f func(conn *TxConnection) (*sqltypes.Result, error)) (reply *sqltypes.Result, err error) { - conn, err := qre.tsv.te.txPool.LocalBegin(qre.ctx, qre.options) + conn, beginSQL, err := qre.tsv.te.txPool.LocalBegin(qre.ctx, qre.options) if err != nil { return nil, err } defer qre.tsv.te.txPool.LocalConclude(qre.ctx, conn) + if beginSQL != "" { + qre.logStats.AddRewrittenSQL(beginSQL, time.Now()) + } qre.logStats.AddRewrittenSQL("begin", time.Now()) reply, err = f(conn) @@ -311,8 +314,12 @@ func (qre *QueryExecutor) execAsTransaction(f func(conn *TxConnection) (*sqltype qre.logStats.AddRewrittenSQL("rollback", start) return nil, err } - err = qre.tsv.te.txPool.LocalCommit(qre.ctx, conn, qre.tsv.messager) - qre.logStats.AddRewrittenSQL("commit", start) + + commitSQL, err := qre.tsv.te.txPool.LocalCommit(qre.ctx, conn, qre.tsv.messager) + // As above LocalCommit is a no-op for autocommmit so don't log anything. + if commitSQL != "" { + qre.logStats.AddRewrittenSQL("commit", start) + } if err != nil { return nil, err } diff --git a/go/vt/vttablet/tabletserver/querylogz_test.go b/go/vt/vttablet/tabletserver/querylogz_test.go index 199a4c969b7..d94f355eaa3 100644 --- a/go/vt/vttablet/tabletserver/querylogz_test.go +++ b/go/vt/vttablet/tabletserver/querylogz_test.go @@ -26,6 +26,7 @@ import ( "time" "golang.org/x/net/context" + "vitess.io/vitess/go/streamlog" "vitess.io/vitess/go/vt/callerid" "vitess.io/vitess/go/vt/vttablet/tabletserver/planbuilder" "vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv" @@ -148,6 +149,16 @@ func TestQuerylogzHandler(t *testing.T) { close(ch) body, _ = ioutil.ReadAll(response.Body) checkQuerylogzHasStats(t, slowQueryPattern, logStats, body) + + // ensure querylogz is not affected by the filter tag + *streamlog.QueryLogFilterTag = "XXX_SKIP_ME" + defer func() { *streamlog.QueryLogFilterTag = "" }() + ch = make(chan interface{}, 1) + ch <- logStats + querylogzHandler(ch, response, req) + close(ch) + body, _ = ioutil.ReadAll(response.Body) + checkQuerylogzHasStats(t, slowQueryPattern, logStats, body) } func checkQuerylogzHasStats(t *testing.T, pattern []string, logStats *tabletenv.LogStats, page []byte) { diff --git a/go/vt/vttablet/tabletserver/tabletenv/logstats.go b/go/vt/vttablet/tabletserver/tabletenv/logstats.go index 6e8323400c3..fdf6d99d352 100644 --- a/go/vt/vttablet/tabletserver/tabletenv/logstats.go +++ b/go/vt/vttablet/tabletserver/tabletenv/logstats.go @@ -180,6 +180,10 @@ func (stats *LogStats) CallInfo() (string, string) { // Logf formats the log record to the given writer, either as // tab-separated list of logged fields or as JSON. func (stats *LogStats) Logf(w io.Writer, params url.Values) error { + if !streamlog.ShouldEmitLog(stats.OriginalSQL) { + return nil + } + rewrittenSQL := "[REDACTED]" formattedBindVars := "\"[REDACTED]\"" diff --git a/go/vt/vttablet/tabletserver/tabletenv/logstats_test.go b/go/vt/vttablet/tabletserver/tabletenv/logstats_test.go index 2bd62184a0d..9f5dbe05047 100644 --- a/go/vt/vttablet/tabletserver/tabletenv/logstats_test.go +++ b/go/vt/vttablet/tabletserver/tabletenv/logstats_test.go @@ -149,6 +149,41 @@ func TestLogStatsFormat(t *testing.T) { *streamlog.QueryLogFormat = "text" } +func TestLogStatsFilter(t *testing.T) { + defer func() { *streamlog.QueryLogFilterTag = "" }() + + logStats := NewLogStats(context.Background(), "test") + logStats.StartTime = time.Date(2017, time.January, 1, 1, 2, 3, 0, time.UTC) + logStats.EndTime = time.Date(2017, time.January, 1, 1, 2, 4, 1234, time.UTC) + logStats.OriginalSQL = "sql /* LOG_THIS_QUERY */" + logStats.BindVariables = map[string]*querypb.BindVariable{"intVal": sqltypes.Int64BindVariable(1)} + logStats.AddRewrittenSQL("sql with pii", time.Now()) + logStats.MysqlResponseTime = 0 + logStats.Rows = [][]sqltypes.Value{{sqltypes.NewVarBinary("a")}} + params := map[string][]string{"full": {}} + + got := testFormat(logStats, url.Values(params)) + want := "test\t\t\t''\t''\t2017-01-01 01:02:03.000000\t2017-01-01 01:02:04.000001\t1.000001\t\t\"sql /* LOG_THIS_QUERY */\"\tmap[intVal:type:INT64 value:\"1\" ]\t1\t\"sql with pii\"\tmysql\t0.000000\t0.000000\t0\t1\t\"\"\t\n" + if got != want { + t.Errorf("logstats format: got:\n%q\nwant:\n%q\n", got, want) + } + + *streamlog.QueryLogFilterTag = "LOG_THIS_QUERY" + got = testFormat(logStats, url.Values(params)) + want = "test\t\t\t''\t''\t2017-01-01 01:02:03.000000\t2017-01-01 01:02:04.000001\t1.000001\t\t\"sql /* LOG_THIS_QUERY */\"\tmap[intVal:type:INT64 value:\"1\" ]\t1\t\"sql with pii\"\tmysql\t0.000000\t0.000000\t0\t1\t\"\"\t\n" + if got != want { + t.Errorf("logstats format: got:\n%q\nwant:\n%q\n", got, want) + } + + *streamlog.QueryLogFilterTag = "NOT_THIS_QUERY" + got = testFormat(logStats, url.Values(params)) + want = "" + if got != want { + t.Errorf("logstats format: got:\n%q\nwant:\n%q\n", got, want) + } + +} + func TestLogStatsFormatQuerySources(t *testing.T) { logStats := NewLogStats(context.Background(), "test") if logStats.FmtQuerySources() != "none" { diff --git a/go/vt/vttablet/tabletserver/tabletserver.go b/go/vt/vttablet/tabletserver/tabletserver.go index 003b27708a0..8866b6e456f 100644 --- a/go/vt/vttablet/tabletserver/tabletserver.go +++ b/go/vt/vttablet/tabletserver/tabletserver.go @@ -237,12 +237,15 @@ type TxPoolController interface { // StopGently will change the state to NotServing but first wait for transactions to wrap up StopGently() - // Begin begins a transaction, and returns the associated transaction id. + // Begin begins a transaction, and returns the associated transaction id and the + // statement(s) used to execute the begin (if any). + // // Subsequent statements can access the connection through the transaction id. - Begin(ctx context.Context, options *querypb.ExecuteOptions) (int64, error) + Begin(ctx context.Context, options *querypb.ExecuteOptions) (int64, string, error) - // Commit commits the specified transaction. - Commit(ctx context.Context, transactionID int64, mc messageCommitter) error + // Commit commits the specified transaction, returning the statement used to execute + // the commit or "" in autocommit settings. + Commit(ctx context.Context, transactionID int64, mc messageCommitter) (string, error) // Rollback rolls back the specified transaction. Rollback(ctx context.Context, transactionID int64) error @@ -773,13 +776,25 @@ func (tsv *TabletServer) Begin(ctx context.Context, target *querypb.Target, opti "Begin", "begin", nil, target, options, true /* isBegin */, false, /* allowOnShutdown */ func(ctx context.Context, logStats *tabletenv.LogStats) error { - defer tabletenv.QueryStats.Record("BEGIN", time.Now()) + startTime := time.Now() if tsv.txThrottler.Throttle() { // TODO(erez): I think this should be RESOURCE_EXHAUSTED. return vterrors.Errorf(vtrpcpb.Code_UNAVAILABLE, "Transaction throttled") } - transactionID, err = tsv.teCtrl.Begin(ctx, options) + var beginSQL string + transactionID, beginSQL, err = tsv.teCtrl.Begin(ctx, options) logStats.TransactionID = transactionID + + // Record the actual statements that were executed in the logStats. + // If nothing was actually executed, don't count the operation in + // the tablet metrics, and clear out the logStats Method so that + // handlePanicAndSendLogStats doesn't log the no-op. + logStats.OriginalSQL = beginSQL + if beginSQL != "" { + tabletenv.QueryStats.Record("BEGIN", startTime) + } else { + logStats.Method = "" + } return err }, ) @@ -793,9 +808,21 @@ func (tsv *TabletServer) Commit(ctx context.Context, target *querypb.Target, tra "Commit", "commit", nil, target, nil, false /* isBegin */, true, /* allowOnShutdown */ func(ctx context.Context, logStats *tabletenv.LogStats) error { - defer tabletenv.QueryStats.Record("COMMIT", time.Now()) + startTime := time.Now() logStats.TransactionID = transactionID - return tsv.teCtrl.Commit(ctx, transactionID, tsv.messager) + + var commitSQL string + commitSQL, err = tsv.teCtrl.Commit(ctx, transactionID, tsv.messager) + + // If nothing was actually executed, don't count the operation in + // the tablet metrics, and clear out the logStats Method so that + // handlePanicAndSendLogStats doesn't log the no-op. + if commitSQL != "" { + tabletenv.QueryStats.Record("COMMIT", startTime) + } else { + logStats.Method = "" + } + return err }, ) } @@ -1471,6 +1498,7 @@ func (tsv *TabletServer) handlePanicAndSendLogStats( // Examples where we don't send the log stats: // - ExecuteBatch() (logStats == nil) // - beginWaitForSameRangeTransactions() (Method == "") + // - Begin / Commit in autocommit mode if logStats != nil && logStats.Method != "" { logStats.Send() } @@ -1860,9 +1888,9 @@ func (tsv *TabletServer) BroadcastHealth(terTimestamp int64, stats *querypb.Real target := tsv.target tsv.mu.Unlock() shr := &querypb.StreamHealthResponse{ - Target: &target, - TabletAlias: &tsv.alias, - Serving: tsv.IsServing(), + Target: &target, + TabletAlias: &tsv.alias, + Serving: tsv.IsServing(), TabletExternallyReparentedTimestamp: terTimestamp, RealtimeStats: stats, } diff --git a/go/vt/vttablet/tabletserver/tx_engine.go b/go/vt/vttablet/tabletserver/tx_engine.go index cb928f9a934..9db9a56ef5e 100644 --- a/go/vt/vttablet/tabletserver/tx_engine.go +++ b/go/vt/vttablet/tabletserver/tx_engine.go @@ -264,22 +264,24 @@ func (te *TxEngine) AcceptReadOnly() error { } } -// Begin begins a transaction, and returns the associated transaction id. +// Begin begins a transaction, and returns the associated transaction id and the +// statement(s) used to execute the begin (if any). +// // Subsequent statements can access the connection through the transaction id. -func (te *TxEngine) Begin(ctx context.Context, options *querypb.ExecuteOptions) (int64, error) { +func (te *TxEngine) Begin(ctx context.Context, options *querypb.ExecuteOptions) (int64, string, error) { te.stateLock.Lock() canOpenTransactions := te.state == AcceptingReadOnly || te.state == AcceptingReadAndWrite if !canOpenTransactions { // We are not in a state where we can start new transactions. Abort. te.stateLock.Unlock() - return 0, vterrors.Errorf(vtrpc.Code_UNAVAILABLE, "tx engine can't accept new transactions in state %v", te.state) + return 0, "", vterrors.Errorf(vtrpc.Code_UNAVAILABLE, "tx engine can't accept new transactions in state %v", te.state) } isWriteTransaction := options == nil || options.TransactionIsolation != querypb.ExecuteOptions_CONSISTENT_SNAPSHOT_READ_ONLY if te.state == AcceptingReadOnly && isWriteTransaction { te.stateLock.Unlock() - return 0, vterrors.Errorf(vtrpc.Code_UNAVAILABLE, "tx engine can only accept read-only transactions in current state") + return 0, "", vterrors.Errorf(vtrpc.Code_UNAVAILABLE, "tx engine can only accept read-only transactions in current state") } // By Add() to beginRequests, we block others from initiating state @@ -292,7 +294,7 @@ func (te *TxEngine) Begin(ctx context.Context, options *querypb.ExecuteOptions) } // Commit commits the specified transaction. -func (te *TxEngine) Commit(ctx context.Context, transactionID int64, mc messageCommitter) error { +func (te *TxEngine) Commit(ctx context.Context, transactionID int64, mc messageCommitter) (string, error) { return te.txPool.Commit(ctx, transactionID, mc) } @@ -466,7 +468,7 @@ outer: if txid > maxid { maxid = txid } - conn, err := te.txPool.LocalBegin(ctx, &querypb.ExecuteOptions{}) + conn, _, err := te.txPool.LocalBegin(ctx, &querypb.ExecuteOptions{}) if err != nil { allErr.RecordError(err) continue diff --git a/go/vt/vttablet/tabletserver/tx_engine_test.go b/go/vt/vttablet/tabletserver/tx_engine_test.go index cb7c235e41c..3c209a49bf4 100644 --- a/go/vt/vttablet/tabletserver/tx_engine_test.go +++ b/go/vt/vttablet/tabletserver/tx_engine_test.go @@ -55,10 +55,13 @@ func TestTxEngineClose(t *testing.T) { // Normal close with timeout wait. te.open() - c, err := te.txPool.LocalBegin(ctx, &querypb.ExecuteOptions{}) + c, beginSQL, err := te.txPool.LocalBegin(ctx, &querypb.ExecuteOptions{}) if err != nil { t.Fatal(err) } + if beginSQL != "begin" { + t.Errorf("beginSQL: %q, want 'begin'", beginSQL) + } c.Recycle() start = time.Now() te.close(false) @@ -68,7 +71,7 @@ func TestTxEngineClose(t *testing.T) { // Immediate close. te.open() - c, err = te.txPool.LocalBegin(ctx, &querypb.ExecuteOptions{}) + c, beginSQL, err = te.txPool.LocalBegin(ctx, &querypb.ExecuteOptions{}) if err != nil { t.Fatal(err) } @@ -82,7 +85,7 @@ func TestTxEngineClose(t *testing.T) { // Normal close with short grace period. te.shutdownGracePeriod = 250 * time.Millisecond te.open() - c, err = te.txPool.LocalBegin(ctx, &querypb.ExecuteOptions{}) + c, beginSQL, err = te.txPool.LocalBegin(ctx, &querypb.ExecuteOptions{}) if err != nil { t.Fatal(err) } @@ -99,7 +102,7 @@ func TestTxEngineClose(t *testing.T) { // Normal close with short grace period, but pool gets empty early. te.shutdownGracePeriod = 250 * time.Millisecond te.open() - c, err = te.txPool.LocalBegin(ctx, &querypb.ExecuteOptions{}) + c, beginSQL, err = te.txPool.LocalBegin(ctx, &querypb.ExecuteOptions{}) if err != nil { t.Fatal(err) } @@ -123,7 +126,7 @@ func TestTxEngineClose(t *testing.T) { // Immediate close, but connection is in use. te.open() - c, err = te.txPool.LocalBegin(ctx, &querypb.ExecuteOptions{}) + c, beginSQL, err = te.txPool.LocalBegin(ctx, &querypb.ExecuteOptions{}) if err != nil { t.Fatal(err) } @@ -492,6 +495,6 @@ func startTransaction(te *TxEngine, writeTransaction bool) error { } else { options.TransactionIsolation = querypb.ExecuteOptions_CONSISTENT_SNAPSHOT_READ_ONLY } - _, err := te.Begin(context.Background(), options) + _, _, err := te.Begin(context.Background(), options) return err } diff --git a/go/vt/vttablet/tabletserver/tx_executor.go b/go/vt/vttablet/tabletserver/tx_executor.go index f9d00174c40..e485174745f 100644 --- a/go/vt/vttablet/tabletserver/tx_executor.go +++ b/go/vt/vttablet/tabletserver/tx_executor.go @@ -68,7 +68,7 @@ func (txe *TxExecutor) Prepare(transactionID int64, dtid string) error { return vterrors.Errorf(vtrpcpb.Code_RESOURCE_EXHAUSTED, "prepare failed for transaction %d: %v", transactionID, err) } - localConn, err := txe.te.txPool.LocalBegin(txe.ctx, &querypb.ExecuteOptions{}) + localConn, _, err := txe.te.txPool.LocalBegin(txe.ctx, &querypb.ExecuteOptions{}) if err != nil { return err } @@ -79,7 +79,7 @@ func (txe *TxExecutor) Prepare(transactionID int64, dtid string) error { return err } - err = txe.te.txPool.LocalCommit(txe.ctx, localConn, txe.messager) + _, err = txe.te.txPool.LocalCommit(txe.ctx, localConn, txe.messager) if err != nil { return err } @@ -111,7 +111,7 @@ func (txe *TxExecutor) CommitPrepared(dtid string) error { txe.markFailed(ctx, dtid) return err } - err = txe.te.txPool.LocalCommit(ctx, conn, txe.messager) + _, err = txe.te.txPool.LocalCommit(ctx, conn, txe.messager) if err != nil { txe.markFailed(ctx, dtid) return err @@ -130,7 +130,7 @@ func (txe *TxExecutor) CommitPrepared(dtid string) error { func (txe *TxExecutor) markFailed(ctx context.Context, dtid string) { tabletenv.InternalErrors.Add("TwopcCommit", 1) txe.te.preparedPool.SetFailed(dtid) - conn, err := txe.te.txPool.LocalBegin(ctx, &querypb.ExecuteOptions{}) + conn, _, err := txe.te.txPool.LocalBegin(ctx, &querypb.ExecuteOptions{}) if err != nil { log.Errorf("markFailed: Begin failed for dtid %s: %v", dtid, err) return @@ -142,7 +142,7 @@ func (txe *TxExecutor) markFailed(ctx context.Context, dtid string) { return } - if err = txe.te.txPool.LocalCommit(ctx, conn, txe.messager); err != nil { + if _, err = txe.te.txPool.LocalCommit(ctx, conn, txe.messager); err != nil { log.Errorf("markFailed: Commit failed for dtid %s: %v", dtid, err) } } @@ -170,7 +170,7 @@ func (txe *TxExecutor) RollbackPrepared(dtid string, originalID int64) error { return vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "2pc is not enabled") } defer tabletenv.QueryStats.Record("ROLLBACK_PREPARED", time.Now()) - conn, err := txe.te.txPool.LocalBegin(txe.ctx, &querypb.ExecuteOptions{}) + conn, _, err := txe.te.txPool.LocalBegin(txe.ctx, &querypb.ExecuteOptions{}) if err != nil { goto returnConn } @@ -181,7 +181,7 @@ func (txe *TxExecutor) RollbackPrepared(dtid string, originalID int64) error { goto returnConn } - err = txe.te.txPool.LocalCommit(txe.ctx, conn, txe.messager) + _, err = txe.te.txPool.LocalCommit(txe.ctx, conn, txe.messager) returnConn: if preparedConn := txe.te.preparedPool.FetchForRollback(dtid); preparedConn != nil { @@ -200,7 +200,7 @@ func (txe *TxExecutor) CreateTransaction(dtid string, participants []*querypb.Ta return vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "2pc is not enabled") } defer tabletenv.QueryStats.Record("CREATE_TRANSACTION", time.Now()) - conn, err := txe.te.txPool.LocalBegin(txe.ctx, &querypb.ExecuteOptions{}) + conn, _, err := txe.te.txPool.LocalBegin(txe.ctx, &querypb.ExecuteOptions{}) if err != nil { return err } @@ -210,7 +210,8 @@ func (txe *TxExecutor) CreateTransaction(dtid string, participants []*querypb.Ta if err != nil { return err } - return txe.te.txPool.LocalCommit(txe.ctx, conn, txe.messager) + _, err = txe.te.txPool.LocalCommit(txe.ctx, conn, txe.messager) + return err } // StartCommit atomically commits the transaction along with the @@ -232,7 +233,8 @@ func (txe *TxExecutor) StartCommit(transactionID int64, dtid string) error { if err != nil { return err } - return txe.te.txPool.LocalCommit(txe.ctx, conn, txe.messager) + _, err = txe.te.txPool.LocalCommit(txe.ctx, conn, txe.messager) + return err } // SetRollback transitions the 2pc transaction to the Rollback state. @@ -248,7 +250,7 @@ func (txe *TxExecutor) SetRollback(dtid string, transactionID int64) error { txe.te.txPool.Rollback(txe.ctx, transactionID) } - conn, err := txe.te.txPool.LocalBegin(txe.ctx, &querypb.ExecuteOptions{}) + conn, _, err := txe.te.txPool.LocalBegin(txe.ctx, &querypb.ExecuteOptions{}) if err != nil { return err } @@ -259,7 +261,7 @@ func (txe *TxExecutor) SetRollback(dtid string, transactionID int64) error { return err } - err = txe.te.txPool.LocalCommit(txe.ctx, conn, txe.messager) + _, err = txe.te.txPool.LocalCommit(txe.ctx, conn, txe.messager) if err != nil { return err } @@ -275,7 +277,7 @@ func (txe *TxExecutor) ConcludeTransaction(dtid string) error { } defer tabletenv.QueryStats.Record("RESOLVE", time.Now()) - conn, err := txe.te.txPool.LocalBegin(txe.ctx, &querypb.ExecuteOptions{}) + conn, _, err := txe.te.txPool.LocalBegin(txe.ctx, &querypb.ExecuteOptions{}) if err != nil { return err } @@ -285,7 +287,8 @@ func (txe *TxExecutor) ConcludeTransaction(dtid string) error { if err != nil { return err } - return txe.te.txPool.LocalCommit(txe.ctx, conn, txe.messager) + _, err = txe.te.txPool.LocalCommit(txe.ctx, conn, txe.messager) + return err } // ReadTransaction returns the metadata for the sepcified dtid. diff --git a/go/vt/vttablet/tabletserver/tx_pool.go b/go/vt/vttablet/tabletserver/tx_pool.go index 4d766643792..5f068fba724 100644 --- a/go/vt/vttablet/tabletserver/tx_pool.go +++ b/go/vt/vttablet/tabletserver/tx_pool.go @@ -192,23 +192,26 @@ func (axp *TxPool) WaitForEmpty() { axp.activePool.WaitForEmpty() } -// Begin begins a transaction, and returns the associated transaction id. +// Begin begins a transaction, and returns the associated transaction id and +// the statements (if any) executed to initiate the transaction. In autocommit +// mode the statement will be "". +// // Subsequent statements can access the connection through the transaction id. -func (axp *TxPool) Begin(ctx context.Context, options *querypb.ExecuteOptions) (int64, error) { +func (axp *TxPool) Begin(ctx context.Context, options *querypb.ExecuteOptions) (int64, string, error) { var conn *connpool.DBConn var err error immediateCaller := callerid.ImmediateCallerIDFromContext(ctx) effectiveCaller := callerid.EffectiveCallerIDFromContext(ctx) if !axp.limiter.Get(immediateCaller, effectiveCaller) { - return 0, vterrors.Errorf(vtrpcpb.Code_RESOURCE_EXHAUSTED, "per-user transaction pool connection limit exceeded") + return 0, "", vterrors.Errorf(vtrpcpb.Code_RESOURCE_EXHAUSTED, "per-user transaction pool connection limit exceeded") } waiterCount := axp.waiters.Add(1) defer axp.waiters.Add(-1) if waiterCount > axp.waiterCap.Get() { - return 0, vterrors.New(vtrpcpb.Code_RESOURCE_EXHAUSTED, "transaction pool waiter count exceeded") + return 0, "", vterrors.New(vtrpcpb.Code_RESOURCE_EXHAUSTED, "transaction pool waiter count exceeded") } var beginSucceeded bool @@ -231,30 +234,33 @@ func (axp *TxPool) Begin(ctx context.Context, options *querypb.ExecuteOptions) ( if err != nil { switch err { case connpool.ErrConnPoolClosed: - return 0, err + return 0, "", err case pools.ErrTimeout: axp.LogActive() - return 0, vterrors.Errorf(vtrpcpb.Code_RESOURCE_EXHAUSTED, "transaction pool connection limit exceeded") + return 0, "", vterrors.Errorf(vtrpcpb.Code_RESOURCE_EXHAUSTED, "transaction pool connection limit exceeded") } - return 0, err + return 0, "", err } autocommitTransaction := false - + beginQueries := "" if queries, ok := txIsolations[options.GetTransactionIsolation()]; ok { if queries.setIsolationLevel != "" { if _, err := conn.Exec(ctx, "set transaction isolation level "+queries.setIsolationLevel, 1, false); err != nil { - return 0, err + return 0, "", err } + + beginQueries = queries.setIsolationLevel + "; " } if _, err := conn.Exec(ctx, queries.openTransaction, 1, false); err != nil { - return 0, err + return 0, "", err } + beginQueries = beginQueries + queries.openTransaction } else if options.GetTransactionIsolation() == querypb.ExecuteOptions_AUTOCOMMIT { autocommitTransaction = true } else { - return 0, fmt.Errorf("don't know how to open a transaction of this type: %v", options.GetTransactionIsolation()) + return 0, "", fmt.Errorf("don't know how to open a transaction of this type: %v", options.GetTransactionIsolation()) } beginSucceeded = true @@ -271,14 +277,14 @@ func (axp *TxPool) Begin(ctx context.Context, options *querypb.ExecuteOptions) ( ), options.GetWorkload() != querypb.ExecuteOptions_DBA, ) - return transactionID, nil + return transactionID, beginQueries, nil } // Commit commits the specified transaction. -func (axp *TxPool) Commit(ctx context.Context, transactionID int64, mc messageCommitter) error { +func (axp *TxPool) Commit(ctx context.Context, transactionID int64, mc messageCommitter) (string, error) { conn, err := axp.Get(transactionID, "for commit") if err != nil { - return err + return "", err } return axp.LocalCommit(ctx, conn, mc) } @@ -305,30 +311,31 @@ func (axp *TxPool) Get(transactionID int64, reason string) (*TxConnection, error // LocalBegin is equivalent to Begin->Get. // It's used for executing transactions within a request. It's safe // to always call LocalConclude at the end. -func (axp *TxPool) LocalBegin(ctx context.Context, options *querypb.ExecuteOptions) (*TxConnection, error) { - transactionID, err := axp.Begin(ctx, options) +func (axp *TxPool) LocalBegin(ctx context.Context, options *querypb.ExecuteOptions) (*TxConnection, string, error) { + transactionID, beginSQL, err := axp.Begin(ctx, options) if err != nil { - return nil, err + return nil, "", err } - return axp.Get(transactionID, "for local query") + conn, err := axp.Get(transactionID, "for local query") + return conn, beginSQL, err } // LocalCommit is the commit function for LocalBegin. -func (axp *TxPool) LocalCommit(ctx context.Context, conn *TxConnection, mc messageCommitter) error { +func (axp *TxPool) LocalCommit(ctx context.Context, conn *TxConnection, mc messageCommitter) (string, error) { defer conn.conclude(TxCommit, "transaction committed") defer mc.LockDB(conn.NewMessages, conn.ChangedMessages)() if conn.Autocommit { mc.UpdateCaches(conn.NewMessages, conn.ChangedMessages) - return nil + return "", nil } if _, err := conn.Exec(ctx, "commit", 1, false); err != nil { conn.Close() - return err + return "", err } mc.UpdateCaches(conn.NewMessages, conn.ChangedMessages) - return nil + return "commit", nil } // LocalConclude concludes a transaction started by LocalBegin. diff --git a/go/vt/vttablet/tabletserver/tx_pool_test.go b/go/vt/vttablet/tabletserver/tx_pool_test.go index d58b0357319..b1d54686b1d 100644 --- a/go/vt/vttablet/tabletserver/tx_pool_test.go +++ b/go/vt/vttablet/tabletserver/tx_pool_test.go @@ -40,6 +40,42 @@ import ( "vitess.io/vitess/go/vt/vttablet/tabletserver/messager" ) +func TestTxPoolExecuteCommit(t *testing.T) { + sql := "update test_column set x=1 where 1!=1" + db := fakesqldb.New(t) + defer db.Close() + db.AddQuery(sql, &sqltypes.Result{}) + db.AddQuery("begin", &sqltypes.Result{}) + db.AddQuery("commit", &sqltypes.Result{}) + + txPool := newTxPool() + txPool.Open(db.ConnParams(), db.ConnParams(), db.ConnParams()) + defer txPool.Close() + ctx := context.Background() + transactionID, beginSQL, err := txPool.Begin(ctx, &querypb.ExecuteOptions{}) + if err != nil { + t.Fatal(err) + } + if beginSQL != "begin" { + t.Errorf("beginSQL got %q want 'begin'", beginSQL) + } + txConn, err := txPool.Get(transactionID, "for query") + if err != nil { + t.Fatal(err) + } + txConn.RecordQuery(sql) + _, err = txConn.Exec(ctx, sql, 1, true) + txConn.Recycle() + + commitSQL, err := txPool.Commit(ctx, transactionID, &fakeMessageCommitter{}) + if err != nil { + t.Fatal(err) + } + if commitSQL != "commit" { + t.Errorf("commitSQL got %q want 'commit'", commitSQL) + } +} + func TestTxPoolExecuteRollback(t *testing.T) { sql := "alter table test_table add test_column int" db := fakesqldb.New(t) @@ -52,10 +88,13 @@ func TestTxPoolExecuteRollback(t *testing.T) { txPool.Open(db.ConnParams(), db.ConnParams(), db.ConnParams()) defer txPool.Close() ctx := context.Background() - transactionID, err := txPool.Begin(ctx, &querypb.ExecuteOptions{}) + transactionID, beginSQL, err := txPool.Begin(ctx, &querypb.ExecuteOptions{}) if err != nil { t.Fatal(err) } + if beginSQL != "begin" { + t.Errorf("beginSQL got %q want 'begin'", beginSQL) + } txConn, err := txPool.Get(transactionID, "for query") if err != nil { t.Fatal(err) @@ -79,11 +118,11 @@ func TestTxPoolRollbackNonBusy(t *testing.T) { txPool.Open(db.ConnParams(), db.ConnParams(), db.ConnParams()) defer txPool.Close() ctx := context.Background() - txid1, err := txPool.Begin(ctx, &querypb.ExecuteOptions{}) + txid1, _, err := txPool.Begin(ctx, &querypb.ExecuteOptions{}) if err != nil { t.Fatal(err) } - _, err = txPool.Begin(ctx, &querypb.ExecuteOptions{}) + _, _, err = txPool.Begin(ctx, &querypb.ExecuteOptions{}) if err != nil { t.Fatal(err) } @@ -173,7 +212,7 @@ func TestTxPoolTransactionKillerEnforceTimeoutEnabled(t *testing.T) { } func addQuery(ctx context.Context, sql string, txPool *TxPool, workload querypb.ExecuteOptions_Workload) (int64, error) { - transactionID, err := txPool.Begin(ctx, &querypb.ExecuteOptions{Workload: workload}) + transactionID, _, err := txPool.Begin(ctx, &querypb.ExecuteOptions{Workload: workload}) if err != nil { return 0, err } @@ -199,10 +238,13 @@ func TestTxPoolClientRowsFound(t *testing.T) { // Start a 'normal' transaction. It should take a connection // for the normal 'conns' pool. - id1, err := txPool.Begin(ctx, &querypb.ExecuteOptions{}) + id1, beginSQL, err := txPool.Begin(ctx, &querypb.ExecuteOptions{}) if err != nil { t.Fatal(err) } + if beginSQL != "begin" { + t.Errorf("beginSQL got %q want 'begin'", beginSQL) + } if got, want := txPool.conns.Available(), startNormalSize-1; got != want { t.Errorf("Normal pool size: %d, want %d", got, want) } @@ -212,10 +254,13 @@ func TestTxPoolClientRowsFound(t *testing.T) { // Start a 'foundRows' transaction. It should take a connection // from the foundRows pool. - id2, err := txPool.Begin(ctx, &querypb.ExecuteOptions{ClientFoundRows: true}) + id2, beginSQL, err := txPool.Begin(ctx, &querypb.ExecuteOptions{ClientFoundRows: true}) if err != nil { t.Fatal(err) } + if beginSQL != "begin" { + t.Errorf("beginSQL got %q want 'begin'", beginSQL) + } if got, want := txPool.conns.Available(), startNormalSize-1; got != want { t.Errorf("Normal pool size: %d, want %d", got, want) } @@ -253,16 +298,23 @@ func TestTxPoolTransactionIsolation(t *testing.T) { ctx := context.Background() // Start a transaction with default. It should not change isolation. - _, err := txPool.Begin(ctx, &querypb.ExecuteOptions{}) + _, beginSQL, err := txPool.Begin(ctx, &querypb.ExecuteOptions{}) if err != nil { t.Fatal(err) } + if beginSQL != "begin" { + t.Errorf("beginSQL got %q want 'begin'", beginSQL) + } db.AddQuery("set transaction isolation level READ COMMITTED", &sqltypes.Result{}) - _, err = txPool.Begin(ctx, &querypb.ExecuteOptions{TransactionIsolation: querypb.ExecuteOptions_READ_COMMITTED}) + _, beginSQL, err = txPool.Begin(ctx, &querypb.ExecuteOptions{TransactionIsolation: querypb.ExecuteOptions_READ_COMMITTED}) if err != nil { t.Fatal(err) } + wantBeginSQL := "READ COMMITTED; begin" + if beginSQL != wantBeginSQL { + t.Errorf("beginSQL got %q want %q", beginSQL, wantBeginSQL) + } } func TestTxPoolAutocommit(t *testing.T) { @@ -276,14 +328,20 @@ func TestTxPoolAutocommit(t *testing.T) { // to mysql. // This test is meaningful because if txPool.Begin were to send a BEGIN statement to the connection, it will fatal // because is not in the list of expected queries (i.e db.AddQuery hasn't been called). - txid, err := txPool.Begin(ctx, &querypb.ExecuteOptions{TransactionIsolation: querypb.ExecuteOptions_AUTOCOMMIT}) + txid, beginSQL, err := txPool.Begin(ctx, &querypb.ExecuteOptions{TransactionIsolation: querypb.ExecuteOptions_AUTOCOMMIT}) if err != nil { t.Fatal(err) } - err = txPool.Commit(ctx, txid, &fakeMessageCommitter{}) + if beginSQL != "" { + t.Errorf("beginSQL got %q want ''", beginSQL) + } + commitSQL, err := txPool.Commit(ctx, txid, &fakeMessageCommitter{}) if err != nil { t.Fatal(err) } + if commitSQL != "" { + t.Errorf("commitSQL got %q want ''", commitSQL) + } } // TestTxPoolBeginWithPoolConnectionError_TransientErrno2006 tests the case @@ -305,7 +363,7 @@ func TestTxPoolBeginWithPoolConnectionError_Errno2006_Transient(t *testing.T) { } ctx := context.Background() - txConn, err := txPool.LocalBegin(ctx, &querypb.ExecuteOptions{}) + txConn, _, err := txPool.LocalBegin(ctx, &querypb.ExecuteOptions{}) if err != nil { t.Fatalf("Begin should have succeeded after the retry in DBConn.Exec(): %v", err) } @@ -336,7 +394,7 @@ func TestTxPoolBeginWithPoolConnectionError_Errno2006_Permanent(t *testing.T) { // After that, vttablet will automatically try to reconnect and this fail. // DBConn.Exec() will return the reconnect error as final error and not the // initial connection error. - _, err = txPool.LocalBegin(context.Background(), &querypb.ExecuteOptions{}) + _, _, err = txPool.LocalBegin(context.Background(), &querypb.ExecuteOptions{}) if err == nil || !strings.Contains(err.Error(), "(errno 2013)") { t.Fatalf("Begin did not return the reconnect error: %v", err) } @@ -362,7 +420,7 @@ func TestTxPoolBeginWithPoolConnectionError_Errno2013(t *testing.T) { db.EnableShouldClose() // 2013 is not retryable. DBConn.Exec() fails after the first attempt. - _, err = txPool.Begin(context.Background(), &querypb.ExecuteOptions{}) + _, _, err = txPool.Begin(context.Background(), &querypb.ExecuteOptions{}) if err == nil || !strings.Contains(err.Error(), "(errno 2013)") { t.Fatalf("Begin must return connection error with MySQL errno 2013: %v", err) } @@ -385,7 +443,7 @@ func primeTxPoolWithConnection(t *testing.T) (*fakesqldb.DB, *TxPool, error) { db.AddQuery("begin", &sqltypes.Result{}) db.AddQuery("rollback", &sqltypes.Result{}) ctx := context.Background() - txConn, err := txPool.LocalBegin(ctx, &querypb.ExecuteOptions{}) + txConn, _, err := txPool.LocalBegin(ctx, &querypb.ExecuteOptions{}) if err != nil { return nil, nil, err } @@ -402,7 +460,7 @@ func TestTxPoolBeginWithError(t *testing.T) { txPool.Open(db.ConnParams(), db.ConnParams(), db.ConnParams()) defer txPool.Close() ctx := context.Background() - _, err := txPool.Begin(ctx, &querypb.ExecuteOptions{}) + _, _, err := txPool.Begin(ctx, &querypb.ExecuteOptions{}) want := "error: rejected" if err == nil || !strings.Contains(err.Error(), want) { t.Errorf("Begin: %v, want %s", err, want) @@ -424,7 +482,7 @@ func TestTxPoolRollbackFail(t *testing.T) { txPool.Open(db.ConnParams(), db.ConnParams(), db.ConnParams()) defer txPool.Close() ctx := context.Background() - transactionID, err := txPool.Begin(ctx, &querypb.ExecuteOptions{}) + transactionID, _, err := txPool.Begin(ctx, &querypb.ExecuteOptions{}) if err != nil { t.Fatal(err) } @@ -467,7 +525,7 @@ func TestTxPoolGetConnRecentlyRemovedTransaction(t *testing.T) { db.AddQuery("rollback", &sqltypes.Result{}) txPool := newTxPool() txPool.Open(db.ConnParams(), db.ConnParams(), db.ConnParams()) - id, err := txPool.Begin(ctx, &querypb.ExecuteOptions{}) + id, _, err := txPool.Begin(ctx, &querypb.ExecuteOptions{}) txPool.Close() assertErrorMatch := func(id int64, reason string) { @@ -486,14 +544,14 @@ func TestTxPoolGetConnRecentlyRemovedTransaction(t *testing.T) { txPool = newTxPool() txPool.Open(db.ConnParams(), db.ConnParams(), db.ConnParams()) - id, err = txPool.Begin(ctx, &querypb.ExecuteOptions{}) - if err := txPool.Commit(ctx, id, &fakeMessageCommitter{}); err != nil { + id, _, err = txPool.Begin(ctx, &querypb.ExecuteOptions{}) + if _, err := txPool.Commit(ctx, id, &fakeMessageCommitter{}); err != nil { t.Fatalf("got error: %v", err) } assertErrorMatch(id, "transaction committed") - id, err = txPool.Begin(ctx, &querypb.ExecuteOptions{}) + id, _, err = txPool.Begin(ctx, &querypb.ExecuteOptions{}) if err := txPool.Rollback(ctx, id); err != nil { t.Fatalf("got error: %v", err) } @@ -506,13 +564,13 @@ func TestTxPoolGetConnRecentlyRemovedTransaction(t *testing.T) { txPool.Open(db.ConnParams(), db.ConnParams(), db.ConnParams()) defer txPool.Close() - id, err = txPool.Begin(ctx, &querypb.ExecuteOptions{}) + id, _, err = txPool.Begin(ctx, &querypb.ExecuteOptions{}) time.Sleep(5 * time.Millisecond) assertErrorMatch(id, "exceeded timeout: 1ms") txPool.SetTimeout(1 * time.Hour) - id, err = txPool.Begin(ctx, &querypb.ExecuteOptions{}) + id, _, err = txPool.Begin(ctx, &querypb.ExecuteOptions{}) txc, err := txPool.Get(id, "for close") if err != nil { t.Fatalf("got error: %v", err) @@ -545,7 +603,7 @@ func TestTxPoolExecFailDueToConnFail_Errno2006(t *testing.T) { ctx := context.Background() // Start the transaction. - txConn, err := txPool.LocalBegin(ctx, &querypb.ExecuteOptions{}) + txConn, _, err := txPool.LocalBegin(ctx, &querypb.ExecuteOptions{}) if err != nil { t.Fatal(err) } @@ -583,7 +641,7 @@ func TestTxPoolExecFailDueToConnFail_Errno2013(t *testing.T) { ctx := context.Background() // Start the transaction. - txConn, err := txPool.LocalBegin(ctx, &querypb.ExecuteOptions{}) + txConn, _, err := txPool.LocalBegin(ctx, &querypb.ExecuteOptions{}) if err != nil { t.Fatal(err) } @@ -615,7 +673,7 @@ func TestTxPoolCloseKillsStrayTransactions(t *testing.T) { txPool.Open(db.ConnParams(), db.ConnParams(), db.ConnParams()) // Start stray transaction. - _, err := txPool.Begin(context.Background(), &querypb.ExecuteOptions{}) + _, _, err := txPool.Begin(context.Background(), &querypb.ExecuteOptions{}) if err != nil { t.Fatal(err) }