Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
113809: kvstreamer: add limit to how many eager batches are issued r=yuzefovich a=yuzefovich

**kvstreamer: add limit to how many eager batches are issued**

This commit fixes extremely suboptimal behavior of the streamer in the
InOrder mode in some cases. In particular, previously it was possible
for the buffered responses to consume most of the working budget, so the
streamer would degrade to processing all requests effectively one
BatchRequest with one Get / Scan at a time, significantly increasing the
latency. For example, the query added as a regression test that performs
30k Gets across 10 ranges would usually take on the order of 1.5s (which
is not great already since in the non-streamer path it takes 400ms), but
in the degenerate cases it could be on the order of 20-30s.

Similar behavior could occur in the OutOfOrder mode too where we would
issue more BatchRequests in which only one request could be satisfied
(although in OutOfOrder mode the problem is not as severe - we don't
buffer any results since we can always return them right away).

This problem is now fixed by imposing the limit on the budget's usage at
which point the streamer stops issuing "eager" requests. Namely, now,
when there is at least one request in flight, the streamer won't issue
anymore requests once `limit * eagerFraction` is exceeded. This
effectively reserves a portion of the budget for the "head-of-the-line"
batch.

The "eager fraction" is controlled by a session variable, separate for
each mode. The defaults of 0.5 for InOrder and 0.8 for OutOfOrder modes
were chosen after running TPCH queries and the query that inspired this
commit. These values bring the number of gRPC calls for the reproduction
query from 1.5k-2k range to below 200 and the query latency to be
reliably around 400ms.

I don't really see any significant downsides to this change - in the
worst case, we'd be utilizing less of the available memory budget which
is not that big of a deal, so I intend to backport this change. Also,
setting the eager fractions to large values (greater than 1.0 is
allowed) would disable this limiting behavior and revert to the previous
behavior if we need it.

Fixes: cockroachdb#113729.

Release note (bug fix): Previously, when executing queries with
index / lookup joins when the ordering needs to be maintained,
CockroachDB in some cases could get into a pathological behavior
which would lead to increased query latency, possibly by 1 or 2 orders
of magnitude. This bug was introduced in 22.2 and is now fixed.

**kvstreamer: increase default avg response multiple**

This commit increases the default value for
`sql.distsql.streamer.avg_response_size_multiple` cluster setting from
1.5 to 3.0. This setting controls the factor by which the current "avg
response size" estimate is multiplied and allows for TargetBytes
parameter to grow over time. In the reproduction query from the
previous commit it was determined that the growth might not be as quick
as desirable.

The effect of this change is as follows:
- if we have responses of varying sizes, then we're now likely to be more
effective since we'll end up issuing less BatchRequests
- if we have responses of similar sizes, then we might pre-reserve too
much budget upfront, so we'll end up with lower concurrency across
ranges.

Thus, we don't want to increase the multiple by too much; however,
keeping it at 1.5 can be quite suboptimal in some cases - 3.0 seems like
a decent middle ground. This number was chosen based on running TPCH
queries (both via InOrder and OutOfOrder modes of the streamer) and the
reproduction query. (For the latter this change reduces the number of
gRPC calls by a factor of 3 or so.)

Release note: None

Co-authored-by: Yahor Yuzefovich <[email protected]>
  • Loading branch information
