From 8ed9575d847ecadfe8baaafdc626750160b4a42c Mon Sep 17 00:00:00 2001 From: bayk Date: Fri, 17 Jan 2025 20:46:05 -0800 Subject: [PATCH] peer IO optimization. Sending all ready messages in the single package. That should improve scalability, reduce latency impact --- Cargo.lock | 287 ++++++++++++++++++++------------------- chain/src/pibd_params.rs | 12 +- p2p/Cargo.toml | 1 + p2p/src/conn.rs | 47 +++++-- p2p/src/handshake.rs | 6 +- p2p/src/msg.rs | 50 ++++--- p2p/src/peer.rs | 2 +- 7 files changed, 221 insertions(+), 184 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index e5e98ba6e..9ef13cd8c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -191,7 +191,7 @@ dependencies = [ "concurrent-queue", "event-listener-strategy", "futures-core", - "pin-project-lite 0.2.15", + "pin-project-lite 0.2.16", ] [[package]] @@ -203,7 +203,7 @@ dependencies = [ "async-task", "concurrent-queue", "fastrand 2.3.0", - "futures-lite 2.5.0", + "futures-lite 2.6.0", "slab", ] @@ -218,7 +218,7 @@ dependencies = [ "async-io 2.4.0", "async-lock 3.4.0", "blocking", - "futures-lite 2.5.0", + "futures-lite 2.6.0", "once_cell", ] @@ -236,7 +236,7 @@ dependencies = [ "log", "parking", "polling 2.8.0", - "rustix 0.37.27", + "rustix 0.37.28", "slab", "socket2 0.4.10", "waker-fn", @@ -252,10 +252,10 @@ dependencies = [ "cfg-if 1.0.0", "concurrent-queue", "futures-io", - "futures-lite 2.5.0", + "futures-lite 2.6.0", "parking", "polling 3.7.4", - "rustix 0.38.42", + "rustix 0.38.43", "slab", "tracing", "windows-sys 0.59.0", @@ -276,9 +276,9 @@ version = "3.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ff6e472cdea888a4bd64f342f09b3f50e1886d32afe8df3d663c01140b811b18" dependencies = [ - "event-listener 5.3.1", + "event-listener 5.4.0", "event-listener-strategy", - "pin-project-lite 0.2.15", + "pin-project-lite 0.2.16", ] [[package]] @@ -295,13 +295,13 @@ dependencies = [ "futures-channel", "futures-core", "futures-io", - "futures-lite 2.5.0", + "futures-lite 2.6.0", "gloo-timers", "kv-log-macro", "log", "memchr", "once_cell", - "pin-project-lite 0.2.15", + "pin-project-lite 0.2.16", "pin-utils", "slab", "wasm-bindgen-futures", @@ -323,7 +323,7 @@ dependencies = [ "futures-sink", "futures-util", "memchr", - "pin-project-lite 0.2.15", + "pin-project-lite 0.2.16", ] [[package]] @@ -429,9 +429,9 @@ checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a" [[package]] name = "bitflags" -version = "2.6.0" +version = "2.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b048fb63fd8b5923fc5aa7b340d8e156aec7ec02f0c78fa8a6ddc2613f6f71de" +checksum = "8f68f53c83ab957f72c32642f3868eec03eb974d1fb82e453128456482613d36" [[package]] name = "blake2" @@ -511,7 +511,7 @@ dependencies = [ "async-channel 2.3.1", "async-task", "futures-io", - "futures-lite 2.5.0", + "futures-lite 2.6.0", "piper", ] @@ -594,9 +594,9 @@ dependencies = [ [[package]] name = "cc" -version = "1.2.7" +version = "1.2.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a012a0df96dd6d06ba9a1b29d6402d1a5d77c6befd2566afdc26e10603dc93d7" +checksum = "13208fcbb66eaeffe09b99fffbe1af420f00a7b35aa99ad683dfc1aa76145229" dependencies = [ "jobserver", "libc", @@ -950,7 +950,7 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ac7ac0eb0cede3dfdfebf4d5f22354e05a730b79c25fd03481fc69fcfba0a73e" dependencies = [ - "proc-macro2 1.0.92", + "proc-macro2 1.0.93", ] [[package]] @@ -1019,9 +1019,9 @@ version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f46882e17999c6cc590af592290432be3bce0428cb0d5f8b6715e4dc7b383eb3" dependencies = [ - "proc-macro2 1.0.92", + "proc-macro2 1.0.93", "quote 1.0.38", - "syn 2.0.95", + "syn 2.0.96", ] [[package]] @@ -1042,9 +1042,9 @@ checksum = "95133861a8032aaea082871032f5815eb9e98cef03fa916ab4500513994df9e5" dependencies = [ "fnv", "ident_case", - "proc-macro2 1.0.92", + "proc-macro2 1.0.93", "quote 1.0.38", - "syn 2.0.95", + "syn 2.0.96", ] [[package]] @@ -1055,14 +1055,14 @@ checksum = "d336a2a514f6ccccaa3e09b02d41d35330c07ddf03a62165fcec10bb561c7806" dependencies = [ "darling_core", "quote 1.0.38", - "syn 2.0.95", + "syn 2.0.96", ] [[package]] name = "data-encoding" -version = "2.6.0" +version = "2.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e8566979429cf69b49a5c740c60791108e86440e8be149bbea4fe54d2c32d6e2" +checksum = "0e60eed09d8c01d3cee5b7d30acb059b76614c918fa0f992e0dd6eeb10daad6f" [[package]] name = "deranged" @@ -1146,9 +1146,9 @@ version = "0.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "97369cbbc041bc366949bc74d34658d6cda5621039731c6310521892a3a20ae0" dependencies = [ - "proc-macro2 1.0.92", + "proc-macro2 1.0.93", "quote 1.0.38", - "syn 2.0.95", + "syn 2.0.96", ] [[package]] @@ -1226,9 +1226,9 @@ version = "0.17.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f282cfdfe92516eb26c2af8589c274c7c17681f5ecc03c18255fe741c6aa64eb" dependencies = [ - "proc-macro2 1.0.92", + "proc-macro2 1.0.93", "quote 1.0.38", - "syn 2.0.95", + "syn 2.0.96", ] [[package]] @@ -1256,9 +1256,9 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "59c3b24c345d8c314966bdc1832f6c2635bfcce8e7cf363bd115987bba2ee242" dependencies = [ "darling", - "proc-macro2 1.0.92", + "proc-macro2 1.0.93", "quote 1.0.38", - "syn 2.0.95", + "syn 2.0.96", ] [[package]] @@ -1298,13 +1298,13 @@ checksum = "0206175f82b8d6bf6652ff7d71a1e27fd2e4efde587fd368662814d6ec1d9ce0" [[package]] name = "event-listener" -version = "5.3.1" +version = "5.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6032be9bd27023a771701cc49f9f053c751055f71efb2e0ae5c15809093675ba" +checksum = "3492acde4c3fc54c845eaab3eed8bd00c7a7d881f78bfc801e43a93dec1331ae" dependencies = [ "concurrent-queue", "parking", - "pin-project-lite 0.2.15", + "pin-project-lite 0.2.16", ] [[package]] @@ -1313,8 +1313,8 @@ version = "0.5.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3c3e4e0dd3673c1139bf041f3008816d9cf2946bbfac2945c09e523b8d7b05b2" dependencies = [ - "event-listener 5.3.1", - "pin-project-lite 0.2.15", + "event-listener 5.4.0", + "pin-project-lite 0.2.16", ] [[package]] @@ -1491,21 +1491,21 @@ dependencies = [ "futures-io", "memchr", "parking", - "pin-project-lite 0.2.15", + "pin-project-lite 0.2.16", "waker-fn", ] [[package]] name = "futures-lite" -version = "2.5.0" +version = "2.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cef40d21ae2c515b51041df9ed313ed21e572df340ea58a922a0aefe7e8891a1" +checksum = "f5edaec856126859abb19ed65f39e90fea3a9574b9707f13539acf4abf7eb532" dependencies = [ "fastrand 2.3.0", "futures-core", "futures-io", "parking", - "pin-project-lite 0.2.15", + "pin-project-lite 0.2.16", ] [[package]] @@ -1514,9 +1514,9 @@ version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "162ee34ebcb7c64a8abebc059ce0fee27c2262618d7b60ed8faf72fef13c3650" dependencies = [ - "proc-macro2 1.0.92", + "proc-macro2 1.0.93", "quote 1.0.38", - "syn 2.0.95", + "syn 2.0.96", ] [[package]] @@ -1550,7 +1550,7 @@ dependencies = [ "futures-sink", "futures-task", "memchr", - "pin-project-lite 0.2.15", + "pin-project-lite 0.2.16", "pin-utils", "slab", ] @@ -1820,7 +1820,7 @@ dependencies = [ "httparse", "httpdate", "itoa 0.4.8", - "pin-project 1.1.7", + "pin-project 1.1.8", "socket2 0.3.19", "tokio 0.2.25", "tower-service", @@ -2012,9 +2012,9 @@ version = "1.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1ec89e9337638ecdc08744df490b221a7399bf8d164eb52a665454e60e075ad6" dependencies = [ - "proc-macro2 1.0.92", + "proc-macro2 1.0.93", "quote 1.0.38", - "syn 2.0.95", + "syn 2.0.96", ] [[package]] @@ -2177,9 +2177,9 @@ dependencies = [ [[package]] name = "js-sys" -version = "0.3.76" +version = "0.3.77" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6717b6b5b077764fb5966237269cb3c64edddde4b14ce42647430a78ced9e7b7" +checksum = "1cfaf33c695fc6e08064efbc1f72ec937429614f25eef83af942d0e227c3a28f" dependencies = [ "once_cell", "wasm-bindgen", @@ -2266,7 +2266,7 @@ version = "0.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c0ff37bd590ca25063e35af745c343cb7a0271906fb7b37e4813e8f79f00268d" dependencies = [ - "bitflags 2.6.0", + "bitflags 2.8.0", "libc", "redox_syscall 0.5.8", ] @@ -2289,9 +2289,9 @@ dependencies = [ [[package]] name = "libz-sys" -version = "1.1.20" +version = "1.1.21" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d2d16453e800a8cf6dd2fc3eb4bc99b786a9b90c663b8559a5b1a041bf89e472" +checksum = "df9b68e50e6e0b26f672573834882eb57759f6db9b3be2ea3c35c91188bb4eaa" dependencies = [ "cc", "libc", @@ -2313,9 +2313,9 @@ checksum = "ef53942eb7bf7ff43a617b3e2c1c4a5ecf5944a7c1bc12d7ee39bbb15e5c1519" [[package]] name = "linux-raw-sys" -version = "0.4.14" +version = "0.4.15" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "78b3ae25bc7c8c38cec158d1f2757ee79e9b3740fbc7ccf0e59e4b08d793fa89" +checksum = "d26c52dbd32dccf2d10cac7725f8eae5296885fb5703b261f7d0a0739ec807ab" [[package]] name = "litemap" @@ -2356,9 +2356,9 @@ dependencies = [ [[package]] name = "log" -version = "0.4.22" +version = "0.4.25" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a7a70ba024b9dc04c27ea2f0c0548feb474ec5c54bba33a7f72f873a39d07b24" +checksum = "04cbf5b083de1c7e0222a7a51dbfdba1cbe1c6ab0b15e29fff3f6c077fd9cd9f" dependencies = [ "serde", "value-bag", @@ -2428,9 +2428,9 @@ dependencies = [ [[package]] name = "miniz_oxide" -version = "0.8.2" +version = "0.8.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4ffbe83022cedc1d264172192511ae958937694cd57ce297164951b8b3568394" +checksum = "b8402cab7aefae129c6977bb0ff1b8fd9a04eb5b51efc50a70bea51cda0c7924" dependencies = [ "adler2", ] @@ -2530,7 +2530,7 @@ checksum = "424f6e86263cd5294cbd7f1e95746b95aca0e0d66bff31e5a40d6baa87b4aa99" dependencies = [ "proc-macro-crate", "proc-macro-error", - "proc-macro2 1.0.92", + "proc-macro2 1.0.93", "quote 1.0.38", "syn 1.0.109", "synstructure 0.12.6", @@ -2550,7 +2550,7 @@ dependencies = [ "bytes 1.9.0", "futures 0.3.31", "log", - "pin-project 1.1.7", + "pin-project 1.1.8", "smallvec", "unsigned-varint 0.7.2", ] @@ -2605,7 +2605,7 @@ dependencies = [ "mwc-libp2p-yamux", "parity-multiaddr", "parking_lot 0.11.2", - "pin-project 1.1.7", + "pin-project 1.1.8", "smallvec", "wasm-timer", ] @@ -2630,7 +2630,7 @@ dependencies = [ "multistream-select", "parity-multiaddr", "parking_lot 0.11.2", - "pin-project 1.1.7", + "pin-project 1.1.8", "prost", "prost-build", "rand 0.7.3", @@ -2771,7 +2771,7 @@ dependencies = [ "log", "mwc-libp2p-core", "socket2 0.3.19", - "tokio 1.42.0", + "tokio 1.43.0", ] [[package]] @@ -2939,6 +2939,7 @@ dependencies = [ "bitflags 1.3.2", "bytes 0.5.6", "chrono", + "crossbeam", "ed25519-dalek", "enum_primitive", "futures 0.3.31", @@ -3108,7 +3109,7 @@ version = "0.29.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "71e2746dc3a24dd78b3cfcb7be93368c6de9963d30f43a6a73998a9cf4b17b46" dependencies = [ - "bitflags 2.6.0", + "bitflags 2.8.0", "cfg-if 1.0.0", "cfg_aliases", "libc", @@ -3497,11 +3498,11 @@ dependencies = [ [[package]] name = "pin-project" -version = "1.1.7" +version = "1.1.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "be57f64e946e500c8ee36ef6331845d40a93055567ec57e8fae13efd33759b95" +checksum = "1e2ec53ad785f4d35dac0adea7f7dc6f1bb277ad84a680c7afefeae05d1f5916" dependencies = [ - "pin-project-internal 1.1.7", + "pin-project-internal 1.1.8", ] [[package]] @@ -3510,20 +3511,20 @@ version = "0.4.30" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "851c8d0ce9bebe43790dedfc86614c23494ac9f423dd618d3a61fc693eafe61e" dependencies = [ - "proc-macro2 1.0.92", + "proc-macro2 1.0.93", "quote 1.0.38", "syn 1.0.109", ] [[package]] name = "pin-project-internal" -version = "1.1.7" +version = "1.1.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3c0f5fad0874fc7abcd4d750e76917eaebbecaa2c20bde22e1dbeeba8beb758c" +checksum = "d56a66c0c55993aa927429d0f8a0abfd74f084e4d9c192cffed01e418d83eefb" dependencies = [ - "proc-macro2 1.0.92", + "proc-macro2 1.0.93", "quote 1.0.38", - "syn 2.0.95", + "syn 2.0.96", ] [[package]] @@ -3534,9 +3535,9 @@ checksum = "257b64915a082f7811703966789728173279bdebb956b143dbcd23f6f970a777" [[package]] name = "pin-project-lite" -version = "0.2.15" +version = "0.2.16" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "915a1e146535de9163f3987b8944ed8cf49a18bb0056bcebcdcece385cece4ff" +checksum = "3b3cff922bd51709b605d9ead9aa71031d81447142d828eb4a6eba76fe619f9b" [[package]] name = "pin-utils" @@ -3573,7 +3574,7 @@ dependencies = [ "concurrent-queue", "libc", "log", - "pin-project-lite 0.2.15", + "pin-project-lite 0.2.16", "windows-sys 0.48.0", ] @@ -3586,8 +3587,8 @@ dependencies = [ "cfg-if 1.0.0", "concurrent-queue", "hermit-abi 0.4.0", - "pin-project-lite 0.2.15", - "rustix 0.38.42", + "pin-project-lite 0.2.16", + "rustix 0.38.43", "tracing", "windows-sys 0.59.0", ] @@ -3659,7 +3660,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "da25490ff9892aab3fcf7c36f08cfb902dd3e71ca0f9f9517bea02a73a5ce38c" dependencies = [ "proc-macro-error-attr", - "proc-macro2 1.0.92", + "proc-macro2 1.0.93", "quote 1.0.38", "syn 1.0.109", "version_check", @@ -3671,7 +3672,7 @@ version = "1.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a1be40180e52ecc98ad80b184934baf3d0d29f979574e439af5a55274b35f869" dependencies = [ - "proc-macro2 1.0.92", + "proc-macro2 1.0.93", "quote 1.0.38", "version_check", ] @@ -3687,9 +3688,9 @@ dependencies = [ [[package]] name = "proc-macro2" -version = "1.0.92" +version = "1.0.93" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "37d3544b3f2748c54e147655edb5025752e2303145b5aefb3c3ea2c78b973bb0" +checksum = "60946a68e5f9d28b0dc1c21bb8a97ee7d018a8b322fa57838ba31cc878e22d99" dependencies = [ "unicode-ident", ] @@ -3730,7 +3731,7 @@ checksum = "169a15f3008ecb5160cba7d37bcd690a7601b6d30cfb87a117d45e59d52af5d4" dependencies = [ "anyhow", "itertools", - "proc-macro2 1.0.92", + "proc-macro2 1.0.93", "quote 1.0.38", "syn 1.0.109", ] @@ -3766,7 +3767,7 @@ version = "1.0.38" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0e4dccaaaf89514f546c693ddc140f729f958c247918a13380cccc6078391acc" dependencies = [ - "proc-macro2 1.0.92", + "proc-macro2 1.0.93", ] [[package]] @@ -4009,7 +4010,7 @@ version = "0.5.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "03a862b389f93e68874fbf580b9de08dd02facb9a788ebadaf4a3fd33cf58834" dependencies = [ - "bitflags 2.6.0", + "bitflags 2.8.0", ] [[package]] @@ -4133,9 +4134,9 @@ dependencies = [ [[package]] name = "rustix" -version = "0.37.27" +version = "0.37.28" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fea8ca367a3a01fe35e6943c400addf443c0f57670e6ec51196f71a4b8762dd2" +checksum = "519165d378b97752ca44bbe15047d5d3409e875f39327546b42ac81d7e18c1b6" dependencies = [ "bitflags 1.3.2", "errno", @@ -4147,14 +4148,14 @@ dependencies = [ [[package]] name = "rustix" -version = "0.38.42" +version = "0.38.43" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f93dc38ecbab2eb790ff964bb77fa94faf256fd3e73285fd7ba0903b76bedb85" +checksum = "a78891ee6bf2340288408954ac787aa063d8e8817e9f53abb37c695c6d834ef6" dependencies = [ - "bitflags 2.6.0", + "bitflags 2.8.0", "errno", "libc", - "linux-raw-sys 0.4.14", + "linux-raw-sys 0.4.15", "windows-sys 0.59.0", ] @@ -4323,16 +4324,16 @@ version = "1.0.217" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5a9bf7cf98d04a2b28aead066b7496853d4779c9cc183c440dbac457641e19a0" dependencies = [ - "proc-macro2 1.0.92", + "proc-macro2 1.0.93", "quote 1.0.38", - "syn 2.0.95", + "syn 2.0.96", ] [[package]] name = "serde_json" -version = "1.0.134" +version = "1.0.135" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d00f4175c42ee48b15416f6193a959ba3a0d67fc699a0db9ad12df9f83991c7d" +checksum = "2b0d7ba2887406110130a978386c4e1befb98c674b4fba677954e4db976630d9" dependencies = [ "itoa 1.0.14", "memchr", @@ -4577,18 +4578,18 @@ version = "1.0.109" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "72b64191b275b66ffe2469e8af2c1cfe3bafa67b529ead792a6d0160888b4237" dependencies = [ - "proc-macro2 1.0.92", + "proc-macro2 1.0.93", "quote 1.0.38", "unicode-ident", ] [[package]] name = "syn" -version = "2.0.95" +version = "2.0.96" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "46f71c0377baf4ef1cc3e3402ded576dccc315800fbc62dfc7fe04b009773b4a" +checksum = "d5d0adab1ae378d7f53bdebc67a39f1f151407ef230f0ce2883572f5d8985c80" dependencies = [ - "proc-macro2 1.0.92", + "proc-macro2 1.0.93", "quote 1.0.38", "unicode-ident", ] @@ -4599,7 +4600,7 @@ version = "0.12.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f36bdaa60a83aca3921b5259d5400cbf5e90fc51931376a9bd4a0eb79aa7210f" dependencies = [ - "proc-macro2 1.0.92", + "proc-macro2 1.0.93", "quote 1.0.38", "syn 1.0.109", "unicode-xid 0.2.6", @@ -4611,9 +4612,9 @@ version = "0.13.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c8af7666ab7b6390ab78131fb5b0fce11d6b7a6951602017c35fa82800708971" dependencies = [ - "proc-macro2 1.0.92", + "proc-macro2 1.0.93", "quote 1.0.38", - "syn 2.0.95", + "syn 2.0.96", ] [[package]] @@ -4653,7 +4654,7 @@ dependencies = [ "fastrand 2.3.0", "getrandom 0.2.15", "once_cell", - "rustix 0.38.42", + "rustix 0.38.43", "windows-sys 0.59.0", ] @@ -4700,9 +4701,9 @@ version = "1.0.69" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4fee6c4efc90059e10f81e6d42c60a18f76588c3d74cb83a0b242a2b6c7504c1" dependencies = [ - "proc-macro2 1.0.92", + "proc-macro2 1.0.93", "quote 1.0.38", - "syn 2.0.95", + "syn 2.0.96", ] [[package]] @@ -4794,14 +4795,14 @@ dependencies = [ [[package]] name = "tokio" -version = "1.42.0" +version = "1.43.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5cec9b21b0450273377fc97bd4c33a8acffc8c996c987a7c5b319a0083707551" +checksum = "3d61fa4ffa3de412bfea335c6ecff681de2b609ba3c77ef3e00e521813a9ed9e" dependencies = [ "backtrace", "libc", "mio 1.0.3", - "pin-project-lite 0.2.15", + "pin-project-lite 0.2.16", "socket2 0.5.8", "windows-sys 0.52.0", ] @@ -4822,7 +4823,7 @@ version = "0.2.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e44da00bfc73a25f814cd8d7e57a68a5c31b74b3152a0a1d1f590c97ed06265a" dependencies = [ - "proc-macro2 1.0.92", + "proc-macro2 1.0.93", "quote 1.0.38", "syn 1.0.109", ] @@ -4941,7 +4942,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "784e0ac535deb450455cbfa28a6f0df145ea1bb7ae51b821cf5e7927fdcfbdd0" dependencies = [ "log", - "pin-project-lite 0.2.15", + "pin-project-lite 0.2.16", "tracing-core", ] @@ -4960,7 +4961,7 @@ version = "0.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "97d095ae15e245a057c8e8451bab9b3ee1e1f68e9ba2b4fbc18d0ac5237835f2" dependencies = [ - "pin-project 1.1.7", + "pin-project 1.1.8", "tracing", ] @@ -5160,34 +5161,35 @@ checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423" [[package]] name = "wasm-bindgen" -version = "0.2.99" +version = "0.2.100" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a474f6281d1d70c17ae7aa6a613c87fce69a127e2624002df63dcb39d6cf6396" +checksum = "1edc8929d7499fc4e8f0be2262a241556cfc54a0bea223790e71446f2aab1ef5" dependencies = [ "cfg-if 1.0.0", "once_cell", + "rustversion", "wasm-bindgen-macro", ] [[package]] name = "wasm-bindgen-backend" -version = "0.2.99" +version = "0.2.100" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5f89bb38646b4f81674e8f5c3fb81b562be1fd936d84320f3264486418519c79" +checksum = "2f0a0651a5c2bc21487bde11ee802ccaf4c51935d0d3d42a6101f98161700bc6" dependencies = [ "bumpalo", "log", - "proc-macro2 1.0.92", + "proc-macro2 1.0.93", "quote 1.0.38", - "syn 2.0.95", + "syn 2.0.96", "wasm-bindgen-shared", ] [[package]] name = "wasm-bindgen-futures" -version = "0.4.49" +version = "0.4.50" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "38176d9b44ea84e9184eff0bc34cc167ed044f816accfe5922e54d84cf48eca2" +checksum = "555d470ec0bc3bb57890405e5d4322cc9ea83cebb085523ced7be4144dac1e61" dependencies = [ "cfg-if 1.0.0", "js-sys", @@ -5198,9 +5200,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-macro" -version = "0.2.99" +version = "0.2.100" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2cc6181fd9a7492eef6fef1f33961e3695e4579b9872a6f7c83aee556666d4fe" +checksum = "7fe63fc6d09ed3792bd0897b314f53de8e16568c2b3f7982f468c0bf9bd0b407" dependencies = [ "quote 1.0.38", "wasm-bindgen-macro-support", @@ -5208,22 +5210,25 @@ dependencies = [ [[package]] name = "wasm-bindgen-macro-support" -version = "0.2.99" +version = "0.2.100" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "30d7a95b763d3c45903ed6c81f156801839e5ee968bb07e534c44df0fcd330c2" +checksum = "8ae87ea40c9f689fc23f209965b6fb8a99ad69aeeb0231408be24920604395de" dependencies = [ - "proc-macro2 1.0.92", + "proc-macro2 1.0.93", "quote 1.0.38", - "syn 2.0.95", + "syn 2.0.96", "wasm-bindgen-backend", "wasm-bindgen-shared", ] [[package]] name = "wasm-bindgen-shared" -version = "0.2.99" +version = "0.2.100" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "943aab3fdaaa029a6e0271b35ea10b72b943135afe9bffca82384098ad0e06a6" +checksum = "1a05d73b933a847d6cccdda8f838a22ff101ad9bf93e33684f39c1f5f0eece3d" +dependencies = [ + "unicode-ident", +] [[package]] name = "wasm-timer" @@ -5242,9 +5247,9 @@ dependencies = [ [[package]] name = "web-sys" -version = "0.3.76" +version = "0.3.77" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "04dd7223427d52553d3702c004d3b2fe07c148165faa56313cb00211e31c12bc" +checksum = "33b6dd2ef9186f1f2072e409e99cd22a975331a6b3591b12c764e0e55c60d5d2" dependencies = [ "js-sys", "wasm-bindgen", @@ -5278,7 +5283,7 @@ dependencies = [ "either", "home", "once_cell", - "rustix 0.38.42", + "rustix 0.38.43", ] [[package]] @@ -5361,9 +5366,9 @@ version = "0.57.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9107ddc059d5b6fbfbffdfa7a7fe3e22a226def0b2608f72e9d552763d3e1ad7" dependencies = [ - "proc-macro2 1.0.92", + "proc-macro2 1.0.93", "quote 1.0.38", - "syn 2.0.95", + "syn 2.0.96", ] [[package]] @@ -5372,9 +5377,9 @@ version = "0.57.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "29bee4b38ea3cde66011baa44dba677c432a78593e202392d1e9070cf2a7fca7" dependencies = [ - "proc-macro2 1.0.92", + "proc-macro2 1.0.93", "quote 1.0.38", - "syn 2.0.95", + "syn 2.0.96", ] [[package]] @@ -5638,9 +5643,9 @@ version = "0.7.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2380878cad4ac9aac1e2435f3eb4020e8374b5f13c296cb75b4620ff8e229154" dependencies = [ - "proc-macro2 1.0.92", + "proc-macro2 1.0.93", "quote 1.0.38", - "syn 2.0.95", + "syn 2.0.96", "synstructure 0.13.1", ] @@ -5660,9 +5665,9 @@ version = "0.7.35" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fa4f8080344d4671fb4e831a13ad1e68092748387dfc4f55e356242fae12ce3e" dependencies = [ - "proc-macro2 1.0.92", + "proc-macro2 1.0.93", "quote 1.0.38", - "syn 2.0.95", + "syn 2.0.96", ] [[package]] @@ -5680,9 +5685,9 @@ version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "595eed982f7d355beb85837f651fa22e90b3c044842dc7f2c2842c086f295808" dependencies = [ - "proc-macro2 1.0.92", + "proc-macro2 1.0.93", "quote 1.0.38", - "syn 2.0.95", + "syn 2.0.96", "synstructure 0.13.1", ] @@ -5701,9 +5706,9 @@ version = "1.4.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ce36e65b0d2999d2aafac989fb249189a141aee1f53c612c1f37d72631959f69" dependencies = [ - "proc-macro2 1.0.92", + "proc-macro2 1.0.93", "quote 1.0.38", - "syn 2.0.95", + "syn 2.0.96", ] [[package]] @@ -5723,9 +5728,9 @@ version = "0.10.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6eafa6dfb17584ea3e2bd6e76e0cc15ad7af12b09abdd1ca55961bed9b1063c6" dependencies = [ - "proc-macro2 1.0.92", + "proc-macro2 1.0.93", "quote 1.0.38", - "syn 2.0.95", + "syn 2.0.96", ] [[package]] diff --git a/chain/src/pibd_params.rs b/chain/src/pibd_params.rs index 56c207b84..2e4d11420 100644 --- a/chain/src/pibd_params.rs +++ b/chain/src/pibd_params.rs @@ -143,11 +143,7 @@ impl PibdParams { /// Number of simultaneous requests for blocks we should make per available peer. pub fn get_blocks_request_per_peer(&self) -> usize { - match self.cpu_num { - 1 => 2, - 2 => 3, - _ => 4, - } + cmp::min(8, self.cpu_num * 2) } /// Maxumum number of blocks that can await into the DB as orphans @@ -164,11 +160,7 @@ impl PibdParams { /// divisible by 3 to try and evenly spread requests amount the 3 main MMRs (Bitmap segments /// will always be requested first) pub fn get_segments_request_per_peer(&self) -> usize { - match self.cpu_num { - 1 => 2, - 2 => 3, - _ => 4, - } + cmp::min(8, self.cpu_num * 2) } /// Maximum number of simultaneous requests. Please note, the data will be processed in a single thread, so diff --git a/p2p/Cargo.toml b/p2p/Cargo.toml index 64c319648..824f38206 100644 --- a/p2p/Cargo.toml +++ b/p2p/Cargo.toml @@ -35,6 +35,7 @@ tokio = {version = "0.2", features = ["full"] } ed25519-dalek = "1" serde_json = "1" bytes = "0.5" +crossbeam = "0.8" mwc_core = { path = "../core", version = "5.3.6" } mwc_store = { path = "../store", version = "5.3.6" } diff --git a/p2p/src/conn.rs b/p2p/src/conn.rs index 283d3f536..f8496809f 100644 --- a/p2p/src/conn.rs +++ b/p2p/src/conn.rs @@ -26,17 +26,19 @@ use crate::msg::{write_message, Consumed, Message, Msg}; use crate::mwc_core::ser::ProtocolVersion; use crate::types::Error; use crate::util::{RateCounter, RwLock}; +use crossbeam::channel::{RecvTimeoutError, TryRecvError}; use mwc_chain::SyncState; use std::fs::File; use std::io::{self, Write}; use std::net::{Shutdown, TcpStream}; use std::sync::atomic::{AtomicBool, Ordering}; -use std::sync::mpsc::RecvTimeoutError; -use std::sync::{mpsc, Arc}; +use std::sync::Arc; use std::thread::{self, JoinHandle}; use std::time::Duration; -pub const SEND_CHANNEL_CAP: usize = 100; +// Potentially there can be large messages, like 1.5mb blocks. The Cap is for single peer, we really don't want overflow the network +// That is don't put too large number here. 10 looks reasonable for this case +pub const SEND_CHANNEL_CAP: usize = 10; const CHANNEL_TIMEOUT: Duration = Duration::from_millis(15000); @@ -111,7 +113,7 @@ impl StopHandle { #[derive(Clone)] pub struct ConnHandle { /// Channel to allow sending data through the connection - pub send_channel: mpsc::SyncSender, + pub send_channel: crossbeam::channel::Sender, } impl ConnHandle { @@ -125,13 +127,13 @@ impl ConnHandle { pub fn send(&self, msg: Msg) -> Result<(), Error> { match self.send_channel.try_send(msg) { Ok(()) => Ok(()), - Err(mpsc::TrySendError::Disconnected(_)) => { + Err(crossbeam::channel::TrySendError::Disconnected(_)) => { Err(Error::Send("try_send disconnected".to_owned())) } - Err(mpsc::TrySendError::Full(_)) => { - debug!("conn_handle: try_send but buffer is full, dropping msg"); - Ok(()) - } + Err(crossbeam::channel::TrySendError::Full(msg)) => self + .send_channel + .send(msg) + .map_err(|_| Error::Send("try_send disconnected".to_owned())), } } } @@ -183,7 +185,7 @@ pub fn listen( where H: MessageHandler, { - let (send_tx, send_rx) = mpsc::sync_channel(SEND_CHANNEL_CAP); + let (send_tx, send_rx) = crossbeam::channel::bounded(SEND_CHANNEL_CAP); let stopped = Arc::new(AtomicBool::new(false)); @@ -217,7 +219,7 @@ fn poll( conn_handle: ConnHandle, version: ProtocolVersion, handler: H, - send_rx: mpsc::Receiver, + send_rx: crossbeam::channel::Receiver, stopped: Arc, tracker: Arc, sync_state: Arc, @@ -248,6 +250,9 @@ where break; } + // Note, we are processing messages from a single peer one by one intentionally. Even we can process them in parallel, + // we don't want to do that because DDOS attacks. One peer can't get more than a single thread of this node. + // check the read end let (next, bytes_read) = codec.read(); @@ -337,7 +342,25 @@ where let mut retry_send = Err(()); let _ = writer.set_write_timeout(Some(BODY_IO_TIMEOUT)); loop { - let maybe_data = retry_send.or_else(|_| send_rx.recv_timeout(CHANNEL_TIMEOUT)); + let maybe_data = retry_send.or_else(|_| { + let mut data = match send_rx.recv_timeout(CHANNEL_TIMEOUT) { + Ok(msg) => vec![msg], + Err(e) => return Err(e), + }; + // send_rx expected to have capacuty. Capacity will limit the number of message that we can read form the stream + loop { + match send_rx.try_recv() { + Ok(msg) => { + data.push(msg); + } + Err(TryRecvError::Empty) => break, + Err(TryRecvError::Disconnected) => { + return Err(RecvTimeoutError::Disconnected) + } // All other error are fatal, report as disconnected + } + } + Ok(data) + }); retry_send = Err(()); match maybe_data { Ok(data) => { diff --git a/p2p/src/handshake.rs b/p2p/src/handshake.rs index 77c092b4b..8c9184320 100644 --- a/p2p/src/handshake.rs +++ b/p2p/src/handshake.rs @@ -137,7 +137,7 @@ impl Handshake { // write and read the handshake response let msg = Msg::new(Type::Hand, hand, self.protocol_version)?; - write_message(conn, &msg, self.tracker.clone())?; + write_message(conn, &vec![msg], self.tracker.clone())?; let shake: Shake = read_message(conn, self.protocol_version, Type::Shake)?; if shake.genesis != self.genesis { @@ -157,7 +157,7 @@ impl Handshake { // send tor address let tor_address = TorAddress::new(onion_address); let msg = Msg::new(Type::TorAddress, tor_address, self.protocol_version)?; - write_message(conn, &msg, self.tracker.clone())?; + write_message(conn, &vec![msg], self.tracker.clone())?; } else { debug!("non-Tor peer {:?}", self_addr); } @@ -269,7 +269,7 @@ impl Handshake { }; let msg = Msg::new(Type::Shake, shake, negotiated_version)?; - write_message(conn, &msg, self.tracker.clone())?; + write_message(conn, &vec![msg], self.tracker.clone())?; trace!("Success handshake with {}.", peer_info.addr); diff --git a/p2p/src/msg.rs b/p2p/src/msg.rs index 32d98d63a..f6ac11afa 100644 --- a/p2p/src/msg.rs +++ b/p2p/src/msg.rs @@ -255,7 +255,7 @@ pub fn read_message( pub fn write_message( stream: &mut W, - msg: &Msg, + msgs: &Vec, tracker: Arc, ) -> Result<(), Error> { // Introduce a delay so messages are spaced at least 150ms apart. @@ -269,26 +269,42 @@ pub fn write_message( } } - let mut buf = ser::ser_vec(&msg.header, msg.version)?; - buf.extend(&msg.body[..]); - stream.write_all(&buf[..])?; - tracker.inc_sent(buf.len() as u64); - if let Some(file) = &msg.attachment { - let mut file = file.try_clone()?; - let mut buf = [0u8; 8000]; - loop { - match file.read(&mut buf[..]) { - Ok(0) => break, - Ok(n) => { - stream.write_all(&buf[..n])?; - // Increase sent bytes "quietly" without incrementing the counter. - // (In a loop here for the single attachment). - tracker.inc_quiet_sent(n as u64); + // sending tmp buffer. + let mut tmp_buf: Vec = vec![]; + + for msg in msgs { + tmp_buf.extend(ser::ser_vec(&msg.header, msg.version)?); + tmp_buf.extend(&msg.body[..]); + if let Some(file) = &msg.attachment { + // finalize what we have before attachments... + if !tmp_buf.is_empty() { + stream.write_all(&tmp_buf[..])?; + tracker.inc_sent(tmp_buf.len() as u64); + tmp_buf.clear(); + } + let mut file = file.try_clone()?; + let mut buf = [0u8; 8000]; + loop { + match file.read(&mut buf[..]) { + Ok(0) => break, + Ok(n) => { + stream.write_all(&buf[..n])?; + // Increase sent bytes "quietly" without incrementing the counter. + // (In a loop here for the single attachment). + tracker.inc_quiet_sent(n as u64); + } + Err(e) => return Err(From::from(e)), } - Err(e) => return Err(From::from(e)), } } } + + if !tmp_buf.is_empty() { + stream.write_all(&tmp_buf[..])?; + tracker.inc_sent(tmp_buf.len() as u64); + tmp_buf.clear(); + } + Ok(()) } diff --git a/p2p/src/peer.rs b/p2p/src/peer.rs index 6b49d73fe..1cb57e2ec 100644 --- a/p2p/src/peer.rs +++ b/p2p/src/peer.rs @@ -47,7 +47,7 @@ use mwc_chain::txhashset::Segmenter; use mwc_chain::SyncState; const MAX_TRACK_SIZE: usize = 200; -const MAX_PEER_MSG_PER_MIN: u64 = 500; +const MAX_PEER_MSG_PER_MIN: u64 = 1000; #[derive(Debug, Clone, Copy, PartialEq, Eq)] /// Remind: don't mix up this 'State' with that 'State' in p2p/src/store.rs,