diff --git a/.github/workflows/rust.yml b/.github/workflows/ci.yml similarity index 93% rename from .github/workflows/rust.yml rename to .github/workflows/ci.yml index f1ccfd0..244fde0 100644 --- a/.github/workflows/rust.yml +++ b/.github/workflows/ci.yml @@ -1,10 +1,10 @@ -name: Rust +name: CI on: push: - branches: ['main'] + branches: ['master'] pull_request: - branches: ['main'] + branches: ['master'] env: CARGO_TERM_COLOR: always diff --git a/Cargo.lock b/Cargo.lock index 1af411c..1db2404 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3,309 +3,273 @@ version = 3 [[package]] -name = "anstream" -version = "0.3.2" +name = "async-attributes" +version = "1.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0ca84f3628370c59db74ee214b3263d58f9aadd9b4fe7e711fd87dc452b7f163" +checksum = "a3203e79f4dd9bdda415ed03cf14dae5a2bf775c683a00f94e9cd1faf0f596e5" dependencies = [ - "anstyle", - "anstyle-parse", - "anstyle-query", - "anstyle-wincon", - "colorchoice", - "is-terminal", - "utf8parse", + "quote", + "syn 1.0.109", ] [[package]] -name = "anstyle" -version = "1.0.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3a30da5c5f2d5e72842e00bcb57657162cdabef0931f40e2deb9b4140440cecd" - -[[package]] -name = "anstyle-parse" -version = "0.2.1" +name = "async-channel" +version = "1.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "938874ff5980b03a87c5524b3ae5b59cf99b1d6bc836848df7bc5ada9643c333" +checksum = "81953c529336010edd6d8e358f886d9581267795c61b19475b71314bffa46d35" dependencies = [ - "utf8parse", + "concurrent-queue", + "event-listener", + "futures-core", ] [[package]] -name = "anstyle-query" -version = "1.0.0" +name = "async-executor" +version = "1.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5ca11d4be1bab0c8bc8734a9aa7bf4ee8316d462a08c6ac5052f888fef5b494b" +checksum = "4b0c4a4f319e45986f347ee47fef8bf5e81c9abc3f6f58dc2391439f30df65f0" dependencies = [ - "windows-sys", + "async-lock", + "async-task", + "concurrent-queue", + "fastrand 2.0.1", + "futures-lite", + "slab", ] [[package]] -name = "anstyle-wincon" -version = "1.0.1" +name = "async-global-executor" +version = "2.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "180abfa45703aebe0093f79badacc01b8fd4ea2e35118747e5811127f926e188" +checksum = "f1b6f5d7df27bd294849f8eec66ecfc63d11814df7a4f5d74168a2394467b776" dependencies = [ - "anstyle", - "windows-sys", + "async-channel", + "async-executor", + "async-io", + "async-lock", + "blocking", + "futures-lite", + "once_cell", ] [[package]] -name = "autocfg" -version = "1.1.0" +name = "async-io" +version = "1.13.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa" - -[[package]] -name = "bincode" -version = "1.3.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b1f45e9417d87227c7a56d22e471c6206462cba514c7590c09aff4cf6d1ddcad" +checksum = "0fc5b45d93ef0529756f812ca52e44c221b35341892d3dcc34132ac02f3dd2af" dependencies = [ - "serde", + "async-lock", + "autocfg", + "cfg-if", + "concurrent-queue", + "futures-lite", + "log", + "parking", + "polling", + "rustix", + "slab", + "socket2", + "waker-fn", ] [[package]] -name = "bitflags" -version = "1.3.2" +name = "async-lock" +version = "2.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a" - -[[package]] -name = "broker" -version = "0.1.0" +checksum = "287272293e9d8c41773cec55e365490fe034813a2f172f502d6ddcf75b2f582b" dependencies = [ - "clap", - "heed", - "serde", - "serde_json", - "shared_structures", - "uuid", + "event-listener", ] [[package]] -name = "bytemuck" -version = "1.13.1" +name = "async-std" +version = "1.12.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "17febce684fd15d89027105661fec94afb475cb995fbc59d2865198446ba2eea" +checksum = "62565bb4402e926b29953c785397c6dc0391b7b446e45008b0049eb43cec6f5d" +dependencies = [ + "async-attributes", + "async-channel", + "async-global-executor", + "async-io", + "async-lock", + "crossbeam-utils", + "futures-channel", + "futures-core", + "futures-io", + "futures-lite", + "gloo-timers", + "kv-log-macro", + "log", + "memchr", + "once_cell", + "pin-project-lite", + "pin-utils", + "slab", + "wasm-bindgen-futures", +] [[package]] -name = "byteorder" -version = "1.4.3" +name = "async-task" +version = "4.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "14c189c53d098945499cdfa7ecc63567cf3886b3332b312a5b4585d8d3a6a610" +checksum = "b9441c6b2fe128a7c2bf680a44c34d0df31ce09e5b7e401fcca3faa483dbc921" [[package]] -name = "cc" -version = "1.0.79" +name = "atomic-waker" +version = "1.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "50d30906286121d95be3d479533b458f87493b30a4b5f79a607db8f5d11aa91f" +checksum = "1505bd5d3d116872e7271a6d4e16d81d0c8570876c8de68093a09ac269d8aac0" [[package]] -name = "cfg-if" -version = "1.0.0" +name = "autocfg" +version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" +checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa" [[package]] -name = "clap" -version = "4.3.8" +name = "bitflags" +version = "1.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d9394150f5b4273a1763355bd1c2ec54cc5a2593f790587bcd6b2c947cfa9211" -dependencies = [ - "clap_builder", -] +checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a" [[package]] -name = "clap_builder" -version = "4.3.8" +name = "blocking" +version = "1.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9a78fbdd3cc2914ddf37ba444114bc7765bbdcb55ec9cbe6fa054f0137400717" +checksum = "8c36a4d0d48574b3dd360b4b7d95cc651d2b6557b6402848a27d4b228a473e2a" dependencies = [ - "anstream", - "anstyle", - "bitflags", - "clap_lex", - "once_cell", - "strsim", + "async-channel", + "async-lock", + "async-task", + "fastrand 2.0.1", + "futures-io", + "futures-lite", + "piper", + "tracing", ] [[package]] -name = "clap_lex" -version = "0.5.0" +name = "bumpalo" +version = "3.14.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2da6da31387c7e4ef160ffab6d5e7f00c42626fe39aea70a7b0f1773f7dd6c1b" +checksum = "7f30e7476521f6f8af1a1c4c0b8cc94f0bee37d91763d0ca2665f299b6cd8aec" [[package]] -name = "colorchoice" +name = "cfg-if" version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "acbf1af155f9b9ef647e42cdc158db4b64a1b61f743629225fde6f3e0be2a7c7" - -[[package]] -name = "consumer" -version = "0.1.0" - -[[package]] -name = "core-foundation-sys" -version = "0.8.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e496a50fda8aacccc86d7529e2c1e0892dbd0f898a6b5645b5561b89c3210efa" +checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" [[package]] -name = "crossbeam-channel" -version = "0.5.8" +name = "concurrent-queue" +version = "2.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a33c2bf77f2df06183c3aa30d1e96c0695a313d4f9c453cc3762a6db39f99200" +checksum = "f057a694a54f12365049b0958a1685bb52d567f5593b355fbf685838e873d400" dependencies = [ - "cfg-if", "crossbeam-utils", ] [[package]] -name = "crossbeam-deque" -version = "0.8.3" +name = "crossbeam-utils" +version = "0.8.16" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ce6fd6f855243022dcecf8702fef0c297d4338e226845fe067f6341ad9fa0cef" +checksum = "5a22b2d63d4d1dc0b7f1b6b2747dd0088008a9be28b6ddf0b1e7d335e3037294" dependencies = [ "cfg-if", - "crossbeam-epoch", - "crossbeam-utils", ] [[package]] -name = "crossbeam-epoch" -version = "0.9.15" +name = "errno" +version = "0.3.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ae211234986c545741a7dc064309f67ee1e5ad243d0e48335adc0484d960bcc7" +checksum = "ac3e13f66a2f95e32a39eaa81f6b95d42878ca0e1db0c7543723dfe12557e860" dependencies = [ - "autocfg", - "cfg-if", - "crossbeam-utils", - "memoffset", - "scopeguard", + "libc", + "windows-sys", ] [[package]] -name = "crossbeam-queue" -version = "0.3.8" +name = "event-listener" +version = "2.5.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d1cfb3ea8a53f37c40dea2c7bedcbd88bdfae54f5e2175d6ecaff1c988353add" -dependencies = [ - "cfg-if", - "crossbeam-utils", -] +checksum = "0206175f82b8d6bf6652ff7d71a1e27fd2e4efde587fd368662814d6ec1d9ce0" [[package]] -name = "crossbeam-utils" -version = "0.8.16" +name = "fastrand" +version = "1.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5a22b2d63d4d1dc0b7f1b6b2747dd0088008a9be28b6ddf0b1e7d335e3037294" +checksum = "e51093e27b0797c359783294ca4f0a911c270184cb10f85783b118614a1501be" dependencies = [ - "cfg-if", + "instant", ] [[package]] -name = "either" -version = "1.9.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a26ae43d7bcc3b814de94796a5e736d4029efb0ee900c12e2d54c993ad1a1e07" - -[[package]] -name = "errno" -version = "0.3.1" +name = "fastrand" +version = "2.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4bcfec3a70f97c962c307b2d2c56e358cf1d00b558d74262b5f929ee8cc7e73a" -dependencies = [ - "errno-dragonfly", - "libc", - "windows-sys", -] +checksum = "25cbce373ec4653f1a01a31e8a5e5ec0c622dc27ff9c4e6606eefef5cbbed4a5" [[package]] -name = "errno-dragonfly" -version = "0.1.2" +name = "futures-channel" +version = "0.3.29" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "aa68f1b12764fab894d2755d2518754e71b4fd80ecfb822714a1206c2aab39bf" +checksum = "ff4dd66668b557604244583e3e1e1eada8c5c2e96a6d0d6653ede395b78bbacb" dependencies = [ - "cc", - "libc", + "futures-core", ] [[package]] -name = "form_urlencoded" -version = "1.2.0" +name = "futures-core" +version = "0.3.29" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a62bc1cf6f830c2ec14a513a9fb124d0a213a629668a4186f329db21fe045652" -dependencies = [ - "percent-encoding", -] +checksum = "eb1d22c66e66d9d72e1758f0bd7d4fd0bee04cad842ee34587d68c07e45d088c" [[package]] -name = "getrandom" -version = "0.2.10" +name = "futures-io" +version = "0.3.29" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "be4136b2a15dd319360be1c07d9933517ccf0be8f16bf62a3bee4f0d618df427" -dependencies = [ - "cfg-if", - "libc", - "wasi", -] +checksum = "8bf34a163b5c4c52d0478a4d757da8fb65cabef42ba90515efee0f6f9fa45aaa" [[package]] -name = "heed" -version = "0.11.0" +name = "futures-lite" +version = "1.13.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "269c7486ed6def5d7b59a427cec3e87b4d4dd4381d01e21c8c9f2d3985688392" +checksum = "49a9d51ce47660b1e808d3c990b4709f2f415d928835a17dfd16991515c46bce" dependencies = [ - "bytemuck", - "byteorder", - "heed-traits", - "heed-types", - "libc", - "lmdb-rkv-sys", - "once_cell", - "page_size", - "serde", - "synchronoise", - "url", + "fastrand 1.9.0", + "futures-core", + "futures-io", + "memchr", + "parking", + "pin-project-lite", + "waker-fn", ] [[package]] -name = "heed-traits" -version = "0.8.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a53a94e5b2fd60417e83ffdfe136c39afacff0d4ac1d8d01cd66928ac610e1a2" - -[[package]] -name = "heed-types" -version = "0.8.0" +name = "gloo-timers" +version = "0.2.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9a6cf0a6952fcedc992602d5cddd1e3fff091fbe87d38636e3ec23a31f32acbd" +checksum = "9b995a66bb87bebce9a0f4a95aed01daca4872c050bfcb21653361c03bc35e5c" dependencies = [ - "bincode", - "bytemuck", - "byteorder", - "heed-traits", - "serde", - "serde_json", + "futures-channel", + "futures-core", + "js-sys", + "wasm-bindgen", ] [[package]] name = "hermit-abi" -version = "0.3.1" +version = "0.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fed44880c466736ef9a5c5b5facefb5ed0785676d0c02d612db14e54f0d84286" +checksum = "d77f7ec81a6d05a3abb01ab6eb7590f6083d08449fe5a1c8b1e620283546ccb7" [[package]] -name = "idna" -version = "0.4.0" +name = "instant" +version = "0.1.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7d20d6b07bfbc108882d88ed8e37d39636dcc260e15e30c45e6ba089610b917c" +checksum = "7a5bbe824c507c5da5956355e86a746d82e0e1464f65d862cc5e71da70e94b2c" dependencies = [ - "unicode-bidi", - "unicode-normalization", + "cfg-if", ] [[package]] @@ -320,28 +284,28 @@ dependencies = [ ] [[package]] -name = "is-terminal" -version = "0.4.7" +name = "js-sys" +version = "0.3.64" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "adcf93614601c8129ddf72e2d5633df827ba6551541c6d8c59520a371475be1f" +checksum = "c5f195fe497f702db0f318b07fdd68edb16955aed830df8363d837542f8f935a" dependencies = [ - "hermit-abi", - "io-lifetimes", - "rustix", - "windows-sys", + "wasm-bindgen", ] [[package]] -name = "itoa" -version = "1.0.8" +name = "kv-log-macro" +version = "1.0.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "62b02a5381cc465bd3041d84623d0fa3b66738b52b8e2fc3bab8ad63ab032f4a" +checksum = "0de8b303297635ad57c9f5059fd9cee7a47f8e8daa09df0fcd07dd39fb22977f" +dependencies = [ + "log", +] [[package]] name = "libc" -version = "0.2.146" +version = "0.2.149" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f92be4933c13fd498862a9e02a3055f8a8d9c039ce33db97306fd5a6caa7f29b" +checksum = "a08173bc88b7955d1b3145aa561539096c421ac8debde8cbc3612ec635fee29b" [[package]] name = "linux-raw-sys" @@ -350,54 +314,32 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ef53942eb7bf7ff43a617b3e2c1c4a5ecf5944a7c1bc12d7ee39bbb15e5c1519" [[package]] -name = "lmdb-rkv-sys" -version = "0.11.2" +name = "log" +version = "0.4.20" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "61b9ce6b3be08acefa3003c57b7565377432a89ec24476bbe72e11d101f852fe" +checksum = "b5e6163cb8c49088c2c36f57875e58ccd8c87c7427f7fbd50ea6710b2f3f2e8f" dependencies = [ - "cc", - "libc", - "pkg-config", + "value-bag", ] [[package]] -name = "memoffset" -version = "0.9.0" +name = "memchr" +version = "2.6.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5a634b1c61a95585bd15607c6ab0c4e5b226e695ff2800ba0cdccddf208c406c" -dependencies = [ - "autocfg", -] +checksum = "f665ee40bc4a3c5590afb1e9677db74a508659dfd71e126420da8274909a0167" [[package]] -name = "ntapi" -version = "0.4.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e8a3895c6391c39d7fe7ebc444a87eb2991b2a0bc718fdabd071eec617fc68e4" -dependencies = [ - "winapi", -] - -[[package]] -name = "num_cpus" -version = "1.16.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4161fcb6d602d4d2081af7c3a45852d875a03dd337a6bfdd6e06407b61342a43" +name = "nyx-network" +version = "0.1.0" dependencies = [ - "hermit-abi", - "libc", + "async-std", ] [[package]] -name = "observer" +name = "nyx-storage" version = "0.1.0" dependencies = [ - "clap", - "serde", - "serde_json", - "shared_structures", - "sysinfo", - "uuid", + "async-std", ] [[package]] @@ -407,82 +349,73 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dd8b5dd2ae5ed71462c540258bedcb51965123ad7e7ccf4b9a8cafaa4a63576d" [[package]] -name = "page_size" -version = "0.4.2" +name = "parking" +version = "2.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "eebde548fbbf1ea81a99b128872779c437752fb99f217c45245e1a61dcd9edcd" -dependencies = [ - "libc", - "winapi", -] +checksum = "e52c774a4c39359c1d1c52e43f73dd91a75a614652c825408eec30c95a9b2067" [[package]] -name = "percent-encoding" -version = "2.3.0" +name = "pin-project-lite" +version = "0.2.13" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9b2a4787296e9989611394c33f193f676704af1686e70b8f8033ab5ba9a35a94" +checksum = "8afb450f006bf6385ca15ef45d71d2288452bc3683ce2e2cacc0d18e4be60b58" [[package]] -name = "pkg-config" -version = "0.3.27" +name = "pin-utils" +version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "26072860ba924cbfa98ea39c8c19b4dd6a4a25423dbdf219c1eca91aa0cf6964" +checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" [[package]] -name = "proc-macro2" -version = "1.0.64" +name = "piper" +version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "78803b62cbf1f46fde80d7c0e803111524b9877184cfe7c3033659490ac7a7da" -dependencies = [ - "unicode-ident", -] - -[[package]] -name = "producer" -version = "0.1.0" +checksum = "668d31b1c4eba19242f2088b2bf3316b82ca31082a8335764db4e083db7485d4" dependencies = [ - "clap", - "serde", - "serde_json", - "shared_structures", + "atomic-waker", + "fastrand 2.0.1", + "futures-io", ] [[package]] -name = "quote" -version = "1.0.29" +name = "polling" +version = "2.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "573015e8ab27661678357f27dc26460738fd2b6c86e46f386fde94cb5d913105" +checksum = "4b2d323e8ca7996b3e23126511a523f7e62924d93ecd5ae73b333815b0eb3dce" dependencies = [ - "proc-macro2", + "autocfg", + "bitflags", + "cfg-if", + "concurrent-queue", + "libc", + "log", + "pin-project-lite", + "windows-sys", ] [[package]] -name = "rayon" -version = "1.7.0" +name = "proc-macro2" +version = "1.0.69" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1d2df5196e37bcc87abebc0053e20787d73847bb33134a69841207dd0a47f03b" +checksum = "134c189feb4956b20f6f547d2cf727d4c0fe06722b20a0eec87ed445a97f92da" dependencies = [ - "either", - "rayon-core", + "unicode-ident", ] [[package]] -name = "rayon-core" -version = "1.11.0" +name = "quote" +version = "1.0.33" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4b8f95bd6966f5c87776639160a66bd8ab9895d9d4ab01ddba9fc60661aebe8d" +checksum = "5267fca4496028628a95160fc423a33e8b2e6af8a5302579e322e4b520293cae" dependencies = [ - "crossbeam-channel", - "crossbeam-deque", - "crossbeam-utils", - "num_cpus", + "proc-macro2", ] [[package]] name = "rustix" -version = "0.37.20" +version = "0.37.25" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b96e891d04aa506a6d1f318d2771bcb1c7dfda84e126660ace067c9b474bb2c0" +checksum = "d4eb579851244c2c03e7c24f501c3432bed80b8f720af1d6e5b0e0f01555a035" dependencies = [ "bitflags", "errno", @@ -493,166 +426,156 @@ dependencies = [ ] [[package]] -name = "ryu" -version = "1.0.14" +name = "slab" +version = "0.4.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fe232bdf6be8c8de797b22184ee71118d63780ea42ac85b61d1baa6d3b782ae9" +checksum = "8f92a496fb766b417c996b9c5e57daf2f7ad3b0bebe1ccfca4856390e3d3bb67" +dependencies = [ + "autocfg", +] [[package]] -name = "scopeguard" -version = "1.2.0" +name = "socket2" +version = "0.4.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49" +checksum = "64a4a911eed85daf18834cfaa86a79b7d266ff93ff5ba14005426219480ed662" +dependencies = [ + "libc", + "winapi", +] [[package]] -name = "serde" -version = "1.0.171" +name = "syn" +version = "1.0.109" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "30e27d1e4fd7659406c492fd6cfaf2066ba8773de45ca75e855590f856dc34a9" +checksum = "72b64191b275b66ffe2469e8af2c1cfe3bafa67b529ead792a6d0160888b4237" dependencies = [ - "serde_derive", + "proc-macro2", + "quote", + "unicode-ident", ] [[package]] -name = "serde_derive" -version = "1.0.171" +name = "syn" +version = "2.0.38" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "389894603bd18c46fa56231694f8d827779c0951a667087194cf9de94ed24682" +checksum = "e96b79aaa137db8f61e26363a0c9b47d8b4ec75da28b7d1d614c2303e232408b" dependencies = [ "proc-macro2", "quote", - "syn", + "unicode-ident", ] [[package]] -name = "serde_json" -version = "1.0.102" +name = "tracing" +version = "0.1.39" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b5062a995d481b2308b6064e9af76011f2921c35f97b0468811ed9f6cd91dfed" +checksum = "ee2ef2af84856a50c1d430afce2fdded0a4ec7eda868db86409b4543df0797f9" dependencies = [ - "itoa", - "ryu", - "serde", + "pin-project-lite", + "tracing-core", ] [[package]] -name = "shared_structures" -version = "0.1.0" -dependencies = [ - "serde", - "serde_json", - "uuid", -] +name = "tracing-core" +version = "0.1.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c06d3da6113f116aaee68e4d601191614c9053067f9ab7f6edbcb161237daa54" [[package]] -name = "strsim" -version = "0.10.0" +name = "unicode-ident" +version = "1.0.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "73473c0e59e6d5812c5dfe2a064a6444949f089e20eec9a2e5506596494e4623" +checksum = "3354b9ac3fae1ff6755cb6db53683adb661634f67557942dea4facebec0fee4b" [[package]] -name = "syn" -version = "2.0.25" +name = "value-bag" +version = "1.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "15e3fc8c0c74267e2df136e5e5fb656a464158aa57624053375eb9c8c6e25ae2" -dependencies = [ - "proc-macro2", - "quote", - "unicode-ident", -] +checksum = "d92ccd67fb88503048c01b59152a04effd0782d035a83a6d256ce6085f08f4a3" [[package]] -name = "synchronoise" -version = "1.0.1" +name = "waker-fn" +version = "1.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3dbc01390fc626ce8d1cffe3376ded2b72a11bb70e1c75f404a210e4daa4def2" -dependencies = [ - "crossbeam-queue", -] +checksum = "f3c4517f54858c779bbcbf228f4fca63d121bf85fbecb2dc578cdf4a39395690" [[package]] -name = "sysinfo" -version = "0.29.8" +name = "wasm-bindgen" +version = "0.2.87" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d10ed79c22663a35a255d289a7fdcb43559fc77ff15df5ce6c341809e7867528" +checksum = "7706a72ab36d8cb1f80ffbf0e071533974a60d0a308d01a5d0375bf60499a342" dependencies = [ "cfg-if", - "core-foundation-sys", - "libc", - "ntapi", - "once_cell", - "rayon", - "winapi", + "wasm-bindgen-macro", ] [[package]] -name = "tinyvec" -version = "1.6.0" +name = "wasm-bindgen-backend" +version = "0.2.87" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "87cc5ceb3875bb20c2890005a4e226a4651264a5c75edb2421b52861a0a0cb50" +checksum = "5ef2b6d3c510e9625e5fe6f509ab07d66a760f0885d858736483c32ed7809abd" dependencies = [ - "tinyvec_macros", + "bumpalo", + "log", + "once_cell", + "proc-macro2", + "quote", + "syn 2.0.38", + "wasm-bindgen-shared", ] [[package]] -name = "tinyvec_macros" -version = "0.1.1" +name = "wasm-bindgen-futures" +version = "0.4.37" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" - -[[package]] -name = "unicode-bidi" -version = "0.3.13" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "92888ba5573ff080736b3648696b70cafad7d250551175acbaa4e0385b3e1460" - -[[package]] -name = "unicode-ident" -version = "1.0.10" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "22049a19f4a68748a168c0fc439f9516686aa045927ff767eca0a85101fb6e73" +checksum = "c02dbc21516f9f1f04f187958890d7e6026df8d16540b7ad9492bc34a67cea03" +dependencies = [ + "cfg-if", + "js-sys", + "wasm-bindgen", + "web-sys", +] [[package]] -name = "unicode-normalization" -version = "0.1.22" +name = "wasm-bindgen-macro" +version = "0.2.87" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5c5713f0fc4b5db668a2ac63cdb7bb4469d8c9fed047b1d0292cc7b0ce2ba921" +checksum = "dee495e55982a3bd48105a7b947fd2a9b4a8ae3010041b9e0faab3f9cd028f1d" dependencies = [ - "tinyvec", + "quote", + "wasm-bindgen-macro-support", ] [[package]] -name = "url" -version = "2.4.0" +name = "wasm-bindgen-macro-support" +version = "0.2.87" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "50bff7831e19200a85b17131d085c25d7811bc4e186efdaf54bbd132994a88cb" +checksum = "54681b18a46765f095758388f2d0cf16eb8d4169b639ab575a8f5693af210c7b" dependencies = [ - "form_urlencoded", - "idna", - "percent-encoding", + "proc-macro2", + "quote", + "syn 2.0.38", + "wasm-bindgen-backend", + "wasm-bindgen-shared", ] [[package]] -name = "utf8parse" -version = "0.2.1" +name = "wasm-bindgen-shared" +version = "0.2.87" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "711b9620af191e0cdc7468a8d14e709c3dcdb115b36f838e601583af800a370a" +checksum = "ca6ad05a4870b2bf5fe995117d3728437bd27d7cd5f06f13c17443ef369775a1" [[package]] -name = "uuid" -version = "1.3.4" +name = "web-sys" +version = "0.3.64" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0fa2982af2eec27de306107c027578ff7f423d65f7250e40ce0fea8f45248b81" +checksum = "9b85cbef8c220a6abc02aefd892dfc0fc23afb1c6a426316ec33253a3877249b" dependencies = [ - "getrandom", + "js-sys", + "wasm-bindgen", ] -[[package]] -name = "wasi" -version = "0.11.0+wasi-snapshot-preview1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423" - [[package]] name = "winapi" version = "0.3.9" @@ -686,9 +609,9 @@ dependencies = [ [[package]] name = "windows-targets" -version = "0.48.0" +version = "0.48.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7b1eb6f0cd7c80c79759c929114ef071b87354ce476d9d94271031c0497adfd5" +checksum = "9a2fa6e2155d7247be68c096456083145c183cbbbc2764150dda45a87197940c" dependencies = [ "windows_aarch64_gnullvm", "windows_aarch64_msvc", @@ -701,42 +624,42 @@ dependencies = [ [[package]] name = "windows_aarch64_gnullvm" -version = "0.48.0" +version = "0.48.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "91ae572e1b79dba883e0d315474df7305d12f569b400fcf90581b06062f7e1bc" +checksum = "2b38e32f0abccf9987a4e3079dfb67dcd799fb61361e53e2882c3cbaf0d905d8" [[package]] name = "windows_aarch64_msvc" -version = "0.48.0" +version = "0.48.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b2ef27e0d7bdfcfc7b868b317c1d32c641a6fe4629c171b8928c7b08d98d7cf3" +checksum = "dc35310971f3b2dbbf3f0690a219f40e2d9afcf64f9ab7cc1be722937c26b4bc" [[package]] name = "windows_i686_gnu" -version = "0.48.0" +version = "0.48.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "622a1962a7db830d6fd0a69683c80a18fda201879f0f447f065a3b7467daa241" +checksum = "a75915e7def60c94dcef72200b9a8e58e5091744960da64ec734a6c6e9b3743e" [[package]] name = "windows_i686_msvc" -version = "0.48.0" +version = "0.48.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4542c6e364ce21bf45d69fdd2a8e455fa38d316158cfd43b3ac1c5b1b19f8e00" +checksum = "8f55c233f70c4b27f66c523580f78f1004e8b5a8b659e05a4eb49d4166cca406" [[package]] name = "windows_x86_64_gnu" -version = "0.48.0" +version = "0.48.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ca2b8a661f7628cbd23440e50b05d705db3686f894fc9580820623656af974b1" +checksum = "53d40abd2583d23e4718fddf1ebec84dbff8381c07cae67ff7768bbf19c6718e" [[package]] name = "windows_x86_64_gnullvm" -version = "0.48.0" +version = "0.48.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7896dbc1f41e08872e9d5e8f8baa8fdd2677f29468c4e156210174edc7f7b953" +checksum = "0b7b52767868a23d5bab768e390dc5f5c55825b6d30b86c844ff2dc7414044cc" [[package]] name = "windows_x86_64_msvc" -version = "0.48.0" +version = "0.48.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1a515f5799fe4961cb532f983ce2b23082366b898e52ffbce459c86f67c8378a" +checksum = "ed94fce61571a4006852b7389a063ab983c02eb1bb37b47f8272ce92d06d9538" diff --git a/Cargo.toml b/Cargo.toml index 7f76ec7..63b22cf 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,17 +1,16 @@ [workspace] -members = ["broker", "observer", "shared_structures", "producer", "consumer"] +members = ["nyx-storage", "nyx-network"] [workspace.package] name = "nyx" version = "0.1.0" edition = "2021" -description = "Nyx is a multi-threaded version of some what a Kafka" +description = "Streaming data platform of the 21st century" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [workspace.dependencies] -uuid = { version = "1.3.3", features = ["v4"] } +nanoid = "0.4.0" clap = { version = "4.3.8", features = ["cargo"] } -serde = { version = "1.0.171", features = ["derive", "rc"] } -serde_json = "1.0.102" -sysinfo = "0.29.8" +async-std = { version = "1.12.0", features = ["attributes"] } +bincode = "1.3.3" diff --git a/broker/Cargo.toml b/broker/Cargo.toml deleted file mode 100644 index b89b998..0000000 --- a/broker/Cargo.toml +++ /dev/null @@ -1,17 +0,0 @@ -[package] -name = "broker" -version.workspace = true -edition.workspace = true -description = "core component of the message queue" - -# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html - -[dependencies] -shared_structures = { path = "../shared_structures" } - -uuid.workspace = true -serde.workspace = true -serde_json.workspace = true -clap.workspace = true - -heed = "0.11.0" diff --git a/broker/README.md b/broker/README.md deleted file mode 100644 index 8940b93..0000000 --- a/broker/README.md +++ /dev/null @@ -1,17 +0,0 @@ -# Broker - -Broker is one of the core components of nyx, this is the unit which maintaince the integrity of your partitions, manges and stores the actual data of the partition, and talks with the consumers which consume the data from it in their own pace, advancing an index one by one whenever they are ready to consume more data. - -Multiple brokers can run now on the same machine by specifiying a unique name when running a new broker instance. - -## Usage - -``` -cargo run --bin broker -- -``` - -### Running a broker with custom name - -``` -cargo run --bin broker -- --name -``` diff --git a/broker/src/lib.rs b/broker/src/lib.rs deleted file mode 100644 index a9a822d..0000000 --- a/broker/src/lib.rs +++ /dev/null @@ -1,209 +0,0 @@ -use std::{ - net::TcpStream, - path::PathBuf, - sync::{Arc, Mutex}, -}; - -use partition::PartitionDetails; -use shared_structures::{Broadcast, DirManager, EntityType, Message, Metadata, Status, Topic}; -use uuid::Uuid; - -mod partition; - -pub use partition::Partition; - -#[derive(Debug, serde::Serialize, serde::Deserialize)] -pub struct LocalMetadata { - id: String, - pub partitions: Vec, -} - -const METADATA_FILE: &str = "metadata.json"; - -#[derive(Debug)] -pub struct Broker { - pub local_metadata: LocalMetadata, - pub dir_manager: DirManager, - pub cluster_metadata: Metadata, - pub stream: TcpStream, - pub connected_producers: Arc>>, - pub addr: String, - pub custom_dir: Option, -} - -impl Broker { - /// Broker will automatically initiate a handshake with the Observer - pub fn new( - stream: TcpStream, - addr: String, - name: Option<&String>, - ) -> Result>, String> { - let custom_dir: Option = name.map(|f| format!("/broker/{}", f).into()); - - let cluster_metadata: Metadata = Metadata::default(); - - let connected_producers = Arc::new(Mutex::new(vec![])); - - let dir_manager = DirManager::with_dir(custom_dir.as_ref()); - - let mut broker = match dir_manager.open::(METADATA_FILE) { - Ok(mut local_metadata) => { - // Making sure to instantiatte a database for each local partition - local_metadata.partitions = local_metadata - .partitions - .iter_mut() - .map(|p| Partition::from(p.details.clone(), custom_dir.as_ref()).unwrap()) - .collect(); - - Self { - stream, - local_metadata, - dir_manager, - cluster_metadata, - connected_producers, - addr, - custom_dir, - } - } - Err(_e) => { - let id = Uuid::new_v4().to_string(); - - let local_metadata = LocalMetadata { - id, - partitions: vec![], - }; - - dir_manager.save(METADATA_FILE, &local_metadata)?; - - Self { - stream, - local_metadata, - dir_manager, - cluster_metadata, - connected_producers, - addr, - custom_dir, - } - } - }; - - broker.handshake()?; - - Ok(Arc::new(Mutex::new(broker))) - } - - fn handshake(&mut self) -> Result<(), String> { - Broadcast::to_many( - &mut self.stream, - &[ - Message::EntityWantsToConnect { - entity_type: EntityType::Broker, - }, - Message::BrokerConnectionDetails { - id: self.local_metadata.id.clone(), - addr: self.addr.clone(), - }, - ], - ) - } - - pub fn handle_raw_message( - &mut self, - raw_data: &str, - remote: Option<&mut TcpStream>, - ) -> Result<(), String> { - let message = serde_json::from_str::(raw_data).map_err(|e| e.to_string())?; - self.handle_message(&message, remote) - } - - // Messages from Producers and Observers are all processed here - // maybe better to split it into two functions for clarity. - fn handle_message( - &mut self, - message: &Message, - remote: Option<&mut TcpStream>, - ) -> Result<(), String> { - match message { - Message::CreatePartition { - id, - replica_id, - topic, - replica_count, - partition_number, - } => self.handle_create_partition( - id, - replica_id, - topic, - *replica_count, - *partition_number, - ), - Message::ClusterMetadata { metadata } => { - println!("New metadata received from the cluster: {:#?}", metadata); - self.cluster_metadata = metadata.clone(); - Ok(()) - } - Message::RequestClusterMetadata => { - if let Some(remote) = remote { - Broadcast::to( - remote, - &Message::ClusterMetadata { - metadata: self.cluster_metadata.clone(), - }, - ) - } else { - Err( - "RequestClusterMetadata is missing the requesting remote stream" - .to_string(), - ) - } - } - Message::ProducerMessage { - replica_id, - payload, - } => { - println!("Received a message for partition replica {}!!!", replica_id); - println!("Message: {:#?}", payload); - - if let Some(partition) = self - .local_metadata - .partitions - .iter_mut() - .find(|p| p.details.replica_id == *replica_id) - { - partition.put(payload) - } else { - Err("No corresponding partition replica was found on the broker.".to_string()) - } - } - _ => Err(format!( - "Message {:?} is not handled in `handle_message`.", - message - )), - } - } - - fn handle_create_partition( - &mut self, - id: &str, - replica_id: &str, - topic: &Topic, - replica_number: usize, - partition_number: usize, - ) -> Result<(), String> { - let partition_details = PartitionDetails { - id: id.to_string(), - replica_id: replica_id.to_string(), - status: Status::Up, - topic: topic.clone(), - role: shared_structures::Role::Follower, - partition_number, - replica_number, - }; - let partition = Partition::from(partition_details, self.custom_dir.as_ref())?; - self.local_metadata.partitions.push(partition); - self.dir_manager.save(METADATA_FILE, &self.local_metadata) - } -} - -#[cfg(test)] -mod tests {} diff --git a/broker/src/main.rs b/broker/src/main.rs deleted file mode 100644 index 1a33091..0000000 --- a/broker/src/main.rs +++ /dev/null @@ -1,187 +0,0 @@ -use std::{ - error::Error, - io::{BufRead, BufReader}, - net::{TcpListener, TcpStream}, - sync::{Arc, Mutex}, - time::Duration, -}; - -use broker::Broker; -use clap::{arg, command}; -use shared_structures::println_c; - -fn main() -> Result<(), Box> { - let matches = command!() - .arg(clap::Arg::new("host") - .required(true) - ) - .arg( - arg!(-n --name "Assigns a name to the broker, names are useful if you want to run two brokers on the same machine. Useful for nyx maintainers testing multi-node features.") - .required(false) - ).get_matches(); - - let addr = matches.get_one::("host").unwrap(); - let name = matches.get_one::("name"); - - let log_name = match name { - Some(n) => n, - None => "broker", - }; - - println_c(&format!("Initializing {}", log_name), 105); - - let tcp_stream: Option; - let mut sleep_interval = 1000; - - // Should trying to connect to the observer in intervals until success - loop { - match TcpStream::connect(addr) { - Ok(stream) => { - tcp_stream = Some(stream); - println!("Connection with the Observer has been established"); - break; - } - - Err(_) => { - println!( - "Failed to connect to the Observer, next retry in {}s", - Duration::from_millis(sleep_interval).as_secs_f32() - ); - std::thread::sleep(Duration::from_millis(sleep_interval)); - sleep_interval += 1500; - } - } - } - - let stream: TcpStream = tcp_stream.ok_or("Stream is wrong")?; - - // Port 0 means that we let the system find a free port in itself and use that - let listener = TcpListener::bind("localhost:0").map_err(|e| e.to_string())?; - - let host = listener.local_addr().unwrap(); - - let broker = Broker::new(stream, host.to_string(), name)?; - - let broker_lock = broker.lock().unwrap(); - - let reader_stream = broker_lock.stream.try_clone().map_err(|e| e.to_string())?; - - println_c( - &format!( - "Broker is ready to accept producers on port {}", - listener.local_addr().unwrap().port() - ), - 50, - ); - - let connected_producers = broker_lock.connected_producers.clone(); - - let broker_for_producers = broker.clone(); - - // Producers listener - std::thread::spawn(move || loop { - let connection = listener.incoming().next(); - - if let Some(stream) = connection { - match stream { - Ok(stream) => { - if let Ok(read_stream) = stream.try_clone() { - connected_producers.lock().unwrap().push(read_stream); - match handshake_with_producer(stream, broker_for_producers.clone()) { - Ok(()) => {} - Err(e) => { - println!( - "Error while handshaking with connectiong producer: {}", - e - ); - } - }; - } - } - Err(e) => println!("Error: {}", e), - } - } - }); - - println!("Initial data on the broker:"); - - for partition in broker_lock.local_metadata.partitions.iter() { - println!( - "Partition {}: {:#?}", - partition.details.replica_id, partition.database - ); - } - - drop(broker_lock); - - println_c("Initialization complete.", 35); - - let mut reader: BufReader = BufReader::new(reader_stream); - - let mut buf = String::with_capacity(1024); - - // Reader loop - loop { - let size = reader.read_line(&mut buf).map_err(|e| e.to_string())?; - - if size == 0 { - println!("Connection with observer has been closed, exiting."); - break; - } - let mut broker_lock = broker.lock().unwrap(); - - broker_lock.handle_raw_message(&buf, None)?; - - buf.clear(); - } - - Ok(()) -} - -fn handshake_with_producer( - mut stream: TcpStream, - broker: Arc>, -) -> Result<(), String> { - let reader_stream = stream.try_clone().map_err(|e| { - format!( - "Broker: failed to instantiate a producer reader stream, ({})", - e - ) - })?; - - println!("Spawning reader: {:#?}", reader_stream); - - std::thread::spawn(move || { - let mut buf = String::with_capacity(1024); - let mut reader = BufReader::new(reader_stream); - - loop { - let bytes_read = match reader.read_line(&mut buf) { - Ok(b) => b, - Err(e) => { - println!("Producer Read Stream Error: {}", e); - break; - } - }; - - if bytes_read == 0 { - println!("Producer is disconnect"); - break; - } - - let mut broker_lock = broker.lock().unwrap(); - - match broker_lock.handle_raw_message(&buf, Some(&mut stream)) { - Ok(_) => {} - Err(e) => { - println!("Failed to handle raw message: {}", e); - break; - } - }; - - buf.clear(); - } - }); - - Ok(()) -} diff --git a/broker/src/partition/db.rs b/broker/src/partition/db.rs deleted file mode 100644 index b940b9a..0000000 --- a/broker/src/partition/db.rs +++ /dev/null @@ -1,63 +0,0 @@ -use std::{fmt::Debug, path::PathBuf}; - -use heed::{ - types::{OwnedType, SerdeJson}, - Database, Env, EnvOpenOptions, -}; - -use shared_structures::DirManager; - -pub struct DB { - pub length: u64, - pub env: Env, - pub db: Database, SerdeJson>, -} - -impl DB { - pub fn with_dir(replica_id: &str, custom_dir: Option<&PathBuf>) -> Result { - let storage_dir_path = if let Some(custom_dir) = custom_dir { - let mut dir = custom_dir.clone(); - dir.push("storage"); - dir - } else { - "storage".into() - }; - let storage_dir = DirManager::with_dir(Some(&storage_dir_path)); - let db_file_name = format!("{}.mdb", replica_id); - let db_file_path = storage_dir - .create(&db_file_name) - .map_err(|e| format!("PartitionDB: {}", e))?; - let env = EnvOpenOptions::new() - .open(db_file_path) - .map_err(|e| format!("PartitionDB: {}", e))?; - let db: Database, SerdeJson> = env - .create_database(None) - .map_err(|e| format!("PartitionDB: {}", e))?; - - let txn = env.read_txn().map_err(|e| e.to_string())?; - let length = db.len(&txn).map_err(|e| e.to_string())?; - txn.commit().map_err(|e| e.to_string())?; - - Ok(Self { db, env, length }) - } -} - -impl Debug for DB { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - let txn = self.env.read_txn().unwrap(); - let mut data = String::new(); - for key in 0..self.length { - let k = self.db.get(&txn, &(key as u128)).unwrap(); - if let Some(d) = k { - data.push_str(&format!("key: {} | data: {:?}\n", key, d)); - } - } - txn.commit().unwrap(); - - if !data.is_empty() { - write!(f, "{}", data) - } else { - write!(f, "Empty") - } - } -} diff --git a/broker/src/partition/mod.rs b/broker/src/partition/mod.rs deleted file mode 100644 index 348d2f5..0000000 --- a/broker/src/partition/mod.rs +++ /dev/null @@ -1,79 +0,0 @@ -use std::path::PathBuf; - -use shared_structures::{Role, Status, Topic}; - -use crate::partition::db::DB; - -mod db; - -#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] -pub struct PartitionDetails { - pub id: String, - pub replica_id: String, - pub status: Status, - pub topic: Topic, - pub role: Role, - pub partition_number: usize, - pub replica_number: usize, -} - -#[derive(Debug, serde::Serialize, serde::Deserialize)] -pub struct Partition { - pub details: PartitionDetails, - #[serde(skip_serializing, skip_deserializing)] - pub database: Option, -} - -impl Partition { - pub fn from(details: PartitionDetails, custom_dir: Option<&PathBuf>) -> Result { - let database = DB::with_dir(&details.replica_id, custom_dir)?; - - println!("Database for partition initialized"); - - Ok(Self { - details, - database: Some(database), - }) - } - - // pub fn send_candidacy_for_leadership(&self, observer: &TcpStream) -> Result<()> {} - - pub fn put(&mut self, value: &serde_json::Value) -> Result<(), String> { - if let Some(db) = self.database.as_mut() { - let mut wtxn = db.env.write_txn().map_err(|s| s.to_string())?; - let record = value.to_string(); - db.db - .put(&mut wtxn, &(db.length as u128), &record) - .map_err(|s| s.to_string())?; - wtxn.commit().map_err(|s| s.to_string())?; - db.length += 1; - } - Ok(()) - } -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - #[cfg_attr(miri, ignore)] - fn creates_partition_on_broker() { - let topic = Topic::from("notifications".to_string()); - let custom_dir = PathBuf::from("just_for_test_dir"); - - let partition_info = PartitionDetails { - id: "mocked_partition_id".to_string(), - replica_id: "mocked_partition_replica_id".to_string(), - status: Status::Up, - topic, - role: Role::Follower, - partition_number: 1, - replica_number: 1, - }; - - let partition = Partition::from(partition_info, Some(&custom_dir)).unwrap(); - - assert_eq!(partition.details.id, "mocked_partition_id".to_string()) - } -} diff --git a/config/dev.properties b/config/dev.properties deleted file mode 100644 index d699877..0000000 --- a/config/dev.properties +++ /dev/null @@ -1,7 +0,0 @@ -name=whatever -version=0 -# The strategy by which Nyx is going to spread out the partition between all the brokers in a cluster -strategy=balanced -retention_period=7d -replica_factor=3 -throttle=500 \ No newline at end of file diff --git a/consumer/src/main.rs b/consumer/src/main.rs deleted file mode 100644 index e7a11a9..0000000 --- a/consumer/src/main.rs +++ /dev/null @@ -1,3 +0,0 @@ -fn main() { - println!("Hello, world!"); -} diff --git a/consumer/Cargo.toml b/nyx-network/Cargo.toml similarity index 76% rename from consumer/Cargo.toml rename to nyx-network/Cargo.toml index d482a53..1c393c3 100644 --- a/consumer/Cargo.toml +++ b/nyx-network/Cargo.toml @@ -1,8 +1,9 @@ [package] -name = "consumer" +name = "nyx-network" version = "0.1.0" edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] +async-std.workspace = true diff --git a/nyx-network/src/lib.rs b/nyx-network/src/lib.rs new file mode 100644 index 0000000..7d12d9a --- /dev/null +++ b/nyx-network/src/lib.rs @@ -0,0 +1,14 @@ +pub fn add(left: usize, right: usize) -> usize { + left + right +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn it_works() { + let result = add(2, 2); + assert_eq!(result, 4); + } +} diff --git a/nyx-storage/Cargo.toml b/nyx-storage/Cargo.toml new file mode 100644 index 0000000..5ae0195 --- /dev/null +++ b/nyx-storage/Cargo.toml @@ -0,0 +1,9 @@ +[package] +name = "nyx-storage" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +async-std.workspace = true diff --git a/nyx-storage/src/batch.rs b/nyx-storage/src/batch.rs new file mode 100644 index 0000000..2e6c27f --- /dev/null +++ b/nyx-storage/src/batch.rs @@ -0,0 +1,95 @@ +use crate::{offset::Offset, MAX_BATCH_SIZE}; + +#[derive(PartialEq)] +pub enum BatchState { + ShouldFlush, + Allowable, +} + +#[derive(Debug)] +pub struct Prune<'a> { + pub buffer: &'a [u8], + pub offsets: &'a [Offset], +} + +impl<'a> Prune<'a> { + pub fn offsets_as_bytes(&self) -> &[u8] { + unsafe { + std::slice::from_raw_parts( + self.offsets.as_ptr() as *const u8, + self.offsets.len() * std::mem::size_of::(), + ) + } + } + + pub fn buffer_as_bytes(&self) -> &[u8] { + &self.buffer[..] + } +} + +#[derive(Debug)] +pub struct Batch { + buffer: [u8; MAX_BATCH_SIZE], + offsets: Vec, + current_batch_size: usize, + current_batch_index: usize, + current_segment_size: usize, +} + +impl Batch { + pub fn new() -> Self { + Self { + buffer: [0; MAX_BATCH_SIZE], + offsets: Vec::with_capacity(1024), + current_batch_size: 0, + current_batch_index: 0, + current_segment_size: 0, + } + } + + pub fn add( + &mut self, + buf: &[u8], + latest_segment_count: usize, + latest_segment_size: usize, + total_offsets: usize, + ) -> Result { + if self.current_batch_size + buf.len() < MAX_BATCH_SIZE { + if self.current_batch_index == 0 { + self.current_batch_index = latest_segment_count; + self.current_segment_size = latest_segment_size; + } + + let offset = Offset::new( + self.current_batch_index + total_offsets, + self.current_segment_size, + self.current_segment_size + buf.len(), + latest_segment_count, + )?; + + self.buffer[self.current_batch_size..self.current_batch_size + buf.len()] + .copy_from_slice(buf); + self.current_batch_size += buf.len(); + self.current_segment_size += buf.len(); + self.offsets.push(offset); + self.current_batch_index += 1; + + Ok(BatchState::Allowable) + } else { + Ok(BatchState::ShouldFlush) + } + } + + pub fn reset(&mut self) { + self.offsets = Vec::with_capacity(1024); + self.current_batch_size = 0; + self.current_batch_index = 0; + } + + pub fn get_prunable(&self) -> Prune<'_> { + Prune { + buffer: &self.buffer[..self.current_batch_size], + offsets: &self.offsets[..], + } + } +} diff --git a/nyx-storage/src/compactor.rs b/nyx-storage/src/compactor.rs new file mode 100644 index 0000000..b16437a --- /dev/null +++ b/nyx-storage/src/compactor.rs @@ -0,0 +1,13 @@ +use async_std::channel::Receiver; + +use crate::segment::Segment; + +pub struct Compactor { + queue: Receiver, +} + +impl Compactor { + pub async fn run(queue: Receiver) { + while let Ok(segment) = queue.recv().await {} + } +} diff --git a/nyx-storage/src/directory.rs b/nyx-storage/src/directory.rs new file mode 100644 index 0000000..2099857 --- /dev/null +++ b/nyx-storage/src/directory.rs @@ -0,0 +1,170 @@ +use std::{ + fmt::Debug, + io::{Error, ErrorKind}, +}; + +use async_std::{ + fs::{self, File, OpenOptions}, + io, +}; + +const NYX_BASE_PATH: &str = "nyx"; + +#[derive(Debug, PartialEq, Clone, Copy)] +pub enum DataType { + Partition, + Indices, +} + +#[derive(Debug, Default, Clone)] +pub struct Directory { + base_path: String, + title: String, +} + +// Path example: nyx/title/filename + +/// Directory is used to manage the internal creation and opening of the files. +impl Directory { + pub async fn new(title: &str) -> io::Result { + let dir = Self { + base_path: format!("{}/{}", NYX_BASE_PATH, title), + title: title.to_owned(), + }; + + let full_base_path = dir.get_base_path()?; + + match async_std::fs::create_dir_all(&full_base_path).await { + Ok(_) => {} + Err(e) => match e.kind() { + ErrorKind::AlreadyExists => {} + e => return Err(Error::new(e, format!("Couldn't create directory: {}", e))), + }, + }; + + Ok(dir) + } + + pub fn get_file_path(&self, datatype: DataType, count: usize) -> io::Result { + let base_path = self.get_base_path()?; + + match datatype { + DataType::Partition => Ok(format!("{}/{}_seg_{}.data", base_path, self.title, count)), + DataType::Indices => Ok(format!("{}/{}_seg_{}.index", base_path, self.title, count)), + } + } + + pub fn get_base_path(&self) -> io::Result { + let mut final_dir = Some(String::new()); + + let base_path = self.base_path.clone(); + + // Unix-based machines + if let Ok(home_dir) = std::env::var("HOME") { + let config_dir = format!("{}/.config/{}", home_dir, base_path); + final_dir = Some(config_dir); + } + // Windows based machines + else if let Ok(user_profile) = std::env::var("USERPROFILE") { + let config_dir = format!(r"{}/AppData/Roaming/{}", user_profile, base_path); + final_dir = Some(config_dir); + } + + final_dir.ok_or(Error::new(ErrorKind::NotFound, "Couldn't get the systems home directory. Please setup a HOME env variable and pass your system's home directory there.".to_string())) + } + + pub async fn open_read(&self, datatype: DataType, count: usize) -> io::Result { + let path = self.get_file_path(datatype, count)?; + + OpenOptions::new().read(true).open(path).await + } + + pub async fn open_write(&self, datatype: DataType, count: usize) -> io::Result { + let path = self + .get_file_path(datatype, count) + .map_err(|e| Error::new(ErrorKind::NotFound, e))?; + + OpenOptions::new() + .append(true) + .create(true) + .open(path) + .await + } + + pub async fn open_read_write(&self, datatype: DataType, count: usize) -> io::Result { + let path = self + .get_file_path(datatype, count) + .map_err(|e| Error::new(ErrorKind::NotFound, e))?; + + OpenOptions::new().read(true).append(true).open(path).await + } + + pub async fn open_read_write_create( + &self, + datatype: DataType, + count: usize, + ) -> io::Result { + let path = self + .get_file_path(datatype, count) + .map_err(|e| Error::new(ErrorKind::NotFound, e))?; + + OpenOptions::new() + .read(true) + .append(true) + .create(true) + .open(path) + .await + } + + pub async fn delete_file(&self, datatype: DataType, count: usize) -> io::Result<()> { + let path = self.get_file_path(datatype, count)?; + fs::remove_file(&path).await + } + + pub async fn delete_all(&self) -> io::Result<()> { + let path = self.get_base_path()?; + fs::remove_dir_all(&path).await + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[async_std::test] + #[cfg_attr(miri, ignore)] + async fn open_read_and_delete() { + let dir = Directory::new("events-replica-1").await.unwrap(); + + // Opening non-existing file is not possible in read-mode only + let open_partition_result = dir.open_read(DataType::Partition, 0).await; + + assert!(open_partition_result.is_err()); + + // Opening non-existing file is not possible in read-mode only + let open_indices_result = dir.open_read(DataType::Indices, 0).await; + + assert!(open_indices_result.is_err()); + } + + #[async_std::test] + #[cfg_attr(miri, ignore)] + async fn open_write_and_delete() { + let dir = Directory::new("events-replica-2").await.unwrap(); + let open_partition_result = dir.open_write(DataType::Partition, 0).await; + + assert!(open_partition_result.is_ok()); + + let open_indices_result = dir.open_write(DataType::Indices, 0).await; + + assert!(open_indices_result.is_ok()); + + let delete_partition_result = dir.delete_file(DataType::Partition, 0).await; + + assert!(delete_partition_result.is_ok()); + + let delete_indices_result = dir.delete_file(DataType::Indices, 0).await; + + assert!(delete_indices_result.is_ok()); + } +} diff --git a/nyx-storage/src/indices.rs b/nyx-storage/src/indices.rs new file mode 100644 index 0000000..9c6c18e --- /dev/null +++ b/nyx-storage/src/indices.rs @@ -0,0 +1,112 @@ +use std::{collections::HashMap, sync::Arc}; + +use async_std::io::{self, prelude::SeekExt, ReadExt}; + +use crate::{offset::Offset, segment::Segment}; + +#[derive(Debug)] +pub struct Indices { + pub data: HashMap, + pub total_bytes: usize, +} + +const OFFSET_SIZE: usize = std::mem::size_of::(); + +impl Indices { + pub async fn from(segments: &[Arc]) -> io::Result { + let mut indices = Self { + data: HashMap::new(), + total_bytes: 0, + }; + + for segment in segments { + let mut buf: [u8; 32] = [0u8; OFFSET_SIZE]; + let mut file = &(*segment).file; + + loop { + let n = file.read(&mut buf).await?; + + if n == 0 { + break; + } + + file.seek(io::SeekFrom::Current(OFFSET_SIZE as i64)); + + let index = usize::from_le_bytes([ + buf[0], buf[1], buf[2], buf[3], buf[4], buf[5], buf[6], buf[7], + ]); + + let start = usize::from_le_bytes([ + buf[8], buf[9], buf[10], buf[11], buf[12], buf[13], buf[14], buf[15], + ]); + + let data_size = usize::from_le_bytes([ + buf[16], buf[17], buf[18], buf[19], buf[20], buf[21], buf[22], buf[23], + ]); + + let segment_index = usize::from_le_bytes([ + buf[24], buf[25], buf[26], buf[27], buf[28], buf[29], buf[30], buf[31], + ]); + + indices + .data + .insert(index, Offset::from(index, start, data_size, segment_index)); + + indices.total_bytes += data_size; + } + } + + Ok(indices) + } +} + +#[cfg(test)] +mod tests { + use async_std::io::WriteExt; + + use crate::{directory::Directory, macros::function}; + + use super::*; + + async fn create_test_data(directory: &Directory) -> Vec> { + let mut offsets = vec![]; + + for i in 0..50 { + let offset = Offset::new(i, 15, 2500, 0).unwrap(); + offsets.push(offset); + } + + let mut file = directory + .open_write(crate::directory::DataType::Indices, 0) + .await + .unwrap(); + + for offset in offsets.iter() { + let offset_bytes = offset.as_bytes(); + file.write_all(offset_bytes).await.unwrap(); + } + + let segment = Segment::new(&directory, crate::directory::DataType::Indices, 0) + .await + .unwrap(); + + vec![Arc::new(segment)] + } + + #[async_std::test] + async fn indices_from() { + let path = format!("./{}", function!()); + let directory = Directory::new(&path).await.unwrap(); + let segments = create_test_data(&directory).await; + let indices_result = Indices::from(&segments).await.unwrap(); + + for (k, v) in indices_result.data { + assert_eq!(v, Offset::new(k, 15, 2500, 0).unwrap()) + } + + directory + .delete_file(crate::directory::DataType::Indices, 0) + .await + .unwrap(); + } +} diff --git a/nyx-storage/src/lib.rs b/nyx-storage/src/lib.rs new file mode 100644 index 0000000..912b064 --- /dev/null +++ b/nyx-storage/src/lib.rs @@ -0,0 +1,274 @@ +use std::sync::Arc; + +use async_std::io::{self, prelude::SeekExt, ReadExt, SeekFrom, WriteExt}; +use batch::{Batch, BatchState}; +use directory::{DataType, Directory}; +use indices::Indices; +use segment::Segment; +use segmentation_manager::SegmentationManager; + +mod batch; +mod compactor; +mod indices; +mod macros; +mod offset; +mod segment; +mod segmentation_manager; +mod storage_sender; + +pub mod directory; + +// 4KB +const MAX_MESSAGE_SIZE: usize = 4096; +// 4GB +const MAX_SEGMENT_SIZE: u64 = 4_000_000_000; +// 16KB +const MAX_BATCH_SIZE: usize = 16384; + +/// NOTE: Each partition of a topic should have a Storage for the data it stores +#[derive(Debug)] +pub struct Storage { + pub directory: Directory, + indices: Indices, + segmentation_manager: SegmentationManager, + retrivable_buffer: [u8; MAX_MESSAGE_SIZE], + batch: Batch, + compaction: bool, +} + +impl Storage { + pub async fn new(title: &str, compaction: bool) -> Result { + let directory = Directory::new(title) + .await + .map_err(|e| format!("Storage (Directory::new): {}", e))?; + + let segmentation_manager = SegmentationManager::from(&directory) + .await + .map_err(|e| format!("Storage (SegmentationManager::from): {}", e))?; + + let indices = Indices::from(segmentation_manager.indices_segments()) + .await + .map_err(|e| format!("Storage (Indices::from): {}", e))?; + + println!("{:#?}", indices); + + if compaction { + // async_std::task::spawn(Compactor::run(segment_receiver)); + } + + Ok(Self { + directory, + indices, + segmentation_manager, + retrivable_buffer: [0; MAX_MESSAGE_SIZE], + batch: Batch::new(), + compaction, + }) + } + + pub async fn set(&mut self, buf: &[u8]) -> Result<(), String> { + if buf.len() > MAX_MESSAGE_SIZE { + return Err(format!( + "Payload size {} kb, max payload allowed {} kb", + buf.len(), + MAX_MESSAGE_SIZE + )); + } + + let latest_segment_count = self + .segmentation_manager + .get_last_segment_count(DataType::Partition); + + let latest_segment_size = self + .segmentation_manager + .get_last_segment_size(DataType::Partition) + .await; + + let last_total_entries = self.len(); + + let batch_state = self.batch.add( + buf, + latest_segment_count, + latest_segment_size, + last_total_entries, + )?; + + if batch_state == BatchState::ShouldFlush { + self.flush().await?; + self.batch.add( + buf, + latest_segment_count, + latest_segment_size, + last_total_entries, + )?; + } + + Ok(()) + } + + pub async fn flush(&mut self) -> Result<(), String> { + self.prune_to_disk() + .await + .map_err(|e| format!("Storage (flush): {}", e))?; + self.batch.reset(); + Ok(()) + } + + async fn prune_to_disk(&mut self) -> io::Result { + let prune = &self.batch.get_prunable(); + + let latest_partition_segment = self + .segmentation_manager + .get_latest_segment(DataType::Partition) + .await?; + + let mut latest_partition_file = &latest_partition_segment.file; + + latest_partition_file + .write_all(prune.buffer_as_bytes()) + .await?; + + for offset in prune.offsets { + let length = self.indices.data.len(); + self.indices.data.insert(length, *offset); + } + + let latest_indices_segment = self + .segmentation_manager + .get_latest_segment(DataType::Indices) + .await?; + + let mut latest_indices_file = &latest_indices_segment.file; + + latest_indices_file + .write_all(prune.offsets_as_bytes()) + .await?; + + Ok(prune.buffer.len()) + } + + pub fn len(&self) -> usize { + self.indices.data.len() + } + + pub async fn get(&mut self, index: usize) -> Option<&[u8]> { + let offset = self.indices.data.get(&index).cloned()?; + + let segment = self + .segmentation_manager + .get_segment_by_index(DataType::Partition, offset.segment_count())?; + + self.seek_bytes_between(offset.start(), offset.data_size(), segment) + .await + } + + async fn seek_bytes_between( + &mut self, + start: usize, + data_size: usize, + segment: Arc, + ) -> Option<&[u8]> { + let mut segment_file = &(*segment).file; + + if let Err(e) = segment_file.seek(SeekFrom::Start(start as u64)).await { + println!("error {}", e); + } + + if let Err(e) = segment_file + .read(&mut self.retrivable_buffer[..data_size]) + .await + { + println!("error {}", e); + } + + Some(&self.retrivable_buffer[..data_size]) + } +} + +#[cfg(test)] +mod tests { + use std::time::Instant; + + use crate::macros::function; + + use super::*; + + async fn cleanup(storage: &Storage) { + storage.directory.delete_all().await.unwrap(); + } + + async fn setup_test_storage(title: &str, test_message: &[u8], count: usize) -> Storage { + let mut storage = Storage::new(title, false).await.unwrap(); + + let messages = vec![test_message; count]; + + let now = Instant::now(); + + for message in messages { + storage.set(message).await.unwrap(); + } + + let elapsed = now.elapsed(); + + println!("Write {} messages in: {:.2?}", count, elapsed); + + // Make sure all messages are written to the disk before we continue with our tests + storage.flush().await.unwrap(); + + assert_eq!(storage.len(), count); + + return storage; + } + + #[async_std::test] + #[cfg_attr(miri, ignore)] + async fn new_creates_instances() { + // (l)eader/(r)eplica_topic-name_partition-count + let storage = Storage::new("TEST_l_reservations_1", false).await; + + assert!(storage.is_ok()); + } + + #[async_std::test] + #[cfg_attr(miri, ignore)] + async fn get_returns_ok() { + let message_count = 500; + let test_message = b"messssagee"; + + let mut storage = setup_test_storage(&function!(), test_message, message_count).await; + + let length = storage.len(); + + let now = Instant::now(); + + for index in 0..length { + let message = storage.get(index).await; + + assert_eq!(message, Some(&test_message[..])); + } + + let elapsed = now.elapsed(); + + println!("Read {} messages in: {:.2?}", length, elapsed); + + assert_eq!(storage.len(), message_count); + + cleanup(&storage).await; + } + + #[async_std::test] + #[cfg_attr(miri, ignore)] + async fn get_returns_none_on_index_out_of_bounds() { + let total_count = 5; + + let test_message = b"hello world hello world hello worldrld hello worldrld hello worl"; + + let mut storage = setup_test_storage(&function!(), test_message, total_count).await; + + let get_result = storage.get(total_count).await; + + assert_eq!(get_result, None); + + cleanup(&storage).await; + } +} diff --git a/nyx-storage/src/macros.rs b/nyx-storage/src/macros.rs new file mode 100644 index 0000000..76d3252 --- /dev/null +++ b/nyx-storage/src/macros.rs @@ -0,0 +1,17 @@ +// Creates a string with the name of the function it currently resides in +// this function is used in tests where we create distinct files not to have +// a race condition in tests running in parallel +#[allow(unused_macros)] +macro_rules! function { + () => {{ + fn f() {} + fn type_name_of(_: T) -> &'static str { + std::any::type_name::() + } + let name = type_name_of(f); + name.strip_suffix("::f").unwrap().replace("::", "_") + }}; +} + +#[allow(unused_imports)] +pub(crate) use function; diff --git a/nyx-storage/src/offset.rs b/nyx-storage/src/offset.rs new file mode 100644 index 0000000..fa5cda4 --- /dev/null +++ b/nyx-storage/src/offset.rs @@ -0,0 +1,57 @@ +#[derive(Debug, Clone, Copy, PartialEq)] +#[repr(C)] +pub struct Offset { + index: usize, + start: usize, + data_size: usize, + segment_count: usize, +} + +impl Offset { + pub fn new( + index: usize, + start: usize, + end: usize, + segment_count: usize, + ) -> Result { + if start >= end { + return Err(format!( + "Start ({}) can't be greater or equal to end ({})", + start, end + )); + } + + Ok(Self { + index, + start, + data_size: end - start, + segment_count, + }) + } + + pub fn from(index: usize, start: usize, data_size: usize, segment_count: usize) -> Self { + Self { + index, + start, + data_size, + segment_count, + } + } + + pub fn as_bytes(&self) -> &[u8] { + let offset = self as *const _ as *const [u8; std::mem::size_of::()]; + unsafe { &(*offset) } + } + + pub fn start(&self) -> usize { + self.start + } + + pub fn data_size(&self) -> usize { + self.data_size + } + + pub fn segment_count(&self) -> usize { + self.segment_count + } +} diff --git a/nyx-storage/src/segment.rs b/nyx-storage/src/segment.rs new file mode 100644 index 0000000..7840d9d --- /dev/null +++ b/nyx-storage/src/segment.rs @@ -0,0 +1,37 @@ +use async_std::{fs::File, io}; + +use crate::{ + directory::{DataType, Directory}, + MAX_SEGMENT_SIZE, +}; + +#[derive(Debug)] +pub struct Segment { + clean: bool, + length: u64, + pub file: File, +} + +impl Segment { + pub async fn new(directory: &Directory, data_type: DataType, count: usize) -> io::Result { + let file = directory.open_read_write_create(data_type, count).await?; + + Ok(Self { + length: MAX_SEGMENT_SIZE, + clean: false, + file, + }) + } + + pub async fn from(data_type: DataType, file: File) -> io::Result { + Ok(Self { + length: MAX_SEGMENT_SIZE, + clean: false, + file, + }) + } +} + +// Read from 'clean' folder for the user + +// Compact files from 'dirty' folder diff --git a/nyx-storage/src/segmentation_manager.rs b/nyx-storage/src/segmentation_manager.rs new file mode 100644 index 0000000..6a9fa8d --- /dev/null +++ b/nyx-storage/src/segmentation_manager.rs @@ -0,0 +1,168 @@ +use std::{ptr::NonNull, sync::Arc}; + +use async_std::io; + +use crate::{ + directory::{DataType, Directory}, + segment::Segment, + MAX_SEGMENT_SIZE, +}; + +#[derive(PartialEq)] +pub enum SegmentMode { + Write, + Read, +} + +#[derive(Debug)] +pub struct SegmentationManager { + indices_segments: Vec>, + pub partition_segments: Vec>, + latest_index_segment: NonNull>>, + directory: Directory, +} + +impl SegmentationManager { + pub async fn new(directory: &Directory) -> io::Result { + let latest_indices_segment = + Segment::new(&directory, crate::directory::DataType::Indices, 0).await?; + + let latest_partition_segment = + Segment::new(&directory, crate::directory::DataType::Partition, 0).await?; + + Ok(Self { + indices_segments: vec![Arc::new(latest_indices_segment)], + partition_segments: vec![Arc::new(latest_partition_segment)], + directory: directory.clone(), + latest_index_segment: NonNull::dangling(), + }) + } + + pub async fn from(directory: &Directory) -> io::Result { + let mut indices_segments = vec![]; + let mut partition_segments = vec![]; + + let mut current_segment_candidate = 0; + + while let Some(file) = directory + .open_read_write(DataType::Partition, current_segment_candidate) + .await + .ok() + { + current_segment_candidate += 1; + partition_segments.push(Arc::new(Segment::from(DataType::Partition, file).await?)); + } + + current_segment_candidate = 0; + + while let Some(file) = directory + .open_read_write(DataType::Indices, current_segment_candidate) + .await + .ok() + { + current_segment_candidate += 1; + indices_segments.push(Arc::new(Segment::from(DataType::Indices, file).await?)); + } + + if indices_segments.len() == 0 { + let latest_indices_segment = + Segment::new(&directory, crate::directory::DataType::Indices, 0).await?; + indices_segments.push(Arc::new(latest_indices_segment)); + } + + if partition_segments.len() == 0 { + let latest_partition_segment = + Segment::new(&directory, crate::directory::DataType::Partition, 0).await?; + partition_segments.push(Arc::new(latest_partition_segment)); + } + + Ok(Self { + indices_segments, + partition_segments, + latest_index_segment: NonNull::dangling(), + directory: directory.clone(), + }) + } + + pub fn partition_segments(&self) -> &[Arc] { + &self.partition_segments[..] + } + + pub fn indices_segments(&self) -> &[Arc] { + &self.indices_segments[..] + } + + pub async fn create_segment(&mut self, data_type: DataType) -> io::Result> { + let new_segment_count = if data_type == DataType::Indices { + self.indices_segments.len() + } else { + self.partition_segments.len() + }; + + let new_segment = Segment::new(&self.directory, data_type, new_segment_count).await?; + + let new_segment = Arc::new(new_segment); + + if data_type == DataType::Indices { + self.indices_segments.push(new_segment.clone()); + } else { + self.partition_segments.push(new_segment.clone()); + } + + Ok(new_segment) + } + + pub fn get_segment_by_index(&self, data_type: DataType, index: usize) -> Option> { + let segments = if data_type == DataType::Indices { + &self.indices_segments + } else { + &self.partition_segments + }; + + segments.get(index).map(|segment| segment.clone()) + } + + pub fn get_last_segment_count(&self, data_type: DataType) -> usize { + if data_type == DataType::Indices { + self.indices_segments.len() - 1 + } else { + self.partition_segments.len() - 1 + } + } + + pub async fn get_last_segment_size(&self, data_type: DataType) -> usize { + // These unwraps are safe + if data_type == DataType::Indices { + self.indices_segments.last() + } else { + self.partition_segments.last() + } + .unwrap() + .file + .metadata() + .await + .unwrap() + .len() as usize + } + + fn get_last_segment(&self, data_type: DataType) -> Option> { + let segment = if data_type == DataType::Indices { + &self.indices_segments + } else { + &self.partition_segments + }; + + segment.last().map(|segment| segment.clone()) + } + + pub async fn get_latest_segment(&mut self, data_type: DataType) -> io::Result> { + // This is safe we should always have a valid segment otherwise best is to crash ASAP. + let latest_segment = self.get_last_segment(data_type).unwrap(); + + if latest_segment.file.metadata().await?.len() >= MAX_SEGMENT_SIZE { + self.create_segment(data_type).await + } else { + Ok(latest_segment) + } + } +} diff --git a/nyx-storage/src/storage_sender.rs b/nyx-storage/src/storage_sender.rs new file mode 100644 index 0000000..fda9166 --- /dev/null +++ b/nyx-storage/src/storage_sender.rs @@ -0,0 +1,43 @@ +use async_std::channel::Sender; + +pub struct StorageSender { + inner: Sender>, +} + +impl StorageSender { + pub fn new(inner: Sender>) -> Self { + Self { inner } + } + + pub async fn send(&mut self, data: &[u8]) -> Result<(), String> { + self.inner + .send(data.to_vec()) + .await + .map_err(|e| format!("StorageSender (send): {}", e)) + } +} + +#[cfg(test)] +mod tests { + use async_std::channel::bounded; + + use super::*; + + #[async_std::test] + #[cfg_attr(miri, ignore)] + async fn create_storage_sender_and_send() { + let (sender, receiver) = bounded::>(1); + let payload = b"testing storage sender"; + let mut storage_sender = StorageSender::new(sender); + let send_result = storage_sender.send(payload).await; + + assert!(send_result.is_ok()); + + let received_data = receiver.recv().await.unwrap(); + + assert_eq!( + String::from_utf8(received_data).unwrap(), + String::from_utf8(payload.to_vec()).unwrap() + ) + } +} diff --git a/observer/Cargo.toml b/observer/Cargo.toml deleted file mode 100644 index bec54b7..0000000 --- a/observer/Cargo.toml +++ /dev/null @@ -1,16 +0,0 @@ -[package] -name = "observer" -version.workspace = true -edition.workspace = true -description = "cluster orchestrator" - -# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html - -[dependencies] -shared_structures = { path = "../shared_structures" } - -clap.workspace = true -uuid.workspace = true -serde.workspace = true -serde_json.workspace = true -sysinfo.workspace = true diff --git a/observer/README.md b/observer/README.md deleted file mode 100644 index 5c6da89..0000000 --- a/observer/README.md +++ /dev/null @@ -1,95 +0,0 @@ -# Observer [WIP] - -Observer is a program that sends messages to brokers connected to it. For example, when a new topic is created, Observer is responsible for distributing the partitions of the topic across the brokers for maximum fault tolerance. - -### Typical flow of starting Nyx - -1. The user launches three brokers. - The brokers are in a state where they are looking for the observer with a specified host passed as a parameter. On development, we can start the broker using `cargo run --bin broker localhost:5555`. We assume that Observer runs on port 5555 locally. While Observer is shut down, the broker is just trying to connect to Observer in intervals (which can be customized). - - You can use as much Observers as you want, but for a good fault-tolerence we recommend odd number of instances, no more then 5 for most of projects. - - #### Start the Observer as Leader - - ``` - cargo run --bin observer - ``` - - #### Starting the Observers as a follower - - ``` - cargo run --bin observer -- -f - ``` - - You should have 1 Leader and even number of followers as a result for the recommened fault-tolerence. - -2. Once the Leader Observer is launched, brokers will find the Observer and will connect to it to star exchanging metadata, - be aware of different states through the Observer, receive the partitions and elect leaders. Once connected, Observer prints a message stating that it is ready to receive commands and execute them. - -### Available commands - -#### CREATE - -Create topic - -```bash -CREATE TOPIC [TOPIC_NAME] -``` - -Create partition for topic - -``` -CREATE PARTITION [TOPIC_NAME] -``` - -#### DELETE - -Delete topic (will fail if there are active partitions on the topic) - -``` -DELETE TOPIC [TOPIC_NAME] -``` - -Delete paritition from a topic (will delete a partition with its given number) - -``` -DELETE PARTITION [TOPIC_NAME] [PARTITION_NUM] -``` - -#### CONNECT/DISCONNECT - -Connects a new spawned broker to the Observer - -``` -CONNECT [ADDRESS] -``` - -Disconnect broker partition rebalancing will take place - -``` -DISCONNECT [BROKER_ID] -``` - -#### LIST - -List all entities - -``` -LIST ALL -``` - -List an entity of a choice - -``` -LIST [ENTITY_ID] -``` - -#### Exit - -Will exit the program - -``` -EXIT -``` - -Commands will return either `OK` after each execution or `ERR: [ERROR_DESCRIPTION]` to the terminal. diff --git a/observer/src/command_processor/command.rs b/observer/src/command_processor/command.rs deleted file mode 100644 index 7267a2d..0000000 --- a/observer/src/command_processor/command.rs +++ /dev/null @@ -1,27 +0,0 @@ -pub enum CommandName { - Create, - List, -} - -pub struct Command { - pub name: CommandName, - pub arguments: Vec, -} - -impl Command { - pub fn from(raw_command: &str) -> Result { - let mut tokens = raw_command.split_ascii_whitespace(); - let command = tokens.next().unwrap(); - - let name = match command { - "CREATE" => CommandName::Create, - "LIST" => CommandName::List, - _ => return Err("unrecognized command has been passed.".to_string()), - }; - - Ok(Self { - name, - arguments: tokens.map(|s| s.to_string()).collect(), - }) - } -} diff --git a/observer/src/command_processor/mod.rs b/observer/src/command_processor/mod.rs deleted file mode 100644 index 29e4225..0000000 --- a/observer/src/command_processor/mod.rs +++ /dev/null @@ -1,39 +0,0 @@ -pub mod command; - -pub use self::command::Command; -pub use self::command::CommandName; - -#[derive(Debug, Default)] -pub struct CommandProcessor { - buf: String, - history: Vec, -} - -impl CommandProcessor { - pub fn new() -> Self { - Self { - buf: String::new(), - history: Vec::with_capacity(64), - } - } - - pub fn process_raw_command(&mut self) -> Result { - std::io::stdin() - .read_line(&mut self.buf) - .map_err(|e| e.to_string())?; - - if self.buf.trim().is_empty() { - self.buf.clear(); - return Err("Empty command".to_string()); - } - - let command = Command::from(&self.buf)?; - self.add_history(self.buf.to_string()); - self.buf.clear(); - Ok(command) - } - - fn add_history(&mut self, raw_command: String) { - self.history.push(raw_command); - } -} diff --git a/observer/src/config/mod.rs b/observer/src/config/mod.rs deleted file mode 100644 index c2b8118..0000000 --- a/observer/src/config/mod.rs +++ /dev/null @@ -1,75 +0,0 @@ -use std::{collections::HashMap, fs, path::PathBuf}; - -#[derive(Debug)] -pub enum Value { - String(String), - Number(i32), - Float(f32), -} - -#[derive(Debug)] -pub struct Config { - inner: HashMap, -} - -impl Config { - pub fn from(path: PathBuf) -> Result { - let mut inner = HashMap::new(); - - let content = match fs::read_to_string(path) { - Ok(c) => c, - Err(e) => return Err(e.to_string()), - }; - - for line in content.lines() { - if line.starts_with('#') { - continue; - } - - let split: Vec<&str> = line.split_terminator('=').collect(); - - if split.len() != 2 { - return Err("property format is incorrect, should be key=value".to_string()); - } - - let key = split[0].to_string(); - let value = if let Ok(f) = split[1].parse::() { - if split[1].contains('.') { - Value::Float(f) - } else { - Value::Number(split[1].parse::().unwrap()) - } - } else if let Ok(i) = split[1].parse::() { - Value::Number(i) - } else { - Value::String(split[1].to_string()) - }; - - inner.entry(key).or_insert(value); - } - - Ok(Self { inner }) - } - - pub fn get(&self, k: &str) -> Option<&Value> { - self.inner.get(k) - } - - pub fn get_str(&self, k: &str) -> Option<&str> { - self.inner - .get(k) - .map(|v| if let Value::String(n) = v { n } else { "" }) - } - - pub fn get_number(&self, k: &str) -> Option { - self.inner - .get(k) - .map(|v| if let Value::Number(n) = v { *n } else { 0 }) - } - - pub fn get_float(&self, k: &str) -> Option { - self.inner - .get(k) - .map(|v| if let Value::Float(n) = v { *n } else { 0f32 }) - } -} diff --git a/observer/src/distribution_manager/broker.rs b/observer/src/distribution_manager/broker.rs deleted file mode 100644 index 6eb0ed0..0000000 --- a/observer/src/distribution_manager/broker.rs +++ /dev/null @@ -1,69 +0,0 @@ -use std::{io::BufReader, net::TcpStream}; - -use shared_structures::Status; - -use super::partition::Partition; - -#[derive(Debug)] -pub struct Broker { - pub id: String, - pub stream: Option, - pub partitions: Vec, - pub reader: Option>, - pub status: Status, - pub addr: String, -} - -impl Broker { - pub fn from(id: String, stream: Option, addr: String) -> Result { - if let Some(stream) = stream { - let read_stream = stream.try_clone().map_err(|e| e.to_string())?; - let reader = BufReader::new(read_stream); - - Ok(Self { - id, - partitions: vec![], - stream: Some(stream), - reader: Some(reader), - status: Status::Up, - addr, - }) - } else { - Ok(Self { - id, - partitions: vec![], - stream: None, - reader: None, - status: Status::Up, - addr, - }) - } - } - - pub fn restore(&mut self, stream: TcpStream, addr: String) -> Result<(), String> { - let read_stream = stream.try_clone().map_err(|e| e.to_string())?; - let reader = BufReader::new(read_stream); - - self.status = Status::Up; - self.stream = Some(stream); - self.reader = Some(reader); - self.addr = addr; - - for partition in self.partitions.iter_mut() { - partition.status = Status::Up - } - - Ok(()) - } - - pub fn disconnect(&mut self) { - self.status = Status::Down; - self.partitions - .iter_mut() - .for_each(|p| p.status = Status::Down); - } - - pub fn get_offline_partitions(&self) -> &[Partition] { - &self.partitions - } -} diff --git a/observer/src/distribution_manager/mod.rs b/observer/src/distribution_manager/mod.rs deleted file mode 100644 index 639b6cd..0000000 --- a/observer/src/distribution_manager/mod.rs +++ /dev/null @@ -1,844 +0,0 @@ -use std::{ - io::{BufRead, BufReader}, - net::TcpStream, - path::PathBuf, - sync::{Arc, Mutex, MutexGuard}, - time::Duration, -}; - -mod broker; -mod partition; - -pub use broker::Broker; -pub use partition::Partition; -use shared_structures::{ - metadata::{BrokerDetails, PartitionDetails}, - Broadcast, DirManager, Message, Metadata, Reader, Status, Topic, -}; - -use crate::{config::Config, CLUSTER_FILE}; - -#[derive(Debug)] -pub struct DistributionManager { - pub brokers: Arc>>, - pub topics: Vec>>, - pub cluster_dir: DirManager, - pub followers: Vec, - config: Config, - pending_replication_partitions: Vec<(usize, Partition)>, -} - -impl DistributionManager { - pub fn from(config: Config, name: Option<&String>) -> Result>, String> { - let custom_dir = if let Some(name) = name { - let custom_path = format!("/observer/{}/", name); - Some(PathBuf::from(custom_path)) - } else { - Some(PathBuf::from("/observer")) - }; - - println!("Custom dir: {:?}", custom_dir); - - let cluster_dir = DirManager::with_dir(custom_dir.as_ref()); - - let cluster_metadata = match cluster_dir.open::(CLUSTER_FILE) { - Ok(m) => m, - Err(_) => Metadata::default(), - }; - - let mut distribution_manager = Self { - brokers: Arc::new(Mutex::new(vec![])), - topics: vec![], - config, - pending_replication_partitions: vec![], - cluster_dir, - followers: vec![], - }; - - distribution_manager.load_cluster_state(&cluster_metadata)?; - - Ok(Arc::new(Mutex::new(distribution_manager))) - } - - pub fn load_cluster_state(&mut self, cluster_metadata: &Metadata) -> Result<(), String> { - self.topics = cluster_metadata - .topics - .iter() - .map(|t| Arc::new(Mutex::new(t.clone()))) - .collect(); - - let mut brokers_lock = self.brokers.lock().unwrap(); - - for b in cluster_metadata.brokers.iter() { - let partitions: Vec<_> = b - .partitions - .iter() - .map(|p| { - // Theoratically we should never have a case where we don't find the topic of the - // partition in the system, this is why I allow myself to unwrap here, and crash the system - // if such case occures (Indicates a serious bug in the system). - let topic = self - .topics - .iter() - .find(|t| { - let t_lock = t.lock().unwrap(); - - t_lock.name == p.topic.name - }) - .unwrap(); - - Partition { - id: p.id.clone(), - replica_id: p.replica_id.clone(), - partition_number: p.partition_number, - replica_count: p.replica_count, - role: p.role, - status: Status::Down, - topic: topic.clone(), - } - }) - .collect(); - - let offline_broker = Broker { - id: b.id.clone(), - partitions, - stream: None, - reader: None, - status: Status::Down, - addr: b.addr.clone(), - }; - - brokers_lock.push(offline_broker); - } - - Ok(()) - } - - pub fn save_cluster_state(&self) -> Result<(), String> { - let metadata = self.get_cluster_metadata()?; - self.cluster_dir.save(CLUSTER_FILE, &metadata) - } - - // Will return the broker id that has been added or restored to the Observer - // TODO: Should check whether a broker that is being added already exists in the system + if it's Status is `Down` - // meaning that this broker has disconnected in one of many possible ways, including user interference, unexpected system crush - // or any other reason. Observer should try and sync with the brokers via the brokers provided id. - pub fn connect_broker(&mut self, stream: TcpStream) -> Result { - println!("NEW BROKER: {:?}", stream); - // Handshake process between the Broker and Observer happening in get_broker_metadata - let (id, addr, stream) = self.get_broker_metadata(stream)?; - println!("BROKER METADATA: {} {} {:?}", id, addr, stream); - let mut brokers_lock = self.brokers.lock().unwrap(); - println!("AQUIRED BROKER LOCK"); - let broker_id = - if let Some(disconnected_broker) = brokers_lock.iter_mut().find(|b| b.id == id) { - disconnected_broker.restore(stream, addr)?; - self.spawn_broker_reader(disconnected_broker)?; - disconnected_broker.id.clone() - } else { - let mut broker = Broker::from(id, Some(stream), addr)?; - self.spawn_broker_reader(&broker)?; - // Need to replicate the pending partitions if there is any - replicate_pending_partitions_once( - &mut self.pending_replication_partitions, - &mut broker, - )?; - let broker_id = broker.id.clone(); - brokers_lock.push(broker); - broker_id - }; - - // Releaseing lock for broadcast_cluster_metadata - drop(brokers_lock); - - self.broadcast_cluster_metadata()?; - - Ok(broker_id) - } - - pub fn get_cluster_metadata(&self) -> Result { - let brokers = self.brokers.lock().unwrap(); - - let metadata_brokers: Vec = brokers - .iter() - .map(|b| BrokerDetails { - id: b.id.clone(), - addr: b.addr.clone(), - status: b.status, - partitions: b - .partitions - .iter() - .map(|p| PartitionDetails { - id: p.id.clone(), - replica_id: p.replica_id.to_string(), - role: p.role, - topic: p.topic.lock().unwrap().clone(), - partition_number: p.partition_number, - replica_count: p.replica_count, - }) - .collect(), - }) - .collect(); - - let topics: Vec<_> = self - .topics - .iter() - .map(|t| { - let t_lock = t.lock().unwrap(); - t_lock.clone() - }) - .collect(); - - Ok(Metadata { - brokers: metadata_brokers, - topics, - }) - } - - fn broadcast_cluster_metadata(&mut self) -> Result<(), String> { - let metadata = self.get_cluster_metadata()?; - - self.save_cluster_state()?; - - let mut brokers = self.brokers.lock().unwrap(); - - let mut broker_streams: Vec<_> = brokers - .iter_mut() - .filter_map(|b| b.stream.as_mut()) - .collect(); - - let mut followers_streams: Vec<_> = self.followers.iter_mut().collect(); - - println!("Followers: {:?}", followers_streams); - - let message = shared_structures::Message::ClusterMetadata { metadata }; - - println!("BROADCASTING!!!"); - - Broadcast::all(&mut followers_streams[..], &message)?; - Broadcast::all(&mut broker_streams[..], &message) - } - - // Will return the name of created topic on success - pub fn create_topic(&mut self, topic_name: &str) -> Result { - let brokers_lock = self.brokers.lock().unwrap(); - - let available_brokers = brokers_lock - .iter() - .filter(|b| b.status == Status::Up) - .count(); - - if available_brokers == 0 { - return Err( - "No brokers have been found, please make sure at least one broker is connected." - .to_string(), - ); - } - - let topic_exists = self.topics.iter().any(|t| { - let t = t.lock().unwrap(); - t.name == *topic_name - }); - - if topic_exists { - return Err(format!("Topic `{}` already exist.", topic_name)); - } - - let topic = Topic::new_shared(topic_name.to_string()); - self.topics.push(topic); - - Ok(topic_name.to_string()) - } - - // Need to rebalance if new partition is added to the broker - pub fn create_partition(&mut self, topic_name: &str) -> Result { - let mut brokers_lock = self.brokers.lock().unwrap(); - - if brokers_lock.len() == 0 { - return Err( - "No brokers have been found, please make sure at least one broker is connected." - .to_string(), - ); - } - - let topic = self.topics.iter_mut().find(|t| { - let t = t.lock().unwrap(); - t.name == *topic_name - }); - - let replica_factor = self - .config - .get_number("replica_factor") - .ok_or("Replica factor is not defined in the config, action aborted.")?; - - if let Some(topic) = topic { - let mut topic_lock = topic.lock().unwrap(); - // We've got 1 partition, and N replications for each partition (where N brokers count) - topic_lock.partition_count += 1; - - let partition = Partition::new(topic, topic_lock.partition_count); - - drop(topic_lock); - - // Need to add partition replicas - replicate_partition( - &mut self.pending_replication_partitions, - &mut brokers_lock, - replica_factor as usize, - &partition, - )?; - - // Releaseing lock for broadcast_cluster_metadata - drop(brokers_lock); - - self.broadcast_cluster_metadata()?; - - Ok(partition.id.clone()) - - // TODO: Should begin leadership race among replications of the Partition. - } else { - Err(format!("Topic `{}` doesn't exist.", topic_name)) - } - } - - fn get_broker_metadata( - &self, - mut stream: TcpStream, - ) -> Result<(String, String, TcpStream), String> { - if let Message::BrokerConnectionDetails { id, addr } = - Reader::read_one_message(&mut stream)? - { - Ok((id, addr, stream)) - } else { - Err("Handshake with client failed, wrong message received from client.".to_string()) - } - } - - // TODO: What should the distribution_manager do when there is only one broker, and it has disconnected due to a crash? - // Distribution manager should start a failover, meaning it should find the replica that has disconnected and set all partition to PendingCreation - // Each Partition should have a replica_id which is unique per replica to find it if such case occures. - - // TODO: What happens when a broker has lost connection? We need to find a new leader for all partition leaders. - fn spawn_broker_reader(&self, broker: &Broker) -> Result<(), String> { - if let Some(broker_stream) = &broker.stream { - let watch_stream = broker_stream.try_clone().map_err(|e| e.to_string())?; - - let brokers = Arc::clone(&self.brokers); - let broker_id = broker.id.clone(); - - let throttle = self - .config - .get_number("throttle") - .ok_or("Throttle is missing form the configuration file.")?; - - std::thread::spawn(move || { - let mut reader = BufReader::new(watch_stream); - let mut buf = String::with_capacity(1024); - - loop { - let size = match reader.read_line(&mut buf) { - Ok(s) => s, - Err(e) => { - println!("Error in broker read thread: {}", e); - println!("Retrying to read with throttling at {}ms", throttle); - std::thread::sleep(Duration::from_millis(throttle as u64)); - continue; - } - }; - - // TODO: Think what should happen to the metadata of the broker that has been disconnected. - if size == 0 { - println!("Broker {} has disconnected.", broker_id); - - let mut brokers_lock = brokers.lock().unwrap(); - - if let Some(broker) = brokers_lock.iter_mut().find(|b| b.id == broker_id) { - broker.disconnect(); - - let offline_partitions: Vec<_> = broker - .get_offline_partitions() - .iter() - .map(|p| (&p.id, &p.replica_id, p.replica_count)) - .collect(); - - for offline_partition in offline_partitions.iter() { - println!( - "Broker {}:\t{}\t{}\t{}", - broker.id, - offline_partition.0, - offline_partition.1, - offline_partition.2 - ); - } - } else { - println!("Failed to find the Broker in the system, this can lead to major data loses."); - println!("Please let us know about this message by creating an issue on our GitHub repository https://github.com/pwbh/nyx/issues/new"); - } - break; - } - - buf.clear(); - } - }); - Ok(()) - } else { - println!("Ignoring spawning broker reader as Observer follower"); - Ok(()) - } - } -} - -pub fn broadcast_replicate_partition( - broker: &mut Broker, - replica: &mut Partition, -) -> Result<(), String> { - if let Some(broker_stream) = &mut broker.stream { - Broadcast::to( - broker_stream, - &Message::CreatePartition { - id: replica.id.clone(), - replica_id: replica.replica_id.clone(), - topic: replica.topic.lock().unwrap().clone(), - partition_number: replica.partition_number, - replica_count: replica.replica_count, - }, - )?; - } else { - println!("Ignoring broadcasting message as Observer follower") - } - // After successful creation of the partition on the broker, - // we can set its status on the observer to Active. - replica.status = Status::Up; - - Ok(()) -} - -fn replicate_pending_partitions_once( - pending_replication_partitions: &mut Vec<(usize, Partition)>, - new_broker: &mut Broker, -) -> Result<(), String> { - for (replications_needed, partition) in pending_replication_partitions.iter_mut().rev() { - let mut replica = Partition::replicate(partition, partition.replica_count + 1); - broadcast_replicate_partition(new_broker, &mut replica)?; - new_broker.partitions.push(replica); - partition.replica_count += 1; - *replications_needed -= 1; - } - - println!("{:#?}", pending_replication_partitions); - - // Remove totally replicated partitions - loop { - let current = pending_replication_partitions.last(); - - if let Some((pending_replications, _)) = current { - if *pending_replications == 0 { - pending_replication_partitions.pop(); - } else { - break; - } - } else { - break; - } - } - - Ok(()) -} - -fn replicate_partition( - pending_replication_partitions: &mut Vec<(usize, Partition)>, - brokers_lock: &mut MutexGuard<'_, Vec>, - replica_factor: usize, - partition: &Partition, -) -> Result<(), String> { - // Here we create a variable containing the total available brokers in the cluster to check whether it is less - // then replication factor, if so we certain either way that we will be able to replicate to all partitions - let total_available_brokers = brokers_lock - .iter() - .filter(|b| b.status == Status::Up) - .count(); - let mut future_replications_required = replica_factor as i32 - total_available_brokers as i32; - - if future_replications_required > 0 { - // total_available_brokers - is the next replication that should be added by the count. - let replica = Partition::replicate(partition, total_available_brokers); - pending_replication_partitions.push((future_replications_required as usize, replica)); - } else { - future_replications_required = 0; - } - - let current_max_replications = replica_factor - future_replications_required as usize; - - for replica_count in 1..=current_max_replications { - let least_distributed_broker = get_least_distributed_broker(brokers_lock, partition)?; - let mut replica = Partition::replicate(partition, replica_count); - broadcast_replicate_partition(least_distributed_broker, &mut replica)?; - least_distributed_broker.partitions.push(replica); - } - - Ok(()) -} - -fn get_least_distributed_broker<'a>( - brokers_lock: &'a mut MutexGuard<'_, Vec>, - partition: &'a Partition, -) -> Result<&'a mut Broker, String> { - let mut brokers_lock_iter = brokers_lock.iter().enumerate(); - - let first_element = brokers_lock_iter - .next() - .ok_or("At least 1 registerd broker is expected in the system.")?; - - let mut least_distribured_broker_index: usize = first_element.0; - let mut least_distributed_broker: usize = first_element.1.partitions.len(); - - for (i, b) in brokers_lock_iter { - if b.partitions.iter().all(|p| p.id != partition.id) - && least_distributed_broker > b.partitions.len() - { - least_distributed_broker = b.partitions.len(); - least_distribured_broker_index = i; - } - } - - Ok(&mut brokers_lock[least_distribured_broker_index]) -} - -#[cfg(test)] -mod tests { - use std::{fs, io::Write, net::TcpListener}; - - use uuid::Uuid; - - use super::*; - - fn cleanup_after_test(custom_test_name: &str) { - let mut custom_path = PathBuf::new(); - custom_path.push("/observer/"); - custom_path.push(custom_test_name); - - let test_files_path = DirManager::get_base_dir(Some(&custom_path)).unwrap(); - match fs::remove_dir_all(&test_files_path) { - Ok(_) => { - println!("Deleted {:?}", test_files_path) - } - - Err(e) => println!("Could not delete {:?} | {}", test_files_path, e), - } - } - - fn config_mock() -> Config { - Config::from("../config/dev.properties".into()).unwrap() - } - - fn get_custom_test_name() -> String { - format!("test_{}", Uuid::new_v4().to_string()) - } - - fn bootstrap_distribution_manager( - config: Option, - custom_test_name: &str, - ) -> Arc> { - let config = if let Some(config) = config { - config - } else { - config_mock() - }; - - let distribution_manager: Arc> = - DistributionManager::from(config, Some(&custom_test_name.to_string())).unwrap(); - - distribution_manager - } - - fn mock_connecting_broker(addr: &str) -> TcpStream { - let mut mock_stream = TcpStream::connect(addr).unwrap(); - - let mut payload = serde_json::to_string(&Message::BrokerConnectionDetails { - id: uuid::Uuid::new_v4().to_string(), - addr: "localhost:123123".to_string(), - }) - .unwrap(); - payload.push('\n'); - mock_stream.write(payload.as_bytes()).unwrap(); - - let read_stream = mock_stream.try_clone().unwrap(); - - std::thread::spawn(|| { - let mut reader = BufReader::new(read_stream); - let mut buf = String::with_capacity(1024); - - loop { - let size = reader.read_line(&mut buf).unwrap(); - - println!("{}", buf); - - if size == 0 { - break; - } - - buf.clear(); - } - }); - - mock_stream - } - - fn setup_distribution_for_tests( - config: Config, - port: &str, - custom_test_name: &str, - ) -> Arc> { - let distribution_manager = bootstrap_distribution_manager(Some(config), &custom_test_name); - let mut distribution_manager_lock = distribution_manager.lock().unwrap(); - - let addr = format!("localhost:{}", port); - let listener = TcpListener::bind(&addr).unwrap(); - - // Create 3 brokers to test the balancing of created partitions - mock_connecting_broker(&addr); - mock_connecting_broker(&addr); - mock_connecting_broker(&addr); - - // Simulate acceptence of brokers - for _ in 0..3 { - let stream = listener.incoming().next().unwrap().unwrap(); - distribution_manager_lock.connect_broker(stream).unwrap(); - } - - drop(distribution_manager_lock); - - distribution_manager - } - - #[test] - #[cfg_attr(miri, ignore)] - fn create_brokers_works_as_expected() { - let custom_test_name = get_custom_test_name(); - let distribution_manager = bootstrap_distribution_manager(None, &custom_test_name); - let mut distribution_manager_lock = distribution_manager.lock().unwrap(); - - let addr = "localhost:0"; - let listener = TcpListener::bind(addr).unwrap(); - - let connection_addr = format!("localhost:{}", listener.local_addr().unwrap().port()); - - let spawned_thread = std::thread::spawn(move || { - let (stream, _) = listener.accept().unwrap(); - stream - }); - - mock_connecting_broker(&connection_addr); - - let stream = spawned_thread.join().unwrap(); - - let result = distribution_manager_lock.connect_broker(stream); - - println!("{:?}", result); - - assert!(result.is_ok()); - - cleanup_after_test(&custom_test_name); - } - - #[test] - #[cfg_attr(miri, ignore)] - fn create_topic_fails_when_no_brokers() { - let custom_test_name = get_custom_test_name(); - let distribution_manager = bootstrap_distribution_manager(None, &custom_test_name); - let mut distribution_manager_lock = distribution_manager.lock().unwrap(); - - let topic_name = "new_user_registered"; - - // Before - let result = distribution_manager_lock - .create_topic(topic_name) - .unwrap_err(); - - assert!(result.contains("No brokers have been found")); - - cleanup_after_test(&custom_test_name); - } - - #[test] - #[cfg_attr(miri, ignore)] - fn create_topic_works_as_expected_when_brokers_exist() { - let custom_test_name = get_custom_test_name(); - - let config = config_mock(); - - // After brokers have connnected to the Observer - let distribution_manager = setup_distribution_for_tests(config, "5001", &custom_test_name); - let mut distribution_manager_lock = distribution_manager.lock().unwrap(); - - let topic_name = "new_user_registered"; - - let topics_count_before_add = distribution_manager_lock.topics.iter().count(); - - distribution_manager_lock.create_topic(topic_name).unwrap(); - - let topic_count_after_add = distribution_manager_lock.topics.iter().count(); - - assert_eq!(topic_count_after_add, topics_count_before_add + 1); - - // We cant add the same topic name twice - Should error - let result = distribution_manager_lock - .create_topic(topic_name) - .unwrap_err(); - - assert!(result.contains("already exist.")); - - let topics_count_before_add = distribution_manager_lock.topics.iter().count(); - - let another_topic_name = "notification_resent"; - - distribution_manager_lock - .create_topic(another_topic_name) - .unwrap(); - - let topic_count_after_add = distribution_manager_lock.topics.iter().count(); - - assert_eq!(topic_count_after_add, topics_count_before_add + 1); - - let last_element_in_topics = distribution_manager_lock - .topics - .last() - .unwrap() - .lock() - .unwrap(); - - assert_eq!(last_element_in_topics.name, another_topic_name); - - cleanup_after_test(&custom_test_name); - } - - fn get_brokers_with_replicas( - brokers_lock: &MutexGuard<'_, Vec>, - partition_id: &str, - ) -> usize { - brokers_lock - .iter() - .filter(|b| b.partitions.iter().find(|p| p.id == partition_id).is_some()) - .count() - } - - #[test] - #[cfg_attr(miri, ignore)] - fn create_partition_distributes_replicas() { - let custom_test_name = get_custom_test_name(); - let config = config_mock(); - - let replica_factor = config.get_number("replica_factor").unwrap(); - - let distribution_manager = setup_distribution_for_tests(config, "5002", &custom_test_name); - let mut distribution_manager_lock = distribution_manager.lock().unwrap(); - - let notifications_topic = "notifications"; - - // Create 'notifications' topic - distribution_manager_lock - .create_topic(notifications_topic) - .unwrap(); - - // First partition for topic 'notifications' - let partition_id_1 = distribution_manager_lock - .create_partition(notifications_topic) - .unwrap(); - - let brokers_lock = distribution_manager_lock.brokers.lock().unwrap(); - let total_brokers_with_replicas = get_brokers_with_replicas(&brokers_lock, &partition_id_1); - assert_eq!(total_brokers_with_replicas, replica_factor as usize); - drop(brokers_lock); - - // Second partition for topic 'notifications' - let partition_id_2 = distribution_manager_lock - .create_partition(notifications_topic) - .unwrap(); - - let brokers_lock = distribution_manager_lock.brokers.lock().unwrap(); - let total_brokers_with_replicas = get_brokers_with_replicas(&brokers_lock, &partition_id_2); - assert_eq!(total_brokers_with_replicas, replica_factor as usize); - drop(brokers_lock); - - let comments_topic = "comments"; - - // Create 'comments' topic - distribution_manager_lock - .create_topic(comments_topic) - .unwrap(); - - // First partition for topic 'comments' - let partition_id_3 = distribution_manager_lock - .create_partition(comments_topic) - .unwrap(); - - let brokers_lock = distribution_manager_lock.brokers.lock().unwrap(); - let total_brokers_with_replicas = get_brokers_with_replicas(&brokers_lock, &partition_id_3); - assert_eq!(total_brokers_with_replicas, replica_factor as usize); - drop(brokers_lock); - - // Second partition for topic 'comments' - let partition_id_4 = distribution_manager_lock - .create_partition(comments_topic) - .unwrap(); - - let brokers_lock = distribution_manager_lock.brokers.lock().unwrap(); - let total_brokers_with_replicas = get_brokers_with_replicas(&brokers_lock, &partition_id_4); - assert_eq!(total_brokers_with_replicas, replica_factor as usize); - drop(brokers_lock); - - let friend_requests_topic = "friend_requests"; - - // Create 'friend_requests' topic - distribution_manager_lock - .create_topic(friend_requests_topic) - .unwrap(); - - // First partition for topic 'friend_requests' - let partition_id_5 = distribution_manager_lock - .create_partition("friend_requests") - .unwrap(); - - let brokers_lock = distribution_manager_lock.brokers.lock().unwrap(); - let total_brokers_with_replicas = get_brokers_with_replicas(&brokers_lock, &partition_id_5); - assert_eq!(total_brokers_with_replicas, replica_factor as usize); - drop(brokers_lock); - - let mut unique_partitions = distribution_manager_lock - .brokers - .lock() - .unwrap() - .iter() - .flat_map(|b| b.partitions.clone()) - .collect::>(); - - unique_partitions.dedup_by_key(|p| p.id.clone()); - - println!("PARTITIONS: {:#?}", unique_partitions); - - let total_replicas_in_brokers: usize = distribution_manager_lock - .brokers - .lock() - .unwrap() - .iter() - .map(|b| b.partitions.len()) - .sum(); - - assert_eq!(unique_partitions.len(), total_replicas_in_brokers); - - // Testing that all brokers have the same amount of partition replicas (meaning it was balanced well - max fault tolerence) - let brokers_lock = distribution_manager_lock.brokers.lock().unwrap(); - - if let Some((first, other)) = brokers_lock.split_first() { - for broker in other { - assert_eq!(first.partitions.len(), broker.partitions.len()); - } - } - - drop(brokers_lock); - - println!("{:#?}", distribution_manager_lock.brokers); - - cleanup_after_test(&custom_test_name); - } -} diff --git a/observer/src/distribution_manager/partition.rs b/observer/src/distribution_manager/partition.rs deleted file mode 100644 index f6038d4..0000000 --- a/observer/src/distribution_manager/partition.rs +++ /dev/null @@ -1,36 +0,0 @@ -use std::sync::{Arc, Mutex}; - -use shared_structures::{Role, Status, Topic}; - -#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)] -pub struct Partition { - pub id: String, - pub replica_id: String, - pub status: Status, - pub topic: Arc>, - pub role: Role, - pub partition_number: usize, - pub replica_count: usize, -} - -impl Partition { - pub fn new(topic: &Arc>, partition_number: usize) -> Self { - Self { - id: uuid::Uuid::new_v4().to_string(), - replica_id: uuid::Uuid::new_v4().to_string(), - status: Status::Created, - topic: topic.clone(), - role: Role::Follower, - partition_number, - replica_count: 0, - } - } - - pub fn replicate(partition: &Self, replica_count: usize) -> Self { - Self { - replica_id: uuid::Uuid::new_v4().to_string(), - replica_count, - ..partition.clone() - } - } -} diff --git a/observer/src/lib.rs b/observer/src/lib.rs deleted file mode 100644 index 06094e5..0000000 --- a/observer/src/lib.rs +++ /dev/null @@ -1,97 +0,0 @@ -pub mod command_processor; -pub mod config; -pub mod distribution_manager; - -use std::{ - net::TcpListener, - sync::{Arc, Mutex}, -}; - -use command_processor::CommandProcessor; -use config::Config; -use distribution_manager::DistributionManager; -use shared_structures::Role; -use sysinfo::{CpuExt, DiskExt, System, SystemExt}; -use uuid::Uuid; - -pub const DEV_CONFIG: &str = "dev.properties"; -pub const PROD_CONFIG: &str = "prod.properties"; - -const DEFAULT_PORT: u16 = 2828; - -pub const CLUSTER_FILE: &str = "cluster.json"; - -pub struct Observer { - pub id: String, - pub role: Role, - pub listener: TcpListener, - pub distribution_manager: Arc>, - pub command_processor: CommandProcessor, - pub system: System, -} - -impl Observer { - pub fn from( - config_path: &str, - leader: Option<&String>, - name: Option<&String>, - ) -> Result { - let role = if leader.is_none() { - Role::Leader - } else { - Role::Follower - }; - - let mut system = System::new_all(); - - let port: u16 = if let Ok(port) = std::env::var("PORT") { - port.parse::().unwrap_or(DEFAULT_PORT) - } else { - DEFAULT_PORT - }; - - let config = Config::from(config_path.into())?; - - let distribution_manager = DistributionManager::from(config, name)?; - - let command_processor = CommandProcessor::new(); - - let host = format!("localhost:{}", port); - - let listener = TcpListener::bind(host).map_err(|e| e.to_string())?; - - system.refresh_all(); - - let mut total_disk_utilization: f64 = 0.0; - - // Display disk status - for disk in system.disks() { - total_disk_utilization += disk.total_space() as f64 - } - - total_disk_utilization = - (total_disk_utilization / system.disks().len() as f64) * 1.0 * 10f64.powf(-9.0); - - println!("Disk space: {:.2} GiB", total_disk_utilization); - let mut total_cpu_utilization = 0f32; - - for cpu in system.cpus() { - total_cpu_utilization += cpu.cpu_usage(); - } - - total_cpu_utilization /= system.cpus().len() as f32; - - println!("CPU utilization: {:.1}%", total_cpu_utilization); - - let observer = Self { - id: Uuid::new_v4().to_string(), - role, - distribution_manager, - command_processor, - listener, - system, - }; - - Ok(observer) - } -} diff --git a/observer/src/main.rs b/observer/src/main.rs deleted file mode 100644 index eb15164..0000000 --- a/observer/src/main.rs +++ /dev/null @@ -1,296 +0,0 @@ -use clap::{arg, command}; -use observer::{distribution_manager::DistributionManager, Observer, DEV_CONFIG, PROD_CONFIG}; -use shared_structures::{println_c, Broadcast, EntityType, Message, MessageDecoder, Reader, Role}; -use std::{ - io::{BufRead, BufReader}, - net::TcpStream, - sync::{Arc, Mutex, MutexGuard}, -}; - -// TODO: Leader should delegate all messages to followers, for example it should delegate create broker commands to followers, etc. -fn main() -> Result<(), String> { - let default_config_path_by_env = get_config_path_by_env(); - let matches = command!().arg( - clap::Arg::new("config") - .required(false) - ).arg( - arg!(-f --follow "Runs the Observer as a follower for leader located at , Host MUST by booted without -f flag.") - .required(false) - ).arg( - arg!(-n --name "Assigns a name to the broker, names are useful if you want to run two brokers on the same machine. Useful for nyx maintainers testing multi-node features.") - .required(false) - ).get_matches(); - - let leader = matches.get_one::("follow"); - let name = matches.get_one::("name"); - - if leader.is_some() && name.is_none() { - return Err("Name should be provided if following functionality enabled.".to_string()); - } - - let config_path = matches - .get_one::("config") - .unwrap_or(&default_config_path_by_env); - - let mut observer = Observer::from(config_path, leader, name)?; - - if observer.role == Role::Leader { - println_c( - &format!( - "Observer is ready to accept brokers on port {}", - observer.listener.local_addr().unwrap().port() - ), - 35, - ); - } else if let Some(name) = name { - println_c(&format!("Started following leader as {}", name), 50) - } else { - println_c("Started following leader", 50) - } - - let mut connections_distribution_manager = observer.distribution_manager.clone(); - - // Connections listener - std::thread::spawn(move || loop { - let connection = observer.listener.incoming().next(); - - if let Some(stream) = connection { - match stream { - Ok(mut stream) => { - println!("stream: {:#?}", stream); - if let Ok(message) = Reader::read_one_message(&mut stream) { - match message { - Message::EntityWantsToConnect { - entity_type: EntityType::Observer, - } => { - match handle_connect_observer_follower( - &mut connections_distribution_manager, - stream, - ) { - Ok(observer_follower_id) => { - println!( - "Observer follower connected {}", - observer_follower_id - ) - } - Err(e) => { - println!("Error while establishing connection: {}", e) - } - } - } - Message::EntityWantsToConnect { - entity_type: EntityType::Broker, - } => match handle_connect_broker( - &mut connections_distribution_manager, - stream, - ) { - Ok(broker_id) => println!("Broker {} connected", broker_id), - Err(e) => { - println!("Error while establishing connection: {}", e) - } - }, - _ => { - println!("Handhsake failed, message could not be verified from connecting entity.") - } - } - } else { - println!("Could not decode the provided message, skipping connection.") - } - } - Err(e) => println!("Failed to establish basic TCP connection: {}", e), - } - } - }); - - let mut followers_distribution_manager = observer.distribution_manager.clone(); - - println!("{:?}", leader); - - // Leader obsrver exists, enabling the follower functionality, instead of - // the leader functionality which is able to create partitions, create topics etc - if let Some(leader) = leader.cloned() { - // TODO: connect to leader - let mut leader_stream = TcpStream::connect(leader).unwrap(); - - match Broadcast::to( - &mut leader_stream, - &shared_structures::Message::EntityWantsToConnect { - entity_type: EntityType::Observer, - }, - ) { - Ok(_) => println!("Sent connection request to leader."), - Err(e) => { - return Err(format!("Failed connecting to leader: {}", e)); - } - }; - - let mut reader: BufReader<&mut TcpStream> = BufReader::new(&mut leader_stream); - let mut buf = String::with_capacity(1024); - - loop { - // TODO: constantly read delegated messages from leader - let bytes_read = reader - .read_line(&mut buf) - .map_err(|e| format!("Leader follower error: {}", e))?; - - if bytes_read == 0 { - println!("Leader has closed connection. Exiting."); - break; - } - - match handle_delegated_message(&buf, &mut followers_distribution_manager) { - Ok(_) => println!("Received delgated cluster metadata successfully"), - Err(e) => println!("Cluster metadata delegation error: {}", e), - }; - - buf.clear(); - } - } else { - // This will make sure our main thread will never exit until the user will issue an EXIT command by himself - loop { - match observer.command_processor.process_raw_command() { - Ok(command) => match command { - observer::command_processor::Command { - name: observer::command_processor::CommandName::Create, - .. - } => { - match handle_create_command(&mut observer.distribution_manager, &command) { - Ok(()) => println!("\x1b[38;5;2mOK\x1b[0m"), - Err(e) => println!("\x1b[38;5;1mERROR:\x1b[0m {}", e), - } - } - observer::command_processor::Command { - name: observer::command_processor::CommandName::List, - .. - } => match handle_list_command(&mut observer.distribution_manager, &command) { - Ok(()) => println!("\x1b[38;5;2mOK\x1b[0m"), - Err(e) => println!("\x1b[38;5;1mERROR:\x1b[0m {}", e), - }, - }, - Err(e) => println!("\x1b[38;5;1mERROR:\x1b[0m {}", e), - }; - } - } - - Ok(()) -} - -fn get_config_path_by_env() -> String { - let file_name = if cfg!(debug_assertions) { - DEV_CONFIG - } else { - PROD_CONFIG - }; - - format!("./config/{}", file_name) -} - -fn handle_delegated_message( - raw_message: &str, - distribution_manager: &mut Arc>, -) -> Result<(), String> { - let delegated_message = MessageDecoder::decode(raw_message)?; - - println!("Delegated messaeg: {:?}", delegated_message); - - match delegated_message { - Message::ClusterMetadata { metadata } => { - let mut distribution_manager_lock = distribution_manager.lock().unwrap(); - distribution_manager_lock.load_cluster_state(&metadata)?; - distribution_manager_lock.save_cluster_state() - } - _ => Err("Could not read delegated cluster metadata".to_string()), - } -} - -fn handle_list_command( - distribution_manager: &mut Arc>, - command: &observer::command_processor::Command, -) -> Result<(), String> { - println!("TRYING TO AQUIRE LOCK!"); - let distribution_manager_lock = distribution_manager.lock().unwrap(); - println!("LOCK AQUIRED!!!"); - let level = command.arguments.first().unwrap(); - - if level == "ALL" { - print_list_all(&distribution_manager_lock); - } else { - return Err(format!( - "Requested listing depth `{}` is not supported", - level - )); - } - - Ok(()) -} - -fn print_list_all(distribution_manager_lock: &MutexGuard<'_, DistributionManager>) { - let brokers_lock = distribution_manager_lock.brokers.lock().unwrap(); - - println!("."); - for broker in brokers_lock.iter() { - println!("├── Broker {}", broker.id); - for partition in broker.partitions.iter() { - println!("│ ├── Partition {}", partition.id) - } - } -} - -fn handle_create_command( - distribution_manager: &mut Arc>, - command: &observer::command_processor::Command, -) -> Result<(), String> { - let mut arguments_iter = command.arguments.iter(); - - match arguments_iter.next() { - Some(entity) => match entity.trim() { - "TOPIC" => handle_create_topic(distribution_manager, &mut arguments_iter), - "PARTITION" => handle_create_partition(distribution_manager, &mut arguments_iter), - _ => Err("Unrecognized entity has been provided.".to_string()), - }, - None => Err("Entity type was not provided.".to_string()), - } -} - -fn handle_connect_observer_follower( - distribution_manager: &mut Arc>, - stream: TcpStream, -) -> Result { - let mut distribution_manager_lock = distribution_manager.lock().unwrap(); - let stream_addr = stream.peer_addr().map_err(|e| e.to_string())?; - distribution_manager_lock.followers.push(stream); - Ok(stream_addr.ip().to_string()) -} - -fn handle_connect_broker( - distribution_manager: &mut Arc>, - stream: TcpStream, -) -> Result { - let mut distribution_manager_lock = distribution_manager.lock().unwrap(); - distribution_manager_lock.connect_broker(stream) -} - -fn handle_create_topic( - distribution_manager: &mut Arc>, - arguments_iter: &mut std::slice::Iter<'_, String>, -) -> Result<(), String> { - let topic_name = arguments_iter - .next() - .ok_or("Please provide topic name for which you want to create the topic.".to_string())?; - let mut distribution_manager_lock: std::sync::MutexGuard<'_, DistributionManager> = - distribution_manager.lock().unwrap(); - distribution_manager_lock.create_topic(topic_name)?; - Ok(()) -} - -fn handle_create_partition( - distribution_manager: &mut Arc>, - arguments_iter: &mut std::slice::Iter<'_, String>, -) -> Result<(), String> { - let topic_name = arguments_iter.next().ok_or( - "Please provide a valid topic name for which you want to create a partition.".to_string(), - )?; - let mut distribution_manager_lock = distribution_manager.lock().unwrap(); - distribution_manager_lock.create_partition(topic_name)?; - Ok(()) -} diff --git a/producer/Cargo.toml b/producer/Cargo.toml deleted file mode 100644 index f226d40..0000000 --- a/producer/Cargo.toml +++ /dev/null @@ -1,14 +0,0 @@ -[package] -name = "producer" -edition.workspace = true -version.workspace = true -description = "Producer is the entity which pushes messages onto the brokers" - -# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html - -[dependencies] -shared_structures = { path = "../shared_structures" } - -clap.workspace = true -serde.workspace = true -serde_json.workspace = true diff --git a/producer/src/lib.rs b/producer/src/lib.rs deleted file mode 100644 index e0a01e9..0000000 --- a/producer/src/lib.rs +++ /dev/null @@ -1,109 +0,0 @@ -use std::{ - io::{BufRead, BufReader}, - net::TcpStream, -}; - -use shared_structures::{metadata::BrokerDetails, Broadcast, Reader}; - -pub struct Producer { - pub mode: String, - pub broker_details: BrokerDetails, - pub stream: TcpStream, - pub topic: String, - pub destination_replica_id: String, -} - -impl Producer { - pub fn from(brokers: &str, mode: &str, topic: &str) -> Result { - let brokers: Vec<_> = brokers - .split_terminator(',') - .map(|b| b.to_string()) - .collect(); - - if brokers.is_empty() { - return Err("No brokers were provided".to_string()); - } - - // Get metadata from the first broker we are connecting to (Doesn't really matter from which one) - // We are just looking for the broker that holds the leader to the topic we want to push to - let mut stream = TcpStream::connect(&brokers[0]).map_err(|e| e.to_string())?; - - // Request cluster metadata from the first random broker we are conected to in the provided list - Broadcast::to( - &mut stream, - &shared_structures::Message::RequestClusterMetadata, - )?; - - let message = Reader::read_one_message(&mut stream)?; - - match message { - shared_structures::Message::ClusterMetadata { - metadata: cluster_metadata, - } => { - let broker_details = cluster_metadata - .brokers - .iter() - .find(|b| b.partitions.iter().any(|p| p.topic.name == topic)) - .ok_or("Broker with desired partition has not been found.")?; - - let partition_details = broker_details - .partitions - .iter() - .find(|p| p.topic.name == topic) - .ok_or("Couldn't find the desited partition on selected broker")?; - - // If the random broker we connected to happen to be the correct one, - // no need to reconnect already connected. - - let peer_addr = stream.peer_addr().map_err(|e| format!("Producer: {}", e))?; - - let stream = if peer_addr.to_string() == broker_details.addr { - stream - } else { - TcpStream::connect(&broker_details.addr).map_err(|e| e.to_string())? - }; - - println!("Stream: {:#?}", stream); - - let producer = Self { - mode: mode.to_string(), - broker_details: broker_details.clone(), - stream, - topic: topic.to_string(), - destination_replica_id: partition_details.replica_id.clone(), - }; - - producer.open_broker_reader()?; - - Ok(producer) - } - _ => Err("Wrong message received on handshake".to_string()), - } - } - - fn open_broker_reader(&self) -> Result<(), String> { - let reader_stream = self - .stream - .try_clone() - .map_err(|e| format!("Producer: {}", e))?; - - std::thread::spawn(|| { - let mut buf = String::with_capacity(1024); - let mut reader = BufReader::new(reader_stream); - - loop { - let bytes_read = reader.read_line(&mut buf).unwrap(); - - if bytes_read == 0 { - break; - } - - println!("Recieved message from broker: {:#?}", buf); - - buf.clear(); - } - }); - - Ok(()) - } -} diff --git a/producer/src/main.rs b/producer/src/main.rs deleted file mode 100644 index f9767a0..0000000 --- a/producer/src/main.rs +++ /dev/null @@ -1,46 +0,0 @@ -use std::io::stdin; - -use clap::{arg, command}; -use producer::Producer; -use serde_json::json; -use shared_structures::Broadcast; - -fn main() -> Result<(), String> { - let matches = command!() - .arg(arg!(-b --brokers "List of brokers to connect to seperated by comma e.g. localhost:3000,localhost:4000,...").required(true)) - .arg(arg!(-t --topic "The name of the topic onto which producer is going to push messages").required(true)) - .arg(arg!(-m --mode "In which mode you want to run the producer 'test' or 'production', defaults to 'production'").required(false).default_value("production")) - .get_matches(); - - let brokers = matches.get_one::("brokers").unwrap(); - let mode = matches.get_one::("mode").unwrap(); - let topic = matches.get_one::("topic").unwrap(); - - let mut producer = Producer::from(brokers, mode, topic)?; - - println!("Broker details: {:#?}", producer.broker_details); - - println!("Broadcasting a test message to the partition"); - - Broadcast::to( - &mut producer.stream, - &shared_structures::Message::ProducerMessage { - replica_id: producer.destination_replica_id, - payload: json!({"message": "test"}), - }, - )?; - - let mut buf = String::with_capacity(1024); - - loop { - stdin().read_line(&mut buf).unwrap(); - - if buf == "EXIT" { - break; - } - - buf.clear() - } - - Ok(()) -} diff --git a/shared_structures/Cargo.toml b/shared_structures/Cargo.toml deleted file mode 100644 index edc8460..0000000 --- a/shared_structures/Cargo.toml +++ /dev/null @@ -1,12 +0,0 @@ -[package] -name = "shared_structures" -version.workspace = true -edition.workspace = true -description = "Shared structures between the Observer and Broker" - -# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html - -[dependencies] -uuid.workspace = true -serde.workspace = true -serde_json.workspace = true diff --git a/shared_structures/src/broadcast/mod.rs b/shared_structures/src/broadcast/mod.rs deleted file mode 100644 index 5b92bfa..0000000 --- a/shared_structures/src/broadcast/mod.rs +++ /dev/null @@ -1,164 +0,0 @@ -use std::{io::Write, net::TcpStream}; - -use crate::Message; - -pub struct Broadcast; - -impl Broadcast { - pub fn all(streams: &mut [&mut TcpStream], message: &Message) -> Result<(), String> { - let mut payload = serde_json::to_string(message) - .map_err(|_| "Couldn't serialize the data structure to send.".to_string())?; - - payload.push('\n'); - - for stream in streams.iter_mut() { - let bytes_written = stream - .write(payload.as_bytes()) - .map_err(|e| e.to_string())?; - - if bytes_written == 0 { - return Err( - "0 bytes have been written, might be an error, please create a new issue in nyx repository.".to_string() - ); - } - } - Ok(()) - } - - pub fn to(stream: &mut TcpStream, message: &Message) -> Result<(), String> { - let mut payload = serde_json::to_string(message) - .map_err(|_| "Couldn't serialize the data structure to send.".to_string())?; - - payload.push('\n'); - - let bytes_written = stream - .write(payload.as_bytes()) - .map_err(|e| e.to_string())?; - - println!("Message broadcasted with {} bytes", bytes_written); - - if bytes_written == 0 { - return Err("0 bytes have been written, might be an error, please create a new issue in nyx repository.".to_string()); - } - - Ok(()) - } - - pub fn to_many(stream: &mut TcpStream, messages: &[Message]) -> Result<(), String> { - let mut payloads: Vec = vec![]; - - for message in messages { - let mut payload = serde_json::to_string(message) - .map_err(|_| "Couldn't serialize the data structure to send.".to_string())?; - payload.push('\n'); - payloads.push(payload); - } - - println!("{:#?}", payloads); - - let payload_bytes: Vec<_> = payloads - .iter() - .flat_map(|p| p.as_bytes().to_owned()) - .collect(); - - let bytes = &payload_bytes[..]; - - let bytes_written = stream.write(bytes).map_err(|e| e.to_string())?; - - if bytes_written == 0 { - return Err("0 bytes have been written, might be an error, please create a new issue in nyx repository.".to_string()); - } - - Ok(()) - } -} - -#[cfg(test)] -mod tests { - use std::{io::Read, net::TcpListener}; - - use crate::Topic; - - use super::*; - - #[test] - #[cfg_attr(miri, ignore)] - fn all_broadcasts_messages_to_everyone() { - let listener = TcpListener::bind("localhost:0").unwrap(); - - let port = listener.local_addr().unwrap().port(); - - let connect_to1 = format!("localhost:{}", port); - let connect_to2 = format!("localhost:{}", port); - let connect_to3 = format!("localhost:{}", port); - - let thread1 = std::thread::spawn(move || TcpStream::connect(connect_to1).unwrap()); - - let thread2 = std::thread::spawn(move || TcpStream::connect(connect_to2).unwrap()); - - let thread3 = std::thread::spawn(move || TcpStream::connect(connect_to3).unwrap()); - - let (mut server_to_client_stream_one, _) = listener.accept().unwrap(); - let (mut server_to_client_stream_two, _) = listener.accept().unwrap(); - let (mut server_to_client_stream_three, _) = listener.accept().unwrap(); - - let mut client_to_server_stream_one = thread1.join().unwrap(); - let mut client_to_server_stream_two = thread2.join().unwrap(); - let mut client_to_server_stream_three = thread3.join().unwrap(); - - let mut streams = [ - &mut server_to_client_stream_one, - &mut server_to_client_stream_two, - &mut server_to_client_stream_three, - ]; - - let test_message = Message::CreatePartition { - id: uuid::Uuid::new_v4().to_string(), - replica_id: uuid::Uuid::new_v4().to_string(), - topic: Topic::from("notifications".to_string()), - replica_count: 1, - partition_number: 1, - }; - - let result = Broadcast::all(&mut streams, &test_message); - - assert!(result.is_ok()); - - // Closing the streams so we can read to EOF later - for stream in streams { - stream.shutdown(std::net::Shutdown::Both).unwrap(); - } - - let mut buf = String::with_capacity(1024); - - { - let result = client_to_server_stream_one - .read_to_string(&mut buf) - .unwrap(); - let data: Message = serde_json::from_str::(buf.trim()).unwrap(); - assert!(result > 0); - assert!(matches!(data, Message::CreatePartition { .. })); - buf.clear(); - } - - { - let result = client_to_server_stream_two - .read_to_string(&mut buf) - .unwrap(); - let data: Message = serde_json::from_str::(buf.trim()).unwrap(); - assert!(result > 0); - assert!(matches!(data, Message::CreatePartition { .. })); - buf.clear(); - } - - { - let result = client_to_server_stream_three - .read_to_string(&mut buf) - .unwrap(); - let data: Message = serde_json::from_str::(buf.trim()).unwrap(); - assert!(result > 0); - assert!(matches!(data, Message::CreatePartition { .. })); - buf.clear(); - } - } -} diff --git a/shared_structures/src/dir_manager/mod.rs b/shared_structures/src/dir_manager/mod.rs deleted file mode 100644 index e1f7259..0000000 --- a/shared_structures/src/dir_manager/mod.rs +++ /dev/null @@ -1,184 +0,0 @@ -use std::{ - fmt::Debug, - fs::{self}, - io::Write, - path::PathBuf, -}; - -#[derive(Debug, Default)] -pub struct DirManager { - custom_dir: Option, -} - -impl DirManager { - /// Creates a directory manager in the predefined path of the root directory for Nyx. - pub fn new() -> Self { - Self { custom_dir: None } - } - - /// Create a directory manager in the predefined path of the root directory - /// for Nyx at specified custom directory `custom_dir`, this manager can then perform - /// different fs actions such as opening a file in the directory, saving the file to the directory, - /// and creating empty files in the directory safely in the context - /// of the directory it was created in. - /// - /// when creating a DirManager by passing `custom_dir` the DirManager will still - /// work in the context of the Nyx foder. - pub fn with_dir(custom_dir: Option<&PathBuf>) -> Self { - Self { - custom_dir: custom_dir.cloned(), - } - } - - pub fn create(&self, path: &str) -> Result { - let nyx_dir = Self::get_base_dir(self.custom_dir.as_ref())?; - let nyx_dir_str = nyx_dir - .to_str() - .ok_or("Failed while validating UTF-8 path integrity.")?; - let total_path = format!("{}/{}", nyx_dir_str, path); - match fs::create_dir_all(&total_path) { - Ok(_) => {} - Err(e) => { - println!("DirManager -> create func error: {}", e) - } - }; - Ok(total_path.into()) - } - - pub fn save<'de, T: serde::Serialize + serde::Deserialize<'de>>( - &self, - path: &str, - content: &T, - ) -> Result<(), String> { - let nyx_dir = Self::get_base_dir(self.custom_dir.as_ref())?; - fs::create_dir_all(nyx_dir).map_err(|e| e.to_string())?; - let filepath = Self::get_filepath(path, self.custom_dir.as_ref())?; - let mut file = std::fs::File::create(filepath).map_err(|e| e.to_string())?; - let payload = serde_json::to_string(content).map_err(|e| e.to_string())?; - file.write(payload.as_bytes()).map_err(|e| e.to_string())?; - Ok(()) - } - - pub fn open( - &self, - path: &str, - ) -> Result { - let filepath = Self::get_filepath(path, self.custom_dir.as_ref())?; - let content = fs::read_to_string(filepath).map_err(|e| e.to_string())?; - let data = serde_json::from_str::(&content).map_err(|e| e.to_string())?; - Ok(data) - } - - fn get_filepath(path: &str, custom_path: Option<&PathBuf>) -> Result { - let dir = Self::get_base_dir(custom_path)?; - let dir_str = dir - .to_str() - .ok_or("Not valid UTF-8 path is passed.".to_string())?; - - let filepath = format!("{}/{}", dir_str, path); - Ok(filepath.into()) - } - - pub fn get_base_dir(custom_path: Option<&PathBuf>) -> Result { - let final_path = if let Some(custom_path) = custom_path { - let dist = custom_path - .clone() - .to_str() - .ok_or("Invalid format provided for the directory")? - .to_string(); - format!("nyx/{}", dist) - } else { - "nyx".to_string() - }; - - let mut final_dir: Option = None; - // Unix-based machines - if let Ok(home_dir) = std::env::var("HOME") { - let config_dir = format!("{}/.config/{}", home_dir, final_path); - final_dir = Some(config_dir.into()); - } - // Windows based machines - else if let Ok(user_profile) = std::env::var("USERPROFILE") { - let config_dir = format!(r"{}/AppData/Roaming/{}", user_profile, final_path); - final_dir = Some(config_dir.into()); - } - - final_dir.ok_or("Couldn't get the systems home directory. Please setup a HOME env variable and pass your system's home directory there.".to_string()) - } -} - -#[cfg(test)] -mod tests { - use super::*; - - #[derive(Debug, serde::Serialize, serde::Deserialize)] - struct LocalMetadata { - id: String, - partitions: Vec, - } - - fn setup_nyx_dir_with_local_metadata(custom_dir: &PathBuf) -> DirManager { - let file_manager = DirManager::with_dir(Some(custom_dir)); - - file_manager - .save( - "metadata.json", - &LocalMetadata { - id: "some_mocked_id".to_string(), - partitions: vec![], - }, - ) - .unwrap(); - - file_manager - } - - fn cleanup_nyx_dir(custom_dir: &PathBuf) { - let nyx_dir = DirManager::get_base_dir(Some(custom_dir)).unwrap(); - fs::remove_dir_all(nyx_dir).unwrap(); - } - - #[test] - #[cfg_attr(miri, ignore)] - fn get_local_metadata_directory_returns_dir_as_expected() { - let dir = DirManager::get_base_dir(None).unwrap(); - assert!(dir.to_str().unwrap().contains("nyx")); - } - - #[test] - #[cfg_attr(miri, ignore)] - fn get_local_metadata_filepath_returns_filepath_as_expected() { - let filepath = DirManager::get_filepath("metadata.json", None).unwrap(); - assert!(filepath.to_str().unwrap().contains("nyx/metadata.json")); - } - - #[test] - #[cfg_attr(miri, ignore)] - fn save_local_metadata_file_saves_file_to_designated_dir() { - let custom_dir: PathBuf = "save_metadata_file_saves_file_to_designated_dir".into(); - let file_manager = DirManager::with_dir(Some(&custom_dir)); - file_manager - .save( - "metadata.json", - &LocalMetadata { - id: "broker_metadata_id".to_string(), - partitions: vec![], - }, - ) - .unwrap(); - let filepath = DirManager::get_filepath("metadata.json", Some(&custom_dir)).unwrap(); - let file = fs::File::open(filepath); - assert!(file.is_ok()); - cleanup_nyx_dir(&custom_dir); - } - - #[test] - #[cfg_attr(miri, ignore)] - fn open_local_metadata_succeeds() { - let custom_dir: PathBuf = "tries_to_get_metadata_succeeds".into(); - let file_manager = setup_nyx_dir_with_local_metadata(&custom_dir); - let result = file_manager.open::("metadata.json"); - assert!(result.is_ok()); - cleanup_nyx_dir(&custom_dir); - } -} diff --git a/shared_structures/src/lib.rs b/shared_structures/src/lib.rs deleted file mode 100644 index b9d7b99..0000000 --- a/shared_structures/src/lib.rs +++ /dev/null @@ -1,87 +0,0 @@ -mod broadcast; -mod dir_manager; -mod message_decoder; -mod reader; -mod topic; - -pub mod metadata; - -pub use broadcast::Broadcast; -pub use dir_manager::DirManager; -pub use message_decoder::MessageDecoder; -pub use metadata::Metadata; -pub use reader::Reader; -pub use topic::Topic; - -#[derive(Clone, Copy, Debug, PartialEq, serde::Serialize, serde::Deserialize)] -pub enum Status { - Created, - Down, - Up, - // For replicas that are replicating data from the leader so that they - // won't be touched by consumers yet until they are Up. - Booting, -} - -#[derive(Clone, Copy, Debug, PartialEq, serde::Serialize, serde::Deserialize)] -pub enum Role { - Follower, - Leader, -} - -#[derive(Clone, Copy, Debug, serde::Serialize, serde::Deserialize)] -pub enum EntityType { - Broker, - Observer, -} - -// TODO: Think of a way to better organize this enum or split it into more enums -#[derive(Debug, serde::Serialize, serde::Deserialize)] -pub enum Message { - CreatePartition { - id: String, - replica_id: String, - topic: Topic, - partition_number: usize, - replica_count: usize, - }, - RequestLeadership { - broker_id: String, - partition_id: String, - replica_id: String, - }, - // Should deny leadership request with the addr of broker where leader resides. - DenyLeadership { - leader_addr: String, - }, - BrokerConnectionDetails { - id: String, - addr: String, - }, - ProducerWantsToConnect { - topic: String, - }, - FollowerWantsToConnect { - entity_type: EntityType, - }, - EntityWantsToConnect { - entity_type: EntityType, - }, - RequestClusterMetadata, - ClusterMetadata { - metadata: Metadata, - }, - ProducerMessage { - replica_id: String, - payload: serde_json::Value, - }, -} - -pub fn println_c(text: &str, color: usize) { - if color > 255 { - panic!("Color is out of range 0 to 255"); - } - - let t = format!("\x1b[38;5;{}m{}\x1b[0m", color, text); - println!("{}", t) -} diff --git a/shared_structures/src/message_decoder.rs b/shared_structures/src/message_decoder.rs deleted file mode 100644 index dcbd0db..0000000 --- a/shared_structures/src/message_decoder.rs +++ /dev/null @@ -1,10 +0,0 @@ -use crate::Message; - -pub struct MessageDecoder; - -impl MessageDecoder { - pub fn decode(raw_message: &str) -> Result { - serde_json::from_str::(raw_message) - .map_err(|e| format!("Error while deserialziing: {}", e)) - } -} diff --git a/shared_structures/src/metadata.rs b/shared_structures/src/metadata.rs deleted file mode 100644 index 1028b4b..0000000 --- a/shared_structures/src/metadata.rs +++ /dev/null @@ -1,24 +0,0 @@ -use crate::{Role, Status, Topic}; - -#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)] -pub struct PartitionDetails { - pub id: String, - pub replica_id: String, - pub role: Role, - pub topic: Topic, - pub partition_number: usize, - pub replica_count: usize, -} -#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)] -pub struct BrokerDetails { - pub id: String, - pub addr: String, - pub status: Status, - pub partitions: Vec, -} - -#[derive(Default, Clone, Debug, serde::Serialize, serde::Deserialize)] -pub struct Metadata { - pub brokers: Vec, - pub topics: Vec, -} diff --git a/shared_structures/src/reader.rs b/shared_structures/src/reader.rs deleted file mode 100644 index 8b72bd1..0000000 --- a/shared_structures/src/reader.rs +++ /dev/null @@ -1,33 +0,0 @@ -use std::{io::Read, net::TcpStream}; - -use crate::Message; - -pub struct Reader; - -impl Reader { - pub fn read_one_message(stream: &mut TcpStream) -> Result { - let message = Self::read_until_char(stream, '\n')?; - - serde_json::from_str::(&message) - .map_err(|e| format!("Error while deserialziing: {}", e)) - } - - fn read_until_char(stream: &mut TcpStream, target_char: char) -> Result { - let mut buffer = [0u8; 1]; // Read one byte at a time - let mut result = String::new(); - - loop { - stream.read_exact(&mut buffer).map_err(|e| e.to_string())?; // Read one byte into the buffer - - let byte_read = buffer[0]; - let character = byte_read as char; - result.push(character); - - if character == target_char { - break; - } - } - - Ok(result) - } -} diff --git a/shared_structures/src/topic.rs b/shared_structures/src/topic.rs deleted file mode 100644 index 9a7c39c..0000000 --- a/shared_structures/src/topic.rs +++ /dev/null @@ -1,23 +0,0 @@ -use std::sync::{Arc, Mutex}; - -#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)] -pub struct Topic { - pub name: String, - pub partition_count: usize, -} - -impl Topic { - pub fn from(name: String) -> Self { - Self { - name, - partition_count: 0, - } - } - - pub fn new_shared(name: String) -> Arc> { - Arc::new(Mutex::new(Self { - name, - partition_count: 0, - })) - } -}