craig[bot] and yuzefovich committed Nov 8, 2023
2 parents d969454 + 116bf04 commit e174990
Show file tree
Hide file tree
Showing 15 changed files with 210 additions and 13 deletions.
2 changes: 2 additions & 0 deletions pkg/kv/kvclient/kvstreamer/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ go_library(
"//pkg/roachpb",
"//pkg/settings",
"//pkg/settings/cluster",
"//pkg/sql/sessiondata",
"//pkg/util",
"//pkg/util/admission",
"//pkg/util/admission/admissionpb",
Expand Down Expand Up @@ -67,6 +68,7 @@ go_test(
"//pkg/sql/rowcontainer",
"//pkg/storage",
"//pkg/testutils/serverutils",
"//pkg/testutils/skip",
"//pkg/testutils/sqlutils",
"//pkg/util/hlc",
"//pkg/util/leaktest",
Expand Down
5 changes: 3 additions & 2 deletions pkg/kv/kvclient/kvstreamer/avg_response_estimator.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,9 @@ const (
// response.
// TODO(yuzefovich): use the optimizer-driven estimates.
InitialAvgResponseSize = 1 << 10 // 1KiB
// This value was determined using tpchvec/bench test on all TPC-H queries.
defaultAvgResponseSizeMultiple = 1.5
// This value was determined using tpchvec/bench test on all TPC-H queries
// as well as the query in TestStreamerVaryingResponseSizes.
defaultAvgResponseSizeMultiple = 3.0
)

// streamerAvgResponseSizeMultiple determines the multiple used when calculating
Expand Down
8 changes: 4 additions & 4 deletions pkg/kv/kvclient/kvstreamer/avg_response_estimator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,10 +85,10 @@ func TestAvgResponseSizeForPartialResponses(t *testing.T) {
}
// We started with the TargetBytes equal to the initial estimate of 1KiB,
// and then with each update the estimate should have grown. In particular,
// we expect 7 BatchRequests total that fetch 1, 1, 3, 7, 18, 45, 25 rows
// respectively (note that the growth is faster than 2x because we use 1.5
// multiple on top of the average).
require.Equal(t, 7, batchRequestsCount)
// we expect 5 BatchRequests total that fetch 1, 3, 12, 48, 36 rows
// respectively (note that the growth is 3-4x because we use 3.0 multiple on
// top of the average).
require.Equal(t, 5, batchRequestsCount)
// From the perspective of the response estimator, we received only one
// response (that happened to be paginated across BatchRequests), so our
// estimate should match exactly the total size.
Expand Down
66 changes: 59 additions & 7 deletions pkg/kv/kvclient/kvstreamer/streamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
"github.com/cockroachdb/cockroach/pkg/util"
"github.com/cockroachdb/cockroach/pkg/util/admission"
"github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb"
Expand Down Expand Up @@ -220,13 +221,19 @@ func (r Result) Release(ctx context.Context) {
type Streamer struct {
distSender *kvcoord.DistSender
stopper *stop.Stopper

mode OperationMode
hints Hints
maxKeysPerRow int32
budget *budget
lockStrength lock.Strength
lockDurability lock.Durability
// sd can be nil in tests.
sd *sessiondata.SessionData

mode OperationMode
hints Hints
maxKeysPerRow int32
// eagerMemUsageLimitBytes determines the maximum memory used from the
// budget at which point the streamer stops sending non-head-of-the-line
// requests eagerly.
eagerMemUsageLimitBytes int64
budget *budget
lockStrength lock.Strength
lockDurability lock.Durability

streamerStatistics

Expand Down Expand Up @@ -356,6 +363,8 @@ func max(a, b int64) int64 {
// to interact with the account only after canceling the Streamer (because
// memory accounts are not thread-safe).
//
// sd can be nil in tests in which case some reasonable defaults will be used.
//
// kvPairsRead should be incremented atomically with the sum of NumKeys
// parameters of all received responses.
//
Expand All @@ -366,6 +375,7 @@ func NewStreamer(
stopper *stop.Stopper,
txn *kv.Txn,
st *cluster.Settings,
sd *sessiondata.SessionData,
lockWaitPolicy lock.WaitPolicy,
limitBytes int64,
acc *mon.BoundAccount,
Expand All @@ -380,6 +390,7 @@ func NewStreamer(
s := &Streamer{
distSender: distSender,
stopper: stopper,
sd: sd,
budget: newBudget(acc, limitBytes),
lockStrength: lockStrength,
lockDurability: lockDurability,
Expand Down Expand Up @@ -425,12 +436,29 @@ func (s *Streamer) Init(
mode OperationMode, hints Hints, maxKeysPerRow int, diskBuffer ResultDiskBuffer,
) {
s.mode = mode
// s.sd can be nil in tests, so use almost all the budget eagerly then.
eagerFraction := 0.9
if mode == OutOfOrder {
s.requestsToServe = newOutOfOrderRequestsProvider()
s.results = newOutOfOrderResultsBuffer(s.budget)
if s.sd != nil {
eagerFraction = s.sd.StreamerOutOfOrderEagerMemoryUsageFraction
}
} else {
s.requestsToServe = newInOrderRequestsProvider()
s.results = newInOrderResultsBuffer(s.budget, diskBuffer)
if s.sd != nil {
eagerFraction = s.sd.StreamerInOrderEagerMemoryUsageFraction
}
}
s.eagerMemUsageLimitBytes = int64(math.Ceil(float64(s.budget.limitBytes) * eagerFraction))
// Ensure some reasonable lower bound.
const minEagerMemUsage = 10 << 10 // 10KiB
if s.eagerMemUsageLimitBytes <= 0 {
// Protect from overflow.
s.eagerMemUsageLimitBytes = math.MaxInt64
} else if s.eagerMemUsageLimitBytes < minEagerMemUsage {
s.eagerMemUsageLimitBytes = minEagerMemUsage
}
if !hints.UniqueRequests {
panic(errors.AssertionFailedf("only unique requests are currently supported"))
Expand Down Expand Up @@ -1052,6 +1080,30 @@ func (w *workerCoordinator) issueRequestsForAsyncProcessing(
)
var budgetIsExhausted bool
for !w.s.requestsToServe.emptyLocked() && maxNumRequestsToIssue > 0 && !budgetIsExhausted {
if !headOfLine && w.s.budget.mu.acc.Used() > w.s.eagerMemUsageLimitBytes {
// The next batch is not head-of-the-line, and the budget has used
// up more than eagerMemUsageLimitBytes bytes. At this point, we
// stop issuing "eager" requests, so we just exit.
//
// This exit will not lead to the streamer deadlocking because we
// already have at least one batch to serve, and at some point we'll
// get into this loop with headOfLine=true, so we'll always be
// issuing at least one batch, eventually.
//
// This behavior is helpful to prevent pathological behavior
// observed in #113729. Namely, if we issue too many batches eagerly
// in the InOrder mode, the buffered responses might consume most of
// our memory budget, and at some point we might regress to
// processing requests one batch with a single Get / Scan request at
// a time. We don't want to just drop already received responses
// (we've already discarded the original singleRangeBatches anyway),
// so this mechanism allows us to preserve a fraction of the budget
// to processing head-of-the-line batches.
//
// Similar pattern can occur in the OutOfOrder mode too although the
// degradation is not as severe as in the InOrder mode.
return nil
}
singleRangeReqs := w.s.requestsToServe.nextLocked()
availableBudget := w.s.budget.limitBytes - w.s.budget.mu.acc.Used()
// minTargetBytes is the minimum TargetBytes limit with which it makes
Expand Down
1 change: 1 addition & 0 deletions pkg/kv/kvclient/kvstreamer/streamer_accounting_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ func TestStreamerMemoryAccounting(t *testing.T) {
s.AppStopper(),
kv.NewLeafTxn(ctx, s.DB(), s.DistSQLPlanningNodeID(), leafInputState),
cluster.MakeTestingClusterSettings(),
nil, /* sd */
lock.WaitPolicy(0),
math.MaxInt64,
&acc,
Expand Down
62 changes: 62 additions & 0 deletions pkg/kv/kvclient/kvstreamer/streamer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@ import (
gosql "database/sql"
"fmt"
"math"
"regexp"
"strconv"
"strings"
"sync"
"testing"

Expand All @@ -28,6 +31,8 @@ import (
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/skip"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/mon"
Expand All @@ -53,6 +58,7 @@ func getStreamer(
s.AppStopper(),
kv.NewLeafTxn(ctx, s.DB(), s.DistSQLPlanningNodeID(), leafInputState),
cluster.MakeTestingClusterSettings(),
nil, /* sd */
lock.WaitPolicy(0),
limitBytes,
acc,
Expand Down Expand Up @@ -110,6 +116,7 @@ func TestStreamerLimitations(t *testing.T) {
s.AppStopper(),
kv.NewTxn(ctx, s.DB(), s.DistSQLPlanningNodeID()),
cluster.MakeTestingClusterSettings(),
nil, /* sd */
lock.WaitPolicy(0),
math.MaxInt64, /* limitBytes */
nil, /* acc */
Expand Down Expand Up @@ -542,3 +549,58 @@ func TestStreamerMultiRangeScan(t *testing.T) {
expected += "}"
require.Equal(t, expected, result)
}

// TestStreamerVaryingResponseSizes verifies that the Streamer handles the
// responses of vastly variable sizes reasonably well. It is a regression test
// for #113729.
func TestStreamerVaryingResponseSizes(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

skip.UnderStress(t, "the test is too memory intensive")
skip.UnderRace(t, "the test is too memory intensive")

s, db, _ := serverutils.StartServer(t, base.TestServerArgs{})
defer s.Stopper().Stop(context.Background())

runner := sqlutils.MakeSQLRunner(db)
// Create a table with 10 ranges, with 3k rows in each. Within each range,
// first 1k rows are relatively small, then next 1k rows are medium, and
// the last 1k rows are large.
runner.Exec(t, `
CREATE TABLE t (
k INT PRIMARY KEY,
v STRING,
blob STRING,
INDEX t_v_idx (v ASC)
);
INSERT INTO t SELECT 3000 * (i // 1000) + i % 1000, '1', repeat('a', 600 + i % 200) FROM generate_series(1, 10000) AS g(i);
INSERT INTO t SELECT 3000 * (i // 1000) + i % 1000 + 1000, '1', repeat('a', 3000 + i % 1000) FROM generate_series(1, 10000) AS g(i);
INSERT INTO t SELECT 3000 * (i // 1000) + i % 1000 + 2000, '1', repeat('a', 20000 + i) FROM generate_series(1, 10000) AS g(i);
ALTER TABLE t SPLIT AT SELECT generate_series(1, 30000, 3000);
`)

// The meat of the test - run the query that performs an index join to fetch
// all rows via the streamer, both in the OutOfOrder and InOrder modes. Each
// time assert that the number of BatchRequests issued is in double digits
// (if not, then the streamer was extremely suboptimal).
kvGRPCCallsRegex := regexp.MustCompile(`KV gRPC calls: (\d+,)`)
for inOrder := range []bool{false, true} {
runner.Exec(t, `SET streamer_always_maintain_ordering = $1;`, inOrder)
for i := 0; i < 5; i++ {
var gRPCCalls int
var err error
rows := runner.QueryStr(t, `EXPLAIN ANALYZE SELECT length(blob) FROM t@t_v_idx WHERE v = '1';`)
for _, row := range rows {
if matches := kvGRPCCallsRegex.FindStringSubmatch(row[0]); len(matches) > 0 {
gRPCCalls, err = strconv.Atoi(strings.ReplaceAll(matches[1], ",", ""))
require.NoError(t, err)
break
}
}
require.Greater(t, 100, gRPCCalls, rows)
}
}
}
1 change: 1 addition & 0 deletions pkg/sql/colfetcher/index_join.go
Original file line number Diff line number Diff line change
Expand Up @@ -567,6 +567,7 @@ func NewColIndexJoin(
flowCtx.Stopper(),
txn,
flowCtx.EvalCtx.Settings,
flowCtx.EvalCtx.SessionData(),
spec.LockingWaitPolicy,
spec.LockingStrength,
spec.LockingDurability,
Expand Down
8 changes: 8 additions & 0 deletions pkg/sql/exec_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -3645,6 +3645,14 @@ func (m *sessionDataMutator) SetStreamerAlwaysMaintainOrdering(val bool) {
m.data.StreamerAlwaysMaintainOrdering = val
}

func (m *sessionDataMutator) SetStreamerInOrderEagerMemoryUsageFraction(val float64) {
m.data.StreamerInOrderEagerMemoryUsageFraction = val
}

func (m *sessionDataMutator) SetStreamerOutOfOrderEagerMemoryUsageFraction(val float64) {
m.data.StreamerOutOfOrderEagerMemoryUsageFraction = val
}

func (m *sessionDataMutator) SetMultipleActivePortalsEnabled(val bool) {
m.data.MultipleActivePortalsEnabled = val
}
Expand Down
2 changes: 2 additions & 0 deletions pkg/sql/logictest/testdata/logic_test/information_schema
Original file line number Diff line number Diff line change
Expand Up @@ -5596,6 +5596,8 @@ standard_conforming_strings on
statement_timeout 0
streamer_always_maintain_ordering off
streamer_enabled on
streamer_in_order_eager_memory_usage_fraction 0.5
streamer_out_of_order_eager_memory_usage_fraction 0.8
strict_ddl_atomicity off
stub_catalog_tables on
synchronize_seqscans on
Expand Down
6 changes: 6 additions & 0 deletions pkg/sql/logictest/testdata/logic_test/pg_catalog
Original file line number Diff line number Diff line change
Expand Up @@ -2914,6 +2914,8 @@ standard_conforming_strings on N
statement_timeout 0 NULL NULL NULL string
streamer_always_maintain_ordering off NULL NULL NULL string
streamer_enabled on NULL NULL NULL string
streamer_in_order_eager_memory_usage_fraction 0.5 NULL NULL NULL string
streamer_out_of_order_eager_memory_usage_fraction 0.8 NULL NULL NULL string
strict_ddl_atomicity off NULL NULL NULL string
stub_catalog_tables on NULL NULL NULL string
synchronize_seqscans on NULL NULL NULL string
Expand Down Expand Up @@ -3079,6 +3081,8 @@ standard_conforming_strings on N
statement_timeout 0 NULL user NULL 0s 0s
streamer_always_maintain_ordering off NULL user NULL off off
streamer_enabled on NULL user NULL on on
streamer_in_order_eager_memory_usage_fraction 0.5 NULL user NULL 0.5 0.5
streamer_out_of_order_eager_memory_usage_fraction 0.8 NULL user NULL 0.8 0.8
strict_ddl_atomicity off NULL user NULL off off
stub_catalog_tables on NULL user NULL on on
synchronize_seqscans on NULL user NULL on on
Expand Down Expand Up @@ -3244,6 +3248,8 @@ standard_conforming_strings NULL NULL NULL
statement_timeout NULL NULL NULL NULL NULL
streamer_always_maintain_ordering NULL NULL NULL NULL NULL
streamer_enabled NULL NULL NULL NULL NULL
streamer_in_order_eager_memory_usage_fraction NULL NULL NULL NULL NULL
streamer_out_of_order_eager_memory_usage_fraction NULL NULL NULL NULL NULL
strict_ddl_atomicity NULL NULL NULL NULL NULL
stub_catalog_tables NULL NULL NULL NULL NULL
synchronize_seqscans NULL NULL NULL NULL NULL
Expand Down
2 changes: 2 additions & 0 deletions pkg/sql/logictest/testdata/logic_test/show_source
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,8 @@ standard_conforming_strings on
statement_timeout 0
streamer_always_maintain_ordering off
streamer_enabled on
streamer_in_order_eager_memory_usage_fraction 0.5
streamer_out_of_order_eager_memory_usage_fraction 0.8
strict_ddl_atomicity off
stub_catalog_tables on
synchronize_seqscans on
Expand Down
3 changes: 3 additions & 0 deletions pkg/sql/row/kv_fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/fetchpb"
"github.com/cockroachdb/cockroach/pkg/sql/rowinfra"
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
Expand Down Expand Up @@ -170,6 +171,7 @@ func NewStreamingKVFetcher(
stopper *stop.Stopper,
txn *kv.Txn,
st *cluster.Settings,
sd *sessiondata.SessionData,
lockWaitPolicy descpb.ScanLockingWaitPolicy,
lockStrength descpb.ScanLockingStrength,
lockDurability descpb.ScanLockingDurability,
Expand All @@ -188,6 +190,7 @@ func NewStreamingKVFetcher(
stopper,
txn,
st,
sd,
GetWaitPolicy(lockWaitPolicy),
streamerBudgetLimit,
streamerBudgetAcc,
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/rowexec/joinreader.go
Original file line number Diff line number Diff line change
Expand Up @@ -556,6 +556,7 @@ func newJoinReader(
flowCtx.Stopper(),
jr.txn,
flowCtx.EvalCtx.Settings,
flowCtx.EvalCtx.SessionData(),
spec.LockingWaitPolicy,
spec.LockingStrength,
spec.LockingDurability,
Expand Down
8 changes: 8 additions & 0 deletions pkg/sql/sessiondatapb/session_data.proto
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,14 @@ message SessionData {
// This session variable is introduced as a possible workaround in case we
// have more bugs like #113013.
bool streamer_always_maintain_ordering = 27;
// StreamerInOrderEagerMemoryUsageFraction controls the fraction of the
// streamer's memory budget that might be used for issuing requests eagerly,
// in the InOrder mode.
double streamer_in_order_eager_memory_usage_fraction = 28;
// StreamerOutOfOrderEagerMemoryUsageFraction controls the fraction of the
// streamer's memory budget that might be used for issuing requests eagerly,
// in the OutOfOrder mode.
double streamer_out_of_order_eager_memory_usage_fraction = 29;
}

// DataConversionConfig contains the parameters that influence the output
Expand Down
Loading

0 comments on commit e174990

Please sign in to comment.