diff --git a/dashboard/package-lock.json b/dashboard/package-lock.json index ce861243e27cf..13322f53cd404 100644 --- a/dashboard/package-lock.json +++ b/dashboard/package-lock.json @@ -56,7 +56,7 @@ "eslint-plugin-n": "^15.2.5", "eslint-plugin-promise": "^6.0.1", "eslint-plugin-react": "^7.31.6", - "express": "^4.20.0", + "express": "^4.21.1", "prettier": "^2.7.1", "prettier-plugin-organize-imports": "^3.1.1", "typescript": "5.4.2" @@ -3850,21 +3850,6 @@ "integrity": "sha512-Tpp60P6IUJDTuOq/5Z8cdskzJujfwqfOTkrwIwj7IRISpnkJnT6SyJ4PCPnGMoFjC9ddhal5KVIYtAt97ix05A==", "dev": true }, - "node_modules/body-parser/node_modules/qs": { - "version": "6.13.0", - "resolved": "https://registry.npmjs.org/qs/-/qs-6.13.0.tgz", - "integrity": "sha512-+38qI9SOr8tfZ4QmJNplMUxqjbe7LKvvZgWdExBOmd+egZTtjLB67Gu0HRX3u/XOq7UU2Nx6nsjvS16Z9uwfpg==", - "dev": true, - "dependencies": { - "side-channel": "^1.0.6" - }, - "engines": { - "node": ">=0.6" - }, - "funding": { - "url": "https://github.com/sponsors/ljharb" - } - }, "node_modules/bootstrap-icons": { "version": "1.9.1", "resolved": "https://registry.npmjs.org/bootstrap-icons/-/bootstrap-icons-1.9.1.tgz", @@ -4275,9 +4260,9 @@ } }, "node_modules/cookie": { - "version": "0.6.0", - "resolved": "https://registry.npmjs.org/cookie/-/cookie-0.6.0.tgz", - "integrity": "sha512-U71cyTamuh1CRNCfpGY6to28lxvNwPG4Guz/EVjgf3Jmzv0vlDp1atT9eS5dDjMYHucpHbWns6Lwf3BKz6svdw==", + "version": "0.7.1", + "resolved": "https://registry.npmjs.org/cookie/-/cookie-0.7.1.tgz", + "integrity": "sha512-6DnInpx7SJ2AK3+CTUE/ZM0vWTUboZCegxhC2xiIydHR9jNuTAASBrfEpHhiGOZw/nX51bHt6YQl8jsGo4y/0w==", "dev": true, "engines": { "node": ">= 0.6" @@ -6177,9 +6162,9 @@ } }, "node_modules/express": { - "version": "4.20.0", - "resolved": "https://registry.npmjs.org/express/-/express-4.20.0.tgz", - "integrity": "sha512-pLdae7I6QqShF5PnNTCVn4hI91Dx0Grkn2+IAsMTgMIKuQVte2dN9PeGSSAME2FR8anOhVA62QDIUaWVfEXVLw==", + "version": "4.21.1", + "resolved": "https://registry.npmjs.org/express/-/express-4.21.1.tgz", + "integrity": "sha512-YSFlK1Ee0/GC8QaO91tHcDxJiE/X4FbpAyQWkxAvG6AXCuR65YzK8ua6D9hvi/TzUfZMpc+BwuM1IPw8fmQBiQ==", "dev": true, "dependencies": { "accepts": "~1.3.8", @@ -6187,14 +6172,14 @@ "body-parser": "1.20.3", "content-disposition": "0.5.4", "content-type": "~1.0.4", - "cookie": "0.6.0", + "cookie": "0.7.1", "cookie-signature": "1.0.6", "debug": "2.6.9", "depd": "2.0.0", "encodeurl": "~2.0.0", "escape-html": "~1.0.3", "etag": "~1.8.1", - "finalhandler": "1.2.0", + "finalhandler": "1.3.1", "fresh": "0.5.2", "http-errors": "2.0.0", "merge-descriptors": "1.0.3", @@ -6203,11 +6188,11 @@ "parseurl": "~1.3.3", "path-to-regexp": "0.1.10", "proxy-addr": "~2.0.7", - "qs": "6.11.0", + "qs": "6.13.0", "range-parser": "~1.2.1", "safe-buffer": "5.2.1", "send": "0.19.0", - "serve-static": "1.16.0", + "serve-static": "1.16.2", "setprototypeof": "1.2.0", "statuses": "2.0.1", "type-is": "~1.6.18", @@ -6403,13 +6388,13 @@ } }, "node_modules/finalhandler": { - "version": "1.2.0", - "resolved": "https://registry.npmjs.org/finalhandler/-/finalhandler-1.2.0.tgz", - "integrity": "sha512-5uXcUVftlQMFnWC9qu/svkWv3GTd2PfUhK/3PLkYNAe7FbqJMt3515HaxE6eRL74GdsriiwujiawdaB1BpEISg==", + "version": "1.3.1", + "resolved": "https://registry.npmjs.org/finalhandler/-/finalhandler-1.3.1.tgz", + "integrity": "sha512-6BN9trH7bp3qvnrRyzsBz+g3lZxTNZTbVO2EV1CS0WIcDbawYVdYvGflME/9QP0h0pYlCDBCTjYa9nZzMDpyxQ==", "dev": true, "dependencies": { "debug": "2.6.9", - "encodeurl": "~1.0.2", + "encodeurl": "~2.0.0", "escape-html": "~1.0.3", "on-finished": 
"2.4.1", "parseurl": "~1.3.3", @@ -6429,6 +6414,15 @@ "ms": "2.0.0" } }, + "node_modules/finalhandler/node_modules/encodeurl": { + "version": "2.0.0", + "resolved": "https://registry.npmjs.org/encodeurl/-/encodeurl-2.0.0.tgz", + "integrity": "sha512-Q0n9HRi4m6JuGIV1eFlmvJB7ZEVxu93IrMyiMsGC0lrMJMWzRgx6WGquyfQgZVb31vhGgXnfmPNNXmxnOkRBrg==", + "dev": true, + "engines": { + "node": ">= 0.8" + } + }, "node_modules/finalhandler/node_modules/ms": { "version": "2.0.0", "resolved": "https://registry.npmjs.org/ms/-/ms-2.0.0.tgz", @@ -9388,12 +9382,12 @@ "integrity": "sha512-QFADYnsVoBMw1srW7OVKEYjG+MbIa49s54w1MA1EDY6r2r/sTcKKYqRX1f4GYvnXP7eN/Pe9HFcX+hwzmrXRHA==" }, "node_modules/qs": { - "version": "6.11.0", - "resolved": "https://registry.npmjs.org/qs/-/qs-6.11.0.tgz", - "integrity": "sha512-MvjoMCJwEarSbUYk5O+nmoSzSutSsTwF85zcHPQ9OrlFoZOYIjaqBAJIqIXjptyD5vThxGq52Xu/MaJzRkIk4Q==", + "version": "6.13.0", + "resolved": "https://registry.npmjs.org/qs/-/qs-6.13.0.tgz", + "integrity": "sha512-+38qI9SOr8tfZ4QmJNplMUxqjbe7LKvvZgWdExBOmd+egZTtjLB67Gu0HRX3u/XOq7UU2Nx6nsjvS16Z9uwfpg==", "dev": true, "dependencies": { - "side-channel": "^1.0.4" + "side-channel": "^1.0.6" }, "engines": { "node": ">=0.6" @@ -10206,63 +10200,27 @@ "dev": true }, "node_modules/serve-static": { - "version": "1.16.0", - "resolved": "https://registry.npmjs.org/serve-static/-/serve-static-1.16.0.tgz", - "integrity": "sha512-pDLK8zwl2eKaYrs8mrPZBJua4hMplRWJ1tIFksVC3FtBEBnl8dxgeHtsaMS8DhS9i4fLObaon6ABoc4/hQGdPA==", + "version": "1.16.2", + "resolved": "https://registry.npmjs.org/serve-static/-/serve-static-1.16.2.tgz", + "integrity": "sha512-VqpjJZKadQB/PEbEwvFdO43Ax5dFBZ2UECszz8bQ7pi7wt//PWe1P6MN7eCnjsatYtBT6EuiClbjSWP2WrIoTw==", "dev": true, "dependencies": { - "encodeurl": "~1.0.2", + "encodeurl": "~2.0.0", "escape-html": "~1.0.3", "parseurl": "~1.3.3", - "send": "0.18.0" + "send": "0.19.0" }, "engines": { "node": ">= 0.8.0" } }, - "node_modules/serve-static/node_modules/debug": { - "version": "2.6.9", - "resolved": "https://registry.npmjs.org/debug/-/debug-2.6.9.tgz", - "integrity": "sha512-bC7ElrdJaJnPbAP+1EotYvqZsb3ecl5wi6Bfi6BJTUcNowp6cvspg0jXznRTKDjm/E7AdgFBVeAPVMNcKGsHMA==", - "dev": true, - "dependencies": { - "ms": "2.0.0" - } - }, - "node_modules/serve-static/node_modules/debug/node_modules/ms": { + "node_modules/serve-static/node_modules/encodeurl": { "version": "2.0.0", - "resolved": "https://registry.npmjs.org/ms/-/ms-2.0.0.tgz", - "integrity": "sha512-Tpp60P6IUJDTuOq/5Z8cdskzJujfwqfOTkrwIwj7IRISpnkJnT6SyJ4PCPnGMoFjC9ddhal5KVIYtAt97ix05A==", - "dev": true - }, - "node_modules/serve-static/node_modules/ms": { - "version": "2.1.3", - "resolved": "https://registry.npmjs.org/ms/-/ms-2.1.3.tgz", - "integrity": "sha512-6FlzubTLZG3J2a/NVCAleEhjzq5oxgHyaCU9yYXvcLsvoVaHJq/s5xXI6/XXP6tz7R9xAOtHnSO/tXtF3WRTlA==", - "dev": true - }, - "node_modules/serve-static/node_modules/send": { - "version": "0.18.0", - "resolved": "https://registry.npmjs.org/send/-/send-0.18.0.tgz", - "integrity": "sha512-qqWzuOjSFOuqPjFe4NOsMLafToQQwBSOEpS+FwEt3A2V3vKubTquT3vmLTQpFgMXp8AlFWFuP1qKaJZOtPpVXg==", + "resolved": "https://registry.npmjs.org/encodeurl/-/encodeurl-2.0.0.tgz", + "integrity": "sha512-Q0n9HRi4m6JuGIV1eFlmvJB7ZEVxu93IrMyiMsGC0lrMJMWzRgx6WGquyfQgZVb31vhGgXnfmPNNXmxnOkRBrg==", "dev": true, - "dependencies": { - "debug": "2.6.9", - "depd": "2.0.0", - "destroy": "1.2.0", - "encodeurl": "~1.0.2", - "escape-html": "~1.0.3", - "etag": "~1.8.1", - "fresh": "0.5.2", - "http-errors": "2.0.0", - "mime": "1.6.0", - "ms": "2.1.3", - "on-finished": 
"2.4.1", - "range-parser": "~1.2.1", - "statuses": "2.0.1" - }, "engines": { - "node": ">= 0.8.0" + "node": ">= 0.8" } }, "node_modules/set-blocking": { @@ -14629,15 +14587,6 @@ "resolved": "https://registry.npmjs.org/ms/-/ms-2.0.0.tgz", "integrity": "sha512-Tpp60P6IUJDTuOq/5Z8cdskzJujfwqfOTkrwIwj7IRISpnkJnT6SyJ4PCPnGMoFjC9ddhal5KVIYtAt97ix05A==", "dev": true - }, - "qs": { - "version": "6.13.0", - "resolved": "https://registry.npmjs.org/qs/-/qs-6.13.0.tgz", - "integrity": "sha512-+38qI9SOr8tfZ4QmJNplMUxqjbe7LKvvZgWdExBOmd+egZTtjLB67Gu0HRX3u/XOq7UU2Nx6nsjvS16Z9uwfpg==", - "dev": true, - "requires": { - "side-channel": "^1.0.6" - } } } }, @@ -14930,9 +14879,9 @@ } }, "cookie": { - "version": "0.6.0", - "resolved": "https://registry.npmjs.org/cookie/-/cookie-0.6.0.tgz", - "integrity": "sha512-U71cyTamuh1CRNCfpGY6to28lxvNwPG4Guz/EVjgf3Jmzv0vlDp1atT9eS5dDjMYHucpHbWns6Lwf3BKz6svdw==", + "version": "0.7.1", + "resolved": "https://registry.npmjs.org/cookie/-/cookie-0.7.1.tgz", + "integrity": "sha512-6DnInpx7SJ2AK3+CTUE/ZM0vWTUboZCegxhC2xiIydHR9jNuTAASBrfEpHhiGOZw/nX51bHt6YQl8jsGo4y/0w==", "dev": true }, "cookie-signature": { @@ -16346,9 +16295,9 @@ } }, "express": { - "version": "4.20.0", - "resolved": "https://registry.npmjs.org/express/-/express-4.20.0.tgz", - "integrity": "sha512-pLdae7I6QqShF5PnNTCVn4hI91Dx0Grkn2+IAsMTgMIKuQVte2dN9PeGSSAME2FR8anOhVA62QDIUaWVfEXVLw==", + "version": "4.21.1", + "resolved": "https://registry.npmjs.org/express/-/express-4.21.1.tgz", + "integrity": "sha512-YSFlK1Ee0/GC8QaO91tHcDxJiE/X4FbpAyQWkxAvG6AXCuR65YzK8ua6D9hvi/TzUfZMpc+BwuM1IPw8fmQBiQ==", "dev": true, "requires": { "accepts": "~1.3.8", @@ -16356,14 +16305,14 @@ "body-parser": "1.20.3", "content-disposition": "0.5.4", "content-type": "~1.0.4", - "cookie": "0.6.0", + "cookie": "0.7.1", "cookie-signature": "1.0.6", "debug": "2.6.9", "depd": "2.0.0", "encodeurl": "~2.0.0", "escape-html": "~1.0.3", "etag": "~1.8.1", - "finalhandler": "1.2.0", + "finalhandler": "1.3.1", "fresh": "0.5.2", "http-errors": "2.0.0", "merge-descriptors": "1.0.3", @@ -16372,11 +16321,11 @@ "parseurl": "~1.3.3", "path-to-regexp": "0.1.10", "proxy-addr": "~2.0.7", - "qs": "6.11.0", + "qs": "6.13.0", "range-parser": "~1.2.1", "safe-buffer": "5.2.1", "send": "0.19.0", - "serve-static": "1.16.0", + "serve-static": "1.16.2", "setprototypeof": "1.2.0", "statuses": "2.0.1", "type-is": "~1.6.18", @@ -16537,13 +16486,13 @@ } }, "finalhandler": { - "version": "1.2.0", - "resolved": "https://registry.npmjs.org/finalhandler/-/finalhandler-1.2.0.tgz", - "integrity": "sha512-5uXcUVftlQMFnWC9qu/svkWv3GTd2PfUhK/3PLkYNAe7FbqJMt3515HaxE6eRL74GdsriiwujiawdaB1BpEISg==", + "version": "1.3.1", + "resolved": "https://registry.npmjs.org/finalhandler/-/finalhandler-1.3.1.tgz", + "integrity": "sha512-6BN9trH7bp3qvnrRyzsBz+g3lZxTNZTbVO2EV1CS0WIcDbawYVdYvGflME/9QP0h0pYlCDBCTjYa9nZzMDpyxQ==", "dev": true, "requires": { "debug": "2.6.9", - "encodeurl": "~1.0.2", + "encodeurl": "~2.0.0", "escape-html": "~1.0.3", "on-finished": "2.4.1", "parseurl": "~1.3.3", @@ -16560,6 +16509,12 @@ "ms": "2.0.0" } }, + "encodeurl": { + "version": "2.0.0", + "resolved": "https://registry.npmjs.org/encodeurl/-/encodeurl-2.0.0.tgz", + "integrity": "sha512-Q0n9HRi4m6JuGIV1eFlmvJB7ZEVxu93IrMyiMsGC0lrMJMWzRgx6WGquyfQgZVb31vhGgXnfmPNNXmxnOkRBrg==", + "dev": true + }, "ms": { "version": "2.0.0", "resolved": "https://registry.npmjs.org/ms/-/ms-2.0.0.tgz", @@ -18686,12 +18641,12 @@ "integrity": "sha512-QFADYnsVoBMw1srW7OVKEYjG+MbIa49s54w1MA1EDY6r2r/sTcKKYqRX1f4GYvnXP7eN/Pe9HFcX+hwzmrXRHA==" }, 
"qs": { - "version": "6.11.0", - "resolved": "https://registry.npmjs.org/qs/-/qs-6.11.0.tgz", - "integrity": "sha512-MvjoMCJwEarSbUYk5O+nmoSzSutSsTwF85zcHPQ9OrlFoZOYIjaqBAJIqIXjptyD5vThxGq52Xu/MaJzRkIk4Q==", + "version": "6.13.0", + "resolved": "https://registry.npmjs.org/qs/-/qs-6.13.0.tgz", + "integrity": "sha512-+38qI9SOr8tfZ4QmJNplMUxqjbe7LKvvZgWdExBOmd+egZTtjLB67Gu0HRX3u/XOq7UU2Nx6nsjvS16Z9uwfpg==", "dev": true, "requires": { - "side-channel": "^1.0.4" + "side-channel": "^1.0.6" } }, "quadprog": { @@ -19270,60 +19225,22 @@ } }, "serve-static": { - "version": "1.16.0", - "resolved": "https://registry.npmjs.org/serve-static/-/serve-static-1.16.0.tgz", - "integrity": "sha512-pDLK8zwl2eKaYrs8mrPZBJua4hMplRWJ1tIFksVC3FtBEBnl8dxgeHtsaMS8DhS9i4fLObaon6ABoc4/hQGdPA==", + "version": "1.16.2", + "resolved": "https://registry.npmjs.org/serve-static/-/serve-static-1.16.2.tgz", + "integrity": "sha512-VqpjJZKadQB/PEbEwvFdO43Ax5dFBZ2UECszz8bQ7pi7wt//PWe1P6MN7eCnjsatYtBT6EuiClbjSWP2WrIoTw==", "dev": true, "requires": { - "encodeurl": "~1.0.2", + "encodeurl": "~2.0.0", "escape-html": "~1.0.3", "parseurl": "~1.3.3", - "send": "0.18.0" + "send": "0.19.0" }, "dependencies": { - "debug": { - "version": "2.6.9", - "resolved": "https://registry.npmjs.org/debug/-/debug-2.6.9.tgz", - "integrity": "sha512-bC7ElrdJaJnPbAP+1EotYvqZsb3ecl5wi6Bfi6BJTUcNowp6cvspg0jXznRTKDjm/E7AdgFBVeAPVMNcKGsHMA==", - "dev": true, - "requires": { - "ms": "2.0.0" - }, - "dependencies": { - "ms": { - "version": "2.0.0", - "resolved": "https://registry.npmjs.org/ms/-/ms-2.0.0.tgz", - "integrity": "sha512-Tpp60P6IUJDTuOq/5Z8cdskzJujfwqfOTkrwIwj7IRISpnkJnT6SyJ4PCPnGMoFjC9ddhal5KVIYtAt97ix05A==", - "dev": true - } - } - }, - "ms": { - "version": "2.1.3", - "resolved": "https://registry.npmjs.org/ms/-/ms-2.1.3.tgz", - "integrity": "sha512-6FlzubTLZG3J2a/NVCAleEhjzq5oxgHyaCU9yYXvcLsvoVaHJq/s5xXI6/XXP6tz7R9xAOtHnSO/tXtF3WRTlA==", + "encodeurl": { + "version": "2.0.0", + "resolved": "https://registry.npmjs.org/encodeurl/-/encodeurl-2.0.0.tgz", + "integrity": "sha512-Q0n9HRi4m6JuGIV1eFlmvJB7ZEVxu93IrMyiMsGC0lrMJMWzRgx6WGquyfQgZVb31vhGgXnfmPNNXmxnOkRBrg==", "dev": true - }, - "send": { - "version": "0.18.0", - "resolved": "https://registry.npmjs.org/send/-/send-0.18.0.tgz", - "integrity": "sha512-qqWzuOjSFOuqPjFe4NOsMLafToQQwBSOEpS+FwEt3A2V3vKubTquT3vmLTQpFgMXp8AlFWFuP1qKaJZOtPpVXg==", - "dev": true, - "requires": { - "debug": "2.6.9", - "depd": "2.0.0", - "destroy": "1.2.0", - "encodeurl": "~1.0.2", - "escape-html": "~1.0.3", - "etag": "~1.8.1", - "fresh": "0.5.2", - "http-errors": "2.0.0", - "mime": "1.6.0", - "ms": "2.1.3", - "on-finished": "2.4.1", - "range-parser": "~1.2.1", - "statuses": "2.0.1" - } } } }, diff --git a/dashboard/package.json b/dashboard/package.json index 79897d63b19c8..66cdc9477fad6 100644 --- a/dashboard/package.json +++ b/dashboard/package.json @@ -63,7 +63,7 @@ "eslint-plugin-n": "^15.2.5", "eslint-plugin-promise": "^6.0.1", "eslint-plugin-react": "^7.31.6", - "express": "^4.20.0", + "express": "^4.21.1", "prettier": "^2.7.1", "prettier-plugin-organize-imports": "^3.1.1", "typescript": "5.4.2" diff --git a/proto/meta.proto b/proto/meta.proto index b6c9521ad673c..15a16f36bdddc 100644 --- a/proto/meta.proto +++ b/proto/meta.proto @@ -753,8 +753,8 @@ message EventLog { string error = 3; } message EventCollectBarrierFail { - uint64 prev_epoch = 1; - uint64 cur_epoch = 2; + reserved 1, 2; + reserved "prev_epoch", "cur_epoch"; string error = 3; } message EventWorkerNodePanic { diff --git a/src/common/src/config.rs 
b/src/common/src/config.rs index c3fc78919a3ef..64d7675903ec4 100644 --- a/src/common/src/config.rs +++ b/src/common/src/config.rs @@ -871,6 +871,12 @@ pub struct StorageConfig { #[serde(default = "default::storage::compactor_max_overlap_sst_count")] pub compactor_max_overlap_sst_count: usize, + /// The maximum number of meta files that can be preloaded. + /// If the number of meta files exceeds this value, the compactor will try to compute parallelism only through `SstableInfo`, no longer preloading `SstableMeta`. + /// This is to prevent the compactor from consuming too much memory, but it may cause the compactor to be less efficient. + #[serde(default = "default::storage::compactor_max_preload_meta_file_count")] + pub compactor_max_preload_meta_file_count: usize, + /// Object storage configuration /// 1. General configuration /// 2. Some special configuration of Backend @@ -1795,6 +1801,10 @@ pub mod default { 64 } + pub fn compactor_max_preload_meta_file_count() -> usize { + 32 + } + // deprecated pub fn table_info_statistic_history_times() -> usize { 240 diff --git a/src/config/docs.md b/src/config/docs.md index 4a25867ed63c1..bfe6a2fb5429c 100644 --- a/src/config/docs.md +++ b/src/config/docs.md @@ -125,6 +125,7 @@ This page is automatically generated by `./risedev generate-example-config` | compactor_fast_max_compact_task_size | | 2147483648 | | compactor_iter_max_io_retry_times | | 8 | | compactor_max_overlap_sst_count | | 64 | +| compactor_max_preload_meta_file_count | The maximum number of meta files that can be preloaded. If the number of meta files exceeds this value, the compactor will try to compute parallelism only through `SstableInfo`, no longer preloading `SstableMeta`. This is to prevent the compactor from consuming too much memory, but it may cause the compactor to be less efficient. | 32 | | compactor_max_sst_key_count | | 2097152 | | compactor_max_sst_size | | 536870912 | | compactor_max_task_multiplier | Compactor calculates the maximum number of tasks that can be executed on the node based on `worker_num` and `compactor_max_task_multiplier`. `max_pull_task_count` = `worker_num` * `compactor_max_task_multiplier` | 3.0 | diff --git a/src/config/example.toml b/src/config/example.toml index 1d540b429a876..7a01ff5254e7f 100644 --- a/src/config/example.toml +++ b/src/config/example.toml @@ -176,6 +176,7 @@ compactor_iter_max_io_retry_times = 8 table_info_statistic_history_times = 240 mem_table_spill_threshold = 4194304 compactor_max_overlap_sst_count = 64 +compactor_max_preload_meta_file_count = 32 time_travel_version_cache_capacity = 32 [storage.cache.block_cache_eviction] diff --git a/src/connector/src/connector_common/iceberg/mod.rs b/src/connector/src/connector_common/iceberg/mod.rs index bf96e474eee80..3d1b016577c6d 100644 --- a/src/connector/src/connector_common/iceberg/mod.rs +++ b/src/connector/src/connector_common/iceberg/mod.rs @@ -185,6 +185,10 @@ impl IcebergCommon { "org.apache.iceberg.aws.s3.S3FileIO".to_string(), ); + // suppress log of S3FileIO like: Unclosed S3FileIO instance created by... 
+ java_catalog_configs + .insert("init-creation-stacktrace".to_string(), "false".to_string()); + if let Some(endpoint) = &self.endpoint { java_catalog_configs .insert("s3.endpoint".to_string(), endpoint.clone().to_string()); @@ -258,7 +262,6 @@ mod v1 { let catalog_type = self.catalog_type().to_string(); - iceberg_configs.insert(CATALOG_TYPE.to_string(), catalog_type.clone()); iceberg_configs.insert(CATALOG_NAME.to_string(), self.catalog_name()); match catalog_type.as_str() { @@ -267,14 +270,16 @@ mod v1 { format!("iceberg.catalog.{}.warehouse", self.catalog_name()), self.warehouse_path.clone(), ); + iceberg_configs.insert(CATALOG_TYPE.to_string(), "storage".into()); } - "rest" => { + "rest_rust" => { let uri = self .catalog_uri .clone() .with_context(|| "`catalog.uri` must be set in rest catalog".to_string())?; iceberg_configs .insert(format!("iceberg.catalog.{}.uri", self.catalog_name()), uri); + iceberg_configs.insert(CATALOG_TYPE.to_string(), "rest".into()); } _ => { bail!( @@ -351,7 +356,7 @@ mod v1 { java_catalog_props: &HashMap, ) -> ConnectorResult { match self.catalog_type() { - "storage" | "rest" => { + "storage" | "rest_rust" => { let iceberg_configs = self.build_iceberg_configs()?; let catalog = load_catalog(&iceberg_configs).await?; Ok(catalog) @@ -359,7 +364,8 @@ mod v1 { catalog_type if catalog_type == "hive" || catalog_type == "jdbc" - || catalog_type == "glue" => + || catalog_type == "glue" + || catalog_type == "rest" => { // Create java catalog let (base_catalog_config, java_catalog_props) = @@ -368,6 +374,7 @@ mod v1 { "hive" => "org.apache.iceberg.hive.HiveCatalog", "jdbc" => "org.apache.iceberg.jdbc.JdbcCatalog", "glue" => "org.apache.iceberg.aws.glue.GlueCatalog", + "rest" => "org.apache.iceberg.rest.RESTCatalog", _ => unreachable!(), }; @@ -444,7 +451,7 @@ mod v2 { let catalog = storage_catalog::StorageCatalog::new(config)?; Ok(Arc::new(catalog)) } - "rest" => { + "rest_rust" => { let mut iceberg_configs = HashMap::new(); if let Some(region) = &self.region { iceberg_configs.insert(S3_REGION.to_string(), region.clone().to_string()); @@ -512,13 +519,18 @@ mod v2 { let catalog = iceberg_catalog_glue::GlueCatalog::new(config).await?; Ok(Arc::new(catalog)) } - catalog_type if catalog_type == "hive" || catalog_type == "jdbc" => { + catalog_type + if catalog_type == "hive" + || catalog_type == "jdbc" + || catalog_type == "rest" => + { // Create java catalog let (base_catalog_config, java_catalog_props) = self.build_jni_catalog_configs(java_catalog_props)?; let catalog_impl = match catalog_type { "hive" => "org.apache.iceberg.hive.HiveCatalog", "jdbc" => "org.apache.iceberg.jdbc.JdbcCatalog", + "rest" => "org.apache.iceberg.rest.RESTCatalog", _ => unreachable!(), }; diff --git a/src/expr/impl/src/aggregate/first_last_value.rs b/src/expr/impl/src/aggregate/first_last_value.rs new file mode 100644 index 0000000000000..841442148f722 --- /dev/null +++ b/src/expr/impl/src/aggregate/first_last_value.rs @@ -0,0 +1,88 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+// See the License for the specific language governing permissions and +// limitations under the License. + +use risingwave_common::types::{Datum, ScalarRefImpl}; +use risingwave_common_estimate_size::EstimateSize; +use risingwave_expr::aggregate; +use risingwave_expr::aggregate::AggStateDyn; + +/// Note that different from `min` and `max`, `first_value` doesn't ignore `NULL` values. +/// +/// ```slt +/// statement ok +/// create table t(v1 int, ts int); +/// +/// statement ok +/// insert into t values (null, 1), (2, 2), (null, 3); +/// +/// query I +/// select first_value(v1 order by ts) from t; +/// ---- +/// NULL +/// +/// statement ok +/// drop table t; +/// ``` +#[aggregate("first_value(any) -> any")] +fn first_value(state: &mut FirstValueState, input: Option>) { + if state.0.is_none() { + state.0 = Some(input.map(|x| x.into_scalar_impl())); + } +} + +#[derive(Debug, Clone, Default, EstimateSize)] +struct FirstValueState(Option); + +impl AggStateDyn for FirstValueState {} + +impl From<&FirstValueState> for Datum { + fn from(state: &FirstValueState) -> Self { + if let Some(state) = &state.0 { + state.clone() + } else { + None + } + } +} + +/// Note that different from `min` and `max`, `last_value` doesn't ignore `NULL` values. +/// +/// ```slt +/// statement ok +/// create table t(v1 int, ts int); +/// +/// statement ok +/// insert into t values (null, 1), (2, 2), (null, 3); +/// +/// query I +/// select last_value(v1 order by ts) from t; +/// ---- +/// NULL +/// +/// statement ok +/// drop table t; +/// ``` +#[aggregate("last_value(*) -> auto", state = "ref")] // TODO(rc): `last_value(any) -> any` +fn last_value(_: Option, input: Option) -> Option { + input +} + +#[aggregate("internal_last_seen_value(*) -> auto", state = "ref", internal)] +fn internal_last_seen_value(state: T, input: T, retract: bool) -> T { + if retract { + state + } else { + input + } +} diff --git a/src/expr/impl/src/aggregate/general.rs b/src/expr/impl/src/aggregate/general.rs index 0c94312335b4b..daaea5e782fd1 100644 --- a/src/expr/impl/src/aggregate/general.rs +++ b/src/expr/impl/src/aggregate/general.rs @@ -124,25 +124,6 @@ fn max(state: T, input: T) -> T { state.max(input) } -#[aggregate("first_value(*) -> auto", state = "ref")] -fn first_value(state: T, _: T) -> T { - state -} - -#[aggregate("last_value(*) -> auto", state = "ref")] -fn last_value(_: T, input: T) -> T { - input -} - -#[aggregate("internal_last_seen_value(*) -> auto", state = "ref", internal)] -fn internal_last_seen_value(state: T, input: T, retract: bool) -> T { - if retract { - state - } else { - input - } -} - /// Note the following corner cases: /// /// ```slt diff --git a/src/expr/impl/src/aggregate/mod.rs b/src/expr/impl/src/aggregate/mod.rs index 349574018fedf..881465b4cf82f 100644 --- a/src/expr/impl/src/aggregate/mod.rs +++ b/src/expr/impl/src/aggregate/mod.rs @@ -20,6 +20,7 @@ mod bit_or; mod bit_xor; mod bool_and; mod bool_or; +mod first_last_value; mod general; mod jsonb_agg; mod mode; diff --git a/src/frontend/planner_test/tests/testdata/input/update.yaml b/src/frontend/planner_test/tests/testdata/input/update.yaml index 2322f68ad95e0..65c0f47eb4cd4 100644 --- a/src/frontend/planner_test/tests/testdata/input/update.yaml +++ b/src/frontend/planner_test/tests/testdata/input/update.yaml @@ -98,3 +98,10 @@ update t set a = a + 1; expected_outputs: - batch_distributed_plan +- name: update table with subquery in the set clause + sql: | + create table t1 (v1 int primary key, v2 int); + create table t2 (v1 int primary key, v2 int); + 
update t1 set v1 = (select v1 from t2 where t1.v2 = t2.v2); + expected_outputs: + - binder_error diff --git a/src/frontend/planner_test/tests/testdata/output/update.yaml b/src/frontend/planner_test/tests/testdata/output/update.yaml index 884b091cf8af7..eae928bb858a7 100644 --- a/src/frontend/planner_test/tests/testdata/output/update.yaml +++ b/src/frontend/planner_test/tests/testdata/output/update.yaml @@ -165,3 +165,9 @@ └─BatchUpdate { table: t, exprs: [($0 + 1:Int32), $1, $2] } └─BatchExchange { order: [], dist: HashShard(t.a, t.b, t._row_id) } └─BatchScan { table: t, columns: [t.a, t.b, t._row_id], distribution: UpstreamHashShard(t._row_id) } +- name: update table with subquery in the set clause + sql: | + create table t1 (v1 int primary key, v2 int); + create table t2 (v1 int primary key, v2 int); + update t1 set v1 = (select v1 from t2 where t1.v2 = t2.v2); + binder_error: 'Bind error: subquery on the right side of assignment is unsupported' diff --git a/src/frontend/src/binder/update.rs b/src/frontend/src/binder/update.rs index cbe51f9ec69bd..6775c2173e00b 100644 --- a/src/frontend/src/binder/update.rs +++ b/src/frontend/src/binder/update.rs @@ -17,7 +17,6 @@ use std::collections::{BTreeMap, HashMap}; use fixedbitset::FixedBitSet; use itertools::Itertools; -use risingwave_common::bail_not_implemented; use risingwave_common::catalog::{Schema, TableVersionId}; use risingwave_common::util::iter_util::ZipEqFast; use risingwave_sqlparser::ast::{Assignment, AssignmentValue, Expr, ObjectName, SelectItem}; @@ -129,15 +128,17 @@ impl Binder { for Assignment { id, value } in assignments { // FIXME: Parsing of `id` is not strict. It will even treat `a.b` as `(a, b)`. let assignments = match (id.as_slice(), value) { + // _ = (subquery) + (_ids, AssignmentValue::Expr(Expr::Subquery(_))) => { + return Err(ErrorCode::BindError( + "subquery on the right side of assignment is unsupported".to_owned(), + ) + .into()) + } // col = expr ([id], value) => { vec![(id.clone(), value)] } - - // (col1, col2) = (subquery) - (_ids, AssignmentValue::Expr(Expr::Subquery(_))) => { - bail_not_implemented!("subquery on the right side of multi-assignment"); - } // (col1, col2) = (expr1, expr2) // TODO: support `DEFAULT` in multiple assignments (ids, AssignmentValue::Expr(Expr::Row(values))) if ids.len() == values.len() => id diff --git a/src/meta/src/barrier/checkpoint/control.rs b/src/meta/src/barrier/checkpoint/control.rs index a064848dc2675..beb77b3217ad7 100644 --- a/src/meta/src/barrier/checkpoint/control.rs +++ b/src/meta/src/barrier/checkpoint/control.rs @@ -21,8 +21,10 @@ use fail::fail_point; use prometheus::HistogramTimer; use risingwave_common::catalog::{DatabaseId, TableId}; use risingwave_meta_model::WorkerId; +use risingwave_pb::ddl_service::DdlProgress; use risingwave_pb::hummock::HummockVersionStats; use risingwave_pb::meta::PausedReason; +use risingwave_pb::stream_plan::PbSubscriptionUpstreamInfo; use risingwave_pb::stream_service::BarrierCompleteResponse; use tracing::{debug, warn}; @@ -35,22 +37,32 @@ use crate::barrier::notifier::Notifier; use crate::barrier::progress::{CreateMviewProgressTracker, TrackingCommand, TrackingJob}; use crate::barrier::rpc::{from_partial_graph_id, ControlStreamManager}; use crate::barrier::schedule::{NewBarrier, PeriodicBarriers}; -use crate::barrier::utils::{collect_commit_epoch_info, collect_creating_job_commit_epoch_info}; +use crate::barrier::utils::collect_creating_job_commit_epoch_info; use crate::barrier::{ BarrierKind, Command, CreateStreamingJobCommandInfo, 
CreateStreamingJobType, SnapshotBackfillInfo, TracedEpoch, }; use crate::manager::ActiveStreamingWorkerNodes; use crate::rpc::metrics::GLOBAL_META_METRICS; -use crate::MetaResult; +use crate::{MetaError, MetaResult}; #[derive(Default)] pub(crate) struct CheckpointControl { - pub(crate) databases: HashMap, - pub(crate) hummock_version_stats: HummockVersionStats, + databases: HashMap, + hummock_version_stats: HummockVersionStats, } impl CheckpointControl { + pub(crate) fn new( + databases: HashMap, + hummock_version_stats: HummockVersionStats, + ) -> Self { + Self { + databases, + hummock_version_stats, + } + } + pub(crate) fn ack_completed(&mut self, output: BarrierCompleteOutput) { self.hummock_version_stats = output.hummock_version_stats; for (database_id, (command_prev_epoch, creating_job_epochs)) in output.epochs_to_ack { @@ -204,23 +216,88 @@ impl CheckpointControl { .values() .for_each(|database| database.update_barrier_nums_metrics()); } + + pub(crate) fn gen_ddl_progress(&self) -> HashMap { + let mut progress = HashMap::new(); + for database_checkpoint_control in self.databases.values() { + // Progress of normal backfill + progress.extend( + database_checkpoint_control + .create_mview_tracker + .gen_ddl_progress(), + ); + // Progress of snapshot backfill + for creating_job in database_checkpoint_control + .creating_streaming_job_controls + .values() + { + progress.extend([( + creating_job.info.table_fragments.table_id().table_id, + creating_job.gen_ddl_progress(), + )]); + } + } + progress + } + + pub(crate) fn is_failed_at_worker_err(&self, worker_id: WorkerId) -> bool { + for database_checkpoint_control in self.databases.values() { + let failed_barrier = + database_checkpoint_control.barrier_wait_collect_from_worker(worker_id as _); + if failed_barrier.is_some() + || database_checkpoint_control + .state + .inflight_graph_info + .contains_worker(worker_id as _) + || database_checkpoint_control + .creating_streaming_job_controls + .values() + .any(|job| job.is_wait_on_worker(worker_id)) + { + return true; + } + } + false + } + + pub(crate) fn clear_on_err(&mut self, err: &MetaError) { + for (_, node) in self + .databases + .values_mut() + .flat_map(|database| take(&mut database.command_ctx_queue)) + { + for notifier in node.notifiers { + notifier.notify_failed(err.clone()); + } + node.enqueue_time.observe_duration(); + } + self.databases + .values_mut() + .for_each(|database| database.create_mview_tracker.abort_all()); + } + + pub(crate) fn subscriptions(&self) -> impl Iterator + '_ { + self.databases + .values() + .flat_map(|database| &database.state.inflight_subscription_info) + } } /// Controls the concurrent execution of commands. pub(crate) struct DatabaseCheckpointControl { database_id: DatabaseId, - pub(crate) state: BarrierWorkerState, + state: BarrierWorkerState, /// Save the state and message of barrier in order. /// Key is the `prev_epoch`. - pub(crate) command_ctx_queue: BTreeMap, + command_ctx_queue: BTreeMap, /// The barrier that are completing. 
/// Some((`prev_epoch`, `should_pause_inject_barrier`)) completing_barrier: Option<(u64, bool)>, - pub(crate) creating_streaming_job_controls: HashMap, + creating_streaming_job_controls: HashMap, - pub(crate) create_mview_tracker: CreateMviewProgressTracker, + create_mview_tracker: CreateMviewProgressTracker, } impl DatabaseCheckpointControl { @@ -531,9 +608,12 @@ impl DatabaseCheckpointControl { let (_, mut node) = self.command_ctx_queue.pop_first().expect("non-empty"); assert!(node.state.creating_jobs_to_wait.is_empty()); assert!(node.state.node_to_collect.is_empty()); - let mut finished_jobs = self - .create_mview_tracker - .apply_collected_command(&node, hummock_version_stats); + let mut finished_jobs = self.create_mview_tracker.apply_collected_command( + node.command_ctx.command.as_ref(), + &node.command_ctx.barrier_info, + &node.state.resps, + hummock_version_stats, + ); if !node.command_ctx.barrier_info.kind.is_checkpoint() { assert!(finished_jobs.is_empty()); node.notifiers.into_iter().for_each(|notifier| { @@ -561,10 +641,9 @@ impl DatabaseCheckpointControl { })); }); let task = task.get_or_insert_default(); - collect_commit_epoch_info( + node.command_ctx.collect_commit_epoch_info( &mut task.commit_info, take(&mut node.state.resps), - &node.command_ctx, self.collect_backfill_pinned_upstream_log_epoch(), ); self.completing_barrier = Some(( @@ -630,25 +709,25 @@ impl DatabaseCheckpointControl { } /// The state and message of this barrier, a node for concurrent checkpoint. -pub(crate) struct EpochNode { +struct EpochNode { /// Timer for recording barrier latency, taken after `complete_barriers`. - pub(crate) enqueue_time: HistogramTimer, + enqueue_time: HistogramTimer, /// Whether this barrier is in-flight or completed. - pub(crate) state: BarrierEpochState, + state: BarrierEpochState, /// Context of this command to generate barrier and do some post jobs. - pub(crate) command_ctx: CommandContext, + command_ctx: CommandContext, /// Notifiers of this barrier. - pub(crate) notifiers: Vec, + notifiers: Vec, } #[derive(Debug)] /// The state of barrier. -pub(crate) struct BarrierEpochState { - pub(crate) node_to_collect: HashSet, +struct BarrierEpochState { + node_to_collect: HashSet, - pub(crate) resps: Vec, + resps: Vec, creating_jobs_to_wait: HashSet, diff --git a/src/meta/src/barrier/checkpoint/mod.rs b/src/meta/src/barrier/checkpoint/mod.rs index a5144913258c2..f9840f5eb3873 100644 --- a/src/meta/src/barrier/checkpoint/mod.rs +++ b/src/meta/src/barrier/checkpoint/mod.rs @@ -16,5 +16,5 @@ mod control; mod creating_job; mod state; -pub(super) use control::{CheckpointControl, DatabaseCheckpointControl, EpochNode}; +pub(super) use control::{CheckpointControl, DatabaseCheckpointControl}; pub(super) use state::BarrierWorkerState; diff --git a/src/meta/src/barrier/checkpoint/state.rs b/src/meta/src/barrier/checkpoint/state.rs index 871ff7e7faf60..8f1f91272b2f8 100644 --- a/src/meta/src/barrier/checkpoint/state.rs +++ b/src/meta/src/barrier/checkpoint/state.rs @@ -35,16 +35,16 @@ pub(crate) struct BarrierWorkerState { pending_non_checkpoint_barriers: Vec, /// Inflight running actors info. - pub(crate) inflight_graph_info: InflightDatabaseInfo, + pub(super) inflight_graph_info: InflightDatabaseInfo, - pub(crate) inflight_subscription_info: InflightSubscriptionInfo, + pub(super) inflight_subscription_info: InflightSubscriptionInfo, /// Whether the cluster is paused and the reason. 
paused_reason: Option, } impl BarrierWorkerState { - pub fn new() -> Self { + pub(super) fn new() -> Self { Self { in_flight_prev_epoch: TracedEpoch::new(Epoch::now()), pending_non_checkpoint_barriers: vec![], diff --git a/src/meta/src/barrier/command.rs b/src/meta/src/barrier/command.rs index 23a8857e9ad11..73ebc8d446295 100644 --- a/src/meta/src/barrier/command.rs +++ b/src/meta/src/barrier/command.rs @@ -12,15 +12,18 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::collections::hash_map::Entry; use std::collections::{HashMap, HashSet}; use std::fmt::Formatter; use risingwave_common::bitmap::Bitmap; use risingwave_common::catalog::TableId; use risingwave_common::hash::ActorMapping; +use risingwave_common::must_match; use risingwave_common::types::Timestamptz; use risingwave_common::util::epoch::Epoch; use risingwave_connector::source::SplitImpl; +use risingwave_hummock_sdk::change_log::build_table_change_log_delta; use risingwave_meta_model::WorkerId; use risingwave_pb::catalog::{CreateType, Table}; use risingwave_pb::common::PbWorkerNode; @@ -36,12 +39,15 @@ use risingwave_pb::stream_plan::{ DropSubscriptionsMutation, PauseMutation, ResumeMutation, SourceChangeSplitMutation, StopMutation, StreamActor, SubscriptionUpstreamInfo, ThrottleMutation, UpdateMutation, }; +use risingwave_pb::stream_service::BarrierCompleteResponse; use tracing::warn; use super::info::{CommandFragmentChanges, InflightStreamingJobInfo}; use crate::barrier::info::BarrierInfo; +use crate::barrier::utils::collect_resp_info; use crate::barrier::InflightSubscriptionInfo; use crate::controller::fragment::InflightFragmentInfo; +use crate::hummock::{CommitEpochInfo, NewTableFragmentInfo}; use crate::manager::{DdlType, StreamingJob}; use crate::model::{ActorId, DispatcherId, FragmentId, TableFragments, TableParallelism}; use crate::stream::{build_actor_connector_splits, SplitAssignment, ThrottleConfig}; @@ -430,23 +436,23 @@ impl BarrierKind { /// [`CommandContext`] is used for generating barrier and doing post stuffs according to the given /// [`Command`]. -pub struct CommandContext { +pub(super) struct CommandContext { /// Resolved info in this barrier loop. - pub node_map: HashMap, - pub subscription_info: InflightSubscriptionInfo, + pub(super) node_map: HashMap, + subscription_info: InflightSubscriptionInfo, - pub barrier_info: BarrierInfo, + pub(super) barrier_info: BarrierInfo, - pub table_ids_to_commit: HashSet, + pub(super) table_ids_to_commit: HashSet, - pub command: Option, + pub(super) command: Option, /// The tracing span of this command. /// /// Differs from [`crate::barrier::TracedEpoch`], this span focuses on the lifetime of the corresponding /// barrier, including the process of waiting for the barrier to be sent, flowing through the /// stream graph on compute nodes, and finishing its `post_collect` stuffs. 
- pub _span: tracing::Span, + _span: tracing::Span, } impl std::fmt::Debug for CommandContext { @@ -477,7 +483,7 @@ impl CommandContext { } } - pub fn get_truncate_epoch(&self, retention_second: u64) -> Epoch { + fn get_truncate_epoch(&self, retention_second: u64) -> Epoch { let Some(truncate_timestamptz) = Timestamptz::from_secs( self.barrier_info .prev_epoch @@ -491,6 +497,86 @@ impl CommandContext { }; Epoch::from_unix_millis(truncate_timestamptz.timestamp_millis() as u64) } + + pub(super) fn collect_commit_epoch_info( + &self, + info: &mut CommitEpochInfo, + resps: Vec, + backfill_pinned_log_epoch: HashMap)>, + ) { + let (sst_to_context, synced_ssts, new_table_watermarks, old_value_ssts) = + collect_resp_info(resps); + + let new_table_fragment_infos = if let Some(Command::CreateStreamingJob { info, job_type }) = + &self.command + && !matches!(job_type, CreateStreamingJobType::SnapshotBackfill(_)) + { + let table_fragments = &info.table_fragments; + let mut table_ids: HashSet<_> = table_fragments + .internal_table_ids() + .into_iter() + .map(TableId::new) + .collect(); + if let Some(mv_table_id) = table_fragments.mv_table_id() { + table_ids.insert(TableId::new(mv_table_id)); + } + + vec![NewTableFragmentInfo { table_ids }] + } else { + vec![] + }; + + let mut mv_log_store_truncate_epoch = HashMap::new(); + let mut update_truncate_epoch = + |table_id: TableId, truncate_epoch| match mv_log_store_truncate_epoch + .entry(table_id.table_id) + { + Entry::Occupied(mut entry) => { + let prev_truncate_epoch = entry.get_mut(); + if truncate_epoch < *prev_truncate_epoch { + *prev_truncate_epoch = truncate_epoch; + } + } + Entry::Vacant(entry) => { + entry.insert(truncate_epoch); + } + }; + for (mv_table_id, subscriptions) in &self.subscription_info.mv_depended_subscriptions { + if let Some(truncate_epoch) = subscriptions + .values() + .max() + .map(|max_retention| self.get_truncate_epoch(*max_retention).0) + { + update_truncate_epoch(*mv_table_id, truncate_epoch); + } + } + for (_, (backfill_epoch, upstream_mv_table_ids)) in backfill_pinned_log_epoch { + for mv_table_id in upstream_mv_table_ids { + update_truncate_epoch(mv_table_id, backfill_epoch); + } + } + + let table_new_change_log = build_table_change_log_delta( + old_value_ssts.into_iter(), + synced_ssts.iter().map(|sst| &sst.sst_info), + must_match!(&self.barrier_info.kind, BarrierKind::Checkpoint(epochs) => epochs), + mv_log_store_truncate_epoch.into_iter(), + ); + + let epoch = self.barrier_info.prev_epoch(); + for table_id in &self.table_ids_to_commit { + info.tables_to_commit + .try_insert(*table_id, epoch) + .expect("non duplicate"); + } + + info.sstables.extend(synced_ssts); + info.new_table_watermarks.extend(new_table_watermarks); + info.sst_to_context.extend(sst_to_context); + info.new_table_fragment_infos + .extend(new_table_fragment_infos); + info.change_log_delta.extend(table_new_change_log); + } } impl Command { diff --git a/src/meta/src/barrier/context/mod.rs b/src/meta/src/barrier/context/mod.rs index f1e0737929518..e69b9644de8dd 100644 --- a/src/meta/src/barrier/context/mod.rs +++ b/src/meta/src/barrier/context/mod.rs @@ -66,18 +66,44 @@ pub(super) trait GlobalBarrierWorkerContext: Send + Sync + 'static { async fn reload_runtime_info(&self) -> MetaResult; } -pub(crate) struct GlobalBarrierWorkerContextImpl { - pub(crate) scheduled_barriers: ScheduledBarriers, +pub(super) struct GlobalBarrierWorkerContextImpl { + scheduled_barriers: ScheduledBarriers, - pub(crate) status: Arc>, + status: Arc>, - pub(crate) metadata_manager: 
MetadataManager, + pub(super) metadata_manager: MetadataManager, - pub(crate) hummock_manager: HummockManagerRef, + hummock_manager: HummockManagerRef, - pub(crate) source_manager: SourceManagerRef, + source_manager: SourceManagerRef, - pub(crate) scale_controller: ScaleControllerRef, + scale_controller: ScaleControllerRef, - pub(crate) env: MetaSrvEnv, + pub(super) env: MetaSrvEnv, +} + +impl GlobalBarrierWorkerContextImpl { + pub(super) fn new( + scheduled_barriers: ScheduledBarriers, + status: Arc>, + metadata_manager: MetadataManager, + hummock_manager: HummockManagerRef, + source_manager: SourceManagerRef, + scale_controller: ScaleControllerRef, + env: MetaSrvEnv, + ) -> Self { + Self { + scheduled_barriers, + status, + metadata_manager, + hummock_manager, + source_manager, + scale_controller, + env, + } + } + + pub(super) fn status(&self) -> Arc> { + self.status.clone() + } } diff --git a/src/meta/src/barrier/context/recovery.rs b/src/meta/src/barrier/context/recovery.rs index 5c13b923b8fc8..83e7cd13919aa 100644 --- a/src/meta/src/barrier/context/recovery.rs +++ b/src/meta/src/barrier/context/recovery.rs @@ -64,7 +64,6 @@ impl GlobalBarrierWorkerContextImpl { Ok(()) } - // FIXME: didn't consider Values here async fn recover_background_mv_progress( &self, ) -> MetaResult> { @@ -82,7 +81,14 @@ impl GlobalBarrierWorkerContextImpl { .get_job_fragments_by_id(mview.table_id) .await?; let table_fragments = TableFragments::from_protobuf(table_fragments); - mview_map.insert(table_id, (mview.definition.clone(), table_fragments)); + if table_fragments.tracking_progress_actor_ids().is_empty() { + // If there's no tracking actor in the mview, we can finish the job directly. + mgr.catalog_controller + .finish_streaming_job(mview.table_id, None) + .await?; + } else { + mview_map.insert(table_id, (mview.definition.clone(), table_fragments)); + } } // If failed, enter recovery mode. 
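
A minimal, self-contained sketch of the recovery-time decision introduced in `recover_background_mv_progress` above: recovered background jobs whose fragments report no backfill actors left to track are finished immediately instead of being handed back to the progress tracker. The types and helpers here (`RecoveredJob`, `finish_job`, `track_job`) are illustrative stand-ins, not the actual meta-service API.

```rust
// Hypothetical, simplified model of the early-finish path added in recovery.
struct RecoveredJob {
    table_id: u32,
    definition: String,
    tracking_actor_ids: Vec<u32>,
}

fn recover_background_jobs(jobs: Vec<RecoveredJob>) {
    for job in jobs {
        if job.tracking_actor_ids.is_empty() {
            // No backfill actors to wait on: mark the streaming job finished right away.
            finish_job(job.table_id);
        } else {
            // Otherwise keep tracking its progress barrier by barrier.
            track_job(job);
        }
    }
}

fn finish_job(table_id: u32) {
    println!("finishing job {table_id} directly (no tracking actors)");
}

fn track_job(job: RecoveredJob) {
    println!(
        "tracking job {} ({}) with {} actors",
        job.table_id,
        job.definition,
        job.tracking_actor_ids.len()
    );
}

fn main() {
    recover_background_jobs(vec![
        RecoveredJob { table_id: 1, definition: "mv1".into(), tracking_actor_ids: vec![] },
        RecoveredJob { table_id: 2, definition: "mv2".into(), tracking_actor_ids: vec![10, 11] },
    ]);
}
```
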
diff --git a/src/meta/src/barrier/manager.rs b/src/meta/src/barrier/manager.rs index 4b65733721231..1a8ff9be53f24 100644 --- a/src/meta/src/barrier/manager.rs +++ b/src/meta/src/barrier/manager.rs @@ -113,6 +113,8 @@ impl GlobalBarrierManager { scale_controller: ScaleControllerRef, ) -> (Arc, JoinHandle<()>, oneshot::Sender<()>) { let (request_tx, request_rx) = unbounded_channel(); + let hummock_manager_clone = hummock_manager.clone(); + let metadata_manager_clone = metadata_manager.clone(); let barrier_worker = GlobalBarrierWorker::new( scheduled_barriers, env, @@ -125,10 +127,10 @@ impl GlobalBarrierManager { ) .await; let manager = Self { - status: barrier_worker.context.status.clone(), - hummock_manager: barrier_worker.context.hummock_manager.clone(), + status: barrier_worker.context.status(), + hummock_manager: hummock_manager_clone, request_tx, - metadata_manager: barrier_worker.context.metadata_manager.clone(), + metadata_manager: metadata_manager_clone, }; let (join_handle, shutdown_tx) = barrier_worker.start(); (Arc::new(manager), join_handle, shutdown_tx) diff --git a/src/meta/src/barrier/progress.rs b/src/meta/src/barrier/progress.rs index 17f044142b027..36d1a9a0b242c 100644 --- a/src/meta/src/barrier/progress.rs +++ b/src/meta/src/barrier/progress.rs @@ -23,8 +23,9 @@ use risingwave_pb::catalog::CreateType; use risingwave_pb::ddl_service::DdlProgress; use risingwave_pb::hummock::HummockVersionStats; use risingwave_pb::stream_service::barrier_complete_response::CreateMviewProgress; +use risingwave_pb::stream_service::PbBarrierCompleteResponse; -use crate::barrier::checkpoint::EpochNode; +use crate::barrier::info::BarrierInfo; use crate::barrier::{ Command, CreateStreamingJobCommandInfo, CreateStreamingJobType, ReplaceTablePlan, }; @@ -419,12 +420,13 @@ impl CreateMviewProgressTracker { /// Return the finished jobs when the barrier kind is `Checkpoint` pub(super) fn apply_collected_command( &mut self, - epoch_node: &EpochNode, + command: Option<&Command>, + barrier_info: &BarrierInfo, + resps: impl IntoIterator, version_stats: &HummockVersionStats, ) -> Vec { - let command_ctx = &epoch_node.command_ctx; let new_tracking_job_info = - if let Some(Command::CreateStreamingJob { info, job_type }) = &command_ctx.command { + if let Some(Command::CreateStreamingJob { info, job_type }) = command { match job_type { CreateStreamingJobType::Normal => Some((info, None)), CreateStreamingJobType::SinkIntoTable(replace_table) => { @@ -438,26 +440,19 @@ impl CreateMviewProgressTracker { } else { None }; - assert!(epoch_node.state.node_to_collect.is_empty()); self.update_tracking_jobs( new_tracking_job_info, - epoch_node - .state - .resps - .iter() + resps + .into_iter() .flat_map(|resp| resp.create_mview_progress.iter()), version_stats, ); - if let Some(table_id) = command_ctx - .command - .as_ref() - .and_then(Command::table_to_cancel) - { + if let Some(table_id) = command.and_then(Command::table_to_cancel) { // the cancelled command is possibly stashed in `finished_commands` and waiting // for checkpoint, we should also clear it. 
self.cancel_command(table_id); } - if command_ctx.barrier_info.kind.is_checkpoint() { + if barrier_info.kind.is_checkpoint() { self.take_finished_jobs() } else { vec![] diff --git a/src/meta/src/barrier/utils.rs b/src/meta/src/barrier/utils.rs index dfdc2a6ab7e88..7e49a0f9d49af 100644 --- a/src/meta/src/barrier/utils.rs +++ b/src/meta/src/barrier/utils.rs @@ -12,13 +12,10 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::hash_map::Entry; use std::collections::{HashMap, HashSet}; use itertools::Itertools; use risingwave_common::catalog::TableId; -use risingwave_common::must_match; -use risingwave_hummock_sdk::change_log::build_table_change_log_delta; use risingwave_hummock_sdk::sstable_info::SstableInfo; use risingwave_hummock_sdk::table_stats::from_prost_table_stats_map; use risingwave_hummock_sdk::table_watermark::{ @@ -27,8 +24,6 @@ use risingwave_hummock_sdk::table_watermark::{ use risingwave_hummock_sdk::{HummockSstableObjectId, LocalSstableInfo}; use risingwave_pb::stream_service::BarrierCompleteResponse; -use crate::barrier::command::CommandContext; -use crate::barrier::{BarrierKind, Command, CreateStreamingJobType}; use crate::hummock::{CommitEpochInfo, NewTableFragmentInfo}; #[expect(clippy::type_complexity)] @@ -80,86 +75,6 @@ pub(super) fn collect_resp_info( ) } -pub(super) fn collect_commit_epoch_info( - info: &mut CommitEpochInfo, - resps: Vec, - command_ctx: &CommandContext, - backfill_pinned_log_epoch: HashMap)>, -) { - let (sst_to_context, synced_ssts, new_table_watermarks, old_value_ssts) = - collect_resp_info(resps); - - let new_table_fragment_infos = if let Some(Command::CreateStreamingJob { info, job_type }) = - &command_ctx.command - && !matches!(job_type, CreateStreamingJobType::SnapshotBackfill(_)) - { - let table_fragments = &info.table_fragments; - let mut table_ids: HashSet<_> = table_fragments - .internal_table_ids() - .into_iter() - .map(TableId::new) - .collect(); - if let Some(mv_table_id) = table_fragments.mv_table_id() { - table_ids.insert(TableId::new(mv_table_id)); - } - - vec![NewTableFragmentInfo { table_ids }] - } else { - vec![] - }; - - let mut mv_log_store_truncate_epoch = HashMap::new(); - let mut update_truncate_epoch = - |table_id: TableId, truncate_epoch| match mv_log_store_truncate_epoch - .entry(table_id.table_id) - { - Entry::Occupied(mut entry) => { - let prev_truncate_epoch = entry.get_mut(); - if truncate_epoch < *prev_truncate_epoch { - *prev_truncate_epoch = truncate_epoch; - } - } - Entry::Vacant(entry) => { - entry.insert(truncate_epoch); - } - }; - for (mv_table_id, subscriptions) in &command_ctx.subscription_info.mv_depended_subscriptions { - if let Some(truncate_epoch) = subscriptions - .values() - .max() - .map(|max_retention| command_ctx.get_truncate_epoch(*max_retention).0) - { - update_truncate_epoch(*mv_table_id, truncate_epoch); - } - } - for (_, (backfill_epoch, upstream_mv_table_ids)) in backfill_pinned_log_epoch { - for mv_table_id in upstream_mv_table_ids { - update_truncate_epoch(mv_table_id, backfill_epoch); - } - } - - let table_new_change_log = build_table_change_log_delta( - old_value_ssts.into_iter(), - synced_ssts.iter().map(|sst| &sst.sst_info), - must_match!(&command_ctx.barrier_info.kind, BarrierKind::Checkpoint(epochs) => epochs), - mv_log_store_truncate_epoch.into_iter(), - ); - - let epoch = command_ctx.barrier_info.prev_epoch(); - for table_id in &command_ctx.table_ids_to_commit { - info.tables_to_commit - .try_insert(*table_id, 
epoch) - .expect("non duplicate"); - } - - info.sstables.extend(synced_ssts); - info.new_table_watermarks.extend(new_table_watermarks); - info.sst_to_context.extend(sst_to_context); - info.new_table_fragment_infos - .extend(new_table_fragment_infos); - info.change_log_delta.extend(table_new_change_log); -} - pub(super) fn collect_creating_job_commit_epoch_info( commit_info: &mut CommitEpochInfo, epoch: u64, diff --git a/src/meta/src/barrier/worker.rs b/src/meta/src/barrier/worker.rs index dfc902f31cfce..ee0cdd97fb616 100644 --- a/src/meta/src/barrier/worker.rs +++ b/src/meta/src/barrier/worker.rs @@ -13,7 +13,7 @@ // limitations under the License. use std::collections::HashMap; -use std::mem::{replace, take}; +use std::mem::replace; use std::sync::Arc; use std::time::Duration; @@ -118,15 +118,15 @@ impl GlobalBarrierWorker { let status = Arc::new(ArcSwap::new(Arc::new(BarrierManagerStatus::Starting))); - let context = Arc::new(GlobalBarrierWorkerContextImpl { + let context = Arc::new(GlobalBarrierWorkerContextImpl::new( scheduled_barriers, status, metadata_manager, hummock_manager, source_manager, scale_controller, - env: env.clone(), - }); + env.clone(), + )); let control_stream_manager = ControlStreamManager::new(env.clone()); @@ -258,15 +258,7 @@ impl GlobalBarrierWorker { if let Some(request) = request { match request { BarrierManagerRequest::GetDdlProgress(result_tx) => { - let mut progress = HashMap::new(); - for database_checkpoint_control in self.checkpoint_control.databases.values() { - // Progress of normal backfill - progress.extend(database_checkpoint_control.create_mview_tracker.gen_ddl_progress()); - // Progress of snapshot backfill - for creating_job in database_checkpoint_control.creating_streaming_job_controls.values() { - progress.extend([(creating_job.info.table_fragments.table_id().table_id, creating_job.gen_ddl_progress())]); - } - } + let progress = self.checkpoint_control.gen_ddl_progress(); if result_tx.send(progress).is_err() { error!("failed to send get ddl progress"); } @@ -287,7 +279,7 @@ impl GlobalBarrierWorker { info!(?changed_worker, "worker changed"); if let ActiveStreamingWorkerChange::Add(node) | ActiveStreamingWorkerChange::Update(node) = changed_worker { - self.control_stream_manager.add_worker(node, self.checkpoint_control.databases.values().flat_map(|database| &database.state.inflight_subscription_info), &*self.context).await; + self.control_stream_manager.add_worker(node, self.checkpoint_control.subscriptions(), &*self.context).await; } } @@ -328,23 +320,11 @@ impl GlobalBarrierWorker { (worker_id, resp_result) = self.control_stream_manager.next_collect_barrier_response() => { if let Err(e) = resp_result.and_then(|resp| self.checkpoint_control.barrier_collected(resp, &mut self.control_stream_manager)) { { - let mut err = None; - for database_checkpoint_control in self.checkpoint_control.databases.values() { - let failed_barrier = database_checkpoint_control.barrier_wait_collect_from_worker(worker_id as _); - if failed_barrier.is_some() - || database_checkpoint_control.state.inflight_graph_info.contains_worker(worker_id as _) - || database_checkpoint_control.creating_streaming_job_controls.values().any(|job| job.is_wait_on_worker(worker_id)) { - - err = Some((e, failed_barrier)); - break; - } - } - if let Some((e, failed_barrier)) = err { + + if self.checkpoint_control.is_failed_at_worker_err(worker_id) { let errors = self.control_stream_manager.collect_errors(worker_id, e).await; let err = merge_node_rpc_errors("get error from control stream", 
errors); - if let Some(failed_barrier) = failed_barrier { - self.report_collect_failure(failed_barrier, &err); - } + self.report_collect_failure(&err); self.failure_recovery(err).await; } else { warn!(worker_id, "no barrier to collect from worker, ignore err"); @@ -424,21 +404,7 @@ impl GlobalBarrierWorker { } } } - for (_, node) in self - .checkpoint_control - .databases - .values_mut() - .flat_map(|database| take(&mut database.command_ctx_queue)) - { - for notifier in node.notifiers { - notifier.notify_failed(err.clone()); - } - node.enqueue_time.observe_duration(); - } - self.checkpoint_control - .databases - .values_mut() - .for_each(|database| database.create_mview_tracker.abort_all()); + self.checkpoint_control.clear_on_err(err); } } @@ -505,12 +471,10 @@ impl GlobalBarrierWorker { impl GlobalBarrierWorker { /// Send barrier-complete-rpc and wait for responses from all CNs - pub(super) fn report_collect_failure(&self, barrier_info: &BarrierInfo, error: &MetaError) { + pub(super) fn report_collect_failure(&self, error: &MetaError) { // Record failure in event log. use risingwave_pb::meta::event_log; let event = event_log::EventCollectBarrierFail { - prev_epoch: barrier_info.prev_epoch(), - cur_epoch: barrier_info.curr_epoch.value().0, error: error.to_report_string(), }; self.env @@ -728,10 +692,10 @@ impl GlobalBarrierWorker { ( active_streaming_nodes, control_stream_manager, - CheckpointControl { + CheckpointControl::new( databases, hummock_version_stats, - }, + ), ) }; if recovery_result.is_err() { diff --git a/src/meta/src/manager/metadata.rs b/src/meta/src/manager/metadata.rs index f24149f58b41d..40d3c025c0c8b 100644 --- a/src/meta/src/manager/metadata.rs +++ b/src/meta/src/manager/metadata.rs @@ -723,7 +723,7 @@ impl MetadataManager { } impl MetadataManager { - /// Wait for job finishing notification in `TrackingJob::pre_finish`. + /// Wait for job finishing notification in `TrackingJob::finish`. /// The progress is updated per barrier. pub(crate) async fn wait_streaming_job_finished( &self, diff --git a/src/meta/src/model/stream.rs b/src/meta/src/model/stream.rs index 4307a4acc5a35..a93a4c4d02272 100644 --- a/src/meta/src/model/stream.rs +++ b/src/meta/src/model/stream.rs @@ -401,14 +401,6 @@ impl TableFragments { .cloned() } - /// Returns actors that contains backfill executors. 
- pub fn backfill_actor_ids(&self) -> HashSet { - Self::filter_actor_ids(self, |fragment_type_mask| { - (fragment_type_mask & FragmentTypeFlag::StreamScan as u32) != 0 - }) - .collect() - } - pub fn snapshot_backfill_actor_ids(&self) -> HashSet { Self::filter_actor_ids(self, |mask| { (mask & FragmentTypeFlag::SnapshotBackfillStreamScan as u32) != 0 diff --git a/src/storage/src/hummock/compactor/compaction_utils.rs b/src/storage/src/hummock/compactor/compaction_utils.rs index 5e22d3e45701b..73b178d73e802 100644 --- a/src/storage/src/hummock/compactor/compaction_utils.rs +++ b/src/storage/src/hummock/compactor/compaction_utils.rs @@ -232,10 +232,9 @@ pub async fn generate_splits( context: &CompactorContext, max_sub_compaction: u32, ) -> HummockResult> { - const MAX_FILE_COUNT: usize = 32; let parallel_compact_size = (context.storage_opts.parallel_compact_size_mb as u64) << 20; if compaction_size > parallel_compact_size { - if sstable_infos.len() > MAX_FILE_COUNT { + if sstable_infos.len() > context.storage_opts.compactor_max_preload_meta_file_count { return Ok(generate_splits_fast( sstable_infos, compaction_size, diff --git a/src/storage/src/hummock/event_handler/hummock_event_handler.rs b/src/storage/src/hummock/event_handler/hummock_event_handler.rs index 3e3acf4b48ef9..0b8cadaf4c97e 100644 --- a/src/storage/src/hummock/event_handler/hummock_event_handler.rs +++ b/src/storage/src/hummock/event_handler/hummock_event_handler.rs @@ -88,6 +88,16 @@ impl BufferTracker { ) } + #[cfg(test)] + fn for_test_with_config(flush_threshold: usize, min_batch_flush_size: usize) -> Self { + Self::new( + usize::MAX, + flush_threshold, + GenericGauge::new("test", "test").unwrap(), + min_batch_flush_size, + ) + } + fn new( capacity: usize, flush_threshold: usize, @@ -232,12 +242,24 @@ impl HummockEventHandler { let upload_compactor_context = compactor_context.clone(); let upload_task_latency = state_store_metrics.uploader_upload_task_latency.clone(); let wait_poll_latency = state_store_metrics.uploader_wait_poll_latency.clone(); + let recent_versions = RecentVersions::new( + pinned_version, + compactor_context + .storage_opts + .max_cached_recent_versions_number, + state_store_metrics.clone(), + ); + let buffer_tracker = BufferTracker::from_storage_opts( + &compactor_context.storage_opts, + state_store_metrics.uploader_uploading_task_size.clone(), + ); Self::new_inner( version_update_rx, - pinned_version, compactor_context.sstable_store.clone(), state_store_metrics, - &compactor_context.storage_opts, + CacheRefillConfig::from_storage_opts(&compactor_context.storage_opts), + recent_versions, + buffer_tracker, Arc::new(move |payload, task_info| { static NEXT_UPLOAD_TASK_ID: LazyLock = LazyLock::new(|| AtomicUsize::new(0)); @@ -288,22 +310,20 @@ impl HummockEventHandler { fn new_inner( version_update_rx: UnboundedReceiver, - pinned_version: PinnedVersion, sstable_store: SstableStoreRef, state_store_metrics: Arc, - storage_opts: &StorageOpts, + refill_config: CacheRefillConfig, + recent_versions: RecentVersions, + buffer_tracker: BufferTracker, spawn_upload_task: SpawnUploadTask, spawn_refill_task: SpawnRefillTask, ) -> Self { let (hummock_event_tx, hummock_event_rx) = event_channel(state_store_metrics.event_handler_pending_event.clone()); - let (version_update_notifier_tx, _) = tokio::sync::watch::channel(pinned_version.clone()); + let (version_update_notifier_tx, _) = + tokio::sync::watch::channel(recent_versions.latest_version().clone()); let version_update_notifier_tx = Arc::new(version_update_notifier_tx); 
let read_version_mapping = Arc::new(RwLock::new(HashMap::default())); - let buffer_tracker = BufferTracker::from_storage_opts( - storage_opts, - state_store_metrics.uploader_uploading_task_size.clone(), - ); let metrics = HummockEventHandlerMetrics { event_handler_on_upload_finish_latency: state_store_metrics @@ -319,27 +339,18 @@ impl HummockEventHandler { let uploader = HummockUploader::new( state_store_metrics.clone(), - pinned_version.clone(), + recent_versions.latest_version().clone(), spawn_upload_task, buffer_tracker, - storage_opts, - ); - let refiller = CacheRefiller::new( - CacheRefillConfig::from_storage_opts(storage_opts), - sstable_store, - spawn_refill_task, ); + let refiller = CacheRefiller::new(refill_config, sstable_store, spawn_refill_task); Self { hummock_event_tx, hummock_event_rx, version_update_rx, version_update_notifier_tx, - recent_versions: Arc::new(ArcSwap::from_pointee(RecentVersions::new( - pinned_version, - storage_opts.max_cached_recent_versions_number, - state_store_metrics, - ))), + recent_versions: Arc::new(ArcSwap::from_pointee(recent_versions)), read_version_mapping, local_read_version_mapping: Default::default(), uploader, @@ -465,22 +476,24 @@ impl HummockEventHandler { .start_sync_epoch(new_sync_epoch, sync_result_sender, table_ids); } - fn handle_clear(&mut self, notifier: oneshot::Sender<()>) { + fn handle_clear(&mut self, notifier: oneshot::Sender<()>, table_ids: Option>) { info!( current_version_id = ?self.uploader.hummock_version().id(), "handle clear event" ); - self.uploader.clear(); + self.uploader.clear(table_ids.clone()); - assert!( - self.local_read_version_mapping.is_empty(), - "read version mapping not empty when clear. remaining tables: {:?}", - self.local_read_version_mapping - .values() - .map(|(_, read_version)| read_version.read().table_id()) - .collect_vec() - ); + if table_ids.is_none() { + assert!( + self.local_read_version_mapping.is_empty(), + "read version mapping not empty when clear. remaining tables: {:?}", + self.local_read_version_mapping + .values() + .map(|(_, read_version)| read_version.read().table_id()) + .collect_vec() + ); + } // Notify completion of the Clear event. 
let _ = notifier.send(()).inspect_err(|e| { @@ -634,8 +647,8 @@ impl HummockEventHandler { } => { self.handle_sync_epoch(new_sync_epoch, sync_result_sender, table_ids); } - HummockEvent::Clear(notifier) => { - self.handle_clear(notifier); + HummockEvent::Clear(notifier, table_ids) => { + self.handle_clear(notifier, table_ids); } HummockEvent::Shutdown => { unreachable!("shutdown is handled specially") @@ -837,6 +850,7 @@ mod tests { use futures::FutureExt; use parking_lot::Mutex; use risingwave_common::bitmap::Bitmap; + use risingwave_common::catalog::TableId; use risingwave_common::hash::VirtualNode; use risingwave_common::util::epoch::{test_epoch, EpochExt}; use risingwave_hummock_sdk::compaction_group::StaticCompactionGroupId; @@ -846,15 +860,22 @@ mod tests { use tokio::sync::mpsc::unbounded_channel; use tokio::sync::oneshot; - use crate::hummock::event_handler::refiller::CacheRefiller; - use crate::hummock::event_handler::uploader::test_utils::{gen_imm, TEST_TABLE_ID}; + use crate::hummock::event_handler::hummock_event_handler::BufferTracker; + use crate::hummock::event_handler::refiller::{CacheRefillConfig, CacheRefiller}; + use crate::hummock::event_handler::uploader::test_utils::{ + gen_imm, gen_imm_inner, prepare_uploader_order_test_spawn_task_fn, TEST_TABLE_ID, + }; use crate::hummock::event_handler::uploader::UploadTaskOutput; - use crate::hummock::event_handler::{HummockEvent, HummockEventHandler}; + use crate::hummock::event_handler::{ + HummockEvent, HummockEventHandler, HummockReadVersionRef, LocalInstanceGuard, + }; use crate::hummock::iterator::test_utils::mock_sstable_store; use crate::hummock::local_version::pinned_version::PinnedVersion; + use crate::hummock::local_version::recent_versions::RecentVersions; use crate::hummock::store::version::{StagingData, VersionUpdate}; use crate::hummock::test_utils::default_opts_for_test; use crate::hummock::HummockError; + use crate::mem_table::ImmutableMemtable; use crate::monitor::HummockStateStoreMetrics; use crate::store::SealCurrentEpochOptions; @@ -884,12 +905,19 @@ mod tests { let (tx, rx) = oneshot::channel(); let rx = Arc::new(Mutex::new(Some(rx))); + let storage_opt = default_opts_for_test(); + let metrics = Arc::new(HummockStateStoreMetrics::unused()); + let event_handler = HummockEventHandler::new_inner( version_update_rx, - initial_version.clone(), mock_sstable_store().await, - Arc::new(HummockStateStoreMetrics::unused()), - &default_opts_for_test(), + metrics.clone(), + CacheRefillConfig::from_storage_opts(&storage_opt), + RecentVersions::new(initial_version.clone(), 10, metrics.clone()), + BufferTracker::from_storage_opts( + &storage_opt, + metrics.uploader_uploading_task_size.clone(), + ), Arc::new(move |_, info| { assert_eq!(info.epochs.len(), 1); let epoch = info.epochs[0]; @@ -918,7 +946,7 @@ mod tests { let send_event = |event| event_tx.send(event).unwrap(); - let _join_handle = spawn(event_handler.start_hummock_event_handler_worker()); + let join_handle = spawn(event_handler.start_hummock_event_handler_worker()); let (read_version, guard) = { let (tx, rx) = oneshot::channel(); @@ -1001,5 +1029,281 @@ mod tests { tx.send(()).unwrap(); rx1.await.unwrap().unwrap_err(); rx2.await.unwrap().unwrap_err(); + + send_event(HummockEvent::Shutdown); + join_handle.await.unwrap(); + } + + #[tokio::test] + async fn test_clear_tables() { + let table_id1 = TableId::new(1); + let table_id2 = TableId::new(2); + let epoch0 = test_epoch(233); + + let initial_version = PinnedVersion::new( + 
HummockVersion::from_rpc_protobuf(&PbHummockVersion { + id: 1, + state_table_info: HashMap::from_iter([ + ( + table_id1.table_id, + StateTableInfo { + committed_epoch: epoch0, + compaction_group_id: StaticCompactionGroupId::StateDefault as _, + }, + ), + ( + table_id2.table_id, + StateTableInfo { + committed_epoch: epoch0, + compaction_group_id: StaticCompactionGroupId::StateDefault as _, + }, + ), + ]), + ..Default::default() + }), + unbounded_channel().0, + ); + + let (_version_update_tx, version_update_rx) = unbounded_channel(); + + let epoch1 = epoch0.next_epoch(); + let epoch2 = epoch1.next_epoch(); + let epoch3 = epoch2.next_epoch(); + + let imm_size = gen_imm_inner(TEST_TABLE_ID, epoch1, 0, None).await.size(); + + // The buffer can hold at most 1 imm. When a new imm is added, the previous one will be spilled, and the newly added one will be retained. + let buffer_tracker = BufferTracker::for_test_with_config(imm_size * 2 - 1, 1); + let memory_limiter = buffer_tracker.get_memory_limiter().clone(); + + let gen_imm = |table_id, epoch, spill_offset| { + let imm = gen_imm_inner(table_id, epoch, spill_offset, Some(&*memory_limiter)) + .now_or_never() + .unwrap(); + assert_eq!(imm.size(), imm_size); + imm + }; + let imm1_1 = gen_imm(table_id1, epoch1, 0); + let imm1_2_1 = gen_imm(table_id1, epoch2, 0); + + let storage_opt = default_opts_for_test(); + let metrics = Arc::new(HummockStateStoreMetrics::unused()); + + let (spawn_task, new_task_notifier) = prepare_uploader_order_test_spawn_task_fn(false); + + let event_handler = HummockEventHandler::new_inner( + version_update_rx, + mock_sstable_store().await, + metrics.clone(), + CacheRefillConfig::from_storage_opts(&storage_opt), + RecentVersions::new(initial_version.clone(), 10, metrics.clone()), + buffer_tracker, + spawn_task, + CacheRefiller::default_spawn_refill_task(), + ); + + let event_tx = event_handler.event_sender(); + + let send_event = |event| event_tx.send(event).unwrap(); + let flush_event = || async { + let (tx, rx) = oneshot::channel(); + send_event(HummockEvent::FlushEvent(tx)); + rx.await.unwrap(); + }; + let start_epoch = |table_id, epoch| { + send_event(HummockEvent::StartEpoch { + epoch, + table_ids: HashSet::from_iter([table_id]), + }) + }; + let init_epoch = |instance: &LocalInstanceGuard, init_epoch| { + send_event(HummockEvent::InitEpoch { + instance_id: instance.instance_id, + init_epoch, + }) + }; + let write_imm = |read_version: &HummockReadVersionRef, + instance: &LocalInstanceGuard, + imm: &ImmutableMemtable| { + read_version + .write() + .update(VersionUpdate::Staging(StagingData::ImmMem(imm.clone()))); + + send_event(HummockEvent::ImmToUploader { + instance_id: instance.instance_id, + imm: imm.clone(), + }); + }; + let seal_epoch = |instance: &LocalInstanceGuard, next_epoch| { + send_event(HummockEvent::LocalSealEpoch { + instance_id: instance.instance_id, + next_epoch, + opts: SealCurrentEpochOptions::for_test(), + }) + }; + let sync_epoch = |table_id, new_sync_epoch| { + let (tx, rx) = oneshot::channel(); + send_event(HummockEvent::SyncEpoch { + new_sync_epoch, + sync_result_sender: tx, + table_ids: HashSet::from_iter([table_id]), + }); + rx + }; + + let join_handle = spawn(event_handler.start_hummock_event_handler_worker()); + + let (read_version1, guard1) = { + let (tx, rx) = oneshot::channel(); + send_event(HummockEvent::RegisterReadVersion { + table_id: table_id1, + new_read_version_sender: tx, + is_replicated: false, + vnodes: Arc::new(Bitmap::ones(VirtualNode::COUNT_FOR_TEST)), + }); + rx.await.unwrap() 
+ }; + + let (read_version2, guard2) = { + let (tx, rx) = oneshot::channel(); + send_event(HummockEvent::RegisterReadVersion { + table_id: table_id2, + new_read_version_sender: tx, + is_replicated: false, + vnodes: Arc::new(Bitmap::ones(VirtualNode::COUNT_FOR_TEST)), + }); + rx.await.unwrap() + }; + + // prepare data of table1 + let (task1_1_finish_tx, task1_1_rx) = { + start_epoch(table_id1, epoch1); + + init_epoch(&guard1, epoch1); + + write_imm(&read_version1, &guard1, &imm1_1); + + start_epoch(table_id1, epoch2); + + seal_epoch(&guard1, epoch2); + + let (wait_task_start, task_finish_tx) = new_task_notifier(HashMap::from_iter([( + guard1.instance_id, + vec![imm1_1.batch_id()], + )])); + + let mut rx = sync_epoch(table_id1, epoch1); + wait_task_start.await; + assert!(poll_fn(|cx| Poll::Ready(rx.poll_unpin(cx).is_pending())).await); + + write_imm(&read_version1, &guard1, &imm1_2_1); + flush_event().await; + + (task_finish_tx, rx) + }; + // by now, the state in uploader of table_id1 + // unsync: epoch2 -> [imm1_2] + // syncing: epoch1 -> [imm1_1] + + let (task1_2_finish_tx, _finish_txs) = { + let mut finish_txs = vec![]; + let imm2_1_1 = gen_imm(table_id2, epoch1, 0); + start_epoch(table_id2, epoch1); + init_epoch(&guard2, epoch1); + let (wait_task_start, task1_2_finish_tx) = new_task_notifier(HashMap::from_iter([( + guard1.instance_id, + vec![imm1_2_1.batch_id()], + )])); + write_imm(&read_version2, &guard2, &imm2_1_1); + wait_task_start.await; + + let imm2_1_2 = gen_imm(table_id2, epoch1, 1); + let (wait_task_start, finish_tx) = new_task_notifier(HashMap::from_iter([( + guard2.instance_id, + vec![imm2_1_2.batch_id(), imm2_1_1.batch_id()], + )])); + finish_txs.push(finish_tx); + write_imm(&read_version2, &guard2, &imm2_1_2); + wait_task_start.await; + + let imm2_1_3 = gen_imm(table_id2, epoch1, 2); + write_imm(&read_version2, &guard2, &imm2_1_3); + start_epoch(table_id2, epoch2); + seal_epoch(&guard2, epoch2); + let (wait_task_start, finish_tx) = new_task_notifier(HashMap::from_iter([( + guard2.instance_id, + vec![imm2_1_3.batch_id()], + )])); + finish_txs.push(finish_tx); + let _sync_rx = sync_epoch(table_id2, epoch1); + wait_task_start.await; + + let imm2_2_1 = gen_imm(table_id2, epoch2, 0); + write_imm(&read_version2, &guard2, &imm2_2_1); + flush_event().await; + let imm2_2_2 = gen_imm(table_id2, epoch2, 1); + write_imm(&read_version2, &guard2, &imm2_2_2); + let (wait_task_start, finish_tx) = new_task_notifier(HashMap::from_iter([( + guard2.instance_id, + vec![imm2_2_2.batch_id(), imm2_2_1.batch_id()], + )])); + finish_txs.push(finish_tx); + wait_task_start.await; + + let imm2_2_3 = gen_imm(table_id2, epoch2, 2); + write_imm(&read_version2, &guard2, &imm2_2_3); + + // by now, the state in uploader of table_id2 + // syncing: epoch1 -> spill: [imm2_1_2, imm2_1_1], sync: [imm2_1_3] + // unsync: epoch2 -> spilling: [imm2_2_2, imm2_2_1], imm: [imm2_2_3] + // the state in uploader of table_id1 + // unsync: epoch2 -> spilling [imm1_2] + // syncing: epoch1 -> [imm1_1] + + drop(guard2); + let (clear_tx, clear_rx) = oneshot::channel(); + send_event(HummockEvent::Clear( + clear_tx, + Some(HashSet::from_iter([table_id2])), + )); + clear_rx.await.unwrap(); + (task1_2_finish_tx, finish_txs) + }; + + let imm1_2_2 = gen_imm(table_id1, epoch2, 1); + write_imm(&read_version1, &guard1, &imm1_2_2); + start_epoch(table_id1, epoch3); + seal_epoch(&guard1, epoch3); + + let (tx2, mut sync_rx2) = oneshot::channel(); + let (wait_task_start, task1_2_2_finish_tx) = new_task_notifier(HashMap::from_iter([( + 
guard1.instance_id, + vec![imm1_2_2.batch_id()], + )])); + send_event(HummockEvent::SyncEpoch { + new_sync_epoch: epoch2, + sync_result_sender: tx2, + table_ids: HashSet::from_iter([table_id1]), + }); + wait_task_start.await; + assert!(poll_fn(|cx| Poll::Ready(sync_rx2.poll_unpin(cx).is_pending())).await); + + task1_1_finish_tx.send(()).unwrap(); + let sync_data1 = task1_1_rx.await.unwrap().unwrap(); + sync_data1 + .uploaded_ssts + .iter() + .all(|sst| sst.epochs() == &vec![epoch1]); + task1_2_finish_tx.send(()).unwrap(); + assert!(poll_fn(|cx| Poll::Ready(sync_rx2.poll_unpin(cx).is_pending())).await); + task1_2_2_finish_tx.send(()).unwrap(); + let sync_data2 = sync_rx2.await.unwrap().unwrap(); + sync_data2 + .uploaded_ssts + .iter() + .all(|sst| sst.epochs() == &vec![epoch2]); + + send_event(HummockEvent::Shutdown); + join_handle.await.unwrap(); } } diff --git a/src/storage/src/hummock/event_handler/mod.rs b/src/storage/src/hummock/event_handler/mod.rs index 910c567e5d4da..46b44c051fdff 100644 --- a/src/storage/src/hummock/event_handler/mod.rs +++ b/src/storage/src/hummock/event_handler/mod.rs @@ -65,7 +65,7 @@ pub enum HummockEvent { }, /// Clear shared buffer and reset all states - Clear(oneshot::Sender<()>), + Clear(oneshot::Sender<()>, Option>), Shutdown, @@ -122,7 +122,9 @@ impl HummockEvent { table_ids, } => format!("AwaitSyncEpoch epoch {} {:?}", new_sync_epoch, table_ids), - HummockEvent::Clear(_) => "Clear".to_string(), + HummockEvent::Clear(_, table_ids) => { + format!("Clear {:?}", table_ids) + } HummockEvent::Shutdown => "Shutdown".to_string(), diff --git a/src/storage/src/hummock/event_handler/uploader/mod.rs b/src/storage/src/hummock/event_handler/uploader/mod.rs index 8326b3b876890..96b565c00ef49 100644 --- a/src/storage/src/hummock/event_handler/uploader/mod.rs +++ b/src/storage/src/hummock/event_handler/uploader/mod.rs @@ -53,7 +53,6 @@ use crate::hummock::store::version::StagingSstableInfo; use crate::hummock::{HummockError, HummockResult, ImmutableMemtable}; use crate::mem_table::ImmId; use crate::monitor::HummockStateStoreMetrics; -use crate::opts::StorageOpts; use crate::store::SealCurrentEpochOptions; /// Take epoch data inclusively before `epoch` out from `data` @@ -784,6 +783,7 @@ struct UnsyncData { // An index as a mapping from instance id to its table id instance_table_id: HashMap, unsync_epochs: HashMap>, + spilled_data: HashMap, HashSet)>, } impl UnsyncData { @@ -903,6 +903,43 @@ impl UnsyncData { None } } + + fn clear_tables(&mut self, table_ids: &HashSet, task_manager: &mut TaskManager) { + for table_id in table_ids { + if let Some(table_unsync_data) = self.table_data.remove(table_id) { + for task_id in table_unsync_data.spill_tasks.into_values().flatten() { + if let Some(task_status) = task_manager.abort_task(task_id) { + must_match!(task_status, UploadingTaskStatus::Spilling(spill_table_ids) => { + assert!(spill_table_ids.is_subset(table_ids)); + }); + } + if let Some((_, spill_table_ids)) = self.spilled_data.remove(&task_id) { + assert!(spill_table_ids.is_subset(table_ids)); + } + } + assert!( + table_unsync_data.instance_data.is_empty(), + "should be clear when dropping the read version instance" + ); + } + } + debug_assert!(self + .spilled_data + .values() + .all(|(_, spill_table_ids)| spill_table_ids.is_disjoint(table_ids))); + self.unsync_epochs.retain(|_, unsync_epoch_table_ids| { + if !unsync_epoch_table_ids.is_disjoint(table_ids) { + assert!(unsync_epoch_table_ids.is_subset(table_ids)); + false + } else { + true + } + }); + assert!(self + 
.instance_table_id + .values() + .all(|table_id| !table_ids.contains(table_id))); + } } impl UploaderData { @@ -958,7 +995,7 @@ impl UploaderData { ); } for task_id in task_ids { - if self.spilled_data.contains_key(&task_id) { + if self.unsync_data.spilled_data.contains_key(&task_id) { spilled_tasks.insert(task_id); } else { uploading_tasks.insert(task_id); @@ -988,8 +1025,11 @@ impl UploaderData { .iter() .rev() .map(|task_id| { - let (sst, spill_table_ids) = - self.spilled_data.remove(task_id).expect("should exist"); + let (sst, spill_table_ids) = self + .unsync_data + .spilled_data + .remove(task_id) + .expect("should exist"); assert!( spill_table_ids.is_subset(&table_ids), "spilled tabled ids {:?} not a subset of sync table id {:?}", @@ -1057,7 +1097,6 @@ impl UploaderContext { pinned_version: PinnedVersion, spawn_upload_task: SpawnUploadTask, buffer_tracker: BufferTracker, - _config: &StorageOpts, stats: Arc, ) -> Self { UploaderContext { @@ -1079,20 +1118,52 @@ struct UploaderData { syncing_data: BTreeMap, task_manager: TaskManager, - spilled_data: HashMap, HashSet)>, next_sync_id: usize, } impl UploaderData { fn abort(self, err: impl Fn() -> HummockError) { - self.task_manager.abort(); + self.task_manager.abort_all_tasks(); for syncing_data in self.syncing_data.into_values() { send_sync_result(syncing_data.sync_result_sender, Err(err())); } } + fn clear_tables(&mut self, table_ids: HashSet) { + if table_ids.is_empty() { + return; + } + self.unsync_data + .clear_tables(&table_ids, &mut self.task_manager); + self.syncing_data.retain(|sync_id, syncing_data| { + if !syncing_data.table_ids.is_disjoint(&table_ids) { + assert!(syncing_data.table_ids.is_subset(&table_ids)); + for task_id in &syncing_data.remaining_uploading_tasks { + match self + .task_manager + .abort_task(*task_id) + .expect("should exist") + { + UploadingTaskStatus::Spilling(spill_table_ids) => { + assert!(spill_table_ids.is_subset(&table_ids)); + } + UploadingTaskStatus::Sync(task_sync_id) => { + assert_eq!(sync_id, &task_sync_id); + } + } + } + false + } else { + true + } + }); + + self.check_upload_task_consistency(); + } + fn min_uncommitted_sst_id(&self) -> Option { - self.spilled_data + self.unsync_data + .spilled_data .values() .map(|(s, _)| s) .chain(self.syncing_data.values().flat_map(|s| s.uploaded.iter())) @@ -1141,7 +1212,6 @@ impl HummockUploader { pinned_version: PinnedVersion, spawn_upload_task: SpawnUploadTask, buffer_tracker: BufferTracker, - config: &StorageOpts, ) -> Self { Self { state: UploaderState::Working(UploaderData::default()), @@ -1149,7 +1219,6 @@ impl HummockUploader { pinned_version, spawn_upload_task, buffer_tracker, - config, state_store_metrics, ), } @@ -1308,15 +1377,21 @@ impl HummockUploader { } } - pub(crate) fn clear(&mut self) { - if let UploaderState::Working(data) = replace( - &mut self.state, - UploaderState::Working(UploaderData::default()), - ) { - data.abort(|| HummockError::other("uploader is reset")); - } + pub(crate) fn clear(&mut self, table_ids: Option>) { + if let Some(table_ids) = table_ids { + if let UploaderState::Working(data) = &mut self.state { + data.clear_tables(table_ids); + } + } else { + if let UploaderState::Working(data) = replace( + &mut self.state, + UploaderState::Working(UploaderData::default()), + ) { + data.abort(|| HummockError::other("uploader is reset")); + } - self.context.stats.uploader_syncing_epoch_count.set(0); + self.context.stats.uploader_syncing_epoch_count.set(0); + } } pub(crate) fn may_destroy_instance(&mut self, instance_id: 
LocalInstanceId) { @@ -1331,10 +1406,11 @@ impl HummockUploader { .into_values() .flat_map(|task_ids| task_ids.into_iter()) .filter(|task_id| { - if let Some((_, table_ids)) = data.spilled_data.get_mut(task_id) { + if let Some((_, table_ids)) = data.unsync_data.spilled_data.get_mut(task_id) + { assert!(table_ids.remove(&removed_table_data.table_id)); if table_ids.is_empty() { - data.spilled_data.remove(task_id); + data.unsync_data.spilled_data.remove(task_id); } false } else { @@ -1422,7 +1498,7 @@ impl UploaderData { .collect(); let mut spill_task_table_id_from_manager: HashMap<_, HashSet<_>> = HashMap::new(); - for (task_id, (_, table_ids)) in &self.spilled_data { + for (task_id, (_, table_ids)) in &self.unsync_data.spilled_data { spill_task_table_id_from_manager.insert(*task_id, table_ids.clone()); } let mut syncing_task_from_manager: HashMap<_, HashSet<_>> = HashMap::new(); @@ -1473,7 +1549,9 @@ impl HummockUploader { data.may_notify_sync_task(&self.context); } UploadingTaskStatus::Spilling(table_ids) => { - data.spilled_data.insert(task_id, (sst.clone(), table_ids)); + data.unsync_data + .spilled_data + .insert(task_id, (sst.clone(), table_ids)); } } data.check_upload_task_consistency(); diff --git a/src/storage/src/hummock/event_handler/uploader/spiller.rs b/src/storage/src/hummock/event_handler/uploader/spiller.rs index ba04d85856ace..4e560c36eacf0 100644 --- a/src/storage/src/hummock/event_handler/uploader/spiller.rs +++ b/src/storage/src/hummock/event_handler/uploader/spiller.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::cmp::Ordering; use std::collections::{HashMap, HashSet}; use risingwave_common::catalog::TableId; @@ -75,7 +76,20 @@ impl<'a> Spiller<'a> { if let Some(unsync_epoch_id) = self .epoch_info .iter() - .max_by_key(|(_, info)| info.payload_size) + .max_by( + |(UnsyncEpochId(_, table1), info1), (UnsyncEpochId(_, table2), info2)| { + info1.payload_size.cmp(&info2.payload_size).then_with(|| { + if !cfg!(test) { + Ordering::Equal + } else { + assert_ne!(table1, table2); + // enforce deterministic spill order in test + // smaller table id will be spilled first. 
+ table2.cmp(table1) + } + }) + }, + ) .map(|(unsync_epoch_id, _)| *unsync_epoch_id) { let spill_epoch = unsync_epoch_id.epoch(); diff --git a/src/storage/src/hummock/event_handler/uploader/task_manager.rs b/src/storage/src/hummock/event_handler/uploader/task_manager.rs index 2347be1ed57eb..fd53fae1db322 100644 --- a/src/storage/src/hummock/event_handler/uploader/task_manager.rs +++ b/src/storage/src/hummock/event_handler/uploader/task_manager.rs @@ -97,12 +97,21 @@ impl TaskManager { } } - pub(super) fn abort(self) { + pub(super) fn abort_all_tasks(self) { for task in self.tasks.into_values() { task.task.join_handle.abort(); } } + pub(super) fn abort_task(&mut self, task_id: UploadingTaskId) -> Option { + self.tasks.remove(&task_id).map(|entry| { + entry.task.join_handle.abort(); + self.task_order + .retain(|inflight_task_id| *inflight_task_id != task_id); + entry.status + }) + } + pub(super) fn spill( &mut self, context: &UploaderContext, diff --git a/src/storage/src/hummock/event_handler/uploader/test_utils.rs b/src/storage/src/hummock/event_handler/uploader/test_utils.rs index 6eb41bda52071..3e7b92624109a 100644 --- a/src/storage/src/hummock/event_handler/uploader/test_utils.rs +++ b/src/storage/src/hummock/event_handler/uploader/test_utils.rs @@ -45,8 +45,9 @@ use tokio::task::yield_now; use crate::hummock::event_handler::hummock_event_handler::BufferTracker; use crate::hummock::event_handler::uploader::uploader_imm::UploaderImm; use crate::hummock::event_handler::uploader::{ - HummockUploader, TableUnsyncData, UploadTaskInfo, UploadTaskOutput, UploadTaskPayload, - UploaderContext, UploaderData, UploaderState, UploadingTask, UploadingTaskId, + HummockUploader, SpawnUploadTask, TableUnsyncData, UploadTaskInfo, UploadTaskOutput, + UploadTaskPayload, UploaderContext, UploaderData, UploaderState, UploadingTask, + UploadingTaskId, }; use crate::hummock::event_handler::{LocalInstanceId, TEST_LOCAL_INSTANCE_ID}; use crate::hummock::local_version::pinned_version::PinnedVersion; @@ -121,7 +122,7 @@ pub(super) async fn gen_imm_with_limiter( gen_imm_inner(TEST_TABLE_ID, epoch, 0, limiter).await } -pub(super) async fn gen_imm_inner( +pub(crate) async fn gen_imm_inner( table_id: TableId, epoch: HummockEpoch, spill_offset: u16, @@ -176,12 +177,10 @@ where Fut: UploadOutputFuture, F: UploadFn, { - let config = StorageOpts::default(); UploaderContext::new( initial_pinned_version(), Arc::new(move |payload, task_info| spawn(upload_fn(payload, task_info))), BufferTracker::for_test(), - &config, Arc::new(HummockStateStoreMetrics::unused()), ) } @@ -191,15 +190,11 @@ where Fut: UploadOutputFuture, F: UploadFn, { - let config = StorageOpts { - ..Default::default() - }; HummockUploader::new( Arc::new(HummockStateStoreMetrics::unused()), initial_pinned_version(), Arc::new(move |payload, task_info| spawn(upload_fn(payload, task_info))), BufferTracker::for_test(), - &config, ) } @@ -272,16 +267,12 @@ impl HummockUploader { } #[expect(clippy::type_complexity)] -pub(crate) fn prepare_uploader_order_test( - config: &StorageOpts, +pub(crate) fn prepare_uploader_order_test_spawn_task_fn( skip_schedule: bool, ) -> ( - BufferTracker, - HummockUploader, + SpawnUploadTask, impl Fn(HashMap>) -> (BoxFuture<'static, ()>, oneshot::Sender<()>), ) { - let gauge = GenericGauge::new("test", "test").unwrap(); - let buffer_tracker = BufferTracker::from_storage_opts(config, gauge); // (the started task send the imm ids of payload, the started task wait for finish notify) #[allow(clippy::type_complexity)] let task_notifier_holder: 
Arc< @@ -304,35 +295,51 @@ pub(crate) fn prepare_uploader_order_test( (await_start_future, finish_tx) } }; + let spawn_fn = Arc::new({ + move |_, task_info: UploadTaskInfo| { + let task_notifier_holder = task_notifier_holder.clone(); + let task_item = task_notifier_holder.lock().pop_back(); + let start_epoch = *task_info.epochs.last().unwrap(); + let end_epoch = *task_info.epochs.first().unwrap(); + assert!(end_epoch >= start_epoch); + spawn(async move { + let ssts = gen_sstable_info(start_epoch, end_epoch); + if !skip_schedule { + let (start_tx, finish_rx) = task_item.unwrap(); + start_tx.send(task_info).unwrap(); + finish_rx + .await + .map_err(|_| HummockError::other("failed to receive rx"))?; + } + Ok(UploadTaskOutput { + new_value_ssts: ssts, + old_value_ssts: vec![], + wait_poll_timer: None, + }) + }) + } + }); + (spawn_fn, new_task_notifier) +} + +#[expect(clippy::type_complexity)] +pub(crate) fn prepare_uploader_order_test( + config: &StorageOpts, + skip_schedule: bool, +) -> ( + BufferTracker, + HummockUploader, + impl Fn(HashMap>) -> (BoxFuture<'static, ()>, oneshot::Sender<()>), +) { + let (spawn_fn, new_task_notifier) = prepare_uploader_order_test_spawn_task_fn(skip_schedule); + let gauge = GenericGauge::new("test", "test").unwrap(); + let buffer_tracker = BufferTracker::from_storage_opts(config, gauge); - let config = StorageOpts::default(); let uploader = HummockUploader::new( Arc::new(HummockStateStoreMetrics::unused()), initial_pinned_version(), - Arc::new({ - move |_, task_info: UploadTaskInfo| { - let task_notifier_holder = task_notifier_holder.clone(); - let task_item = task_notifier_holder.lock().pop_back(); - let start_epoch = *task_info.epochs.last().unwrap(); - let end_epoch = *task_info.epochs.first().unwrap(); - assert!(end_epoch >= start_epoch); - spawn(async move { - let ssts = gen_sstable_info(start_epoch, end_epoch); - if !skip_schedule { - let (start_tx, finish_rx) = task_item.unwrap(); - start_tx.send(task_info).unwrap(); - finish_rx.await.unwrap(); - } - Ok(UploadTaskOutput { - new_value_ssts: ssts, - old_value_ssts: vec![], - wait_poll_timer: None, - }) - }) - } - }), + spawn_fn, buffer_tracker.clone(), - &config, ); (buffer_tracker, uploader, new_task_notifier) } diff --git a/src/storage/src/hummock/store/hummock_storage.rs b/src/storage/src/hummock/store/hummock_storage.rs index 3b4a143d0ad53..43bb08c44a0ba 100644 --- a/src/storage/src/hummock/store/hummock_storage.rs +++ b/src/storage/src/hummock/store/hummock_storage.rs @@ -513,11 +513,21 @@ impl HummockStorage { pub async fn clear_shared_buffer(&self) { let (tx, rx) = oneshot::channel(); self.hummock_event_sender - .send(HummockEvent::Clear(tx)) + .send(HummockEvent::Clear(tx, None)) .expect("should send success"); rx.await.expect("should wait success"); } + pub async fn clear_tables(&self, table_ids: HashSet) { + if !table_ids.is_empty() { + let (tx, rx) = oneshot::channel(); + self.hummock_event_sender + .send(HummockEvent::Clear(tx, Some(table_ids))) + .expect("should send success"); + rx.await.expect("should wait success"); + } + } + /// Declare the start of an epoch. This information is provided for spill so that the spill task won't /// include data of two or more syncs. 
// TODO: remove this method when we support spill task that can include data of more two or more syncs diff --git a/src/storage/src/opts.rs b/src/storage/src/opts.rs index 14500f7c5113e..e41e07dafc44a 100644 --- a/src/storage/src/opts.rs +++ b/src/storage/src/opts.rs @@ -140,6 +140,9 @@ pub struct StorageOpts { pub compactor_max_overlap_sst_count: usize, + /// The maximum number of meta files that can be preloaded. + pub compactor_max_preload_meta_file_count: usize, + pub object_store_config: ObjectStoreConfig, pub time_travel_version_cache_capacity: u64, } @@ -243,6 +246,7 @@ impl From<(&RwConfig, &SystemParamsReader, &StorageMemoryConfig)> for StorageOpt .compactor_concurrent_uploading_sst_count, time_travel_version_cache_capacity: c.storage.time_travel_version_cache_capacity, compactor_max_overlap_sst_count: c.storage.compactor_max_overlap_sst_count, + compactor_max_preload_meta_file_count: c.storage.compactor_max_preload_meta_file_count, } } } diff --git a/src/stream/src/executor/dispatch.rs b/src/stream/src/executor/dispatch.rs index 39f561633b8dd..609fed1be038f 100644 --- a/src/stream/src/executor/dispatch.rs +++ b/src/stream/src/executor/dispatch.rs @@ -787,6 +787,10 @@ impl Dispatcher for HashDataDispatcher { } if !visible { + assert!( + last_vnode_when_update_delete.is_none(), + "invisible row between U- and U+, op = {op:?}", + ); new_ops.push(op); continue; } @@ -797,7 +801,11 @@ impl Dispatcher for HashDataDispatcher { if op == Op::UpdateDelete { last_vnode_when_update_delete = Some(vnode); } else if op == Op::UpdateInsert { - if vnode != last_vnode_when_update_delete.unwrap() { + if vnode + != last_vnode_when_update_delete + .take() + .expect("missing U- before U+") + { new_ops.push(Op::Delete); new_ops.push(Op::Insert); } else { @@ -808,6 +816,10 @@ impl Dispatcher for HashDataDispatcher { new_ops.push(op); } } + assert!( + last_vnode_when_update_delete.is_none(), + "missing U+ after U-" + ); let ops = new_ops; diff --git a/src/stream/src/task/barrier_manager.rs b/src/stream/src/task/barrier_manager.rs index 0b5f6d6178e67..0bfdcdd59117c 100644 --- a/src/stream/src/task/barrier_manager.rs +++ b/src/stream/src/task/barrier_manager.rs @@ -12,9 +12,10 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::{BTreeSet, HashMap}; +use std::collections::BTreeSet; use std::fmt::Display; use std::future::pending; +use std::iter::once; use std::sync::Arc; use std::time::Duration; @@ -260,9 +261,6 @@ pub(super) struct LocalBarrierWorker { /// Current barrier collection state. pub(super) state: ManagedBarrierState, - /// Record all unexpected exited actors. - failure_actors: HashMap, - control_stream_handle: ControlStreamHandle, pub(super) actor_manager: Arc, @@ -272,9 +270,6 @@ pub(super) struct LocalBarrierWorker { barrier_event_rx: UnboundedReceiver, actor_failure_rx: UnboundedReceiver<(ActorId, StreamError)>, - - /// Cached result of [`Self::try_find_root_failure`]. 
- cached_root_failure: Option, } impl LocalBarrierWorker { @@ -289,14 +284,12 @@ impl LocalBarrierWorker { }, )); Self { - failure_actors: HashMap::default(), state: ManagedBarrierState::new(actor_manager.clone(), shared_context.clone()), control_stream_handle: ControlStreamHandle::empty(), actor_manager, current_shared_context: shared_context, barrier_event_rx: event_rx, actor_failure_rx: failure_rx, - cached_root_failure: None, } } @@ -543,19 +536,6 @@ impl LocalBarrierWorker { request.actor_ids_to_collect ); - for actor_id in &request.actor_ids_to_collect { - if self.failure_actors.contains_key(actor_id) { - // The failure actors could exit before the barrier is issued, while their - // up-downstream actors could be stuck somehow. Return error directly to trigger the - // recovery. - return Err(StreamError::barrier_send( - barrier.clone(), - *actor_id, - "actor has already failed", - )); - } - } - self.state.transform_to_issued(barrier, request)?; Ok(()) } @@ -596,8 +576,7 @@ impl LocalBarrierWorker { err: StreamError, err_context: &'static str, ) { - self.add_failure(actor_id, err.clone()); - let root_err = self.try_find_root_failure().await.unwrap(); // always `Some` because we just added one + let root_err = self.try_find_root_failure(err).await; if let Some(actor_state) = self.state.actor_states.get(&actor_id) && (!actor_state.inflight_barriers.is_empty() || actor_state.is_running()) @@ -616,10 +595,7 @@ impl LocalBarrierWorker { /// This is similar to [`Self::notify_actor_failure`], but since there's not always an actor failure, /// the given `err` will be used if there's no root failure found. async fn notify_other_failure(&mut self, err: StreamError, message: impl Into) { - let root_err = self - .try_find_root_failure() - .await - .unwrap_or_else(|| ScoredStreamError::new(err)); + let root_err = self.try_find_root_failure(err).await; self.control_stream_handle.reset_stream_with_err( anyhow!(root_err) @@ -628,40 +604,24 @@ impl LocalBarrierWorker { ); } - fn add_failure(&mut self, actor_id: ActorId, err: StreamError) { - if let Some(prev_err) = self.failure_actors.insert(actor_id, err) { - warn!( - actor_id, - prev_err = %prev_err.as_report(), - "actor error overwritten" - ); - } - } - /// Collect actor errors for a while and find the one that might be the root cause. /// /// Returns `None` if there's no actor error received. - async fn try_find_root_failure(&mut self) -> Option { - if self.cached_root_failure.is_some() { - return self.cached_root_failure.clone(); - } - + async fn try_find_root_failure(&mut self, first_err: StreamError) -> ScoredStreamError { + let mut later_errs = vec![]; // fetch more actor errors within a timeout let _ = tokio::time::timeout(Duration::from_secs(3), async { - while let Some((actor_id, error)) = self.actor_failure_rx.recv().await { - self.add_failure(actor_id, error); + while let Some((_, error)) = self.actor_failure_rx.recv().await { + later_errs.push(error); } }) .await; - // Find the error with highest score. 
- self.cached_root_failure = self - .failure_actors - .values() + once(first_err) + .chain(later_errs.into_iter()) .map(|e| ScoredStreamError::new(e.clone())) - .max_by_key(|e| e.score); - - self.cached_root_failure.clone() + .max_by_key(|e| e.score) + .expect("non-empty") } } diff --git a/src/tests/sqlsmith/src/sql_gen/scalar.rs b/src/tests/sqlsmith/src/sql_gen/scalar.rs index 62cd7218dcc90..a532f6138c596 100644 --- a/src/tests/sqlsmith/src/sql_gen/scalar.rs +++ b/src/tests/sqlsmith/src/sql_gen/scalar.rs @@ -81,11 +81,15 @@ impl SqlGenerator<'_, R> { data_type: AstDataType::SmallInt, value: self.gen_int(i16::MIN as isize, i16::MAX as isize), })), - T::Varchar => Expr::Value(Value::SingleQuotedString( - (0..10) - .map(|_| self.rng.sample(Alphanumeric) as char) - .collect(), - )), + T::Varchar => Expr::Cast { + // since we are generating random scalar literal, we should cast it to avoid unknown type + expr: Box::new(Expr::Value(Value::SingleQuotedString( + (0..10) + .map(|_| self.rng.sample(Alphanumeric) as char) + .collect(), + ))), + data_type: AstDataType::Varchar, + }, T::Decimal => Expr::Nested(Box::new(Expr::Value(Value::Number(self.gen_float())))), T::Float64 => Expr::Nested(Box::new(Expr::TypedString { data_type: AstDataType::Float(None),