From 5e7f97688224b1d13d97f7d6f172ff561da96b6a Mon Sep 17 00:00:00 2001 From: tison Date: Fri, 15 Nov 2024 16:59:02 +0800 Subject: [PATCH] feat: distinguish topic content by format (#18) Signed-off-by: tison --- Cargo.lock | 584 ++++++++++++++++------ Cargo.toml | 10 +- api/protos/src/property/storage.rs | 9 + crates/kafka-broker/src/broker/admin.rs | 2 + crates/kafka-broker/src/broker/fetch.rs | 23 +- crates/kafka-broker/src/broker/produce.rs | 27 +- crates/runtime/src/lib.rs | 2 - crates/runtime/src/scheduled_task.rs | 135 ----- crates/telemetry/src/lib.rs | 17 +- crates/wal-broker/src/wal.rs | 14 + tests/wal/src/lib.rs | 2 + 11 files changed, 524 insertions(+), 301 deletions(-) delete mode 100644 crates/runtime/src/scheduled_task.rs diff --git a/Cargo.lock b/Cargo.lock index 717be18..ba41992 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -55,9 +55,9 @@ dependencies = [ [[package]] name = "allocator-api2" -version = "0.2.18" +version = "0.2.20" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5c6cb57a04249c6480766f7f7cef5467412af1490f8d1e243141daddada3264f" +checksum = "45862d1c77f2228b9e10bc609d5bc203d86ebc9b87ad8d5d5167a6c9abf739d9" [[package]] name = "android-tzdata" @@ -76,9 +76,9 @@ dependencies = [ [[package]] name = "anstream" -version = "0.6.17" +version = "0.6.18" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "23a1e53f0f5d86382dafe1cf314783b2044280f406e7e1506368220ad11b1338" +checksum = "8acc5369981196006228e28809f761875c0327210a891e941f4c683b3a99529b" dependencies = [ "anstyle", "anstyle-parse", @@ -91,9 +91,9 @@ dependencies = [ [[package]] name = "anstyle" -version = "1.0.9" +version = "1.0.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8365de52b16c035ff4fcafe0092ba9390540e3e352870ac09933bebcaa2c8c56" +checksum = "55cc3b69f167a1ef2e161439aa98aed94e6028e5f9a59be9a6ffb47aef1651f9" [[package]] name = "anstyle-parse" @@ -125,9 +125,9 @@ dependencies = [ [[package]] name = "anyhow" -version = "1.0.91" +version = "1.0.93" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c042108f3ed77fd83760a5fd79b53be043192bb3b9dba91d8c574c0ada7850c8" +checksum = "4c95c10ba0b00a02636238b814946408b1322d5ac4760326e6fb8ec956d85775" [[package]] name = "async-compression" @@ -151,7 +151,7 @@ checksum = "721cae7de5c34fbb2acd27e21e6d2cf7b886dce0c27388d46c4e6c47ea4318dd" dependencies = [ "proc-macro2", "quote", - "syn 2.0.85", + "syn 2.0.87", ] [[package]] @@ -275,7 +275,7 @@ dependencies = [ "hyperlocal", "log", "pin-project-lite", - "rustls 0.23.15", + "rustls 0.23.16", "rustls-native-certs", "rustls-pemfile", "rustls-pki-types", @@ -284,7 +284,7 @@ dependencies = [ "serde_json", "serde_repr", "serde_urlencoded", - "thiserror", + "thiserror 1.0.69", "tokio", "tokio-util", "tower-service", @@ -326,9 +326,9 @@ dependencies = [ [[package]] name = "bstr" -version = "1.10.0" +version = "1.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "40723b8fb387abc38f4f4a37c09073622e41dd12327033091ef8950659e6dc0c" +checksum = "1a68f1f47cdf0ec8ee4b941b2eee2a80cb796db73118c0dd09ac63fbe405be22" dependencies = [ "memchr", "regex-automata", @@ -365,9 +365,9 @@ checksum = "9ac0150caa2ae65ca5bd83f25c7de183dea78d4d366469f148435e2acfbad0da" [[package]] name = "cc" -version = "1.1.31" +version = "1.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c2e7962b54006dcfcc61cb72735f4d89bb97061dd6a7ed882ec6b8ee53714c6f" +checksum = "fd9de9f2205d5ef3fd67e685b0df337994ddd4495e2a28d185500d0e1edfea47" dependencies = [ "jobserver", "libc", @@ -403,9 +403,9 @@ dependencies = [ [[package]] name = "clap" -version = "4.5.20" +version = "4.5.21" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b97f376d85a664d5837dbae44bf546e6477a679ff6610010f17276f686d867e8" +checksum = "fb3b4b9e5a7c7514dfa52869339ee98b3156b0bfb4e8a77c4ff4babb64b1604f" dependencies = [ "clap_builder", "clap_derive", @@ -413,9 +413,9 @@ dependencies = [ [[package]] name = "clap_builder" -version = "4.5.20" +version = "4.5.21" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "19bc80abd44e4bed93ca373a0704ccbd1b710dc5749406201bb018272808dc54" +checksum = "b17a95aa67cc7b5ebd32aa5370189aa0d79069ef1c64ce893bd30fb24bff20ec" dependencies = [ "anstream", "anstyle", @@ -432,14 +432,14 @@ dependencies = [ "heck", "proc-macro2", "quote", - "syn 2.0.85", + "syn 2.0.87", ] [[package]] name = "clap_lex" -version = "0.7.2" +version = "0.7.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1462739cb27611015575c0c11df5df7601141071f07518d56fcc1be504cbec97" +checksum = "afb84c814227b90d6895e01398aee0d8033c00e7466aca416fb6a8e0eb19d8a7" [[package]] name = "cmake" @@ -557,9 +557,9 @@ checksum = "773648b94d0e5d620f64f280777445740e61fe701025087ec8b57f45c791888b" [[package]] name = "cpufeatures" -version = "0.2.14" +version = "0.2.15" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "608697df725056feaccfa42cffdaeeec3fccc4ffc38358ecd19b243e716a78e0" +checksum = "0ca741a962e1b0bff6d724a1a0958b686406e853bb14061f218562e1896f95e6" dependencies = [ "libc", ] @@ -669,7 +669,7 @@ dependencies = [ "proc-macro2", "quote", "strsim", - "syn 2.0.85", + "syn 2.0.87", ] [[package]] @@ -680,7 +680,7 @@ checksum = "d336a2a514f6ccccaa3e09b02d41d35330c07ddf03a62165fcec10bb561c7806" dependencies = [ "darling_core", "quote", - "syn 2.0.85", + "syn 2.0.87", ] [[package]] @@ -716,6 +716,17 @@ dependencies = [ "subtle", ] +[[package]] +name = "displaydoc" +version = "0.2.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "97369cbbc041bc366949bc74d34658d6cda5621039731c6310521892a3a20ae0" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.87", +] + [[package]] name = "dlv-list" version = "0.5.2" @@ -873,14 +884,14 @@ dependencies = [ "proc-macro-error2", "proc-macro2", "quote", - "syn 2.0.85", + "syn 2.0.87", ] [[package]] name = "fastrand" -version = "2.1.1" +version = "2.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e8c02a5121d4ea3eb16a80748c74f5549a5665e4c21333c6098f283870fbdea6" +checksum = "486f806e73c5707928240ddc295403b1b93c96a02038563881c4a2fd84b81ac4" [[package]] name = "filetime" @@ -902,9 +913,9 @@ checksum = "b3ea1ec5f8307826a5b71094dd91fc04d4ae75d5709b20ad351c7fb4815c86ec" [[package]] name = "flate2" -version = "1.0.34" +version = "1.0.35" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a1b589b4dc103969ad3cf85c950899926ec64300a1a46d76c03a6072957036f0" +checksum = "c936bfdafb507ebbf50b8074c54fa31c5be9a1e7e5f467dd659697041407d07c" dependencies = [ "crc32fast", "miniz_oxide", @@ -1031,7 +1042,7 @@ checksum = "162ee34ebcb7c64a8abebc059ce0fee27c2262618d7b60ed8faf72fef13c3650" dependencies = [ "proc-macro2", "quote", - "syn 2.0.85", + "syn 2.0.87", ] [[package]] @@ -1116,7 +1127,7 @@ dependencies = [ "gix-date", "gix-utils", "itoa", - "thiserror", + "thiserror 1.0.69", "winnow 0.6.20", ] @@ -1129,7 +1140,7 @@ dependencies = [ "bstr", "itoa", "jiff", - "thiserror", + "thiserror 1.0.69", ] [[package]] @@ -1145,7 +1156,7 @@ dependencies = [ "gix-path", "gix-ref", "gix-sec", - "thiserror", + "thiserror 1.0.69", ] [[package]] @@ -1181,7 +1192,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "952c3a29f1bc1007cc901abce7479943abfa42016db089de33d0a4fa3c85bfe8" dependencies = [ "faster-hex", - "thiserror", + "thiserror 1.0.69", ] [[package]] @@ -1203,7 +1214,7 @@ checksum = "5102acdf4acae2644e38dbbd18cdfba9597a218f7d85f810fe5430207e03c2de" dependencies = [ "gix-tempfile", "gix-utils", - "thiserror", + "thiserror 1.0.69", ] [[package]] @@ -1222,7 +1233,7 @@ dependencies = [ "gix-validate", "itoa", "smallvec", - "thiserror", + "thiserror 1.0.69", "winnow 0.6.20", ] @@ -1236,7 +1247,7 @@ dependencies = [ "gix-trace", "home", "once_cell", - "thiserror", + "thiserror 1.0.69", ] [[package]] @@ -1256,7 +1267,7 @@ dependencies = [ "gix-utils", "gix-validate", "memmap2", - "thiserror", + "thiserror 1.0.69", "winnow 0.6.20", ] @@ -1308,7 +1319,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e187b263461bc36cea17650141567753bc6207d036cedd1de6e81a52f277ff68" dependencies = [ "bstr", - "thiserror", + "thiserror 1.0.69", ] [[package]] @@ -1360,9 +1371,9 @@ dependencies = [ [[package]] name = "hashbrown" -version = "0.15.0" +version = "0.15.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1e087f84d4f86bf4b218b927129862374b72199ae7d8657835f1e89000eea4fb" +checksum = "3a9bfc1af68b1726ea47d3d5109de126281def866b33970e10fbab11b5dafab3" [[package]] name = "hashlink" @@ -1534,7 +1545,7 @@ dependencies = [ "http", "hyper", "hyper-util", - "rustls 0.23.15", + "rustls 0.23.16", "rustls-pki-types", "tokio", "tokio-rustls 0.26.0", @@ -1560,9 +1571,9 @@ dependencies = [ [[package]] name = "hyper-util" -version = "0.1.9" +version = "0.1.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "41296eb09f183ac68eec06e03cdbea2e759633d4067b2f6552fc2e009bcad08b" +checksum = "df2dcfbe0677734ab2f3ffa7fa7bfd4706bfdc1ef393f2ee30184aed67e631b4" dependencies = [ "bytes", "futures-channel", @@ -1615,6 +1626,124 @@ dependencies = [ "cc", ] +[[package]] +name = "icu_collections" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "db2fa452206ebee18c4b5c2274dbf1de17008e874b4dc4f0aea9d01ca79e4526" +dependencies = [ + "displaydoc", + "yoke", + "zerofrom", + "zerovec", +] + +[[package]] +name = "icu_locid" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "13acbb8371917fc971be86fc8057c41a64b521c184808a698c02acc242dbf637" +dependencies = [ + "displaydoc", + "litemap", + "tinystr", + "writeable", + "zerovec", +] + +[[package]] +name = "icu_locid_transform" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "01d11ac35de8e40fdeda00d9e1e9d92525f3f9d887cdd7aa81d727596788b54e" +dependencies = [ + "displaydoc", + "icu_locid", + "icu_locid_transform_data", + "icu_provider", + "tinystr", + "zerovec", +] + +[[package]] +name = "icu_locid_transform_data" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fdc8ff3388f852bede6b579ad4e978ab004f139284d7b28715f773507b946f6e" + +[[package]] +name = "icu_normalizer" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "19ce3e0da2ec68599d193c93d088142efd7f9c5d6fc9b803774855747dc6a84f" +dependencies = [ + "displaydoc", + "icu_collections", + "icu_normalizer_data", + "icu_properties", + "icu_provider", + "smallvec", + "utf16_iter", + "utf8_iter", + "write16", + "zerovec", +] + +[[package]] +name = "icu_normalizer_data" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f8cafbf7aa791e9b22bec55a167906f9e1215fd475cd22adfcf660e03e989516" + +[[package]] +name = "icu_properties" +version = "1.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "93d6020766cfc6302c15dbbc9c8778c37e62c14427cb7f6e601d849e092aeef5" +dependencies = [ + "displaydoc", + "icu_collections", + "icu_locid_transform", + "icu_properties_data", + "icu_provider", + "tinystr", + "zerovec", +] + +[[package]] +name = "icu_properties_data" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "67a8effbc3dd3e4ba1afa8ad918d5684b8868b3b26500753effea8d2eed19569" + +[[package]] +name = "icu_provider" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6ed421c8a8ef78d3e2dbc98a973be2f3770cb42b606e3ab18d6237c4dfde68d9" +dependencies = [ + "displaydoc", + "icu_locid", + "icu_provider_macros", + "stable_deref_trait", + "tinystr", + "writeable", + "yoke", + "zerofrom", + "zerovec", +] + +[[package]] +name = "icu_provider_macros" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1ec89e9337638ecdc08744df490b221a7399bf8d164eb52a665454e60e075ad6" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.87", +] + [[package]] name = "ident_case" version = "1.0.1" @@ -1623,12 +1752,23 @@ checksum = "b9e0384b61958566e926dc50660321d12159025e767c18e043daf26b70104c39" [[package]] name = "idna" -version = "0.5.0" +version = "1.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "634d9b1461af396cad843f47fdba5597a4f9e6ddd4bfb6ff5d85028c25cb12f6" +checksum = "686f825264d630750a544639377bae737628043f20d38bbc029e8f29ea968a7e" dependencies = [ - "unicode-bidi", - "unicode-normalization", + "idna_adapter", + "smallvec", + "utf8_iter", +] + +[[package]] +name = "idna_adapter" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "daca1df1c957320b2cf139ac61e7bd64fed304c5040df000a745aa1de3b4ef71" +dependencies = [ + "icu_normalizer", + "icu_properties", ] [[package]] @@ -1649,15 +1789,15 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "707907fe3c25f5424cce2cb7e1cbcafee6bdbe735ca90ef77c29e84591e5b9da" dependencies = [ "equivalent", - "hashbrown 0.15.0", + "hashbrown 0.15.1", "serde", ] [[package]] name = "insta" -version = "1.41.0" +version = "1.41.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a1f72d3e19488cf7d8ea52d2fc0f8754fc933398b337cd3cbdb28aaeb35159ef" +checksum = "7e9ffc4d4892617c50a928c52b2961cb5174b6fc6ebf252b2fac9d21955c48b8" dependencies = [ "console", "lazy_static", @@ -1698,9 +1838,9 @@ checksum = "49f1f14873335454500d59611f1cf4a4b0f786f9ac11f4312a78e4cf2566695b" [[package]] name = "jiff" -version = "0.1.13" +version = "0.1.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8a45489186a6123c128fdf6016183fcfab7113e1820eb813127e036e287233fb" +checksum = "b9d9d414fc817d3e3d62b2598616733f76c4cc74fbac96069674739b881295c8" dependencies = [ "jiff-tzdb-platform", "windows-sys 0.59.0", @@ -1758,9 +1898,9 @@ dependencies = [ [[package]] name = "libc" -version = "0.2.161" +version = "0.2.162" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8e9489c2807c139ffd9c1794f4af0ebe86a828db53ecdc7fea2111d0fed085d1" +checksum = "18d287de67fe55fd7e1581fe933d965a5a9477b38e949cfa9f8574ef01506398" [[package]] name = "libgit2-sys" @@ -1776,9 +1916,9 @@ dependencies = [ [[package]] name = "libm" -version = "0.2.10" +version = "0.2.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a00419de735aac21d53b0de5ce2c03bd3627277cf471300f27ebc89f7d828047" +checksum = "8355be11b20d696c8f18f6cc018c4e372165b1fa8126cef092399c9951984ffa" [[package]] name = "libredox" @@ -1826,6 +1966,12 @@ version = "0.4.14" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "78b3ae25bc7c8c38cec158d1f2757ee79e9b3740fbc7ccf0e59e4b08d793fa89" +[[package]] +name = "litemap" +version = "0.7.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "643cb0b8d4fcc284004d5fd0d67ccf61dfffadb7f75e1e71bc420f4688a3a704" + [[package]] name = "local-ip-address" version = "0.6.3" @@ -1834,7 +1980,7 @@ checksum = "3669cf5561f8d27e8fc84cc15e58350e70f557d4d65f70e3154e54cd2f8e1782" dependencies = [ "libc", "neli", - "thiserror", + "thiserror 1.0.69", "windows-sys 0.59.0", ] @@ -1860,16 +2006,15 @@ dependencies = [ [[package]] name = "logforth" -version = "0.14.0" +version = "0.18.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "415235aa09d81980f2049c725f3c9db3b227f263497359e8292313d023c1a6e9" +checksum = "53b6ee539f677fa64d20cc0316c7c3f98fde0287ec320b9f2d2faeed84b8ffa2" dependencies = [ "anyhow", "colored", "env_filter", "jiff", "log", - "paste", ] [[package]] @@ -1903,11 +2048,11 @@ dependencies = [ [[package]] name = "mea" -version = "0.0.4" +version = "0.0.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b3f828cae3376d2bfaa33b56d14051537384da1dabbe41e80a7798c47e91f986" +checksum = "aed46b251aab2884f98280f495662d554cca4df5221982f6bd46de7bc289c73c" dependencies = [ - "crossbeam-queue", + "slab", ] [[package]] @@ -1983,7 +2128,7 @@ dependencies = [ "morax-version", "opendal", "serde", - "thiserror", + "thiserror 2.0.3", "toml", ] @@ -2000,7 +2145,7 @@ dependencies = [ "morax-runtime", "morax-storage", "serde", - "thiserror", + "thiserror 2.0.3", "tokio", "uuid", ] @@ -2015,7 +2160,7 @@ dependencies = [ "morax-runtime", "serde", "sqlx", - "thiserror", + "thiserror 2.0.3", "uuid", ] @@ -2038,7 +2183,7 @@ dependencies = [ "log", "pin-project", "serde", - "thiserror", + "thiserror 2.0.3", "tokio", "tokio-util", ] @@ -2058,7 +2203,7 @@ dependencies = [ "morax-runtime", "morax-wal-broker", "poem", - "thiserror", + "thiserror 2.0.3", "tokio", ] @@ -2069,7 +2214,7 @@ dependencies = [ "error-stack", "morax-protos", "opendal", - "thiserror", + "thiserror 2.0.3", "uuid", ] @@ -2108,7 +2253,7 @@ dependencies = [ "poem", "serde", "serde_json", - "thiserror", + "thiserror 2.0.3", ] [[package]] @@ -2121,7 +2266,7 @@ dependencies = [ "reqwest", "serde", "serde_json", - "thiserror", + "thiserror 2.0.3", ] [[package]] @@ -2288,9 +2433,9 @@ checksum = "1261fe7e33c73b354eab43b1273a57c8f967d0391e80353e51f764ac02cf6775" [[package]] name = "opendal" -version = "0.50.1" +version = "0.50.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "213222b6c86949314d8f51acb26d8241e7c8dd0879b016a79471d49f21ee592f" +checksum = "cb28bb6c64e116ceaf8dd4e87099d3cfea4a58e85e62b104fef74c91afba0f44" dependencies = [ "anyhow", "async-trait", @@ -2339,7 +2484,7 @@ checksum = "a948666b637a0f465e8564c73e89d4dde00d72d4d473cc972f390fc3dcee7d9c" dependencies = [ "proc-macro2", "quote", - "syn 2.0.85", + "syn 2.0.87", ] [[package]] @@ -2421,7 +2566,7 @@ dependencies = [ "regex", "regex-syntax", "structmeta", - "syn 2.0.85", + "syn 2.0.87", ] [[package]] @@ -2462,7 +2607,7 @@ checksum = "3c0f5fad0874fc7abcd4d750e76917eaebbecaa2c20bde22e1dbeeba8beb758c" dependencies = [ "proc-macro2", "quote", - "syn 2.0.85", + "syn 2.0.87", ] [[package]] @@ -2532,7 +2677,7 @@ dependencies = [ "serde_urlencoded", "smallvec", "sync_wrapper", - "thiserror", + "thiserror 1.0.69", "tokio", "tokio-rustls 0.25.0", "tokio-util", @@ -2549,7 +2694,7 @@ dependencies = [ "proc-macro-crate 3.2.0", "proc-macro2", "quote", - "syn 2.0.85", + "syn 2.0.87", ] [[package]] @@ -2605,7 +2750,7 @@ dependencies = [ "proc-macro-error-attr2", "proc-macro2", "quote", - "syn 2.0.85", + "syn 2.0.87", ] [[package]] @@ -2649,45 +2794,49 @@ dependencies = [ [[package]] name = "quinn" -version = "0.11.5" +version = "0.11.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8c7c5fdde3cdae7203427dc4f0a68fe0ed09833edc525a03456b153b79828684" +checksum = "62e96808277ec6f97351a2380e6c25114bc9e67037775464979f3037c92d05ef" dependencies = [ "bytes", "pin-project-lite", "quinn-proto", "quinn-udp", "rustc-hash", - "rustls 0.23.15", + "rustls 0.23.16", "socket2", - "thiserror", + "thiserror 2.0.3", "tokio", "tracing", ] [[package]] name = "quinn-proto" -version = "0.11.8" +version = "0.11.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fadfaed2cd7f389d0161bb73eeb07b7b78f8691047a6f3e73caaeae55310a4a6" +checksum = "a2fe5ef3495d7d2e377ff17b1a8ce2ee2ec2a18cde8b6ad6619d65d0701c135d" dependencies = [ "bytes", + "getrandom", "rand", "ring", "rustc-hash", - "rustls 0.23.15", + "rustls 0.23.16", + "rustls-pki-types", "slab", - "thiserror", + "thiserror 2.0.3", "tinyvec", "tracing", + "web-time", ] [[package]] name = "quinn-udp" -version = "0.5.5" +version = "0.5.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4fe68c2e9e1a1234e218683dbdf9f9dfcb094113c5ac2b938dfcb9bab4c4140b" +checksum = "7d5a626c6807713b15cac82a6acaccd6043c9a5408c24baae07611fec3f243da" dependencies = [ + "cfg_aliases", "libc", "once_cell", "socket2", @@ -2810,9 +2959,9 @@ dependencies = [ [[package]] name = "regex-automata" -version = "0.4.8" +version = "0.4.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "368758f23274712b504848e9d5a6f010445cc8b87a7cdb4d7cbee666c1288da3" +checksum = "809e8dc61f6de73b46c85f4c96486310fe304c434cfa43669d7b40f711150908" dependencies = [ "aho-corasick", "memchr", @@ -2827,9 +2976,9 @@ checksum = "2b15c43186be67a4fd63bee50d0303afffcef381492ebe2c5d87f324e1b8815c" [[package]] name = "reqsign" -version = "0.16.0" +version = "0.16.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "03dd4ba7c3901dd43e6b8c7446a760d45bc1ea4301002e1a6fa48f97c3a796fa" +checksum = "eb0075a66c8bfbf4cc8b70dca166e722e1f55a3ea9250ecbb85f4d92a5f64149" dependencies = [ "anyhow", "async-trait", @@ -2855,9 +3004,9 @@ dependencies = [ [[package]] name = "reqwest" -version = "0.12.8" +version = "0.12.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f713147fbe92361e52392c73b8c9e48c04c6625bce969ef54dc901e58e042a7b" +checksum = "a77c62af46e79de0a562e1a9849205ffcb7fc1238876e9bd743357570e04046f" dependencies = [ "base64 0.22.1", "bytes", @@ -2881,7 +3030,7 @@ dependencies = [ "percent-encoding", "pin-project-lite", "quinn", - "rustls 0.23.15", + "rustls 0.23.16", "rustls-pemfile", "rustls-pki-types", "serde", @@ -2965,7 +3114,7 @@ dependencies = [ "pin-project-lite", "rand", "snap", - "thiserror", + "thiserror 1.0.69", "tokio", "tracing", "zstd", @@ -3024,9 +3173,9 @@ dependencies = [ [[package]] name = "rustix" -version = "0.38.38" +version = "0.38.40" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "aa260229e6538e52293eeb577aabd09945a09d6d9cc0fc550ed7529056c2e32a" +checksum = "99e4ea3e1cdc4b559b8e5650f9c8e5998e3e5c1343b4eaf034565f32318d63c0" dependencies = [ "bitflags 2.6.0", "errno", @@ -3051,9 +3200,9 @@ dependencies = [ [[package]] name = "rustls" -version = "0.23.15" +version = "0.23.16" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5fbb44d7acc4e873d613422379f69f237a1b141928c02f6bc6ccfddddc2d7993" +checksum = "eee87ff5d9b36712a58574e12e9f0ea80f915a5b0ac518d322b24a465617925e" dependencies = [ "once_cell", "ring", @@ -3090,6 +3239,9 @@ name = "rustls-pki-types" version = "1.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "16f1201b3c9a7ee8039bcadc17b7e605e2945b27eee7631788c1bd2b0643674b" +dependencies = [ + "web-time", +] [[package]] name = "rustls-webpki" @@ -3194,9 +3346,9 @@ dependencies = [ [[package]] name = "security-framework-sys" -version = "2.12.0" +version = "2.12.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ea4a292869320c0272d7bc55a5a6aafaff59b4f63404a003887b679a2e05b4b6" +checksum = "fa39c7303dc58b5543c94d22c1766b0d31f2ee58306363ea622b10bbc075eaa2" dependencies = [ "core-foundation-sys", "libc", @@ -3210,22 +3362,22 @@ checksum = "61697e0a1c7e512e84a621326239844a24d8207b4669b41bc18b32ea5cbf988b" [[package]] name = "serde" -version = "1.0.213" +version = "1.0.215" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3ea7893ff5e2466df8d720bb615088341b295f849602c6956047f8f80f0e9bc1" +checksum = "6513c1ad0b11a9376da888e3e0baa0077f1aed55c17f50e7b2397136129fb88f" dependencies = [ "serde_derive", ] [[package]] name = "serde_derive" -version = "1.0.213" +version = "1.0.215" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7e85ad2009c50b58e87caa8cd6dac16bdf511bbfb7af6c33df902396aa480fa5" +checksum = "ad1e866f866923f252f05c889987993144fb74e722403468a4ebd70c3cd756c0" dependencies = [ "proc-macro2", "quote", - "syn 2.0.85", + "syn 2.0.87", ] [[package]] @@ -3257,7 +3409,7 @@ checksum = "6c64451ba24fc7a6a2d60fc75dd9c83c90903b19028d4eff35e88fc1e86564e9" dependencies = [ "proc-macro2", "quote", - "syn 2.0.85", + "syn 2.0.87", ] [[package]] @@ -3308,7 +3460,7 @@ dependencies = [ "darling", "proc-macro2", "quote", - "syn 2.0.85", + "syn 2.0.87", ] [[package]] @@ -3341,9 +3493,9 @@ dependencies = [ [[package]] name = "shadow-rs" -version = "0.35.2" +version = "0.36.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f1b2328fb3ec0d5302f95915e7e77cfc2ff943714d9970bc4b66e9eacf318687" +checksum = "58cfcd0643497a9f780502063aecbcc4a3212cbe4948fd25ee8fd179c2cf9a18" dependencies = [ "const_format", "git2", @@ -3486,14 +3638,14 @@ dependencies = [ "once_cell", "paste", "percent-encoding", - "rustls 0.23.15", + "rustls 0.23.16", "rustls-pemfile", "serde", "serde_json", "sha2", "smallvec", "sqlformat", - "thiserror", + "thiserror 1.0.69", "tokio", "tokio-stream", "tracing", @@ -3512,7 +3664,7 @@ dependencies = [ "quote", "sqlx-core", "sqlx-macros-core", - "syn 2.0.85", + "syn 2.0.87", ] [[package]] @@ -3535,7 +3687,7 @@ dependencies = [ "sqlx-mysql", "sqlx-postgres", "sqlx-sqlite", - "syn 2.0.85", + "syn 2.0.87", "tempfile", "tokio", "url", @@ -3578,7 +3730,7 @@ dependencies = [ "smallvec", "sqlx-core", "stringprep", - "thiserror", + "thiserror 1.0.69", "tracing", "uuid", "whoami", @@ -3617,7 +3769,7 @@ dependencies = [ "smallvec", "sqlx-core", "stringprep", - "thiserror", + "thiserror 1.0.69", "tracing", "uuid", "whoami", @@ -3647,6 +3799,12 @@ dependencies = [ "uuid", ] +[[package]] +name = "stable_deref_trait" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a8f112729512f8e442d81f95a8a7ddf2b7c6b8a1a6f509a95864142b30cab2d3" + [[package]] name = "stringprep" version = "0.1.5" @@ -3673,7 +3831,7 @@ dependencies = [ "proc-macro2", "quote", "structmeta-derive", - "syn 2.0.85", + "syn 2.0.87", ] [[package]] @@ -3684,7 +3842,7 @@ checksum = "152a0b65a590ff6c3da95cabe2353ee04e6167c896b28e3b14478c2636c922fc" dependencies = [ "proc-macro2", "quote", - "syn 2.0.85", + "syn 2.0.87", ] [[package]] @@ -3784,9 +3942,9 @@ dependencies = [ [[package]] name = "syn" -version = "2.0.85" +version = "2.0.87" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5023162dfcd14ef8f32034d8bcd4cc5ddc61ef7a247c024a33e24e1f24d21b56" +checksum = "25aa4ce346d03a6dcd68dd8b4010bcb74e54e62c90c573f394c46eae99aba32d" dependencies = [ "proc-macro2", "quote", @@ -3802,6 +3960,17 @@ dependencies = [ "futures-core", ] +[[package]] +name = "synstructure" +version = "0.13.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c8af7666ab7b6390ab78131fb5b0fce11d6b7a6951602017c35fa82800708971" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.87", +] + [[package]] name = "system-configuration" version = "0.6.1" @@ -3825,9 +3994,9 @@ dependencies = [ [[package]] name = "tempfile" -version = "3.13.0" +version = "3.14.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f0f2c9fc62d0beef6951ccffd757e241266a2c833136efbe35af6cd2567dca5b" +checksum = "28cce251fcbc87fac86a866eeb0d6c2d536fc16d06f184bb61aeae11aa4cee0c" dependencies = [ "cfg-if", "fastrand", @@ -3844,7 +4013,7 @@ checksum = "ae861f7d521762a2e5524ceeb3a518fab2c06c25e217a1d7270b8c5e158c141b" dependencies = [ "proc-macro2", "quote", - "syn 2.0.85", + "syn 2.0.87", ] [[package]] @@ -3868,7 +4037,7 @@ dependencies = [ "serde", "serde_json", "serde_with", - "thiserror", + "thiserror 1.0.69", "tokio", "tokio-stream", "tokio-tar", @@ -3897,22 +4066,42 @@ dependencies = [ [[package]] name = "thiserror" -version = "1.0.65" +version = "1.0.69" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b6aaf5339b578ea85b50e080feb250a3e8ae8cfcdff9a461c9ec2904bc923f52" +dependencies = [ + "thiserror-impl 1.0.69", +] + +[[package]] +name = "thiserror" +version = "2.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c006c85c7651b3cf2ada4584faa36773bd07bac24acfb39f3c431b36d7e667aa" +dependencies = [ + "thiserror-impl 2.0.3", +] + +[[package]] +name = "thiserror-impl" +version = "1.0.69" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5d11abd9594d9b38965ef50805c5e469ca9cc6f197f883f717e0269a3057b3d5" +checksum = "4fee6c4efc90059e10f81e6d42c60a18f76588c3d74cb83a0b242a2b6c7504c1" dependencies = [ - "thiserror-impl", + "proc-macro2", + "quote", + "syn 2.0.87", ] [[package]] name = "thiserror-impl" -version = "1.0.65" +version = "2.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ae71770322cbd277e69d762a16c444af02aa0575ac0d174f0b9562d3b37f8602" +checksum = "f077553d607adc1caf65430528a576c757a71ed73944b66ebb58ef2bbd243568" dependencies = [ "proc-macro2", "quote", - "syn 2.0.85", + "syn 2.0.87", ] [[package]] @@ -3957,6 +4146,16 @@ dependencies = [ "crunchy", ] +[[package]] +name = "tinystr" +version = "0.7.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9117f5d4db391c1cf6927e7bea3db74b9a1c1add8f7eda9ffd5364f40f57b82f" +dependencies = [ + "displaydoc", + "zerovec", +] + [[package]] name = "tinyvec" version = "1.8.0" @@ -3974,9 +4173,9 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" [[package]] name = "tokio" -version = "1.41.0" +version = "1.41.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "145f3413504347a2be84393cc8a7d2fb4d863b375909ea59f2158261aa258bbb" +checksum = "22cfb5bee7a6a52939ca9224d6ac897bb669134078daa8735560897f69de4d33" dependencies = [ "backtrace", "bytes", @@ -3998,7 +4197,7 @@ checksum = "693d596312e88961bc67d7f1f97af8a70227d9f90c31bba5806eec004978d752" dependencies = [ "proc-macro2", "quote", - "syn 2.0.85", + "syn 2.0.87", ] [[package]] @@ -4028,7 +4227,7 @@ version = "0.26.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0c7bc40d0e5a97695bb96e27995cd3a08538541b0a846f65bba7a359f36700d4" dependencies = [ - "rustls 0.23.15", + "rustls 0.23.16", "rustls-pki-types", "tokio", ] @@ -4144,7 +4343,7 @@ checksum = "34704c8d6ebcbc939824180af020566b01a7c01f80641264eba0999f6c2b6be7" dependencies = [ "proc-macro2", "quote", - "syn 2.0.85", + "syn 2.0.87", ] [[package]] @@ -4265,9 +4464,9 @@ checksum = "8ecb6da28b8a351d773b68d5825ac39017e680750f980f3a1a85cd8dd28a47c1" [[package]] name = "url" -version = "2.5.2" +version = "2.5.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "22784dbdf76fdde8af1aeda5622b546b422b6fc585325248a2bf9f5e41e94d6c" +checksum = "8d157f1b96d14500ffdc1f10ba712e780825526c03d9a49b4d0324b0d9113ada" dependencies = [ "form_urlencoded", "idna", @@ -4275,6 +4474,18 @@ dependencies = [ "serde", ] +[[package]] +name = "utf16_iter" +version = "1.0.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c8232dd3cdaed5356e0f716d285e4b40b932ac434100fe9b7e0e8e935b9e6246" + +[[package]] +name = "utf8_iter" +version = "1.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b6c140620e7ffbb22c2dee59cafe6084a59b5ffc27a8859a5f0d494b5d52b6be" + [[package]] name = "utf8parse" version = "0.2.2" @@ -4408,7 +4619,7 @@ dependencies = [ "once_cell", "proc-macro2", "quote", - "syn 2.0.85", + "syn 2.0.87", "wasm-bindgen-shared", ] @@ -4442,7 +4653,7 @@ checksum = "26c6ab57572f7a24a4985830b120de1594465e5d500f24afe89e16b4e833ef68" dependencies = [ "proc-macro2", "quote", - "syn 2.0.85", + "syn 2.0.87", "wasm-bindgen-backend", "wasm-bindgen-shared", ] @@ -4497,9 +4708,9 @@ dependencies = [ [[package]] name = "which" -version = "6.0.3" +version = "7.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b4ee928febd44d98f2f459a4a79bd4d928591333a494a10a868418ac1b39cf1f" +checksum = "c9cad3279ade7346b96e38731a641d7343dd6a53d55083dd54eadfa5a1b38c6b" dependencies = [ "either", "home", @@ -4765,6 +4976,18 @@ version = "0.0.19" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d135d17ab770252ad95e9a872d365cf3090e3be864a34ab46f48555993efc904" +[[package]] +name = "write16" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d1890f4022759daae28ed4fe62859b1236caebfc61ede2f63ed4e695f3f6d936" + +[[package]] +name = "writeable" +version = "0.5.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e9df38ee2d2c3c5948ea468a8406ff0db0b29ae1ffde1bcf20ef305bcc95c51" + [[package]] name = "x" version = "0.1.1" @@ -4784,6 +5007,30 @@ dependencies = [ "rustix", ] +[[package]] +name = "yoke" +version = "0.7.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6c5b1314b079b0930c31e3af543d8ee1757b1951ae1e1565ec704403a7240ca5" +dependencies = [ + "serde", + "stable_deref_trait", + "yoke-derive", + "zerofrom", +] + +[[package]] +name = "yoke-derive" +version = "0.7.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "28cc31741b18cb6f1d5ff12f5b7523e3d6eb0852bbbad19d73905511d9849b95" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.87", + "synstructure", +] + [[package]] name = "zerocopy" version = "0.7.35" @@ -4802,7 +5049,28 @@ checksum = "fa4f8080344d4671fb4e831a13ad1e68092748387dfc4f55e356242fae12ce3e" dependencies = [ "proc-macro2", "quote", - "syn 2.0.85", + "syn 2.0.87", +] + +[[package]] +name = "zerofrom" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "91ec111ce797d0e0784a1116d0ddcdbea84322cd79e5d5ad173daeba4f93ab55" +dependencies = [ + "zerofrom-derive", +] + +[[package]] +name = "zerofrom-derive" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0ea7b4a3637ea8669cedf0f1fd5c286a17f3de97b8dd5a70a6c167a1730e63a5" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.87", + "synstructure", ] [[package]] @@ -4811,6 +5079,28 @@ version = "1.8.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ced3678a2879b30306d323f4542626697a464a97c0a07c9aebf7ebca65cd4dde" +[[package]] +name = "zerovec" +version = "0.10.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "aa2b893d79df23bfb12d5461018d408ea19dfafe76c2c7ef6d4eba614f8ff079" +dependencies = [ + "yoke", + "zerofrom", + "zerovec-derive", +] + +[[package]] +name = "zerovec-derive" +version = "0.10.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6eafa6dfb17584ea3e2bd6e76e0cc15ad7af12b09abdd1ca55961bed9b1063c6" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.87", +] + [[package]] name = "zstd" version = "0.12.4" diff --git a/Cargo.toml b/Cargo.toml index 7197417..ad1bf77 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -59,8 +59,8 @@ gix-discover = { version = "0.36" } insta = { version = "1.40", features = ["json"] } local-ip-address = { version = "0.6" } log = { version = "0.4", features = ["kv_unstable_serde", "serde"] } -logforth = { version = "0.14" } -mea = { version = "0.0.4" } +logforth = { version = "0.18" } +mea = { version = "0.0.6" } mime = { version = "0.3" } opendal = { version = "0.50" } pin-project = { version = "1.1" } @@ -70,7 +70,7 @@ reqwest = { version = "0.12", features = ["json", "rustls-tls"] } scopeguard = { version = "1.2" } serde = { version = "1.0", features = ["derive"] } serde_json = { version = "1.0" } -shadow-rs = { version = "0.35.1" } +shadow-rs = { version = "0.36" } sqlx = { version = "0.8", features = [ "json", "postgres", @@ -80,13 +80,13 @@ sqlx = { version = "0.8", features = [ tempfile = { version = "3.13" } test-harness = { version = "0.3" } testcontainers = { version = "0.23", features = ["blocking"] } -thiserror = { version = "1.0" } +thiserror = { version = "2.0" } tokio = { version = "1.41", features = ["full"] } tokio-util = { version = "0.7", features = ["compat"] } toml = { version = "0.8" } url = { version = "2.5" } uuid = { version = "1.11", features = ["v4"] } -which = { version = "6.0" } +which = { version = "7.0" } # workspace dependencies kafka-api = { version = "0.4.1", path = "api/kafka-api" } diff --git a/api/protos/src/property/storage.rs b/api/protos/src/property/storage.rs index d02912d..9fb3c7d 100644 --- a/api/protos/src/property/storage.rs +++ b/api/protos/src/property/storage.rs @@ -17,9 +17,18 @@ use serde::Serialize; #[derive(Debug, Clone, Serialize, Deserialize)] pub struct TopicProps { + pub format: TopicFormat, pub storage: StorageProps, } +#[derive(Debug, Clone, Serialize, Deserialize)] +pub enum TopicFormat { + #[serde(rename = "kafka")] + Kafka, + #[serde(rename = "wal")] + WAL, +} + #[derive(Debug, Clone, Serialize, Deserialize)] #[serde(tag = "scheme")] pub enum StorageProps { diff --git a/crates/kafka-broker/src/broker/admin.rs b/crates/kafka-broker/src/broker/admin.rs index d559788..6f160cf 100644 --- a/crates/kafka-broker/src/broker/admin.rs +++ b/crates/kafka-broker/src/broker/admin.rs @@ -30,6 +30,7 @@ use kafka_api::schemata::metadata_response::MetadataResponse; use kafka_api::schemata::metadata_response::MetadataResponseBroker; use kafka_api::schemata::metadata_response::MetadataResponsePartition; use kafka_api::schemata::metadata_response::MetadataResponseTopic; +use morax_protos::property::TopicFormat; use morax_protos::property::TopicProps; use crate::broker::Broker; @@ -143,6 +144,7 @@ impl Broker { partitions: topic.num_partitions.max(1), properties: TopicProps { storage: self.fallback_storage.clone(), + format: TopicFormat::Kafka, }, }; diff --git a/crates/kafka-broker/src/broker/fetch.rs b/crates/kafka-broker/src/broker/fetch.rs index 26c11c2..b744188 100644 --- a/crates/kafka-broker/src/broker/fetch.rs +++ b/crates/kafka-broker/src/broker/fetch.rs @@ -26,7 +26,9 @@ use kafka_api::schemata::offset_fetch_response::OffsetFetchResponsePartitions; use kafka_api::schemata::offset_fetch_response::OffsetFetchResponseTopic; use kafka_api::schemata::offset_fetch_response::OffsetFetchResponseTopics; use kafka_api::schemata::request_header::RequestHeader; +use morax_meta::Topic; use morax_meta::TopicPartitionSplit; +use morax_protos::property::TopicFormat; use morax_storage::TopicStorage; use crate::broker::Broker; @@ -159,7 +161,26 @@ impl Broker { } else { self.meta.get_topics_by_name(topic.topic.clone()).await } { - Ok(topic) => TopicStorage::new(topic.properties.0.storage), + Ok(Topic { properties, .. }) => match &properties.format { + TopicFormat::Kafka => TopicStorage::new(properties.0.storage), + format => { + log::error!("unsupported topic format: {format:?}"); + let mut partitions = vec![]; + for _ in topic.partitions.iter() { + partitions.push(PartitionData { + error_code: ErrorCode::UNSUPPORTED_FOR_MESSAGE_FORMAT.code(), + ..Default::default() + }); + } + responses.push(FetchableTopicResponse { + topic: topic.topic.clone(), + topic_id: topic.topic_id, + partitions, + ..Default::default() + }); + continue; + } + }, Err(err) => { log::error!("failed to fetch topic metadata: {err:?}"); let mut partitions = vec![]; diff --git a/crates/kafka-broker/src/broker/produce.rs b/crates/kafka-broker/src/broker/produce.rs index 2d1fbf4..7ae954d 100644 --- a/crates/kafka-broker/src/broker/produce.rs +++ b/crates/kafka-broker/src/broker/produce.rs @@ -20,6 +20,8 @@ use kafka_api::schemata::produce_request::ProduceRequest; use kafka_api::schemata::produce_response::PartitionProduceResponse; use kafka_api::schemata::produce_response::ProduceResponse; use kafka_api::schemata::produce_response::TopicProduceResponse; +use morax_meta::Topic; +use morax_protos::property::TopicFormat; use morax_storage::TopicStorage; use crate::broker::Broker; @@ -51,7 +53,30 @@ impl Broker { for topic in request.topic_data { let topic_name = topic.name.clone(); let topic_storage = match self.meta.get_topics_by_name(topic_name.clone()).await { - Ok(topic) => TopicStorage::new(topic.properties.0.storage), + Ok(Topic { properties, .. }) => match &properties.format { + TopicFormat::Kafka => TopicStorage::new(properties.0.storage), + format => { + log::error!("unsupported topic format: {format:?}"); + let mut partition_responses = vec![]; + for partition in topic.partition_data { + if partition.records.is_some() { + partition_responses.push(PartitionProduceResponse { + error_code: ErrorCode::UNSUPPORTED_FOR_MESSAGE_FORMAT.code(), + error_message: Some(format!( + "unsupported topic format: {format:?}" + )), + ..Default::default() + }); + } + } + responses.push(TopicProduceResponse { + name: topic_name, + partition_responses, + ..Default::default() + }); + continue; + } + }, Err(err) => { log::error!("malformed record batches: {err:?}"); let mut partition_responses = vec![]; diff --git a/crates/runtime/src/lib.rs b/crates/runtime/src/lib.rs index bff59fe..1791f97 100644 --- a/crates/runtime/src/lib.rs +++ b/crates/runtime/src/lib.rs @@ -17,5 +17,3 @@ pub use crate::global::*; mod runtime; pub use crate::runtime::*; - -pub mod scheduled_task; diff --git a/crates/runtime/src/scheduled_task.rs b/crates/runtime/src/scheduled_task.rs deleted file mode 100644 index d22b08d..0000000 --- a/crates/runtime/src/scheduled_task.rs +++ /dev/null @@ -1,135 +0,0 @@ -// Copyright 2024 tison -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use std::fmt::Debug; -use std::fmt::Display; -use std::fmt::Formatter; -use std::future::Future; -use std::future::IntoFuture; -use std::time::Duration; - -use crate::CanceledError; -use crate::JoinHandle; -use crate::Runtime; - -#[derive(Debug)] -pub struct ScheduledTask { - name: String, - task: JoinHandle, -} - -impl ScheduledTask { - pub fn cancel(&self) { - log::info!("cancelling scheduled task: {}", self.name); - self.task.cancel(); - } -} - -impl IntoFuture for ScheduledTask { - type Output = Result; - type IntoFuture = JoinHandle; - - fn into_future(self) -> Self::IntoFuture { - self.task - } -} - -impl Display for ScheduledTask { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - write!(f, "ScheduledTask({})", self.name) - } -} - -pub trait TaskFn { - type Error: Debug + Send + 'static; - - fn call(&mut self) -> impl Future> + Send; -} - -pub fn schedule_with_fixed_delay( - name: impl Into, - runtime: &Runtime, - initial_delay: Option, - delay: Duration, - mut task_fn: F, -) -> ScheduledTask<()> -where - F: TaskFn<()> + Send + 'static, -{ - let name = name.into(); - let name_clone = name.clone(); - - let task = runtime.spawn(async move { - if let Some(initial_delay) = initial_delay { - if initial_delay > Duration::ZERO { - tokio::time::sleep(initial_delay).await; - } - } - - loop { - if let Err(err) = task_fn.call().await { - log::error!(err:?; "failed to run scheduled task: {name_clone}"); - } - tokio::time::sleep(delay).await; - } - }); - - ScheduledTask { name, task } -} - -#[cfg(test)] -mod tests { - use std::convert::Infallible; - use std::sync::atomic::AtomicI32; - use std::sync::atomic::Ordering; - use std::sync::Arc; - - use super::*; - use crate::test_runtime; - - struct TickTask { - n: Arc, - } - - impl TaskFn for TickTask { - type Error = Infallible; - - async fn call(&mut self) -> Result<(), Infallible> { - let _ = self.n.fetch_add(1, Ordering::Relaxed); - Ok(()) - } - } - - #[test] - fn test_schedule_with_fixed_delay() { - let n = Arc::new(AtomicI32::new(0)); - let task_fn = TickTask { n: n.clone() }; - - let task = schedule_with_fixed_delay( - "TickTask", - test_runtime(), - None, - Duration::from_millis(100), - task_fn, - ); - - test_runtime().block_on(async move { - tokio::time::sleep(Duration::from_millis(550)).await; - task.cancel(); - task.cancel(); - task.await.unwrap_err(); // cancelled - assert!(n.load(Ordering::Relaxed) >= 4); - }); - } -} diff --git a/crates/telemetry/src/lib.rs b/crates/telemetry/src/lib.rs index 9aa837e..bdcbce3 100644 --- a/crates/telemetry/src/lib.rs +++ b/crates/telemetry/src/lib.rs @@ -13,25 +13,22 @@ // limitations under the License. use logforth::append; -use logforth::filter::env::EnvFilterBuilder; +use logforth::filter::env_filter::EnvFilterBuilder; use logforth::filter::EnvFilter; -use logforth::Dispatch; -use logforth::Logger; use morax_protos::config::TelemetryConfig; pub fn init(config: &TelemetryConfig) { - let mut logger = Logger::new(); + let mut logger = logforth::builder(); // stderr logger if let Some(ref stderr) = config.log.stderr { - logger = logger.dispatch( - Dispatch::new() - .filter(make_rust_log_filter_with_default_env(&stderr.filter)) - .append(append::Stderr::default()), - ); + logger = logger.dispatch(|d| { + d.filter(make_rust_log_filter_with_default_env(&stderr.filter)) + .append(append::Stderr::default()) + }); } - let _ = logger.apply(); + logger.apply(); } fn make_rust_log_filter(filter: &str) -> EnvFilter { diff --git a/crates/wal-broker/src/wal.rs b/crates/wal-broker/src/wal.rs index 9fa687d..a2b81b5 100644 --- a/crates/wal-broker/src/wal.rs +++ b/crates/wal-broker/src/wal.rs @@ -16,12 +16,14 @@ use std::sync::Arc; use base64::prelude::BASE64_STANDARD; use base64::Engine; +use error_stack::bail; use error_stack::Result; use error_stack::ResultExt; use morax_meta::CommitRecordBatchesRequest; use morax_meta::CreateTopicRequest; use morax_meta::FetchRecordBatchesRequest; use morax_meta::PostgresMetaService; +use morax_protos::property::TopicFormat; use morax_protos::rpc::AppendLogRequest; use morax_protos::rpc::AppendLogResponse; use morax_protos::rpc::CreateLogRequest; @@ -80,6 +82,12 @@ impl WALBroker { .get_topics_by_name(name.clone()) .await .change_context_lazy(make_error)?; + if !matches!(topic.properties.format, TopicFormat::WAL) { + bail!(BrokerError(format!( + "unsupported topic format: {:?}", + topic.properties.format + ))); + } let topic_storage = TopicStorage::new(topic.properties.0.storage); let splits = self @@ -133,6 +141,12 @@ impl WALBroker { .get_topics_by_name(name.clone()) .await .change_context_lazy(make_error)?; + if !matches!(topic.properties.format, TopicFormat::WAL) { + bail!(BrokerError(format!( + "unsupported topic format: {:?}", + topic.properties.format + ))); + } let topic_storage = TopicStorage::new(topic.properties.0.storage); let entry_cnt = request.entries.len(); diff --git a/tests/wal/src/lib.rs b/tests/wal/src/lib.rs index bcd5903..b945bda 100644 --- a/tests/wal/src/lib.rs +++ b/tests/wal/src/lib.rs @@ -18,6 +18,7 @@ use std::process::ExitCode; use morax_protos::config::LogConfig; use morax_protos::config::StderrAppenderConfig; use morax_protos::config::TelemetryConfig; +use morax_protos::property::TopicFormat; use morax_protos::property::TopicProps; use tests_toolkit::make_test_name; @@ -53,6 +54,7 @@ where client, topic_props: TopicProps { storage: state.env_props.storage, + format: TopicFormat::WAL, }, }) .await