From 094b69d490dd13fead0fe301dd6e88093dec7291 Mon Sep 17 00:00:00 2001 From: Kevin Biju <52661649+heavycrystal@users.noreply.github.com> Date: Fri, 9 Feb 2024 21:25:05 +0530 Subject: [PATCH] spill to disk based on flow-worker memory usage (#1231) the existing number of records based threshold is still present, both can be active at the same time. Best used with `GOMEMLIMIT` to keep heap more aligned with set memory limit. --- .../utils/cdc_records/cdc_records_storage.go | 38 +++++++++++++++++-- flow/peerdbenv/config.go | 18 +++++++-- flow/peerdbenv/env.go | 9 +++-- 3 files changed, 54 insertions(+), 11 deletions(-) diff --git a/flow/connectors/utils/cdc_records/cdc_records_storage.go b/flow/connectors/utils/cdc_records/cdc_records_storage.go index c44892c2c0..b46a4ad9b8 100644 --- a/flow/connectors/utils/cdc_records/cdc_records_storage.go +++ b/flow/connectors/utils/cdc_records/cdc_records_storage.go @@ -8,6 +8,7 @@ import ( "log/slog" "math/big" "os" + "runtime" "time" "github.com/cockroachdb/pebble" @@ -34,6 +35,9 @@ type cdcRecordsStore struct { flowJobName string dbFolderName string numRecordsSwitchThreshold int + memThresholdBytes uint64 + thresholdReason string + memStats runtime.MemStats } func NewCDCRecordsStore(flowJobName string) *cdcRecordsStore { @@ -43,7 +47,16 @@ func NewCDCRecordsStore(flowJobName string) *cdcRecordsStore { numRecords: 0, flowJobName: flowJobName, dbFolderName: fmt.Sprintf("%s/%s_%s", os.TempDir(), flowJobName, shared.RandomString(8)), - numRecordsSwitchThreshold: peerdbenv.PeerDBCDCDiskSpillThreshold(), + numRecordsSwitchThreshold: peerdbenv.PeerDBCDCDiskSpillRecordsThreshold(), + memThresholdBytes: func() uint64 { + memPercent := peerdbenv.PeerDBCDCDiskSpillMemPercentThreshold() + maxMemBytes := peerdbenv.PeerDBFlowWorkerMaxMemBytes() + if memPercent > 0 && maxMemBytes > 0 { + return maxMemBytes * uint64(memPercent) / 100 + } + return 0 + }(), + thresholdReason: "", } } @@ -72,15 +85,32 @@ func (c *cdcRecordsStore) initPebbleDB() 
error { return nil } +func (c *cdcRecordsStore) diskSpillThresholdsExceeded() bool { + if len(c.inMemoryRecords) >= c.numRecordsSwitchThreshold { + c.thresholdReason = fmt.Sprintf("more than %d primary keys read, spilling to disk", + c.numRecordsSwitchThreshold) + return true + } + if c.memThresholdBytes > 0 { + runtime.ReadMemStats(&c.memStats) + + if c.memStats.Alloc >= c.memThresholdBytes { + c.thresholdReason = fmt.Sprintf("memalloc greater than %d bytes, spilling to disk", + c.memThresholdBytes) + return true + } + } + return false +} + func (c *cdcRecordsStore) Set(key *model.TableWithPkey, rec model.Record) error { if key != nil { _, ok := c.inMemoryRecords[*key] - if ok || len(c.inMemoryRecords) < c.numRecordsSwitchThreshold { + if ok || !c.diskSpillThresholdsExceeded() { c.inMemoryRecords[*key] = rec } else { if c.pebbleDB == nil { - slog.Info(fmt.Sprintf("more than %d primary keys read, spilling to disk", - c.numRecordsSwitchThreshold), + slog.Info(c.thresholdReason, slog.String(string(shared.FlowNameKey), c.flowJobName)) err := c.initPebbleDB() if err != nil { diff --git a/flow/peerdbenv/config.go b/flow/peerdbenv/config.go index e9c0a2b8fc..63633fe413 100644 --- a/flow/peerdbenv/config.go +++ b/flow/peerdbenv/config.go @@ -41,9 +41,19 @@ func PeerDBCDCIdleTimeoutSeconds(providedValue int) time.Duration { return time.Duration(x) * time.Second } -// PEERDB_CDC_DISK_SPILL_THRESHOLD -func PeerDBCDCDiskSpillThreshold() int { - return getEnvInt("PEERDB_CDC_DISK_SPILL_THRESHOLD", 1_000_000) +// PEERDB_CDC_DISK_SPILL_RECORDS_THRESHOLD +func PeerDBCDCDiskSpillRecordsThreshold() int { + return getEnvInt("PEERDB_CDC_DISK_SPILL_RECORDS_THRESHOLD", 1_000_000) +} + +// PEERDB_CDC_DISK_SPILL_MEM_PERCENT_THRESHOLD, zero or a negative number disables the memory threshold +func PeerDBCDCDiskSpillMemPercentThreshold() int { + return getEnvInt("PEERDB_CDC_DISK_SPILL_MEM_PERCENT_THRESHOLD", -1) +} + +// GOMEMLIMIT is a variable internal to Golang itself, we use this for internal 
targets, 0 means no maximum +func PeerDBFlowWorkerMaxMemBytes() uint64 { + return getEnvUint[uint64]("GOMEMLIMIT", 0) } // PEERDB_CATALOG_HOST @@ -53,7 +63,7 @@ func PeerDBCatalogHost() string { // PEERDB_CATALOG_PORT func PeerDBCatalogPort() uint32 { - return getEnvUint32("PEERDB_CATALOG_PORT", 5432) + return getEnvUint[uint32]("PEERDB_CATALOG_PORT", 5432) } // PEERDB_CATALOG_USER diff --git a/flow/peerdbenv/env.go b/flow/peerdbenv/env.go index 3bba77c46d..0e1c18b3ca 100644 --- a/flow/peerdbenv/env.go +++ b/flow/peerdbenv/env.go @@ -3,6 +3,8 @@ package peerdbenv import ( "os" "strconv" + + "golang.org/x/exp/constraints" ) // GetEnv returns the value of the environment variable with the given name @@ -32,18 +34,19 @@ func getEnvInt(name string, defaultValue int) int { // getEnvUint32 returns the value of the environment variable with the given name // or defaultValue if the environment variable is not set or is not a valid // uint32 value. -func getEnvUint32(name string, defaultValue uint32) uint32 { +func getEnvUint[T constraints.Unsigned](name string, defaultValue T) T { val, ok := getEnv(name) if !ok { return defaultValue } - i, err := strconv.ParseUint(val, 10, 32) + // widest bit size, truncate later + i, err := strconv.ParseUint(val, 10, 64) if err != nil { return defaultValue } - return uint32(i) + return T(i) } // getEnvBool returns the value of the environment variable with the given name