From 9df0e732c5c997c7a00f367400bdec45643bdae8 Mon Sep 17 00:00:00 2001 From: Ti Chi Robot Date: Wed, 27 Nov 2024 18:43:14 +0800 Subject: [PATCH] aws: switch to aws-sdk (#13814) (#17833) close tikv/tikv#12371 * switch kms to aws_sdk lib * switch s3 to aws_sdk lib Signed-off-by: ti-chi-bot Signed-off-by: 3pointer Co-authored-by: Andrey Koshchiy Co-authored-by: 3pointer Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com> --- Cargo.lock | 1174 +++++++++++++++---- Cargo.toml | 11 +- components/cloud/aws/Cargo.toml | 27 +- components/cloud/aws/src/kms.rs | 326 ++--- components/cloud/aws/src/s3.rs | 1034 ++++++++-------- components/cloud/aws/src/util.rs | 284 +++-- components/sst_importer/src/sst_importer.rs | 48 +- components/tikv_util/Cargo.toml | 1 - components/tikv_util/src/stream.rs | 25 +- deny.toml | 12 +- 10 files changed, 1922 insertions(+), 1020 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 0adf8afbbb9..b9747b3ddf2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -54,9 +54,9 @@ dependencies = [ [[package]] name = "ahash" -version = "0.8.7" +version = "0.8.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "77c3a9648d43b9cd48db467b3f87fdd6e146bcc88ab0180006cef2179fe11d01" +checksum = "e89da841a80418a9b391ebaea17f5c112ffaaa96f621d2c285b5174da76b9011" dependencies = [ "cfg-if 1.0.0", "once_cell", @@ -79,6 +79,15 @@ version = "0.2.14" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c4f263788a35611fba42eb41ff811c5d0360c58b97402570312a350736e2542e" +[[package]] +name = "android_system_properties" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "819e7219dbd41043ac279b19830f2efc897156490d7fd6ea916720117ee66311" +dependencies = [ + "libc 0.2.151", +] + [[package]] name = "ansi_term" version = "0.11.0" @@ -130,6 +139,17 @@ dependencies = [ "nodrop", ] +[[package]] +name = "assert-json-diff" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4259cbe96513d2f1073027a259fc2ca917feb3026a5a8d984e3628e490255cc0" +dependencies = [ + "extend", + "serde", + "serde_json", +] + [[package]] name = "async-channel" version = "1.6.1" @@ -248,29 +268,400 @@ name = "aws" version = "0.0.1" dependencies = [ "async-trait", + "aws-config", + "aws-credential-types", + "aws-sdk-kms", + "aws-sdk-s3", + "aws-smithy-runtime", + "aws-smithy-runtime-api", + "aws-smithy-types", "base64 0.13.0", "bytes", "cloud", "fail", "futures 0.3.15", "futures-util", + "grpcio", + "http 0.2.12", + "hyper", + "hyper-tls", "kvproto", "md5", - "rusoto_core", - "rusoto_credential", - "rusoto_kms", - "rusoto_mock", - "rusoto_s3", - "rusoto_sts", + "prometheus", "slog", "slog-global", "thiserror", "tikv_util", "tokio", + "tokio-util", "url", "uuid 0.8.2", ] +[[package]] +name = "aws-config" +version = "1.5.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4e95816a168520d72c0e7680c405a5a8c1fb6a035b4bc4b9d7b0de8e1a941697" +dependencies = [ + "aws-credential-types", + "aws-runtime", + "aws-sdk-sts", + "aws-smithy-async", + "aws-smithy-http", + "aws-smithy-json", + "aws-smithy-runtime", + "aws-smithy-runtime-api", + "aws-smithy-types", + "aws-types", + "bytes", + "fastrand 2.0.1", + "http 0.2.12", + "time 0.3.20", + "tokio", + "tracing", + "url", +] + +[[package]] +name = "aws-credential-types" +version = "1.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "60e8f6b615cb5fc60a98132268508ad104310f0cfb25a1c22eee76efdf9154da" +dependencies = [ + "aws-smithy-async", + "aws-smithy-runtime-api", + "aws-smithy-types", + "zeroize", +] + +[[package]] +name = "aws-runtime" +version = "1.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f42c2d4218de4dcd890a109461e2f799a1a2ba3bcd2cde9af88360f5df9266c6" +dependencies = [ + "aws-credential-types", + "aws-sigv4", + "aws-smithy-async", + "aws-smithy-eventstream", + "aws-smithy-http", + "aws-smithy-runtime-api", + "aws-smithy-types", + "aws-types", + "bytes", + "fastrand 2.0.1", + "http 0.2.12", + "http-body 0.4.5", + "once_cell", + "percent-encoding", + "pin-project-lite", + "tracing", + "uuid 1.7.0", +] + +[[package]] +name = "aws-sdk-kms" +version = "1.40.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "70ebbbc319551583b9233a74b359ede7349102e779fc12371d2478e80b50d218" +dependencies = [ + "aws-credential-types", + "aws-runtime", + "aws-smithy-async", + "aws-smithy-http", + "aws-smithy-json", + "aws-smithy-runtime", + "aws-smithy-runtime-api", + "aws-smithy-types", + "aws-types", + "bytes", + "http 0.2.12", + "once_cell", + "regex-lite", + "tracing", +] + +[[package]] +name = "aws-sdk-s3" +version = "1.40.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8367c403fdf27690684b926a46ed9524099a69dd5dfcef62028bf4096b5b809f" +dependencies = [ + "ahash 0.8.11", + "aws-credential-types", + "aws-runtime", + "aws-sigv4", + "aws-smithy-async", + "aws-smithy-checksums", + "aws-smithy-eventstream", + "aws-smithy-http", + "aws-smithy-json", + "aws-smithy-runtime", + "aws-smithy-runtime-api", + "aws-smithy-types", + "aws-smithy-xml", + "aws-types", + "bytes", + "fastrand 2.0.1", + "hex 0.4.3", + "hmac", + "http 0.2.12", + "http-body 0.4.5", + "lru", + "once_cell", + "percent-encoding", + "regex-lite", + "sha2 0.10.8", + "tracing", + "url", +] + +[[package]] +name = "aws-sdk-sts" +version = "1.37.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6e52dc3fd7dfa6c01a69cf3903e00aa467261639138a05b06cd92314d2c8fb07" +dependencies = [ + "aws-credential-types", + "aws-runtime", + "aws-smithy-async", + "aws-smithy-http", + "aws-smithy-json", + "aws-smithy-query", + "aws-smithy-runtime", + "aws-smithy-runtime-api", + "aws-smithy-types", + "aws-smithy-xml", + "aws-types", + "http 0.2.12", + "once_cell", + "regex-lite", + "tracing", +] + +[[package]] +name = "aws-sigv4" +version = "1.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cc8db6904450bafe7473c6ca9123f88cc11089e41a025408f992db4e22d3be68" +dependencies = [ + "aws-credential-types", + "aws-smithy-eventstream", + "aws-smithy-http", + "aws-smithy-runtime-api", + "aws-smithy-types", + "bytes", + "form_urlencoded", + "hex 0.4.3", + "hmac", + "http 0.2.12", + "http 1.1.0", + "once_cell", + "percent-encoding", + "sha2 0.10.8", + "time 0.3.20", + "tracing", +] + +[[package]] +name = "aws-smithy-async" +version = "1.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "62220bc6e97f946ddd51b5f1361f78996e704677afc518a4ff66b7a72ea1378c" +dependencies = [ + "futures-util", + "pin-project-lite", + "tokio", +] + +[[package]] +name = "aws-smithy-checksums" +version = "0.60.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "598b1689d001c4d4dc3cb386adb07d37786783aee3ac4b324bcadac116bf3d23" +dependencies = [ + "aws-smithy-http", + "aws-smithy-types", + "bytes", + "crc32c", + "crc32fast", + "hex 0.4.3", + "http 0.2.12", + "http-body 0.4.5", + "md-5", + "pin-project-lite", + "sha1", + "sha2 0.10.8", + "tracing", +] + +[[package]] +name = "aws-smithy-eventstream" +version = "0.60.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cef7d0a272725f87e51ba2bf89f8c21e4df61b9e49ae1ac367a6d69916ef7c90" +dependencies = [ + "aws-smithy-types", + "bytes", + "crc32fast", +] + +[[package]] +name = "aws-smithy-http" +version = "0.60.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5c8bc3e8fdc6b8d07d976e301c02fe553f72a39b7a9fea820e023268467d7ab6" +dependencies = [ + "aws-smithy-eventstream", + "aws-smithy-runtime-api", + "aws-smithy-types", + "bytes", + "bytes-utils", + "futures-core", + "http 0.2.12", + "http-body 0.4.5", + "once_cell", + "percent-encoding", + "pin-project-lite", + "pin-utils", + "tracing", +] + +[[package]] +name = "aws-smithy-json" +version = "0.60.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4683df9469ef09468dad3473d129960119a0d3593617542b7d52086c8486f2d6" +dependencies = [ + "aws-smithy-types", +] + +[[package]] +name = "aws-smithy-protocol-test" +version = "0.63.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3b92b62199921f10685c6b588fdbeb81168ae4e7950ae3e5f50145a01bb5f1ad" +dependencies = [ + "assert-json-diff", + "aws-smithy-runtime-api", + "base64-simd", + "cbor-diag", + "ciborium", + "http 0.2.12", + "pretty_assertions", + "regex-lite", + "roxmltree", + "serde_json", + "thiserror", +] + +[[package]] +name = "aws-smithy-query" +version = "0.60.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f2fbd61ceb3fe8a1cb7352e42689cec5335833cd9f94103a61e98f9bb61c64bb" +dependencies = [ + "aws-smithy-types", + "urlencoding", +] + +[[package]] +name = "aws-smithy-runtime" +version = "1.7.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a065c0fe6fdbdf9f11817eb68582b2ab4aff9e9c39e986ae48f7ec576c6322db" +dependencies = [ + "aws-smithy-async", + "aws-smithy-http", + "aws-smithy-protocol-test", + "aws-smithy-runtime-api", + "aws-smithy-types", + "bytes", + "fastrand 2.0.1", + "h2", + "http 0.2.12", + "http-body 0.4.5", + "http-body 1.0.0", + "httparse", + "hyper", + "indexmap 2.0.1", + "once_cell", + "pin-project-lite", + "pin-utils", + "serde", + "serde_json", + "tokio", + "tracing", + "tracing-subscriber", +] + +[[package]] +name = "aws-smithy-runtime-api" +version = "1.7.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e086682a53d3aa241192aa110fa8dfce98f2f5ac2ead0de84d41582c7e8fdb96" +dependencies = [ + "aws-smithy-async", + "aws-smithy-types", + "bytes", + "http 0.2.12", + "http 1.1.0", + "pin-project-lite", + "tokio", + "tracing", + "zeroize", +] + +[[package]] +name = "aws-smithy-types" +version = "1.2.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "147100a7bea70fa20ef224a6bad700358305f5dc0f84649c53769761395b355b" +dependencies = [ + "base64-simd", + "bytes", + "bytes-utils", + "futures-core", + "http 0.2.12", + "http 1.1.0", + "http-body 0.4.5", + "http-body 1.0.0", + "http-body-util", + "itoa 1.0.1", + "num-integer", + "pin-project-lite", + "pin-utils", + "ryu", + "serde", + "time 0.3.20", + "tokio", + "tokio-util", +] + +[[package]] +name = "aws-smithy-xml" +version = "0.60.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d123fbc2a4adc3c301652ba8e149bf4bc1d1725affb9784eb20c953ace06bf55" +dependencies = [ + "xmlparser", +] + +[[package]] +name = "aws-types" +version = "1.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5221b91b3e441e6675310829fd8984801b772cb1546ef6c0e54dec9f1ac13fef" +dependencies = [ + "aws-credential-types", + "aws-smithy-async", + "aws-smithy-runtime-api", + "aws-smithy-types", + "rustc_version 0.4.0", + "tracing", +] + [[package]] name = "azure" version = "0.0.1" @@ -435,7 +826,7 @@ dependencies = [ "futures 0.3.15", "futures-util", "grpcio", - "hex 0.4.2", + "hex 0.4.3", "keys", "kvproto", "lazy_static", @@ -480,7 +871,7 @@ dependencies = [ "file_system", "futures 0.3.15", "grpcio", - "hex 0.4.2", + "hex 0.4.3", "kvproto", "lazy_static", "log_wrappers", @@ -531,6 +922,16 @@ version = "0.21.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a4a4ddaa51a5bc52a6948f74c06d20aaaddb71924eab79b8c97a8c556e942d6a" +[[package]] +name = "base64-simd" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "339abbe78e73178762e23bea9dfd08e697eb3f3301cd4be981c0f78ba5859195" +dependencies = [ + "outref", + "vsimd", +] + [[package]] name = "batch-system" version = "0.1.0" @@ -593,7 +994,7 @@ dependencies = [ "quote", "regex", "rustc-hash", - "shlex 1.3.0", + "shlex", "which", ] @@ -614,7 +1015,7 @@ dependencies = [ "quote", "regex", "rustc-hash", - "shlex 1.3.0", + "shlex", "syn 2.0.79", ] @@ -666,12 +1067,30 @@ dependencies = [ "generic-array", ] +[[package]] +name = "block-buffer" +version = "0.10.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3078c7629b62d3f0439517fa394996acacc5cbc91c5a20d8c658e77abd503a71" +dependencies = [ + "generic-array", +] + [[package]] name = "boolinator" version = "2.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cfa8873f51c92e232f9bac4065cddef41b714152812bfc5f7672ba16d6ef8cd9" +[[package]] +name = "bs58" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bf88ba1141d185c399bee5288d850d63b8369520c1eafc32a0430b5b6c287bf4" +dependencies = [ + "tinyvec", +] + [[package]] name = "bstr" version = "0.2.8" @@ -704,11 +1123,18 @@ checksum = "14c189c53d098945499cdfa7ecc63567cf3886b3332b312a5b4585d8d3a6a610" [[package]] name = "bytes" -version = "1.0.1" +version = "1.7.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "428d9aa8fbc0670b7b8d6030a7fadd0f86151cae55e4dbbece15f3780a3dfaf3" + +[[package]] +name = "bytes-utils" +version = "0.1.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b700ce4376041dcd0a327fd0097c41095743c4c8af8887265942faf1100bd040" +checksum = "7dafe3a8757b027e2be6e4e5601ed563c55989fcf1546e933c66c8eb3a058d35" dependencies = [ - "serde", + "bytes", + "either", ] [[package]] @@ -791,6 +1217,25 @@ dependencies = [ "txn_types", ] +[[package]] +name = "cbor-diag" +version = "0.1.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dc245b6ecd09b23901a4fbad1ad975701fd5061ceaef6afa93a2d70605a64429" +dependencies = [ + "bs58", + "chrono", + "data-encoding", + "half 2.4.1", + "nom 7.1.3", + "num-bigint", + "num-rational 0.4.1", + "num-traits", + "separator", + "url", + "uuid 1.7.0", +] + [[package]] name = "cc" version = "1.0.83" @@ -851,7 +1296,7 @@ version = "0.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6fac387a98bb7c37292057cffc56d62ecb629900026402633ae9160df93a8766" dependencies = [ - "nom 7.1.0", + "nom 7.1.3", ] [[package]] @@ -868,10 +1313,11 @@ checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" [[package]] name = "chrono" -version = "0.4.20" +version = "0.4.22" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6127248204b9aba09a362f6c930ef6a78f2c1b2215f8a7b398c06e1083f17af0" +checksum = "bfd4d1b31faaa3a89d7934dbded3111da0d2ef28e3ebccdb4f0179f5929d1ef1" dependencies = [ + "iana-time-zone", "js-sys", "num-integer", "num-traits", @@ -891,6 +1337,33 @@ dependencies = [ "parse-zoneinfo", ] +[[package]] +name = "ciborium" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "42e69ffd6f0917f5c029256a24d0161db17cea3997d185db0d35926308770f0e" +dependencies = [ + "ciborium-io", + "ciborium-ll", + "serde", +] + +[[package]] +name = "ciborium-io" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "05afea1e0a06c9be33d539b876f1ce3692f4afea2cb41f740e7743225ed1c757" + +[[package]] +name = "ciborium-ll" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "57663b653d948a338bfb3eeba9bb2fd5fcfaecb9e199e87e1eda4d9e8b240fd9" +dependencies = [ + "ciborium-io", + "half 2.4.1", +] + [[package]] name = "clang-sys" version = "1.1.1" @@ -1077,9 +1550,9 @@ dependencies = [ [[package]] name = "core-foundation-sys" -version = "0.8.2" +version = "0.8.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ea221b5284a47e40033bf9b66f35f984ec0ea2931eb03505246cd27a963f981b" +checksum = "773648b94d0e5d620f64f280777445740e61fe701025087ec8b57f45c791888b" [[package]] name = "cpu-time" @@ -1091,6 +1564,15 @@ dependencies = [ "winapi 0.3.9", ] +[[package]] +name = "cpufeatures" +version = "0.2.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "53fe5e26ff1b7aef8bca9c6080520cfb8d9333c7568e1829cef191a9723e5504" +dependencies = [ + "libc 0.2.151", +] + [[package]] name = "cpuid-bool" version = "0.1.2" @@ -1099,20 +1581,20 @@ checksum = "8aebca1129a03dc6dc2b127edd729435bbc4a37e1d5f4d7513165089ceb02634" [[package]] name = "crc32c" -version = "0.6.4" +version = "0.6.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d8f48d60e5b4d2c53d5c2b1d8a58c849a70ae5e5509b08a48d047e3b65714a74" +checksum = "3a47af21622d091a8f0fb295b88bc886ac74efcc613efc19f5d0b21de5c89e47" dependencies = [ "rustc_version 0.4.0", ] [[package]] name = "crc32fast" -version = "1.2.0" +version = "1.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ba125de2af0df55319f41944744ad91c71113bf74a4646efff39afe1f6842db1" +checksum = "b3855a8a784b474f333699ef2bbca9db2c4a1f6d9088a90a2d25b1eb53111eaa" dependencies = [ - "cfg-if 0.1.10", + "cfg-if 1.0.0", ] [[package]] @@ -1255,6 +1737,12 @@ version = "0.8.20" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "22ec99545bb0ed0ea7bb9b8e1e9122ea386ff8a48c0922e43f36d45ab09e0e80" +[[package]] +name = "crunchy" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7a81dae078cea95a014a339291cec439d2f232ebe854a9d672b796c6afafa9b7" + [[package]] name = "crypto" version = "0.0.1" @@ -1264,6 +1752,16 @@ dependencies = [ "slog-global", ] +[[package]] +name = "crypto-common" +version = "0.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1bfb12502f3fc46cca1bb51ac28df9d618d813cdc3d2f25b9fe775a34af26bb3" +dependencies = [ + "generic-array", + "typenum", +] + [[package]] name = "csv" version = "1.1.6" @@ -1369,6 +1867,12 @@ dependencies = [ "syn 1.0.103", ] +[[package]] +name = "diff" +version = "0.1.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "56254986775e3233ffa9c4d7d3faaf6d36a2c09d30b20687e9f88bc8bafc16c8" + [[package]] name = "digest" version = "0.9.0" @@ -1378,6 +1882,17 @@ dependencies = [ "generic-array", ] +[[package]] +name = "digest" +version = "0.10.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9ed9a281f7bc9b7576e61468ba615a66a5c8cfdff42420a70aa82701a3b1e292" +dependencies = [ + "block-buffer 0.10.4", + "crypto-common", + "subtle", +] + [[package]] name = "dirs-next" version = "2.0.0" @@ -1443,7 +1958,7 @@ dependencies = [ "file_system", "futures 0.3.15", "futures-util", - "hex 0.4.2", + "hex 0.4.3", "kvproto", "lazy_static", "matches", @@ -1697,6 +2212,18 @@ dependencies = [ "coprocessor_plugin_api", ] +[[package]] +name = "extend" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f47da3a72ec598d9c8937a7ebca8962a5c7a1f28444e38c2b33c771ba3f55f05" +dependencies = [ + "proc-macro-error", + "proc-macro2", + "quote", + "syn 1.0.103", +] + [[package]] name = "external_storage" version = "0.0.1" @@ -1865,6 +2392,12 @@ version = "1.0.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2fad85553e09a6f881f739c29f0b00b0f01357c743266d478b68951ce23285f3" +[[package]] +name = "foldhash" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f81ec6369c545a7d40e4589b5597581fa1c441fe1cce96dd1de43159910a36a2" + [[package]] name = "foreign-types" version = "0.3.2" @@ -1882,11 +2415,10 @@ checksum = "00b0228411908ca8685dba7fc2cdd70ec9990a6e753e89b6ac91a84c40fbaf4b" [[package]] name = "form_urlencoded" -version = "1.0.1" +version = "1.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5fc25a87fa4fd2094bffb06925852034d90a17f0d1e05197d4956d3555752191" +checksum = "e13624c2627564efccf4934284bdd98cbaa14e79b0b5a141218e507b3a823456" dependencies = [ - "matches", "percent-encoding", ] @@ -2137,7 +2669,7 @@ dependencies = [ "cloud", "crc32c", "futures-util", - "http", + "http 0.2.12", "hyper", "hyper-tls", "kvproto", @@ -2289,7 +2821,7 @@ dependencies = [ "futures-core", "futures-sink", "futures-util", - "http", + "http 0.2.12", "indexmap 2.0.1", "slab", "tokio", @@ -2303,6 +2835,16 @@ version = "1.8.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "eabb4a44450da02c90444cf74558da904edde8fb4e9035a9a6a4e15445af0bd7" +[[package]] +name = "half" +version = "2.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6dd08c532ae367adf81c312a4580bc67f1d0fe8bc9c460520283f4c0ff277888" +dependencies = [ + "cfg-if 1.0.0", + "crunchy", +] + [[package]] name = "hashbrown" version = "0.9.1" @@ -2315,8 +2857,19 @@ version = "0.14.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2c6201b9ff9fd90a5a3bac2e56a830d0caa509576f0e503818ee82c181b3437a" dependencies = [ - "ahash 0.8.7", + "ahash 0.8.11", + "allocator-api2", +] + +[[package]] +name = "hashbrown" +version = "0.15.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e087f84d4f86bf4b218b927129862374b72199ae7d8657835f1e89000eea4fb" +dependencies = [ "allocator-api2", + "equivalent", + "foldhash", ] [[package]] @@ -2377,9 +2930,18 @@ checksum = "805026a5d0141ffc30abb3be3173848ad46a1b1664fe632428479619a3644d77" [[package]] name = "hex" -version = "0.4.2" +version = "0.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7f24254aa9a54b5c858eaee2f5bccdb46aaf0e486a595ed5fd8f86ba55232a70" + +[[package]] +name = "hmac" +version = "0.12.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "644f9158b2f133fd50f5fb3242878846d9eb792e445c893805ff0e3824006e35" +checksum = "6c49c37c09c17a53d937dfbb742eb3a961d65a994e6bcdcf37e7399d0cc8ab5e" +dependencies = [ + "digest 0.10.7", +] [[package]] name = "home" @@ -2403,9 +2965,20 @@ dependencies = [ [[package]] name = "http" -version = "0.2.8" +version = "0.2.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "75f43d41e26995c17e71ee126451dd3941010b0514a81a9d11f3b341debc2399" +checksum = "601cbb57e577e2f5ef5be8e7b83f0f63994f25aa94d673e54a92d5c516d101f1" +dependencies = [ + "bytes", + "fnv", + "itoa 1.0.1", +] + +[[package]] +name = "http" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "21b9ddb458710bc376481b842f5da65cdf31522de232c1ca8146abce2a358258" dependencies = [ "bytes", "fnv", @@ -2419,7 +2992,30 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d5f38f16d184e36f2408a55281cd658ecbd3ca05cce6d6510a176eca393e26d1" dependencies = [ "bytes", - "http", + "http 0.2.12", + "pin-project-lite", +] + +[[package]] +name = "http-body" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1cac85db508abc24a2e48553ba12a996e87244a0395ce011e62b37158745d643" +dependencies = [ + "bytes", + "http 1.1.0", +] + +[[package]] +name = "http-body-util" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0475f8b2ac86659c21b64320d5d653f9efe42acd2a4e560073ec61a155a34f1d" +dependencies = [ + "bytes", + "futures-core", + "http 1.1.0", + "http-body 1.0.0", "pin-project-lite", ] @@ -2488,17 +3084,17 @@ dependencies = [ [[package]] name = "hyper" -version = "0.14.23" +version = "0.14.28" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "034711faac9d2166cb1baf1a2fb0b60b1f277f8492fd72176c17f3515e1abd3c" +checksum = "bf96e135eb83a2a8ddf766e426a841d8ddd7449d5f00d34ea02b41d2f19eef80" dependencies = [ "bytes", "futures-channel", "futures-core", "futures-util", "h2", - "http", - "http-body", + "http 0.2.12", + "http-body 0.4.5", "httparse", "httpdate", "itoa 1.0.1", @@ -2516,7 +3112,7 @@ version = "0.9.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a9d52322a69f0a93f177d76ca82073fcec8d5b4eb6e28525d5b3142fa718195c" dependencies = [ - "http", + "http 0.2.12", "hyper", "linked_hash_set", "once_cell", @@ -2541,6 +3137,29 @@ dependencies = [ "tokio-native-tls", ] +[[package]] +name = "iana-time-zone" +version = "0.1.60" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e7ffbb5a1b541ea2561f8c41c087286cc091e21e556a4f09a8f6cbf17b69b141" +dependencies = [ + "android_system_properties", + "core-foundation-sys", + "iana-time-zone-haiku", + "js-sys", + "wasm-bindgen", + "windows-core", +] + +[[package]] +name = "iana-time-zone-haiku" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f31827a206f56af32e590ba56d5d2d085f558508192593743f16b2306495269f" +dependencies = [ + "cc", +] + [[package]] name = "ident_case" version = "1.0.1" @@ -2549,11 +3168,10 @@ checksum = "b9e0384b61958566e926dc50660321d12159025e767c18e043daf26b70104c39" [[package]] name = "idna" -version = "0.2.0" +version = "0.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "02e2673c30ee86b5b96a9cb52ad15718aa1f966f5ab9ad54a8b95d5ca33120a9" +checksum = "634d9b1461af396cad843f47fdba5597a4f9e6ddd4bfb6ff5d85028c25cb12f6" dependencies = [ - "matches", "unicode-bidi", "unicode-normalization", ] @@ -2578,7 +3196,7 @@ dependencies = [ "engine_traits", "fail", "futures 0.3.15", - "hex 0.4.2", + "hex 0.4.3", "keys", "kvproto", "lazy_static", @@ -2628,6 +3246,7 @@ checksum = "ad227c3af19d4914570ad36d30409928b75967c298feb9ea1969db3a610bb14e" dependencies = [ "equivalent", "hashbrown 0.14.0", + "serde", ] [[package]] @@ -2981,7 +3600,7 @@ name = "log_wrappers" version = "0.0.1" dependencies = [ "atomic", - "hex 0.4.2", + "hex 0.4.3", "protobuf", "serde", "slog", @@ -2990,6 +3609,15 @@ dependencies = [ "toml", ] +[[package]] +name = "lru" +version = "0.12.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "234cf4f4a04dc1f57e24b96cc0cd600cf2af460d4161ac5ecdd0af8e1f3b2a38" +dependencies = [ + "hashbrown 0.15.0", +] + [[package]] name = "lz4-sys" version = "1.9.5" @@ -3011,12 +3639,31 @@ dependencies = [ "syn 1.0.103", ] +[[package]] +name = "matchers" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8263075bb86c5a1b1427b5ae862e8889656f126e9f77c484496e8b47cf5c5558" +dependencies = [ + "regex-automata 0.1.8", +] + [[package]] name = "matches" version = "0.1.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7ffc5c5338469d4d3ea17d269fa8ea3512ad247247c30bd2df69e68309ed0a08" +[[package]] +name = "md-5" +version = "0.10.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d89e7ee0cfbedfc4da3340218492196241d89eefb6dab27de5df917a6d2e78cf" +dependencies = [ + "cfg-if 1.0.0", + "digest 0.10.7", +] + [[package]] name = "md5" version = "0.7.0" @@ -3288,13 +3935,12 @@ dependencies = [ [[package]] name = "nom" -version = "7.1.0" +version = "7.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1b1d11e1ef389c76fe5b81bcaf2ea32cf88b62bc494e19f493d0b30e7a930109" +checksum = "d273983c5a657a70a3e8f2a01329822f3b8c8172b73826411a55751e404a0a4a" dependencies = [ "memchr", "minimal-lexical", - "version_check 0.9.4", ] [[package]] @@ -3324,6 +3970,16 @@ dependencies = [ "winapi 0.3.9", ] +[[package]] +name = "nu-ansi-term" +version = "0.46.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "77a8165726e8236064dbb45459242600304b42a5ea24ee2948e18e023bf7ba84" +dependencies = [ + "overload", + "winapi 0.3.9", +] + [[package]] name = "num" version = "0.3.0" @@ -3333,7 +3989,18 @@ dependencies = [ "num-complex", "num-integer", "num-iter", - "num-rational", + "num-rational 0.3.0", + "num-traits", +] + +[[package]] +name = "num-bigint" +version = "0.4.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "608e7659b5c3d7cba262d894801b9ec9d00de989e8a82bd4bef91d08da45cdc0" +dependencies = [ + "autocfg", + "num-integer", "num-traits", ] @@ -3410,6 +4077,18 @@ dependencies = [ "num-traits", ] +[[package]] +name = "num-rational" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0638a1c9d0a3c0914158145bc76cff373a75a627e6ecbfb71cbe6f453a5a19b0" +dependencies = [ + "autocfg", + "num-bigint", + "num-integer", + "num-traits", +] + [[package]] name = "num-traits" version = "0.2.17" @@ -3448,12 +4127,12 @@ dependencies = [ "base64 0.13.0", "chrono", "getrandom 0.2.11", - "http", + "http 0.2.12", "rand 0.8.5", "serde", "serde_json", "serde_path_to_error", - "sha2", + "sha2 0.9.1", "thiserror", "url", ] @@ -3576,6 +4255,18 @@ dependencies = [ "memchr", ] +[[package]] +name = "outref" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4030760ffd992bef45b0ae3f10ce1aba99e33464c90d14dd7c039884963ddc7a" + +[[package]] +name = "overload" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39" + [[package]] name = "page_size" version = "0.4.2" @@ -3702,9 +4393,9 @@ checksum = "19b17cddbe7ec3f8bc800887bab5e717348c95ea2ca0b1bf0837fb964dc67099" [[package]] name = "percent-encoding" -version = "2.1.0" +version = "2.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d4fd5641d01c8f18a23da7b6fe29298ff4b55afcccdf78973b24cf3175fee32e" +checksum = "e3148f5046208a5d56bcfc03053e3ca6334e51da8dfb19b6cdc8b306fae3283e" [[package]] name = "perfcnt" @@ -3893,6 +4584,16 @@ version = "0.2.10" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ac74c624d6b2d21f425f752262f42188365d7b8ff1aff74c82e45136510a4857" +[[package]] +name = "pretty_assertions" +version = "1.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "af7cee1a6c8a5b9208b3cb1061f10c0cb689087b3d8ce85fb9d2dd7a29b6ba66" +dependencies = [ + "diff", + "yansi", +] + [[package]] name = "prettyplease" version = "0.2.6" @@ -3944,7 +4645,7 @@ checksum = "0941606b9934e2d98a3677759a971756eb821f75764d0e0d26946d08e74d9104" dependencies = [ "bitflags 1.3.2", "byteorder", - "hex 0.4.2", + "hex 0.4.3", "lazy_static", "libc 0.2.151", ] @@ -4014,7 +4715,7 @@ dependencies = [ "rand 0.8.5", "rand_chacha 0.3.0", "rand_xorshift", - "regex-syntax", + "regex-syntax 0.8.2", "rusty-fork", "tempfile", "unarray", @@ -4133,7 +4834,7 @@ dependencies = [ "fail", "fs2", "hashbrown 0.14.0", - "hex 0.4.2", + "hex 0.4.3", "if_chain", "lazy_static", "libc 0.2.151", @@ -4347,7 +5048,7 @@ checksum = "34af8d1a0e25924bc5b7c43c079c942339d8f0a8b57c39049bef581b46327404" dependencies = [ "libc 0.2.151", "rand_chacha 0.3.0", - "rand_core 0.6.2", + "rand_core 0.6.4", ] [[package]] @@ -4367,7 +5068,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e12735cf05c9e10bf21534da50a147b924d555dc7a547c42e6bb2d5b6017ae0d" dependencies = [ "ppv-lite86", - "rand_core 0.6.2", + "rand_core 0.6.4", ] [[package]] @@ -4396,9 +5097,9 @@ dependencies = [ [[package]] name = "rand_core" -version = "0.6.2" +version = "0.6.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "34cf66eb183df1c5876e2dcf6b13d57340741e8dc255b48e40a26de954d06ae7" +checksum = "ec0be4795e2f6a28069bec0b5ff3e2ac9bafc99e6a9a7dc3547996c5c816922c" dependencies = [ "getrandom 0.2.11", ] @@ -4418,7 +5119,7 @@ version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fac4373cd91b4f55722c553fb0f286edbb81ef3ff6eec7b99d1898a4110a0b28" dependencies = [ - "rand_core 0.6.2", + "rand_core 0.6.4", ] [[package]] @@ -4427,7 +5128,7 @@ version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d25bf25ec5ae4a3f1b92f929810509a2f53d7dca2f50b794ff57e3face536c8f" dependencies = [ - "rand_core 0.6.2", + "rand_core 0.6.4", ] [[package]] @@ -4509,7 +5210,7 @@ dependencies = [ "aho-corasick", "memchr", "regex-automata 0.4.3", - "regex-syntax", + "regex-syntax 0.8.2", ] [[package]] @@ -4519,6 +5220,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "92b73c2a1770c255c240eaa4ee600df1704a38dc3feaa6e949e7fcd4f8dc09f9" dependencies = [ "byteorder", + "regex-syntax 0.6.29", + "utf8-ranges", ] [[package]] @@ -4529,9 +5232,21 @@ checksum = "5f804c7828047e88b2d32e2d7fe5a105da8ee3264f01902f796c8e067dc2483f" dependencies = [ "aho-corasick", "memchr", - "regex-syntax", + "regex-syntax 0.8.2", ] +[[package]] +name = "regex-lite" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "30b661b2f27137bdbc16f00eda72866a92bb28af1753ffbd56744fb6e2e9cd8e" + +[[package]] +name = "regex-syntax" +version = "0.6.29" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f162c6dd7b008981e4d40210aca20b4bd0f9b60ca9271061b07f78537722f2e1" + [[package]] name = "regex-syntax" version = "0.8.2" @@ -4558,8 +5273,8 @@ dependencies = [ "encoding_rs 0.8.33", "futures-core", "futures-util", - "http", - "http-body", + "http 0.2.12", + "http-body 0.4.5", "hyper", "hyper-tls", "ipnet", @@ -4719,121 +5434,12 @@ dependencies = [ ] [[package]] -name = "rusoto_core" -version = "0.46.0" -source = "git+https://github.com/tikv/rusoto?branch=gh1482-s3-addr-styles#2b142c1792062a7a3a8317610d78dd141ab4223d" -dependencies = [ - "async-trait", - "base64 0.13.0", - "bytes", - "crc32fast", - "futures 0.3.15", - "http", - "hyper", - "hyper-tls", - "lazy_static", - "log", - "rusoto_credential", - "rusoto_signature", - "rustc_version 0.3.3", - "serde", - "serde_json", - "tokio", - "xml-rs", -] - -[[package]] -name = "rusoto_credential" -version = "0.46.0" -source = "git+https://github.com/tikv/rusoto?branch=gh1482-s3-addr-styles#2b142c1792062a7a3a8317610d78dd141ab4223d" -dependencies = [ - "async-trait", - "chrono", - "dirs-next", - "futures 0.3.15", - "hyper", - "serde", - "serde_json", - "shlex 0.1.1", - "tokio", - "zeroize", -] - -[[package]] -name = "rusoto_kms" -version = "0.46.0" -source = "git+https://github.com/tikv/rusoto?branch=gh1482-s3-addr-styles#2b142c1792062a7a3a8317610d78dd141ab4223d" -dependencies = [ - "async-trait", - "bytes", - "futures 0.3.15", - "rusoto_core", - "serde", - "serde_json", -] - -[[package]] -name = "rusoto_mock" -version = "0.46.0" -source = "git+https://github.com/tikv/rusoto?branch=gh1482-s3-addr-styles#2b142c1792062a7a3a8317610d78dd141ab4223d" -dependencies = [ - "async-trait", - "chrono", - "futures 0.3.15", - "http", - "rusoto_core", - "serde", - "serde_json", -] - -[[package]] -name = "rusoto_s3" -version = "0.46.0" -source = "git+https://github.com/tikv/rusoto?branch=gh1482-s3-addr-styles#2b142c1792062a7a3a8317610d78dd141ab4223d" -dependencies = [ - "async-trait", - "bytes", - "futures 0.3.15", - "rusoto_core", - "serde", - "serde_derive", - "xml-rs", -] - -[[package]] -name = "rusoto_signature" -version = "0.46.0" -source = "git+https://github.com/tikv/rusoto?branch=gh1482-s3-addr-styles#2b142c1792062a7a3a8317610d78dd141ab4223d" -dependencies = [ - "base64 0.13.0", - "bytes", - "chrono", - "futures 0.3.15", - "hex 0.4.2", - "http", - "hyper", - "log", - "openssl", - "percent-encoding", - "pin-project-lite", - "rusoto_credential", - "rustc_version 0.3.3", - "serde", - "tokio", -] - -[[package]] -name = "rusoto_sts" -version = "0.46.0" -source = "git+https://github.com/tikv/rusoto?branch=gh1482-s3-addr-styles#2b142c1792062a7a3a8317610d78dd141ab4223d" +name = "roxmltree" +version = "0.14.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "921904a62e410e37e215c40381b7117f830d9d89ba60ab5236170541dd25646b" dependencies = [ - "async-trait", - "bytes", - "chrono", - "futures 0.3.15", - "rusoto_core", - "serde_urlencoded", - "xml-rs", + "xmlparser", ] [[package]] @@ -4934,9 +5540,9 @@ dependencies = [ [[package]] name = "ryu" -version = "1.0.4" +version = "1.0.17" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ed3d612bc64430efeb3f7ee6ef26d590dce0c43249217bddc62112540c7941e1" +checksum = "e86697c916019a8588c99b5fac3cead74ec0b4b819707a682fd4d23fa0ce1ba1" [[package]] name = "safemem" @@ -5060,6 +5666,12 @@ dependencies = [ "pest", ] +[[package]] +name = "separator" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f97841a747eef040fcd2e7b3b9a220a7205926e60488e673d9e4926d27772ce5" + [[package]] name = "serde" version = "1.0.194" @@ -5075,7 +5687,7 @@ version = "0.11.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1e18acfa2f90e8b735b2836ab8d538de304cbb6729a7360729ea5a895d15a622" dependencies = [ - "half", + "half 1.8.2", "serde", ] @@ -5101,12 +5713,12 @@ dependencies = [ [[package]] name = "serde_json" -version = "1.0.64" +version = "1.0.109" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "799e97dc9fdae36a5c8b8f2cae9ce2ee9fdce2058c57a93e6099d919fd982f79" +checksum = "cb0652c533506ad7a2e353cce269330d6afd8bdfb6d75e0ace5b35aacbd7b9e9" dependencies = [ - "indexmap 1.6.2", - "itoa 0.4.4", + "indexmap 2.0.1", + "itoa 1.0.1", "ryu", "serde", ] @@ -5240,19 +5852,41 @@ dependencies = [ "tikv_util", ] +[[package]] +name = "sha1" +version = "0.10.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e3bf829a2d51ab4a5ddf1352d8470c140cadc8301b2ae1789db023f01cedd6ba" +dependencies = [ + "cfg-if 1.0.0", + "cpufeatures", + "digest 0.10.7", +] + [[package]] name = "sha2" version = "0.9.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2933378ddfeda7ea26f48c555bdad8bb446bf8a3d17832dc83e380d444cfb8c1" dependencies = [ - "block-buffer", + "block-buffer 0.9.0", "cfg-if 0.1.10", "cpuid-bool", - "digest", + "digest 0.9.0", "opaque-debug", ] +[[package]] +name = "sha2" +version = "0.10.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "793db75ad2bcafc3ffa7c68b215fee268f537982cd901d132f89c6343f3a3dc8" +dependencies = [ + "cfg-if 1.0.0", + "cpufeatures", + "digest 0.10.7", +] + [[package]] name = "sharded-slab" version = "0.1.7" @@ -5262,12 +5896,6 @@ dependencies = [ "lazy_static", ] -[[package]] -name = "shlex" -version = "0.1.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7fdf1b9db47230893d76faad238fd6097fd6d6a9245cd7a4d90dbd639536bbd2" - [[package]] name = "shlex" version = "1.3.0" @@ -5372,9 +6000,9 @@ dependencies = [ [[package]] name = "smallvec" -version = "1.11.1" +version = "1.13.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "942b4a808e05215192e39f4ab80813e599068285906cc91aa64f923db842bd5a" +checksum = "3c5e1a9a646d36c3599cd173a41282daf47c44583ad367b8e6837255952e5c67" [[package]] name = "smartstring" @@ -5601,6 +6229,12 @@ dependencies = [ "syn 2.0.79", ] +[[package]] +name = "subtle" +version = "2.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "81cdd64d312baedb58e21336b31bc043b77e01cc99033ce76ef539f78e965ebc" + [[package]] name = "symbolic-common" version = "12.8.0" @@ -5681,7 +6315,7 @@ dependencies = [ "bytes", "chrono", "futures-util", - "http", + "http 0.2.12", "percent-encoding", "pin-utils", "serde", @@ -5697,7 +6331,7 @@ version = "0.9.6" source = "git+https://github.com/tikv/tame-oauth?branch=fips-0.9#487e287c0d316b832dc44735cd9b7f7c432a10aa" dependencies = [ "data-encoding", - "http", + "http 0.2.12", "lock_api", "openssl", "parking_lot 0.11.1", @@ -6108,22 +6742,22 @@ checksum = "b1141d4d61095b28419e22cb0bbf02755f5e54e0526f97f1e3d1d160e60885fb" [[package]] name = "thiserror" -version = "1.0.30" +version = "1.0.40" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "854babe52e4df1653706b98fcfc05843010039b406875930a70e4d9644e5c417" +checksum = "978c9a314bd8dc99be594bc3c175faaa9794be04a5a5e153caba6915336cebac" dependencies = [ "thiserror-impl", ] [[package]] name = "thiserror-impl" -version = "1.0.30" +version = "1.0.40" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "aa32fd3f627f367fe16f893e2597ae3c05020f8bba2666a4e6ea73d377e5714b" +checksum = "f9456a42c5b0d803c8cd86e73dd7cc9edd429499f37a3550d286d5e86720569f" dependencies = [ "proc-macro2", "quote", - "syn 1.0.103", + "syn 2.0.79", ] [[package]] @@ -6202,12 +6836,12 @@ dependencies = [ "criterion", "encoding_rs 0.8.29", "error_code", - "hex 0.4.2", + "hex 0.4.3", "kvproto", "lazy_static", "log_wrappers", "match-template", - "nom 7.1.0", + "nom 7.1.3", "num", "num-derive 0.3.0", "num-traits", @@ -6267,7 +6901,7 @@ dependencies = [ "crypto", "file_system", "flate2", - "hex 0.4.2", + "hex 0.4.3", "log_wrappers", "match-template", "num", @@ -6331,8 +6965,8 @@ dependencies = [ "grpcio", "grpcio-health", "health_controller", - "hex 0.4.2", - "http", + "hex 0.4.3", + "http 0.2.12", "hybrid_engine", "hyper", "hyper-openssl", @@ -6434,7 +7068,7 @@ dependencies = [ "futures 0.3.15", "gag", "grpcio", - "hex 0.4.2", + "hex 0.4.3", "keys", "kvproto", "log", @@ -6581,7 +7215,7 @@ dependencies = [ "gag", "grpcio", "heck 0.3.1", - "http", + "http 0.2.12", "kvproto", "lazy_static", "libc 0.2.151", @@ -6602,7 +7236,6 @@ dependencies = [ "protobuf", "rand 0.8.5", "regex", - "rusoto_core", "serde", "serde_json", "slog", @@ -6674,6 +7307,21 @@ dependencies = [ "serde_json", ] +[[package]] +name = "tinyvec" +version = "1.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "87cc5ceb3875bb20c2890005a4e226a4651264a5c75edb2421b52861a0a0cb50" +dependencies = [ + "tinyvec_macros", +] + +[[package]] +name = "tinyvec_macros" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" + [[package]] name = "tipb" version = "0.0.1" @@ -6856,6 +7504,28 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c06d3da6113f116aaee68e4d601191614c9053067f9ab7f6edbcb161237daa54" dependencies = [ "once_cell", + "valuable", +] + +[[package]] +name = "tracing-log" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ee855f1f400bd0e5c02d150ae5de3840039a3f54b025156404e34c23c03f47c3" +dependencies = [ + "log", + "once_cell", + "tracing-core", +] + +[[package]] +name = "tracing-serde" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bc6b213177105856957181934e4920de57730fc69bf42c37ee5bb664d406d9e1" +dependencies = [ + "serde", + "tracing-core", ] [[package]] @@ -6864,10 +7534,19 @@ version = "0.3.18" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ad0f048c97dbd9faa9b7df56362b8ebcaa52adb06b498c050d2f4e32f90a7a8b" dependencies = [ + "matchers", + "nu-ansi-term", + "once_cell", + "regex", + "serde", + "serde_json", "sharded-slab", "smallvec", "thread_local", + "tracing", "tracing-core", + "tracing-log", + "tracing-serde", ] [[package]] @@ -6956,12 +7635,9 @@ checksum = "eeba86d422ce181a719445e51872fa30f1f7413b62becb52e95ec91aa262d85c" [[package]] name = "unicode-bidi" -version = "0.3.4" +version = "0.3.15" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "49f2bd0c6468a8230e1db229cff8029217cf623c767ea5d60bfbd42729ea54d5" -dependencies = [ - "matches", -] +checksum = "08f95100a766bf4f8f28f90d77e0a5461bbdb219042e7679bebe79004fed8d75" [[package]] name = "unicode-ident" @@ -6971,11 +7647,11 @@ checksum = "6ceab39d59e4c9499d4e5a8ee0e2735b891bb7308ac83dfb4e80cad195c9f6f3" [[package]] name = "unicode-normalization" -version = "0.1.12" +version = "0.1.23" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5479532badd04e128284890390c1e876ef7a993d0570b3597ae43dfa1d59afa4" +checksum = "a56d1686db2308d901306f92a263857ef59ea39678a5458e7cb17f01415101f5" dependencies = [ - "smallvec", + "tinyvec", ] [[package]] @@ -6998,17 +7674,28 @@ checksum = "f962df74c8c05a667b5ee8bcf162993134c104e96440b663c8daa176dc772d8c" [[package]] name = "url" -version = "2.2.2" +version = "2.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a507c383b2d33b5fc35d1861e77e6b383d158b2da5e14fe51b83dfedf6fd578c" +checksum = "31e6302e3bb753d46e83516cae55ae196fc0c309407cf11ab35cc51a4c2a4633" dependencies = [ "form_urlencoded", "idna", - "matches", "percent-encoding", "serde", ] +[[package]] +name = "urlencoding" +version = "2.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "daf8dba3b7eb870caf1ddeed7bc9d2a049f3cfdfae7cb521b087cc33ae4c49da" + +[[package]] +name = "utf8-ranges" +version = "1.0.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7fcfc827f90e53a02eaef5e535ee14266c1d569214c6aa70133a624d8a3164ba" + [[package]] name = "uuid" version = "0.8.2" @@ -7034,6 +7721,12 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d0fb139b14473e1350e34439c888e44c805f37b4293d17f87ea920a66a20a99a" +[[package]] +name = "valuable" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "830b7e5d4d90034032940e4ace0d9a9a057e7a45cd94e6c007832e39edb82f6d" + [[package]] name = "vcpkg" version = "0.2.11" @@ -7058,6 +7751,12 @@ version = "0.9.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "49874b5167b65d7193b8aba1567f5c7d93d001cafc34600cee003eda787e483f" +[[package]] +name = "vsimd" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5c3082ca00d5a5ef149bb8b555a72ae84c9c59f7250f013ac822ac2e49b19c64" + [[package]] name = "wait-timeout" version = "0.2.0" @@ -7238,6 +7937,15 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" +[[package]] +name = "windows-core" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "33ab640c8d7e35bf8ba19b884ba838ceb4fba93a4e8c65a9059d08afcfc683d9" +dependencies = [ + "windows-targets 0.52.0", +] + [[package]] name = "windows-sys" version = "0.32.0" @@ -7511,10 +8219,16 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "213b7324336b53d2414b2db8537e56544d981803139155afa84f76eeebb7a546" [[package]] -name = "xml-rs" -version = "0.8.0" +name = "xmlparser" +version = "0.13.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "66fee0b777b0f5ac1c69bb06d361268faafa61cd4682ae064a171c16c433e9e4" + +[[package]] +name = "yansi" +version = "0.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "541b12c998c5b56aa2b4e6f18f03664eef9a4fd0a246a55594efae6cc2d964b5" +checksum = "09041cd90cf85f7f8b2df60c646f853b7f535ce68f85244eb6731cf89fa498ec" [[package]] name = "yatp" @@ -7555,9 +8269,9 @@ dependencies = [ [[package]] name = "zeroize" -version = "1.1.0" +version = "1.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3cbac2ed2ba24cc90f5e06485ac8c7c1e5449fe8911aef4d8877218af021a5b8" +checksum = "525b4ec142c6b68a2d10f01f7bbf6755599ca3f81ea53b8431b7dd348f5fdb2d" [[package]] name = "zipf" diff --git a/Cargo.toml b/Cargo.toml index 5a282e7bde3..fe1350100d1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -204,19 +204,10 @@ raft-proto = { git = "https://github.com/tikv/raft-rs", branch = "master" } protobuf = { git = "https://github.com/pingcap/rust-protobuf", branch = "v2.8" } protobuf-codegen = { git = "https://github.com/pingcap/rust-protobuf", branch = "v2.8" } -# TODO: remove this replacement after rusoto_s3 truly supports virtual-host style (https://github.com/rusoto/rusoto/pull/1823). -# UPDATE: use openssl for signature to support fips 140 -rusoto_core = { git = "https://github.com/tikv/rusoto", branch = "gh1482-s3-addr-styles" } -rusoto_credential = { git = "https://github.com/tikv/rusoto", branch = "gh1482-s3-addr-styles" } -rusoto_kms = { git = "https://github.com/tikv/rusoto", branch = "gh1482-s3-addr-styles" } -rusoto_mock = { git = "https://github.com/tikv/rusoto", branch = "gh1482-s3-addr-styles" } -rusoto_s3 = { git = "https://github.com/tikv/rusoto", branch = "gh1482-s3-addr-styles" } -rusoto_sts = { git = "https://github.com/tikv/rusoto", branch = "gh1482-s3-addr-styles" } +snappy-sys = { git = "https://github.com/tikv/rust-snappy.git", branch = "static-link" } # NOTICE: use openssl for signature to support fips 140 tame-oauth = { git = "https://github.com/tikv/tame-oauth", branch = "fips-0.9" } -snappy-sys = { git = "https://github.com/tikv/rust-snappy.git", branch = "static-link" } - # remove this when https://github.com/danburkert/fs2-rs/pull/42 is merged. fs2 = { git = "https://github.com/tikv/fs2-rs", branch = "tikv" } diff --git a/components/cloud/aws/Cargo.toml b/components/cloud/aws/Cargo.toml index 15c1fb11744..ce7e15d13c4 100644 --- a/components/cloud/aws/Cargo.toml +++ b/components/cloud/aws/Cargo.toml @@ -10,6 +10,18 @@ failpoints = ["fail/failpoints"] [dependencies] async-trait = "0.1" + +aws-config = { version = "1", features = [], default-features = false } +aws-credential-types = { version = "1", features = ["hardcoded-credentials"] } +# Note: sts@1.40.0, s3@1.47.0 and kms@1.41.0 is the latest version that supports rustc 1.77... +# We may update this after we update our rustc. +aws-sdk-kms = { version = "=1.40.0", features = [], default-features = false } +aws-sdk-s3 = { version = "=1.40.0", features = ["rt-tokio"], default-features = false } + +aws-smithy-runtime = { version = "1", features = [ "client", "connector-hyper-0-14-x" ], default-features = false } +aws-smithy-runtime-api = { version = "1", features = [], default-features = false } +aws-smithy-types = { version = "1", features = ["byte-stream-poll-next"] } + base64 = "0.13.0" bytes = "1.0" cloud = { workspace = true } @@ -19,22 +31,25 @@ futures-util = { version = "0.3", default-features = false, features = ["io"] } # This is only a dependency to vendor openssl for rusoto. It's not clear exactly # how openssl is built for tikv, but it seems to be controlled by grpcio. This # makes `cargo test -p aws` link correctly. +grpcio = { workspace = true } +http = "0.2.0" +hyper = "0.14" +hyper-tls = { version = "0.5" } kvproto = { workspace = true } md5 = "0.7.0" -rusoto_core = "0.46.0" -rusoto_credential = "0.46.0" -rusoto_kms = { version = "0.46.0", features = ["serialize_structs"] } -rusoto_s3 = { version = "0.46.0", features = ["serialize_structs"] } -rusoto_sts = "0.46.0" +prometheus = { version = "0.13", default-features = false, features = ["nightly"] } slog = { workspace = true } slog-global = { workspace = true } thiserror = "1.0" tikv_util = { workspace = true } # better to not use slog-global, but pass in the logger tokio = { version = "1.5", features = ["time"] } +tokio-util = { version = "0.7" } url = "2.0" uuid = { version = "0.8", features = ["v4"] } [dev-dependencies] +aws-smithy-runtime = { version = "1.4.0", features = ["test-util", "client"] } +base64 = "0.13" futures = "0.3" -rusoto_mock = "0.46.0" +tokio = { version = "1.5", features = ["macros"] } diff --git a/components/cloud/aws/src/kms.rs b/components/cloud/aws/src/kms.rs index 560a27ed0b7..932b571fb7b 100644 --- a/components/cloud/aws/src/kms.rs +++ b/components/cloud/aws/src/kms.rs @@ -3,24 +3,29 @@ use std::ops::Deref; use async_trait::async_trait; +use aws_config::BehaviorVersion; +use aws_credential_types::provider::{error::CredentialsError, ProvideCredentials}; +use aws_sdk_kms::{ + operation::{decrypt::DecryptError, generate_data_key::GenerateDataKeyError}, + primitives::Blob, + types::DataKeySpec, + Client, +}; +use aws_sdk_s3::config::HttpClient; use cloud::{ error::{Error, KmsError, OtherError, Result}, kms::{Config, CryptographyType, DataKeyPair, EncryptedKey, KeyId, KmsProvider, PlainKey}, }; -use rusoto_core::{request::DispatchSignedRequest, RusotoError}; -use rusoto_credential::{AwsCredentials, ProvideAwsCredentials, StaticProvider}; -use rusoto_kms::{ - DecryptError, DecryptRequest, GenerateDataKeyError, GenerateDataKeyRequest, Kms, KmsClient, -}; -use tikv_util::stream::RetryError; +use futures::executor::block_on; -use crate::util; +use crate::util::{self, is_retryable, SdkError}; + +const AWS_KMS_DATA_KEY_SPEC: DataKeySpec = DataKeySpec::Aes256; -const AWS_KMS_DATA_KEY_SPEC: &str = "AES_256"; pub const ENCRYPTION_VENDOR_NAME_AWS_KMS: &str = "AWS"; pub struct AwsKms { - client: KmsClient, + client: Client, current_key_id: KeyId, region: String, endpoint: String, @@ -40,20 +45,30 @@ impl std::fmt::Debug for AwsKms { } impl AwsKms { - fn new_with_creds_dispatcher( + fn new_with_creds_client( config: Config, - dispatcher: Dispatcher, + client: Http, credentials_provider: Creds, ) -> Result where - Creds: ProvideAwsCredentials + Send + Sync + 'static, - Dispatcher: DispatchSignedRequest + Send + Sync + 'static, + Http: HttpClient + 'static, + Creds: ProvideCredentials + 'static, { - let region = util::get_region( - config.location.region.as_ref(), - config.location.endpoint.as_ref(), + let mut loader = aws_config::defaults(BehaviorVersion::latest()) + .credentials_provider(credentials_provider) + .http_client(client); + + loader = util::configure_region( + loader, + &config.location.region, + !config.location.endpoint.is_empty(), )?; - let client = KmsClient::new_with(dispatcher, credentials_provider, region); + + loader = util::configure_endpoint(loader, &config.location.endpoint); + + let sdk_config = block_on(loader.load()); + let client = Client::new(&sdk_config); + Ok(AwsKms { client, current_key_id: config.key_id, @@ -63,7 +78,8 @@ impl AwsKms { } pub fn new(config: Config) -> Result { - let dispatcher = util::new_http_client()?; + let client = util::new_http_client(); + let creds = util::new_credentials_provider(client.clone()); match config.aws.as_ref() { Some(aws_config) => { if let (Some(access_key), Some(secret_access_key)) = ( @@ -71,24 +87,24 @@ impl AwsKms { aws_config.secret_access_key.clone(), ) { // Use provided AWS credentials - let credentials = AwsCredentials::new( + let credentials = aws_credential_types::Credentials::new( access_key, secret_access_key, None, // session token None, // expiration + "user-provided", ); - let static_provider = StaticProvider::from(credentials); - Self::new_with_creds_dispatcher(config, dispatcher, static_provider) + let static_provider = + aws_credential_types::provider::SharedCredentialsProvider::new(credentials); + Self::new_with_creds_client(config, client, static_provider) } else { // Fall back to default credentials provider - let provider = util::CredentialsProvider::new()?; - Self::new_with_creds_dispatcher(config, dispatcher, provider) + Self::new_with_creds_client(config, client, creds) } } None => { // No AWS config provided, use default credentials provider - let provider = util::CredentialsProvider::new()?; - Self::new_with_creds_dispatcher(config, dispatcher, provider) + Self::new_with_creds_client(config, client, creds) } } } @@ -103,38 +119,27 @@ impl KmsProvider for AwsKms { // On decrypt failure, the rule is to return WrongMasterKey error in case it is // possible that a wrong master key has been used, or other error otherwise. async fn decrypt_data_key(&self, data_key: &EncryptedKey) -> Result> { - let decrypt_request = DecryptRequest { - ciphertext_blob: bytes::Bytes::copy_from_slice(data_key), - // Use default algorithm SYMMETRIC_DEFAULT. - encryption_algorithm: None, - // Use key_id encoded in ciphertext. - key_id: Some(self.current_key_id.deref().clone()), - // Encryption context and grant tokens are not used. - encryption_context: None, - grant_tokens: None, - }; self.client - .decrypt(decrypt_request.clone()) + .decrypt() + .ciphertext_blob(Blob::new(data_key.clone().into_inner())) + .key_id(self.current_key_id.deref().clone()) + .send() .await .map_err(classify_decrypt_error) - .map(|response| response.plaintext.unwrap().as_ref().to_vec()) + .map(|response| response.plaintext().unwrap().as_ref().to_vec()) } async fn generate_data_key(&self) -> Result { - let generate_request = GenerateDataKeyRequest { - encryption_context: None, - grant_tokens: None, - key_id: self.current_key_id.deref().clone(), - key_spec: Some(AWS_KMS_DATA_KEY_SPEC.to_owned()), - number_of_bytes: None, - }; self.client - .generate_data_key(generate_request) + .generate_data_key() + .key_id(self.current_key_id.deref().clone()) + .key_spec(AWS_KMS_DATA_KEY_SPEC) + .send() .await .map_err(classify_generate_data_key_error) .and_then(|response| { - let ciphertext_key = response.ciphertext_blob.unwrap().as_ref().to_vec(); - let plaintext_key = response.plaintext.unwrap().as_ref().to_vec(); + let ciphertext_key = response.ciphertext_blob().unwrap().as_ref().to_vec(); + let plaintext_key = response.plaintext().unwrap().as_ref().to_vec(); Ok(DataKeyPair { encrypted: EncryptedKey::new(ciphertext_key)?, plaintext: PlainKey::new(plaintext_key, CryptographyType::AesGcm256)?, @@ -143,67 +148,52 @@ impl KmsProvider for AwsKms { } } -// Rusoto errors Display implementation just gives the cause message and -// discards the type. This is really bad when the cause message is empty! -// Use Debug instead: this will show both -pub struct FixRusotoErrorDisplay( - RusotoError, -); -impl std::fmt::Debug for FixRusotoErrorDisplay { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "{:?}", self.0) - } -} -impl std::fmt::Display for FixRusotoErrorDisplay { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "{:?}", self.0) - } -} -impl std::error::Error for FixRusotoErrorDisplay {} - -fn classify_generate_data_key_error(err: RusotoError) -> Error { - if let RusotoError::Service(e) = &err { - match &e { - GenerateDataKeyError::NotFound(_) => Error::ApiNotFound(err.into()), - GenerateDataKeyError::InvalidKeyUsage(_) => { +fn classify_generate_data_key_error(err: SdkError) -> Error { + if let SdkError::ServiceError(service_err) = &err { + match &service_err.err() { + GenerateDataKeyError::NotFoundException(_) => Error::ApiNotFound(err.into()), + GenerateDataKeyError::InvalidKeyUsageException(_) => { Error::KmsError(KmsError::Other(OtherError::from_box(err.into()))) } - GenerateDataKeyError::DependencyTimeout(_) => Error::ApiTimeout(err.into()), - GenerateDataKeyError::KMSInternal(_) => Error::ApiInternal(err.into()), - _ => Error::KmsError(KmsError::Other(OtherError::from_box( - FixRusotoErrorDisplay(err).into(), - ))), + GenerateDataKeyError::DependencyTimeoutException(_) => Error::ApiTimeout(err.into()), + GenerateDataKeyError::KmsInternalException(_) => Error::ApiInternal(err.into()), + _ => Error::KmsError(KmsError::Other(OtherError::from_box(err.into()))), } } else { classify_error(err) } } -fn classify_decrypt_error(err: RusotoError) -> Error { - if let RusotoError::Service(e) = &err { - match &e { - DecryptError::IncorrectKey(_) | DecryptError::NotFound(_) => { +fn classify_decrypt_error(err: SdkError) -> Error { + if let SdkError::ServiceError(service_err) = &err { + match &service_err.err() { + DecryptError::IncorrectKeyException(_) | DecryptError::NotFoundException(_) => { Error::KmsError(KmsError::WrongMasterKey(err.into())) } - DecryptError::DependencyTimeout(_) => Error::ApiTimeout(err.into()), - DecryptError::KMSInternal(_) => Error::ApiInternal(err.into()), - _ => Error::KmsError(KmsError::Other(OtherError::from_box( - FixRusotoErrorDisplay(err).into(), - ))), + DecryptError::DependencyTimeoutException(_) => Error::ApiTimeout(err.into()), + DecryptError::KmsInternalException(_) => Error::ApiInternal(err.into()), + _ => Error::KmsError(KmsError::Other(OtherError::from_box(err.into()))), } } else { classify_error(err) } } -fn classify_error(err: RusotoError) -> Error { +fn classify_error(err: SdkError) -> Error { match &err { - RusotoError::HttpDispatch(_) => Error::ApiTimeout(err.into()), - RusotoError::Credentials(_) => Error::ApiAuthentication(err.into()), - e if e.is_retryable() => Error::ApiInternal(err.into()), - _ => Error::KmsError(KmsError::Other(OtherError::from_box( - FixRusotoErrorDisplay(err).into(), - ))), + SdkError::DispatchFailure(dispatch_failure) => { + let maybe_credentials_err = dispatch_failure + .as_connector_error() + .and_then(|connector_err| std::error::Error::source(connector_err)) + .filter(|src_err| src_err.is::()); + if maybe_credentials_err.is_some() { + Error::ApiAuthentication(err.into()) + } else { + Error::ApiTimeout(err.into()) + } + } + e if is_retryable(e) => Error::ApiInternal(err.into()), + _ => Error::KmsError(KmsError::Other(OtherError::from_box(err.into()))), } } @@ -223,11 +213,11 @@ impl std::fmt::Debug for KmsClientDebug { #[cfg(test)] mod tests { - // use rusoto_mock::MockRequestDispatcher; + use aws_sdk_kms::config::Credentials; + use aws_smithy_runtime::client::http::test_util::{ReplayEvent, StaticReplayClient}; + use aws_smithy_types::body::SdkBody; use cloud::kms::Location; - use rusoto_credential::StaticProvider; - use rusoto_kms::{DecryptResponse, GenerateDataKeyResponse}; - use rusoto_mock::MockRequestDispatcher; + use http::Uri; use super::*; @@ -239,7 +229,7 @@ mod tests { key_id: KeyId::new("test_key_id".to_string()).unwrap(), vendor: String::new(), location: Location { - region: "ap-southeast-2".to_string(), + region: "cn-north-1".to_string(), endpoint: String::new(), }, azure: None, @@ -247,36 +237,68 @@ mod tests { aws: None, }; - let dispatcher = - MockRequestDispatcher::with_status(200).with_json_body(GenerateDataKeyResponse { - ciphertext_blob: Some(magic_contents.as_ref().into()), - key_id: Some("test_key_id".to_string()), - plaintext: Some(key_contents.clone().into()), - }); - let credentials_provider = - StaticProvider::new_minimal("abc".to_string(), "xyz".to_string()); - let aws_kms = AwsKms::new_with_creds_dispatcher( - config.clone(), - dispatcher, - credentials_provider.clone(), - ) - .unwrap(); + let resp = format!( + "{{\"KeyId\": \"test_key_id\", \"Plaintext\": \"{}\", \"CiphertextBlob\": \"{}\" }}", + base64::encode(key_contents.clone()), + base64::encode(magic_contents) + ); + + let client = StaticReplayClient::new(vec![ReplayEvent::new( + http::Request::builder() + .method("POST") + .uri(Uri::from_static("https://kms.cn-north-1.amazonaws.com.cn/")) + .body(SdkBody::from( + "{\"KeyId\":\"test_key_id\",\"KeySpec\":\"AES_256\"}", + )) + .unwrap(), + http::Response::builder() + .status(200) + .body(SdkBody::from(resp)) + .unwrap(), + )]); + + let creds = Credentials::from_keys("abc", "xyz", None); + + let aws_kms = + AwsKms::new_with_creds_client(config.clone(), client.clone(), creds.clone()).unwrap(); + let data_key = aws_kms.generate_data_key().await.unwrap(); + assert_eq!( data_key.encrypted, EncryptedKey::new(magic_contents.to_vec()).unwrap() ); assert_eq!(*data_key.plaintext, key_contents); - let dispatcher = MockRequestDispatcher::with_status(200).with_json_body(DecryptResponse { - plaintext: Some(key_contents.clone().into()), - key_id: Some("test_key_id".to_string()), - encryption_algorithm: None, - }); - let aws_kms = - AwsKms::new_with_creds_dispatcher(config, dispatcher, credentials_provider).unwrap(); + client.assert_requests_match(&[]); + + let req = format!( + "{{\"KeyId\":\"test_key_id\",\"CiphertextBlob\":\"{}\"}}", + base64::encode(data_key.encrypted.clone().into_inner()) + ); + + let resp = format!( + "{{\"KeyId\": \"test_key_id\", \"Plaintext\": \"{}\", \"EncryptionAlgorithm\": \"SYMMETRIC_DEFAULT\" }}", + base64::encode(key_contents.clone()), + ); + + let client = StaticReplayClient::new(vec![ReplayEvent::new( + http::Request::builder() + .uri(Uri::from_static("https://kms.cn-north-1.amazonaws.com.cn/")) + .body(SdkBody::from(req)) + .unwrap(), + http::Response::builder() + .status(200) + .body(SdkBody::from(resp)) + .unwrap(), + )]); + + let aws_kms = AwsKms::new_with_creds_client(config, client.clone(), creds).unwrap(); + let plaintext = aws_kms.decrypt_data_key(&data_key.encrypted).await.unwrap(); assert_eq!(plaintext, key_contents); + + client.assert_requests_match(&[]); } #[tokio::test] @@ -285,7 +307,7 @@ mod tests { key_id: KeyId::new("test_key_id".to_string()).unwrap(), vendor: String::new(), location: Location { - region: "ap-southeast-2".to_string(), + region: "cn-north-1".to_string(), endpoint: String::new(), }, azure: None, @@ -293,28 +315,70 @@ mod tests { aws: None, }; + let enc_key = EncryptedKey::new(b"invalid".to_vec()).unwrap(); + + let req = format!( + "{{\"KeyId\":\"test_key_id\",\"CiphertextBlob\":\"{}\"}}", + base64::encode(enc_key.clone().into_inner()) + ); + // IncorrectKeyException // // HTTP Status Code: 400 // Json, see: - // https://github.com/rusoto/rusoto/blob/mock-v0.43.0/rusoto/services/kms/src/generated.rs#L1970 - // https://github.com/rusoto/rusoto/blob/mock-v0.43.0/rusoto/core/src/proto/json/error.rs#L7 // https://docs.aws.amazon.com/kms/latest/APIReference/API_Decrypt.html#API_Decrypt_Errors - let dispatcher = MockRequestDispatcher::with_status(400).with_body( - r#"{ - "__type": "IncorrectKeyException", - "Message": "mock" - }"#, - ); - let credentials_provider = - StaticProvider::new_minimal("abc".to_string(), "xyz".to_string()); - let aws_kms = - AwsKms::new_with_creds_dispatcher(config, dispatcher, credentials_provider).unwrap(); - let enc_key = EncryptedKey::new(b"invalid".to_vec()).unwrap(); + let client = StaticReplayClient::new(vec![ReplayEvent::new( + http::Request::builder() + .uri(Uri::from_static("https://kms.cn-north-1.amazonaws.com.cn/")) + .body(SdkBody::from(req)) + .unwrap(), + http::Response::builder() + .status(400) + .body(SdkBody::from( + r#"{ + "__type": "IncorrectKeyException", + "Message": "mock" + }"#, + )) + .unwrap(), + )]); + + let creds = Credentials::from_keys("abc", "xyz", None); + + let aws_kms = AwsKms::new_with_creds_client(config, client.clone(), creds).unwrap(); let fut = aws_kms.decrypt_data_key(&enc_key); + match fut.await { Err(Error::KmsError(KmsError::WrongMasterKey(_))) => (), other => panic!("{:?}", other), } + + client.assert_requests_match(&[]); + } + + #[tokio::test] + #[cfg(FALSE)] + // FIXME: enable this (or move this to an integration test) + async fn test_aws_kms_localstack() { + let config = Config { + key_id: KeyId::new("cbf4ef24-982d-4fd3-a75b-b95aaec84860".to_string()).unwrap(), + vendor: String::new(), + location: Location { + region: "us-east-1".to_string(), + endpoint: "http://localhost:4566".to_string(), + }, + azure: None, + gcp: None, + }; + + let creds = + Credentials::from_keys("testUser".to_string(), "testAccessKey".to_string(), None); + let aws_kms = + AwsKms::new_with_creds_client(config, util::new_http_client(), creds).unwrap(); + + let data_key = aws_kms.generate_data_key().await.unwrap(); + let plaintext = aws_kms.decrypt_data_key(&data_key.encrypted).await.unwrap(); + + assert_eq!(plaintext, data_key.plaintext.clone()); } } diff --git a/components/cloud/aws/src/s3.rs b/components/cloud/aws/src/s3.rs index 1211e67ad6a..7180e9d28b4 100644 --- a/components/cloud/aws/src/s3.rs +++ b/components/cloud/aws/src/s3.rs @@ -7,6 +7,15 @@ use std::{ }; use async_trait::async_trait; +use aws_config::{sts::AssumeRoleProvider, BehaviorVersion, Region, SdkConfig}; +use aws_credential_types::{provider::ProvideCredentials, Credentials}; +use aws_sdk_s3::{ + config::HttpClient, + operation::get_object::GetObjectError, + types::{CompletedMultipartUpload, CompletedPart}, + Client, +}; +use bytes::Bytes; use cloud::{ blob::{ none_to_empty, BlobConfig, BlobObject, BlobStorage, BucketConf, DeletableStorage, @@ -15,22 +24,24 @@ use cloud::{ metrics::CLOUD_REQUEST_HISTOGRAM_VEC, }; use fail::fail_point; -use futures::stream::{self, Stream}; +use futures::{executor::block_on, stream::Stream}; use futures_util::{ future::{FutureExt, LocalBoxFuture}, io::{AsyncRead, AsyncReadExt}, stream::TryStreamExt, + StreamExt, }; pub use kvproto::brpb::S3 as InputConfig; -use rusoto_core::{request::DispatchSignedRequest, ByteStream, RusotoError}; -use rusoto_credential::{ProvideAwsCredentials, StaticProvider}; -use rusoto_s3::{util::AddressingStyle, *}; -use rusoto_sts::{StsAssumeRoleSessionCredentialsProvider, StsClient}; use thiserror::Error; -use tikv_util::{debug, stream::error_stream, time::Instant}; +use tikv_util::{ + debug, + stream::{error_stream, RetryError}, + time::Instant, +}; use tokio::time::{sleep, timeout}; +use tokio_util::io::ReaderStream; -use crate::util::{self, retry_and_count}; +use crate::util::{self, retry_and_count, SdkError}; const CONNECTION_TIMEOUT: Duration = Duration::from_secs(900); pub const STORAGE_VENDOR_NAME_AWS: &str = "aws"; @@ -141,10 +152,15 @@ impl BlobConfig for Config { } } +pub struct S3CompletedPart { + pub e_tag: Option, + pub part_number: i32, +} + #[derive(Clone)] pub struct S3Storage { config: Config, - client: S3Client, + client: Client, } impl S3Storage { @@ -161,83 +177,126 @@ impl S3Storage { } /// Create a new S3 storage for the given config. - pub fn new(config: Config) -> io::Result { - Self::with_request_dispatcher(config, util::new_http_client()?) + pub fn new(config: Config) -> io::Result { + let client = util::new_http_client(); + Self::new_with_client(config, client) } - fn new_creds_dispatcher( - config: Config, - dispatcher: Dispatcher, - credentials_provider: Creds, - ) -> io::Result + fn new_with_client(config: Config, client: Http) -> io::Result where - Creds: ProvideAwsCredentials + Send + Sync + 'static, - Dispatcher: DispatchSignedRequest + Send + Sync + 'static, + Http: HttpClient + Clone + 'static, { - let bucket_region = none_to_empty(config.bucket.region.clone()); - let bucket_endpoint = config.bucket.endpoint.clone(); - let region = util::get_region(&bucket_region, &none_to_empty(bucket_endpoint))?; - let mut client = S3Client::new_with(dispatcher, credentials_provider, region); - if config.force_path_style { - client.config_mut().addressing_style = AddressingStyle::Path; + // static credentials are used with minio + if let Some(access_key_pair) = &config.access_key_pair { + let creds = Credentials::from_keys( + (*access_key_pair.access_key).to_owned(), + (*access_key_pair.secret_access_key).to_owned(), + access_key_pair + .session_token + .as_deref() + .map(|s| s.to_owned()), + ); + Self::maybe_assume_role(config, client, creds) + } else { + let creds = util::new_credentials_provider(client.clone()); + Self::maybe_assume_role(config, client, creds) } - Ok(S3Storage { config, client }) } - fn maybe_assume_role( + fn maybe_assume_role( config: Config, - cred_provider: P, - dispatcher: D, - ) -> io::Result + client: Http, + credentials_provider: Creds, + ) -> io::Result where - P: ProvideAwsCredentials + Send + Sync + 'static, - D: DispatchSignedRequest + Send + Sync + 'static, + Http: HttpClient + 'static, + Creds: ProvideCredentials + 'static, { if config.role_arn.is_some() { - // try use role arn anyway with current creds when it's not nil. - let bucket_region = none_to_empty(config.bucket.region.clone()); - let bucket_endpoint = config.bucket.endpoint.clone(); - let region = util::get_region(&bucket_region, &none_to_empty(bucket_endpoint))?; - // cannot use the same dispatcher because of move, so use another http client. - let sts = StsClient::new_with(util::new_http_client()?, cred_provider, region); let duration_since_epoch = SystemTime::now() .duration_since(SystemTime::UNIX_EPOCH) .unwrap(); let timestamp_secs = duration_since_epoch.as_secs(); - let cred_provider = StsAssumeRoleSessionCredentialsProvider::new( - sts, - String::clone(config.role_arn.as_deref().unwrap()), - format!("{}", timestamp_secs), - config.external_id.as_deref().cloned(), - // default duration is 15min - None, - None, - None, - ); - Self::new_creds_dispatcher(config, dispatcher, cred_provider) + + let mut builder = AssumeRoleProvider::builder(config.role_arn.as_deref().unwrap()) + .session_name(format!("{}", timestamp_secs)); + + if let Some(external_id) = &config.external_id { + builder = builder.external_id(external_id.as_str()); + } + + if let Some(region) = &config.bucket.region { + builder = builder.region(Region::new(region.to_string())); + } + + let credentials_provider: io::Result = block_on(async { + let sdk_config = + Self::load_sdk_config(&config, util::new_http_client(), credentials_provider) + .await?; + builder = builder.configure(&sdk_config); + Ok(builder.build().await) + }); + Self::new_with_creds_client(config, client, credentials_provider?) } else { // or just use original cred_provider to access s3. - Self::new_creds_dispatcher(config, dispatcher, cred_provider) + Self::new_with_creds_client(config, client, credentials_provider) } } - pub fn with_request_dispatcher(config: Config, dispatcher: D) -> io::Result + async fn load_sdk_config( + config: &Config, + client: Http, + creds: Creds, + ) -> io::Result where - D: DispatchSignedRequest + Send + Sync + 'static, + Http: HttpClient + 'static, + Creds: ProvideCredentials + 'static, { - // static credentials are used with minio - if let Some(access_key_pair) = &config.access_key_pair { - let cred_provider = StaticProvider::new( - (*access_key_pair.access_key).to_owned(), - (*access_key_pair.secret_access_key).to_owned(), - access_key_pair.session_token.as_deref().cloned(), - None, - ); - Self::maybe_assume_role(config, cred_provider, dispatcher) - } else { - let cred_provider = util::CredentialsProvider::new()?; - Self::maybe_assume_role(config, cred_provider, dispatcher) - } + let bucket_region = none_to_empty(config.bucket.region.clone()); + let bucket_endpoint = none_to_empty(config.bucket.endpoint.clone()); + + let mut loader = + aws_config::defaults(BehaviorVersion::latest()).credentials_provider(creds); + + loader = util::configure_region(loader, &bucket_region, !bucket_endpoint.is_empty())?; + loader = util::configure_endpoint(loader, &bucket_endpoint); + loader = loader.http_client(client); + Ok(loader.load().await) + } + + fn new_with_creds_client( + config: Config, + client: Http, + credentials_provider: Creds, + ) -> io::Result + where + Http: HttpClient + 'static, + Creds: ProvideCredentials + 'static, + { + block_on(Self::new_with_creds_client_async( + config, + client, + credentials_provider, + )) + } + + async fn new_with_creds_client_async( + config: Config, + client: Http, + credentials_provider: Creds, + ) -> io::Result + where + Http: HttpClient + 'static, + Creds: ProvideCredentials + 'static, + { + let sdk_config = Self::load_sdk_config(&config, client, credentials_provider).await?; + + let mut builder = aws_sdk_s3::config::Builder::from(&sdk_config); + builder.set_force_path_style(Some(config.force_path_style)); + + let client = Client::from_conf(builder.build()); + + Ok(S3Storage { config, client }) } fn maybe_prefix_key(&self, key: &str) -> String { @@ -262,39 +321,53 @@ impl S3Storage { let key = self.maybe_prefix_key(name); let bucket = self.config.bucket.bucket.clone(); debug!("read file from s3 storage"; "key" => %key); - let req = GetObjectRequest { - key, - bucket: (*bucket).clone(), - range, - ..Default::default() - }; - Box::new( - self.client - .get_object(req) - .map(move |future| match future { - Ok(out) => out.body.unwrap(), - Err(RusotoError::Service(GetObjectError::NoSuchKey(key))) => { - ByteStream::new(error_stream(io::Error::new( + + let async_read = self + .client + .get_object() + .key(key.clone()) + .bucket((*bucket).clone()) + .set_range(range) + .send() + .map(move |fut| { + let stream: Box> + Unpin + Send> = match fut { + Ok(out) => Box::new(ReaderStream::new(out.body.into_async_read())), + Err(SdkError::ServiceError(service_err)) => match service_err.err() { + GetObjectError::NoSuchKey(_) => create_error_stream( io::ErrorKind::NotFound, format!("no key {} at bucket {}", key, *bucket), - ))) - } - Err(e) => ByteStream::new(error_stream(io::Error::new( + ), + _ => create_error_stream( + io::ErrorKind::Other, + format!("failed to get object {:?}", service_err), + ), + }, + Err(e) => create_error_stream( io::ErrorKind::Other, format!("failed to get object {}", e), - ))), - }) - .flatten_stream() - .into_async_read(), - ) + ), + }; + stream + }) + .flatten_stream() + .into_async_read(); + + Box::new(Box::pin(async_read)) } } +fn create_error_stream( + kind: io::ErrorKind, + msg: String, +) -> Box> + Unpin + Send + Sync> { + Box::new(error_stream(io::Error::new(kind, msg))) +} + /// A helper for uploading a large files to S3 storage. /// /// Note: this uploader does not support uploading files larger than 19.5 GiB. struct S3Uploader<'client> { - client: &'client S3Client, + client: &'client Client, bucket: String, key: String, @@ -306,23 +379,36 @@ struct S3Uploader<'client> { object_lock_enabled: bool, upload_id: String, - parts: Vec, + parts: Vec, } /// The errors a uploader can meet. /// This was made for make the result of [S3Uploader::run] get [Send]. #[derive(Debug, Error)] -enum UploadError { +pub enum UploadError { #[error("io error {0}")] Io(#[from] io::Error), - #[error("rusoto error {0}")] + #[error("aws-sdk error: {msg}")] // Maybe make it a trait if needed? - Rusoto(String), + Sdk { msg: String, retryable: bool }, +} + +impl RetryError for UploadError { + fn is_retryable(&self) -> bool { + match self { + UploadError::Io(_) => false, + UploadError::Sdk { msg: _, retryable } => *retryable, + } + } } -impl From> for UploadError { - fn from(r: RusotoError) -> Self { - Self::Rusoto(format!("{}", r)) +impl From> for UploadError { + fn from(err: SdkError) -> Self { + let msg = format!("{:?}", err); + Self::Sdk { + msg, + retryable: util::is_retryable(&err), + } } } @@ -363,7 +449,7 @@ const MINIMUM_PART_SIZE: usize = 5 * 1024 * 1024; impl<'client> S3Uploader<'client> { /// Creates a new uploader with a given target location and upload /// configuration. - fn new(client: &'client S3Client, config: &Config, key: String) -> Self { + fn new(client: &'client Client, config: &Config, key: String) -> Self { Self { client, key, @@ -389,7 +475,7 @@ impl<'client> S3Uploader<'client> { // For short files, execute one put_object to upload the entire thing. let mut data = Vec::with_capacity(est_len as usize); reader.read_to_end(&mut data).await?; - retry_and_count(|| self.upload(&data), "upload_small_file").await?; + Box::pin(retry_and_count(|| self.upload(&data), "upload_small_file")).await?; Ok(()) } else { // Otherwise, use multipart upload to improve robustness. @@ -424,76 +510,92 @@ impl<'client> S3Uploader<'client> { } /// Starts a multipart upload process. - async fn begin(&self) -> Result> { - match timeout( - Self::get_timeout(), + async fn begin(&self) -> Result { + let request = async { self.client - .create_multipart_upload(CreateMultipartUploadRequest { - bucket: self.bucket.clone(), - key: self.key.clone(), - acl: self.acl.as_ref().map(|s| s.to_string()), - server_side_encryption: self - .server_side_encryption + .create_multipart_upload() + .bucket(self.bucket.clone()) + .key(&self.key) + .set_acl(self.acl.as_ref().map(|s| s.as_str().into())) + .set_server_side_encryption( + self.server_side_encryption .as_ref() - .map(|s| s.to_string()), - ssekms_key_id: self.sse_kms_key_id.as_ref().map(|s| s.to_string()), - storage_class: self.storage_class.as_ref().map(|s| s.to_string()), - ..Default::default() - }), - ) - .await - { - Ok(output) => output?.upload_id.ok_or_else(|| { - RusotoError::ParseError( - "missing upload-id from create_multipart_upload()".to_owned(), + .map(|s| s.as_str().into()), ) - }), - Err(_) => Err(RusotoError::ParseError( - "timeout after 15mins for begin in s3 storage".to_owned(), - )), - } + .set_ssekms_key_id(self.sse_kms_key_id.as_ref().map(|s| s.to_string())) + .set_storage_class(self.storage_class.as_ref().map(|s| s.as_str().into())) + .send() + .await? + .upload_id() + .ok_or_else(|| UploadError::Sdk { + msg: "missing upload-id from create_multipart_upload()".to_owned(), + retryable: false, + }) + .map(|s| s.into()) + }; + timeout(Self::get_timeout(), request) + .await + .map_err(|_| UploadError::Sdk { + msg: "timeout after 15mins for begin in s3 storage".to_owned(), + retryable: false, + })? } /// Completes a multipart upload process, asking S3 to join all parts into a /// single file. - async fn complete(&self) -> Result<(), RusotoError> { - let res = timeout( - Self::get_timeout(), + async fn complete(&self) -> Result<(), UploadError> { + let request = async { + let aws_parts: Vec<_> = self + .parts + .iter() + .map(|p| { + CompletedPart::builder() + .part_number(p.part_number) + .set_e_tag(p.e_tag.clone()) + .build() + }) + .collect(); + self.client - .complete_multipart_upload(CompleteMultipartUploadRequest { - bucket: self.bucket.clone(), - key: self.key.clone(), - upload_id: self.upload_id.clone(), - multipart_upload: Some(CompletedMultipartUpload { - parts: Some(self.parts.clone()), - }), - ..Default::default() - }), - ) - .await - .map_err(|_| { - RusotoError::ParseError("timeout after 15mins for complete in s3 storage".to_owned()) - })?; - res.map(|_| ()) + .complete_multipart_upload() + .bucket(self.bucket.clone()) + .key(&self.key) + .upload_id(&self.upload_id) + .multipart_upload( + CompletedMultipartUpload::builder() + .set_parts(Some(aws_parts)) + .build(), + ) + .send() + .await?; + Ok(()) + }; + timeout(Self::get_timeout(), request) + .await + .map_err(|_| UploadError::Sdk { + msg: "timeout after 15mins for upload in s3 storage".to_owned(), + retryable: false, + })? } /// Aborts the multipart upload process, deletes all uploaded parts. - async fn abort(&self) -> Result<(), RusotoError> { - let res = timeout( - Self::get_timeout(), + async fn abort(&self) -> Result<(), UploadError> { + let request = async { self.client - .abort_multipart_upload(AbortMultipartUploadRequest { - bucket: self.bucket.clone(), - key: self.key.clone(), - upload_id: self.upload_id.clone(), - ..Default::default() - }), - ) - .await - .map_err(|_| { - RusotoError::ParseError("timeout after 15mins for abort in s3 storage".to_owned()) - })?; - res.map(|_| ()) + .abort_multipart_upload() + .bucket(&self.bucket) + .key(&self.key) + .upload_id(&self.upload_id) + .send() + .await?; + Ok(()) + }; + timeout(Self::get_timeout(), request) + .await + .map_err(|_| UploadError::Sdk { + msg: "timeout after 15mins for upload in s3 storage".to_owned(), + retryable: false, + })? } /// Uploads a part of the file. @@ -503,93 +605,106 @@ impl<'client> S3Uploader<'client> { &self, part_number: i64, data: &[u8], - ) -> Result> { - let res = timeout(Self::get_timeout(), async { - let start = Instant::now(); - let r = self + ) -> Result { + let request = async { + let result = self .client - .upload_part(UploadPartRequest { - bucket: self.bucket.clone(), - key: self.key.clone(), - upload_id: self.upload_id.clone(), - part_number, - content_length: Some(data.len() as i64), - content_md5: get_content_md5(self.object_lock_enabled, data), - body: Some(data.to_vec().into()), - ..Default::default() - }) - .await; + .upload_part() + .bucket(&self.bucket) + .key(&self.key) + .upload_id(&self.upload_id) + .part_number(part_number as i32) + .content_length(data.len() as i64) + .set_content_md5(get_content_md5(self.object_lock_enabled, data)) + .body(data.to_vec().into()) + .send() + .await?; + Ok(S3CompletedPart { + e_tag: result.e_tag().map(|t| t.into()), + part_number: part_number as i32, + }) + }; + timeout(Self::get_timeout(), async { + let start = Instant::now(); + let result = request.await; CLOUD_REQUEST_HISTOGRAM_VEC .with_label_values(&["s3", "upload_part"]) .observe(start.saturating_elapsed().as_secs_f64()); - r + result }) - .await; - match res { - Ok(part) => Ok(CompletedPart { - e_tag: part?.e_tag, - part_number: Some(part_number), - }), - Err(_) => Err(RusotoError::ParseError( - "timeout after 15mins for upload part in s3 storage".to_owned(), - )), - } + .await + .map_err(|_| UploadError::Sdk { + msg: "timeout after 15mins for upload part in s3 storage".to_owned(), + retryable: false, + })? } /// Uploads a file atomically. /// /// This should be used only when the data is known to be short, and thus /// relatively cheap to retry the entire upload. - async fn upload(&self, data: &[u8]) -> Result<(), RusotoError> { - let res = timeout(Self::get_timeout(), async { - #[cfg(feature = "failpoints")] - let delay_duration = (|| { - fail_point!("s3_sleep_injected", |t| { - let t = t.unwrap().parse::().unwrap(); - Duration::from_millis(t) + async fn upload(&self, data: &[u8]) -> Result<(), UploadError> { + let request = async { + self.client + .put_object() + .bucket(&self.bucket) + .key(&self.key) + .set_acl(self.acl.as_ref().map(|s| s.as_str().into())) + .set_ssekms_key_id(self.sse_kms_key_id.as_ref().map(|s| s.to_string())) + .set_storage_class(self.storage_class.as_ref().map(|s| s.as_str().into())) + .content_length(data.len() as i64) + .body(data.to_vec().into()) + .set_server_side_encryption( + self.server_side_encryption + .as_ref() + .map(|s| s.as_str().into()), + ) + .set_content_md5(get_content_md5(self.object_lock_enabled, data)) + .send() + .await + .map(|_| ()) + .map_err(|err| err.into()) + }; + timeout( + Self::get_timeout(), + Box::pin(async { + #[cfg(feature = "failpoints")] + let delay_duration = (|| { + fail_point!("s3_sleep_injected", |t| { + let t = t.unwrap().parse::().unwrap(); + Duration::from_millis(t) + }); + Duration::from_millis(0) + })(); + #[cfg(not(feature = "failpoints"))] + let delay_duration = Duration::from_millis(0); + + if delay_duration > Duration::from_millis(0) { + sleep(delay_duration).await; + } + + fail_point!("s3_put_obj_err", |_| { + Err(UploadError::Sdk { + msg: "failed to put object".to_owned(), + retryable: false, + }) }); - Duration::from_millis(0) - })(); - #[cfg(not(feature = "failpoints"))] - let delay_duration = Duration::from_millis(0); - if delay_duration > Duration::from_millis(0) { - sleep(delay_duration).await; - } + let start = Instant::now(); - fail_point!("s3_put_obj_err", |_| { - Err(RusotoError::ParseError("failed to put object".to_owned())) - }); + let result = request.await; - let start = Instant::now(); - let r = self - .client - .put_object(PutObjectRequest { - bucket: self.bucket.clone(), - key: self.key.clone(), - acl: self.acl.as_ref().map(|s| s.to_string()), - server_side_encryption: self - .server_side_encryption - .as_ref() - .map(|s| s.to_string()), - ssekms_key_id: self.sse_kms_key_id.as_ref().map(|s| s.to_string()), - storage_class: self.storage_class.as_ref().map(|s| s.to_string()), - content_length: Some(data.len() as i64), - content_md5: get_content_md5(self.object_lock_enabled, data), - body: Some(data.to_vec().into()), - ..Default::default() - }) - .await; - CLOUD_REQUEST_HISTOGRAM_VEC - .with_label_values(&["s3", "put_object"]) - .observe(start.saturating_elapsed().as_secs_f64()); - r - }) + CLOUD_REQUEST_HISTOGRAM_VEC + .with_label_values(&["s3", "put_object"]) + .observe(start.saturating_elapsed().as_secs_f64()); + result + }), + ) .await - .map_err(|_| { - RusotoError::ParseError("timeout after 15mins for upload in s3 storage".to_owned()) - })?; - res.map(|_| ()) + .map_err(|_| UploadError::Sdk { + msg: "timeout after 15mins for upload in s3 storage".to_owned(), + retryable: false, + })? } fn get_timeout() -> Duration { @@ -619,7 +734,7 @@ impl BlobStorage for S3Storage { debug!("save file to s3 storage"; "key" => %key); let uploader = S3Uploader::new(&self.client, &self.config, key); - let result = uploader.run(&mut reader, content_length).await; + let result = Box::pin(uploader.run(&mut reader, content_length)).await; result.map_err(|e| { let error_code = if let UploadError::Io(ref io_error) = e { io_error.kind() @@ -643,67 +758,18 @@ impl BlobStorage for S3Storage { } } -struct S3PrefixIter<'cli> { - cli: &'cli S3Storage, - finished: bool, - cont_token: Option, - prefix: String, -} - -impl<'cli> S3PrefixIter<'cli> { - async fn next_page(&mut self) -> io::Result>> { - if self.finished { - return Ok(None); - } - let mut input = ListObjectsV2Request::default(); - input.bucket = String::clone(&self.cli.config.bucket.bucket); - input.prefix = Some(self.cli.maybe_prefix_key(&self.prefix)); - input.continuation_token = self.cont_token.clone(); - let now = Instant::now(); - let res = retry_and_count( - || self.cli.client.list_objects_v2(input.clone()), - "get_one_page", - ) - .await - .map_err(|err| io::Error::new(io::ErrorKind::Other, err))?; - CLOUD_REQUEST_HISTOGRAM_VEC - .with_label_values(&["s3", "list_objects_v2"]) - .observe(now.saturating_elapsed().as_secs_f64()); - - self.finished = !res.is_truncated.ok_or_else(|| { - io::Error::new(io::ErrorKind::InvalidData, "no IsTruncated in response") - })? || res.next_continuation_token.is_none(); - self.cont_token = res.next_continuation_token; - let data = res - .contents - .unwrap_or_default() - .into_iter() - .map(|data| BlobObject { - key: self - .cli - .strip_prefix_if_needed(data.key.unwrap_or_default()), - }) - .collect::>(); - Ok(Some(data)) - } -} - impl DeletableStorage for S3Storage { fn delete(&self, name: &str) -> LocalBoxFuture<'_, io::Result<()>> { let key = self.maybe_prefix_key(name); async move { let now = Instant::now(); - let res = retry_and_count( - || { - self.client.delete_object(DeleteObjectRequest { - bucket: self.config.bucket.bucket.to_string(), - key: key.clone(), - ..Default::default() - }) - }, - "delete_object", - ) - .await; + let res = self + .client + .delete_object() + .bucket(self.config.bucket.bucket.to_string()) + .key(key.clone()) + .send() + .await; CLOUD_REQUEST_HISTOGRAM_VEC .with_label_values(&["s3", "delete_object"]) .observe(now.saturating_elapsed().as_secs_f64()); @@ -724,19 +790,42 @@ impl IterableStorage for S3Storage { &self, prefix: &str, ) -> Pin> + '_>> { - let walker = S3PrefixIter { - cli: self, - finished: false, - cont_token: None, - prefix: prefix.to_owned(), - }; - let s = stream::try_unfold(walker, |mut w| async move { - let res = w.next_page().await?; - io::Result::Ok(res.map(|v| (v, w))) - }) - .map_ok(|data| stream::iter(data.into_iter().map(Ok))) - .try_flatten(); - Box::pin(s) + let builder = self + .client + .list_objects_v2() + .bucket(self.config.bucket.bucket.to_string()) + .prefix(self.maybe_prefix_key(prefix)); + let mut page_stream = builder.into_paginator().send(); + let stream = futures::stream::poll_fn(move |cx| page_stream.poll_next(cx)); + + stream + .map_ok(|page| { + page.contents + .map(|cs| { + futures::stream::iter(cs.into_iter().map(|v| { + Ok(BlobObject { + key: v.key.map(|k| self.strip_prefix_if_needed(k)).ok_or_else( + || { + io::Error::new( + io::ErrorKind::InvalidData, + "object key is empty", + ) + }, + )?, + }) + })) + .left_stream() + }) + .unwrap_or_else(|| futures::stream::empty().right_stream()) + }) + .map_err(|err| { + io::Error::new( + io::ErrorKind::Other, + format!("sdk encounters an unexpected error: {:?}", err), + ) + }) + .try_flatten() + .boxed_local() } } @@ -744,134 +833,12 @@ impl IterableStorage for S3Storage { mod tests { use std::assert_matches::assert_matches; - use rusoto_core::signature::SignedRequest; - use rusoto_mock::{MockRequestDispatcher, MultipleMockRequestDispatcher}; - use tikv_util::stream::block_on_external_io; + use aws_sdk_s3::{config::Credentials, primitives::SdkBody}; + use aws_smithy_runtime::client::http::test_util::{ReplayEvent, StaticReplayClient}; + use http::Uri; use super::*; - fn make_list_bucket_result( - name: &str, - pfx: &str, - next_cont_token: Option<&str>, - is_truncated: bool, - max_keys: u64, - items: impl IntoIterator, - ) -> MockRequestDispatcher { - let items = items.into_iter().collect::>(); - let mut s = format!( - r#" - - - {} - {} - {} - {} - {} - {}"#, - name, - pfx, - next_cont_token.unwrap_or(""), - items.len(), - max_keys, - is_truncated - ); - for item in items { - s.push_str(&format!( - r#" - - {} - STANDARD - "#, - item - )); - } - s.push_str("\n"); - MockRequestDispatcher::with_status(200).with_body(&s) - } - - #[tokio::test] - async fn test_list_objects() { - const BUCKET: &str = "breeze"; - const PREFIX: &str = "/my/great/prefix"; - - let bucket_name = StringNonEmpty::required(BUCKET.to_string()).unwrap(); - let bucket = BucketConf::default(bucket_name); - let mut config = Config::default(bucket); - let multi_part_size = 2; - // set multi_part_size to use upload_part function - config.multi_part_size = multi_part_size; - - let check_cont_tok = |cont: Option| { - move |r: &SignedRequest| { - assert_eq!( - r.params.get("continuation-token").and_then(|v| v.as_ref()), - cont.as_ref() - ); - } - }; - - let files = |pfx, max| { - let mut i = 0; - std::iter::repeat_with(move || { - i += 1; - format!("{}-{}", pfx, i) - }) - .take(max) - }; - - // split magic_contents into 3 parts, so we mock 5 requests here(1 begin + 3 - // part + 1 complete) - let dispatcher = MultipleMockRequestDispatcher::new(vec![ - make_list_bucket_result(BUCKET, PREFIX, Some("foo"), true, 16, files("foo", 16)) - .with_request_checker(check_cont_tok(None)), - make_list_bucket_result(BUCKET, PREFIX, Some("bar"), true, 16, files("bar", 16)) - .with_request_checker(check_cont_tok(Some("foo".to_owned()))), - make_list_bucket_result(BUCKET, PREFIX, None, false, 16, files("quux", 8)) - .with_request_checker(check_cont_tok(Some("bar".to_owned()))), - MockRequestDispatcher::with_status(400).with_request_checker(|req| { - panic!("Walk haven't stopped. The last request is {:?}", req) - }), - ]); - - let credentials_provider = StaticProvider::new_minimal(String::new(), String::new()); - let s = S3Storage::new_creds_dispatcher(config, dispatcher, credentials_provider).unwrap(); - assert_eq!( - s.iter_prefix(PREFIX) - .map_ok(|v| v.key) - .try_collect::>() - .await - .unwrap(), - files("foo", 16) - .chain(files("bar", 16)) - .chain(files("quux", 8)) - .collect::>() - ); - } - - #[test] - #[ignore] - fn test_somewhat() { - let mut bucket = BucketConf::default(StringNonEmpty::opt("astro".to_owned()).unwrap()); - bucket.endpoint = StringNonEmpty::opt("http://10.2.7.193:9000".to_owned()); - let s3 = Config::default(bucket); - let s3 = Config { - access_key_pair: Some(AccessKeyPair { - access_key: StringNonEmpty::opt("minioadmin".to_owned()).unwrap(), - secret_access_key: StringNonEmpty::opt("minioadmin".to_owned()).unwrap(), - session_token: None, - }), - force_path_style: true, - ..s3 - }; - - let storage = S3Storage::new(s3).unwrap(); - let s = storage.iter_prefix("tpcc-1000-incr-with-crc64/v1/backupmeta"); - let items = block_on_external_io(TryStreamExt::try_collect::>(s)); - println!("{:?}", items); - println!("{}", items.unwrap().len()); - } - #[test] fn test_s3_get_content_md5() { // base64 encode md5sum "helloworld" @@ -919,32 +886,90 @@ mod tests { let magic_contents = "567890"; let bucket_name = StringNonEmpty::required("mybucket".to_string()).unwrap(); - let bucket = BucketConf::default(bucket_name); + let mut bucket = BucketConf::default(bucket_name); + bucket.region = Some(StringNonEmpty::required("cn-north-1".to_string()).unwrap()); + let mut config = Config::default(bucket); let multi_part_size = 2; // set multi_part_size to use upload_part function config.multi_part_size = multi_part_size; + config.force_path_style = true; // split magic_contents into 3 parts, so we mock 5 requests here(1 begin + 3 // part + 1 complete) - let dispatcher = MultipleMockRequestDispatcher::new(vec![ - MockRequestDispatcher::with_status(200).with_body( - r#" - - 1 - "#, + let client = StaticReplayClient::new(vec![ + ReplayEvent::new( + http::Request::builder() + .uri(Uri::from_static( + "https://s3.cn-north-1.amazonaws.com.cn/mybucket/mykey?uploads" + )) + .body(SdkBody::from("")) + .unwrap(), + http::Response::builder() + .status(200) + .body(SdkBody::from( + r#" + + mybucket + mykey + 1 + "# + )).unwrap() + ), + ReplayEvent::new( + http::Request::builder() + .uri(Uri::from_static( + "https://s3.cn-north-1.amazonaws.com.cn/mybucket/mykey?x-id=UploadPart&partNumber=1&uploadId=1" + )) + .body(SdkBody::from("56")) + .unwrap(), + http::Response::builder().status(200).body(SdkBody::from("")).unwrap() + ), + ReplayEvent::new( + http::Request::builder() + .uri(Uri::from_static( + "https://s3.cn-north-1.amazonaws.com.cn/mybucket/mykey?x-id=UploadPart&partNumber=2&uploadId=1" + )) + .body(SdkBody::from("78")) + .unwrap(), + http::Response::builder().status(200).body(SdkBody::from("")).unwrap() + ), + ReplayEvent::new( + http::Request::builder() + .uri(Uri::from_static( + "https://s3.cn-north-1.amazonaws.com.cn/mybucket/mykey?x-id=UploadPart&partNumber=3&uploadId=1" + )) + .body(SdkBody::from("90")) + .unwrap(), + http::Response::builder().status(200).body(SdkBody::from("")).unwrap() + ), + ReplayEvent::new( + http::Request::builder() + .uri(Uri::from_static( + "https://s3.cn-north-1.amazonaws.com.cn/mybucket/mykey?uploadId=1" + )) + .body(SdkBody::from( + r#"123"# + )) + .unwrap(), + http::Response::builder() + .status(200) + .body(SdkBody::from( + r#" + + https://s3.cn-north-1.amazonaws.com.cn/mybucket/mykey + mybucket + mykey + + + "# + )).unwrap() ), - MockRequestDispatcher::with_status(200), - MockRequestDispatcher::with_status(200), - MockRequestDispatcher::with_status(200), - MockRequestDispatcher::with_status(200), ]); - let credentials_provider = - StaticProvider::new_minimal("abc".to_string(), "xyz".to_string()); - - let s = S3Storage::new_creds_dispatcher(config, dispatcher, credentials_provider).unwrap(); + let creds = Credentials::from_keys("abc".to_string(), "xyz".to_string(), None); + let s = S3Storage::new_with_creds_client(config.clone(), client.clone(), creds).unwrap(); s.put( "mykey", PutResource(Box::new(magic_contents.as_bytes())), @@ -952,6 +977,9 @@ mod tests { ) .await .unwrap(); + + client.assert_requests_match(&[]); + assert_eq!( CLOUD_REQUEST_HISTOGRAM_VEC .get_metric_with_label_values(&["s3", "upload_part"]) @@ -972,18 +1000,52 @@ mod tests { bucket.prefix = StringNonEmpty::opt("myprefix".to_string()); let mut config = Config::default(bucket); config.force_path_style = true; - let dispatcher = MockRequestDispatcher::with_status(200).with_request_checker( - move |req: &SignedRequest| { - assert_eq!(req.region.name(), "ap-southeast-2"); - assert_eq!(req.hostname(), "s3.ap-southeast-2.amazonaws.com"); - assert_eq!(req.path(), "/mybucket/myprefix/mykey"); - // PutObject is translated to HTTP PUT. - assert_eq!(req.payload.is_some(), req.method() == "PUT"); - }, - ); - let credentials_provider = - StaticProvider::new_minimal("abc".to_string(), "xyz".to_string()); - let s = S3Storage::new_creds_dispatcher(config, dispatcher, credentials_provider).unwrap(); + + let client = StaticReplayClient::new(vec![ + ReplayEvent::new( + http::Request::builder() + .method("PUT") + .uri(Uri::from_static( + "https://s3.ap-southeast-2.amazonaws.com/mybucket/myprefix/mykey?x-id=PutObject", + )) + .body(SdkBody::from("5678")) + .unwrap(), + http::Response::builder() + .status(200) + .body(SdkBody::from("")) + .unwrap(), + ), + ReplayEvent::new( + http::Request::builder() + .method("GET") + .uri(Uri::from_static( + "https://s3.ap-southeast-2.amazonaws.com/mybucket/myprefix/mykey?x-id=GetObject", + )) + .body(SdkBody::from("")) + .unwrap(), + http::Response::builder() + .status(200) + .body(SdkBody::from("5678")) + .unwrap(), + ), + ReplayEvent::new( + http::Request::builder() + .method("PUT") + .uri(Uri::from_static( + "https://s3.ap-southeast-2.amazonaws.com/mybucket/myprefix/mykey?x-id=PutObject", + )) + .body(SdkBody::from("5678")) + .unwrap(), + http::Response::builder() + .status(200) + .body(SdkBody::from("")) + .unwrap(), + ), + ]); + + let creds = Credentials::from_keys("abc".to_string(), "xyz".to_string(), None); + + let s = S3Storage::new_with_creds_client(config.clone(), client.clone(), creds).unwrap(); s.put( "mykey", PutResource(Box::new(magic_contents.as_bytes())), @@ -995,8 +1057,8 @@ mod tests { let mut reader = s.get("mykey"); let mut buf = Vec::new(); let ret = reader.read_to_end(&mut buf).await; - assert!(ret.unwrap() == 0); - assert!(buf.is_empty()); + assert!(ret.unwrap() == 4); + assert!(!buf.is_empty()); // inject put error let s3_put_obj_err_fp = "s3_put_obj_err"; @@ -1008,6 +1070,7 @@ mod tests { ) .await .unwrap_err(); + fail::remove(s3_put_obj_err_fp); // test timeout @@ -1039,10 +1102,12 @@ mod tests { .unwrap(); fail::remove(s3_sleep_injected_fp); fail::remove(s3_timeout_injected_fp); + + client.assert_requests_match(&[]); } - #[test] - fn test_s3_storage_with_virtual_host() { + #[tokio::test] + async fn test_s3_storage_with_virtual_host() { let magic_contents = "abcd"; let bucket_name = StringNonEmpty::required("bucket2".to_string()).unwrap(); let mut bucket = BucketConf::default(bucket_name); @@ -1050,58 +1115,75 @@ mod tests { bucket.prefix = StringNonEmpty::opt("prefix2".to_string()); let mut config = Config::default(bucket); config.force_path_style = false; - let dispatcher = MockRequestDispatcher::with_status(200).with_request_checker( - move |req: &SignedRequest| { - assert_eq!(req.region.name(), "ap-southeast-1"); - assert_eq!(req.hostname(), "bucket2.s3.ap-southeast-1.amazonaws.com"); - assert_eq!(req.path(), "/prefix2/key2"); - // PutObject is translated to HTTP PUT. - assert_eq!(req.payload.is_some(), req.method() == "PUT"); - }, - ); - let credentials_provider = - StaticProvider::new_minimal("abc".to_string(), "xyz".to_string()); - let s = S3Storage::new_creds_dispatcher(config, dispatcher, credentials_provider).unwrap(); - block_on_external_io(s.put( + + let client = StaticReplayClient::new(vec![ReplayEvent::new( + http::Request::builder() + .method("PUT") + .uri(Uri::from_static( + "https://bucket2.s3.ap-southeast-1.amazonaws.com/prefix2/key2?x-id=PutObject", + )) + .body(SdkBody::from("abcd")) + .unwrap(), + http::Response::builder() + .status(200) + .body(SdkBody::from("")) + .unwrap(), + )]); + + let creds = Credentials::from_keys("abc".to_string(), "xyz".to_string(), None); + + let s = S3Storage::new_with_creds_client(config.clone(), client.clone(), creds).unwrap(); + s.put( "key2", PutResource(Box::new(magic_contents.as_bytes())), magic_contents.len() as u64, - )) + ) + .await .unwrap(); + + client.assert_requests_match(&[]); } - #[test] + #[tokio::test] #[cfg(FALSE)] // FIXME: enable this (or move this to an integration test) if we've got a - // reliable way to test s3 (rusoto_mock requires custom logic to verify the + // reliable way to test s3 (aws test_util requires custom logic to verify the // body stream which itself can have bug) - fn test_real_s3_storage() { + async fn test_real_s3_storage() { use tikv_util::time::Limiter; let bucket = BucketConf { - endpoint: "http://127.0.0.1:9000".to_owned(), - bucket: "bucket".to_owned(), - prefix: "prefix".to_owned(), - ..BucketConf::default() + endpoint: Some(StringNonEmpty::required("http://127.0.0.1:9000".to_owned()).unwrap()), + bucket: StringNonEmpty::required("bucket".to_owned()).unwrap(), + prefix: Some(StringNonEmpty::required("prefix".to_owned()).unwrap()), + region: None, + storage_class: None, }; let s3 = Config { - access_key: "93QZ01QRBYQQXC37XHZV".to_owned(), - secret_access_key: "N2VcI4Emg0Nm7fDzGBMJvguHHUxLGpjfwt2y4+vJ".to_owned(), + access_key_pair: Some(AccessKeyPair { + access_key: StringNonEmpty::required("93QZ01QRBYQQXC37XHZV".to_owned()).unwrap(), + secret_access_key: StringNonEmpty::required( + "N2VcI4Emg0Nm7fDzGBMJvguHHUxLGpjfwt2y4+vJ".to_owned(), + ) + .unwrap(), + session_token: None, + }), force_path_style: true, - ..Config::default() + ..Config::default(bucket) }; let limiter = Limiter::new(f64::INFINITY); - let storage = S3Storage::new(&s3).unwrap(); + let storage = S3Storage::new(s3).unwrap(); const LEN: usize = 1024 * 1024 * 4; static CONTENT: [u8; LEN] = [50_u8; LEN]; storage - .write( + .put( "huge_file", - Box::new(limiter.limit(&CONTENT[..])), + PutResource(Box::new(limiter.limit(&CONTENT[..]))), LEN as u64, ) + .await .unwrap(); let mut reader = storage.get("huge_file"); diff --git a/components/cloud/aws/src/util.rs b/components/cloud/aws/src/util.rs index 6ee27bb0c42..3abbfa28671 100644 --- a/components/cloud/aws/src/util.rs +++ b/components/cloud/aws/src/util.rs @@ -1,27 +1,35 @@ // Copyright 2020 TiKV Project Authors. Licensed under Apache-2.0. +use std::{error::Error as StdError, io}; -use std::io::{self, Error, ErrorKind}; - -use async_trait::async_trait; -use cloud::metrics; -use futures::{future::TryFutureExt, Future}; -use rusoto_core::{ - region::Region, - request::{HttpClient, HttpConfig}, -}; -use rusoto_credential::{ - AutoRefreshingProvider, AwsCredentials, ChainProvider, CredentialsError, ProvideAwsCredentials, +use ::aws_smithy_runtime_api::client::orchestrator::HttpResponse; +use aws_config::{ + default_provider::credentials::DefaultCredentialsChain, + environment::EnvironmentVariableRegionProvider, + meta::region::{self, ProvideRegion, RegionProviderChain}, + profile::ProfileFileRegionProvider, + provider_config::ProviderConfig, + ConfigLoader, Region, }; -use rusoto_sts::WebIdentityProvider; +use aws_credential_types::provider::{error::CredentialsError, ProvideCredentials}; +use aws_sdk_kms::config::SharedHttpClient; +use aws_sdk_s3::config::HttpClient; +use aws_smithy_runtime::client::http::hyper_014::HyperClientBuilder; +use cloud::metrics; +use futures::{Future, TryFutureExt}; +use hyper::Client; +use hyper_tls::HttpsConnector; use tikv_util::{ - stream::{retry_ext, RetryError, RetryExt}, + stream::{block_on_external_io, retry_ext, RetryError, RetryExt}, warn, }; -#[allow(dead_code)] // This will be used soon, please remove the allow. const READ_BUF_SIZE: usize = 1024 * 1024 * 2; -const AWS_WEB_IDENTITY_TOKEN_FILE: &str = "AWS_WEB_IDENTITY_TOKEN_FILE"; +const DEFAULT_REGION: &str = "us-east-1"; + +pub(crate) type SdkError = + ::aws_smithy_runtime_api::client::result::SdkError; + struct CredentialsErrorWrapper(CredentialsError); impl From for CredentialsError { @@ -32,7 +40,7 @@ impl From for CredentialsError { impl std::fmt::Display for CredentialsErrorWrapper { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "{}", self.0.message)?; + write!(f, "{:?}", self.0)?; Ok(()) } } @@ -43,37 +51,78 @@ impl RetryError for CredentialsErrorWrapper { } } -pub fn new_http_client() -> io::Result { - let mut http_config = HttpConfig::new(); - // This can greatly improve performance dealing with payloads greater - // than 100MB. See https://github.com/rusoto/rusoto/pull/1227 - // for more information. - http_config.read_buf_size(READ_BUF_SIZE); - // It is important to explicitly create the client and not use a global - // See https://github.com/tikv/tikv/issues/7236. - HttpClient::new_with_config(http_config).map_err(|e| { - Error::new( - ErrorKind::Other, - format!("create aws http client error: {}", e), - ) - }) +pub fn new_http_client() -> SharedHttpClient { + let mut hyper_builder = Client::builder(); + hyper_builder.http1_read_buf_exact_size(READ_BUF_SIZE); + + HyperClientBuilder::new() + .hyper_builder(hyper_builder) + .build(HttpsConnector::new()) +} + +pub fn new_credentials_provider(http: impl HttpClient + 'static) -> DefaultCredentialsProvider { + let fut = DefaultCredentialsProvider::new(http); + if let Ok(hnd) = tokio::runtime::Handle::try_current() { + tokio::task::block_in_place(move || hnd.block_on(fut)) + } else { + block_on_external_io(fut) + } +} + +pub fn is_retryable(error: &SdkError) -> bool { + match error { + SdkError::TimeoutError(_) => true, + SdkError::DispatchFailure(_) => true, + SdkError::ResponseError(resp_err) => { + let code = resp_err.raw().status(); + code.is_server_error() || code.as_u16() == http::StatusCode::REQUEST_TIMEOUT.as_u16() + } + _ => false, + } } -pub fn get_region(region: &str, endpoint: &str) -> io::Result { +pub fn configure_endpoint(loader: ConfigLoader, endpoint: &str) -> ConfigLoader { if !endpoint.is_empty() { - Ok(Region::Custom { - name: region.to_owned(), - endpoint: endpoint.to_owned(), - }) - } else if !region.is_empty() { - region.parse::().map_err(|e| { - Error::new( - ErrorKind::InvalidInput, - format!("invalid aws region format {}: {}", region, e), - ) - }) + loader.endpoint_url(endpoint) } else { - Ok(Region::default()) + loader + } +} + +pub fn configure_region( + loader: ConfigLoader, + region: &str, + custom: bool, +) -> io::Result { + if !region.is_empty() { + validate_region(region, custom)?; + Ok(loader.region(Region::new(region.to_owned()))) + } else { + Ok(loader.region(DefaultRegionProvider::new())) + } +} + +fn validate_region(region: &str, custom: bool) -> io::Result<()> { + if custom { + return Ok(()); + } + let v: &str = ®ion.to_lowercase(); + + match v { + "ap-east-1" | "apeast1" | "ap-northeast-1" | "apnortheast1" | "ap-northeast-2" + | "apnortheast2" | "ap-northeast-3" | "apnortheast3" | "ap-south-1" | "apsouth1" + | "ap-southeast-1" | "apsoutheast1" | "ap-southeast-2" | "apsoutheast2" + | "ca-central-1" | "cacentral1" | "eu-central-1" | "eucentral1" | "eu-west-1" + | "euwest1" | "eu-west-2" | "euwest2" | "eu-west-3" | "euwest3" | "eu-north-1" + | "eunorth1" | "eu-south-1" | "eusouth1" | "me-south-1" | "mesouth1" | "us-east-1" + | "useast1" | "sa-east-1" | "saeast1" | "us-east-2" | "useast2" | "us-west-1" + | "uswest1" | "us-west-2" | "uswest2" | "us-gov-east-1" | "usgoveast1" + | "us-gov-west-1" | "usgovwest1" | "cn-north-1" | "cnnorth1" | "cn-northwest-1" + | "cnnorthwest1" | "af-south-1" | "afsouth1" => Ok(()), + _ => Err(io::Error::new( + io::ErrorKind::InvalidInput, + format!("invalid aws region format {}", region), + )), } } @@ -93,98 +142,89 @@ where ).await } -pub struct CredentialsProvider(AutoRefreshingProvider); +#[derive(Debug)] +struct DefaultRegionProvider(RegionProviderChain); -impl CredentialsProvider { - pub fn new() -> io::Result { - Ok(CredentialsProvider( - AutoRefreshingProvider::new(DefaultCredentialsProvider::default()).map_err(|e| { - Error::new( - ErrorKind::Other, - format!("create aws credentials provider error: {}", e), - ) - })?, - )) +impl DefaultRegionProvider { + fn new() -> Self { + let env_provider = EnvironmentVariableRegionProvider::new(); + let profile_provider = ProfileFileRegionProvider::builder().build(); + + // same as default region resolving in rusoto + let chain = RegionProviderChain::first_try(env_provider) + .or_else(profile_provider) + .or_else(Region::new(DEFAULT_REGION)); + + Self(chain) } } -#[async_trait] -impl ProvideAwsCredentials for CredentialsProvider { - async fn credentials(&self) -> Result { - self.0.credentials().await +impl ProvideRegion for DefaultRegionProvider { + fn region(&self) -> region::future::ProvideRegion<'_> { + ProvideRegion::region(&self.0) } } -// Same as rusoto_credentials::DefaultCredentialsProvider with extra -// rusoto_sts::WebIdentityProvider support. +#[derive(Debug)] pub struct DefaultCredentialsProvider { - // Underlying implementation of rusoto_credentials::DefaultCredentialsProvider. - default_provider: ChainProvider, - // Provider IAM support in Kubernetes. - web_identity_provider: WebIdentityProvider, + default_provider: DefaultCredentialsChain, } -impl Default for DefaultCredentialsProvider { - fn default() -> DefaultCredentialsProvider { - DefaultCredentialsProvider { - default_provider: ChainProvider::new(), - web_identity_provider: WebIdentityProvider::from_k8s_env(), - } +impl DefaultCredentialsProvider { + async fn new(cli: impl HttpClient + 'static) -> Self { + let cfg = ProviderConfig::default().with_http_client(cli); + let default_provider = DefaultCredentialsChain::builder() + .configure(cfg) + .build() + .await; + Self { default_provider } } } -#[async_trait] -impl ProvideAwsCredentials for DefaultCredentialsProvider { - async fn credentials(&self) -> Result { - // use web identity provider first for the kubernetes environment. - let cred = if std::env::var(AWS_WEB_IDENTITY_TOKEN_FILE).is_ok() { - // we need invoke assume_role in web identity provider - // this API may failed sometimes. - // according to AWS experience, it's better to retry it with 10 times - // exponential backoff for every error, because we cannot +impl ProvideCredentials for DefaultCredentialsProvider { + fn provide_credentials<'a>( + &'a self, + ) -> aws_credential_types::provider::future::ProvideCredentials<'a> + where + Self: 'a, + { + aws_credential_types::provider::future::ProvideCredentials::new(async move { + // Add exponential backoff for every error, because we cannot // distinguish the error type. - retry_and_count( + let cred = retry_and_count( || { #[cfg(test)] fail::fail_point!("cred_err", |_| { + let cause: Box = + String::from("injected error").into(); Box::pin(futures::future::err(CredentialsErrorWrapper( - CredentialsError::new("injected error"), + CredentialsError::provider_error(cause), ))) as std::pin::Pin + Send>> }); - let res = self - .web_identity_provider - .credentials() - .map_err(|e| CredentialsErrorWrapper(e)); - #[cfg(test)] - return Box::pin(res); - #[cfg(not(test))] - res - }, - "get_cred_over_the_cloud", - ) - .await - .map_err(|e| e.0) - } else { - // Add exponential backoff for every error, because we cannot - // distinguish the error type. - retry_and_count( - || { - self.default_provider - .credentials() - .map_err(|e| CredentialsErrorWrapper(e)) + + Box::pin( + self.default_provider + .provide_credentials() + .map_err(|e| CredentialsErrorWrapper(e)), + ) }, "get_cred_on_premise", ) .await - .map_err(|e| e.0) - }; - - cred.map_err(|e| { - CredentialsError::new(format_args!( - "Couldn't find AWS credentials in sources ({}).", - e.message - )) + .map_err(|e| e.0); + + cred.map_err(|e| { + let msg = e + .source() + .map(|src_err| src_err.to_string()) + .unwrap_or_else(|| e.to_string()); + let cause: Box = + format_args!("Couldn't find AWS credentials in sources ({}).", msg) + .to_string() + .into(); + CredentialsError::provider_error(cause) + }) }) } } @@ -197,20 +237,30 @@ mod tests { #[cfg(feature = "failpoints")] #[tokio::test] async fn test_default_provider() { - let default_provider = DefaultCredentialsProvider::default(); + const AWS_WEB_IDENTITY_TOKEN_FILE: &str = "AWS_WEB_IDENTITY_TOKEN_FILE"; + + let default_provider = DefaultCredentialsProvider::new(new_http_client()).await; std::env::set_var(AWS_WEB_IDENTITY_TOKEN_FILE, "tmp"); // mock k8s env with web_identitiy_provider fail::cfg("cred_err", "return").unwrap(); fail::cfg("retry_count", "return(1)").unwrap(); - let res = default_provider.credentials().await; + let res = default_provider.provide_credentials().await; assert_eq!(res.is_err(), true); - assert_eq!( - res.err().unwrap().message, - "Couldn't find AWS credentials in sources (injected error)." - ); + + let err = res.unwrap_err(); + + match err { + CredentialsError::ProviderError(_) => { + assert_eq!( + err.source().unwrap().to_string(), + "Couldn't find AWS credentials in sources (injected error)." + ) + } + err => panic!("unexpected error type: {}", err), + } + fail::remove("cred_err"); fail::remove("retry_count"); - std::env::remove_var(AWS_WEB_IDENTITY_TOKEN_FILE); } } diff --git a/components/sst_importer/src/sst_importer.rs b/components/sst_importer/src/sst_importer.rs index e3047d59ae1..207a859c700 100644 --- a/components/sst_importer/src/sst_importer.rs +++ b/components/sst_importer/src/sst_importer.rs @@ -509,7 +509,13 @@ impl SstImporter { })?; } - let ext_storage = self.external_storage_or_cache(backend, cache_key)?; + // The `DashMap` locks the entry to ensure that only one thread loads the + // credentials at a time. However, if the thread gets blocked during the + // loading process, it can lead to a deadlock. To avoid this, blocking + // operations must be performed outside of the `DashMap`. + let ext_storage = tokio::task::block_in_place(move || { + self.external_storage_or_cache(backend, cache_key) + })?; let ext_storage = self.auto_encrypt_local_file_if_needed(ext_storage); let result = ext_storage @@ -2471,15 +2477,17 @@ mod tests { // test do_download_kv_file(). assert!(importer.download_to_disk_only()); - let output = block_on_external_io(importer.download_kv_file( - &kv_meta, - ext_storage, - &backend, - &Limiter::new(f64::INFINITY), - None, - Vec::new(), - )) - .unwrap(); + let runtime = tokio::runtime::Runtime::new().unwrap(); + let output = runtime + .block_on(importer.download_kv_file( + &kv_meta, + ext_storage, + &backend, + &Limiter::new(f64::INFINITY), + None, + Vec::new(), + )) + .unwrap(); assert_eq!(*output, buff); check_file_exists(&path.save, Some(&*key_manager)); @@ -3566,15 +3574,17 @@ mod tests { ) .unwrap(); - let output = block_on_external_io(importer.download_kv_file( - &kv_meta, - ext_storage, - &storage_backend, - &Limiter::new(f64::INFINITY), - opt_cipher_info, - master_key_configs, - )) - .unwrap(); + let runtime = tokio::runtime::Runtime::new().unwrap(); + let output = runtime + .block_on(importer.download_kv_file( + &kv_meta, + ext_storage, + &storage_backend, + &Limiter::new(f64::INFINITY), + opt_cipher_info, + master_key_configs, + )) + .unwrap(); assert_eq!(*output, file_content); if !in_mem { check_file_exists(&path.save, opt_key_manager.as_deref()); diff --git a/components/tikv_util/Cargo.toml b/components/tikv_util/Cargo.toml index 8e6e7fb58ed..e1ab04cc732 100644 --- a/components/tikv_util/Cargo.toml +++ b/components/tikv_util/Cargo.toml @@ -48,7 +48,6 @@ prometheus = { version = "0.13", features = ["nightly"] } prometheus-static-metric = "0.5" protobuf = "2" rand = "0.8" -rusoto_core = "0.46.0" serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" slog = { workspace = true } diff --git a/components/tikv_util/src/stream.rs b/components/tikv_util/src/stream.rs index 2dd50e332a1..4f16a75ef57 100644 --- a/components/tikv_util/src/stream.rs +++ b/components/tikv_util/src/stream.rs @@ -13,8 +13,6 @@ use std::{ use bytes::Bytes; use futures::stream::{self, Stream}; use futures_util::io::AsyncRead; -use http::status::StatusCode; -use rusoto_core::{request::HttpDispatchError, RusotoError}; use tokio::runtime::Builder; const MAX_RETRY_DELAY: Duration = Duration::from_secs(32); @@ -268,32 +266,11 @@ where } } -pub fn http_retriable(status: StatusCode) -> bool { - status.is_server_error() || status == StatusCode::REQUEST_TIMEOUT -} - -impl RetryError for RusotoError { - fn is_retryable(&self) -> bool { - match self { - Self::HttpDispatch(e) => e.is_retryable(), - Self::Unknown(resp) if http_retriable(resp.status) => true, - _ => false, - } - } -} - -impl RetryError for HttpDispatchError { - fn is_retryable(&self) -> bool { - true - } -} - #[cfg(test)] mod tests { use std::{cell::RefCell, pin::Pin}; use futures::{Future, FutureExt}; - use rusoto_core::HttpDispatchError; use super::RetryError; use crate::stream::retry; @@ -312,7 +289,7 @@ mod tests { #[test] fn test_retry_is_send_even_return_type_not_sync() { struct BangSync(Option>); - let fut = retry(|| futures::future::ok::<_, HttpDispatchError>(BangSync(None))); + let fut = retry(|| futures::future::ok::<_, TriviallyRetry>(BangSync(None))); assert_send(fut) } diff --git a/deny.toml b/deny.toml index c49499ce8f5..794ab97a21c 100644 --- a/deny.toml +++ b/deny.toml @@ -8,11 +8,11 @@ deny = [ # We allow md5 for AWS S3 object lock feature which requires # computting object's md5. { name = "md5", wrappers = ["aws"] }, - { name = "md-5" }, - { name = "sha1" }, + { name = "md-5", wrappers = ["aws-smithy-checksums"]}, + { name = "sha1", wrappers = ["aws-smithy-checksums"]}, { name = "sha-1" }, - # We allow sha2 for oauth2 crate, because it does use sha2 in TiKV use case. - { name = "sha2", wrappers = ["oauth2"] }, + # We allow sha2 for oauth2 and aws rust sdk crate, because it does use sha2 in TiKV use case. + { name = "sha2", wrappers = ["oauth2", "aws-sigv4", "aws-smithy-checksums", "aws-sdk-s3"] }, { name = "sha3" }, # Symmetric encryption { name = "aes" }, @@ -27,14 +27,14 @@ deny = [ { name = "ecdsa" }, { name = "ed25519" }, # Message authentication codes - { name = "hmac" }, + { name = "hmac", wrappers = ["aws-sigv4", "aws-sdk-s3"]}, # We prefer the system native TLS or OpenSSL. { name = "rustls" }, { name = "ring" }, # Ban trait crates from RustCrypto. { name = "aead" }, { name = "cipher" }, - { name = "digest", wrappers = ["sha2"] }, + { name = "digest", wrappers = ["sha2", "md-5", "sha1", "hmac"] }, { name = "password-hash" }, { name = "signature" }, ]