diff --git a/.gitignore b/.gitignore index f80eef4..65420fe 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,4 @@ target database.redb +database-ah-usdc.redb +database-ah-usdt.redb diff --git a/Cargo.lock b/Cargo.lock index 9f73842..894ed83 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -43,7 +43,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d122413f284cf2d62fb1b7db97e02edb8cda96d769b16e443a4f6195e35662b0" dependencies = [ "crypto-common", - "generic-array 0.14.7", + "generic-array", ] [[package]] @@ -59,9 +59,9 @@ dependencies = [ [[package]] name = "ahash" -version = "0.8.10" +version = "0.8.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8b79b82693f705137f8fb9b37871d99e4f9a7df12b917eed79c3d3954830a60b" +checksum = "e89da841a80418a9b391ebaea17f5c112ffaaa96f621d2c285b5174da76b9011" dependencies = [ "cfg-if", "getrandom", @@ -313,12 +313,6 @@ dependencies = [ "nodrop", ] -[[package]] -name = "arrayvec" -version = "0.5.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "23b62fc65de8e4e7f52534fb52b0f3ed04746ae267519eef2a83941e8085068b" - [[package]] name = "arrayvec" version = "0.7.4" @@ -494,7 +488,7 @@ dependencies = [ "axum-core", "bytes", "futures-util", - "http 1.0.0", + "http 1.1.0", "http-body 1.0.0", "http-body-util", "hyper 1.2.0", @@ -527,7 +521,7 @@ dependencies = [ "async-trait", "bytes", "futures-util", - "http 1.0.0", + "http 1.1.0", "http-body 1.0.0", "http-body-util", "mime", @@ -669,25 +663,13 @@ dependencies = [ "constant_time_eq 0.3.0", ] -[[package]] -name = "block-buffer" -version = "0.7.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c0940dc441f31689269e10ac70eb1002a3a1d3ad1390e030043662eb7fe4688b" -dependencies = [ - "block-padding", - "byte-tools", - "byteorder", - "generic-array 0.12.4", -] - [[package]] name = "block-buffer" version = "0.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4152116fd6e9dadb291ae18fc1ec3575ed6d84c29642d97890f4b4a3417297e4" dependencies = [ - "generic-array 0.14.7", + "generic-array", ] [[package]] @@ -696,16 +678,7 @@ version = "0.10.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3078c7629b62d3f0439517fa394996acacc5cbc91c5a20d8c658e77abd503a71" dependencies = [ - "generic-array 0.14.7", -] - -[[package]] -name = "block-padding" -version = "0.1.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fa79dedbb091f449f1f39e53edf88d5dbe95f895dae6135a8d7b881fb5af73f5" -dependencies = [ - "byte-tools", + "generic-array", ] [[package]] @@ -757,12 +730,6 @@ version = "1.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c3ac9f8b63eca6fd385229b3675f6cc0dc5c8a5c8a54a59d4f52ffd670d87b0c" -[[package]] -name = "byte-tools" -version = "0.3.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e3b5ca7a04898ad4bcd41c90c5285445ff5b791899bb1b0abdd2a2aa791211d7" - [[package]] name = "byteorder" version = "1.5.0" @@ -777,9 +744,9 @@ checksum = "a2bd12c1caf447e69cd4528f47f94d203fd2582878ecb9e9465484c4148a8223" [[package]] name = "cc" -version = "1.0.88" +version = "1.0.90" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "02f341c093d19155a6e41631ce5971aac4e9a868262212153124c15fa22d1cdc" +checksum = "8cd6604a82acf3039f1144f54b8eb34e91ffba622051189e71b781822d5ee1f5" [[package]] name = "cfg-if" @@ -800,9 +767,9 @@ dependencies = [ [[package]] name = 
"chrono" -version = "0.4.34" +version = "0.4.35" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5bc015644b92d5890fab7489e49d21f879d5c990186827d42ec511919404f38b" +checksum = "8eaf5903dcbc0a39312feb77df2ff4c76387d591b9fc7b04a238dcf8bb62639a" dependencies = [ "android-tzdata", "iana-time-zone", @@ -950,7 +917,7 @@ version = "0.1.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1bfb12502f3fc46cca1bb51ac28df9d618d813cdc3d2f25b9fe775a34af26bb3" dependencies = [ - "generic-array 0.14.7", + "generic-array", "rand_core 0.6.4", "typenum", ] @@ -961,7 +928,7 @@ version = "0.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b584a330336237c1eecd3e94266efb216c56ed91225d634cb2991c5f3fd1aeab" dependencies = [ - "generic-array 0.14.7", + "generic-array", "subtle", ] @@ -971,23 +938,10 @@ version = "0.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "25fab6889090c8133f3deb8f73ba3c65a7f456f66436fc012a1b1e272b1e103e" dependencies = [ - "generic-array 0.14.7", + "generic-array", "subtle", ] -[[package]] -name = "curve25519-dalek" -version = "2.1.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4a9b85542f99a2dfa2a1b8e192662741c9859a846b296bef1c92ef9b58b5a216" -dependencies = [ - "byteorder", - "digest 0.8.1", - "rand_core 0.5.1", - "subtle", - "zeroize", -] - [[package]] name = "curve25519-dalek" version = "3.2.0" @@ -1144,22 +1098,13 @@ dependencies = [ "syn 1.0.109", ] -[[package]] -name = "digest" -version = "0.8.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f3d0c8c8752312f9713efd397ff63acb9f85585afbf179282e720e7704954dd5" -dependencies = [ - "generic-array 0.12.4", -] - [[package]] name = "digest" version = "0.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d3dd60d1080a57a05ab032377049e0591415d2b31afd7028356dbf3cc6dcb066" dependencies = [ - "generic-array 0.14.7", + "generic-array", ] [[package]] @@ -1304,9 +1249,22 @@ dependencies = [ [[package]] name = "env_logger" -version = "0.11.2" +version = "0.10.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4cd405aab171cb85d6735e5c8d9db038c17d3ca007a4d2c25f337935c3d90580" +dependencies = [ + "humantime", + "is-terminal", + "log", + "regex", + "termcolor", +] + +[[package]] +name = "env_logger" +version = "0.11.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6c012a26a7f605efc424dd53697843a72be7dc86ad2d01f7814337794a12231d" +checksum = "38b35839ba51819680ba087cd351788c9a3c476841207e0b8cee0b04722343b9" dependencies = [ "anstream", "anstyle", @@ -1399,12 +1357,6 @@ dependencies = [ "syn 2.0.52", ] -[[package]] -name = "fake-simd" -version = "0.1.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e88a8acf291dafb59c2d96e8f59828f3838bb1a70398823ade51a84de6a6deed" - [[package]] name = "fallible-iterator" version = "0.2.0" @@ -1597,15 +1549,6 @@ dependencies = [ "slab", ] -[[package]] -name = "generic-array" -version = "0.12.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ffdf9f34f1447443d37393cc6c2b8313aebddcd96906caf34e54c68d8e57d7bd" -dependencies = [ - "typenum", -] - [[package]] name = "generic-array" version = "0.14.7" @@ -1665,7 +1608,7 @@ dependencies = [ "futures-core", "futures-sink", "futures-util", - "http 0.2.11", + "http 0.2.12", "indexmap 2.2.5", "slab", "tokio", @@ -1684,7 +1627,7 @@ dependencies = [ 
"futures-core", "futures-sink", "futures-util", - "http 1.0.0", + "http 1.1.0", "indexmap 2.2.5", "slab", "tokio", @@ -1722,7 +1665,7 @@ version = "0.13.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "43a3c133739dddd0d2990f9a4bdf8eb4b21ef50e4851ca85ab661199821d510e" dependencies = [ - "ahash 0.8.10", + "ahash 0.8.11", ] [[package]] @@ -1731,7 +1674,7 @@ version = "0.14.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "290f1a1d9242c78d09ce40a5e87e7554ee637af1351968159f4952f028f75604" dependencies = [ - "ahash 0.8.10", + "ahash 0.8.11", "allocator-api2", "serde", ] @@ -1790,15 +1733,15 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "17ea0a1394df5b6574da6e0c1ade9e78868c9fb0a4e5ef4428e32da4676b85b1" dependencies = [ "digest 0.9.0", - "generic-array 0.14.7", + "generic-array", "hmac 0.8.1", ] [[package]] name = "http" -version = "0.2.11" +version = "0.2.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8947b1a6fad4393052c7ba1f4cd97bed3e953a95c79c92ad9b051a04611d9fbb" +checksum = "601cbb57e577e2f5ef5be8e7b83f0f63994f25aa94d673e54a92d5c516d101f1" dependencies = [ "bytes", "fnv", @@ -1807,9 +1750,9 @@ dependencies = [ [[package]] name = "http" -version = "1.0.0" +version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b32afd38673a8016f7c9ae69e5af41a58f81b1d31689040f2f1959594ce194ea" +checksum = "21b9ddb458710bc376481b842f5da65cdf31522de232c1ca8146abce2a358258" dependencies = [ "bytes", "fnv", @@ -1823,7 +1766,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7ceab25649e9960c0311ea418d17bee82c0dcec1bd053b5f9a66e265a693bed2" dependencies = [ "bytes", - "http 0.2.11", + "http 0.2.12", "pin-project-lite", ] @@ -1834,7 +1777,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1cac85db508abc24a2e48553ba12a996e87244a0395ce011e62b37158745d643" dependencies = [ "bytes", - "http 1.0.0", + "http 1.1.0", ] [[package]] @@ -1845,7 +1788,7 @@ checksum = "41cb79eb393015dadd30fc252023adb0b2400a0caee0fa2a077e6e21a551e840" dependencies = [ "bytes", "futures-util", - "http 1.0.0", + "http 1.1.0", "http-body 1.0.0", "pin-project-lite", ] @@ -1879,7 +1822,7 @@ dependencies = [ "futures-core", "futures-util", "h2 0.3.24", - "http 0.2.11", + "http 0.2.12", "http-body 0.4.6", "httparse", "httpdate", @@ -1902,7 +1845,7 @@ dependencies = [ "futures-channel", "futures-util", "h2 0.4.2", - "http 1.0.0", + "http 1.1.0", "http-body 1.0.0", "httparse", "httpdate", @@ -1919,7 +1862,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ec3efd23720e2049821a693cbc7e65ea87c72f1c58ff2f9522ff332b1491e590" dependencies = [ "futures-util", - "http 0.2.11", + "http 0.2.12", "hyper 0.14.28", "log", "rustls 0.21.10", @@ -1936,7 +1879,7 @@ checksum = "ca38ef113da30126bbff9cd1705f9273e15d45498615d138b0c20279ac7a76aa" dependencies = [ "bytes", "futures-util", - "http 1.0.0", + "http 1.1.0", "http-body 1.0.0", "hyper 1.2.0", "pin-project-lite", @@ -2045,7 +1988,7 @@ version = "0.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a0c10553d664a4d0bcff9f4215d0aac67a639cc68ef660840afe309b807bc9f5" dependencies = [ - "generic-array 0.14.7", + "generic-array", ] [[package]] @@ -2077,6 +2020,17 @@ dependencies = [ "windows-sys 0.48.0", ] +[[package]] +name = "is-terminal" +version = "0.4.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"f23ff5ef2b80d608d61efee834934d862cd92461afc0560dedf493e4c033738b" +dependencies = [ + "hermit-abi", + "libc", + "windows-sys 0.52.0", +] + [[package]] name = "itertools" version = "0.10.5" @@ -2103,9 +2057,9 @@ checksum = "b1a46d1a171d865aa5f83f92695765caa047a9b4cbae2cbf37dbd613a793fd4c" [[package]] name = "js-sys" -version = "0.3.68" +version = "0.3.69" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "406cda4b368d531c842222cf9d2600a9a4acce8d29423695379c6868a143a9ee" +checksum = "29c15563dc2726973df627357ce0c9ddddbea194836909d655df6a75d2cf296d" dependencies = [ "wasm-bindgen", ] @@ -2124,12 +2078,12 @@ dependencies = [ [[package]] name = "jsonrpsee" -version = "0.22.1" +version = "0.22.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "16fcc9dd231e72d22993f1643d5f7f0db785737dbe3c3d7ca222916ab4280795" +checksum = "87f3ae45a64cfc0882934f963be9431b2a165d667f53140358181f262aca0702" dependencies = [ - "jsonrpsee-core 0.22.1", - "jsonrpsee-types 0.22.1", + "jsonrpsee-core 0.22.2", + "jsonrpsee-types 0.22.2", "jsonrpsee-ws-client", ] @@ -2140,7 +2094,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3f9f9ed46590a8d5681975f126e22531698211b926129a40a2db47cbca429220" dependencies = [ "futures-util", - "http 0.2.11", + "http 0.2.12", "jsonrpsee-core 0.21.0", "pin-project", "rustls-native-certs 0.7.0", @@ -2156,13 +2110,13 @@ dependencies = [ [[package]] name = "jsonrpsee-client-transport" -version = "0.22.1" +version = "0.22.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0476c96eb741b40d39dcb39d0124e3b9be9840ec77653c42a0996563ae2a53f7" +checksum = "455fc882e56f58228df2aee36b88a1340eafd707c76af2fa68cf94b37d461131" dependencies = [ "futures-util", - "http 0.2.11", - "jsonrpsee-core 0.22.1", + "http 0.2.12", + "jsonrpsee-core 0.22.2", "pin-project", "rustls-native-certs 0.7.0", "rustls-pki-types", @@ -2201,9 +2155,9 @@ dependencies = [ [[package]] name = "jsonrpsee-core" -version = "0.22.1" +version = "0.22.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b974d8f6139efbe8425f32cb33302aba6d5e049556b5bfc067874e7a0da54a2e" +checksum = "b75568f4f9696e3a47426e1985b548e1a9fcb13372a5e320372acaf04aca30d1" dependencies = [ "anyhow", "async-lock 3.3.0", @@ -2211,7 +2165,7 @@ dependencies = [ "beef", "futures-timer", "futures-util", - "jsonrpsee-types 0.22.1", + "jsonrpsee-types 0.22.2", "pin-project", "rustc-hash", "serde", @@ -2257,9 +2211,9 @@ dependencies = [ [[package]] name = "jsonrpsee-types" -version = "0.22.1" +version = "0.22.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b13dac43c1a9fc2648b37f306b0a5b0e29b2a6e1c36a33b95c1948da2494e9c5" +checksum = "3467fd35feeee179f71ab294516bdf3a81139e7aeebdd860e46897c12e1a3368" dependencies = [ "anyhow", "beef", @@ -2270,24 +2224,42 @@ dependencies = [ [[package]] name = "jsonrpsee-ws-client" -version = "0.22.1" +version = "0.22.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b1bbaaf4ce912654081d997ade417c3155727db106c617c0612e85f504c2f744" +checksum = "68ca71e74983f624c0cb67828e480a981586074da8ad3a2f214c6a3f884edab9" dependencies = [ - "http 0.2.11", - "jsonrpsee-client-transport 0.22.1", - "jsonrpsee-core 0.22.1", - "jsonrpsee-types 0.22.1", + "http 0.2.12", + "jsonrpsee-client-transport 0.22.2", + "jsonrpsee-core 0.22.2", + "jsonrpsee-types 0.22.2", "url", ] [[package]] name = "kalatori" -version = "0.1.1" +version = "0.1.2" dependencies = [ 
"anyhow", "axum", - "env_logger", + "env_logger 0.11.3", + "hex", + "log", + "reconnecting-jsonrpsee-ws-client", + "redb", + "serde", + "serde_json", + "subxt", + "tokio", + "tokio-util", +] + +[[package]] +name = "kalatori-ah" +version = "0.1.2" +dependencies = [ + "anyhow", + "axum", + "env_logger 0.10.2", "hex", "log", "reconnecting-jsonrpsee-ws-client", @@ -2468,18 +2440,6 @@ dependencies = [ "hash-db", ] -[[package]] -name = "merlin" -version = "2.0.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4e261cf0f8b3c42ded9f7d2bb59dea03aa52bc8a1cbc7482f9fc3fd1229d3b42" -dependencies = [ - "byteorder", - "keccak", - "rand_core 0.5.1", - "zeroize", -] - [[package]] name = "merlin" version = "3.0.0" @@ -2640,12 +2600,6 @@ version = "1.19.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3fdb12b2476b595f9358c5161aa467c2438859caa136dec86c26fdd2efe17b92" -[[package]] -name = "opaque-debug" -version = "0.2.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2839e79665f131bdb5782e51f2c6c9599c133c6098982a54c794358bf432529c" - [[package]] name = "opaque-debug" version = "0.3.1" @@ -2746,18 +2700,18 @@ checksum = "e3148f5046208a5d56bcfc03053e3ca6334e51da8dfb19b6cdc8b306fae3283e" [[package]] name = "pin-project" -version = "1.1.4" +version = "1.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0302c4a0442c456bd56f841aee5c3bfd17967563f6fadc9ceb9f9c23cf3807e0" +checksum = "b6bf43b791c5b9e34c3d182969b4abb522f9343702850a2e57f460d00d09b4b3" dependencies = [ "pin-project-internal", ] [[package]] name = "pin-project-internal" -version = "1.1.4" +version = "1.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "266c042b60c9c76b8d53061e52b2e0d1116abc57cefc8c5cd671619a56ac3690" +checksum = "2f38a4412a78282e09a2cf38d195ea5420d15ba0602cb375210efbc877243965" dependencies = [ "proc-macro2", "quote", @@ -2824,7 +2778,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8159bd90725d2df49889a078b54f4f79e87f1f8a8444194cdca81d38f5393abf" dependencies = [ "cpufeatures", - "opaque-debug 0.3.1", + "opaque-debug", "universal-hash", ] @@ -2985,7 +2939,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0ea5cf7b021db88f1af45a9b2ecdbe5bc1c5cbebc146632269d572cdd435f5cf" dependencies = [ "futures", - "jsonrpsee 0.22.1", + "jsonrpsee 0.22.2", "serde_json", "subxt", "thiserror", @@ -3041,7 +2995,7 @@ checksum = "b62dbe01f0b06f9d8dc7d49e05a0785f153b00b2c227856282f671e0318c9b15" dependencies = [ "aho-corasick", "memchr", - "regex-automata 0.4.5", + "regex-automata 0.4.6", "regex-syntax 0.8.2", ] @@ -3056,9 +3010,9 @@ dependencies = [ [[package]] name = "regex-automata" -version = "0.4.5" +version = "0.4.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5bb987efffd3c6d0d8f5f89510bb458559eab11e4f869acb20bf845e016259cd" +checksum = "86b83b8b9847f9bf95ef68afb0b8e6cdb80f498442f5179a29fad448fcc1eaea" dependencies = [ "aho-corasick", "memchr", @@ -3416,27 +3370,11 @@ version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "772575a524feeb803e5b0fcbc6dd9f367e579488197c94c6e4023aad2305774d" dependencies = [ - "ahash 0.8.10", + "ahash 0.8.11", "cfg-if", "hashbrown 0.13.2", ] -[[package]] -name = "schnorrkel" -version = "0.9.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = 
"021b403afe70d81eea68f6ea12f6b3c9588e5d536a94c3bf80f15e7faa267862" -dependencies = [ - "arrayref", - "arrayvec 0.5.2", - "curve25519-dalek 2.1.3", - "merlin 2.0.1", - "rand_core 0.5.1", - "sha2 0.8.2", - "subtle", - "zeroize", -] - [[package]] name = "schnorrkel" version = "0.11.4" @@ -3448,7 +3386,7 @@ dependencies = [ "arrayvec 0.7.4", "curve25519-dalek 4.1.2", "getrandom_or_panic", - "merlin 3.0.0", + "merlin", "rand_core 0.6.4", "serde_bytes", "sha2 0.10.8", @@ -3609,19 +3547,7 @@ dependencies = [ "cfg-if", "cpufeatures", "digest 0.9.0", - "opaque-debug 0.3.1", -] - -[[package]] -name = "sha2" -version = "0.8.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a256f46ea78a0c0d9ff00077504903ac881a1dafdc20da66545699e7776b3e69" -dependencies = [ - "block-buffer 0.7.3", - "digest 0.8.1", - "fake-simd", - "opaque-debug 0.2.3", + "opaque-debug", ] [[package]] @@ -3634,7 +3560,7 @@ dependencies = [ "cfg-if", "cpufeatures", "digest 0.9.0", - "opaque-debug 0.3.1", + "opaque-debug", ] [[package]] @@ -3757,7 +3683,7 @@ dependencies = [ "itertools 0.12.1", "libm", "libsecp256k1", - "merlin 3.0.0", + "merlin", "no-std-net", "nom", "num-bigint", @@ -3769,7 +3695,7 @@ dependencies = [ "rand", "rand_chacha", "ruzstd", - "schnorrkel 0.11.4", + "schnorrkel", "serde", "serde_json", "sha2 0.10.8", @@ -3895,14 +3821,14 @@ dependencies = [ "itertools 0.10.5", "libsecp256k1", "log", - "merlin 3.0.0", + "merlin", "parity-scale-codec", "parking_lot", "paste", "primitive-types", "rand", "scale-info", - "schnorrkel 0.11.4", + "schnorrkel", "secp256k1", "secrecy", "serde", @@ -4125,7 +4051,7 @@ version = "29.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2e4d24d84a0beb44a71dcac1b41980e1edf7fb722c7f3046710136a283cd479b" dependencies = [ - "ahash 0.8.10", + "ahash 0.8.11", "hash-db", "lazy_static", "memory-db", @@ -4225,13 +4151,13 @@ checksum = "73473c0e59e6d5812c5dfe2a064a6444949f089e20eec9a2e5506596494e4623" [[package]] name = "substrate-bip39" -version = "0.4.5" +version = "0.4.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e620c7098893ba667438b47169c00aacdd9e7c10e042250ce2b60b087ec97328" +checksum = "6a7590dc041b9bc2825e52ce5af8416c73dbe9d0654402bfd4b4941938b94d8f" dependencies = [ "hmac 0.11.0", "pbkdf2 0.8.0", - "schnorrkel 0.9.1", + "schnorrkel", "sha2 0.9.9", "zeroize", ] @@ -4883,9 +4809,9 @@ checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423" [[package]] name = "wasm-bindgen" -version = "0.2.91" +version = "0.2.92" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c1e124130aee3fb58c5bdd6b639a0509486b0338acaaae0c84a5124b0f588b7f" +checksum = "4be2531df63900aeb2bca0daaaddec08491ee64ceecbee5076636a3b026795a8" dependencies = [ "cfg-if", "wasm-bindgen-macro", @@ -4893,9 +4819,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-backend" -version = "0.2.91" +version = "0.2.92" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c9e7e1900c352b609c8488ad12639a311045f40a35491fb69ba8c12f758af70b" +checksum = "614d787b966d3989fa7bb98a654e369c762374fd3213d212cfc0251257e747da" dependencies = [ "bumpalo", "log", @@ -4908,9 +4834,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-macro" -version = "0.2.91" +version = "0.2.92" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b30af9e2d358182b5c7449424f017eba305ed32a7010509ede96cdc4696c46ed" +checksum = 
"a1f8823de937b71b9460c0c34e25f3da88250760bec0ebac694b49997550d726" dependencies = [ "quote", "wasm-bindgen-macro-support", @@ -4918,9 +4844,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-macro-support" -version = "0.2.91" +version = "0.2.92" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "642f325be6301eb8107a83d12a8ac6c1e1c54345a7ef1a9261962dfefda09e66" +checksum = "e94f17b526d0a461a191c78ea52bbce64071ed5c04c9ffe424dcb38f74171bb7" dependencies = [ "proc-macro2", "quote", @@ -4931,9 +4857,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-shared" -version = "0.2.91" +version = "0.2.92" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4f186bd2dcf04330886ce82d6f33dd75a7bfcf69ecf5763b89fcde53b6ac9838" +checksum = "af190c94f2773fdb3729c55b007a722abb5384da03bc0986df4c289bf5567e96" [[package]] name = "wasmi" diff --git a/Cargo.toml b/Cargo.toml index 67f4b8b..b63f785 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "kalatori" authors = ["Alzymologist Oy "] -version = "0.1.1" +version = "0.1.2" edition = "2021" description = "A gateway daemon for Kalatori." license = "GPL-3.0-or-later" @@ -10,6 +10,9 @@ readme = true keywords = ["substrate", "blockchain", "finance", "service", "middleware"] categories = ["finance"] +[workspace] +members = ["kalatori-ah"] + [dependencies] tokio = { version = "1", features = ["full"] } tokio-util = { version = "0.7", features = ["full"] } diff --git a/kalatori-ah/Cargo.toml b/kalatori-ah/Cargo.toml new file mode 100644 index 0000000..8a3230a --- /dev/null +++ b/kalatori-ah/Cargo.toml @@ -0,0 +1,42 @@ +[package] +name = "kalatori-ah" +authors = ["Alzymologist Oy "] +version = "0.1.2" +edition = "2021" +description = "A gateway daemon for Kalatori." 
+license = "GPL-3.0-or-later" +repository = "https://github.com/Alzymologist/Kalatori-backend" +readme = true +keywords = ["substrate", "blockchain", "finance", "service", "middleware"] +categories = ["finance"] + +[dependencies] +tokio = { version = "1", features = ["full"] } +tokio-util = { version = "0.7", features = ["full"] } +anyhow = "1" +env_logger = "0.10" +log = "0.4" +subxt = { version = "0.34", features = ["substrate-compat"] } +axum = "0.7" +serde = "1" +redb = "1" +serde_json = "1" +hex = "0.4" +reconnecting-jsonrpsee-ws-client = { version = "0.3", features = ["subxt"] } + +[profile.release] +strip = true +lto = true +codegen-units = 1 + +[lints.rust] +future_incompatible = "warn" +let_underscore = "warn" +rust_2018_idioms = "warn" +unused = "warn" + +[lints.clippy] +shadow_reuse = "warn" +shadow_same = "warn" +shadow_unrelated = "warn" +cargo_common_metadata = "warn" diff --git a/kalatori-ah/src/database.rs b/kalatori-ah/src/database.rs new file mode 100644 index 0000000..6cbe14d --- /dev/null +++ b/kalatori-ah/src/database.rs @@ -0,0 +1,380 @@ +use crate::{ + rpc::{ChainProperties, EndpointProperties}, + Account, Balance, BlockNumber, RuntimeConfig, Version, DATABASE_VERSION, OVERRIDE_RPC, SEED, +}; +use anyhow::{Context, Result}; +use redb::{ + backends::InMemoryBackend, AccessGuard, ReadOnlyTable, ReadableTable, RedbValue, Table, + TableDefinition, TableHandle, TypeName, +}; +use std::sync::Arc; +use subxt::{ + ext::{ + codec::{Compact, Decode, Encode}, + sp_core::{ + crypto::Ss58Codec, + sr25519::{Pair, Public}, + DeriveJunction, Pair as _, + }, + }, + tx::PairSigner, +}; +use tokio::{ + sync::{RwLock, RwLockReadGuard, RwLockWriteGuard}, + task, +}; + +type Order = [u8; 32]; + +pub const MODULE: &str = module_path!(); + +// Tables + +const ROOT: TableDefinition<'_, &str, Vec> = TableDefinition::new("root"); +const INVOICES: TableDefinition<'_, &[u8; 32], Invoice> = TableDefinition::new("invoices"); + +// Keys + +// The database version must be stored in a separate slot to be used by the not implemented yet +// database migration logic. +const DB_VERSION_KEY: &str = "db_version"; +const DAEMON_INFO: &str = "daemon_info"; +const LAST_BLOCK: &str = "last_block"; + +// Slots + +#[derive(Debug, Encode, Decode)] +#[codec(crate = subxt::ext::codec)] +pub struct Invoice { + pub recipient: Account, + pub order: Order, + pub status: InvoiceStatus, +} + +impl Invoice { + pub fn signer(&self, pair: &Pair) -> Result> { + let invoice_pair = pair + .derive( + [self.recipient.clone().into(), self.order] + .map(DeriveJunction::Hard) + .into_iter(), + None, + ) + .context("failed to derive an invoice key pair")? 
+ .0; + + Ok(PairSigner::new(invoice_pair)) + } +} + +#[derive(Debug, Encode, Decode)] +#[codec(crate = subxt::ext::codec)] +pub enum InvoiceStatus { + Unpaid(Balance), + Paid(Balance), +} + +impl RedbValue for Invoice { + type SelfType<'a> = Self; + + type AsBytes<'a> = Vec; + + fn fixed_width() -> Option { + None + } + + fn from_bytes<'a>(mut data: &'a [u8]) -> Self::SelfType<'_> + where + Self: 'a, + { + Self::decode(&mut data).unwrap() + } + + fn as_bytes<'a, 'b: 'a>(value: &'a Self::SelfType<'a>) -> Self::AsBytes<'_> { + value.encode() + } + + fn type_name() -> TypeName { + TypeName::new(stringify!(Invoice)) + } +} + +#[derive(Encode, Decode)] +#[codec(crate = subxt::ext::codec)] +struct DaemonInfo { + rpc: String, + key: Public, +} + +pub struct Database { + db: redb::Database, + properties: Arc>, + pair: Pair, + rpc: String, + destination: Option, +} + +impl Database { + pub fn initialise( + path_option: Option, + override_rpc: bool, + pair: Pair, + EndpointProperties { url, chain }: EndpointProperties, + destination: Option, + ) -> Result<(Arc, Option)> { + let public = pair.public(); + let public_formatted = public.to_ss58check_with_version( + task::block_in_place(|| chain.blocking_read()).address_format, + ); + let given_rpc = url.get(); + + let mut database = if let Some(path) = path_option { + log::info!("Creating/Opening the database at \"{path}\"."); + + redb::Database::create(path) + } else { + log::warn!( + "The in-memory backend for the database is selected. All saved data will be deleted after the shutdown!" + ); + + redb::Database::builder().create_with_backend(InMemoryBackend::new()) + }.context("failed to create/open the database")?; + + let tx = database + .begin_write() + .context("failed to begin a write transaction")?; + let mut table = tx + .open_table(ROOT) + .with_context(|| format!("failed to open the `{}` table", ROOT.name()))?; + drop( + tx.open_table(INVOICES) + .with_context(|| format!("failed to open the `{}` table", INVOICES.name()))?, + ); + + let last_block = match ( + get_slot(&table, DB_VERSION_KEY)?, + get_slot(&table, DAEMON_INFO)?, + get_slot(&table, LAST_BLOCK)?, + ) { + (None, None, None) => { + table + .insert( + DB_VERSION_KEY, + Compact(DATABASE_VERSION).encode(), + ) + .context("failed to insert the database version")?; + insert_daemon_info(&mut table, given_rpc.clone(), public)?; + + None + } + (Some(encoded_db_version), Some(daemon_info), last_block_option) => { + let Compact::(db_version) = + decode_slot(&encoded_db_version, DB_VERSION_KEY)?; + let DaemonInfo { rpc: db_rpc, key } = decode_slot(&daemon_info, DAEMON_INFO)?; + + if db_version != DATABASE_VERSION { + anyhow::bail!( + "database contains an unsupported database version (\"{db_version}\"), expected \"{DATABASE_VERSION}\"" + ); + } + + if public != key { + anyhow::bail!( + "public key from `{SEED}` doesn't equal the one from the database (\"{public_formatted}\")" + ); + } + + if given_rpc != db_rpc { + if override_rpc { + log::warn!( + "The saved RPC endpoint ({db_rpc:?}) differs from the given one ({given_rpc:?}) and will be overwritten by it because `{OVERRIDE_RPC}` is set." + ); + + insert_daemon_info(&mut table, given_rpc.clone(), public)?; + } else { + anyhow::bail!( + "database contains a different RPC endpoint address ({db_rpc:?}), expected {given_rpc:?}" + ); + } + } else if override_rpc { + log::warn!( + "`{OVERRIDE_RPC}` is set but the saved RPC endpoint ({db_rpc:?}) equals to the given one." 
+ ); + } + + if let Some(encoded_last_block) = last_block_option { + Some(decode_slot::>(&encoded_last_block, LAST_BLOCK)?.0) + } else { + None + } + } + _ => anyhow::bail!( + "database was found but it doesn't contain `{DB_VERSION_KEY:?}` and/or `{DAEMON_INFO:?}`, maybe it was created by another program" + ), + }; + + drop(table); + + tx.commit().context("failed to commit a transaction")?; + + let compacted = database + .compact() + .context("failed to compact the database")?; + + if compacted { + log::debug!("The database was successfully compacted."); + } else { + log::debug!("The database doesn't need the compaction."); + } + + log::info!("Public key from the given seed: \"{public_formatted}\"."); + + Ok(( + Arc::new(Self { + db: database, + properties: chain, + pair, + rpc: given_rpc, + destination, + }), + last_block, + )) + } + + pub fn rpc(&self) -> &str { + &self.rpc + } + + pub fn destination(&self) -> &Option { + &self.destination + } + + pub fn write(&self) -> Result> { + self.db + .begin_write() + .map(WriteTransaction) + .context("failed to begin a write transaction for the database") + } + + pub fn read(&self) -> Result> { + self.db + .begin_read() + .map(ReadTransaction) + .context("failed to begin a read transaction for the database") + } + + pub async fn properties(&self) -> RwLockReadGuard<'_, ChainProperties> { + self.properties.read().await + } + + pub async fn properties_write(&self) -> RwLockWriteGuard<'_, ChainProperties> { + self.properties.write().await + } + + pub fn pair(&self) -> &Pair { + &self.pair + } +} + +pub struct ReadTransaction<'db>(redb::ReadTransaction<'db>); + +impl ReadTransaction<'_> { + pub fn invoices(&self) -> Result> { + self.0 + .open_table(INVOICES) + .map(ReadInvoices) + .with_context(|| format!("failed to open the `{}` table", INVOICES.name())) + } +} + +pub struct ReadInvoices<'tx>(ReadOnlyTable<'tx, &'static [u8; 32], Invoice>); + +impl ReadInvoices<'_> { + pub fn get(&self, account: &Account) -> Result>> { + self.0 + .get(AsRef::<[u8; 32]>::as_ref(account)) + .context("failed to get an invoice from the database") + } + + pub fn try_iter( + &self, + ) -> Result, AccessGuard<'_, Invoice>)>>> + { + self.0 + .iter() + .context("failed to get the invoices iterator") + .map(|iter| iter.map(|item| item.context("failed to get an invoice from the iterator"))) + } +} + +pub struct WriteTransaction<'db>(redb::WriteTransaction<'db>); + +impl<'db> WriteTransaction<'db> { + pub fn root(&self) -> Result> { + self.0 + .open_table(ROOT) + .map(Root) + .with_context(|| format!("failed to open the `{}` table", ROOT.name())) + } + + pub fn invoices(&self) -> Result> { + self.0 + .open_table(INVOICES) + .map(WriteInvoices) + .with_context(|| format!("failed to open the `{}` table", INVOICES.name())) + } + + pub fn commit(self) -> Result<()> { + self.0 + .commit() + .context("failed to commit a write transaction in the database") + } +} + +pub struct WriteInvoices<'db, 'tx>(Table<'db, 'tx, &'static [u8; 32], Invoice>); + +impl WriteInvoices<'_, '_> { + pub fn save( + &mut self, + account: &Account, + invoice: &Invoice, + ) -> Result>> { + self.0 + .insert(AsRef::<[u8; 32]>::as_ref(account), invoice) + .context("failed to save an invoice in the database") + } +} + +pub struct Root<'db, 'tx>(Table<'db, 'tx, &'static str, Vec>); + +impl Root<'_, '_> { + pub fn save_last_block(&mut self, number: BlockNumber) -> Result<()> { + self.0 + .insert(LAST_BLOCK, Compact(number).encode()) + .context("context")?; + + Ok(()) + } +} + +fn get_slot(table: &Table<'_, '_, 
&str, Vec>, key: &str) -> Result>> { + table + .get(key) + .map(|slot_option| slot_option.map(|slot| slot.value().clone())) + .with_context(|| format!("failed to get the {key:?} slot")) +} + +fn decode_slot(mut slot: &[u8], key: &str) -> Result { + T::decode(&mut slot).with_context(|| format!("failed to decode the {key:?} slot")) +} + +fn insert_daemon_info( + table: &mut Table<'_, '_, &str, Vec>, + rpc: String, + key: Public, +) -> Result<()> { + table + .insert(DAEMON_INFO, DaemonInfo { rpc, key }.encode()) + .map(|_| ()) + .context("failed to insert the daemon info") +} diff --git a/kalatori-ah/src/lib.rs b/kalatori-ah/src/lib.rs new file mode 100644 index 0000000..2ccdd81 --- /dev/null +++ b/kalatori-ah/src/lib.rs @@ -0,0 +1,322 @@ +use anyhow::{Context, Error, Result}; +use database::Database; +use env_logger::{Builder, Env}; +use environment_variables::{ + DATABASE, DESTINATION, HOST, IN_MEMORY_DB, LOG, LOG_STYLE, OVERRIDE_RPC, RPC, SEED, USD_ASSET, +}; +use log::LevelFilter; +use rpc::Processor; +use serde::Deserialize; +use std::{ + env::{self, VarError}, + future::Future, +}; +use subxt::{ + config::{DefaultExtrinsicParams, Header}, + ext::{ + codec::{Decode, Encode}, + scale_decode::DecodeAsType, + scale_encode::EncodeAsType, + sp_core::{crypto::AccountId32, Pair}, + }, + Config, PolkadotConfig, +}; +use tokio::{ + signal, + sync::mpsc::{self, UnboundedSender}, +}; +use tokio_util::{sync::CancellationToken, task::TaskTracker}; + +mod database; +mod rpc; + +pub mod server; + +pub mod environment_variables { + pub const HOST: &str = "KALATORI_HOST"; + pub const SEED: &str = "KALATORI_SEED"; + pub const LOG: &str = "KALATORI_LOG"; + pub const LOG_STYLE: &str = "KALATORI_LOG_STYLE"; + pub const DATABASE: &str = "KALATORI_DATABASE"; + pub const RPC: &str = "KALATORI_RPC"; + pub const OVERRIDE_RPC: &str = "KALATORI_OVERRIDE_RPC"; + pub const IN_MEMORY_DB: &str = "KALATORI_IN_MEMORY_DB"; + pub const DESTINATION: &str = "KALATORI_DESTINATION"; + pub const USD_ASSET: &str = "KALATORI_USD_ASSET"; +} + +pub const DEFAULT_RPC: &str = "wss://westend-asset-hub-rpc.polkadot.io"; +pub const DATABASE_VERSION: Version = 0; +// Expected USD(C/T) fee (0.03) +pub const EXPECTED_USDX_FEE: Balance = 30000; + +const USDT_ID: u32 = 1984; +const USDC_ID: u32 = 1337; +// https://github.com/paritytech/polkadot-sdk/blob/7c9fd83805cc446983a7698c7a3281677cf655c8/substrate/client/cli/src/config.rs#L50 +const SCANNER_TO_LISTENER_SWITCH_POINT: BlockNumber = 512; + +#[derive(Clone, Copy)] +enum Usd { + T, + C, +} + +impl Usd { + fn id(self) -> u32 { + match self { + Usd::T => USDT_ID, + Usd::C => USDC_ID, + } + } +} + +type OnlineClient = subxt::OnlineClient; +type Account = ::AccountId; +type BlockNumber = <::Header as Header>::Number; +type Hash = ::Hash; +// https://github.com/paritytech/polkadot-sdk/blob/a3dc2f15f23b3fd25ada62917bfab169a01f2b0d/substrate/bin/node/primitives/src/lib.rs#L43 +type Balance = u128; +// https://github.com/paritytech/subxt/blob/f06a95d687605bf826db9d83b2932a73a57b169f/subxt/src/config/signed_extensions.rs#L71 +type Nonce = u64; +// https://github.com/dtolnay/semver/blob/f9cc2df9415c880bd3610c2cdb6785ac7cad31ea/src/lib.rs#L163-L165 +type Version = u64; +// https://github.com/serde-rs/json/blob/0131ac68212e8094bd14ee618587d731b4f9a68b/src/number.rs#L29 +type Decimals = u64; + +struct RuntimeConfig; + +impl Config for RuntimeConfig { + type Hash = ::Hash; + type AccountId = AccountId32; + type Address = ::Address; + type Signature = ::Signature; + type Hasher = ::Hasher; + type Header = 
::Header; + type ExtrinsicParams = DefaultExtrinsicParams; + type AssetId = u32; +} + +#[derive(EncodeAsType, Encode, Decode, DecodeAsType, Clone, Debug, Deserialize, PartialEq)] +#[encode_as_type(crate_path = "subxt::ext::scale_encode")] +#[decode_as_type(crate_path = "subxt::ext::scale_decode")] +#[codec(crate = subxt::ext::codec)] +struct MultiLocation { + /// The number of parent junctions at the beginning of this `MultiLocation`. + parents: u8, + /// The interior (i.e. non-parent) junctions that this `MultiLocation` contains. + interior: Junctions, +} + +#[derive(EncodeAsType, Encode, Decode, DecodeAsType, Clone, Debug, Deserialize, PartialEq)] +#[encode_as_type(crate_path = "subxt::ext::scale_encode")] +#[decode_as_type(crate_path = "subxt::ext::scale_decode")] +#[codec(crate = subxt::ext::codec)] +enum Junctions { + /// The interpreting consensus system. + #[codec(index = 0)] + Here, + /// A relative path comprising 2 junctions. + #[codec(index = 2)] + X2(Junction, Junction), +} + +#[derive(EncodeAsType, Encode, Decode, DecodeAsType, Clone, Debug, Deserialize, PartialEq)] +#[encode_as_type(crate_path = "subxt::ext::scale_encode")] +#[decode_as_type(crate_path = "subxt::ext::scale_decode")] +#[codec(crate = subxt::ext::codec)] +enum Junction { + /// An instanced, indexed pallet that forms a constituent part of the context. + /// + /// Generally used when the context is a Frame-based chain. + #[codec(index = 4)] + PalletInstance(u8), + /// A non-descript index within the context location. + /// + /// Usage will vary widely owing to its generality. + /// + /// NOTE: Try to avoid using this and instead use a more specific item. + #[codec(index = 5)] + GeneralIndex(#[codec(compact)] u128), +} + +#[doc(hidden)] +#[allow(clippy::too_many_lines)] +#[tokio::main] +pub async fn main() -> Result<()> { + let mut builder = Builder::new(); + + if cfg!(debug_assertions) { + builder.filter_level(LevelFilter::Debug) + } else { + builder + .filter_level(LevelFilter::Off) + .filter_module(server::MODULE, LevelFilter::Info) + .filter_module(rpc::MODULE, LevelFilter::Info) + .filter_module(database::MODULE, LevelFilter::Info) + .filter_module(env!("CARGO_PKG_NAME"), LevelFilter::Info) + } + .parse_env(Env::new().filter(LOG).write_style(LOG_STYLE)) + .init(); + + let host = env::var(HOST) + .with_context(|| format!("`{HOST}` isn't set"))? + .parse() + .with_context(|| format!("failed to convert `{HOST}` to a socket address"))?; + + let usd_asset = match env::var(USD_ASSET) + .with_context(|| format!("`{USD_ASSET}` isn't set"))? + .as_str() + { + "USDC" => Usd::C, + "USDT" => Usd::T, + _ => anyhow::bail!("{USD_ASSET} must equal USDC or USDT"), + }; + + let pair = Pair::from_string( + &env::var(SEED).with_context(|| format!("`{SEED}` isn't set"))?, + None, + ) + .with_context(|| format!("failed to generate a key pair from `{SEED}`"))?; + + let endpoint = env::var(RPC).or_else(|error| { + if error == VarError::NotPresent { + log::debug!( + "`{RPC}` isn't present, using the default value instead: \"{DEFAULT_RPC}\"." 
+ ); + + Ok(DEFAULT_RPC.into()) + } else { + Err(error).context(format!("failed to read `{RPC}`")) + } + })?; + + let override_rpc = env::var_os(OVERRIDE_RPC).is_some(); + + let database_path = if env::var_os(IN_MEMORY_DB).is_none() { + Some(env::var(DATABASE).or_else(|error| { + if error == VarError::NotPresent { + let default_v = match usd_asset { + Usd::C => "database-ah-usdc.redb", + Usd::T => "database-ah-usdt.redb", + }; + + log::debug!( + "`{DATABASE}` isn't present, using the default value instead: \"{default_v}\"." + ); + + Ok(default_v.into()) + } else { + Err(error).context(format!("failed to read `{DATABASE}`")) + } + })?) + } else { + if env::var_os(DATABASE).is_some() { + log::warn!( + "`{IN_MEMORY_DB}` is set along with `{DATABASE}`. The latter will be ignored." + ); + } + + None + }; + + let destination = match env::var(DESTINATION) { + Ok(destination) => Ok(Some( + AccountId32::try_from(hex::decode(&destination[2..])?.as_ref()) + .map_err(|()| anyhow::anyhow!("unknown destination address length"))?, + )), + Err(VarError::NotPresent) => Ok(None), + Err(error) => Err(error).context(format!("failed to read `{DESTINATION}`")), + }?; + + log::info!( + "Kalatori {} by {} is starting...", + env!("CARGO_PKG_VERSION"), + env!("CARGO_PKG_AUTHORS") + ); + + let shutdown_notification = CancellationToken::new(); + let (error_tx, mut error_rx) = mpsc::unbounded_channel(); + + let (api_config, endpoint_properties, updater) = + rpc::prepare(endpoint, shutdown_notification.clone(), usd_asset) + .await + .context("failed to prepare the node module")?; + + let (database, last_saved_block) = Database::initialise( + database_path, + override_rpc, + pair, + endpoint_properties, + destination, + ) + .context("failed to initialise the database module")?; + + let processor = Processor::new(api_config, database.clone(), shutdown_notification.clone()) + .context("failed to initialise the RPC module")?; + + let server = server::new(shutdown_notification.clone(), host, database) + .await + .context("failed to initialise the server module")?; + + let task_tracker = TaskTracker::new(); + + task_tracker.close(); + + task_tracker.spawn(shutdown( + shutdown_listener(shutdown_notification.clone()), + error_tx.clone(), + )); + task_tracker.spawn(shutdown(updater.ignite(), error_tx.clone())); + task_tracker.spawn(shutdown( + processor.ignite(last_saved_block, task_tracker.clone(), error_tx.clone()), + error_tx, + )); + task_tracker.spawn(server); + + while let Some(error) = error_rx.recv().await { + log::error!("Received a fatal error!\n{error:?}"); + + if !shutdown_notification.is_cancelled() { + log::info!("Initialising the shutdown..."); + + shutdown_notification.cancel(); + } + } + + task_tracker.wait().await; + + log::info!("Goodbye!"); + + Ok(()) +} + +async fn shutdown_listener(shutdown_notification: CancellationToken) -> Result<&'static str> { + tokio::select! { + biased; + signal = signal::ctrl_c() => { + signal.context("failed to listen for the shutdown signal")?; + + // Print shutdown log messages on the next line after the Control-C command. + println!(); + + log::info!("Received the shutdown signal. 
Initialising the shutdown..."); + + shutdown_notification.cancel(); + + Ok("The shutdown signal listener is shut down.") + } + () = shutdown_notification.cancelled() => { + Ok("The shutdown signal listener is shut down.") + } + } +} + +async fn shutdown( + task: impl Future>, + error_tx: UnboundedSender, +) { + match task.await { + Ok(shutdown_message) => log::info!("{shutdown_message}"), + Err(error) => error_tx.send(error).unwrap(), + } +} diff --git a/kalatori-ah/src/main.rs b/kalatori-ah/src/main.rs new file mode 100644 index 0000000..252df04 --- /dev/null +++ b/kalatori-ah/src/main.rs @@ -0,0 +1,5 @@ +use anyhow::Result; + +fn main() -> Result<()> { + kalatori_ah::main() +} diff --git a/kalatori-ah/src/rpc.rs b/kalatori-ah/src/rpc.rs new file mode 100644 index 0000000..5626ab7 --- /dev/null +++ b/kalatori-ah/src/rpc.rs @@ -0,0 +1,1279 @@ +use crate::{ + database::{Database, Invoice, InvoiceStatus, ReadInvoices}, + shutdown, Account, Balance, BlockNumber, Decimals, Hash, Nonce, OnlineClient, RuntimeConfig, + Usd, EXPECTED_USDX_FEE, SCANNER_TO_LISTENER_SWITCH_POINT, +}; +use anyhow::{Context, Result}; +use reconnecting_jsonrpsee_ws_client::ClientBuilder; +use serde::{Deserialize, Deserializer}; +use std::{ + collections::{hash_map::Entry, HashMap}, + error::Error, + fmt::{self, Arguments, Display, Formatter, Write}, + sync::Arc, +}; +use subxt::{ + backend::{ + legacy::{LegacyBackend, LegacyRpcMethods}, + rpc::{RpcClient, RpcSubscription}, + Backend, BackendExt, RuntimeVersion, + }, + blocks::{Block, BlocksClient}, + config::{DefaultExtrinsicParamsBuilder, Header}, + constants::ConstantsClient, + dynamic::{self, Value}, + error::RpcError, + ext::{ + futures::TryFutureExt, + scale_decode::DecodeAsType, + scale_value::{self, At}, + sp_core::{ + crypto::{AccountId32, Ss58AddressFormat}, + sr25519::Pair, + }, + }, + storage::{Storage, StorageClient}, + tx::{PairSigner, SubmittableExtrinsic, TxClient}, + Config, Metadata, +}; +use tokio::sync::{mpsc::UnboundedSender, RwLock}; +use tokio_util::{sync::CancellationToken, task::TaskTracker}; + +pub const MODULE: &str = module_path!(); + +const MAX_BLOCK_NUMBER_ERROR: &str = "block number type overflow is occurred"; +const BLOCK_NONCE_ERROR: &str = "failed to fetch an account nonce by the scanner client"; + +// Pallets + +const SYSTEM: &str = "System"; +const UTILITY: &str = "Utility"; +const ASSETS: &str = "Assets"; + +async fn fetch_best_block(methods: &LegacyRpcMethods) -> Result { + methods + .chain_get_block_hash(None) + .await + .context("failed to get the best block hash")? + .context("received nothing after requesting the best block hash") +} + +async fn fetch_api_runtime( + methods: &LegacyRpcMethods, + backend: &impl Backend, +) -> Result<(Metadata, RuntimeVersion)> { + let best_block = fetch_best_block(methods).await?; + + Ok(( + fetch_metadata(backend, best_block) + .await + .context("failed to fetch metadata")?, + methods + .state_get_runtime_version(Some(best_block)) + .await + .map(|runtime_version| RuntimeVersion { + spec_version: runtime_version.spec_version, + transaction_version: runtime_version.transaction_version, + }) + .context("failed to fetch the runtime version")?, + )) +} + +async fn fetch_min_balance( + storage_finalized: Storage, + usd_asset: &Usd, +) -> Result { + const ASSET: &str = "Asset"; + const MIN_BALANCE: &str = "min_balance"; + + let asset_info = storage_finalized + .fetch(&dynamic::storage(ASSETS, ASSET, vec![usd_asset.id()])) + .await + .context("failed to fetch asset info from the chain")? 
+ .context("received nothing after fetching asset info from the chain")? + .to_value() + .context("failed to decode account info")?; + let encoded_min_balance = asset_info + .at(MIN_BALANCE) + .with_context(|| format!("{MIN_BALANCE} field wasn't found in asset info"))?; + + encoded_min_balance.as_u128().with_context(|| { + format!("expected `u128` as the type of the min balance, got {encoded_min_balance}") + }) +} + +async fn fetch_metadata(backend: &impl Backend, at: Hash) -> Result { + const LATEST_SUPPORTED_METADATA_VERSION: u32 = 15; + + backend + .metadata_at_version(LATEST_SUPPORTED_METADATA_VERSION, at) + .or_else(|error| async { + if let subxt::Error::Rpc(RpcError::ClientError(_)) | subxt::Error::Other(_) = error { + backend.legacy_metadata(at).await + } else { + Err(error) + } + }) + .await + .map_err(Into::into) +} + +async fn fetch_decimals( + storage: Storage, + usd_asset: &Usd, +) -> Result { + const METADATA: &str = "Metadata"; + const DECIMALS: &str = "decimals"; + + let asset_metadata = storage + .fetch(&dynamic::storage(ASSETS, METADATA, vec![usd_asset.id()])) + .await + .context("failed to fetch asset info from the chain")? + .context("received nothing after fetching asset info from the chain")? + .to_value() + .context("failed to decode account info")?; + let encoded_decimals = asset_metadata + .at(DECIMALS) + .with_context(|| format!("{DECIMALS} field wasn't found in asset info"))?; + + encoded_decimals + .as_u128() + .map(|num| num.try_into().expect("must be less than u64")) + .with_context(|| { + format!("expected `u128` as the type of the min balance, got {encoded_decimals}") + }) +} + +fn fetch_constant( + constants: &ConstantsClient, + constant: (&str, &str), +) -> Result { + constants + .at(&dynamic::constant(constant.0, constant.1)) + .with_context(|| format!("failed to get the constant {constant:?}"))? + .as_type() + .with_context(|| format!("failed to decode the constant {constant:?}")) +} + +pub struct ChainProperties { + pub address_format: Ss58AddressFormat, + pub existential_deposit: Balance, + pub block_hash_count: BlockNumber, + pub decimals: Decimals, + pub usd_asset: Usd, +} + +impl ChainProperties { + fn fetch_only_constants( + existential_deposit: Balance, + decimals: Decimals, + constants: &ConstantsClient, + usd_asset: Usd, + ) -> Result { + const ADDRESS_PREFIX: (&str, &str) = (SYSTEM, "SS58Prefix"); + const BLOCK_HASH_COUNT: (&str, &str) = (SYSTEM, "BlockHashCount"); + + Ok(Self { + address_format: Ss58AddressFormat::custom(fetch_constant(constants, ADDRESS_PREFIX)?), + existential_deposit, + decimals, + block_hash_count: fetch_constant(constants, BLOCK_HASH_COUNT)?, + usd_asset, + }) + } +} + +pub struct ApiConfig { + api: Arc, + methods: Arc>, + backend: Arc>, +} + +pub struct EndpointProperties { + pub url: CheckedUrl, + pub chain: Arc>, +} + +pub struct CheckedUrl(String); + +impl CheckedUrl { + pub fn get(self) -> String { + self.0 + } +} + +pub async fn prepare( + url: String, + shutdown_notification: CancellationToken, + usd_asset: Usd, +) -> Result<(ApiConfig, EndpointProperties, Updater)> { + // TODO: + // The current reconnecting client implementation automatically restores all subscriptions, + // including unrecoverable ones, losing all notifications! For now, it shouldn't affect the + // daemon, but may in the future, so we should consider creating our own implementation. 
+ let rpc = RpcClient::new( + ClientBuilder::new() + .build(url.clone()) + .await + .context("failed to construct the RPC client")?, + ); + + log::info!("Connected to an RPC server at \"{url}\"."); + + let methods = Arc::new(LegacyRpcMethods::new(rpc.clone())); + let backend = Arc::new(LegacyBackend::new(rpc)); + + let (metadata, runtime_version) = fetch_api_runtime(&methods, &*backend) + .await + .context("failed to fetch the runtime of the API client")?; + let genesis_hash = methods + .genesis_hash() + .await + .context("failed to get the genesis hash")?; + let api = Arc::new( + OnlineClient::from_backend_with(genesis_hash, runtime_version, metadata, backend.clone()) + .context("failed to construct the API client")?, + ); + let constants = api.constants(); + + let min_balance = fetch_min_balance( + OnlineClient::from_url(url.clone()) + .await? + .storage() + .at_latest() + .await?, + &usd_asset, + ) + .await?; + let decimals = fetch_decimals(api.storage().at_latest().await?, &usd_asset).await?; + let properties = + ChainProperties::fetch_only_constants(min_balance, decimals, &constants, usd_asset)?; + + log::info!( + "Chain properties:\n\ + Address format: \"{}\" ({}).\n\ + Decimal places number: {}.\n\ + Existential deposit: {}.\n\ + USD asset: {} ({}).\n\ + Block hash count: {}.", + properties.address_format, + properties.address_format.prefix(), + decimals, + properties.existential_deposit, + match properties.usd_asset { + Usd::C => "USDC", + Usd::T => "USDT", + }, + properties.usd_asset.id(), + properties.block_hash_count + ); + + let arc_properties = Arc::new(RwLock::const_new(properties)); + + Ok(( + ApiConfig { + api: api.clone(), + methods: methods.clone(), + backend: backend.clone(), + }, + EndpointProperties { + url: CheckedUrl(url), + chain: arc_properties.clone(), + }, + Updater { + methods, + backend, + api, + constants, + shutdown_notification, + properties: arc_properties, + }, + )) +} + +pub struct Updater { + methods: Arc>, + backend: Arc>, + api: Arc, + constants: ConstantsClient, + shutdown_notification: CancellationToken, + properties: Arc>, +} + +impl Updater { + pub async fn ignite(self) -> Result<&'static str> { + loop { + let mut updates = self + .backend + .stream_runtime_version() + .await + .context("failed to get the runtime updates stream")?; + + if let Some(current_runtime_version_result) = updates.next().await { + let current_runtime_version = current_runtime_version_result + .context("failed to decode the current runtime version")?; + + // The updates stream is always returns the current runtime version in the first + // item. We don't skip it though because during a connection loss the runtime can be + // updated, hence this condition will catch this. + if self.api.runtime_version() != current_runtime_version { + self.process_update() + .await + .context("failed to process the first API client update")?; + } + + loop { + tokio::select! { + biased; + () = self.shutdown_notification.cancelled() => { + return Ok("The API client updater is shut down."); + } + runtime_version = updates.next() => { + if runtime_version.is_some() { + self.process_update() + .await + .context( + "failed to process an update for the API client" + )?; + } else { + break; + } + } + } + } + } + + log::warn!( + "Lost the connection while listening the endpoint for API client runtime updates. Retrying..." 
+ ); + } + } + + async fn process_update(&self) -> Result<()> { + // We don't use the runtime version from the updates stream because it doesn't provide the + // best block hash, so we fetch it ourselves (in `fetch_api_runtime`) and use it to make sure + // that metadata & the runtime version are from the same block. + let (metadata, runtime_version) = fetch_api_runtime(&self.methods, &*self.backend) + .await + .context("failed to fetch a new runtime for the API client")?; + + self.api.set_metadata(metadata); + self.api.set_runtime_version(runtime_version); + + let mut current_properties = self.properties.write().await; + let new_properties = ChainProperties::fetch_only_constants( + current_properties.existential_deposit, + current_properties.decimals, + &self.constants, + current_properties.usd_asset, + )?; + + let mut changed = String::new(); + let mut add_change = |message: Arguments<'_>| { + changed.write_fmt(message).unwrap(); + }; + + if new_properties.address_format != current_properties.address_format { + add_change(format_args!( + "\nOld {value}: \"{}\" ({}). New {value}: \"{}\" ({}).", + current_properties.address_format, + current_properties.address_format.prefix(), + new_properties.address_format, + new_properties.address_format.prefix(), + value = "address format", + )); + } + + if new_properties.existential_deposit != current_properties.existential_deposit { + add_change(format_args!( + "\nOld {value}: {}. New {value}: {}.", + current_properties.existential_deposit, + new_properties.existential_deposit, + value = "existential deposit" + )); + } + + if new_properties.decimals != current_properties.decimals { + add_change(format_args!( + "\nOld {value}: {}. New {value}: {}.", + current_properties.decimals, + new_properties.decimals, + value = "decimal places number" + )); + } + + if new_properties.block_hash_count != current_properties.block_hash_count { + add_change(format_args!( + "\nOld {value}: {}. New {value}: {}.", + current_properties.block_hash_count, + new_properties.block_hash_count, + value = "block hash count" + )); + } + + if !changed.is_empty() { + *current_properties = new_properties; + + log::warn!("The chain properties has been changed:{changed}"); + } + + log::info!("A runtime update has been found and applied for the API client."); + + Ok(()) + } +} + +#[derive(Debug)] +struct Shutdown; + +impl Error for Shutdown {} + +// Not used, but required for the `anyhow::Context` trait. +impl Display for Shutdown { + fn fmt(&self, _: &mut Formatter<'_>) -> fmt::Result { + unimplemented!() + } +} + +struct Api { + tx: TxClient, + blocks: BlocksClient, +} + +struct Scanner { + client: OnlineClient, + blocks: BlocksClient, + storage: StorageClient, +} + +struct ProcessorFinalized { + database: Arc, + client: OnlineClient, + backend: Arc>, + methods: Arc>, + shutdown_notification: CancellationToken, +} + +impl ProcessorFinalized { + async fn finalized_head_number_and_hash(&self) -> Result<(BlockNumber, Hash)> { + let head_hash = self + .methods + .chain_get_finalized_head() + .await + .context("failed to get the finalized head hash")?; + let head = self + .methods + .chain_get_block(Some(head_hash)) + .await + .context("failed to get the finalized head")? 
+ .context("received nothing after requesting the finalized head")?; + + Ok((head.block.header.number, head_hash)) + } + + pub async fn ignite(self) -> Result<&'static str> { + self.execute().await.or_else(|error| { + error + .downcast() + .map(|Shutdown| "The RPC module is shut down.") + }) + } + + async fn execute(mut self) -> Result<&'static str> { + let write_tx = self.database.write()?; + let mut write_invoices = write_tx.invoices()?; + let (mut finalized_number, finalized_hash) = self.finalized_head_number_and_hash().await?; + + self.set_client_metadata(finalized_hash).await?; + + // TODO: + // Design a new DB format to store unpaid accounts in a separate table. + + for invoice_result in self.database.read()?.invoices()?.try_iter()? { + let invoice = invoice_result?; + + match invoice.1.value().status { + InvoiceStatus::Unpaid(price) => { + if self + .balance(finalized_hash, &Account::from(*invoice.0.value())) + .await? + >= price + { + let mut changed_invoice = invoice.1.value(); + + changed_invoice.status = InvoiceStatus::Paid(price); + + log::debug!("background scan {changed_invoice:?}"); + + write_invoices + .save(&Account::from(*invoice.0.value()), &changed_invoice)?; + } + } + InvoiceStatus::Paid(_) => continue, + } + } + + drop(write_invoices); + + write_tx.commit()?; + + let mut subscription = self.finalized_heads().await?; + + loop { + self.process_finalized_heads(subscription, &mut finalized_number) + .await?; + + log::warn!("Lost the connection while processing finalized heads. Retrying..."); + + subscription = self + .finalized_heads() + .await + .context("failed to update the subscription while processing finalized heads")?; + } + } + + async fn process_skipped( + &self, + next_unscanned: &mut BlockNumber, + head: BlockNumber, + ) -> Result<()> { + for skipped_number in *next_unscanned..head { + if self.shutdown_notification.is_cancelled() { + return Err(Shutdown.into()); + } + + let skipped_hash = self + .methods + .chain_get_block_hash(Some(skipped_number.into())) + .await + .context("failed to get the hash of a skipped block")? + .context("received nothing after requesting the hash of a skipped block")?; + + self.process_block(skipped_number, skipped_hash).await?; + } + + *next_unscanned = head; + + Ok(()) + } + + async fn process_finalized_heads( + &mut self, + mut subscription: RpcSubscription<::Header>, + next_unscanned: &mut BlockNumber, + ) -> Result<()> { + loop { + tokio::select! 
{ + biased; + () = self.shutdown_notification.cancelled() => { + return Err(Shutdown.into()); + } + head_result_option = subscription.next() => { + if let Some(head_result) = head_result_option { + let head = head_result.context( + "received an error from the RPC client while processing finalized heads" + )?; + + self + .process_skipped(next_unscanned, head.number) + .await + .context("failed to process a skipped gap in the listening mode")?; + self.process_block(head.number, head.hash()).await?; + + *next_unscanned = head.number + .checked_add(1) + .context(MAX_BLOCK_NUMBER_ERROR)?; + } else { + break; + } + } + } + } + + Ok(()) + } + + async fn finalized_heads(&self) -> Result::Header>> { + self.methods + .chain_subscribe_finalized_heads() + .await + .context("failed to subscribe to finalized heads") + } + + async fn process_block(&self, number: BlockNumber, hash: Hash) -> Result<()> { + log::debug!("background block {number}"); + + let block = self + .client + .blocks() + .at(hash) + .await + .context("failed to obtain a block for processing")?; + let events = block + .events() + .await + .context("failed to obtain block events")?; + + let read_tx = self.database.read()?; + let read_invoices = read_tx.invoices()?; + + let mut update = false; + let mut invoices_changes = HashMap::new(); + + for event_result in events.iter() { + const UPDATE: &str = "CodeUpdated"; + const TRANSFERRED: &str = "Transferred"; + const ASSET_MIN_BALANCE_CHANGED: &str = "AssetMinBalanceChanged"; + const METADATA_SET: &str = "MetadataSet"; + + let event = event_result.context("failed to decode an event")?; + let metadata = event.event_metadata(); + + match (metadata.pallet.name(), &*metadata.variant.name) { + (SYSTEM, UPDATE) => update = true, + (ASSETS, TRANSFERRED) => Transferred::deserialize( + event + .field_values() + .context("failed to decode event's fields")?, + ) + .context("failed to deserialize a transfer event")? 
+ .process( + &mut invoices_changes, + &read_invoices, + self.database.properties().await.usd_asset, + )?, + (ASSETS, ASSET_MIN_BALANCE_CHANGED) => { + let mut props = self.database.properties_write().await; + let new_min_balance = fetch_min_balance( + self.client.storage().at_latest().await?, + &props.usd_asset, + ) + .await?; + + props.existential_deposit = new_min_balance; + } + (ASSETS, METADATA_SET) => { + let props = self.database.properties_write().await; + let new_decimals = + fetch_decimals(self.client.storage().at_latest().await?, &props.usd_asset) + .await?; + + if props.decimals != new_decimals { + anyhow::bail!("decimals have been changed: {new_decimals}"); + } + } + _ => {} + } + } + + let write_tx = self.database.write()?; + let mut write_invoices = write_tx.invoices()?; + + for (invoice, mut changes) in invoices_changes { + log::debug!("final loop acc : {invoice}; changes: {changes:?}"); + if let InvoiceStatus::Unpaid(price) = changes.invoice.status { + let balance = self.balance(hash, &invoice).await?; + + log::debug!("unpaid acc balance: {balance}; price: {price}"); + + if balance >= price { + changes.invoice.status = InvoiceStatus::Paid(price); + + write_invoices.save(&invoice, &changes.invoice)?; + } + } + } + + drop(write_invoices); + + write_tx.commit()?; + + if update { + self.set_client_metadata(hash) + .await + .context("failed to update metadata in the finalized client")?; + + log::info!("A metadata update has been found and applied for the finalized client."); + } + + Ok(()) + } + + async fn set_client_metadata(&self, at: Hash) -> Result<()> { + let metadata = fetch_metadata(&*self.backend, at) + .await + .context("failed to fetch metadata for the scanner client")?; + + self.client.set_metadata(metadata); + + Ok(()) + } + + async fn balance(&self, hash: Hash, account: &Account) -> Result { + const ACCOUNT: &str = "Account"; + const BALANCE: &str = "balance"; + + if let Some(account_info) = self + .client + .storage() + .at(hash) + .fetch(&dynamic::storage( + ASSETS, + ACCOUNT, + vec![ + Value::from(self.database.properties().await.usd_asset.id()), + Value::from_bytes(AsRef::<[u8; 32]>::as_ref(account)), + ], + )) + .await + .context("failed to fetch account info from the chain")? 
+ { + let decoded_account_info = account_info + .to_value() + .context("failed to decode account info")?; + let encoded_balance = decoded_account_info + .at(BALANCE) + .with_context(|| format!("{BALANCE} field wasn't found in account info"))?; + + encoded_balance.as_u128().with_context(|| { + format!("expected `u128` as the type of a balance, got {encoded_balance}") + }) + } else { + Ok(0) + } + } +} + +pub struct Processor { + api: Api, + scanner: Scanner, + methods: Arc>, + database: Arc, + backend: Arc>, + shutdown_notification: CancellationToken, +} + +impl Processor { + pub fn new( + ApiConfig { + api, + methods, + backend, + }: ApiConfig, + database: Arc, + shutdown_notification: CancellationToken, + ) -> Result { + let scanner = OnlineClient::from_backend_with( + api.genesis_hash(), + api.runtime_version(), + api.metadata(), + backend.clone(), + ) + .context("failed to initialize the scanner client")?; + + Ok(Processor { + api: Api { + tx: api.tx(), + blocks: api.blocks(), + }, + scanner: Scanner { + blocks: scanner.blocks(), + storage: scanner.storage(), + client: scanner, + }, + methods, + database, + shutdown_notification, + backend, + }) + } + + pub async fn ignite( + self, + latest_saved_block: Option, + task_tracker: TaskTracker, + error_tx: UnboundedSender, + ) -> Result<&'static str> { + self.execute(latest_saved_block, task_tracker, error_tx) + .await + .or_else(|error| { + error + .downcast() + .map(|Shutdown| "The RPC module is shut down.") + }) + } + + async fn execute( + mut self, + latest_saved_block: Option, + task_tracker: TaskTracker, + error_tx: UnboundedSender, + ) -> Result<&'static str> { + task_tracker.spawn(shutdown( + ProcessorFinalized { + database: self.database.clone(), + client: self.scanner.client.clone(), + backend: self.backend.clone(), + methods: self.methods.clone(), + shutdown_notification: self.shutdown_notification.clone(), + } + .ignite(), + error_tx, + )); + + let (mut head_number, head_hash) = self + .finalized_head_number_and_hash() + .await + .context("failed to get the chain head")?; + + let mut next_unscanned_number; + let mut subscription; + + if let Some(latest_saved) = latest_saved_block { + let latest_saved_hash = self + .methods + .chain_get_block_hash(Some(latest_saved.into())) + .await + .context("failed to get the hash of the last saved block")? + .context("received nothing after requesting the hash of the last saved block")?; + + self.set_scanner_metadata(latest_saved_hash).await?; + + next_unscanned_number = latest_saved + .checked_add(1) + .context(MAX_BLOCK_NUMBER_ERROR)?; + + let mut unscanned_amount = head_number.saturating_sub(next_unscanned_number); + + if unscanned_amount >= SCANNER_TO_LISTENER_SWITCH_POINT { + log::info!( + "Detected {unscanned_amount} unscanned blocks! Catching up may take a while." + ); + + while unscanned_amount >= SCANNER_TO_LISTENER_SWITCH_POINT { + self.process_skipped(&mut next_unscanned_number, head_number) + .await + .context("failed to process a skipped gap in the scanning mode")?; + + (head_number, _) = self + .finalized_head_number_and_hash() + .await + .context("failed to get a new chain head")?; + unscanned_amount = head_number.saturating_sub(next_unscanned_number); + } + + log::info!( + "Scanning of skipped blocks has been completed! Switching to the listening mode..." 
+ ); + } + + subscription = self.finalized_heads().await?; + } else { + self.set_scanner_metadata(head_hash).await?; + + next_unscanned_number = head_number.checked_add(1).context(MAX_BLOCK_NUMBER_ERROR)?; + subscription = self.finalized_heads().await?; + } + + // Skip all already scanned blocks in cases like the first startup (we always skip the first + // block to fetch right metadata), an instant daemon restart, or a connection to a lagging + // endpoint. + 'skipping: loop { + loop { + tokio::select! { + biased; + () = self.shutdown_notification.cancelled() => { + return Err(Shutdown.into()); + } + header_result_option = subscription.next() => { + if let Some(header_result) = header_result_option { + let header = header_result.context( + "received an error from the RPC client while skipping saved finalized heads" + )?; + + if header.number >= next_unscanned_number { + break 'skipping; + } + } else { + break; + } + } + } + } + + log::warn!("Lost the connection while skipping already scanned blocks. Retrying..."); + + subscription = self + .finalized_heads() + .await + .context("failed to update the subscription while skipping scanned blocks")?; + } + + loop { + self.process_finalized_heads(subscription, &mut next_unscanned_number) + .await?; + + log::warn!("Lost the connection while processing finalized heads. Retrying..."); + + subscription = self + .finalized_heads() + .await + .context("failed to update the subscription while processing finalized heads")?; + } + } + + async fn finalized_head_number_and_hash(&self) -> Result<(BlockNumber, Hash)> { + let head_hash = self + .methods + .chain_get_finalized_head() + .await + .context("failed to get the finalized head hash")?; + let head = self + .methods + .chain_get_block(Some(head_hash)) + .await + .context("failed to get the finalized head")? + .context("received nothing after requesting the finalized head")?; + + Ok((head.block.header.number, head_hash)) + } + + async fn set_scanner_metadata(&self, at: Hash) -> Result<()> { + let metadata = fetch_metadata(&*self.backend, at) + .await + .context("failed to fetch metadata for the scanner client")?; + + self.scanner.client.set_metadata(metadata); + + Ok(()) + } + + async fn finalized_heads(&self) -> Result::Header>> { + self.methods + .chain_subscribe_finalized_heads() + .await + .context("failed to subscribe to finalized heads") + } + + async fn process_skipped( + &self, + next_unscanned: &mut BlockNumber, + head: BlockNumber, + ) -> Result<()> { + for skipped_number in *next_unscanned..head { + if self.shutdown_notification.is_cancelled() { + return Err(Shutdown.into()); + } + + let skipped_hash = self + .methods + .chain_get_block_hash(Some(skipped_number.into())) + .await + .context("failed to get the hash of a skipped block")? + .context("received nothing after requesting the hash of a skipped block")?; + + self.process_block(skipped_number, skipped_hash).await?; + } + + *next_unscanned = head; + + Ok(()) + } + + async fn process_finalized_heads( + &mut self, + mut subscription: RpcSubscription<::Header>, + next_unscanned: &mut BlockNumber, + ) -> Result<()> { + loop { + tokio::select! 
{ + biased; + () = self.shutdown_notification.cancelled() => { + return Err(Shutdown.into()); + } + head_result_option = subscription.next() => { + if let Some(head_result) = head_result_option { + let head = head_result.context( + "received an error from the RPC client while processing finalized heads" + )?; + + self + .process_skipped(next_unscanned, head.number) + .await + .context("failed to process a skipped gap in the listening mode")?; + self.process_block(head.number, head.hash()).await?; + + *next_unscanned = head.number + .checked_add(1) + .context(MAX_BLOCK_NUMBER_ERROR)?; + } else { + break; + } + } + } + } + + Ok(()) + } + + async fn process_block(&self, number: BlockNumber, hash: Hash) -> Result<()> { + log::info!("Processing the block: {number}."); + + let block = self + .scanner + .blocks + .at(hash) + .await + .context("failed to obtain a block for processing")?; + let events = block + .events() + .await + .context("failed to obtain block events")?; + + let read_tx = self.database.read()?; + let read_invoices = read_tx.invoices()?; + + let mut update = false; + let mut invoices_changes = HashMap::new(); + + for event_result in events.iter() { + const UPDATE: &str = "CodeUpdated"; + const TRANSFERRED: &str = "Transferred"; + let event = event_result.context("failed to decode an event")?; + let metadata = event.event_metadata(); + + match (metadata.pallet.name(), &*metadata.variant.name) { + (SYSTEM, UPDATE) => update = true, + (ASSETS, TRANSFERRED) => Transferred::deserialize( + event + .field_values() + .context("failed to decode event's fields")?, + ) + .context("failed to deserialize a transfer event")? + .process( + &mut invoices_changes, + &read_invoices, + self.database.properties().await.usd_asset, + )?, + _ => {} + } + } + + for (invoice, changes) in invoices_changes { + let price = match changes.invoice.status { + InvoiceStatus::Unpaid(price) | InvoiceStatus::Paid(price) => price, + }; + + self.process_unpaid(&block, changes, hash, invoice, price) + .await + .context("failed to process an unpaid invoice")?; + } + + if update { + self.set_scanner_metadata(hash) + .await + .context("failed to update metadata in the scanner client")?; + + log::info!("A metadata update has been found and applied for the scanner client."); + } + + let write_tx = self.database.write()?; + + write_tx.root()?.save_last_block(number)?; + write_tx.commit()?; + + Ok(()) + } + + async fn balance(&self, hash: Hash, account: &Account) -> Result { + const ACCOUNT: &str = "Account"; + const BALANCE: &str = "balance"; + + if let Some(account_info) = self + .scanner + .storage + .at(hash) + .fetch(&dynamic::storage( + ASSETS, + ACCOUNT, + vec![ + Value::from(self.database.properties().await.usd_asset.id()), + Value::from_bytes(AsRef::<[u8; 32]>::as_ref(account)), + ], + )) + .await + .context("failed to fetch account info from the chain")? 
+ { + let decoded_account_info = account_info + .to_value() + .context("failed to decode account info")?; + let encoded_balance = decoded_account_info + .at(BALANCE) + .with_context(|| format!("{BALANCE} field wasn't found in account info"))?; + + encoded_balance.as_u128().with_context(|| { + format!("expected `u128` as the type of a balance, got {encoded_balance}") + }) + } else { + Ok(0) + } + } + + async fn batch_transfer( + &self, + nonce: Nonce, + block_hash_count: BlockNumber, + signer: &PairSigner, + transfers: Vec, + ) -> Result> { + const FORCE_BATCH: &str = "force_batch"; + + let call = dynamic::tx(UTILITY, FORCE_BATCH, vec![Value::from(transfers)]); + let (number, hash) = self + .finalized_head_number_and_hash() + .await + .context("failed to get the chain head while constructing a transaction")?; + let extensions = DefaultExtrinsicParamsBuilder::new() + .mortal_unchecked(number.into(), hash, block_hash_count.into()) + .tip_of(0, self.database.properties().await.usd_asset.id()); + + self.api + .tx + .create_signed_with_nonce(&call, signer, nonce, extensions.build()) + .context("failed to create a transfer transaction") + } + + async fn current_nonce(&self, account: &Account) -> Result { + self.api + .blocks + .at(fetch_best_block(&self.methods).await?) + .await + .context("failed to obtain the best block for fetching an account nonce")? + .account_nonce(account) + .await + .context("failed to fetch an account nonce by the API client") + } + + async fn process_unpaid( + &self, + block: &Block, + mut changes: InvoiceChanges, + hash: Hash, + invoice: Account, + price: Balance, + ) -> Result<()> { + let balance = self.balance(hash, &invoice).await?; + + if let Some(_remaining) = balance.checked_sub(price) { + changes.invoice.status = InvoiceStatus::Paid(price); + + let block_nonce = block + .account_nonce(&invoice) + .await + .context(BLOCK_NONCE_ERROR)?; + let current_nonce = self.current_nonce(&invoice).await?; + + if current_nonce <= block_nonce { + let properties = self.database.properties().await; + let block_hash_count = properties.block_hash_count; + let signer = changes.invoice.signer(self.database.pair())?; + + let transfers = vec![construct_transfer( + &changes.invoice.recipient, + price - EXPECTED_USDX_FEE, + self.database.properties().await.usd_asset, + )]; + let tx = self + .batch_transfer(current_nonce, block_hash_count, &signer, transfers.clone()) + .await?; + + self.methods + .author_submit_extrinsic(tx.encoded()) + .await + .context("failed to submit an extrinsic")?; + } + } + + Ok(()) + } +} + +fn construct_transfer(to: &Account, amount: Balance, usd_asset: Usd) -> Value { + const TRANSFER_KEEP_ALIVE: &str = "transfer"; + + log::debug!("constructing a transfer of {amount}"); + + dynamic::tx( + ASSETS, + TRANSFER_KEEP_ALIVE, + vec![ + usd_asset.id().into(), + scale_value::value!(Id(Value::from_bytes(to))), + amount.into(), + ], + ) + .into_value() +} + +#[derive(Debug)] +struct InvoiceChanges { + invoice: Invoice, + incoming: HashMap, +} + +#[derive(Deserialize, Debug)] +struct Transferred { + asset_id: u32, + // The implementation of `Deserialize` for `AccountId32` works only with strings.
+ #[serde(deserialize_with = "account_deserializer")] + from: AccountId32, + #[serde(deserialize_with = "account_deserializer")] + to: AccountId32, + amount: Balance, +} + +fn account_deserializer<'de, D>(deserializer: D) -> Result +where + D: Deserializer<'de>, +{ + <([u8; 32],)>::deserialize(deserializer).map(|address| AccountId32::new(address.0)) +} + +impl Transferred { + fn process( + self, + invoices_changes: &mut HashMap, + invoices: &ReadInvoices<'_>, + usd_asset: Usd, + ) -> Result<()> { + log::debug!("Transferred event: {self:?}"); + + if self.from == self.to || self.amount == 0 || self.asset_id != usd_asset.id() { + return Ok(()); + } + + match invoices_changes.entry(self.to) { + Entry::Occupied(entry) => { + entry + .into_mut() + .incoming + .entry(self.from) + .and_modify(|amount| *amount = amount.saturating_add(self.amount)) + .or_insert(self.amount); + } + Entry::Vacant(entry) => { + if let (None, Some(encoded_invoice)) = + (invoices.get(&self.from)?, invoices.get(entry.key())?) + { + entry.insert(InvoiceChanges { + invoice: encoded_invoice.value(), + incoming: [(self.from, self.amount)].into(), + }); + } + } + } + + Ok(()) + } +} diff --git a/kalatori-ah/src/server.rs b/kalatori-ah/src/server.rs new file mode 100644 index 0000000..bc0d134 --- /dev/null +++ b/kalatori-ah/src/server.rs @@ -0,0 +1,208 @@ +use crate::{ + database::{Database, Invoice, InvoiceStatus}, + Account, +}; +use anyhow::{Context, Result}; +use axum::{ + extract::{Path, State}, + routing::get, + Json, Router, +}; +use serde::Serialize; +use std::{future::Future, net::SocketAddr, sync::Arc}; +use subxt::ext::sp_core::{hexdisplay::HexDisplay, DeriveJunction, Pair}; +use tokio::net::TcpListener; +use tokio_util::sync::CancellationToken; + +pub(crate) const MODULE: &str = module_path!(); + +#[derive(Serialize)] +#[serde(untagged)] +pub enum Response { + Error(Error), + Success(Success), +} + +#[derive(Serialize)] +pub struct Error { + error: String, + wss: String, + mul: u64, + version: String, +} + +#[derive(Serialize)] +pub struct Success { + pay_account: String, + price: f64, + recipient: String, + order: String, + wss: String, + mul: u64, + result: String, + version: String, +} + +pub(crate) async fn new( + shutdown_notification: CancellationToken, + host: SocketAddr, + database: Arc, +) -> Result>> { + let app = Router::new() + .route( + "/recipient/:recipient/order/:order/price/:price", + get(handler_recip), + ) + .route("/order/:order/price/:price", get(handler)) + .with_state(database); + + let listener = TcpListener::bind(host) + .await + .context("failed to bind the TCP listener")?; + + log::info!("The server is listening on {host:?}."); + + Ok(async move { + axum::serve(listener, app) + .with_graceful_shutdown(shutdown_notification.cancelled_owned()) + .await + .context("failed to fire up the server")?; + + Ok("The server module is shut down.") + }) +} + +async fn handler_recip( + State(database): State>, + Path((recipient, order, price)): Path<(String, String, f64)>, +) -> Json { + let wss = database.rpc().to_string(); + let mul = database.properties().await.decimals; + + match abcd(database, Some(recipient), order, price).await { + Ok(re) => Response::Success(re), + Err(error) => Response::Error(Error { + wss, + mul, + version: env!("CARGO_PKG_VERSION").into(), + error: error.to_string(), + }), + } + .into() +} + +async fn handler( + State(database): State>, + Path((order, price)): Path<(String, f64)>, +) -> Json { + let wss = database.rpc().to_string(); + let mul = 
database.properties().await.decimals; + let recipient = database + .destination() + .as_ref() + .map(|d| format!("0x{}", HexDisplay::from(AsRef::<[u8; 32]>::as_ref(&d)))); + + match abcd(database, recipient, order, price).await { + Ok(re) => Response::Success(re), + Err(error) => Response::Error(Error { + wss, + mul, + version: env!("CARGO_PKG_VERSION").into(), + error: error.to_string(), + }), + } + .into() +} + +async fn abcd( + database: Arc, + recipient_option: Option, + order: String, + price_f64: f64, +) -> Result { + let recipient = recipient_option.context("destination address isn't set")?; + let decoded_recip = hex::decode(&recipient[2..])?; + let recipient_account = Account::try_from(decoded_recip.as_ref()) + .map_err(|()| anyhow::anyhow!("Unknown address length"))?; + let properties = database.properties().await; + #[allow(clippy::cast_precision_loss)] + let mul = 10u128.pow(properties.decimals.try_into()?) as f64; + #[allow(clippy::cast_possible_truncation, clippy::cast_sign_loss)] + let price = (price_f64 * mul).round() as u128; + let order_encoded = DeriveJunction::hard(&order).unwrap_inner(); + let invoice_account: Account = database + .pair() + .derive( + [ + DeriveJunction::Hard(<[u8; 32]>::from(recipient_account.clone())), + DeriveJunction::Hard(order_encoded), + ] + .into_iter(), + None, + )? + .0 + .public() + .into(); + + if let Some(encoded_invoice) = database.read()?.invoices()?.get(&invoice_account)? { + let invoice = encoded_invoice.value(); + + if let InvoiceStatus::Unpaid(saved_price) = invoice.status { + if saved_price != price { + anyhow::bail!("The invoice was created with a different price ({price})."); + } + } + + Ok(Success { + pay_account: format!("0x{}", HexDisplay::from(&invoice_account.as_ref())), + price: match invoice.status { + InvoiceStatus::Unpaid(invoice_price) | InvoiceStatus::Paid(invoice_price) => { + convert(properties.decimals, invoice_price)? + } + }, + wss: database.rpc().to_string(), + mul: properties.decimals, + recipient, + order, + result: match invoice.status { + InvoiceStatus::Unpaid(_) => "waiting", + InvoiceStatus::Paid(_) => "paid", + } + .into(), + version: env!("CARGO_PKG_VERSION").into(), + }) + } else { + let tx = database.write()?; + + tx.invoices()?.save( + &invoice_account, + &Invoice { + recipient: recipient_account, + order: order_encoded, + status: InvoiceStatus::Unpaid(price), + }, + )?; + + tx.commit()?; + + Ok(Success { + pay_account: format!("0x{}", HexDisplay::from(&invoice_account.as_ref())), + price: price_f64, + wss: database.rpc().to_string(), + mul: properties.decimals, + recipient, + order, + version: env!("CARGO_PKG_VERSION").into(), + result: "waiting".into(), + }) + } +} + +fn convert(dec: u64, num: u128) -> Result { + #[allow(clippy::cast_precision_loss)] + let numfl = num as f64; + #[allow(clippy::cast_precision_loss)] + let mul = 10u128.pow(dec.try_into()?)
as f64; + + Ok(numfl / mul) +} diff --git a/shoot.sh b/shoot.sh index 539426d..976dcae 100755 --- a/shoot.sh +++ b/shoot.sh @@ -1 +1 @@ -curl 127.0.0.1:16726/order/1337/price/1 +curl 127.0.0.1:16726/order/1337/price/5 diff --git a/src/database.rs b/src/database.rs index 0e117fa..b06187b 100644 --- a/src/database.rs +++ b/src/database.rs @@ -107,7 +107,7 @@ struct DaemonInfo { } pub struct Database { - database: redb::Database, + db: redb::Database, properties: Arc>, pair: Pair, rpc: String, @@ -169,8 +169,8 @@ impl Database { } (Some(encoded_db_version), Some(daemon_info), last_block_option) => { let Compact::(db_version) = - decode_slot(encoded_db_version, DB_VERSION_KEY)?; - let DaemonInfo { rpc: db_rpc, key } = decode_slot(daemon_info, DAEMON_INFO)?; + decode_slot(&encoded_db_version, DB_VERSION_KEY)?; + let DaemonInfo { rpc: db_rpc, key } = decode_slot(&daemon_info, DAEMON_INFO)?; if db_version != DATABASE_VERSION { anyhow::bail!( @@ -203,7 +203,7 @@ impl Database { } if let Some(encoded_last_block) = last_block_option { - Some(decode_slot::>(encoded_last_block, LAST_BLOCK)?.0) + Some(decode_slot::>(&encoded_last_block, LAST_BLOCK)?.0) } else { None } @@ -222,16 +222,16 @@ impl Database { .context("failed to compact the database")?; if compacted { - log::debug!("The database was successfully compacted.") + log::debug!("The database was successfully compacted."); } else { - log::debug!("The database doesn't need the compaction.") + log::debug!("The database doesn't need the compaction."); } log::info!("Public key from the given seed: \"{public_formatted}\"."); Ok(( Arc::new(Self { - database, + db: database, properties: chain, pair, rpc: given_rpc, @@ -250,14 +250,14 @@ impl Database { } pub fn write(&self) -> Result> { - self.database + self.db .begin_write() .map(WriteTransaction) .context("failed to begin a write transaction for the database") } pub fn read(&self) -> Result> { - self.database + self.db .begin_read() .map(ReadTransaction) .context("failed to begin a read transaction for the database") @@ -292,7 +292,7 @@ impl ReadInvoices<'_> { .context("failed to get an invoice from the database") } - pub fn iter( + pub fn try_iter( &self, ) -> Result, AccessGuard<'_, Invoice>)>>> { @@ -356,12 +356,12 @@ impl Root<'_, '_> { fn get_slot(table: &Table<'_, '_, &str, Vec>, key: &str) -> Result>> { table .get(key) - .map(|slot_option| slot_option.map(|slot| slot.value().to_vec())) + .map(|slot_option| slot_option.map(|slot| slot.value().clone())) .with_context(|| format!("failed to get the {key:?} slot")) } -fn decode_slot(slot: Vec, key: &str) -> Result { - T::decode(&mut slot.as_ref()).with_context(|| format!("failed to decode the {key:?} slot")) +fn decode_slot(mut slot: &[u8], key: &str) -> Result { + T::decode(&mut slot).with_context(|| format!("failed to decode the {key:?} slot")) } fn insert_daemon_info( diff --git a/src/lib.rs b/src/lib.rs index 78334ea..2c5fc9e 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,7 +1,9 @@ use anyhow::{Context, Error, Result}; use database::Database; use env_logger::{Builder, Env}; -use environment_variables::*; +use environment_variables::{ + DATABASE, DECIMALS, DESTINATION, HOST, IN_MEMORY_DB, LOG, LOG_STYLE, OVERRIDE_RPC, RPC, SEED, +}; use log::LevelFilter; use rpc::Processor; use std::{ @@ -87,6 +89,7 @@ impl Config for RuntimeConfig { } #[doc(hidden)] +#[allow(clippy::too_many_lines)] #[tokio::main] pub async fn main() -> Result<()> { let mut builder = Builder::new(); diff --git a/src/rpc.rs b/src/rpc.rs index 395f413..0011868 100644 --- 
a/src/rpc.rs +++ b/src/rpc.rs @@ -61,7 +61,7 @@ async fn fetch_best_block(methods: &LegacyRpcMethods) -> Result, backend: &impl Backend, ) -> Result<(Metadata, RuntimeVersion)> { @@ -117,7 +117,7 @@ pub struct ChainProperties { } impl ChainProperties { - async fn fetch_only_constants( + fn fetch_only_constants( constants: &ConstantsClient, decimals: Decimals, ) -> Result { @@ -154,7 +154,7 @@ impl ChainProperties { "failed to decode the decimal places number, expected a positive integer, got \"{encoded_decimals}\"" ))?; - Self::fetch_only_constants(constants, decimals).await + Self::fetch_only_constants(constants, decimals) } } @@ -198,7 +198,7 @@ pub async fn prepare( let methods = Arc::new(LegacyRpcMethods::new(rpc.clone())); let backend = Arc::new(LegacyBackend::new(rpc)); - let (metadata, runtime_version) = fetch_runtime(&methods, &*backend) + let (metadata, runtime_version) = fetch_api_runtime(&methods, &*backend) .await .context("failed to fetch the runtime of the API client")?; let genesis_hash = methods @@ -213,7 +213,7 @@ pub async fn prepare( let (properties_result, decimals_set) = if let Some(decimals) = decimals_option { ( - ChainProperties::fetch_only_constants(&constants, decimals).await, + ChainProperties::fetch_only_constants(&constants, decimals), true, ) } else { @@ -319,9 +319,9 @@ impl Updater { async fn process_update(&self) -> Result<()> { // We don't use the runtime version from the updates stream because it doesn't provide the - // best block hash, so we fetch it ourselves (in `fetch_runtime`) and use it to make sure + // best block hash, so we fetch it ourselves (in `fetch_api_runtime`) and use it to make sure // that metadata & the runtime version are from the same block. - let (metadata, runtime_version) = fetch_runtime(&self.methods, &*self.backend) + let (metadata, runtime_version) = fetch_api_runtime(&self.methods, &*self.backend) .await .context("failed to fetch a new runtime for the API client")?; @@ -331,8 +331,7 @@ impl Updater { let (mut current_properties, new_properties_result) = if self.decimals_set { let current_properties = self.properties.write().await; let new_properties_result = - ChainProperties::fetch_only_constants(&self.constants, current_properties.decimals) - .await; + ChainProperties::fetch_only_constants(&self.constants, current_properties.decimals); (current_properties, new_properties_result) } else { @@ -464,7 +463,7 @@ impl ProcessorFinalized { // TODO: // Design a new DB format to store unpaid accounts in a separate table. - for invoice_result in self.database.read()?.invoices()?.iter()? { + for invoice_result in self.database.read()?.invoices()?.try_iter()? 
{ let invoice = invoice_result?; match invoice.1.value().status { @@ -596,12 +595,12 @@ impl ProcessorFinalized { let mut invoices_changes = HashMap::new(); for event_result in events.iter() { - let event = event_result.context("failed to decode an event")?; - let metadata = event.event_metadata(); - const UPDATE: &str = "CodeUpdated"; const TRANSFER: &str = "Transfer"; + let event = event_result.context("failed to decode an event")?; + let metadata = event.event_metadata(); + match (metadata.pallet.name(), &*metadata.variant.name) { (SYSTEM, UPDATE) => update = true, (BALANCES, TRANSFER) => Transfer::deserialize( @@ -978,12 +977,12 @@ impl Processor { let mut invoices_changes = HashMap::new(); for event_result in events.iter() { - let event = event_result.context("failed to decode an event")?; - let metadata = event.event_metadata(); - const UPDATE: &str = "CodeUpdated"; const TRANSFER: &str = "Transfer"; + let event = event_result.context("failed to decode an event")?; + let metadata = event.event_metadata(); + match (metadata.pallet.name(), &*metadata.variant.name) { (SYSTEM, UPDATE) => update = true, (BALANCES, TRANSFER) => Transfer::deserialize( @@ -999,13 +998,12 @@ impl Processor { for (invoice, changes) in invoices_changes { let price = match changes.invoice.status { - InvoiceStatus::Unpaid(price) => price, - InvoiceStatus::Paid(price) => price, + InvoiceStatus::Unpaid(price) | InvoiceStatus::Paid(price) => price, }; self.process_unpaid(&block, changes, hash, invoice, price) .await - .context("failed to process an unpaid invoice")? + .context("failed to process an unpaid invoice")?; } if update { @@ -1121,83 +1119,15 @@ impl Processor { let tx = self .batch_transfer(current_nonce, block_hash_count, &signer, transfers.clone()) .await?; - //let mut fee = calculate_estimate_fee(&tx).await?; - /* - if false //let Some(a) = (fee + properties.existential_deposit + price).checked_sub(balance) - { - let price_mod = price - a; - - transfers = vec![construct_transfer(&changes.invoice.recipient, price_mod)]; - tx = self - .batch_transfer(current_nonce, block_hash_count, &signer, transfers.clone()) - .await?; - - self.methods - .author_submit_extrinsic(tx.encoded()) - .await - .context("failed to submit an extrinsic")?; - } else if let Some((account, amount)) = changes.incoming.into_iter().next() { - let mut temp_transfers = transfers.clone(); - - temp_transfers.push(construct_transfer(&account, amount)); - tx = self - .batch_transfer( - current_nonce, - block_hash_count, - &signer, - temp_transfers.clone(), - ) - .await?; - fee = calculate_estimate_fee(&tx).await?; - - if let Some(a) = - (fee + properties.existential_deposit + amount).checked_sub(remaining) - { - let amount_mod = amount - a; - - transfers.push(construct_transfer(&account, amount_mod)); - tx = self - .batch_transfer( - current_nonce, - block_hash_count, - &signer, - transfers.clone(), - ) - .await?; - - self.methods - .author_submit_extrinsic(tx.encoded()) - .await - .context("failed to submit an extrinsic")?; - } else { - self.methods - .author_submit_extrinsic(tx.encoded()) - .await - .context("failed to submit an extrinsic")?; - } - } else { - */ self.methods .author_submit_extrinsic(tx.encoded()) .await .context("failed to submit an extrinsic")?; - - //} } } Ok(()) } - - async fn process_paid( - &self, - _invoice: Account, - _block: &Block, - _changes: InvoiceChanges, - _hash: Hash, - ) -> Result<()> { - Ok(()) - } } fn construct_transfer(to: &Account, _amount: Balance) -> Value { @@ -1211,18 +1141,6 @@ fn 
construct_transfer(to: &Account, _amount: Balance) -> Value { .into_value() } -/* -async fn calculate_estimate_fee( - extrinsic: &SubmittableExtrinsic, -) -> Result { - extrinsic - .partial_fee_estimate() - .await - .map(|f| f * 2) - .context("failed to obtain a transfer's estimate fee") -} -*/ - struct InvoiceChanges { invoice: Invoice, incoming: HashMap, diff --git a/src/server.rs b/src/server.rs index 2e53fa5..057c332 100644 --- a/src/server.rs +++ b/src/server.rs @@ -116,17 +116,19 @@ async fn handler( async fn abcd( database: Arc, - rrecipient: Option, + recipient_option: Option, order: String, - pprice: f64, + price_f64: f64, ) -> Result { - let recipient = rrecipient.context("destionation address isn't set")?; + let recipient = recipient_option.context("destionation address isn't set")?; let decoded_recip = hex::decode(&recipient[2..])?; let recipient_account = Account::try_from(decoded_recip.as_ref()) .map_err(|()| anyhow::anyhow!("Unknown address length"))?; let properties = database.properties().await; + #[allow(clippy::cast_precision_loss)] let mul = 10u128.pow(properties.decimals.try_into()?) as f64; - let price = (pprice * mul).round() as u128; + #[allow(clippy::cast_possible_truncation, clippy::cast_sign_loss)] + let price = (price_f64 * mul).round() as u128; let order_encoded = DeriveJunction::hard(&order).unwrap_inner(); let invoice_account: Account = database .pair() @@ -154,8 +156,10 @@ async fn abcd( Ok(Success { pay_account: format!("0x{}", HexDisplay::from(&invoice_account.as_ref())), price: match invoice.status { - InvoiceStatus::Unpaid(uprice) => convert(properties.decimals, uprice)?, - InvoiceStatus::Paid(uprice) => convert(properties.decimals, uprice)?, + InvoiceStatus::Unpaid(invoice_price) => { + convert(properties.decimals, invoice_price)? + } + InvoiceStatus::Paid(invoice_price) => convert(properties.decimals, invoice_price)?, }, wss: database.rpc().to_string(), mul: properties.decimals, @@ -184,7 +188,7 @@ async fn abcd( Ok(Success { pay_account: format!("0x{}", HexDisplay::from(&invoice_account.as_ref())), - price: pprice, + price: price_f64, wss: database.rpc().to_string(), mul: properties.decimals, recipient, @@ -196,7 +200,9 @@ async fn abcd( } fn convert(dec: u64, num: u128) -> Result { + #[allow(clippy::cast_precision_loss)] let numfl = num as f64; + #[allow(clippy::cast_precision_loss)] let mul = 10u128.pow(dec.try_into()?) as f64; Ok(numfl / mul) diff --git a/start-ah.sh b/start-ah.sh new file mode 100755 index 0000000..6f54303 --- /dev/null +++ b/start-ah.sh @@ -0,0 +1,8 @@ +KALATORI_HOST="127.0.0.1:16726" \ +KALATORI_SEED="" \ +KALATORI_DECIMALS="6" \ +KALATORI_DESTINATION="0xd43593c715fdd31c61141abd04a99fd6822c8558854ccde39a5684e7a56da27d" \ +KALATORI_LOG="subxt::events::events_type=off" \ +KALATORI_RPC="wss://polkadot-asset-hub-rpc.polkadot.io" \ +KALATORI_USD_ASSET="USDT" \ +cargo r -p kalatori-ah
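# A hypothetical smoke test (not part of the patch): with the daemon from start-ah.sh listening on 127.0.0.1:16726, the new recipient-scoped route added in kalatori-ah/src/server.rs can be queried the same way shoot.sh queries the plain order route. The order number and price below are arbitrary placeholders; the recipient is the KALATORI_DESTINATION value from start-ah.sh.
curl 127.0.0.1:16726/recipient/0xd43593c715fdd31c61141abd04a99fd6822c8558854ccde39a5684e7a56da27d/order/1337/price/5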