Skip to content
This repository has been archived by the owner on Dec 16, 2022. It is now read-only.

Commit

Permalink
Cherry pick changes into our build (#133)
Browse files Browse the repository at this point in the history
* Adds timeouts for all kinds of statements. Power to the clients!

Signed-off-by: Rafael Chacon <[email protected]>

* Cherry-pick: suppress begin...commit in autocommit logs

Original Message:
The previous (untested) implementation turned out to be in the wrong
place in the TabletServer execution tier and did not properly log
the actual statements being executed.

Implement this the right way by returning the statements that were
really executed out from the TxPool, then using those to determine
whether or not to log the statement.

Original PR: vitessio#4827

* Cherry pick: add optional tag-based filtering for query logging

Original PR: vitessio#4895

* bootstrap.sh: Remove unused unused command.

The deprecated `unused` command has been removed from the source
repository, so this now fails. We no longer use `unused` so we can just
drop the line that tries to install it.

Signed-off-by: Anthony Yeh <[email protected]>
  • Loading branch information
setassociative authored Jun 5, 2019
1 parent bf603d5 commit e678010
Show file tree
Hide file tree
Showing 24 changed files with 419 additions and 106 deletions.
1 change: 0 additions & 1 deletion bootstrap.sh
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,6 @@ gotools=" \
golang.org/x/tools/cmd/cover \
golang.org/x/tools/cmd/goimports \
golang.org/x/tools/cmd/goyacc \
honnef.co/go/tools/cmd/unused \
"
echo "Installing dev tools with 'go get'..."
# shellcheck disable=SC2086
Expand Down
13 changes: 13 additions & 0 deletions go/streamlog/streamlog.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"net/url"
"os"
"os/signal"
"strings"
"sync"
"syscall"

Expand All @@ -40,6 +41,9 @@ var (
// QueryLogFormat controls the format of the query log (either text or json)
QueryLogFormat = flag.String("querylog-format", "text", "format for query logs (\"text\" or \"json\")")

// QueryLogFilterTag contains an optional string that must be present in the query for it to be logged
QueryLogFilterTag = flag.String("querylog-filter-tag", "", "string that must be present in the query for it to be logged")

sendCount = stats.NewCountersWithSingleLabel("StreamlogSend", "stream log send count", "logger_names")
deliveredCount = stats.NewCountersWithMultiLabels(
"StreamlogDelivered",
Expand Down Expand Up @@ -201,3 +205,12 @@ func GetFormatter(logger *StreamLogger) LogFormatter {
return fmter.Logf(w, params)
}
}

// ShouldEmitLog returns whether the log with the given SQL query
// should be emitted or filtered
func ShouldEmitLog(sql string) bool {
if *QueryLogFilterTag == "" {
return true
}
return strings.Contains(sql, *QueryLogFilterTag)
}
11 changes: 11 additions & 0 deletions go/vt/vtgate/engine/delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package engine
import (
"encoding/json"
"fmt"
"time"

"vitess.io/vitess/go/jsonutil"
"vitess.io/vitess/go/sqltypes"
Expand Down Expand Up @@ -63,6 +64,9 @@ type Delete struct {
// Option to override the standard behavior and allow a multi-shard delete
// to use single round trip autocommit.
MultiShardAutocommit bool

// QueryTimeout contains the optional timeout (in milliseconds) to apply to this query
QueryTimeout int
}

// MarshalJSON serializes the Delete into a JSON representation.
Expand All @@ -84,6 +88,7 @@ func (del *Delete) MarshalJSON() ([]byte, error) {
Table string `json:",omitempty"`
OwnedVindexQuery string `json:",omitempty"`
MultiShardAutocommit bool `json:",omitempty"`
QueryTimeout int `json:",omitempty"`
}{
Opcode: del.Opcode,
Keyspace: del.Keyspace,
Expand All @@ -93,6 +98,7 @@ func (del *Delete) MarshalJSON() ([]byte, error) {
Table: tname,
OwnedVindexQuery: del.OwnedVindexQuery,
MultiShardAutocommit: del.MultiShardAutocommit,
QueryTimeout: del.QueryTimeout,
}
return jsonutil.MarshalNoEscape(marshalDelete)
}
Expand Down Expand Up @@ -141,6 +147,11 @@ func (del *Delete) RouteType() string {

// Execute performs a non-streaming exec.
func (del *Delete) Execute(vcursor VCursor, bindVars map[string]*querypb.BindVariable, wantfields bool) (*sqltypes.Result, error) {
if del.QueryTimeout != 0 {
cancel := vcursor.SetContextTimeout(time.Duration(del.QueryTimeout) * time.Millisecond)
defer cancel()
}

switch del.Opcode {
case DeleteUnsharded:
return del.execDeleteUnsharded(vcursor, bindVars)
Expand Down
11 changes: 11 additions & 0 deletions go/vt/vtgate/engine/insert.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"fmt"
"strconv"
"strings"
"time"

"vitess.io/vitess/go/jsonutil"
"vitess.io/vitess/go/sqltypes"
Expand Down Expand Up @@ -76,6 +77,9 @@ type Insert struct {
// However some application use cases would prefer that the statement partially
// succeed in order to get the performance benefits of autocommit.
MultiShardAutocommit bool

// QueryTimeout contains the optional timeout (in milliseconds) to apply to this query
QueryTimeout int
}

// NewQueryInsert creates an Insert with a query string.
Expand Down Expand Up @@ -127,6 +131,7 @@ func (ins *Insert) MarshalJSON() ([]byte, error) {
Mid []string `json:",omitempty"`
Suffix string `json:",omitempty"`
MultiShardAutocommit bool `json:",omitempty"`
QueryTimeout int `json:",omitempty"`
}{
Opcode: ins.Opcode,
Keyspace: ins.Keyspace,
Expand All @@ -138,6 +143,7 @@ func (ins *Insert) MarshalJSON() ([]byte, error) {
Mid: ins.Mid,
Suffix: ins.Suffix,
MultiShardAutocommit: ins.MultiShardAutocommit,
QueryTimeout: ins.QueryTimeout,
}
return jsonutil.MarshalNoEscape(marshalInsert)
}
Expand Down Expand Up @@ -191,6 +197,11 @@ func (ins *Insert) RouteType() string {

// Execute performs a non-streaming exec.
func (ins *Insert) Execute(vcursor VCursor, bindVars map[string]*querypb.BindVariable, wantfields bool) (*sqltypes.Result, error) {
if ins.QueryTimeout != 0 {
cancel := vcursor.SetContextTimeout(time.Duration(ins.QueryTimeout) * time.Millisecond)
defer cancel()
}

switch ins.Opcode {
case InsertUnsharded:
return ins.execInsertUnsharded(vcursor, bindVars)
Expand Down
11 changes: 11 additions & 0 deletions go/vt/vtgate/engine/update.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package engine
import (
"encoding/json"
"fmt"
"time"

"vitess.io/vitess/go/jsonutil"
"vitess.io/vitess/go/sqltypes"
Expand Down Expand Up @@ -66,6 +67,9 @@ type Update struct {
// Option to override the standard behavior and allow a multi-shard update
// to use single round trip autocommit.
MultiShardAutocommit bool

// QueryTimeout contains the optional timeout (in milliseconds) to apply to this query
QueryTimeout int
}

// MarshalJSON serializes the Update into a JSON representation.
Expand All @@ -88,6 +92,7 @@ func (upd *Update) MarshalJSON() ([]byte, error) {
Table string `json:",omitempty"`
OwnedVindexQuery string `json:",omitempty"`
MultiShardAutocommit bool `json:",omitempty"`
QueryTimeout int `json:",omitempty"`
}{
Opcode: upd.Opcode,
Keyspace: upd.Keyspace,
Expand All @@ -98,6 +103,7 @@ func (upd *Update) MarshalJSON() ([]byte, error) {
Table: tname,
OwnedVindexQuery: upd.OwnedVindexQuery,
MultiShardAutocommit: upd.MultiShardAutocommit,
QueryTimeout: upd.QueryTimeout,
}
return jsonutil.MarshalNoEscape(marshalUpdate)
}
Expand Down Expand Up @@ -145,6 +151,11 @@ func (upd *Update) RouteType() string {

// Execute performs a non-streaming exec.
func (upd *Update) Execute(vcursor VCursor, bindVars map[string]*querypb.BindVariable, wantfields bool) (*sqltypes.Result, error) {
if upd.QueryTimeout != 0 {
cancel := vcursor.SetContextTimeout(time.Duration(upd.QueryTimeout) * time.Millisecond)
defer cancel()
}

switch upd.Opcode {
case UpdateUnsharded:
return upd.execUpdateUnsharded(vcursor, bindVars)
Expand Down
4 changes: 4 additions & 0 deletions go/vt/vtgate/logstats.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,10 @@ func (stats *LogStats) RemoteAddrUsername() (string, string) {
// Logf formats the log record to the given writer, either as
// tab-separated list of logged fields or as JSON.
func (stats *LogStats) Logf(w io.Writer, params url.Values) error {
if !streamlog.ShouldEmitLog(stats.SQL) {
return nil
}

formattedBindVars := "\"[REDACTED]\""
if !*streamlog.RedactDebugUIQueries {
_, fullBindParams := params["full"]
Expand Down
29 changes: 29 additions & 0 deletions go/vt/vtgate/logstats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,35 @@ func TestLogStatsFormat(t *testing.T) {
*streamlog.QueryLogFormat = "text"
}

func TestLogStatsFilter(t *testing.T) {
defer func() { *streamlog.QueryLogFilterTag = "" }()

logStats := NewLogStats(context.Background(), "test", "sql1 /* LOG_THIS_QUERY */", map[string]*querypb.BindVariable{"intVal": sqltypes.Int64BindVariable(1)})
logStats.StartTime = time.Date(2017, time.January, 1, 1, 2, 3, 0, time.UTC)
logStats.EndTime = time.Date(2017, time.January, 1, 1, 2, 4, 1234, time.UTC)
params := map[string][]string{"full": {}}

got := testFormat(logStats, url.Values(params))
want := "test\t\t\t''\t''\t2017-01-01 01:02:03.000000\t2017-01-01 01:02:04.000001\t1.000001\t0.000000\t0.000000\t0.000000\t\t\"sql1 /* LOG_THIS_QUERY */\"\tmap[intVal:type:INT64 value:\"1\" ]\t0\t0\t\"\"\t\n"
if got != want {
t.Errorf("logstats format: got:\n%q\nwant:\n%q\n", got, want)
}

*streamlog.QueryLogFilterTag = "LOG_THIS_QUERY"
got = testFormat(logStats, url.Values(params))
want = "test\t\t\t''\t''\t2017-01-01 01:02:03.000000\t2017-01-01 01:02:04.000001\t1.000001\t0.000000\t0.000000\t0.000000\t\t\"sql1 /* LOG_THIS_QUERY */\"\tmap[intVal:type:INT64 value:\"1\" ]\t0\t0\t\"\"\t\n"
if got != want {
t.Errorf("logstats format: got:\n%q\nwant:\n%q\n", got, want)
}

*streamlog.QueryLogFilterTag = "NOT_THIS_QUERY"
got = testFormat(logStats, url.Values(params))
want = ""
if got != want {
t.Errorf("logstats format: got:\n%q\nwant:\n%q\n", got, want)
}
}

func TestLogStatsContextHTML(t *testing.T) {
html := "HtmlContext"
callInfo := &fakecallinfo.FakeCallInfo{
Expand Down
2 changes: 2 additions & 0 deletions go/vt/vtgate/planbuilder/delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ func buildDeletePlan(del *sqlparser.Delete, vschema ContextVSchema) (*engine.Del
edel.MultiShardAutocommit = true
}

edel.QueryTimeout = queryTimeout(directives)

if rb.ERoute.TargetDestination != nil {
if rb.ERoute.TargetTabletType != topodatapb.TabletType_MASTER {
return nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "unsupported: DELETE statement with a replica target")
Expand Down
2 changes: 2 additions & 0 deletions go/vt/vtgate/planbuilder/insert.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,8 @@ func buildInsertShardedPlan(ins *sqlparser.Insert, table *vindexes.Table) (*engi
eins.MultiShardAutocommit = true
}

eins.QueryTimeout = queryTimeout(directives)

var rows sqlparser.Values
switch insertValues := ins.Rows.(type) {
case *sqlparser.Select, *sqlparser.Union:
Expand Down
18 changes: 18 additions & 0 deletions go/vt/vtgate/planbuilder/route.go
Original file line number Diff line number Diff line change
Expand Up @@ -680,3 +680,21 @@ func (rb *route) SetOpcode(code engine.RouteOpcode) error {
rb.ERoute.Opcode = code
return nil
}

// queryTimeout returns DirectiveQueryTimeout value if set, otherwise returns 0.
func queryTimeout(d sqlparser.CommentDirectives) int {
if d == nil {
return 0
}

val, ok := d[sqlparser.DirectiveQueryTimeout]
if !ok {
return 0
}

intVal, ok := val.(int)
if ok {
return intVal
}
return 0
}
18 changes: 0 additions & 18 deletions go/vt/vtgate/planbuilder/select.go
Original file line number Diff line number Diff line change
Expand Up @@ -321,21 +321,3 @@ func (pb *primitiveBuilder) expandStar(inrcs []*resultColumn, expr *sqlparser.St
}
return inrcs, true, nil
}

// queryTimeout returns DirectiveQueryTimeout value if set, otherwise returns 0.
func queryTimeout(d sqlparser.CommentDirectives) int {
if d == nil {
return 0
}

val, ok := d[sqlparser.DirectiveQueryTimeout]
if !ok {
return 0
}

intVal, ok := val.(int)
if ok {
return intVal
}
return 0
}
59 changes: 59 additions & 0 deletions go/vt/vtgate/planbuilder/testdata/dml_cases.txt
Original file line number Diff line number Diff line change
Expand Up @@ -988,6 +988,33 @@
}
}

# insert with query timeout
"insert /*vt+ QUERY_TIMEOUT_MS=1 */ into user(id) values (1), (2)"
{
"Original": "insert /*vt+ QUERY_TIMEOUT_MS=1 */ into user(id) values (1), (2)",
"Instructions": {
"Opcode": "InsertSharded",
"Keyspace": {
"Name": "user",
"Sharded": true
},
"Query": "insert /*vt+ QUERY_TIMEOUT_MS=1 */ into user(id, Name, Costly) values (:_Id0, :_Name0, :_Costly0), (:_Id1, :_Name1, :_Costly1)",
"Values": [[[":__seq0",":__seq1"]],[[null,null]],[[null,null]]],
"Table": "user",
"Generate": {
"Keyspace": {
"Name": "main",
"Sharded": false
},
"Query": "select next :n values from seq",
"Values": [1,2]
},
"Prefix": "insert /*vt+ QUERY_TIMEOUT_MS=1 */ into user(id, Name, Costly) values ",
"Mid": ["(:_Id0, :_Name0, :_Costly0)","(:_Id1, :_Name1, :_Costly1)"],
"QueryTimeout": 1
}
}

# insert with multiple rows - multi-shard autocommit
"insert /*vt+ MULTI_SHARD_AUTOCOMMIT=1 */ into user(id) values (1), (2)"
{
Expand Down Expand Up @@ -1337,6 +1364,22 @@
}
}

# update with no primary vindex on where clause (scatter update) - query timeout
"update /*vt+ QUERY_TIMEOUT_MS=1 */ user_extra set val = 1"
{
"Original": "update /*vt+ QUERY_TIMEOUT_MS=1 */ user_extra set val = 1",
"Instructions": {
"Opcode": "UpdateScatter",
"Keyspace": {
"Name": "user",
"Sharded": true
},
"Query": "update /*vt+ QUERY_TIMEOUT_MS=1 */ user_extra set val = 1",
"Table": "user_extra",
"QueryTimeout": 1
}
}

# update with non-comparison expr
"update user_extra set val = 1 where id between 1 and 2"
{
Expand Down Expand Up @@ -1489,6 +1532,22 @@
}
}

# delete from with no index match - query timeout
"delete /*vt+ QUERY_TIMEOUT_MS=1 */ from user_extra where name = 'jose'"
{
"Original": "delete /*vt+ QUERY_TIMEOUT_MS=1 */ from user_extra where name = 'jose'",
"Instructions": {
"Opcode": "DeleteScatter",
"Keyspace": {
"Name": "user",
"Sharded": true
},
"Query": "delete /*vt+ QUERY_TIMEOUT_MS=1 */ from user_extra where name = 'jose'",
"Table": "user_extra",
"QueryTimeout": 1
}
}

# delete from with primary id in through IN clause
"delete from user_extra where user_id in (1, 2)"
{
Expand Down
2 changes: 2 additions & 0 deletions go/vt/vtgate/planbuilder/update.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ func buildUpdatePlan(upd *sqlparser.Update, vschema ContextVSchema) (*engine.Upd
eupd.MultiShardAutocommit = true
}

eupd.QueryTimeout = queryTimeout(directives)

var vindexTable *vindexes.Table
for _, tval := range pb.st.tables {
vindexTable = tval.vindexTable
Expand Down
Loading

0 comments on commit e678010

Please sign in to comment.