diff --git a/Cargo.lock b/Cargo.lock index 3483005..36649c7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -109,54 +109,6 @@ dependencies = [ "winapi", ] -[[package]] -name = "anstream" -version = "0.6.13" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d96bd03f33fe50a863e394ee9718a706f988b9079b20c3784fb726e7678b62fb" -dependencies = [ - "anstyle", - "anstyle-parse", - "anstyle-query", - "anstyle-wincon", - "colorchoice", - "utf8parse", -] - -[[package]] -name = "anstyle" -version = "1.0.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8901269c6307e8d93993578286ac0edf7f195079ffff5ebdeea6a59ffb7e36bc" - -[[package]] -name = "anstyle-parse" -version = "0.2.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c75ac65da39e5fe5ab759307499ddad880d724eed2f6ce5b5e8a26f4f387928c" -dependencies = [ - "utf8parse", -] - -[[package]] -name = "anstyle-query" -version = "1.0.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e28923312444cdd728e4738b3f9c9cac739500909bb3d3c94b43551b16517648" -dependencies = [ - "windows-sys 0.52.0", -] - -[[package]] -name = "anstyle-wincon" -version = "3.0.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1cd54b81ec8d6180e24654d0b371ad22fc3dd083b6ff8ba325b72e00c87660a7" -dependencies = [ - "anstyle", - "windows-sys 0.52.0", -] - [[package]] name = "anyhow" version = "1.0.81" @@ -327,16 +279,16 @@ checksum = "f28243a43d821d11341ab73c80bed182dc015c514b951616cf79bd4af39af0c3" dependencies = [ "concurrent-queue", "event-listener 5.2.0", - "event-listener-strategy 0.5.0", + "event-listener-strategy 0.5.1", "futures-core", "pin-project-lite", ] [[package]] name = "async-executor" -version = "1.8.0" +version = "1.9.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "17ae5ebefcc48e7452b4987947920dac9450be1110cadf34d1b8c116bdbaf97c" +checksum = "10b3e585719c2358d2660232671ca8ca4ddb4be4ce8a1842d6c2dc8685303316" dependencies = [ "async-lock 3.3.0", "async-task", @@ -409,19 +361,21 @@ dependencies = [ [[package]] name = "async-process" -version = "2.1.0" +version = "2.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "451e3cf68011bd56771c79db04a9e333095ab6349f7e47592b788e9b98720cc8" +checksum = "d999d925640d51b662b7b4e404224dd81de70f4aa4a199383c2c5e5b86885fa3" dependencies = [ "async-channel", "async-io", "async-lock 3.3.0", "async-signal", + "async-task", "blocking", "cfg-if", "event-listener 5.2.0", "futures-lite", "rustix 0.38.32", + "tracing", "windows-sys 0.52.0", ] @@ -457,7 +411,7 @@ checksum = "a507401cad91ec6a857ed5513a2073c82a9b9048762b885bb98655b306964681" dependencies = [ "proc-macro2", "quote", - "syn 2.0.55", + "syn 2.0.57", ] [[package]] @@ -783,9 +737,9 @@ dependencies = [ [[package]] name = "chrono" -version = "0.4.35" +version = "0.4.37" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8eaf5903dcbc0a39312feb77df2ff4c76387d591b9fc7b04a238dcf8bb62639a" +checksum = "8a0d04d43504c61aa6c7531f1871dd0d418d91130162063b789da00fd7057a5e" dependencies = [ "android-tzdata", "iana-time-zone", @@ -803,12 +757,6 @@ dependencies = [ "inout", ] -[[package]] -name = "colorchoice" -version = "1.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "acbf1af155f9b9ef647e42cdc158db4b64a1b61f743629225fde6f3e0be2a7c7" - [[package]] name = "common-path" version = "1.0.0" @@ -998,7 +946,7 @@ checksum = "f46882e17999c6cc590af592290432be3bce0428cb0d5f8b6715e4dc7b383eb3" dependencies = [ "proc-macro2", "quote", - "syn 2.0.55", + "syn 2.0.57", ] [[package]] @@ -1046,7 +994,7 @@ dependencies = [ "proc-macro2", "quote", "strsim", - "syn 2.0.55", + "syn 2.0.57", ] [[package]] @@ -1068,7 +1016,7 @@ checksum = "a668eda54683121533a393014d8692171709ff57a7d61f187b6e782719f8933f" dependencies = [ "darling_core 0.20.8", "quote", - "syn 2.0.55", + "syn 2.0.57", ] [[package]] @@ -1081,6 +1029,15 @@ dependencies = [ "zeroize", ] +[[package]] +name = "deranged" +version = "0.3.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b42b6fa04a440b495c8b04d0e71b707c585f83cb9cb28cf8cd0d976c315e31b4" +dependencies = [ + "powerfmt", +] + [[package]] name = "derivative" version = "2.2.0" @@ -1158,7 +1115,7 @@ dependencies = [ "proc-macro2", "quote", "regex", - "syn 2.0.55", + "syn 2.0.57", "termcolor", "toml", "walkdir", @@ -1291,28 +1248,6 @@ dependencies = [ "zeroize", ] -[[package]] -name = "env_filter" -version = "0.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a009aa4810eb158359dda09d0c87378e4bbb89b5a801f016885a4707ba24f7ea" -dependencies = [ - "log", -] - -[[package]] -name = "env_logger" -version = "0.11.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "38b35839ba51819680ba087cd351788c9a3c476841207e0b8cee0b04722343b9" -dependencies = [ - "anstream", - "anstyle", - "env_filter", - "humantime", - "log", -] - [[package]] name = "environmental" version = "1.1.4" @@ -1375,9 +1310,9 @@ dependencies = [ [[package]] name = "event-listener-strategy" -version = "0.5.0" +version = "0.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "feedafcaa9b749175d5ac357452a9d41ea2911da598fde46ce1fe02c37751291" +checksum = "332f51cb23d20b0de8458b86580878211da09bcd4503cb579c225b3d124cabb3" dependencies = [ "event-listener 5.2.0", "pin-project-lite", @@ -1394,7 +1329,7 @@ dependencies = [ "prettier-please", "proc-macro2", "quote", - "syn 2.0.55", + "syn 2.0.57", ] [[package]] @@ -1560,7 +1495,7 @@ checksum = "87750cf4b7a4c0625b1529e4c543c2182106e4dedc60a2a6455e00d212c489ac" dependencies = [ "proc-macro2", "quote", - "syn 2.0.55", + "syn 2.0.57", ] [[package]] @@ -1854,12 +1789,6 @@ version = "1.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "df3b46402a9d5adb4c86a0cf463f42e19994e3ee891101b1841f30a545cb49a9" -[[package]] -name = "humantime" -version = "2.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9a3a5bfb195931eeb336b2a7b4d761daec841b97f947d34394601737a7bba5e4" - [[package]] name = "hyper" version = "0.14.28" @@ -2225,17 +2154,17 @@ version = "0.2.0" dependencies = [ "anyhow", "axum", - "env_logger", "hex-simd", - "humantime", - "log", "names", "redb", + "scale-info", "serde", "subxt", "tokio", "tokio-util", "toml_edit 0.22.9", + "tracing", + "tracing-subscriber 0.3.18", "ureq", ] @@ -2369,6 +2298,15 @@ dependencies = [ "regex-automata 0.1.10", ] +[[package]] +name = "matchers" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8263075bb86c5a1b1427b5ae862e8889656f126e9f77c484496e8b47cf5c5558" +dependencies = [ + "regex-automata 0.1.10", +] + [[package]] name = "matchit" version = "0.7.3" @@ -2377,9 +2315,9 @@ checksum = "0e7465ac9959cc2b1404e8e2367b43684a6d13790fe23056cc8c6c5a6b7bcb94" [[package]] name = "memchr" -version = "2.7.1" +version = "2.7.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "523dc4f511e55ab87b694dc30d0f820d60906ef06413f93d4d7a1385599cc149" +checksum = "6c8640c5d730cb13ebd907d8d04b52f55ac9a2eec55b440c8892f40d56c76c1d" [[package]] name = "memfd" @@ -2489,6 +2427,16 @@ dependencies = [ "minimal-lexical", ] +[[package]] +name = "nu-ansi-term" +version = "0.46.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "77a8165726e8236064dbb45459242600304b42a5ea24ee2948e18e023bf7ba84" +dependencies = [ + "overload", + "winapi", +] + [[package]] name = "num-bigint" version = "0.4.4" @@ -2500,6 +2448,12 @@ dependencies = [ "num-traits", ] +[[package]] +name = "num-conv" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "51d515d32fb182ee37cda2ccdcb92950d6a3c2893aa280e540671c2cd0f3b1d9" + [[package]] name = "num-format" version = "0.4.4" @@ -2595,6 +2549,12 @@ version = "0.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4030760ffd992bef45b0ae3f10ce1aba99e33464c90d14dd7c039884963ddc7a" +[[package]] +name = "overload" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39" + [[package]] name = "parity-bip39" version = "2.0.1" @@ -2714,14 +2674,14 @@ checksum = "2f38a4412a78282e09a2cf38d195ea5420d15ba0602cb375210efbc877243965" dependencies = [ "proc-macro2", "quote", - "syn 2.0.55", + "syn 2.0.57", ] [[package]] name = "pin-project-lite" -version = "0.2.13" +version = "0.2.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8afb450f006bf6385ca15ef45d71d2288452bc3683ce2e2cacc0d18e4be60b58" +checksum = "bda66fc9667c18cb2758a2ac84d1167245054bcf85d5d1aaa6923f45801bdd02" [[package]] name = "pin-utils" @@ -2795,7 +2755,7 @@ dependencies = [ "polkavm-common 0.8.0", "proc-macro2", "quote", - "syn 2.0.55", + "syn 2.0.57", ] [[package]] @@ -2807,7 +2767,7 @@ dependencies = [ "polkavm-common 0.9.0", "proc-macro2", "quote", - "syn 2.0.55", + "syn 2.0.57", ] [[package]] @@ -2817,7 +2777,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "15e85319a0d5129dc9f021c62607e0804f5fb777a05cdda44d750ac0732def66" dependencies = [ "polkavm-derive-impl 0.8.0", - "syn 2.0.55", + "syn 2.0.57", ] [[package]] @@ -2827,7 +2787,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8ba81f7b5faac81e528eb6158a6f3c9e0bb1008e0ffa19653bc8dea925ecb429" dependencies = [ "polkavm-derive-impl 0.9.0", - "syn 2.0.55", + "syn 2.0.57", ] [[package]] @@ -2856,6 +2816,12 @@ dependencies = [ "universal-hash", ] +[[package]] +name = "powerfmt" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "439ee305def115ba05938db6eb1644ff94165c5ab5e9420d1c1bcedbba909391" + [[package]] name = "ppv-lite86" version = "0.2.17" @@ -2869,7 +2835,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "22020dfcf177fcc7bf5deaf7440af371400c67c0de14c399938d8ed4fb4645d3" dependencies = [ "proc-macro2", - "syn 2.0.55", + "syn 2.0.57", ] [[package]] @@ -3057,7 +3023,7 @@ checksum = "5fddb4f8d99b0a2ebafc65a87a69a7b9875e4b1ae1f00db265d300ef7f28bccc" dependencies = [ "proc-macro2", "quote", - "syn 2.0.55", + "syn 2.0.57", ] [[package]] @@ -3255,9 +3221,9 @@ dependencies = [ [[package]] name = "rustls-pki-types" -version = "1.4.0" +version = "1.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "868e20fada228fefaf6b652e00cc73623d54f8171e7352c18bb281571f2d92da" +checksum = "ecd36cc4259e3e4514335c4a138c6b43171a8d61d8f5c9348f9fc7529416f247" [[package]] name = "rustls-webpki" @@ -3424,7 +3390,7 @@ dependencies = [ "proc-macro2", "quote", "scale-info", - "syn 2.0.55", + "syn 2.0.57", "thiserror", ] @@ -3548,9 +3514,9 @@ dependencies = [ [[package]] name = "security-framework" -version = "2.9.2" +version = "2.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "05b64fb303737d99b81884b2c63433e9ae28abebe5eb5045dcdd175dc2ecf4de" +checksum = "770452e37cad93e0a50d5abc3990d2bc351c36d0328f86cefec2f2fb206eaef6" dependencies = [ "bitflags 1.3.2", "core-foundation", @@ -3561,9 +3527,9 @@ dependencies = [ [[package]] name = "security-framework-sys" -version = "2.9.1" +version = "2.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e932934257d3b408ed8f30db49d85ea163bfe74961f017f405b025af298f0c7a" +checksum = "41f3cc463c0ef97e11c3461a9d3787412d30e8e7eb907c79180c4a57bf7c04ef" dependencies = [ "core-foundation-sys", "libc", @@ -3601,7 +3567,7 @@ checksum = "7eb0b34b42edc17f6b7cac84a52a1c5f0e1bb2227e997ca9011ea3dd34e8610b" dependencies = [ "proc-macro2", "quote", - "syn 2.0.55", + "syn 2.0.57", ] [[package]] @@ -3989,7 +3955,7 @@ checksum = "48d09fa0a5f7299fb81ee25ae3853d26200f7a348148aed6de76be905c007dbe" dependencies = [ "proc-macro2", "quote", - "syn 2.0.55", + "syn 2.0.57", ] [[package]] @@ -4110,7 +4076,7 @@ dependencies = [ "proc-macro-crate 3.1.0", "proc-macro2", "quote", - "syn 2.0.55", + "syn 2.0.57", ] [[package]] @@ -4165,7 +4131,7 @@ dependencies = [ "sp-std", "tracing", "tracing-core", - "tracing-subscriber", + "tracing-subscriber 0.2.25", ] [[package]] @@ -4346,7 +4312,7 @@ dependencies = [ "scale-info", "scale-typegen", "subxt-metadata", - "syn 2.0.55", + "syn 2.0.57", "thiserror", "tokio", ] @@ -4380,7 +4346,7 @@ dependencies = [ "quote", "scale-typegen", "subxt-codegen", - "syn 2.0.55", + "syn 2.0.57", ] [[package]] @@ -4410,9 +4376,9 @@ dependencies = [ [[package]] name = "syn" -version = "2.0.55" +version = "2.0.57" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "002a1b3dbf967edfafc32655d0f377ab0bb7b994aa1d32c8cc7e9b8bf3ebb8f0" +checksum = "11a6ae1e52eb25aab8f3fb9fca13be982a373b8f1157ca14b897a825ba4a2d35" dependencies = [ "proc-macro2", "quote", @@ -4469,7 +4435,7 @@ checksum = "c61f3ba182994efc43764a46c018c347bc492c79f024e705f46567b418f6d4f7" dependencies = [ "proc-macro2", "quote", - "syn 2.0.55", + "syn 2.0.57", ] [[package]] @@ -4482,6 +4448,37 @@ dependencies = [ "once_cell", ] +[[package]] +name = "time" +version = "0.3.34" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c8248b6521bb14bc45b4067159b9b6ad792e2d6d754d6c41fb50e29fefe38749" +dependencies = [ + "deranged", + "itoa", + "num-conv", + "powerfmt", + "serde", + "time-core", + "time-macros", +] + +[[package]] +name = "time-core" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ef927ca75afb808a4d64dd374f00a2adf8d0fcff8e7b184af886c3c87ec4a3f3" + +[[package]] +name = "time-macros" +version = "0.2.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7ba3a3ef41e6672a2f0f001392bb5dcd3ff0a9992d618ca761a11c3121547774" +dependencies = [ + "num-conv", + "time-core", +] + [[package]] name = "tinyvec" version = "1.6.0" @@ -4499,9 +4496,9 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" [[package]] name = "tokio" -version = "1.36.0" +version = "1.37.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "61285f6515fa018fb2d1e46eb21223fff441ee8db5d0f1435e8ab4f5cdb80931" +checksum = "1adbebffeca75fcfd058afa480fb6c0b81e165a0323f9c9d39c9697e37c46787" dependencies = [ "backtrace", "bytes", @@ -4523,7 +4520,7 @@ checksum = "5b8a1e28f2deaa14e508979454cb3a223b10b938b45af148bc0986de36f1923b" dependencies = [ "proc-macro2", "quote", - "syn 2.0.55", + "syn 2.0.57", ] [[package]] @@ -4701,7 +4698,7 @@ checksum = "34704c8d6ebcbc939824180af020566b01a7c01f80641264eba0999f6c2b6be7" dependencies = [ "proc-macro2", "quote", - "syn 2.0.55", + "syn 2.0.57", ] [[package]] @@ -4744,7 +4741,7 @@ dependencies = [ "ansi_term", "chrono", "lazy_static", - "matchers", + "matchers 0.0.1", "regex", "serde", "serde_json", @@ -4757,6 +4754,24 @@ dependencies = [ "tracing-serde", ] +[[package]] +name = "tracing-subscriber" +version = "0.3.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ad0f048c97dbd9faa9b7df56362b8ebcaa52adb06b498c050d2f4e32f90a7a8b" +dependencies = [ + "matchers 0.1.0", + "nu-ansi-term", + "once_cell", + "regex", + "sharded-slab", + "smallvec", + "thread_local", + "time", + "tracing", + "tracing-core", +] + [[package]] name = "trie-db" version = "0.28.0" @@ -4867,6 +4882,8 @@ dependencies = [ "base64 0.21.7", "log", "once_cell", + "serde", + "serde_json", "url", ] @@ -4881,12 +4898,6 @@ dependencies = [ "percent-encoding", ] -[[package]] -name = "utf8parse" -version = "0.2.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "711b9620af191e0cdc7468a8d14e709c3dcdb115b36f838e601583af800a370a" - [[package]] name = "valuable" version = "0.1.0" @@ -4975,7 +4986,7 @@ dependencies = [ "once_cell", "proc-macro2", "quote", - "syn 2.0.55", + "syn 2.0.57", "wasm-bindgen-shared", ] @@ -4997,7 +5008,7 @@ checksum = "e94f17b526d0a461a191c78ea52bbce64071ed5c04c9ffe424dcb38f74171bb7" dependencies = [ "proc-macro2", "quote", - "syn 2.0.55", + "syn 2.0.57", "wasm-bindgen-backend", "wasm-bindgen-shared", ] @@ -5490,7 +5501,7 @@ checksum = "9ce1b18ccd8e73a9321186f97e46f9f04b778851177567b1975109d26a08d2a6" dependencies = [ "proc-macro2", "quote", - "syn 2.0.55", + "syn 2.0.57", ] [[package]] @@ -5510,5 +5521,5 @@ checksum = "ce36e65b0d2999d2aafac989fb249189a141aee1f53c612c1f37d72631959f69" dependencies = [ "proc-macro2", "quote", - "syn 2.0.55", + "syn 2.0.57", ] diff --git a/Cargo.toml b/Cargo.toml index 50a8898..8c55877 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -11,23 +11,23 @@ keywords = ["substrate", "blockchain", "finance", "service", "middleware"] categories = ["finance"] [dependencies] -env_logger = { version = "0.11", default-features = false, features = ["humantime", "auto-color"] } toml_edit = { version = "0.22", default-features = false, features = ["parse", "serde"] } axum = { version = "0.7", default-features = false, features = ["tokio", "http1", "query", "json", "matched-path"] } +tracing-subscriber = { version = "0.3", default-features = false, features = ["smallvec", "ansi", "env-filter", "time"] } +ureq = { version = "2", default-features = false, features = ["json"] } + +names = { version = "0.14", default-features = false } subxt = { version = "=0.35", features = ["substrate-compat", "unstable-reconnecting-rpc-client"] } tokio-util = { version = "0.7", features = ["rt"] } tokio = { version = "1", features = ["signal"] } -ureq = { version = "2", default-features = false } -names = { version = "0.14", default-features = false } - hex-simd = "0.8" serde = "1" anyhow = "1" -log = "0.4" redb = "2" -humantime = "2" +tracing = "0.1" +scale-info = "2" [profile.release] strip = true diff --git a/kalatori-test.toml b/kalatori-test.toml index 2832a8a..195ea86 100644 --- a/kalatori-test.toml +++ b/kalatori-test.toml @@ -1,6 +1,6 @@ recipient = "5FHneW46xGXgs5mUiveU4sbTyGBzmstUspZC92UhjJM694ty" -account-lifetime = 3600 # 1 hour. -depth = 86400 # 1 day. +account-lifetime = 86400000 # 1 day. +depth = 3600000 # 1 hour. debug = true [[chain]] @@ -17,10 +17,9 @@ name = "westmint" native-token = "WND AH" decimals = 12 endpoints = [ - "wss://polkadot-asset-hub-rpc.polkadot.io", - "wss://statemint-rpc.dwellir.com", + "wss://westend-asset-hub-rpc.polkadot.io", + "wss://westmint-rpc.dwellir.com", ] -multi-location-assets = true [[chain.asset]] name = "JOE" @@ -29,3 +28,28 @@ id = 8 [[chain.asset]] name = "TEST" id = 1234 + +[[chain]] +name = "rococo" +native-token = "ROC" +decimals = 12 +endpoints = [ + "wss://rococo-rpc.polkadot.io", +] + +[[chain]] +name = "assethub-rococo" +native-token = "ROC AH" +decimals = 12 +endpoints = [ + "wss://rococo-asset-hub-rpc.polkadot.io", + "wss://rococo-asset-hub-rpc.dwellir.com", +] + +[[chain.asset]] +name = "USDT" +id = 1984 + +[[chain.asset]] +name = "TUSDT" +id = 7777 diff --git a/kalatori.toml b/kalatori.toml index 1a10b9d..75c2091 100644 --- a/kalatori.toml +++ b/kalatori.toml @@ -1,6 +1,6 @@ recipient = "5FHneW46xGXgs5mUiveU4sbTyGBzmstUspZC92UhjJM694ty" -account-lifetime = 86400 # 1 day. -depth = 604800 # 1 week. +account-lifetime = 604800000 # 1 week. +depth = 86400000 # 1 day. [[chain]] name = "polkadot" @@ -34,3 +34,14 @@ endpoints = [ "wss://kusama-rpc.polkadot.io", "wss://1rpc.io/ksm", ] + +[[chain]] +name = "assethub-kusama" +endpoints = [ + "wss://kusama-asset-hub-rpc.polkadot.io", + "wss://statemine-rpc.dwellir.com", +] + +[[chain.asset]] +name = "RMRK" +id = 8 diff --git a/src/asset.rs b/src/asset.rs new file mode 100644 index 0000000..54fe210 --- /dev/null +++ b/src/asset.rs @@ -0,0 +1,244 @@ +use crate::{AssetId, PalletIndex}; +use std::marker::PhantomData; +use subxt::ext::{ + codec::{Encode, Output}, + scale_decode::{ + self, + error::ErrorKind, + visitor::{ + types::{Composite, Tuple}, + TypeIdFor, + }, + DecodeAsType, IntoVisitor, TypeResolver, Visitor, + }, + scale_encode::{self, EncodeAsType}, +}; + +#[derive(Clone, Debug)] +pub enum Asset { + Id(AssetId), + MultiLocation(PalletIndex, AssetId), +} + +impl Asset { + const ID: &'static str = "Id"; + const MULTI_LOCATION: &'static str = "MultiLocation"; +} + +pub struct AssetVisitor(PhantomData); + +fn try_into_asset_id( + number: impl TryInto + ToString + Copy, +) -> Result { + number.try_into().map_err(|_| { + scale_decode::Error::new(ErrorKind::NumberOutOfRange { + value: number.to_string(), + }) + }) +} + +macro_rules! visit_number { + ($visit:ident, $number:ty) => { + fn $visit<'scale, 'resolver>( + self, + number: $number, + type_id: &TypeIdFor, + ) -> Result, Self::Error> { + self.visit_u32(try_into_asset_id(number)?, type_id) + } + }; +} + +macro_rules! visit_composite_or_tuple { + ($visit:ident, $composite_or_tuple:ident) => { + fn $visit<'scale, 'resolver>( + self, + composite_or_tuple: &mut $composite_or_tuple<'scale, 'resolver, Self::TypeResolver>, + type_id: &TypeIdFor, + ) -> Result, Self::Error> { + if composite_or_tuple.remaining() == 1 { + // Shouldn't panic. We've just checked remaining items above. + return composite_or_tuple + .decode_item(self) + .unwrap() + .map_err(|error| error.at_variant(Self::Value::ID)); + } + + let (pallet_index, asset_id) = (|| { + let MultiLocation { + interior: Junctions::X2(first, second), + .. + } = MultiLocation::into_visitor().$visit(composite_or_tuple, type_id)?; + + let Junction::PalletInstance(pallet_index) = first else { + return Err(scale_decode::Error::new(ErrorKind::CannotFindVariant { + got: first.name().into(), + expected: vec![Junction::PALLET_INSTANCE], + }) + .at_idx(0)); + }; + + let asset_id = (|| { + let Junction::GeneralIndex(general_index) = second else { + return Err(scale_decode::Error::new(ErrorKind::CannotFindVariant { + got: first.name().into(), + expected: vec![Junction::GENERAL_INDEX], + })); + }; + + let asset_id = general_index.try_into().map_err(|_| { + scale_decode::Error::new(ErrorKind::NumberOutOfRange { + value: general_index.to_string(), + }) + })?; + + Ok(asset_id) + })() + .map_err(|error| error.at_idx(1))?; + + Ok((pallet_index, asset_id)) + })() + .map_err(|error| error.at_variant(Self::Value::MULTI_LOCATION))?; + + Ok(Self::Value::MultiLocation(pallet_index, asset_id)) + } + }; +} + +impl Visitor for AssetVisitor { + type Value<'scale, 'resolver> = Asset; + type Error = scale_decode::Error; + type TypeResolver = R; + + fn visit_u32<'scale, 'resolver>( + self, + value: u32, + _: &TypeIdFor, + ) -> Result, Self::Error> { + Ok(Self::Value::Id(value)) + } + + fn visit_u8<'scale, 'resolver>( + self, + value: u8, + type_id: &TypeIdFor, + ) -> Result, Self::Error> { + self.visit_u32(value.into(), type_id) + } + + fn visit_u16<'scale, 'resolver>( + self, + value: u16, + type_id: &TypeIdFor, + ) -> Result, Self::Error> { + self.visit_u32(value.into(), type_id) + } + + visit_number!(visit_i8, i8); + visit_number!(visit_i16, i16); + visit_number!(visit_i32, i32); + visit_number!(visit_u64, u64); + visit_number!(visit_i64, i64); + visit_number!(visit_i128, i128); + visit_number!(visit_u128, u128); + + visit_composite_or_tuple!(visit_tuple, Tuple); + visit_composite_or_tuple!(visit_composite, Composite); +} + +impl IntoVisitor for Asset { + type AnyVisitor = AssetVisitor; + + fn into_visitor() -> Self::AnyVisitor { + AssetVisitor(PhantomData) + } +} + +impl EncodeAsType for Asset { + fn encode_as_type_to( + &self, + type_id: &R::TypeId, + types: &R, + out: &mut Vec, + ) -> Result<(), scale_encode::Error> { + match self { + Self::Id(id) => id.encode_as_type_to(type_id, types, out), + Self::MultiLocation(assets_pallet, asset_id) => { + MultiLocation::new(*assets_pallet, *asset_id).encode_as_type_to(type_id, types, out) + } + } + } +} + +impl Encode for Asset { + fn size_hint(&self) -> usize { + match self { + Self::Id(id) => id.size_hint(), + Self::MultiLocation(assets_pallet, asset_id) => { + MultiLocation::new(*assets_pallet, *asset_id).size_hint() + } + } + } + + fn encode_to(&self, dest: &mut T) { + match self { + Self::Id(id) => id.encode_to(dest), + Self::MultiLocation(assets_pallet, asset_id) => { + MultiLocation::new(*assets_pallet, *asset_id).encode_to(dest); + } + } + } +} + +#[derive(EncodeAsType, DecodeAsType, Encode)] +#[encode_as_type(crate_path = "subxt::ext::scale_encode")] +#[decode_as_type(crate_path = "subxt::ext::scale_decode")] +#[codec(crate = subxt::ext::codec)] +struct MultiLocation { + parents: u8, + interior: Junctions, +} + +impl MultiLocation { + fn new(assets_pallet: PalletIndex, asset_id: AssetId) -> Self { + Self { + parents: 0, + interior: Junctions::X2( + Junction::PalletInstance(assets_pallet), + Junction::GeneralIndex(asset_id.into()), + ), + } + } +} + +#[derive(EncodeAsType, DecodeAsType, Encode)] +#[encode_as_type(crate_path = "subxt::ext::scale_encode")] +#[decode_as_type(crate_path = "subxt::ext::scale_decode")] +#[codec(crate = subxt::ext::codec)] +enum Junctions { + #[codec(index = 2)] + X2(Junction, Junction), +} + +#[derive(EncodeAsType, DecodeAsType, Encode)] +#[encode_as_type(crate_path = "subxt::ext::scale_encode")] +#[decode_as_type(crate_path = "subxt::ext::scale_decode")] +#[codec(crate = subxt::ext::codec)] +enum Junction { + #[codec(index = 4)] + PalletInstance(PalletIndex), + #[codec(index = 5)] + GeneralIndex(u128), +} + +impl Junction { + const PALLET_INSTANCE: &'static str = "PalletInstance"; + const GENERAL_INDEX: &'static str = "GeneralIndex"; + + fn name(&self) -> &str { + match self { + Self::PalletInstance(_) => Self::PALLET_INSTANCE, + Self::GeneralIndex(_) => Self::GENERAL_INDEX, + } + } +} diff --git a/src/callback.rs b/src/callback.rs index 1fa0c7e..7763b55 100644 --- a/src/callback.rs +++ b/src/callback.rs @@ -1,2 +1 @@ - pub const MODULE: &str = module_path!(); diff --git a/src/database.rs b/src/database.rs index 3439919..b428a51 100644 --- a/src/database.rs +++ b/src/database.rs @@ -1,11 +1,22 @@ -use crate::{rpc::ConnectedChain, AssetId, Balance, BlockNumber, Nonce, Timestamp}; +use crate::{ + rpc::{ConnectedChain, Currency}, + AccountId, AssetId, Balance, BlockNumber, Config, Nonce, Timestamp, Version, +}; use anyhow::{Context, Result}; -use redb::{Database, ReadableTable, Table, TableDefinition, TableHandle, TypeName, Value}; -use std::{collections::HashMap, sync::Arc}; +use redb::{ + backends::{FileBackend, InMemoryBackend}, + Database, ReadableTable, Table, TableDefinition, TableHandle, TypeName, Value, +}; +use serde::Deserialize; +use std::{collections::HashMap, fs::File, io::ErrorKind, sync::Arc}; use subxt::ext::{ codec::{Compact, Decode, Encode}, - sp_core::sr25519::{Pair, Public}, + sp_core::{ + crypto::Ss58Codec, + sr25519::{Pair, Public}, + }, }; +use tokio::sync::RwLock; pub const MODULE: &str = module_path!(); @@ -15,8 +26,6 @@ const ROOT: TableDefinition<'_, &str, &[u8]> = TableDefinition::new("root"); const KEYS: TableDefinition<'_, PublicSlot, U256Slot> = TableDefinition::new("keys"); const CHAINS: TableDefinition<'_, ChainHash, BlockNumber> = TableDefinition::new("chains"); const INVOICES: TableDefinition<'_, InvoiceKey, Invoice> = TableDefinition::new("invoices"); -const HIT_LIST: TableDefinition<'_, Timestamp, (ChainHash, AssetId, Account)> = - TableDefinition::new("hit_list"); const ACCOUNTS: &str = "accounts"; @@ -28,6 +37,11 @@ const TRANSACTIONS: &str = "transactions"; type TRANSACTIONS_KEY = BlockNumber; type TRANSACTIONS_VALUE = (Account, Nonce, Transfer); +const HIT_LIST: &str = "hit_list"; + +type HIT_LIST_KEY = BlockNumber; +type HIT_LIST_VALUE = (Option, Account); + // `ROOT` keys // The database version must be stored in a separate slot to be used by the not implemented yet @@ -84,14 +98,36 @@ struct Invoice { price: BalanceSlot, callback: String, message: String, - asset: Option>, - transactions: Vec, + transactions: TransferTxs, +} + +#[derive(Encode, Decode, Debug)] +#[codec(crate = subxt::ext::codec)] +enum TransferTxs { + Asset { + #[codec(compact)] + id: AssetId, + // transactions: TransferTxsAsset, + }, + Native { + recipient: Account, + encoded: Vec, + exact_amount: Option>, + }, } +// #[derive(Encode, Decode, Debug)] +// #[codec(crate = subxt::ext::codec)] +// struct TransferTxsAsset { +// recipient: Account, +// encoded: Vec, +// #[codec(compact)] +// amount: BalanceSlot, +// } + #[derive(Encode, Decode, Debug)] #[codec(crate = subxt::ext::codec)] struct TransferTx { - encoded: Vec, recipient: Account, exact_amount: Option>, } @@ -121,124 +157,95 @@ impl Value for Invoice { } } +pub struct ConfigWoChains { + pub recipient: String, + pub debug: Option, + pub remark: Option, + pub depth: Option, + pub account_lifetime: BlockNumber, + pub rpc: String, +} + pub struct State { - db: Database, - // properties: Arc>, - // pair: Pair, - // rpc: String, - // destination: Option, + pub currencies: HashMap, + pub recipient: AccountId, + pub pair: Pair, + pub depth: Option, + pub account_lifetime: Timestamp, + pub debug: Option, + pub remark: Option, + + pub invoices: RwLock>, + pub rpc: String, +} + +#[derive(Deserialize, Debug)] +pub struct Invoicee { + pub callback: String, + pub amount: Balance, + pub paid: bool, + pub paym_acc: AccountId, } impl State { pub fn initialise( path_option: Option, + currencies: HashMap, current_pair: (Pair, Public), old_pairs: HashMap, - connected_chains: HashMap, + ConfigWoChains { + recipient, + debug, + remark, + depth, + account_lifetime, + rpc, + }: ConfigWoChains, ) -> Result> { - // let mut database = Database::create(path_option.unwrap()).unwrap(); - - // let tx = database - // .begin_write() - // .context("failed to begin a write transaction")?; - // let mut table = tx - // .open_table(ROOT) - // .with_context(|| format!("failed to open the `{}` table", ROOT.name()))?; - // drop( - // tx.open_table(INVOICES) - // .with_context(|| format!("failed to open the `{}` table", INVOICES.name()))?, - // ); - - // let last_block = match ( - // get_slot(&table, DB_VERSION_KEY)?, - // get_slot(&table, DAEMON_INFO)?, - // get_slot(&table, LAST_BLOCK)?, - // ) { - // (None, None, None) => { - // table - // .insert( - // DB_VERSION_KEY, - // Compact(DATABASE_VERSION).encode(), - // ) - // .context("failed to insert the database version")?; - // insert_daemon_info(&mut table, given_rpc.clone(), public)?; - - // None - // } - // (Some(encoded_db_version), Some(daemon_info), last_block_option) => { - // let Compact::(db_version) = - // decode_slot(&encoded_db_version, DB_VERSION_KEY)?; - // let DaemonInfo { rpc: db_rpc, key } = decode_slot(&daemon_info, DAEMON_INFO)?; - - // if db_version != DATABASE_VERSION { - // anyhow::bail!( - // "database contains an unsupported database version (\"{db_version}\"), expected \"{DATABASE_VERSION}\"" - // ); - // } - - // if public != key { - // anyhow::bail!( - // "public key from `{SEED}` doesn't equal the one from the database (\"{public_formatted}\")" - // ); - // } - - // if given_rpc != db_rpc { - // if override_rpc { - // log::warn!( - // "The saved RPC endpoint ({db_rpc:?}) differs from the given one ({given_rpc:?}) and will be overwritten by it because `{OVERRIDE_RPC}` is set." - // ); - - // insert_daemon_info(&mut table, given_rpc.clone(), public)?; - // } else { - // anyhow::bail!( - // "database contains a different RPC endpoint address ({db_rpc:?}), expected {given_rpc:?}" - // ); - // } - // } else if override_rpc { - // log::warn!( - // "`{OVERRIDE_RPC}` is set but the saved RPC endpoint ({db_rpc:?}) equals to the given one." - // ); - // } - - // if let Some(encoded_last_block) = last_block_option { - // Some(decode_slot::>(&encoded_last_block, LAST_BLOCK)?.0) - // } else { - // None - // } - // } - // _ => anyhow::bail!( - // "database was found but it doesn't contain `{DB_VERSION_KEY:?}` and/or `{DAEMON_INFO:?}`, maybe it was created by another program" - // ), - // }; - - // drop(table); - - // tx.commit().context("failed to commit a transaction")?; - - // let compacted = database - // .compact() - // .context("failed to compact the database")?; - - // if compacted { - // log::debug!("The database was successfully compacted."); - // } else { - // log::debug!("The database doesn't need the compaction."); - // } - - // log::info!("Public key from the given seed: \"{public_formatted}\"."); - - // Ok(( - // Arc::new(Self { - // db: database, - // properties: chain, - // pair, - // rpc: given_rpc, - // destination, - // }), - // last_block, - // )) - - todo!() + let builder = Database::builder(); + let is_new; + + let database = if let Some(path) = path_option { + tracing::info!("Creating/Opening the database at {path:?}."); + + match File::create_new(&path) { + Ok(file) => { + is_new = true; + + FileBackend::new(file).and_then(|backend| builder.create_with_backend(backend)) + } + Err(error) if error.kind() == ErrorKind::AlreadyExists => { + is_new = false; + + builder.create(path) + } + Err(error) => Err(error.into()) + } + } else { + tracing::warn!( + "The in-memory backend for the database is selected. All saved data will be deleted after the shutdown!" + ); + + is_new = true; + + builder.create_with_backend(InMemoryBackend::new()) + }.context("failed to create/open the database")?; + + // + + Ok(Arc::new(Self { + currencies, + recipient: AccountId::from_string(&recipient) + .context("failed to convert \"recipient\" from the config to an account address")?, + pair: current_pair.0, + depth, + account_lifetime, + debug, + remark, + + invoices: RwLock::new(HashMap::new()), + rpc, + })) } // pub fn rpc(&self) -> &str { diff --git a/src/main.rs b/src/main.rs index 4c3e15b..b71b75e 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,10 +1,10 @@ use anyhow::{Context, Error, Result}; -use env_logger::{Builder, Env}; -use log::LevelFilter; use serde::Deserialize; use std::{ + borrow::Cow, collections::HashMap, env::{self, VarError}, + error::Error as _, fs, future::Future, net::{IpAddr, Ipv4Addr, SocketAddr}, @@ -12,34 +12,35 @@ use std::{ panic, str, }; use subxt::{ - config::PolkadotExtrinsicParams, - ext::{ - codec::{Encode, Output}, - scale_decode::DecodeAsType, - scale_encode::{self, EncodeAsType, TypeResolver}, - sp_core::{ - crypto::{AccountId32, Ss58Codec}, - sr25519::Pair, - Pair as _, - }, + config::{substrate::SubstrateHeader, PolkadotExtrinsicParams}, + ext::sp_core::{ + crypto::{AccountId32, Ss58Codec}, + sr25519::Pair, + Pair as _, }, PolkadotConfig, }; use tokio::{ signal, - sync::mpsc::{self, UnboundedSender}, + sync::mpsc::{self, UnboundedReceiver, UnboundedSender}, + task::JoinHandle, }; -use tokio_util::{sync::CancellationToken, task::TaskTracker}; +use tokio_util::{sync::CancellationToken, task}; use toml_edit::de; +use tracing_subscriber::{fmt::time::UtcTime, EnvFilter}; +mod asset; mod callback; mod database; mod rpc; mod server; +use asset::Asset; +use database::{ConfigWoChains, State}; +use rpc::Processor; + const CONFIG: &str = "KALATORI_CONFIG"; const LOG: &str = "KALATORI_LOG"; -const LOG_STYLE: &str = "KALATORI_LOG_STYLE"; const SEED: &str = "KALATORI_SEED"; const OLD_SEED: &str = "KALATORI_OLD_SEED_"; @@ -54,11 +55,12 @@ type Decimals = u8; type BlockNumber = u64; type ExtrinsicIndex = u32; type Version = u64; -type AccountId = ::AccountId; type Nonce = u32; type Timestamp = u64; type PalletIndex = u8; + type BlockHash = ::Hash; +type AccountId = ::AccountId; type OnlineClient = subxt::OnlineClient; struct RuntimeConfig; @@ -69,111 +71,38 @@ impl subxt::Config for RuntimeConfig { type Address = ::Address; type Signature = ::Signature; type Hasher = ::Hasher; - type Header = ::Header; + type Header = SubstrateHeader; type ExtrinsicParams = PolkadotExtrinsicParams; - type AssetId = u32; -} - -enum Asset { - Id(AssetId), - MultiLocation(PalletIndex, AssetId), -} - -impl EncodeAsType for Asset { - fn encode_as_type_to( - &self, - type_id: &R::TypeId, - types: &R, - out: &mut Vec, - ) -> Result<(), scale_encode::Error> { - match self { - Self::Id(id) => id.encode_as_type_to(type_id, types, out), - Self::MultiLocation(assets_pallet, asset_id) => { - MultiLocation::new(*assets_pallet, *asset_id).encode_as_type_to(type_id, types, out) - } - } - } -} - -impl Encode for Asset { - fn size_hint(&self) -> usize { - match self { - Self::Id(id) => id.size_hint(), - Self::MultiLocation(assets_pallet, asset_id) => { - MultiLocation::new(*assets_pallet, *asset_id).size_hint() - } - } - } - - fn encode_to(&self, dest: &mut T) { - match self { - Self::Id(id) => id.encode_to(dest), - Self::MultiLocation(assets_pallet, asset_id) => { - MultiLocation::new(*assets_pallet, *asset_id).encode_to(dest); - } - } - } -} - -#[derive(EncodeAsType, DecodeAsType, Encode)] -#[encode_as_type(crate_path = "subxt::ext::scale_encode")] -#[decode_as_type(crate_path = "subxt::ext::scale_decode")] -#[codec(crate = subxt::ext::codec)] -struct MultiLocation { - parents: u8, - interior: Junctions, -} - -impl MultiLocation { - fn new(assets_pallet: PalletIndex, asset_id: AssetId) -> Self { - Self { - parents: 0, - interior: Junctions::X2( - Junction::PalletInstance(assets_pallet), - Junction::GeneralIndex(asset_id.into()), - ), - } - } -} - -#[derive(EncodeAsType, DecodeAsType, Encode)] -#[encode_as_type(crate_path = "subxt::ext::scale_encode")] -#[decode_as_type(crate_path = "subxt::ext::scale_decode")] -#[codec(crate = subxt::ext::codec)] -enum Junctions { - #[codec(index = 2)] - X2(Junction, Junction), -} - -#[derive(EncodeAsType, DecodeAsType, Encode)] -#[encode_as_type(crate_path = "subxt::ext::scale_encode")] -#[decode_as_type(crate_path = "subxt::ext::scale_decode")] -#[codec(crate = subxt::ext::codec)] -pub enum Junction { - #[codec(index = 4)] - PalletInstance(PalletIndex), - #[codec(index = 5)] - GeneralIndex(u128), + type AssetId = Asset; } #[tokio::main] #[allow(clippy::too_many_lines)] async fn main() -> Result<()> { - let mut builder = Builder::new(); + let filter = match EnvFilter::try_from_env(LOG) { + Err(error) => { + let Some(VarError::NotPresent) = error + .source() + .expect("should always be `Some`") + .downcast_ref() + else { + return Err(error).with_context(|| format!("failed to parse `{LOG}`")); + }; + + if cfg!(debug_assertions) { + EnvFilter::try_new("debug") + } else { + EnvFilter::try_new(default_filter()) + } + .unwrap() + } + Ok(filter) => filter, + }; - if cfg!(debug_assertions) { - builder.filter_level(LevelFilter::Debug) - } else { - builder - .filter_level(LevelFilter::Off) - .filter_module(callback::MODULE, LevelFilter::Info) - .filter_module(database::MODULE, LevelFilter::Info) - .filter_module(rpc::MODULE, LevelFilter::Info) - .filter_module(server::MODULE, LevelFilter::Info) - .filter_module(env!("CARGO_PKG_NAME"), LevelFilter::Info) - } - .parse_env(Env::from(LOG).write_style(LOG_STYLE)) - .init(); + tracing_subscriber::fmt() + .with_timer(UtcTime::rfc_3339()) + .with_env_filter(filter) + .init(); let pair = Pair::from_string( &env::var(SEED).with_context(|| format!("failed to read `{SEED}`"))?, @@ -208,7 +137,7 @@ async fn main() -> Result<()> { let config_path = env::var(CONFIG).or_else(|error| match error { VarError::NotUnicode(_) => Err(error).with_context(|| format!("failed to read `{CONFIG}`")), VarError::NotPresent => { - log::debug!( + tracing::debug!( "`{CONFIG}` isn't present, using the default value instead: {DEFAULT_CONFIG:?}." ); @@ -236,29 +165,27 @@ async fn main() -> Result<()> { break 'database None; } } else if config.in_memory_db.is_some() { - log::warn!("`in_memory_db` is set in the config but ignored because `debug` isn't set"); + tracing::warn!( + "`in_memory_db` is set in the config but ignored because `debug` isn't set" + ); } Some(config.database.unwrap_or_else(|| { - log::debug!( + tracing::debug!( "`database` isn't present in the config, using the default value instead: {DEFAULT_DATABASE:?}." ); - DEFAULT_DATABASE.to_owned() + DEFAULT_DATABASE.into() })) }; - let recipient = AccountId::from_string(&config.recipient) - .context("failed to convert `recipient` from the config to an account address")?; - - log::info!( + tracing::info!( "Kalatori {} by {} is starting...", env!("CARGO_PKG_VERSION"), env!("CARGO_PKG_AUTHORS") ); let shutdown_notification = CancellationToken::new(); - let (error_tx, mut error_rx) = mpsc::unbounded_channel(); let shutdown_notification_for_panic = shutdown_notification.clone(); panic::set_hook(Box::new(move |panic_info| { @@ -266,85 +193,187 @@ async fn main() -> Result<()> { .location() .map(|location| format!(" at `{location}`")) .unwrap_or_default(); - let panic_message = panic_info - .payload() - .downcast_ref::<&str>() - .map_or_else(|| ".".into(), |message| format!(":\n{message}\n")); - - log::error!( - "A panic detected{at}{panic_message}\nThis is a bug. Please report it at {}.", + let payload = panic_info.payload(); + + let message = match payload.downcast_ref::<&str>() { + Some(string) => Some(*string), + None => payload.downcast_ref::().map(|string| &string[..]), + }; + let formatted_message = match message { + Some(string) => format!(":\n{string}\n"), + None => ".".into(), + }; + + tracing::error!( + "A panic detected{at}{formatted_message}\nThis is a bug. Please report it at {}.", env!("CARGO_PKG_REPOSITORY") ); shutdown_notification_for_panic.cancel(); })); - let chains = rpc::prepare(config.chain) + let (task_tracker, error_rx) = TaskTracker::new(); + + task_tracker.spawn( + "the shutdown listener", + shutdown_listener(shutdown_notification.clone()), + ); + + let (chains, currencies) = rpc::prepare(config.chain, config.account_lifetime, config.depth) .await .context("failed while preparing the RPC module")?; - // let (database, last_saved_block) = Database::initialise( - // database_path, - // override_rpc, - // pair, - // endpoint_properties, - // destination, - // ) - // .context("failed to initialise the database module")?; + let rpc = env::var("KALATORI_RPC").unwrap(); + + let state = State::initialise( + database_path, + currencies, + (pair, pair_public), + old_seeds, + ConfigWoChains { + recipient: config.recipient, + debug: config.debug, + remark: config.remark, + depth: config.depth, + account_lifetime: config.account_lifetime, + rpc, + }, + ) + .context("failed to initialise the database module")?; + - // let processor = Processor::new(api_config, database.clone(), shutdown_notification.clone()) - // .context("failed to initialise the RPC module")?; + task_tracker.spawn( + "proc", + Processor::ignite(state.clone(), shutdown_notification.clone()), + ); - let server = server::new(shutdown_notification.clone(), host) + let server = server::new(shutdown_notification.clone(), host, state) .await .context("failed to initialise the server module")?; - let task_tracker = TaskTracker::new(); - - task_tracker.close(); - task_tracker.spawn(try_task( - "the shutdown listener", - shutdown_listener(shutdown_notification.clone()), - error_tx.clone(), - )); // task_tracker.spawn(shutdown( // processor.ignite(last_saved_block, task_tracker.clone(), error_tx.clone()), // error_tx, // )); - task_tracker.spawn(try_task("the server module", server, error_tx)); + task_tracker.spawn("the server module", server); - while let Some((from, error)) = error_rx.recv().await { - log::error!("Received a fatal error from {from}!\n{error:?}"); + task_tracker + .wait_with_notification(error_rx, shutdown_notification) + .await; - if !shutdown_notification.is_cancelled() { - log::info!("Initialising the shutdown..."); + tracing::info!("Goodbye!"); - shutdown_notification.cancel(); + Ok(()) +} + +fn default_filter() -> String { + const TARGETS: &[&str] = &[ + callback::MODULE, + database::MODULE, + rpc::MODULE, + server::MODULE, + env!("CARGO_PKG_NAME"), + ]; + const COMMA: &str = ","; + const INFO: &str = "=info"; + const OFF: &str = "off"; + + let mut filter = String::with_capacity( + OFF.len().saturating_add( + TARGETS + .iter() + .map(|module| { + COMMA + .len() + .saturating_add(module.len()) + .saturating_add(INFO.len()) + }) + .sum(), + ), + ); + + filter.push_str(OFF); + + for target in TARGETS { + filter.push_str(COMMA); + filter.push_str(target); + filter.push_str(INFO); + } + + filter +} + +#[derive(Clone)] +struct TaskTracker { + inner: task::TaskTracker, + error_tx: UnboundedSender<(Cow<'static, str>, Error)>, +} + +impl TaskTracker { + fn new() -> (Self, UnboundedReceiver<(Cow<'static, str>, Error)>) { + let (error_tx, error_rx) = mpsc::unbounded_channel(); + let inner = task::TaskTracker::new(); + + inner.close(); + + (Self { inner, error_tx }, error_rx) + } + + fn spawn( + &self, + name: impl Into> + Send + 'static, + task: impl Future>> + Send + 'static, + ) -> JoinHandle<()> { + let error_tx = self.error_tx.clone(); + + self.inner.spawn(async move { + match task.await { + Ok(shutdown_message) if !shutdown_message.is_empty() => { + tracing::info!("{shutdown_message}"); + } + Err(error) => error_tx.send((name.into(), error)).unwrap(), + _ => {} + } + }) + } + + async fn wait_with_notification( + self, + mut error_rx: UnboundedReceiver<(Cow<'static, str>, Error)>, + shutdown_notification: CancellationToken, + ) { + drop(self.error_tx); + + while let Some((from, error)) = error_rx.recv().await { + tracing::error!("Received a fatal error from {from}!\n{error:?}"); + + if !shutdown_notification.is_cancelled() { + tracing::info!("Initialising the shutdown..."); + + shutdown_notification.cancel(); + } } + + self.inner.wait().await; } - task_tracker.wait().await; + async fn try_wait( + self, + mut error_rx: UnboundedReceiver<(Cow<'static, str>, Error)>, + ) -> Result<()> { + drop(self.error_tx); - log::info!("Goodbye!"); + if let Some((from, error)) = error_rx.recv().await { + return Err(error).with_context(|| format!("received a fatal error from {from}")); + } - Ok(()) -} + self.inner.wait().await; -async fn try_task<'a>( - name: &'a str, - task: impl Future>, - error_tx: UnboundedSender<(&'a str, Error)>, -) { - match task.await { - Ok(shutdown_message) if !shutdown_message.is_empty() => log::info!("{shutdown_message}"), - Err(error) => error_tx - .send((name, error)) - .expect("error channel shouldn't be dropped/closed"), - _ => {} + Ok(()) } } -async fn shutdown_listener(shutdown_notification: CancellationToken) -> Result { +async fn shutdown_listener(shutdown_notification: CancellationToken) -> Result> { tokio::select! { biased; signal = signal::ctrl_c() => { @@ -353,7 +382,7 @@ async fn shutdown_listener(shutdown_notification: CancellationToken) -> Result Result, host: Option, database: Option, remark: Option, @@ -385,7 +414,6 @@ struct Chain { #[serde(flatten)] native_token: Option, asset: Option>, - multi_location_assets: Option, } #[derive(Deserialize)] @@ -401,7 +429,7 @@ struct AssetInfo { id: AssetId, } -#[derive(Debug)] +#[derive(Deserialize, Debug)] struct Balance(u128); impl Deref for Balance { diff --git a/src/rpc.rs b/src/rpc.rs index 5a66e0a..91df300 100644 --- a/src/rpc.rs +++ b/src/rpc.rs @@ -1,27 +1,61 @@ use crate::{ - AssetId, Balance, BlockHash, BlockNumber, Chain, Decimals, OnlineClient, PalletIndex, - RuntimeConfig, + asset::Asset, + database::{Invoicee, State}, + server::{ + CurrencyInfo, OrderInfo, OrderStatus, PaymentStatus, ServerInfo, TokenKind, + WithdrawalStatus, + }, + AccountId, AssetId, AssetInfo, Balance, BlockHash, BlockNumber, Chain, Decimals, NativeToken, + Nonce, OnlineClient, PalletIndex, RuntimeConfig, TaskTracker, Timestamp, }; use anyhow::{Context, Result}; +use scale_info::TypeDef; +use serde::{Deserialize, Deserializer}; use std::{ - collections::HashMap, - fmt::{self, Debug, Formatter}, + borrow::Cow, + collections::{hash_map::Entry, HashMap}, + error::Error, + fmt::{self, Debug, Display, Formatter}, + num::NonZeroU64, + sync::Arc, }; use subxt::{ backend::{ - legacy::LegacyRpcMethods, - rpc::{reconnecting_rpc_client::Client, RpcClient}, + legacy::{LegacyBackend, LegacyRpcMethods}, + rpc::{reconnecting_rpc_client::Client, RpcClient, RpcSubscription}, + Backend, BackendExt, RuntimeVersion, }, + blocks::Block, + config::{DefaultExtrinsicParamsBuilder, Header}, constants::ConstantsClient, - dynamic::{self, At}, - ext::{scale_decode::DecodeAsType, sp_core::crypto::Ss58AddressFormat}, - storage::Storage, + dynamic::{self, At, Value}, + error::RpcError, + ext::{ + futures::TryFutureExt, + scale_decode::DecodeAsType, + scale_value, + sp_core::{ + crypto::{Ss58AddressFormat, Ss58Codec}, + sr25519::Pair, + }, + }, + runtime_api::RuntimeApiClient, + storage::{Storage, StorageClient}, + tx::{PairSigner, SubmittableExtrinsic}, + Config, Metadata, +}; +use tokio::sync::{ + mpsc::{self, UnboundedSender}, + oneshot::{self, Sender}, }; +use tokio_util::sync::CancellationToken; pub const MODULE: &str = module_path!(); -// const MAX_BLOCK_NUMBER_ERROR: &str = "block number type overflow is occurred"; -// const BLOCK_NONCE_ERROR: &str = "failed to fetch an account nonce by the scanner client"; +const MAX_BLOCK_NUMBER_ERROR: &str = "block number type overflow is occurred"; +const BLOCK_NONCE_ERROR: &str = "failed to fetch an account nonce by the scanner client"; + +const CHARGE_ASSET_TX_PAYMENT: &str = "ChargeAssetTxPayment"; // Pallets @@ -29,51 +63,75 @@ const SYSTEM: &str = "System"; const BALANCES: &str = "Balances"; const UTILITY: &str = "Utility"; const ASSETS: &str = "Assets"; +const BABE: &str = "Babe"; -// async fn fetch_best_block(methods: &LegacyRpcMethods) -> Result { -// methods -// .chain_get_block_hash(None) -// .await -// .context("failed to get the best block hash")? -// .context("received nothing after requesting the best block hash") -// } +// Runtime APIs -// async fn fetch_api_runtime( -// methods: &LegacyRpcMethods, -// backend: &impl Backend, -// ) -> Result<(Metadata, RuntimeVersion)> { -// let best_block = fetch_best_block(methods).await?; - -// Ok(( -// fetch_metadata(backend, best_block) -// .await -// .context("failed to fetch metadata")?, -// methods -// .state_get_runtime_version(Some(best_block)) -// .await -// .map(|runtime_version| RuntimeVersion { -// spec_version: runtime_version.spec_version, -// transaction_version: runtime_version.transaction_version, -// }) -// .context("failed to fetch the runtime version")?, -// )) -// } +const AURA: &str = "AuraApi"; -// async fn fetch_metadata(backend: &impl Backend, at: Hash) -> Result { -// const LATEST_SUPPORTED_METADATA_VERSION: u32 = 15; +type ConnectedChainsChannel = ( + Sender, ConnectedChain)>>, + (Arc, ConnectedChain), +); +type CurrenciesChannel = (Sender>, (String, Currency)); -// backend -// .metadata_at_version(LATEST_SUPPORTED_METADATA_VERSION, at) -// .or_else(|error| async { -// if let subxt::Error::Rpc(RpcError::ClientError(_)) | subxt::Error::Other(_) = error { -// backend.legacy_metadata(at).await -// } else { -// Err(error) -// } -// }) -// .await -// .map_err(Into::into) -// } +struct AssetsInfoFetcher<'a> { + assets: (AssetInfo, Vec), + storage: &'a Storage, + pallet_index: Option, +} + +async fn fetch_finalized_head_number_and_hash( + methods: &LegacyRpcMethods, +) -> Result<(BlockNumber, BlockHash)> { + let head_hash = methods + .chain_get_finalized_head() + .await + .context("failed to get the finalized head hash")?; + let head = methods + .chain_get_block(Some(head_hash)) + .await + .context("failed to get the finalized head")? + .context("received nothing after requesting the finalized head")?; + + Ok((head.block.header.number, head_hash)) +} + +async fn fetch_runtime( + methods: &LegacyRpcMethods, + backend: &impl Backend, + at: BlockHash, +) -> Result<(Metadata, RuntimeVersion)> { + Ok(( + fetch_metadata(backend, at) + .await + .context("failed to fetch metadata")?, + methods + .state_get_runtime_version(Some(at)) + .await + .map(|runtime_version| RuntimeVersion { + spec_version: runtime_version.spec_version, + transaction_version: runtime_version.transaction_version, + }) + .context("failed to fetch the runtime version")?, + )) +} + +async fn fetch_metadata(backend: &impl Backend, at: BlockHash) -> Result { + const LATEST_SUPPORTED_METADATA_VERSION: u32 = 15; + + backend + .metadata_at_version(LATEST_SUPPORTED_METADATA_VERSION, at) + .or_else(|error| async { + if let subxt::Error::Rpc(RpcError::ClientError(_)) | subxt::Error::Other(_) = error { + backend.legacy_metadata(at).await + } else { + Err(error) + } + }) + .await + .map_err(Into::into) +} fn fetch_constant( constants: &ConstantsClient, @@ -87,76 +145,202 @@ fn fetch_constant( } #[derive(Debug)] -pub struct ChainProperties { - pub address_format: Ss58AddressFormat, - pub existential_deposit: Balance, - pub assets: HashMap, - pub block_hash_count: BlockNumber, +struct ChainProperties { + address_format: Ss58AddressFormat, + existential_deposit: Option, + assets_pallet: Option, + block_hash_count: BlockNumber, + account_lifetime: BlockNumber, + depth: Option, +} + +#[derive(Debug)] +struct AssetsPallet { + multi_location: Option, + assets: HashMap, } #[derive(Debug)] -pub struct AssetProperties { - pub min_balance: Balance, - pub decimals: Decimals, +struct AssetProperties { + min_balance: Balance, + decimals: Decimals, +} + +impl AssetProperties { + async fn fetch(storage: &Storage, asset: AssetId) -> Result { + Ok(Self { + min_balance: check_sufficiency_and_fetch_min_balance(storage, asset).await?, + decimals: fetch_asset_decimals(storage, asset).await?, + }) + } } impl ChainProperties { async fn fetch( + chain: Arc, + currencies: UnboundedSender, constants: &ConstantsClient, - storage_finalized: Storage, - assets: Vec, - ) -> Result { + native_token_option: Option, + assets_fetcher: Option>, + account_lifetime: BlockNumber, + depth: Option, + ) -> Result<(Self, Option)> { const ADDRESS_PREFIX: (&str, &str) = (SYSTEM, "SS58Prefix"); const EXISTENTIAL_DEPOSIT: (&str, &str) = (BALANCES, "ExistentialDeposit"); const BLOCK_HASH_COUNT: (&str, &str) = (SYSTEM, "BlockHashCount"); - let ex_dep: u128 = fetch_constant(constants, EXISTENTIAL_DEPOSIT)?; - let mut assets_props = HashMap::new(); + let chain_clone = chain.clone(); + + let try_add_currency = |name, asset| async move { + let (tx, rx) = oneshot::channel(); + + currencies + .send((tx, (name, Currency { chain, asset }))) + .unwrap(); - for asset in assets { - assets_props.insert( - asset, - AssetProperties { - min_balance: fetch_min_balance(storage_finalized.clone(), asset).await?, - decimals: fetch_decimals(storage_finalized.clone(), asset).await?, + if let Some(( + name, + Currency { + chain: other_chain, .. }, - ); - } + )) = rx.await.unwrap() + { + Err(anyhow::anyhow!( + "chain {other_chain:?} already has the native token or an asset with the name {name:?}, all currency names must be unique" + )) + } else { + Ok(()) + } + }; - Ok(Self { - address_format: Ss58AddressFormat::custom(fetch_constant(constants, ADDRESS_PREFIX)?), - existential_deposit: Balance(ex_dep), - assets: assets_props, - block_hash_count: fetch_constant(constants, BLOCK_HASH_COUNT)?, + let assets_pallet = if let Some(AssetsInfoFetcher { + assets: (last_asset_info, assets_info), + storage, + pallet_index, + }) = assets_fetcher + { + async fn try_add_asset( + assets: &mut HashMap, + id: AssetId, + chain: Arc, + storage: &Storage, + ) -> Result<()> { + match assets.entry(id) { + Entry::Occupied(_) => Err(anyhow::anyhow!( + "chain {chain} has 2 assets with the same ID {id}", + )), + Entry::Vacant(entry) => { + entry.insert(AssetProperties::fetch(storage, id).await?); + + Ok(()) + } + } + } + + let mut assets = HashMap::with_capacity(assets_info.len().saturating_add(1)); + + for asset_info in assets_info { + try_add_currency.clone()(asset_info.name, Some(asset_info.id)).await?; + try_add_asset(&mut assets, asset_info.id, chain_clone.clone(), storage).await?; + } + + try_add_currency.clone()(last_asset_info.name, Some(last_asset_info.id)).await?; + try_add_asset( + &mut assets, + last_asset_info.id, + chain_clone.clone(), + storage, + ) + .await?; + + Some(AssetsPallet { + assets, + multi_location: pallet_index, + }) + } else { + None + }; + + let address_format = Ss58AddressFormat::custom(fetch_constant(constants, ADDRESS_PREFIX)?); + let block_hash_count = fetch_constant(constants, BLOCK_HASH_COUNT)?; + + Ok(if let Some(native_token) = native_token_option { + try_add_currency(native_token.native_token, None).await?; + + ( + Self { + address_format, + existential_deposit: Some( + fetch_constant(constants, EXISTENTIAL_DEPOSIT).map(Balance)?, + ), + assets_pallet, + block_hash_count, + account_lifetime, + depth, + }, + Some(native_token.decimals), + ) + } else { + ( + Self { + address_format, + existential_deposit: None, + assets_pallet, + block_hash_count, + account_lifetime, + depth, + }, + None, + ) }) } } -async fn fetch_min_balance( - storage_finalized: Storage, +async fn check_sufficiency_and_fetch_min_balance( + storage: &Storage, asset: AssetId, ) -> Result { const ASSET: &str = "Asset"; const MIN_BALANCE: &str = "min_balance"; + const IS_SUFFICIENT: &str = "is_sufficient"; - let asset_info = storage_finalized + let asset_info = storage .fetch(&dynamic::storage(ASSETS, ASSET, vec![asset.into()])) .await - .context("failed to fetch asset info from the chain")? - .context("received nothing after fetching asset info from the chain")? + .with_context(|| format!("failed to fetch asset {asset} info from a chain"))? + .with_context(|| { + format!("received nothing after fetching asset info {asset} from a chain") + })? .to_value() - .context("failed to decode account info")?; + .with_context(|| format!("failed to decode asset {asset} info"))?; + + let encoded_is_sufficient = asset_info + .at(IS_SUFFICIENT) + .with_context(|| format!("{IS_SUFFICIENT} field wasn't found in asset {asset} info"))?; + + if !encoded_is_sufficient.as_bool().with_context(|| { + format!( + "expected `bool` as the type of {IS_SUFFICIENT:?} in asset {asset} info, got `{:?}`", + encoded_is_sufficient.value + ) + })? { + anyhow::bail!("only sufficient assets are supported, asset {asset} isn't sufficient"); + } + let encoded_min_balance = asset_info .at(MIN_BALANCE) - .with_context(|| format!("{MIN_BALANCE} field wasn't found in asset info"))?; + .with_context(|| format!("{MIN_BALANCE} field wasn't found in asset {asset} info"))?; encoded_min_balance.as_u128().map(Balance).with_context(|| { - format!("expected `u128` as the type of the min balance, got {encoded_min_balance}") + format!( + "expected `u128` as the type of {MIN_BALANCE:?} in asset {asset} info, got `{:?}`", + encoded_min_balance.value + ) }) } -async fn fetch_decimals( - storage: Storage, +async fn fetch_asset_decimals( + storage: &Storage, asset: AssetId, ) -> Result { const METADATA: &str = "Metadata"; @@ -165,1165 +349,827 @@ async fn fetch_decimals( let asset_metadata = storage .fetch(&dynamic::storage(ASSETS, METADATA, vec![asset.into()])) .await - .context("failed to fetch asset info from the chain")? - .context("received nothing after fetching asset info from the chain")? + .with_context(|| format!("failed to fetch asset {asset} metadata from a chain"))? + .with_context(|| { + format!("received nothing after fetching asset {asset} metadata from a chain") + })? .to_value() - .context("failed to decode account info")?; + .with_context(|| format!("failed to decode asset {asset} metadata"))?; let encoded_decimals = asset_metadata .at(DECIMALS) - .with_context(|| format!("{DECIMALS} field wasn't found in asset info"))?; + .with_context(|| format!("{DECIMALS} field wasn't found in asset {asset} metadata"))?; - encoded_decimals - .as_u128() - .map(|num| num.try_into().expect("must be less than u64")) - .with_context(|| { - format!("expected `u128` as the type of the min balance, got {encoded_decimals}") - }) + let decimals = encoded_decimals.as_u128().with_context(|| { + format!( + "expected `u128` as the type of asset {asset} {DECIMALS:?}, got `{:?}`", + encoded_decimals.value + ) + })?; + + decimals.try_into().with_context(|| { + format!("asset {asset} {DECIMALS:?} must be less than `u8`, got {decimals}") + }) } -// impl ChainProperties { -// fn fetch_only_constants( -// constants: &ConstantsClient, -// decimals: Decimals, -// ) -> Result { -// const ADDRESS_PREFIX: (&str, &str) = (SYSTEM, "SS58Prefix"); -// const EXISTENTIAL_DEPOSIT: (&str, &str) = (BALANCES, "ExistentialDeposit"); -// const BLOCK_HASH_COUNT: (&str, &str) = (SYSTEM, "BlockHashCount"); - -// Ok(Self { -// address_format: Ss58AddressFormat::custom(fetch_constant(constants, ADDRESS_PREFIX)?), -// existential_deposit: fetch_constant(constants, EXISTENTIAL_DEPOSIT)?, -// block_hash_count: fetch_constant(constants, BLOCK_HASH_COUNT)?, -// decimals, -// }) -// } +pub async fn prepare( + chains: Vec, + account_lifetime: Timestamp, + depth: Option, +) -> Result<(HashMap, ConnectedChain>, HashMap)> { + let mut connected_chains = HashMap::with_capacity(chains.len()); + let mut currencies = HashMap::with_capacity( + chains + .iter() + .map(|chain| { + chain + .asset + .as_ref() + .map(Vec::len) + .unwrap_or_default() + .saturating_add(chain.native_token.is_some().into()) + }) + .sum(), + ); + + let (connected_chains_tx, mut connected_chains_rx) = + mpsc::unbounded_channel::(); + let (currencies_tx, mut currencies_rx) = mpsc::unbounded_channel::(); + + let connected_chains_jh = tokio::spawn(async move { + while let Some((tx, (name, chain))) = connected_chains_rx.recv().await { + tx.send(match connected_chains.entry(name) { + Entry::Occupied(entry) => Some(entry.remove_entry()), + Entry::Vacant(entry) => { + tracing::info!("Prepared the {:?} chain:\n{:#?}", entry.key(), chain); + + entry.insert(chain); + + None + } + }) + .unwrap(); + } -// async fn fetch( -// constants: &ConstantsClient, -// methods: &LegacyRpcMethods, -// ) -> Result { -// const DECIMALS_KEY: &str = "tokenDecimals"; - -// let system_properties = methods -// .system_properties() -// .await -// .context("failed to get the chain system properties")?; -// let encoded_decimals = system_properties -// .get(DECIMALS_KEY) -// .with_context(|| format!( -// "{DECIMALS_KEY:?} wasn't found in a response of the `system_properties` RPC call, set `{DECIMALS}` to set the decimal places number manually" -// ))?; -// let decimals = encoded_decimals -// .as_u64() -// .with_context(|| format!( -// "failed to decode the decimal places number, expected a positive integer, got \"{encoded_decimals}\"" -// ))?; - -// Self::fetch_only_constants(constants, decimals) -// } -// } + connected_chains + }); + + let currencies_jh = tokio::spawn(async move { + while let Some((tx, (name, currency))) = currencies_rx.recv().await { + tx.send(match currencies.entry(name) { + Entry::Occupied(entry) => Some(entry.remove_entry()), + Entry::Vacant(entry) => { + tracing::info!( + %currency.chain, ?currency.asset, + "Registered the currency {:?}.", + entry.key(), + ); + + entry.insert(currency); + + None + } + }) + .unwrap(); + } -// pub struct ApiConfig { -// api: Arc, -// methods: Arc>, -// backend: Arc>, -// } + currencies + }); -// pub struct EndpointProperties { -// pub url: CheckedUrl, -// pub chain: Arc>, -// } + let (task_tracker, error_rx) = TaskTracker::new(); -// pub struct CheckedUrl(String); + for chain in chains { + task_tracker.spawn( + format!("the {:?} chain preparator", chain.name), + prepare_chain( + chain, + connected_chains_tx.clone(), + currencies_tx.clone(), + account_lifetime, + depth, + ), + ); + } -// impl CheckedUrl { -// pub fn get(self) -> String { -// self.0 -// } -// } + drop((connected_chains_tx, currencies_tx)); -pub async fn prepare(chains: Vec) -> Result> { - let mut connected_chains = HashMap::with_capacity(chains.len()); + task_tracker.try_wait(error_rx).await?; - for chain in chains { - let endpoint = chain.endpoints.first().with_context(|| { - format!( - "{:?} chain doesn't have any `endpoints` in the config", - chain.name - ) - })?; - let rpc_client = RpcClient::new( - Client::builder() - .build(endpoint.into()) + Ok((connected_chains_jh.await?, currencies_jh.await?)) +} + +#[tracing::instrument(skip_all, fields(chain = chain.name))] +async fn prepare_chain( + chain: Chain, + connected_chains: UnboundedSender, + currencies: UnboundedSender, + account_lifetime: Timestamp, + depth_option: Option, +) -> Result> { + let chain_name: Arc = chain.name.into(); + let chain_name_clone = chain_name.clone(); + let endpoint = chain + .endpoints + .first() + .context("chain doesn't have any `endpoints` in the config")?; + let rpc_client = RpcClient::new( + Client::builder() + .build(endpoint.into()) + .await + .context("failed to construct the RPC client")?, + ); + + let methods = LegacyRpcMethods::new(rpc_client.clone()); + let backend = Arc::new(LegacyBackend::builder().build(rpc_client.clone())); + + let genesis = methods + .genesis_hash() + .await + .context("failed to fetch the genesis hash")?; + let (finalized_number, finalized_hash) = fetch_finalized_head_number_and_hash(&methods).await?; + let (metadata, runtime_version) = fetch_runtime(&methods, &*backend, finalized_hash).await?; + + let client = OnlineClient::from_backend_with( + genesis, + runtime_version, + metadata.clone(), + backend.clone(), + ) + .context("failed to construct the API client")?; + let constants = client.constants(); + + let (block_time, runtime_api) = if metadata.pallet_by_name(BABE).is_some() { + const EXPECTED_BLOCK_TIME: (&str, &str) = (BABE, "ExpectedBlockTime"); + + (fetch_constant(&constants, EXPECTED_BLOCK_TIME)?, None) + } else { + const SLOT_DURATION: &str = "slot_duration"; + + let runtime_api = client.runtime_api(); + + ( + runtime_api + .at(finalized_hash) + .call(dynamic::runtime_api_call( + AURA, + SLOT_DURATION, + Vec::::new(), + )) .await - .with_context(|| { - format!( - "failed to construct the RPC client for the {:?} chain", - chain.name - ) - })?, - ); + .context("failed to fetch Aura's slot duration")? + .as_type() + .context("failed to decode Aura's slot duration")?, + Some(runtime_api), + ) + }; - log::info!( - "Connected to an RPC server for the {:?} chain at {endpoint:?}.", - chain.name - ); + let block_time_non_zero = + NonZeroU64::new(block_time).context("block interval can't equal 0")?; - let methods = LegacyRpcMethods::new(rpc_client.clone()); - let client = OnlineClient::from_rpc_client(rpc_client) - .await + let account_lifetime_in_blocks = account_lifetime / block_time_non_zero; + + if account_lifetime_in_blocks == 0 { + anyhow::bail!("block interval is longer than the given `account-lifetime`"); + } + + let depth_in_blocks = if let Some(depth) = depth_option { + let depth_in_blocks = depth / block_time_non_zero; + + if depth_in_blocks > account_lifetime_in_blocks { + anyhow::bail!("`depth` can't be greater than `account-lifetime`"); + } + + Some( + NonZeroU64::new(depth_in_blocks) + .context("block interval is longer than the given `depth`")?, + ) + } else { + None + }; + + let rpc = endpoint.into(); + + let connected_chain = if let Some(assets) = chain + .asset + .and_then(|mut assets| assets.pop().map(|latest| (latest, assets))) + { + const ASSET_ID: &str = "asset_id"; + const SOME: &str = "Some"; + + let storage_client = client.storage(); + let storage = storage_client.at(finalized_hash); + + let extension = metadata + .extrinsic() + .signed_extensions() + .iter() + .find(|extension| extension.identifier() == CHARGE_ASSET_TX_PAYMENT) + .with_context(|| { + format!("failed to find the {CHARGE_ASSET_TX_PAYMENT:?} extension in metadata") + })? + .extra_ty(); + let types = metadata.types(); + + let TypeDef::Composite(ref extension_type) = types + .resolve(extension) + .with_context(|| { + format!("failed to resolve the type of the {CHARGE_ASSET_TX_PAYMENT:?} extension") + })? + .type_def + else { + anyhow::bail!("{CHARGE_ASSET_TX_PAYMENT:?} extension has an unexpected type"); + }; + + let asset_id_field = extension_type + .fields + .iter() + .find_map(|field| { + field + .name + .as_ref() + .and_then(|name| (name == ASSET_ID).then_some(field.ty.id)) + }) .with_context(|| { format!( - "failed to construct the API client for the {:?} chain", - chain.name - ) + "failed to find the field {ASSET_ID:?} in the {CHARGE_ASSET_TX_PAYMENT:?} extension" + ) })?; - let connected_chain = ConnectedChain { - methods, - genesis: client.genesis_hash(), - assets: client - .metadata() - .pallet_by_name(ASSETS) - .map(|pallet| pallet.index()), - properties: ChainProperties::fetch( - &client.constants(), - client.storage().at_latest().await?, - chain - .asset - .map(|assets| assets.into_iter().map(|asset| asset.id).collect()) - .unwrap_or_default(), + let TypeDef::Variant(ref option) = types.resolve(asset_id_field).with_context(|| { + format!( + "failed to resolve the type of the field {ASSET_ID:?} in the {CHARGE_ASSET_TX_PAYMENT:?} extension" ) - .await?, - client, + })?.type_def else { + anyhow::bail!( + "field {ASSET_ID:?} in the {CHARGE_ASSET_TX_PAYMENT:?} extension has an unexpected type" + ); }; - log::debug!("\n{connected_chain:#?}"); + let asset_id_some = option.variants.iter().find_map(|variant| { + if variant.name == SOME { + variant.fields.first().map(|field| { + if variant.fields.len() > 1 { + tracing::warn!( + ?variant.fields, + "The field {ASSET_ID:?} in the {CHARGE_ASSET_TX_PAYMENT:?} extension contains multiple inner fields instead of just 1." + ); + } + + field.ty.id + }) + } else { + None + } + }).with_context(|| format!( + "field {ASSET_ID:?} in the {CHARGE_ASSET_TX_PAYMENT:?} extension doesn't contain the {SOME:?} variant" + ))?; + + let asset_id = &types.resolve(asset_id_some).with_context(|| { + format!( + "failed to resolve the type of the {SOME:?} variant of the field {ASSET_ID:?} in the {CHARGE_ASSET_TX_PAYMENT:?} extension" + ) + })?.type_def; - if connected_chains - .insert(chain.name.clone(), connected_chain) - .is_some() - { - anyhow::bail!( - "found `[chain]`s in the config with the same name ({:?}), all chain names must be unique", - chain.name - ); + let pallet_index = if let TypeDef::Primitive(_) = asset_id { + None + } else { + Some(metadata.pallet_by_name_err(ASSETS)?.index()) + }; + + let (properties, decimals) = ChainProperties::fetch( + chain_name, + currencies, + &constants, + chain.native_token, + Some(AssetsInfoFetcher { + assets, + storage: &storage, + pallet_index, + }), + account_lifetime_in_blocks, + depth_in_blocks, + ) + .await?; + + ConnectedChain { + methods, + genesis, + rpc, + client, + storage: Some(storage_client), + properties, + constants, + decimals, + runtime_api, + backend, } + } else { + let (properties, decimals) = ChainProperties::fetch( + chain_name, + currencies, + &constants, + chain.native_token, + None, + account_lifetime_in_blocks, + depth_in_blocks, + ) + .await?; + + ConnectedChain { + methods, + genesis, + rpc, + client, + storage: None, + properties, + constants, + decimals, + runtime_api, + backend, + } + }; + + let (tx, rx) = oneshot::channel(); + + connected_chains + .send((tx, (chain_name_clone, connected_chain))) + .unwrap(); + + if let Some((name, _)) = rx.await.unwrap() { + anyhow::bail!( + "found `[chain]`s with the same name ({name:?}) in the config, all chain names must be unique", + ); } - Ok(connected_chains) + Ok("".into()) +} + +#[derive(Debug)] +pub struct Currency { + chain: Arc, + asset: Option, } pub struct ConnectedChain { + rpc: String, methods: LegacyRpcMethods, + backend: Arc>, client: OnlineClient, genesis: BlockHash, - assets: Option, properties: ChainProperties, + constants: ConstantsClient, + storage: Option>, + runtime_api: Option>, + decimals: Option, } impl Debug for ConnectedChain { fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { f.debug_struct(stringify!(ConnectedChain)) + .field("rpc", &self.rpc) .field("genesis", &self.genesis) - .field("assets", &self.assets) .field("properties", &self.properties) + .field("decimals", &self.decimals) .finish_non_exhaustive() } } -// pub async fn prepare( -// chains: Vec, -// shutdown_notification: CancellationToken, -// ) -> Result<(ApiConfig, EndpointProperties, Updater)> { -// // TODO: -// // The current reconnecting client implementation automatically restores all subscriptions, -// // including unrecoverable ones, losing all notifications! For now, it shouldn't affect the -// // daemon, but may in the future, so we should consider creating our own implementation. -// let rpc = RpcClient::new( -// ClientBuilder::new() -// .build(url.clone()) -// .await -// .context("failed to construct the RPC client")?, -// ); - -// log::info!("Connected to an RPC server at \"{url}\"."); - -// let methods = Arc::new(LegacyRpcMethods::new(rpc.clone())); -// let backend = Arc::new(LegacyBackend::new(rpc)); - -// let (metadata, runtime_version) = fetch_api_runtime(&methods, &*backend) -// .await -// .context("failed to fetch the runtime of the API client")?; -// let genesis_hash = methods -// .genesis_hash() -// .await -// .context("failed to get the genesis hash")?; -// let api = Arc::new( -// OnlineClient::from_backend_with(genesis_hash, runtime_version, metadata, backend.clone()) -// .context("failed to construct the API client")?, -// ); -// let constants = api.constants(); - -// let (properties_result, decimals_set) = if let Some(decimals) = decimals_option { -// ( -// ChainProperties::fetch_only_constants(&constants, decimals), -// true, -// ) -// } else { -// (ChainProperties::fetch(&constants, &methods).await, false) -// }; -// let properties = properties_result?; - -// log::info!( -// "Chain properties:\n\ -// Decimal places number: {}.\n\ -// Address format: \"{}\" ({}).\n\ -// Existential deposit: {}.\n\ -// Block hash count: {}.", -// properties.decimals, -// properties.address_format, -// properties.address_format.prefix(), -// properties.existential_deposit, -// properties.block_hash_count -// ); - -// let arc_properties = Arc::new(RwLock::const_new(properties)); - -// Ok(( -// ApiConfig { -// api: api.clone(), -// methods: methods.clone(), -// backend: backend.clone(), -// }, -// EndpointProperties { -// url: CheckedUrl(url), -// chain: arc_properties.clone(), -// }, -// Updater { -// methods, -// backend, -// api, -// constants, -// shutdown_notification, -// properties: arc_properties, -// decimals_set, -// }, -// )) -// } - -// pub struct Updater { -// methods: Arc>, -// backend: Arc>, -// api: Arc, -// constants: ConstantsClient, -// shutdown_notification: CancellationToken, -// properties: Arc>, -// decimals_set: bool, -// } - -// impl Updater { -// pub async fn ignite(self) -> Result<&'static str> { -// loop { -// let mut updates = self -// .backend -// .stream_runtime_version() -// .await -// .context("failed to get the runtime updates stream")?; - -// if let Some(current_runtime_version_result) = updates.next().await { -// let current_runtime_version = current_runtime_version_result -// .context("failed to decode the current runtime version")?; - -// // The updates stream is always returns the current runtime version in the first -// // item. We don't skip it though because during a connection loss the runtime can be -// // updated, hence this condition will catch this. -// if self.api.runtime_version() != current_runtime_version { -// self.process_update() -// .await -// .context("failed to process the first API client update")?; -// } - -// loop { -// tokio::select! { -// biased; -// () = self.shutdown_notification.cancelled() => { -// return Ok("The API client updater is shut down."); -// } -// runtime_version = updates.next() => { -// if runtime_version.is_some() { -// self.process_update() -// .await -// .context( -// "failed to process an update for the API client" -// )?; -// } else { -// break; -// } -// } -// } -// } -// } - -// log::warn!( -// "Lost the connection while listening the endpoint for API client runtime updates. Retrying..." -// ); -// } -// } - -// async fn process_update(&self) -> Result<()> { -// // We don't use the runtime version from the updates stream because it doesn't provide the -// // best block hash, so we fetch it ourselves (in `fetch_api_runtime`) and use it to make sure -// // that metadata & the runtime version are from the same block. -// let (metadata, runtime_version) = fetch_api_runtime(&self.methods, &*self.backend) -// .await -// .context("failed to fetch a new runtime for the API client")?; - -// self.api.set_metadata(metadata); -// self.api.set_runtime_version(runtime_version); - -// let (mut current_properties, new_properties_result) = if self.decimals_set { -// let current_properties = self.properties.write().await; -// let new_properties_result = -// ChainProperties::fetch_only_constants(&self.constants, current_properties.decimals); - -// (current_properties, new_properties_result) -// } else { -// ( -// self.properties.write().await, -// ChainProperties::fetch(&self.constants, &self.methods).await, -// ) -// }; -// let new_properties = new_properties_result?; - -// let mut changed = String::new(); -// let mut add_change = |message: Arguments<'_>| { -// changed.write_fmt(message).unwrap(); -// }; - -// if new_properties.address_format != current_properties.address_format { -// add_change(format_args!( -// "\nOld {value}: \"{}\" ({}). New {value}: \"{}\" ({}).", -// current_properties.address_format, -// current_properties.address_format.prefix(), -// new_properties.address_format, -// new_properties.address_format.prefix(), -// value = "address format", -// )); -// } - -// if new_properties.existential_deposit != current_properties.existential_deposit { -// add_change(format_args!( -// "\nOld {value}: {}. New {value}: {}.", -// current_properties.existential_deposit, -// new_properties.existential_deposit, -// value = "existential deposit" -// )); -// } - -// if new_properties.decimals != current_properties.decimals { -// add_change(format_args!( -// "\nOld {value}: {}. New {value}: {}.", -// current_properties.decimals, -// new_properties.decimals, -// value = "decimal places number" -// )); -// } - -// if new_properties.block_hash_count != current_properties.block_hash_count { -// add_change(format_args!( -// "\nOld {value}: {}. New {value}: {}.", -// current_properties.block_hash_count, -// new_properties.block_hash_count, -// value = "block hash count" -// )); -// } - -// if !changed.is_empty() { -// *current_properties = new_properties; - -// log::warn!("The chain properties has been changed:{changed}"); -// } - -// log::info!("A runtime update has been found and applied for the API client."); - -// Ok(()) -// } -// } - -// #[derive(Debug)] -// struct Shutdown; - -// impl Error for Shutdown {} - -// // Not used, but required for the `anyhow::Context` trait. -// impl Display for Shutdown { -// fn fmt(&self, _: &mut Formatter<'_>) -> fmt::Result { -// unimplemented!() -// } -// } - -// struct Api { -// tx: TxClient, -// blocks: BlocksClient, -// } - -// struct Scanner { -// client: OnlineClient, -// blocks: BlocksClient, -// storage: StorageClient, -// } - -// struct ProcessorFinalized { -// database: Arc, -// client: OnlineClient, -// backend: Arc>, -// methods: Arc>, -// shutdown_notification: CancellationToken, -// } - -// impl ProcessorFinalized { -// async fn finalized_head_number_and_hash(&self) -> Result<(BlockNumber, Hash)> { -// let head_hash = self -// .methods -// .chain_get_finalized_head() -// .await -// .context("failed to get the finalized head hash")?; -// let head = self -// .methods -// .chain_get_block(Some(head_hash)) -// .await -// .context("failed to get the finalized head")? -// .context("received nothing after requesting the finalized head")?; - -// Ok((head.block.header.number, head_hash)) -// } - -// pub async fn ignite(self) -> Result<&'static str> { -// self.execute().await.or_else(|error| { -// error -// .downcast() -// .map(|Shutdown| "The RPC module is shut down.") -// }) -// } - -// async fn execute(mut self) -> Result<&'static str> { -// let write_tx = self.database.write()?; -// let mut write_invoices = write_tx.invoices()?; -// let (mut finalized_number, finalized_hash) = self.finalized_head_number_and_hash().await?; - -// self.set_client_metadata(finalized_hash).await?; - -// // TODO: -// // Design a new DB format to store unpaid accounts in a separate table. - -// for invoice_result in self.database.read()?.invoices()?.try_iter()? { -// let invoice = invoice_result?; - -// match invoice.1.value().status { -// InvoiceStatus::Unpaid(price) => { -// if self -// .balance(finalized_hash, &Account::from(*invoice.0.value())) -// .await? -// >= price -// { -// let mut changed_invoice = invoice.1.value(); - -// changed_invoice.status = InvoiceStatus::Paid(price); +#[derive(Debug)] +struct Shutdown; -// log::debug!("background scan {changed_invoice:?}"); +impl Error for Shutdown {} -// write_invoices -// .save(&Account::from(*invoice.0.value()), &changed_invoice)?; -// } -// } -// InvoiceStatus::Paid(_) => continue, -// } -// } - -// drop(write_invoices); +// Not used, but required for the `anyhow::Context` trait. +impl Display for Shutdown { + fn fmt(&self, _: &mut Formatter<'_>) -> fmt::Result { + unimplemented!() + } +} -// write_tx.commit()?; +pub struct Processor { + state: Arc, + backend: Arc>, + shutdown_notification: CancellationToken, + methods: LegacyRpcMethods, + client: OnlineClient, + storage: StorageClient, +} -// let mut subscription = self.finalized_heads().await?; +impl Processor { + pub async fn ignite(state: Arc, notif: CancellationToken) -> Result> { + let client = Client::builder().build(state.rpc.clone()).await.unwrap(); + let rpc_c = RpcClient::new(client); + let methods = LegacyRpcMethods::new(rpc_c.clone()); + let backend = Arc::new(LegacyBackend::builder().build(rpc_c)); + let onl = OnlineClient::from_backend(backend.clone()).await.unwrap(); + let st = onl.storage(); + + Processor { + state, + backend, + shutdown_notification: notif, + methods, + client: onl, + storage: st, + } + .execute() + .await + .or_else(|error| { + error + .downcast() + .map(|Shutdown| "The RPC module is shut down.".into()) + }) + } -// loop { -// self.process_finalized_heads(subscription, &mut finalized_number) -// .await?; + async fn execute(mut self) -> Result> { + let (head_number, head_hash) = self + .finalized_head_number_and_hash() + .await + .context("failed to get the chain head")?; -// log::warn!("Lost the connection while processing finalized heads. Retrying..."); + let mut next_unscanned_number; + let mut subscription; -// subscription = self -// .finalized_heads() -// .await -// .context("failed to update the subscription while processing finalized heads")?; -// } -// } + next_unscanned_number = head_number.checked_add(1).context(MAX_BLOCK_NUMBER_ERROR)?; + subscription = self.finalized_heads().await?; -// async fn process_skipped( -// &self, -// next_unscanned: &mut BlockNumber, -// head: BlockNumber, -// ) -> Result<()> { -// for skipped_number in *next_unscanned..head { -// if self.shutdown_notification.is_cancelled() { -// return Err(Shutdown.into()); -// } + loop { + self.process_finalized_heads(subscription, &mut next_unscanned_number) + .await?; -// let skipped_hash = self -// .methods -// .chain_get_block_hash(Some(skipped_number.into())) -// .await -// .context("failed to get the hash of a skipped block")? -// .context("received nothing after requesting the hash of a skipped block")?; + tracing::warn!("Lost the connection while processing finalized heads. Retrying..."); -// self.process_block(skipped_number, skipped_hash).await?; -// } + subscription = self + .finalized_heads() + .await + .context("failed to update the subscription while processing finalized heads")?; + } + } -// *next_unscanned = head; + async fn finalized_head_number_and_hash(&self) -> Result<(BlockNumber, BlockHash)> { + let head_hash = self + .methods + .chain_get_finalized_head() + .await + .context("failed to get the finalized head hash")?; + let head = self + .methods + .chain_get_block(Some(head_hash)) + .await + .context("failed to get the finalized head")? + .context("received nothing after requesting the finalized head")?; -// Ok(()) -// } + Ok((head.block.header.number, head_hash)) + } -// async fn process_finalized_heads( -// &mut self, -// mut subscription: RpcSubscription<::Header>, -// next_unscanned: &mut BlockNumber, -// ) -> Result<()> { -// loop { -// tokio::select! { -// biased; -// () = self.shutdown_notification.cancelled() => { -// return Err(Shutdown.into()); -// } -// head_result_option = subscription.next() => { -// if let Some(head_result) = head_result_option { -// let head = head_result.context( -// "received an error from the RPC client while processing finalized heads" -// )?; - -// self -// .process_skipped(next_unscanned, head.number) -// .await -// .context("failed to process a skipped gap in the listening mode")?; -// self.process_block(head.number, head.hash()).await?; - -// *next_unscanned = head.number -// .checked_add(1) -// .context(MAX_BLOCK_NUMBER_ERROR)?; -// } else { -// break; -// } -// } -// } -// } + async fn finalized_heads(&self) -> Result::Header>> { + self.methods + .chain_subscribe_finalized_heads() + .await + .context("failed to subscribe to finalized heads") + } -// Ok(()) -// } + async fn process_skipped( + &self, + next_unscanned: &mut BlockNumber, + head: BlockNumber, + ) -> Result<()> { + for skipped_number in *next_unscanned..head { + if self.shutdown_notification.is_cancelled() { + return Err(Shutdown.into()); + } + + let skipped_hash = self + .methods + .chain_get_block_hash(Some(skipped_number.into())) + .await + .context("failed to get the hash of a skipped block")? + .context("received nothing after requesting the hash of a skipped block")?; -// async fn finalized_heads(&self) -> Result::Header>> { -// self.methods -// .chain_subscribe_finalized_heads() -// .await -// .context("failed to subscribe to finalized heads") -// } + self.process_block(skipped_number, skipped_hash).await?; + } -// async fn process_block(&self, number: BlockNumber, hash: Hash) -> Result<()> { -// log::debug!("background block {number}"); - -// let block = self -// .client -// .blocks() -// .at(hash) -// .await -// .context("failed to obtain a block for processing")?; -// let events = block -// .events() -// .await -// .context("failed to obtain block events")?; - -// let read_tx = self.database.read()?; -// let read_invoices = read_tx.invoices()?; - -// let mut update = false; -// let mut invoices_changes = HashMap::new(); - -// for event_result in events.iter() { -// const UPDATE: &str = "CodeUpdated"; -// const TRANSFER: &str = "Transfer"; - -// let event = event_result.context("failed to decode an event")?; -// let metadata = event.event_metadata(); - -// match (metadata.pallet.name(), &*metadata.variant.name) { -// (SYSTEM, UPDATE) => update = true, -// (BALANCES, TRANSFER) => Transfer::deserialize( -// event -// .field_values() -// .context("failed to decode event's fields")?, -// ) -// .context("failed to deserialize a transfer event")? -// .process(&mut invoices_changes, &read_invoices)?, -// _ => {} -// } -// } + *next_unscanned = head; -// let write_tx = self.database.write()?; -// let mut write_invoices = write_tx.invoices()?; + Ok(()) + } -// for (invoice, mut changes) in invoices_changes { -// if let InvoiceStatus::Unpaid(price) = changes.invoice.status { -// let balance = self.balance(hash, &invoice).await?; + async fn process_finalized_heads( + &mut self, + mut subscription: RpcSubscription<::Header>, + next_unscanned: &mut BlockNumber, + ) -> Result<()> { + loop { + tokio::select! { + biased; + () = self.shutdown_notification.cancelled() => { + return Err(Shutdown.into()); + } + head_result_option = subscription.next() => { + if let Some(head_result) = head_result_option { + let head = head_result.context( + "received an error from the RPC client while processing finalized heads" + )?; + + self + .process_skipped(next_unscanned, head.number) + .await + .context("failed to process a skipped gap in the listening mode")?; + self.process_block(head.number, head.hash()).await?; + + *next_unscanned = head.number + .checked_add(1) + .context(MAX_BLOCK_NUMBER_ERROR)?; + } else { + break; + } + } + } + } -// if balance >= price { -// changes.invoice.status = InvoiceStatus::Paid(price); + Ok(()) + } -// write_invoices.save(&invoice, &changes.invoice)?; -// } -// } -// } + async fn process_block(&self, number: BlockNumber, hash: BlockHash) -> Result<()> { + tracing::debug!("Processing the block: {number}."); -// drop(write_invoices); + let block = self + .client + .blocks() + .at(hash) + .await + .context("failed to obtain a block for processing")?; + let events = block + .events() + .await + .context("failed to obtain block events")?; + + let invoices = &mut *self.state.invoices.write().await; + + // let mut update = false; + // let mut invoices_changes = HashMap::new(); + + for event_result in events.iter() { + const UPDATE: &str = "CodeUpdated"; + const TRANSFERRED: &str = "Transferred"; + let event = event_result.context("failed to decode an event")?; + let metadata = event.event_metadata(); + + #[allow(clippy::single_match)] + match (metadata.pallet.name(), &*metadata.variant.name) { + // (SYSTEM, UPDATE) => update = true, + (ASSETS, TRANSFERRED) => { + let tr = Transferred::deserialize( + event + .field_values() + .context("failed to decode event's fields")?, + ) + .context("failed to deserialize a transfer event")?; + + tracing::info!("{tr:?}"); + + #[allow(clippy::unnecessary_find_map)] + if let Some(invoic) = invoices.iter().find_map(|invoic| { + tracing::info!("{tr:?} {invoic:?}"); + tracing::info!("{}", tr.to == invoic.1.paym_acc); + tracing::info!("{}", *invoic.1.amount >= tr.amount); + + if tr.to == invoic.1.paym_acc && *invoic.1.amount <= tr.amount { + Some(invoic) + } else { + None + } + }) { + tracing::info!("{invoic:?}"); + + if !invoic.1.callback.is_empty() { + tracing::info!("{:?}", invoic.1.callback); + let req = ureq::post(&invoic.1.callback); + + let d = req + .send_json(OrderStatus { + order: invoic.0.clone(), + payment_status: PaymentStatus::Paid, + message: "".into(), + recipient: self.state.recipient.to_ss58check(), + server_info: ServerInfo { + version: env!("CARGO_PKG_VERSION"), + instance_id: String::new(), + debug: self.state.debug, + kalatori_remark: self.state.remark.clone(), + }, + order_info: Some(OrderInfo { + withdrawal_status: WithdrawalStatus::Waiting, + amount: invoic.1.amount.format(6), + currency: CurrencyInfo { + currency: "USDC".into(), + chain_name: "assethub-polkadot".into(), + kind: TokenKind::Assets, + decimals: 6, + rpc_url: self.state.rpc.clone(), + asset_id: Some(1337), + }, + callback: invoic.1.callback.clone(), + transactions: vec![], + payment_account: invoic.1.paym_acc.to_ss58check(), + }), + }) + .unwrap(); + } + + invoices.insert( + invoic.0.clone(), + Invoicee { + callback: invoic.1.callback.clone(), + amount: Balance(*invoic.1.amount), + paid: true, + paym_acc: invoic.1.paym_acc.clone(), + }, + ); + } + } + _ => {} + } + } -// write_tx.commit()?; + // for (invoice, changes) in invoices_changes { + // let price = match changes.invoice.status { + // InvoiceStatus::Unpaid(price) | InvoiceStatus::Paid(price) => price, + // }; -// if update { -// self.set_client_metadata(hash) -// .await -// .context("failed to update metadata in the finalized client")?; + // self.process_unpaid(&block, changes, hash, invoice, price) + // .await + // .context("failed to process an unpaid invoice")?; + // } -// log::info!("A metadata update has been found and applied for the finalized client."); -// } + Ok(()) + } -// Ok(()) -// } + async fn balance(&self, hash: BlockHash, account: &AccountId) -> Result { + const ACCOUNT: &str = "Account"; + const BALANCE: &str = "balance"; + + if let Some(account_info) = self + .storage + .at(hash) + .fetch(&dynamic::storage( + ASSETS, + ACCOUNT, + vec![ + Value::from(1337u32), + Value::from_bytes(AsRef::<[u8; 32]>::as_ref(account)), + ], + )) + .await + .context("failed to fetch account info from the chain")? + { + let decoded_account_info = account_info + .to_value() + .context("failed to decode account info")?; + let encoded_balance = decoded_account_info + .at(BALANCE) + .with_context(|| format!("{BALANCE} field wasn't found in account info"))?; + + encoded_balance.as_u128().map(Balance).with_context(|| { + format!("expected `u128` as the type of a balance, got {encoded_balance}") + }) + } else { + Ok(Balance(0)) + } + } -// async fn set_client_metadata(&self, at: Hash) -> Result<()> { -// let metadata = fetch_metadata(&*self.backend, at) -// .await -// .context("failed to fetch metadata for the scanner client")?; + async fn batch_transfer( + &self, + nonce: Nonce, + block_hash_count: BlockNumber, + signer: &PairSigner, + transfers: Vec, + ) -> Result> { + const FORCE_BATCH: &str = "force_batch"; + + let call = dynamic::tx(UTILITY, FORCE_BATCH, vec![Value::from(transfers)]); + let (number, hash) = self + .finalized_head_number_and_hash() + .await + .context("failed to get the chain head while constructing a transaction")?; + let extensions = DefaultExtrinsicParamsBuilder::new() + .mortal_unchecked(number.into(), hash, block_hash_count.into()) + .tip_of(0, Asset::Id(1337)); + + self.client + .tx() + .create_signed(&call, signer, extensions.build()) + .await + .context("failed to create a transfer transaction") + } -// self.client.set_metadata(metadata); + // async fn current_nonce(&self, account: &AccountId) -> Result { + // self.api + // .blocks + // .at(self.finalized_head_number_and_hash().await?.0) + // .await + // .context("failed to obtain the best block for fetching an account nonce")? + // .account_nonce(account) + // .await + // .context("failed to fetch an account nonce by the API client") + // } + + // async fn process_unpaid( + // &self, + // block: &Block, + // mut changes: InvoiceChanges, + // hash: BlockHash, + // invoice: AccountId, + // price: Balance, + // ) -> Result<()> { + // let balance = self.balance(hash, &invoice).await?; + + // if let Some(_remaining) = balance.checked_sub(*price) { + // changes.invoice.status = InvoiceStatus::Paid(price); + + // let block_nonce = block + // .account_nonce(&invoice) + // .await + // .context(BLOCK_NONCE_ERROR)?; + // let current_nonce = self.current_nonce(&invoice).await?; + + // if current_nonce <= block_nonce { + // let properties = self.database.properties().await; + // let block_hash_count = properties.block_hash_count; + // let signer = changes.invoice.signer(self.database.pair())?; + + // let transfers = vec![construct_transfer( + // &changes.invoice.recipient, + // price - EXPECTED_USDX_FEE, + // self.database.properties().await.usd_asset, + // )]; + // let tx = self + // .batch_transfer(current_nonce, block_hash_count, &signer, transfers.clone()) + // .await?; + + // self.methods + // .author_submit_extrinsic(tx.encoded()) + // .await + // .context("failed to submit an extrinsic") + // .unwrap(); + // } + // } + + // Ok(()) + // } +} -// Ok(()) -// } +fn construct_transfer(to: &AccountId, amount: u128) -> Value { + const TRANSFER_KEEP_ALIVE: &str = "transfer"; + + dbg!(amount); + + dynamic::tx( + ASSETS, + TRANSFER_KEEP_ALIVE, + vec![ + 1337.into(), + scale_value::value!(Id(Value::from_bytes(to))), + amount.into(), + ], + ) + .into_value() +} -// async fn balance(&self, hash: Hash, account: &Account) -> Result { -// const ACCOUNT: &str = "Account"; -// const ACCOUNT_BALANCES: &str = "data"; -// const FREE_BALANCE: &str = "free"; - -// let account_info = self -// .client -// .storage() -// .at(hash) -// .fetch_or_default(&dynamic::storage( -// SYSTEM, -// ACCOUNT, -// vec![AsRef::<[u8; 32]>::as_ref(account)], -// )) -// .await -// .context("failed to fetch account info from the chain")? -// .to_value() -// .context("failed to decode account info")?; -// let encoded_balance = account_info -// .at(ACCOUNT_BALANCES) -// .with_context(|| format!("{ACCOUNT_BALANCES} field wasn't found in account info"))? -// .at(FREE_BALANCE) -// .with_context(|| format!("{FREE_BALANCE} wasn't found in account balance info"))?; - -// encoded_balance.as_u128().with_context(|| { -// format!("expected `u128` as the type of a free balance, got {encoded_balance}") -// }) -// } -// } +#[derive(Debug)] +struct InvoiceChanges { + invoice: Invoicee, + incoming: HashMap, +} -// pub struct Processor { -// api: Api, -// scanner: Scanner, -// methods: Arc>, -// database: Arc, -// backend: Arc>, -// shutdown_notification: CancellationToken, -// } +#[derive(Deserialize, Debug)] +struct Transferred { + asset_id: u32, + // The implementation of `Deserialize` for `AccountId32` works only with strings. + #[serde(deserialize_with = "account_deserializer")] + from: AccountId, + #[serde(deserialize_with = "account_deserializer")] + to: AccountId, + amount: u128, +} -// impl Processor { -// pub fn new( -// ApiConfig { -// api, -// methods, -// backend, -// }: ApiConfig, -// database: Arc, -// shutdown_notification: CancellationToken, -// ) -> Result { -// let scanner = OnlineClient::from_backend_with( -// api.genesis_hash(), -// api.runtime_version(), -// api.metadata(), -// backend.clone(), -// ) -// .context("failed to initialize the scanner client")?; - -// Ok(Processor { -// api: Api { -// tx: api.tx(), -// blocks: api.blocks(), -// }, -// scanner: Scanner { -// blocks: scanner.blocks(), -// storage: scanner.storage(), -// client: scanner, -// }, -// methods, -// database, -// shutdown_notification, -// backend, -// }) -// } +fn account_deserializer<'de, D>(deserializer: D) -> Result +where + D: Deserializer<'de>, +{ + <([u8; 32],)>::deserialize(deserializer).map(|address| AccountId::new(address.0)) +} -// pub async fn ignite( +// impl Transferred { +// fn process( // self, -// latest_saved_block: Option, -// task_tracker: TaskTracker, -// error_tx: UnboundedSender, -// ) -> Result<&'static str> { -// self.execute(latest_saved_block, task_tracker, error_tx) -// .await -// .or_else(|error| { -// error -// .downcast() -// .map(|Shutdown| "The RPC module is shut down.") -// }) -// } - -// async fn execute( -// mut self, -// latest_saved_block: Option, -// task_tracker: TaskTracker, -// error_tx: UnboundedSender, -// ) -> Result<&'static str> { -// task_tracker.spawn(shutdown( -// ProcessorFinalized { -// database: self.database.clone(), -// client: self.scanner.client.clone(), -// backend: self.backend.clone(), -// methods: self.methods.clone(), -// shutdown_notification: self.shutdown_notification.clone(), -// } -// .ignite(), -// error_tx, -// )); - -// let (mut head_number, head_hash) = self -// .finalized_head_number_and_hash() -// .await -// .context("failed to get the chain head")?; - -// let mut next_unscanned_number; -// let mut subscription; - -// if let Some(latest_saved) = latest_saved_block { -// let latest_saved_hash = self -// .methods -// .chain_get_block_hash(Some(latest_saved.into())) -// .await -// .context("failed to get the hash of the last saved block")? -// .context("received nothing after requesting the hash of the last saved block")?; - -// self.set_scanner_metadata(latest_saved_hash).await?; - -// next_unscanned_number = latest_saved -// .checked_add(1) -// .context(MAX_BLOCK_NUMBER_ERROR)?; - -// let mut unscanned_amount = head_number.saturating_sub(next_unscanned_number); - -// if unscanned_amount >= SCANNER_TO_LISTENER_SWITCH_POINT { -// log::info!( -// "Detected {unscanned_amount} unscanned blocks! Catching up may take a while." -// ); - -// while unscanned_amount >= SCANNER_TO_LISTENER_SWITCH_POINT { -// self.process_skipped(&mut next_unscanned_number, head_number) -// .await -// .context("failed to process a skipped gap in the scanning mode")?; - -// (head_number, _) = self -// .finalized_head_number_and_hash() -// .await -// .context("failed to get a new chain head")?; -// unscanned_amount = head_number.saturating_sub(next_unscanned_number); -// } - -// log::info!( -// "Scanning of skipped blocks has been completed! Switching to the listening mode..." -// ); -// } - -// subscription = self.finalized_heads().await?; -// } else { -// self.set_scanner_metadata(head_hash).await?; - -// next_unscanned_number = head_number.checked_add(1).context(MAX_BLOCK_NUMBER_ERROR)?; -// subscription = self.finalized_heads().await?; -// } - -// // Skip all already scanned blocks in cases like the first startup (we always skip the first -// // block to fetch right metadata), an instant daemon restart, or a connection to a lagging -// // endpoint. -// 'skipping: loop { -// loop { -// tokio::select! { -// biased; -// () = self.shutdown_notification.cancelled() => { -// return Err(Shutdown.into()); -// } -// header_result_option = subscription.next() => { -// if let Some(header_result) = header_result_option { -// let header = header_result.context( -// "received an error from the RPC client while skipping saved finalized heads" -// )?; - -// if header.number >= next_unscanned_number { -// break 'skipping; -// } -// } else { -// break; -// } -// } -// } -// } - -// log::warn!("Lost the connection while skipping already scanned blocks. Retrying..."); - -// subscription = self -// .finalized_heads() -// .await -// .context("failed to update the subscription while skipping scanned blocks")?; -// } - -// loop { -// self.process_finalized_heads(subscription, &mut next_unscanned_number) -// .await?; - -// log::warn!("Lost the connection while processing finalized heads. Retrying..."); - -// subscription = self -// .finalized_heads() -// .await -// .context("failed to update the subscription while processing finalized heads")?; -// } -// } - -// async fn finalized_head_number_and_hash(&self) -> Result<(BlockNumber, Hash)> { -// let head_hash = self -// .methods -// .chain_get_finalized_head() -// .await -// .context("failed to get the finalized head hash")?; -// let head = self -// .methods -// .chain_get_block(Some(head_hash)) -// .await -// .context("failed to get the finalized head")? -// .context("received nothing after requesting the finalized head")?; - -// Ok((head.block.header.number, head_hash)) -// } - -// async fn set_scanner_metadata(&self, at: Hash) -> Result<()> { -// let metadata = fetch_metadata(&*self.backend, at) -// .await -// .context("failed to fetch metadata for the scanner client")?; - -// self.scanner.client.set_metadata(metadata); - -// Ok(()) -// } - -// async fn finalized_heads(&self) -> Result::Header>> { -// self.methods -// .chain_subscribe_finalized_heads() -// .await -// .context("failed to subscribe to finalized heads") -// } - -// async fn process_skipped( -// &self, -// next_unscanned: &mut BlockNumber, -// head: BlockNumber, -// ) -> Result<()> { -// for skipped_number in *next_unscanned..head { -// if self.shutdown_notification.is_cancelled() { -// return Err(Shutdown.into()); -// } - -// let skipped_hash = self -// .methods -// .chain_get_block_hash(Some(skipped_number.into())) -// .await -// .context("failed to get the hash of a skipped block")? -// .context("received nothing after requesting the hash of a skipped block")?; - -// self.process_block(skipped_number, skipped_hash).await?; -// } - -// *next_unscanned = head; - -// Ok(()) -// } - -// async fn process_finalized_heads( -// &mut self, -// mut subscription: RpcSubscription<::Header>, -// next_unscanned: &mut BlockNumber, -// ) -> Result<()> { -// loop { -// tokio::select! { -// biased; -// () = self.shutdown_notification.cancelled() => { -// return Err(Shutdown.into()); -// } -// head_result_option = subscription.next() => { -// if let Some(head_result) = head_result_option { -// let head = head_result.context( -// "received an error from the RPC client while processing finalized heads" -// )?; - -// self -// .process_skipped(next_unscanned, head.number) -// .await -// .context("failed to process a skipped gap in the listening mode")?; -// self.process_block(head.number, head.hash()).await?; - -// *next_unscanned = head.number -// .checked_add(1) -// .context(MAX_BLOCK_NUMBER_ERROR)?; -// } else { -// break; -// } -// } -// } -// } - -// Ok(()) -// } - -// async fn process_block(&self, number: BlockNumber, hash: Hash) -> Result<()> { -// log::info!("Processing the block: {number}."); - -// let block = self -// .scanner -// .blocks -// .at(hash) -// .await -// .context("failed to obtain a block for processing")?; -// let events = block -// .events() -// .await -// .context("failed to obtain block events")?; - -// let read_tx = self.database.read()?; -// let read_invoices = read_tx.invoices()?; - -// let mut update = false; -// let mut invoices_changes = HashMap::new(); - -// for event_result in events.iter() { -// const UPDATE: &str = "CodeUpdated"; -// const TRANSFER: &str = "Transfer"; - -// let event = event_result.context("failed to decode an event")?; -// let metadata = event.event_metadata(); - -// match (metadata.pallet.name(), &*metadata.variant.name) { -// (SYSTEM, UPDATE) => update = true, -// (BALANCES, TRANSFER) => Transfer::deserialize( -// event -// .field_values() -// .context("failed to decode event's fields")?, -// ) -// .context("failed to deserialize a transfer event")? -// .process(&mut invoices_changes, &read_invoices)?, -// _ => {} -// } -// } - -// for (invoice, changes) in invoices_changes { -// let price = match changes.invoice.status { -// InvoiceStatus::Unpaid(price) | InvoiceStatus::Paid(price) => price, -// }; - -// self.process_unpaid(&block, changes, hash, invoice, price) -// .await -// .context("failed to process an unpaid invoice")?; -// } - -// if update { -// self.set_scanner_metadata(hash) -// .await -// .context("failed to update metadata in the scanner client")?; - -// log::info!("A metadata update has been found and applied for the scanner client."); -// } - -// let write_tx = self.database.write()?; - -// write_tx.root()?.save_last_block(number)?; -// write_tx.commit()?; - -// Ok(()) -// } - -// async fn balance(&self, hash: Hash, account: &Account) -> Result { -// const ACCOUNT: &str = "Account"; -// const ACCOUNT_BALANCES: &str = "data"; -// const FREE_BALANCE: &str = "free"; - -// let account_info = self -// .scanner -// .storage -// .at(hash) -// .fetch_or_default(&dynamic::storage( -// SYSTEM, -// ACCOUNT, -// vec![AsRef::<[u8; 32]>::as_ref(account)], -// )) -// .await -// .context("failed to fetch account info from the chain")? -// .to_value() -// .context("failed to decode account info")?; -// let encoded_balance = account_info -// .at(ACCOUNT_BALANCES) -// .with_context(|| format!("{ACCOUNT_BALANCES} field wasn't found in account info"))? -// .at(FREE_BALANCE) -// .with_context(|| format!("{FREE_BALANCE} wasn't found in account balance info"))?; - -// encoded_balance.as_u128().with_context(|| { -// format!("expected `u128` as the type of a free balance, got {encoded_balance}") -// }) -// } - -// async fn batch_transfer( -// &self, -// nonce: Nonce, -// block_hash_count: BlockNumber, -// signer: &PairSigner, -// transfers: Vec, -// ) -> Result> { -// const FORCE_BATCH: &str = "force_batch"; - -// let call = dynamic::tx(UTILITY, FORCE_BATCH, vec![Value::from(transfers)]); -// let (number, hash) = self -// .finalized_head_number_and_hash() -// .await -// .context("failed to get the chain head while constructing a transaction")?; -// let extensions = ( -// (), -// (), -// (), -// (), -// CheckMortalityParams::mortal(block_hash_count.into(), number.into(), hash), -// ChargeTransactionPaymentParams::no_tip(), -// ); - -// self.api -// .tx -// .create_signed_with_nonce(&call, signer, nonce, extensions) -// .context("failed to create a transfer transaction") -// } - -// async fn current_nonce(&self, account: &Account) -> Result { -// self.api -// .blocks -// .at(fetch_best_block(&self.methods).await?) -// .await -// .context("failed to obtain the best block for fetching an account nonce")? -// .account_nonce(account) -// .await -// .context("failed to fetch an account nonce by the API client") -// } - -// async fn process_unpaid( -// &self, -// block: &Block, -// mut changes: InvoiceChanges, -// hash: Hash, -// invoice: Account, -// price: Balance, +// invoices_changes: &mut HashMap, +// invoices: &mut HashMap, // ) -> Result<()> { -// let balance = self.balance(hash, &invoice).await?; - -// if let Some(_remaining) = balance.checked_sub(price) { -// changes.invoice.status = InvoiceStatus::Paid(price); - -// let block_nonce = block -// .account_nonce(&invoice) -// .await -// .context(BLOCK_NONCE_ERROR)?; -// let current_nonce = self.current_nonce(&invoice).await?; - -// if current_nonce <= block_nonce { -// let properties = self.database.properties().await; -// let block_hash_count = properties.block_hash_count; -// let signer = changes.invoice.signer(self.database.pair())?; - -// let transfers = vec![construct_transfer(&changes.invoice.recipient, price)]; -// let tx = self -// .batch_transfer(current_nonce, block_hash_count, &signer, transfers.clone()) -// .await?; -// self.methods -// .author_submit_extrinsic(tx.encoded()) -// .await -// .context("failed to submit an extrinsic")?; -// } -// } - -// Ok(()) -// } -// } - -// fn construct_transfer(to: &Account, _amount: Balance) -> Value { -// const TRANSFER_ALL: &str = "transfer_all"; +// let usd_asset = 1337u32; -// dynamic::tx( -// BALANCES, -// TRANSFER_ALL, -// vec![scale_value::value!(Id(Value::from_bytes(to))), false.into()], -// ) -// .into_value() -// } - -// struct InvoiceChanges { -// invoice: Invoice, -// incoming: HashMap, -// } - -// #[derive(Deserialize)] -// struct Transfer { -// // The implementation of `Deserialize` for `AccountId32` works only with strings. -// #[serde(deserialize_with = "account_deserializer")] -// from: AccountId32, -// #[serde(deserialize_with = "account_deserializer")] -// to: AccountId32, -// amount: Balance, -// } +// tracing::debug!("Transferred event: {self:?}"); -// fn account_deserializer<'de, D>(deserializer: D) -> Result -// where -// D: Deserializer<'de>, -// { -// <([u8; 32],)>::deserialize(deserializer).map(|address| AccountId32::new(address.0)) -// } - -// impl Transfer { -// fn process( -// self, -// invoices_changes: &mut HashMap, -// invoices: &ReadInvoices<'_>, -// ) -> Result<()> { -// if self.from == self.to || self.amount == 0 { +// if self.from == self.to || self.amount == 0 || self.asset_id != usd_asset { // return Ok(()); // } @@ -1333,8 +1179,8 @@ impl Debug for ConnectedChain { // .into_mut() // .incoming // .entry(self.from) -// .and_modify(|amount| *amount = amount.saturating_add(self.amount)) -// .or_insert(self.amount); +// .and_modify(|amount| *amount = Balance(amount.saturating_add(self.amount))) +// .or_insert(Balance(self.amount)); // } // Entry::Vacant(entry) => { // if let (None, Some(encoded_invoice)) = diff --git a/src/server.rs b/src/server.rs index 0d01117..c9ef898 100644 --- a/src/server.rs +++ b/src/server.rs @@ -1,14 +1,16 @@ -use crate::{AssetId, BlockNumber, Decimals, ExtrinsicIndex}; +use crate::{ + database::{Invoicee, State}, AccountId, AssetId, Balance, BlockNumber, Decimals, ExtrinsicIndex +}; use anyhow::{Context, Result}; use axum::{ - extract::{rejection::RawPathParamsRejection, MatchedPath, Query, RawPathParams}, + extract::{self, rejection::RawPathParamsRejection, MatchedPath, Query, RawPathParams}, http::{header, HeaderName, StatusCode}, response::{IntoResponse, Response}, - routing::{get, post}, - Json, Router, + routing, Json, Router, }; use serde::{Serialize, Serializer}; -use std::{collections::HashMap, future::Future, net::SocketAddr}; +use std::{borrow::Cow, collections::HashMap, future::Future, net::SocketAddr, sync::Arc}; +use subxt::ext::sp_core::{crypto::Ss58Codec, DeriveJunction, Pair}; use tokio::net::TcpListener; use tokio_util::sync::CancellationToken; @@ -19,29 +21,29 @@ const CURRENCY: &str = "currency"; const CALLBACK: &str = "callback"; #[derive(Serialize)] -struct OrderStatus { - order: String, - payment_status: PaymentStatus, - message: String, - recipient: String, - server_info: ServerInfo, +pub struct OrderStatus { + pub order: String, + pub payment_status: PaymentStatus, + pub message: String, + pub recipient: String, + pub server_info: ServerInfo, #[serde(skip_serializing_if = "Option::is_none", flatten)] - order_info: Option, + pub order_info: Option, } #[derive(Serialize)] -struct OrderInfo { - withdrawal_status: WithdrawalStatus, - amount: f64, - currency: CurrencyInfo, - callback: String, - transactions: Vec, - payment_account: String, +pub struct OrderInfo { + pub withdrawal_status: WithdrawalStatus, + pub amount: f64, + pub currency: CurrencyInfo, + pub callback: String, + pub transactions: Vec, + pub payment_account: String, } #[derive(Serialize)] #[serde(rename_all = "lowercase")] -enum PaymentStatus { +pub enum PaymentStatus { Pending, Paid, Unknown, @@ -49,7 +51,7 @@ enum PaymentStatus { #[derive(Serialize)] #[serde(rename_all = "lowercase")] -enum WithdrawalStatus { +pub enum WithdrawalStatus { Waiting, Failed, Completed, @@ -84,35 +86,35 @@ enum Health { } #[derive(Serialize)] -struct CurrencyInfo { - currency: String, - chain_name: String, - kind: TokenKind, - decimals: Decimals, - rpc_url: String, +pub struct CurrencyInfo { + pub currency: String, + pub chain_name: String, + pub kind: TokenKind, + pub decimals: Decimals, + pub rpc_url: String, #[serde(skip_serializing_if = "Option::is_none")] - asset_id: Option, + pub asset_id: Option, } #[derive(Serialize)] #[serde(rename_all = "lowercase")] -enum TokenKind { +pub enum TokenKind { Assets, Balances, } #[derive(Serialize)] -struct ServerInfo { - version: &'static str, - instance_id: String, +pub struct ServerInfo { + pub version: &'static str, + pub instance_id: String, #[serde(skip_serializing_if = "Option::is_none")] - debug: Option, + pub debug: Option, #[serde(skip_serializing_if = "Option::is_none")] - kalatori_remark: Option, + pub kalatori_remark: Option, } #[derive(Serialize)] -struct TransactionInfo { +pub struct TransactionInfo { #[serde(skip_serializing_if = "Option::is_none", flatten)] finalized_tx: Option, transaction_bytes: String, @@ -154,12 +156,12 @@ enum TxStatus { pub async fn new( shutdown_notification: CancellationToken, host: SocketAddr, -) -> Result>> { + state: Arc, +) -> Result>>> { let v2 = Router::new() - .route("/order/:order_id", post(order)) - .route("/status", get(status)) - .route("/health", get(health)); - let app = Router::new().nest("/v2", v2); + .route("/order/:order_id", routing::post(order)) + .route("/status", routing::get(status)); + let app = Router::new().nest("/v2", v2).with_state(state); let listener = TcpListener::bind(host) .await @@ -194,7 +196,9 @@ struct InvalidParameter { message: String, } -fn process_order( +#[allow(clippy::too_many_lines)] +async fn process_order( + state: extract::State>, matched_path: &MatchedPath, path_result: Result, query: &HashMap, @@ -212,22 +216,61 @@ fn process_order( if query.is_empty() { // TODO: try to query an order from the database. - Ok(( - OrderStatus { - order, - payment_status: PaymentStatus::Unknown, - message: String::new(), - recipient: String::new(), - server_info: ServerInfo { - version: env!("CARGO_PKG_VERSION"), - instance_id: String::new(), - debug: None, - kalatori_remark: None, + let invoices = state.0.invoices.read().await; + + if let Some(invoice) = invoices.get(&order) { + Ok(( + OrderStatus { + order, + payment_status: if invoice.paid { + PaymentStatus::Paid + } else { + PaymentStatus::Unknown + }, + message: String::new(), + recipient: state.0.recipient.to_ss58check(), + server_info: ServerInfo { + version: env!("CARGO_PKG_VERSION"), + instance_id: String::new(), + debug: state.0.debug, + kalatori_remark: state.remark.clone(), + }, + order_info: Some(OrderInfo { + withdrawal_status: WithdrawalStatus::Waiting, + amount: invoice.amount.format(6), + currency: CurrencyInfo { + currency: "USDC".into(), + chain_name: "assethub-polkadot".into(), + kind: TokenKind::Assets, + decimals: 6, + rpc_url: state.rpc.clone(), + asset_id: Some(1337), + }, + callback: invoice.callback.clone(), + transactions: vec![], + payment_account: invoice.paym_acc.to_ss58check(), + }), }, - order_info: None, - }, - OrderSuccess::Found, - )) + OrderSuccess::Found, + )) + } else { + Ok(( + OrderStatus { + order, + payment_status: PaymentStatus::Unknown, + message: String::new(), + recipient: state.0.recipient.to_ss58check(), + server_info: ServerInfo { + version: env!("CARGO_PKG_VERSION"), + instance_id: String::new(), + debug: state.0.debug, + kalatori_remark: state.remark.clone(), + }, + order_info: None, + }, + OrderSuccess::Found, + )) + } } else { let get_parameter = |parameter: &str| { query @@ -243,40 +286,60 @@ fn process_order( // TODO: try to query & update or create an order in the database. - if currency == "USDCT" { + if currency != "USDC" { return Err(OrderError::UnknownCurrency); } - if amount < 50.0 { - return Err(OrderError::LessThanExistentialDeposit(50.0)); + if amount < 0.07 { + return Err(OrderError::LessThanExistentialDeposit(0.07)); } + let mut invoices = state.0.invoices.write().await; + let pay_acc: AccountId = state + .0 + .pair + .derive(vec![DeriveJunction::hard(order.clone())].into_iter(), None) + .unwrap() + .0 + .public() + .into(); + + invoices.insert( + order.clone(), + Invoicee { + callback: callback.clone(), + amount: Balance::parse(amount, 6), + paid: false, + paym_acc: pay_acc.clone(), + }, + ); + Ok(( OrderStatus { order, payment_status: PaymentStatus::Pending, message: String::new(), - recipient: String::new(), + recipient: state.0.recipient.to_ss58check(), server_info: ServerInfo { version: env!("CARGO_PKG_VERSION"), instance_id: String::new(), - debug: None, - kalatori_remark: None, + debug: state.0.debug, + kalatori_remark: state.0.remark.clone(), }, order_info: Some(OrderInfo { withdrawal_status: WithdrawalStatus::Waiting, amount, currency: CurrencyInfo { - currency, - chain_name: String::new(), - kind: TokenKind::Balances, - decimals: 0, - rpc_url: String::new(), - asset_id: None, + currency: "USDC".into(), + chain_name: "assethub-polkadot".into(), + kind: TokenKind::Assets, + decimals: 6, + rpc_url: state.rpc.clone(), + asset_id: Some(1337), }, callback, transactions: vec![], - payment_account: String::new(), + payment_account: pay_acc.to_ss58check(), }), }, OrderSuccess::Created, @@ -285,11 +348,12 @@ fn process_order( } async fn order( + state: extract::State>, matched_path: MatchedPath, path_result: Result, query: Query>, ) -> Response { - match process_order(&matched_path, path_result, &query) { + match process_order(state, &matched_path, path_result, &query).await { Ok((order_status, order_success)) => match order_success { OrderSuccess::Created => (StatusCode::CREATED, Json(order_status)), OrderSuccess::Found => (StatusCode::OK, Json(order_status)), @@ -336,47 +400,27 @@ async fn order( } } -async fn status() -> ([(HeaderName, &'static str); 1], Json) { +async fn status( + state: extract::State>, +) -> ([(HeaderName, &'static str); 1], Json) { ( [(header::CACHE_CONTROL, "no-store")], ServerStatus { description: ServerInfo { version: env!("CARGO_PKG_VERSION"), instance_id: String::new(), - debug: None, - kalatori_remark: None, + debug: state.0.debug, + kalatori_remark: state.0.remark.clone(), }, supported_currencies: vec![CurrencyInfo { - currency: String::new(), - chain_name: String::new(), - kind: TokenKind::Balances, - decimals: 0, - rpc_url: String::new(), - asset_id: None, + currency: "USDC".into(), + chain_name: "assethub-polkadot".into(), + kind: TokenKind::Assets, + decimals: 6, + rpc_url: state.rpc.clone(), + asset_id: Some(1337), }], } .into(), ) } - -async fn health() -> ([(HeaderName, &'static str); 1], Json) { - ( - [(header::CACHE_CONTROL, "no-store")], - ServerHealth { - description: ServerInfo { - version: env!("CARGO_PKG_VERSION"), - instance_id: String::new(), - debug: None, - kalatori_remark: None, - }, - connected_rpcs: [RpcInfo { - rpc_url: String::new(), - chain_name: String::new(), - status: Health::Critical, - }] - .into(), - status: Health::Degraded, - } - .into(), - ) -}