diff --git a/Cargo.lock b/Cargo.lock index 7091221e..5a758806 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -125,7 +125,7 @@ checksum = "721cae7de5c34fbb2acd27e21e6d2cf7b886dce0c27388d46c4e6c47ea4318dd" dependencies = [ "proc-macro2", "quote", - "syn 2.0.87", + "syn 2.0.89", ] [[package]] @@ -146,9 +146,9 @@ checksum = "ace50bade8e6234aa140d9a2f552bbee1db4d353f69b8217bc503490fc1a9f26" [[package]] name = "axum" -version = "0.7.7" +version = "0.7.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "504e3947307ac8326a5437504c517c4b56716c9d98fac0028c2acc7ca47d70ae" +checksum = "edca88bc138befd0323b20752846e6587272d3b03b0343c8ea28a6f819e6e71f" dependencies = [ "async-trait", "axum-core", @@ -172,7 +172,7 @@ dependencies = [ "serde_path_to_error", "serde_urlencoded", "sha1", - "sync_wrapper 1.0.1", + "sync_wrapper 1.0.2", "tokio", "tokio-tungstenite", "tower", @@ -196,7 +196,7 @@ dependencies = [ "mime", "pin-project-lite", "rustversion", - "sync_wrapper 1.0.1", + "sync_wrapper 1.0.2", "tower-layer", "tower-service", "tracing", @@ -274,9 +274,9 @@ dependencies = [ "proc-macro2", "quote", "regex", - "rustc-hash", + "rustc-hash 1.1.0", "shlex", - "syn 2.0.87", + "syn 2.0.89", ] [[package]] @@ -346,9 +346,9 @@ checksum = "79296716171880943b8470b5f8d03aa55eb2e645a4874bdbb28adb49162e012c" [[package]] name = "bytemuck" -version = "1.19.0" +version = "1.20.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8334215b81e418a0a7bdb8ef0849474f40bb10c8b71f1c4ed315cff49f32494d" +checksum = "8b37c88a63ffd85d15b406896cc343916d7cf57838a847b3a6f2ca5d39a5695a" [[package]] name = "byteorder" @@ -370,12 +370,13 @@ checksum = "37b2a672a2cb129a2e41c10b1224bb368f9f37a2b16b612598138befd7b37eb5" [[package]] name = "cc" -version = "1.1.6" +version = "1.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2aba8f4e9906c7ce3c73463f62a7f0c65183ada1a2d47e397cc8810827f9694f" +checksum = "fd9de9f2205d5ef3fd67e685b0df337994ddd4495e2a28d185500d0e1edfea47" dependencies = [ "jobserver", "libc", + "shlex", ] [[package]] @@ -508,18 +509,18 @@ dependencies = [ [[package]] name = "clap" -version = "4.5.20" +version = "4.5.21" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b97f376d85a664d5837dbae44bf546e6477a679ff6610010f17276f686d867e8" +checksum = "fb3b4b9e5a7c7514dfa52869339ee98b3156b0bfb4e8a77c4ff4babb64b1604f" dependencies = [ "clap_builder", ] [[package]] name = "clap_builder" -version = "4.5.20" +version = "4.5.21" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "19bc80abd44e4bed93ca373a0704ccbd1b710dc5749406201bb018272808dc54" +checksum = "b17a95aa67cc7b5ebd32aa5370189aa0d79069ef1c64ce893bd30fb24bff20ec" dependencies = [ "anstream", "anstyle", @@ -536,14 +537,14 @@ dependencies = [ "heck", "proc-macro2", "quote", - "syn 2.0.87", + "syn 2.0.89", ] [[package]] name = "clap_lex" -version = "0.7.2" +version = "0.7.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1462739cb27611015575c0c11df5df7601141071f07518d56fcc1be504cbec97" +checksum = "afb84c814227b90d6895e01398aee0d8033c00e7466aca416fb6a8e0eb19d8a7" [[package]] name = "cmake" @@ -610,9 +611,9 @@ checksum = "773648b94d0e5d620f64f280777445740e61fe701025087ec8b57f45c791888b" [[package]] name = "cpufeatures" -version = "0.2.15" +version = "0.2.16" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0ca741a962e1b0bff6d724a1a0958b686406e853bb14061f218562e1896f95e6" +checksum = "16b80225097f2e5ae4e7179dd2266824648f3e2f49d9134d584b76389d31c4c3" dependencies = [ "libc", ] @@ -740,6 +741,16 @@ dependencies = [ "crossbeam-utils", ] +[[package]] +name = "crossbeam-skiplist" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "df29de440c58ca2cc6e587ec3d22347551a32435fbde9d2bff64e78a9ffa151b" +dependencies = [ + "crossbeam-epoch", + "crossbeam-utils", +] + [[package]] name = "crossbeam-utils" version = "0.8.20" @@ -821,7 +832,7 @@ checksum = "f46882e17999c6cc590af592290432be3bce0428cb0d5f8b6715e4dc7b383eb3" dependencies = [ "proc-macro2", "quote", - "syn 2.0.87", + "syn 2.0.89", ] [[package]] @@ -837,6 +848,20 @@ dependencies = [ "parking_lot_core", ] +[[package]] +name = "dashmap" +version = "6.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5041cc499144891f3790297212f32a74fb938e5136a14943f338ef9e0ae276cf" +dependencies = [ + "cfg-if", + "crossbeam-utils", + "hashbrown 0.14.5", + "lock_api", + "once_cell", + "parking_lot_core", +] + [[package]] name = "data-encoding" version = "2.6.0" @@ -928,6 +953,12 @@ version = "0.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fea41bba32d969b513997752735605054bc0dfa92b4c56bf1189f2e174be7a10" +[[package]] +name = "double-ended-peekable" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c0d05e1c0dbad51b52c38bda7adceef61b9efc2baf04acfe8726a8c4630a6f57" + [[package]] name = "ed25519" version = "2.2.3" @@ -976,7 +1007,19 @@ checksum = "ba7795da175654fe16979af73f81f26a8ea27638d8d9823d317016888a63dc4c" dependencies = [ "num-traits", "quote", - "syn 2.0.87", + "syn 2.0.89", +] + +[[package]] +name = "enum_dispatch" +version = "0.3.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "aa18ce2bc66555b3218614519ac839ddb759a7d6720732f979ef8d13be147ecd" +dependencies = [ + "once_cell", + "proc-macro2", + "quote", + "syn 2.0.89", ] [[package]] @@ -1039,6 +1082,22 @@ version = "0.2.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "28dea519a9695b9977216879a3ebfddf92f1c08c05d984f8996aecd6ecdc811d" +[[package]] +name = "fjall" +version = "2.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5e33c3128fbd83d9d70ebcf093f3f91d8c20016af0aac545f80afbadd9dcd098" +dependencies = [ + "byteorder", + "dashmap 6.1.0", + "log", + "lsm-tree", + "path-absolutize", + "std-semaphore", + "tempfile", + "xxhash-rust", +] + [[package]] name = "flume" version = "0.11.1" @@ -1122,7 +1181,7 @@ checksum = "162ee34ebcb7c64a8abebc059ce0fee27c2262618d7b60ed8faf72fef13c3650" dependencies = [ "proc-macro2", "quote", - "syn 2.0.87", + "syn 2.0.89", ] [[package]] @@ -1199,6 +1258,12 @@ version = "0.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b" +[[package]] +name = "guardian" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "493913a18c0d7bebb75127a26a432162c59edbe06f6cf712001e3e769345e8b5" + [[package]] name = "half" version = "2.4.1" @@ -1318,9 +1383,9 @@ checksum = "91f255a4535024abf7640cb288260811fc14794f62b063652ed349f9a6c2348e" [[package]] name = "hyper" -version = "1.5.0" +version = "1.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bbbff0a806a4728c99295b254c8838933b5b082d75e3cb70c8dab21fdfbcfa9a" +checksum = "97818827ef4f364230e16705d4706e2897df2bb60617d6ca15d598025a3c481f" dependencies = [ "bytes", "futures-channel", @@ -1475,9 +1540,9 @@ dependencies = [ [[package]] name = "itoa" -version = "1.0.11" +version = "1.0.13" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "49f1f14873335454500d59611f1cf4a4b0f786f9ac11f4312a78e4cf2566695b" +checksum = "540654e97a3f4470a492cd30ff187bc95d89557a903a2bbf112e2fae98104ef2" [[package]] name = "jobserver" @@ -1537,7 +1602,7 @@ dependencies = [ "proc-macro2", "quote", "regex", - "syn 2.0.87", + "syn 2.0.89", ] [[package]] @@ -1548,9 +1613,9 @@ checksum = "bbd2bcb4c963f2ddae06a2efc7e9f3591312473c50c6685e1f298068316e66fe" [[package]] name = "libc" -version = "0.2.162" +version = "0.2.164" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "18d287de67fe55fd7e1581fe933d965a5a9477b38e949cfa9f8574ef01506398" +checksum = "433bfe06b8c75da9b2e3fbea6e5329ff87748f0b144ef75306e674c3f6f7c13f" [[package]] name = "libloading" @@ -1584,6 +1649,28 @@ version = "0.4.22" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a7a70ba024b9dc04c27ea2f0c0548feb474ec5c54bba33a7f72f873a39d07b24" +[[package]] +name = "lsm-tree" +version = "2.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e7952bc71e90c0b58ce441dcf6cf8624cac042125dec1183ec9c48144f74378d" +dependencies = [ + "byteorder", + "crossbeam-skiplist", + "double-ended-peekable", + "enum_dispatch", + "guardian", + "log", + "path-absolutize", + "quick_cache", + "rustc-hash 2.0.0", + "self_cell", + "tempfile", + "value-log", + "varint-rs", + "xxhash-rust", +] + [[package]] name = "matchers" version = "0.1.0" @@ -1632,6 +1719,12 @@ version = "0.3.17" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6877bb514081ee2a7ff5ef9de3281f14a4dd4bceac4c09388074a6b5df8a139a" +[[package]] +name = "min-max-heap" +version = "1.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2687e6cf9c00f48e9284cf9fd15f2ef341d03cc7743abf9df4c5f07fdee50b18" + [[package]] name = "minimad" version = "0.13.1" @@ -1702,7 +1795,9 @@ dependencies = [ "daumtils", "ed25519-dalek", "eyre", + "fjall", "moor-db", + "moor-db-fjall", "moor-db-relbox", "moor-db-wiredtiger", "moor-kernel", @@ -1739,6 +1834,20 @@ dependencies = [ "uuid", ] +[[package]] +name = "moor-db-fjall" +version = "0.1.0" +dependencies = [ + "bytes", + "fjall", + "moor-db", + "moor-values", + "strum", + "tempfile", + "tracing", + "uuid", +] + [[package]] name = "moor-db-relbox" version = "0.1.0" @@ -1792,6 +1901,7 @@ dependencies = [ "md-5 0.10.6", "moor-compiler", "moor-db", + "moor-db-fjall", "moor-db-relbox", "moor-db-wiredtiger", "moor-moot", @@ -2062,6 +2172,24 @@ version = "1.0.15" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "57c0d7b74b563b49d38dae00a0c37d4d6de9b432382b2892f0574ddcae73fd0a" +[[package]] +name = "path-absolutize" +version = "3.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e4af381fe79fa195b4909485d99f73a80792331df0625188e707854f0b3383f5" +dependencies = [ + "path-dedot", +] + +[[package]] +name = "path-dedot" +version = "3.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "07ba0ad7e047712414213ff67533e6dd477af0a4e1d14fb52343e53d30ea9397" +dependencies = [ + "once_cell", +] + [[package]] name = "pem" version = "3.0.4" @@ -2109,7 +2237,7 @@ dependencies = [ "pest_meta", "proc-macro2", "quote", - "syn 2.0.87", + "syn 2.0.89", ] [[package]] @@ -2249,14 +2377,14 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "64d1ec885c64d0457d564db4ec299b2dae3f9c02808b8ad9c3a089c591b18033" dependencies = [ "proc-macro2", - "syn 2.0.87", + "syn 2.0.89", ] [[package]] name = "proc-macro2" -version = "1.0.89" +version = "1.0.92" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f139b0662de085916d1fb67d2b4169d1addddda1919e696f3252b740b629986e" +checksum = "37d3544b3f2748c54e147655edb5025752e2303145b5aefb3c3ea2c78b973bb0" dependencies = [ "unicode-ident", ] @@ -2276,6 +2404,16 @@ dependencies = [ "sha2 0.9.9", ] +[[package]] +name = "quick_cache" +version = "0.6.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7d7c94f8935a9df96bb6380e8592c70edf497a643f94bd23b2f76b399385dbf4" +dependencies = [ + "equivalent", + "hashbrown 0.14.5", +] + [[package]] name = "quote" version = "1.0.37" @@ -2400,13 +2538,13 @@ checksum = "2b15c43186be67a4fd63bee50d0303afffcef381492ebe2c5d87f324e1b8815c" [[package]] name = "relbox" version = "0.2.0" -source = "git+https://github.com/rdaum/Relbox.git#174aa93f485a398036f85d06ee926472fd92687c" +source = "git+https://github.com/rdaum/Relbox.git#eeb857afb19afecc15d25cc78a16070e2221120e" dependencies = [ "atomic-wait", "binary-layout", "crossbeam-channel", "crossbeam-queue", - "dashmap", + "dashmap 5.5.3", "daumtils", "hi_sparse_bitset", "human_bytes", @@ -2485,6 +2623,12 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "08d43f7aa6b08d49f382cde6a7982047c3426db949b1424bc4b7ec9ae12c6ce2" +[[package]] +name = "rustc-hash" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "583034fd73374156e66797ed8e5b0d5690409c9226b22d87cb7f19821c05d152" + [[package]] name = "rustc_version" version = "0.4.1" @@ -2496,9 +2640,9 @@ dependencies = [ [[package]] name = "rustix" -version = "0.38.40" +version = "0.38.41" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "99e4ea3e1cdc4b559b8e5650f9c8e5998e3e5c1343b4eaf034565f32318d63c0" +checksum = "d7f649912bc1495e167a6edee79151c84b1bad49748cb4f1f1167f459f6224f6" dependencies = [ "bitflags 2.6.0", "errno", @@ -2562,9 +2706,9 @@ dependencies = [ [[package]] name = "scc" -version = "2.2.4" +version = "2.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d8d25269dd3a12467afe2e510f69fb0b46b698e5afb296b59f2145259deaf8e8" +checksum = "66b202022bb57c049555430e11fc22fea12909276a80a4c3d368da36ac1d88ed" dependencies = [ "sdd", ] @@ -2581,6 +2725,12 @@ version = "3.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "49c1eeaf4b6a87c7479688c6d52b9f1153cedd3c489300564f932b065c6eab95" +[[package]] +name = "self_cell" +version = "1.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d369a96f978623eb3dc28807c4852d6cc617fed53da5d3c400feff1ef34a714a" + [[package]] name = "semver" version = "1.0.23" @@ -2604,14 +2754,14 @@ checksum = "ad1e866f866923f252f05c889987993144fb74e722403468a4ebd70c3cd756c0" dependencies = [ "proc-macro2", "quote", - "syn 2.0.87", + "syn 2.0.89", ] [[package]] name = "serde_json" -version = "1.0.132" +version = "1.0.133" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d726bfaff4b320266d395898905d0eba0345aae23b54aee3a737e260fd46db03" +checksum = "c7fceb2473b9166b2294ef05efcb65a3db80803f0b03ef86a5fc88a2b85ee377" dependencies = [ "itoa", "memchr", @@ -2672,7 +2822,7 @@ checksum = "5d69265a08751de7844521fd15003ae0a888e035773ba05695c5c759a6f89eef" dependencies = [ "proc-macro2", "quote", - "syn 2.0.87", + "syn 2.0.89", ] [[package]] @@ -2843,6 +2993,12 @@ version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a8f112729512f8e442d81f95a8a7ddf2b7c6b8a1a6f509a95864142b30cab2d3" +[[package]] +name = "std-semaphore" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "33ae9eec00137a8eed469fb4148acd9fc6ac8c3f9b110f52cd34698c8b5bfa0e" + [[package]] name = "strict" version = "0.2.0" @@ -2874,7 +3030,7 @@ dependencies = [ "proc-macro2", "quote", "rustversion", - "syn 2.0.87", + "syn 2.0.89", ] [[package]] @@ -2896,9 +3052,9 @@ dependencies = [ [[package]] name = "syn" -version = "2.0.87" +version = "2.0.89" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "25aa4ce346d03a6dcd68dd8b4010bcb74e54e62c90c573f394c46eae99aba32d" +checksum = "44d46482f1c1c87acd84dea20c1bf5ebff4c757009ed6bf19cfd36fb10e92c4e" dependencies = [ "proc-macro2", "quote", @@ -2913,9 +3069,9 @@ checksum = "2047c6ded9c721764247e62cd3b03c09ffc529b2ba5b10ec482ae507a4a70160" [[package]] name = "sync_wrapper" -version = "1.0.1" +version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a7065abeca94b6a8a577f9bd45aa0867a2238b74e8eb67cf10d492bc39351394" +checksum = "0bf256ce5efdfa370213c1dabab5935a12e49f2c58d15e9eac2870d3b4f27263" [[package]] name = "synstructure" @@ -2925,7 +3081,7 @@ checksum = "c8af7666ab7b6390ab78131fb5b0fce11d6b7a6951602017c35fa82800708971" dependencies = [ "proc-macro2", "quote", - "syn 2.0.87", + "syn 2.0.89", ] [[package]] @@ -2972,9 +3128,9 @@ dependencies = [ [[package]] name = "termimad" -version = "0.31.0" +version = "0.31.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9cda3a7471f9978706978454c45ef8dda67e9f8f3cdb9319eb2e9323deb6ae62" +checksum = "ea6a5d4cf55d9f1cb04fcda48f725772d0733ae34e030dfc4dd36e738a5965f4" dependencies = [ "coolor", "crokey", @@ -3004,7 +3160,7 @@ dependencies = [ "cfg-if", "proc-macro2", "quote", - "syn 2.0.87", + "syn 2.0.89", ] [[package]] @@ -3015,7 +3171,7 @@ checksum = "5c89e72a01ed4c579669add59014b9a524d609c0c88c6a585ce37485879f6ffb" dependencies = [ "proc-macro2", "quote", - "syn 2.0.87", + "syn 2.0.89", "test-case-core", ] @@ -3027,7 +3183,7 @@ checksum = "5c87c84c1e2198f924fb928d5c775e40aa508e5d6aa62d608f234c0270493d14" dependencies = [ "proc-macro2", "quote", - "syn 2.0.87", + "syn 2.0.89", "unicode-ident", ] @@ -3073,7 +3229,7 @@ checksum = "4fee6c4efc90059e10f81e6d42c60a18f76588c3d74cb83a0b242a2b6c7504c1" dependencies = [ "proc-macro2", "quote", - "syn 2.0.87", + "syn 2.0.89", ] [[package]] @@ -3084,7 +3240,7 @@ checksum = "f077553d607adc1caf65430528a576c757a71ed73944b66ebb58ef2bbd243568" dependencies = [ "proc-macro2", "quote", - "syn 2.0.87", + "syn 2.0.89", ] [[package]] @@ -3177,7 +3333,7 @@ checksum = "693d596312e88961bc67d7f1f97af8a70227d9f90c31bba5806eec004978d752" dependencies = [ "proc-macro2", "quote", - "syn 2.0.87", + "syn 2.0.89", ] [[package]] @@ -3291,7 +3447,7 @@ checksum = "34704c8d6ebcbc939824180af020566b01a7c01f80641264eba0999f6c2b6be7" dependencies = [ "proc-macro2", "quote", - "syn 2.0.87", + "syn 2.0.89", ] [[package]] @@ -3361,7 +3517,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "04659ddb06c87d233c566112c1c9c5b9e98256d9af50ec3bc9c8327f873a7568" dependencies = [ "quote", - "syn 2.0.87", + "syn 2.0.89", ] [[package]] @@ -3402,9 +3558,9 @@ checksum = "2896d95c02a80c6d6a5d6e953d479f5ddf2dfdb6a244441010e373ac0fb88971" [[package]] name = "unicode-ident" -version = "1.0.13" +version = "1.0.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e91b56cd4cadaeb79bbf1a5645f6b4f8dc5bde8834ad5894a8db35fda9efa1fe" +checksum = "adb9e6ca4f869e1180728b7950e35922a7fc6397f7b641499e8f3ef06e50dc83" [[package]] name = "unicode-width" @@ -3463,6 +3619,29 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "830b7e5d4d90034032940e4ace0d9a9a057e7a45cd94e6c007832e39edb82f6d" +[[package]] +name = "value-log" +version = "1.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6e7c4b687fea1a6fe681fabdcc3e21cd01ce6df68d92c037ef2f3dacdd1daf4d" +dependencies = [ + "byteorder", + "bytes", + "log", + "min-max-heap", + "path-absolutize", + "quick_cache", + "rustc-hash 2.0.0", + "tempfile", + "xxhash-rust", +] + +[[package]] +name = "varint-rs" +version = "2.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8f54a172d0620933a27a4360d3db3e2ae0dd6cceae9730751a036bbf182c4b23" + [[package]] name = "version-compare" version = "0.2.0" @@ -3519,7 +3698,7 @@ dependencies = [ "once_cell", "proc-macro2", "quote", - "syn 2.0.87", + "syn 2.0.89", "wasm-bindgen-shared", ] @@ -3541,7 +3720,7 @@ checksum = "26c6ab57572f7a24a4985830b120de1594465e5d500f24afe89e16b4e833ef68" dependencies = [ "proc-macro2", "quote", - "syn 2.0.87", + "syn 2.0.89", "wasm-bindgen-backend", "wasm-bindgen-shared", ] @@ -3564,9 +3743,9 @@ dependencies = [ [[package]] name = "wide" -version = "0.7.28" +version = "0.7.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b828f995bf1e9622031f8009f8481a85406ce1f4d4588ff746d872043e855690" +checksum = "58e6db2670d2be78525979e9a5f9c69d296fd7d670549fe9ebf70f8708cb5019" dependencies = [ "bytemuck", "safe_arch", @@ -3772,6 +3951,12 @@ dependencies = [ "memchr", ] +[[package]] +name = "xxhash-rust" +version = "0.8.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6a5cbf750400958819fb6178eaa83bee5cd9c29a26a40cc241df8c70fdd46984" + [[package]] name = "yansi" version = "1.0.1" @@ -3780,9 +3965,9 @@ checksum = "cfe53a6657fd280eaa890a3bc59152892ffa3e30101319d168b781ed6529b049" [[package]] name = "yoke" -version = "0.7.4" +version = "0.7.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6c5b1314b079b0930c31e3af543d8ee1757b1951ae1e1565ec704403a7240ca5" +checksum = "120e6aef9aa629e3d4f52dc8cc43a015c7724194c97dfaf45180d2daf2b77f40" dependencies = [ "serde", "stable_deref_trait", @@ -3791,13 +3976,13 @@ dependencies = [ [[package]] name = "yoke-derive" -version = "0.7.4" +version = "0.7.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "28cc31741b18cb6f1d5ff12f5b7523e3d6eb0852bbbad19d73905511d9849b95" +checksum = "2380878cad4ac9aac1e2435f3eb4020e8374b5f13c296cb75b4620ff8e229154" dependencies = [ "proc-macro2", "quote", - "syn 2.0.87", + "syn 2.0.89", "synstructure", ] @@ -3819,14 +4004,14 @@ checksum = "fa4f8080344d4671fb4e831a13ad1e68092748387dfc4f55e356242fae12ce3e" dependencies = [ "proc-macro2", "quote", - "syn 2.0.87", + "syn 2.0.89", ] [[package]] name = "zerofrom" -version = "0.1.4" +version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "91ec111ce797d0e0784a1116d0ddcdbea84322cd79e5d5ad173daeba4f93ab55" +checksum = "cff3ee08c995dee1859d998dea82f7374f2826091dd9cd47def953cae446cd2e" [[package]] name = "zeroize" @@ -3845,7 +4030,7 @@ checksum = "ce36e65b0d2999d2aafac989fb249189a141aee1f53c612c1f37d72631959f69" dependencies = [ "proc-macro2", "quote", - "syn 2.0.87", + "syn 2.0.89", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index c01a7535..01076cde 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -5,6 +5,7 @@ members = [ "crates/compiler", "crates/daemon", "crates/db", + "crates/db-fjall", "crates/db-relbox", "crates/db-wiredtiger", "crates/kernel", @@ -133,6 +134,7 @@ thiserror = "2.0" paste = "1.0" # For the DB & values layer. +fjall = { version = "2.3.2", default-features = false, features = ["ssi_tx", "bytes"] } libc = "0.2" relbox = { git = "https://github.com/rdaum/Relbox.git", version = "0.2.0" } text_io = "0.1" # Used for reading text dumps. diff --git a/crates/daemon/Cargo.toml b/crates/daemon/Cargo.toml index 4e9c415b..c922a7e6 100644 --- a/crates/daemon/Cargo.toml +++ b/crates/daemon/Cargo.toml @@ -16,6 +16,7 @@ moor-db-relbox = { path = "../db-relbox", optional = true } relbox = { workspace = true, optional = true } moor-db = { path = "../db" } +moor-db-fjall = { path = "../db-fjall" } moor-db-wiredtiger = { path = "../db-wiredtiger" } moor-kernel = { path = "../kernel" } moor-values = { path = "../values" } @@ -32,6 +33,7 @@ bytes.workspace = true color-eyre.workspace = true daumtils.workspace = true eyre.workspace = true +fjall.workspace = true oneshot.workspace = true signal-hook.workspace = true tempfile.workspace = true diff --git a/crates/daemon/src/connections_fjall.rs b/crates/daemon/src/connections_fjall.rs new file mode 100644 index 00000000..98d8c2b7 --- /dev/null +++ b/crates/daemon/src/connections_fjall.rs @@ -0,0 +1,583 @@ +// Copyright (C) 2024 Ryan Daum +// +// This program is free software: you can redistribute it and/or modify it under +// the terms of the GNU General Public License as published by the Free Software +// Foundation, version 3. +// +// This program is distributed in the hope that it will be useful, but WITHOUT +// ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS +// FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License along with +// this program. If not, see . +// + +//! An implementation of the connections db that uses relbox. + +use std::collections::HashSet; +use std::fmt::{Debug, Display, Formatter}; +use std::path::PathBuf; +use std::thread::sleep; +use std::time::{Duration, SystemTime}; + +use eyre::Error; +use moor_db::{RelationalError, RelationalTransaction, StringHolder, SystemTimeHolder}; +use strum::{AsRefStr, Display, EnumCount, EnumIter, EnumProperty}; +use tracing::{error, warn}; +use uuid::Uuid; + +use bytes::Bytes; +use moor_db_fjall::{FjallDb, FjallTransaction}; +use moor_kernel::tasks::sessions::SessionError; +use moor_values::model::{CommitResult, ValSet}; +use moor_values::Objid; +use moor_values::{AsByteBuffer, DecodingError, EncodingError}; +use rpc_common::RpcMessageError; + +use crate::connections::{ConnectionsDB, CONNECTION_TIMEOUT_DURATION}; +use crate::connections_fjall::ConnectionRelation::{ + ClientActivity, ClientConnectTime, ClientConnection, ClientName, ClientPingTime, +}; +use crate::connections_fjall::Sequences::ConnectionId; + +#[repr(usize)] +// Don't warn about same-prefix, "I did that on purpose" +#[allow(clippy::enum_variant_names)] +#[derive( + Copy, Clone, Debug, Eq, PartialEq, EnumIter, EnumCount, Display, EnumProperty, AsRefStr, +)] +enum ConnectionRelation { + // One to many, client id <-> connection/player object. Secondary index will seek on object id. + #[strum(props(SecondaryIndexed = "true",))] + ClientConnection = 0, + /// Client -> SystemTime of last activity + ClientActivity = 1, + /// Client connect time. + ClientConnectTime = 2, + /// Client last ping time. + ClientPingTime = 3, + /// Client hostname / connection "name" + ClientName = 4, +} + +#[repr(u8)] +enum Sequences { + ConnectionId = 0, +} + +impl From for u8 { + fn from(val: Sequences) -> Self { + val as u8 + } +} +impl From for usize { + fn from(val: ConnectionRelation) -> Self { + val as usize + } +} + +pub struct ConnectionsFjall { + db: FjallDb, +} + +impl ConnectionsFjall { + pub fn new(path: Option) -> Self { + let (db, _) = FjallDb::open(path.as_deref()); + + Self { db } + } +} + +impl ConnectionsFjall { + fn most_recent_client_connection( + tx: &FjallTransaction, + connection_obj: Objid, + ) -> Result, RelationalError> { + let clients: ClientSet = + tx.seek_by_codomain::(ClientConnection, connection_obj)?; + + // Seek the most recent activity for the connection, so pull in the activity relation for + // each client. + let mut times = Vec::new(); + for client in clients.iter() { + if let Some(last_activity) = + tx.seek_unique_by_domain::(ClientActivity, client)? + { + times.push((client, last_activity.0)); + } else { + warn!( + ?client, + ?connection_obj, + "Unable to find last activity for client" + ); + } + } + times.sort_by(|(_, a), (_, b)| b.partial_cmp(a).unwrap()); + Ok(times) + } +} + +fn retry_tx_action< + R, + F: FnMut(&FjallTransaction) -> Result, +>( + db: &FjallDb, + mut f: F, +) -> Result { + for _try_num in 0..50 { + let tx = db.new_transaction(); + let r = f(&tx); + + let r = match r { + Ok(r) => r, + Err(RelationalError::ConflictRetry) => { + error!("Conflict in transaction, retrying"); + tx.rollback(); + sleep(Duration::from_millis(100)); + continue; + } + Err(e) => { + error!(?e, "Non-rollback error in transaction"); + return Err(e); + } + }; + // Commit the transaction. + if let CommitResult::Success = tx.commit() { + return Ok(r); + } + sleep(Duration::from_millis(100)) + } + panic!("Unable to commit transaction after 50 tries"); +} + +#[derive(Debug, Clone, PartialEq, Eq, Copy)] +struct ClientId(Uuid); + +impl AsByteBuffer for ClientId { + fn size_bytes(&self) -> usize { + 16 + } + + fn with_byte_buffer R>(&self, mut f: F) -> Result { + let mut bytes = [0u8; 16]; + bytes.copy_from_slice(self.0.as_bytes()); + Ok(f(&bytes)) + } + + fn make_copy_as_vec(&self) -> Result, EncodingError> { + Ok(self.0.as_bytes().to_vec()) + } + + fn from_bytes(bytes: Bytes) -> Result + where + Self: Sized, + { + let bytes = bytes.as_ref(); + assert_eq!(bytes.len(), 16, "Decode client id: Invalid UUID length"); + let mut uuid_bytes = [0u8; 16]; + uuid_bytes.copy_from_slice(bytes); + Ok(ClientId(Uuid::from_bytes(uuid_bytes))) + } + + fn as_bytes(&self) -> Result { + let buf = self.0.as_bytes(); + assert_eq!(buf.len(), 16, "Encode client id: Invalid UUID length"); + Ok(Bytes::copy_from_slice(buf)) + } +} +impl Display for ClientId { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!(f, "ClientId({})", self.0) + } +} + +#[derive(Debug)] +struct ClientSet(Vec); +impl ValSet for ClientSet { + fn empty() -> Self { + Self(Vec::new()) + } + + fn from_items(items: &[ClientId]) -> Self { + Self(items.to_vec()) + } + + fn iter(&self) -> impl Iterator { + self.0.iter().cloned() + } + + fn len(&self) -> usize { + self.0.len() + } + + fn is_empty(&self) -> bool { + self.0.is_empty() + } +} + +impl FromIterator for ClientSet { + fn from_iter>(iter: T) -> Self { + Self(iter.into_iter().collect()) + } +} + +impl ConnectionsDB for ConnectionsFjall { + fn update_client_connection( + &self, + from_connection: Objid, + to_player: Objid, + ) -> Result<(), Error> { + Ok(retry_tx_action(&self.db, |tx| { + let client_ids = tx.seek_by_codomain::( + ClientConnection, + from_connection, + )?; + if client_ids.is_empty() { + error!(?from_connection, ?to_player, "No client ids for connection"); + return Err(RelationalError::NotFound); + } + // TODO use join once it's implemented + for client_id in client_ids.iter() { + tx.upsert(ClientConnection, client_id, to_player)?; + } + Ok(()) + })?) + } + + fn new_connection( + &self, + client_id: Uuid, + hostname: String, + player: Option, + ) -> Result { + retry_tx_action(&self.db, |tx| { + let connection_oid = match player { + None => { + // The connection object is pulled from the sequence, then we invert it and subtract from + // -4 to get the connection object, since they always grow downwards from there. + let connection_id = tx.increment_sequence(ConnectionId); + let connection_id: i64 = -4 - connection_id; + Objid(connection_id) + } + Some(player) => player, + }; + + // Insert the initial tuples for the connection. + let client_id = ClientId(client_id); + let now = SystemTimeHolder(SystemTime::now()); + tx.insert_tuple(ClientConnection, client_id, connection_oid)?; + tx.insert_tuple(ClientActivity, client_id, now.clone())?; + tx.insert_tuple(ClientConnectTime, client_id, now.clone())?; + tx.insert_tuple(ClientPingTime, client_id, now)?; + tx.insert_tuple(ClientName, client_id, StringHolder(hostname.clone()))?; + + Ok(connection_oid) + }) + .map_err(|e| RpcMessageError::InternalError(e.to_string())) + } + + fn record_client_activity(&self, client_id: Uuid, _connobj: Objid) -> Result<(), Error> { + Ok(retry_tx_action(&self.db, |tx| { + let client_id = ClientId(client_id); + tx.upsert( + ClientActivity, + client_id, + SystemTimeHolder(SystemTime::now()), + )?; + Ok(()) + })?) + } + + fn notify_is_alive(&self, client_id: Uuid, _connection: Objid) -> Result<(), Error> { + Ok(retry_tx_action(&self.db, |tx| { + let client_id = ClientId(client_id); + tx.upsert( + ClientPingTime, + client_id, + SystemTimeHolder(SystemTime::now()), + )?; + Ok(()) + })?) + } + + fn ping_check(&self) { + let now = SystemTime::now(); + let timeout_threshold = now - CONNECTION_TIMEOUT_DURATION; + + retry_tx_action::<(), _>(&self.db, |tx| { + // Full scan the last ping relation, and compare the last ping time to the current time. + // If the difference is greater than the timeout duration, then we need to remove the + // connection from all the relations. + + let expired = tx.scan_with_predicate::<_, ClientId, SystemTimeHolder>( + ClientPingTime, + |_, ping| ping.0 < timeout_threshold, + )?; + + for expired_ping in expired.iter() { + let client_id = expired_ping.0; + tx.remove_by_domain(ClientConnection, client_id)?; + tx.remove_by_domain(ClientActivity, client_id)?; + tx.remove_by_domain(ClientConnectTime, client_id)?; + tx.remove_by_domain(ClientPingTime, client_id)?; + tx.remove_by_domain(ClientName, client_id)?; + } + Ok::<(), RelationalError>(()) + }) + .expect("Unable to commit transaction"); + } + + fn last_activity_for(&self, connection_obj: Objid) -> Result { + let result = retry_tx_action(&self.db, |tx| { + let mut client_times = Self::most_recent_client_connection(tx, connection_obj)?; + let Some(time) = client_times.pop() else { + return Err(RelationalError::NotFound); + }; + Ok(time.1) + }); + match result { + Ok(time) => Ok(time), + Err(RelationalError::NotFound) => { + Err(SessionError::NoConnectionForPlayer(connection_obj)) + } + Err(e) => panic!("Unexpected error: {:?}", e), + } + } + + fn connection_name_for(&self, connection_obj: Objid) -> Result { + let result = retry_tx_action(&self.db, |tx| { + let mut client_times = Self::most_recent_client_connection(tx, connection_obj)?; + let Some(most_recent) = client_times.pop() else { + return Err(RelationalError::NotFound); + }; + let client_id = most_recent.0; + let Some(name) = + tx.seek_unique_by_domain::(ClientName, client_id)? + else { + return Err(RelationalError::NotFound); + }; + Ok(name) + }); + match result { + Ok(name) => Ok(name.0), + Err(RelationalError::NotFound) => { + Err(SessionError::NoConnectionForPlayer(connection_obj)) + } + Err(e) => panic!("Unexpected error: {:?}", e), + } + } + + fn connected_seconds_for(&self, player: Objid) -> Result { + retry_tx_action(&self.db, |tx| { + // In this case we need to find the earliest connection time for the player, and then + // subtract that from the current time. + let clients = + tx.seek_by_codomain::(ClientConnection, player)?; + if clients.is_empty() { + return Err(RelationalError::NotFound); + } + + let mut times: Vec<(ClientId, SystemTime)> = vec![]; + for client in clients.iter() { + if let Some(connect_time) = + tx.seek_unique_by_domain::<_, SystemTimeHolder>(ClientConnectTime, client)? + { + { + times.push((client, connect_time.0)); + } + } + } + + times.sort_by(|(_, a), (_, b)| b.partial_cmp(a).unwrap()); + let earliest = times.pop().expect("No connection for player"); + let earliest = earliest.1; + let now = SystemTime::now(); + let duration = now.duration_since(earliest).expect("Invalid duration"); + Ok(duration.as_secs_f64()) + }) + .map_err(|e| match e { + RelationalError::NotFound => SessionError::NoConnectionForPlayer(player), + _ => panic!("Unexpected error: {:?}", e), + }) + } + + fn client_ids_for(&self, player: Objid) -> Result, SessionError> { + retry_tx_action(&self.db, |tx| { + let clients = + tx.seek_by_codomain::(ClientConnection, player)?; + Ok(clients.iter().map(|c| c.0).collect()) + }) + .map_err(|e| match e { + RelationalError::NotFound => SessionError::NoConnectionForPlayer(player), + _ => panic!("Unexpected error: {:?}", e), + }) + } + + fn connections(&self) -> Vec { + // Full scan from ClientConnection relation to get all connections, and dump them into a + // hashset (to remove dupes) and return as a vector. + retry_tx_action(&self.db, |tx| { + let mut connections = HashSet::new(); + let clients = + tx.scan_with_predicate::<_, ClientId, Objid>(ClientConnection, |_, _| true)?; + + for entry in clients.iter() { + let oid = entry.1; + connections.insert(oid); + } + Ok::, RelationalError>(connections.into_iter().collect()) + }) + .expect("Unable to commit transaction") + } + + fn connection_object_for_client(&self, client_id: Uuid) -> Option { + retry_tx_action(&self.db, |tx| { + tx.seek_unique_by_domain(ClientConnection, ClientId(client_id)) + }) + .unwrap() + } + + fn remove_client_connection(&self, client_id: Uuid) -> Result<(), Error> { + Ok(retry_tx_action(&self.db, |tx| { + tx.remove_by_domain(ClientConnection, ClientId(client_id))?; + tx.remove_by_domain(ClientActivity, ClientId(client_id))?; + tx.remove_by_domain(ClientConnectTime, ClientId(client_id))?; + tx.remove_by_domain(ClientPingTime, ClientId(client_id))?; + tx.remove_by_domain(ClientName, ClientId(client_id))?; + Ok(()) + })?) + } +} + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use moor_values::Objid; + + use crate::connections::ConnectionsDB; + use crate::connections_fjall::ConnectionsFjall; + + /// Simple test of: + /// * Attach a connection<->client + /// * Record activity & verify + /// * Update connection<->client to a new connection + /// * Verify the old connection has no clients + /// * Verify the new connection has the client + /// * Remove the connection<->client + /// * Verify the connection has no clients + #[test] + fn test_single_connection() { + let db = Arc::new(ConnectionsFjall::new(None)); + let mut jh = vec![]; + + for x in 1..10 { + let db = db.clone(); + jh.push(std::thread::spawn(move || { + let client_id = uuid::Uuid::new_v4(); + let oid = db + .new_connection(client_id, "localhost".to_string(), None) + .unwrap(); + let client_ids = db.client_ids_for(oid).unwrap(); + assert_eq!(client_ids.len(), 1); + assert_eq!(client_ids[0], client_id); + db.record_client_activity(client_id, oid).unwrap(); + db.notify_is_alive(client_id, oid).unwrap(); + let last_activity = db.last_activity_for(oid); + assert!( + last_activity.is_ok(), + "Unable to get last activity for {x} ({oid}) client {client_id}", + ); + let last_activity = last_activity.unwrap().elapsed().unwrap().as_secs_f64(); + assert!(last_activity < 1.0); + assert_eq!(db.connection_object_for_client(client_id), Some(oid)); + let connection_object = Objid(x); + db.update_client_connection(oid, connection_object) + .unwrap_or_else(|e| { + panic!("Unable to update client connection for {:?}: {:?}", x, e) + }); + let client_ids = db.client_ids_for(connection_object).unwrap(); + assert_eq!(client_ids.len(), 1); + assert_eq!(client_ids[0], client_id); + db.remove_client_connection(client_id).unwrap(); + assert!(db.connection_object_for_client(client_id).is_none()); + let client_ids = db.client_ids_for(connection_object).unwrap(); + assert!(client_ids.is_empty()); + })); + } + for j in jh { + j.join().unwrap(); + } + } + + /// Test that a given player can have multiple clients connected to it. + #[test] + fn test_multiple_connections() { + let db = Arc::new(ConnectionsFjall::new(None)); + let mut jh = vec![]; + for x in 1..50 { + let db = db.clone(); + jh.push(std::thread::spawn(move || { + let client_id1 = uuid::Uuid::new_v4(); + let client_id2 = uuid::Uuid::new_v4(); + let con_oid1 = db + .new_connection(client_id1, "localhost".to_string(), None) + .unwrap(); + let con_oid2 = db + .new_connection(client_id2, "localhost".to_string(), None) + .unwrap(); + let new_conn = Objid(x); + db.update_client_connection(con_oid1, new_conn) + .expect("Unable to update client connection"); + let client_ids = db.client_ids_for(new_conn).unwrap(); + assert_eq!(client_ids.len(), 1); + assert!(client_ids.contains(&client_id1)); + + db.update_client_connection(con_oid2, new_conn) + .expect("Unable to update client connection"); + let client_ids = db.client_ids_for(new_conn).unwrap(); + assert_eq!( + client_ids.len(), + 2, + "Client ids: {:?}, should be ({client_id1}, {client_id2}) in {x}th oid", + client_ids + ); + assert!(client_ids.contains(&client_id2)); + + db.record_client_activity(client_id1, new_conn).unwrap(); + let last_activity = db + .last_activity_for(new_conn) + .unwrap() + .elapsed() + .unwrap() + .as_secs_f64(); + assert!(last_activity < 1.0); + db.remove_client_connection(client_id1).unwrap(); + let client_ids = db.client_ids_for(new_conn).unwrap(); + assert_eq!(client_ids.len(), 1); + assert!(client_ids.contains(&client_id2)); + })); + } + for j in jh { + j.join().unwrap(); + } + } + + // Validate that ping check works. + #[test] + fn ping_test() { + let db = Arc::new(ConnectionsFjall::new(None)); + let client_id1 = uuid::Uuid::new_v4(); + let ob = db + .new_connection(client_id1, "localhost".to_string(), None) + .unwrap(); + db.ping_check(); + let client_ids = db.connections(); + assert_eq!(client_ids.len(), 1); + assert_eq!(db.connection_object_for_client(client_id1), Some(ob)); + + let client_ids = db.client_ids_for(ob).unwrap(); + assert_eq!(client_ids.len(), 1); + assert_eq!(client_ids[0], client_id1); + } +} diff --git a/crates/daemon/src/main.rs b/crates/daemon/src/main.rs index 2657100f..d7f20f61 100644 --- a/crates/daemon/src/main.rs +++ b/crates/daemon/src/main.rs @@ -22,15 +22,15 @@ use ed25519_dalek::SigningKey; use eyre::Report; use moor_db::DatabaseFlavour; -use pem::Pem; -use rand::rngs::OsRng; -use rusty_paseto::core::Key; -use tracing::{info, warn}; - +use moor_db_fjall::FjallDbWorldStateSource; use moor_db_wiredtiger::WiredTigerDatabaseBuilder; use moor_kernel::config::Config; use moor_kernel::tasks::scheduler::Scheduler; use moor_kernel::textdump::{textdump_load, EncodingMode}; +use pem::Pem; +use rand::rngs::OsRng; +use rusty_paseto::core::Key; +use tracing::{info, warn}; use crate::rpc_server::RpcServer; @@ -43,6 +43,7 @@ use rpc_common::load_keypair; mod connections; +mod connections_fjall; #[cfg(feature = "relbox")] mod connections_rb; mod connections_wt; @@ -50,6 +51,7 @@ mod rpc_hosts; mod rpc_server; mod rpc_session; mod sys_ctrl; +mod tasks_fjall; mod tasks_wt; #[macro_export] @@ -181,7 +183,7 @@ struct Args { // For those that don't appreciate proper English. alias = "db-flavor", help = "The database flavour to use", - default_value = "wiredtiger" + default_value = "fjall" )] db_flavour: DatabaseFlavour, @@ -292,6 +294,11 @@ fn main() -> Result<(), Report> { info!(path = ?args.db, "Opened database"); (db_source, freshly_made) } + DatabaseFlavour::Fjall => { + let (db, fresh) = FjallDbWorldStateSource::open(Some(&args.db)); + let boxed_db: Box = Box::new(db); + (boxed_db, fresh) + } #[cfg(feature = "relbox")] DatabaseFlavour::RelBox => { let db_source_builder = RelBoxDatabaseBuilder::new() @@ -343,6 +350,10 @@ fn main() -> Result<(), Report> { let (tasks_db, _) = tasks_wt::WiredTigerTasksDb::open(Some(&args.tasks_db)); Box::new(tasks_db) } + DatabaseFlavour::Fjall => { + let (tasks_db, _) = tasks_fjall::FjallTasksDB::open(&args.tasks_db); + Box::new(tasks_db) + } #[cfg(feature = "relbox")] DatabaseFlavour::RelBox => { warn!("RelBox does not support tasks persistence yet. Using a no-op tasks database. Suspended tasks will not resume on restart."); diff --git a/crates/daemon/src/rpc_server.rs b/crates/daemon/src/rpc_server.rs index 68b8daec..898da4e7 100644 --- a/crates/daemon/src/rpc_server.rs +++ b/crates/daemon/src/rpc_server.rs @@ -23,6 +23,7 @@ use std::time::{Duration, SystemTime}; use eyre::{Context, Error}; use crate::connections::ConnectionsDB; +use crate::connections_fjall::ConnectionsFjall; #[cfg(feature = "relbox")] use crate::connections_rb::ConnectionsRb; use crate::connections_wt::ConnectionsWT; @@ -116,6 +117,7 @@ impl RpcServer { .expect("Unable to bind ZMQ PUB socket"); let connections: Arc = match db_flavor { DatabaseFlavour::WiredTiger => Arc::new(ConnectionsWT::new(Some(connections_db_path))), + DatabaseFlavour::Fjall => Arc::new(ConnectionsFjall::new(Some(connections_db_path))), #[cfg(feature = "relbox")] DatabaseFlavour::RelBox => Arc::new(ConnectionsRb::new(Some(connections_db_path))), }; diff --git a/crates/daemon/src/tasks_fjall.rs b/crates/daemon/src/tasks_fjall.rs new file mode 100644 index 00000000..040c126b --- /dev/null +++ b/crates/daemon/src/tasks_fjall.rs @@ -0,0 +1,320 @@ +// Copyright (C) 2024 Ryan Daum +// +// This program is free software: you can redistribute it and/or modify it under +// the terms of the GNU General Public License as published by the Free Software +// Foundation, version 3. +// +// This program is distributed in the hope that it will be useful, but WITHOUT +// ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS +// FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License along with +// this program. If not, see . +// + +use fjall::{Config, Keyspace, PartitionCreateOptions, PartitionHandle}; +use moor_kernel::tasks::{TasksDb, TasksDbError}; +use moor_kernel::SuspendedTask; +use moor_values::tasks::TaskId; +use moor_values::BINCODE_CONFIG; +use std::path::Path; +use tracing::error; + +pub struct FjallTasksDB { + _keyspace: Keyspace, + tasks_partition: PartitionHandle, +} + +impl FjallTasksDB { + pub fn open(path: &Path) -> (Self, bool) { + let keyspace = Config::new(path).open().unwrap(); + let fresh = keyspace.partition_count() == 0; + let tasks_partition = keyspace + .open_partition("tasks", PartitionCreateOptions::default()) + .unwrap(); + ( + Self { + _keyspace: keyspace, + tasks_partition, + }, + fresh, + ) + } +} + +impl TasksDb for FjallTasksDB { + fn load_tasks(&self) -> Result, TasksDbError> { + let pi = self.tasks_partition.iter(); + let mut tasks = vec![]; + for entry in pi { + let entry = entry.map_err(|_| TasksDbError::CouldNotLoadTasks)?; + let task_id = TaskId::from_le_bytes(entry.0.as_ref().try_into().map_err(|e| { + error!("Failed to deserialize TaskId from record: {:?}", e); + TasksDbError::CouldNotLoadTasks + })?); + let tasks_bytes = entry.1.as_ref(); + let (task, _): (SuspendedTask, usize) = + bincode::decode_from_slice(tasks_bytes, *BINCODE_CONFIG) + .map_err(|e| { + error!("Failed to deserialize SuspendedTask record: {:?}", e); + TasksDbError::CouldNotLoadTasks + }) + .expect("Failed to deserialize record"); + if task_id != task.task.task_id { + panic!("Task ID mismatch: {:?} != {:?}", task_id, task.task.task_id); + } + tasks.push(task); + } + Ok(tasks) + } + + fn save_task(&self, task: &SuspendedTask) -> Result<(), TasksDbError> { + let task_id = task.task.task_id.to_le_bytes(); + let task_bytes = bincode::encode_to_vec(task, *BINCODE_CONFIG).map_err(|e| { + error!("Failed to serialize record: {:?}", e); + TasksDbError::CouldNotSaveTask + })?; + + self.tasks_partition + .insert(task_id, &task_bytes) + .map_err(|e| { + error!("Failed to insert record: {:?}", e); + TasksDbError::CouldNotSaveTask + })?; + + Ok(()) + } + + fn delete_task(&self, task_id: TaskId) -> Result<(), TasksDbError> { + let task_id = task_id.to_le_bytes(); + self.tasks_partition.remove(task_id).map_err(|e| { + error!("Failed to delete record: {:?}", e); + TasksDbError::CouldNotDeleteTask + })?; + Ok(()) + } + + fn delete_all_tasks(&self) -> Result<(), TasksDbError> { + for entry in self.tasks_partition.iter() { + let entry = entry.map_err(|_| TasksDbError::CouldNotDeleteTask)?; + self.tasks_partition.remove(entry.0.as_ref()).map_err(|e| { + error!("Failed to delete record: {:?}", e); + TasksDbError::CouldNotDeleteTask + })?; + } + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use crate::tasks_fjall::FjallTasksDB; + use moor_kernel::tasks::sessions::NoopClientSession; + use moor_kernel::tasks::{ServerOptions, TaskStart, TasksDb}; + use moor_kernel::{SuspendedTask, Task, WakeCondition}; + use moor_values::SYSTEM_OBJECT; + use std::sync::atomic::AtomicBool; + use std::sync::Arc; + + // Verify creation of an empty DB, including creation of tables. + #[test] + fn open_reopen() { + let tmpdir = tempfile::tempdir().expect("Unable to create temporary directory"); + let path = tmpdir.path(); + { + let (db, is_fresh) = FjallTasksDB::open(path); + assert!(is_fresh); + let tasks = db.load_tasks().unwrap(); + assert_eq!(tasks.len(), 0); + } + { + let (db, is_fresh) = FjallTasksDB::open(path); + assert!(!is_fresh); + let tasks = db.load_tasks().unwrap(); + assert_eq!(tasks.len(), 0); + } + } + + // Verify putting a single task into a fresh db, closing it and reopening it, and getting it out + #[test] + fn save_load() { + let task_id = 0; + let so = ServerOptions { + bg_seconds: 0, + bg_ticks: 0, + fg_seconds: 0, + fg_ticks: 0, + max_stack_depth: 0, + }; + + /* + perms: Objid, + server_options: &ServerOptions, + kill_switch: Arc, + */ + let task = Task::new( + task_id, + SYSTEM_OBJECT, + Arc::new(TaskStart::StartEval { + player: SYSTEM_OBJECT, + program: Default::default(), + }), + SYSTEM_OBJECT, + &so, + Arc::new(AtomicBool::new(false)), + ); + + // Mock task... + let suspended = SuspendedTask { + wake_condition: WakeCondition::Never, + task, + session: Arc::new(NoopClientSession::new()), + result_sender: None, + }; + let tmpdir = tempfile::tempdir().expect("Unable to create temporary directory"); + let path = tmpdir.path(); + + { + let (db, is_fresh) = FjallTasksDB::open(path); + assert!(is_fresh); + db.save_task(&suspended).unwrap(); + let tasks = db.load_tasks().unwrap(); + assert_eq!(tasks.len(), 1); + assert_eq!(tasks[0].task.task_id, task_id); + } + + { + let (db, is_fresh) = FjallTasksDB::open(path); + assert!(!is_fresh); + let tasks = db.load_tasks().unwrap(); + assert_eq!(tasks.len(), 1); + assert_eq!(tasks[0].task.task_id, task_id); + } + } + + // Create a series of tasks, save them, load them, and verify they are the same. + #[test] + fn save_load_multiple() { + let mut tasks = vec![]; + for task_id in 0..50 { + let so = ServerOptions { + bg_seconds: 0, + bg_ticks: 0, + fg_seconds: 0, + fg_ticks: 0, + max_stack_depth: 0, + }; + + let task = Task::new( + task_id, + SYSTEM_OBJECT, + Arc::new(TaskStart::StartEval { + player: SYSTEM_OBJECT, + program: Default::default(), + }), + SYSTEM_OBJECT, + &so, + Arc::new(AtomicBool::new(false)), + ); + + // Mock task... + let suspended = SuspendedTask { + wake_condition: WakeCondition::Never, + task, + session: Arc::new(NoopClientSession::new()), + result_sender: None, + }; + tasks.push(suspended); + } + + // Write em + let tmpdir = tempfile::tempdir().expect("Unable to create temporary directory"); + let path = tmpdir.path(); + { + let (db, is_fresh) = FjallTasksDB::open(path); + assert!(is_fresh); + for task in tasks.iter() { + db.save_task(task).unwrap(); + } + } + + // Load em + let (db, is_fresh) = FjallTasksDB::open(path); + assert!(!is_fresh); + let loaded_tasks = db.load_tasks().unwrap(); + assert_eq!(loaded_tasks.len(), tasks.len()); + for (task, loaded_task) in tasks.iter().zip(loaded_tasks.iter()) { + assert_eq!(task.task.task_id, loaded_task.task.task_id); + } + } + + // Create a series of tasks, save them, delete a few, and load verify the rest are there and + // the deleted are not. + #[test] + fn save_delete_load_multiple() { + let mut tasks = vec![]; + for task_id in 0..50 { + let so = ServerOptions { + bg_seconds: 0, + bg_ticks: 0, + fg_seconds: 0, + fg_ticks: 0, + max_stack_depth: 0, + }; + + let task = Task::new( + task_id, + SYSTEM_OBJECT, + Arc::new(TaskStart::StartEval { + player: SYSTEM_OBJECT, + program: Default::default(), + }), + SYSTEM_OBJECT, + &so, + Arc::new(AtomicBool::new(false)), + ); + + // Mock task... + let suspended = SuspendedTask { + wake_condition: WakeCondition::Never, + task, + session: Arc::new(NoopClientSession::new()), + result_sender: None, + }; + tasks.push(suspended); + } + + // Write em + let tmpdir = tempfile::tempdir().expect("Unable to create temporary directory"); + let path = tmpdir.path(); + { + let (db, is_fresh) = FjallTasksDB::open(path); + assert!(is_fresh); + for task in tasks.iter() { + db.save_task(task).unwrap(); + } + } + + { + // Delete some + let (db, is_fresh) = FjallTasksDB::open(path); + assert!(!is_fresh); + for task_id in 0..50 { + if task_id % 2 == 0 { + db.delete_task(task_id).unwrap(); + } + } + } + + // Load em + let (db, is_fresh) = FjallTasksDB::open(path); + assert!(!is_fresh); + let loaded_tasks = db.load_tasks().unwrap(); + assert_eq!(loaded_tasks.len(), 25); + + // Go through the loaded tasks and make sure the deleted ones are not there. + for task in loaded_tasks.iter() { + assert!(task.task.task_id % 2 != 0); + } + } +} diff --git a/crates/daemon/src/tasks_wt.rs b/crates/daemon/src/tasks_wt.rs index 13b8b466..391af256 100644 --- a/crates/daemon/src/tasks_wt.rs +++ b/crates/daemon/src/tasks_wt.rs @@ -167,7 +167,7 @@ impl TasksDb for WiredTigerTasksDb { .open_session(self.session_config.clone()) .map_err(|e| { error!("Failed to open session: {:?}", e); - TasksDbError::CouldNotLoadTasks + TasksDbError::CouldNotSaveTask })?; session.begin_transaction(None).unwrap(); @@ -175,34 +175,34 @@ impl TasksDb for WiredTigerTasksDb { .open_cursor(&self.tasks_table, Some(CursorConfig::new().raw(true))) .map_err(|e| { error!("Failed to open cursor: {:?}", e); - TasksDbError::CouldNotLoadTasks + TasksDbError::CouldNotSaveTask })?; let task_id = task.task.task_id.to_le_bytes(); let task_bytes = bincode::encode_to_vec(task, *BINCODE_CONFIG).map_err(|e| { error!("Failed to serialize record: {:?}", e); - TasksDbError::CouldNotLoadTasks + TasksDbError::CouldNotSaveTask })?; cursor .set_key(Datum::from_vec(task_id.to_vec())) .map_err(|e| { error!("Failed to set key: {:?}", e); - TasksDbError::CouldNotLoadTasks + TasksDbError::CouldNotSaveTask })?; cursor.set_value(Datum::from_vec(task_bytes)).map_err(|e| { error!("Failed to set value: {:?}", e); - TasksDbError::CouldNotLoadTasks + TasksDbError::CouldNotSaveTask })?; cursor.insert().map_err(|e| { error!("Failed to insert record: {:?}", e); - TasksDbError::CouldNotLoadTasks + TasksDbError::CouldNotSaveTask })?; session.commit().map_err(|e| { error!("Failed to commit transaction: {:?}", e); - TasksDbError::CouldNotLoadTasks + TasksDbError::CouldNotSaveTask })?; Ok(()) @@ -215,7 +215,7 @@ impl TasksDb for WiredTigerTasksDb { .open_session(self.session_config.clone()) .map_err(|e| { error!("Failed to open session: {:?}", e); - TasksDbError::CouldNotLoadTasks + TasksDbError::CouldNotDeleteTask })?; session.begin_transaction(None).unwrap(); @@ -223,7 +223,7 @@ impl TasksDb for WiredTigerTasksDb { .open_cursor(&self.tasks_table, Some(CursorConfig::new().raw(true))) .map_err(|e| { error!("Failed to open cursor: {:?}", e); - TasksDbError::CouldNotLoadTasks + TasksDbError::CouldNotDeleteTask })?; let task_id = task_id.to_le_bytes(); @@ -232,16 +232,16 @@ impl TasksDb for WiredTigerTasksDb { .set_key(Datum::from_vec(task_id.to_vec())) .map_err(|e| { error!("Failed to set key: {:?}", e); - TasksDbError::CouldNotLoadTasks + TasksDbError::CouldNotDeleteTask })?; cursor.remove().map_err(|e| { error!("Failed to remove record: {:?}", e); - TasksDbError::CouldNotLoadTasks + TasksDbError::CouldNotDeleteTask })?; session.commit().map_err(|e| { error!("Failed to commit transaction: {:?}", e); - TasksDbError::CouldNotLoadTasks + TasksDbError::CouldNotDeleteTask })?; Ok(()) @@ -255,7 +255,7 @@ impl TasksDb for WiredTigerTasksDb { .open_session(self.session_config.clone()) .map_err(|e| { error!("Failed to open session: {:?}", e); - TasksDbError::CouldNotLoadTasks + TasksDbError::CouldNotDeleteTask })?; session.begin_transaction(None).unwrap(); @@ -263,12 +263,12 @@ impl TasksDb for WiredTigerTasksDb { .open_cursor(&self.tasks_table, Some(CursorConfig::new().raw(true))) .map_err(|e| { error!("Failed to open cursor: {:?}", e); - TasksDbError::CouldNotLoadTasks + TasksDbError::CouldNotDeleteTask })?; cursor.reset().map_err(|e| { error!("Failed to reset cursor to start: {:?}", e); - TasksDbError::CouldNotLoadTasks + TasksDbError::CouldNotDeleteTask })?; loop { @@ -279,19 +279,19 @@ impl TasksDb for WiredTigerTasksDb { } Err(e) => { error!("Failed to advance cursor: {:?}", e); - return Err(TasksDbError::CouldNotLoadTasks); + return Err(TasksDbError::CouldNotDeleteTask); } } cursor.remove().map_err(|e| { error!("Failed to remove record: {:?}", e); - TasksDbError::CouldNotLoadTasks + TasksDbError::CouldNotDeleteTask })?; } session.commit().map_err(|e| { error!("Failed to commit transaction: {:?}", e); - TasksDbError::CouldNotLoadTasks + TasksDbError::CouldNotDeleteTask })?; Ok(()) } diff --git a/crates/db-fjall/Cargo.toml b/crates/db-fjall/Cargo.toml new file mode 100644 index 00000000..180fdcd4 --- /dev/null +++ b/crates/db-fjall/Cargo.toml @@ -0,0 +1,30 @@ +[package] +name = "moor-db-fjall" +version = "0.1.0" +authors.workspace = true +categories.workspace = true +edition.workspace = true +keywords.workspace = true +license.workspace = true +readme.workspace = true +repository.workspace = true +rust-version.workspace = true +description = "Database layer using RelBox" + +[dev-dependencies] +tempfile.workspace = true + +[dependencies] +moor-db = { path = "../db" } +moor-values = { path = "../values" } + +fjall.workspace = true + +## General usefulness +bytes.workspace = true +strum.workspace = true +tempfile.workspace = true +uuid.workspace = true + +## Logging & tracing +tracing.workspace = true diff --git a/crates/db-fjall/src/fjall_db.rs b/crates/db-fjall/src/fjall_db.rs new file mode 100644 index 00000000..0cc8258a --- /dev/null +++ b/crates/db-fjall/src/fjall_db.rs @@ -0,0 +1,110 @@ +// Copyright (C) 2024 Ryan Daum +// +// This program is free software: you can redistribute it and/or modify it under +// the terms of the GNU General Public License as published by the Free Software +// Foundation, version 3. +// +// This program is distributed in the hope that it will be useful, but WITHOUT +// ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS +// FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License along with +// this program. If not, see . +// + +use crate::fjall_relation::RelationPartitions; +use crate::FjallTransaction; +use fjall::{Config, PartitionCreateOptions, TxKeyspace, TxPartitionHandle}; +use moor_db::WorldStateTable; +use std::fmt::Display; +use std::marker::PhantomData; +use std::path::Path; +use strum::{EnumProperty, IntoEnumIterator}; +use tempfile::TempDir; + +#[cfg(test)] +mod tests { + use crate::fjall_db::FjallDb; + use moor_values::model::WorldStateSource; + + #[test] + fn test_fjall_db_open_close() { + let (db, fresh) = FjallDb::open(None); + assert!(fresh); + db.checkpoint().unwrap(); + } +} + +pub struct FjallDb +where + Relation: Send + Sync + Display + Into + Copy, +{ + pub(crate) keyspace: TxKeyspace, + sequences_partition: TxPartitionHandle, + relations_partitions: Vec, + phantom_data: PhantomData, + + /// If this is a temporary database, this will be Some(TempDir) that will be cleaned up when + /// the database is dropped. + _tmpdir: Option, +} + +impl FjallDb +where + Relation: Send + Sync + Display + Into + Copy, +{ + pub fn open(path: Option<&Path>) -> (Self, bool) { + let tmpdir = if path.is_none() { + Some(TempDir::new().unwrap()) + } else { + None + }; + + let path = path.unwrap_or_else(|| tmpdir.as_ref().unwrap().path()); + let keyspace = Config::new(path).open_transactional().unwrap(); + let sequences_partition = keyspace + .open_partition("sequences", PartitionCreateOptions::default()) + .unwrap(); + let mut relations_partitions = Vec::new(); + + // If the partitions count in the keyspaces is not equal to the count of relations in the + // WorldStateTable, we're "fresh" + let fresh = keyspace.partition_count() != WorldStateTable::iter().count(); + for relation in WorldStateTable::iter() { + let partition = keyspace + .open_partition(&relation.to_string(), PartitionCreateOptions::default()) + .unwrap(); + let has_secondary = relation + .get_str("SecondaryIndexed") + .map(|it| it == "true") + .unwrap_or(false); + let secondary = (has_secondary).then(|| { + keyspace + .open_partition( + &format!("{}_secondary", relation.to_string()), + PartitionCreateOptions::default(), + ) + .unwrap() + }); + relations_partitions.push(RelationPartitions { + primary: partition, + secondary, + }); + } + ( + Self { + keyspace, + sequences_partition, + relations_partitions, + phantom_data: Default::default(), + _tmpdir: tmpdir, + }, + fresh, + ) + } + + pub fn new_transaction(&self) -> FjallTransaction { + let tx = self.keyspace.write_tx().unwrap(); + FjallTransaction::new(tx, &self.sequences_partition, &self.relations_partitions) + } +} diff --git a/crates/db-fjall/src/fjall_relation.rs b/crates/db-fjall/src/fjall_relation.rs new file mode 100644 index 00000000..1942fe66 --- /dev/null +++ b/crates/db-fjall/src/fjall_relation.rs @@ -0,0 +1,918 @@ +// Copyright (C) 2024 Ryan Daum +// +// This program is free software: you can redistribute it and/or modify it under +// the terms of the GNU General Public License as published by the Free Software +// Foundation, version 3. +// +// This program is distributed in the hope that it will be useful, but WITHOUT +// ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS +// FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License along with +// this program. If not, see . +// + +use crate::value_set::ValueSet; +use bytes::Bytes; +use fjall::{TxPartitionHandle, WriteTransaction}; +use moor_db::RelationalError::NotFound; +use moor_db::{RelationalError, RelationalTransaction}; +use moor_values::model::{CommitResult, ValSet}; +use moor_values::{AsByteBuffer, DecodingError, EncodingError}; +use std::fmt::{Debug, Display}; +use std::mem::MaybeUninit; + +#[derive(Clone)] +pub(crate) struct RelationPartitions { + // Domain -> Codomain + pub(crate) primary: TxPartitionHandle, + // Codomain -> Domains (Vector) + pub(crate) secondary: Option, +} + +pub type OpResult = std::result::Result; + +#[derive(Clone, Eq, PartialEq)] +pub(crate) struct CompositeDomain +where + DomainA: Clone + Eq + PartialEq + AsByteBuffer, + DomainB: Clone + Eq + PartialEq + AsByteBuffer, +{ + data: Bytes, + phantom: std::marker::PhantomData<(DomainA, DomainB)>, +} + +impl Debug for CompositeDomain +where + DomainA: Clone + Eq + PartialEq + AsByteBuffer, + DomainB: Clone + Eq + PartialEq + AsByteBuffer, +{ + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_tuple("CompositeDomain").field(&self.data).finish() + } +} + +impl CompositeDomain +where + DomainA: Clone + Eq + PartialEq + AsByteBuffer, + DomainB: Clone + Eq + PartialEq + AsByteBuffer, +{ + fn new(data: Bytes) -> Self { + Self { + data, + phantom: Default::default(), + } + } + + fn composite_key(domain_a: DomainA, domain_b: DomainB) -> CompositeDomain { + let mut key = Vec::new(); + key.extend_from_slice(&domain_a.as_bytes().unwrap()); + key.extend_from_slice(&domain_b.as_bytes().unwrap()); + CompositeDomain { + data: Bytes::from(key), + phantom: Default::default(), + } + } +} + +impl AsByteBuffer for CompositeDomain +where + DomainA: Clone + Eq + PartialEq + AsByteBuffer, + DomainB: Clone + Eq + PartialEq + AsByteBuffer, +{ + fn size_bytes(&self) -> usize { + self.data.len() + } + + fn with_byte_buffer R>(&self, mut f: F) -> Result { + Ok(f(self.data.as_ref())) + } + + fn make_copy_as_vec(&self) -> Result, EncodingError> { + Ok(self.data.to_vec()) + } + + fn from_bytes(bytes: Bytes) -> Result + where + Self: Sized, + { + Ok(Self::new(bytes)) + } + + fn as_bytes(&self) -> Result { + Ok(self.data.clone()) + } +} + +pub struct FjallTransaction +where + Relation: Send + Sync + Display, +{ + tx: MaybeUninit, + sequences_partition: TxPartitionHandle, + relations_partitions: Vec, + phantom: std::marker::PhantomData, +} + +impl FjallTransaction +where + Relation: Send + Sync + Display + Into + Copy, +{ + pub(crate) fn new( + tx: WriteTransaction, + sequences_partition: &TxPartitionHandle, + relations_partitions: &Vec, + ) -> Self { + Self { + tx: MaybeUninit::new(tx), + sequences_partition: sequences_partition.clone(), + relations_partitions: relations_partitions.clone(), + phantom: Default::default(), + } + } + + #[allow(clippy::mut_from_ref)] + fn write_tx(&self) -> &mut WriteTransaction { + // Dirty tricks to make mutable. Ugh. + // TODO: We have to do this because I made the choice somewhere up the pipe to have the + // world state transaction be !mut. There were... reasons... but I will need to revisit + // this. + unsafe { &mut *(self.tx.as_ptr() as *mut WriteTransaction) } + } + + fn take_tx(self) -> WriteTransaction { + unsafe { self.tx.assume_init() } + } + + fn primary_partition(&self, rel: Relation) -> TxPartitionHandle { + let idx: usize = rel.into(); + self.relations_partitions[idx].primary.clone() + } + + fn secondary_partition(&self, rel: Relation) -> Option { + let idx: usize = rel.into(); + self.relations_partitions[idx].secondary.clone() + } +} + +impl FjallTransaction +where + Relation: Send + Sync + Display + Into + Copy, +{ + // Lookup in a codomain index and a get a Vec + fn codomain_lookup< + Domain: Clone + Eq + PartialEq + AsByteBuffer, + Codomain: Clone + Eq + PartialEq + AsByteBuffer, + >( + &self, + tx: &mut WriteTransaction, + table: TxPartitionHandle, + codomain: Codomain, + ) -> OpResult> { + let result = tx.get(&table, codomain.as_bytes().unwrap()).unwrap(); + if result.is_none() { + return Err(RelationalError::NotFound); + } + let bytes = result.unwrap(); + let cset = ValueSet::new(bytes.into()); + Ok(cset) + } + + // Remove a domain from a codomain index. + fn codomain_remove( + &self, + tx: &mut WriteTransaction, + table: TxPartitionHandle, + codomain_bytes: Bytes, + domain_bytes: Bytes, + ) -> OpResult<()> { + let result = tx.get(&table, codomain_bytes.clone()).unwrap(); + let cset: ValueSet = match result { + Some(value) => ValueSet::new(value.into()), + None => { + return Err(RelationalError::NotFound); + } + }; + + let new_cset = cset.without_bytes(&domain_bytes); + tx.insert(&table, codomain_bytes, &new_cset.data()); + Ok(()) + } + + // Insert a domain into a codomain index. + fn codomain_insert< + Domain: Clone + Eq + PartialEq + AsByteBuffer, + Codomain: Clone + Eq + PartialEq + AsByteBuffer, + >( + &self, + tx: &mut WriteTransaction, + table: TxPartitionHandle, + codomain: Codomain, + domain: Domain, + ) -> OpResult<()> { + let result = tx.get(&table, codomain.as_bytes().unwrap()).unwrap(); + let cset = match result { + Some(value) => ValueSet::new(value.into()), + None => ValueSet::new(Bytes::from(vec![0, 0, 0, 0])), + }; + let new_cset = cset.append(domain); + tx.insert(&table, codomain.as_bytes().unwrap(), &new_cset.data()); + + Ok(()) + } +} + +impl RelationalTransaction for FjallTransaction +where + Relation: Send + Sync + Display + Into + Copy, +{ + fn commit(self) -> CommitResult { + let tx = self.take_tx(); + match tx.commit() { + Ok(Ok(())) => CommitResult::Success, + Ok(Err(_)) => CommitResult::ConflictRetry, + Err(e) => { + // This is a fundamental database error, so we should panic. + panic!("Error committing transaction: {:?}", e); + } + } + } + + fn rollback(self) { + let tx = self.take_tx(); + tx.rollback(); + } + + fn increment_sequence>(&self, seq: S) -> i64 { + let tx = self.write_tx(); + let seq_num = seq.into(); + let seq_name = format!("seq_{}", seq_num); + let prev = tx + .get(&self.sequences_partition, &seq_name) + .unwrap() + .map(|v| { + let mut bytes = [0; 8]; + bytes.copy_from_slice(&v); + i64::from_le_bytes(bytes) + }) + .unwrap_or(-1); + let next = prev + 1; + let mut next_bytes = [0; 8]; + next_bytes.copy_from_slice(&next.to_le_bytes()); + tx.insert(&self.sequences_partition, &seq_name, &next_bytes); + next + } + + fn update_sequence_max>(&self, seq: S, value: i64) -> i64 { + let tx = self.write_tx(); + let seq_num = seq.into(); + let seq_name = format!("seq_{}", seq_num); + let prev = tx + .get(&self.sequences_partition, &seq_name) + .unwrap() + .map(|v| { + let mut bytes = [0; 8]; + bytes.copy_from_slice(&v); + i64::from_le_bytes(bytes) + }) + .unwrap_or(0); + let next = prev.max(value as i64); + let mut next_bytes = [0; 8]; + next_bytes.copy_from_slice(&next.to_le_bytes()); + tx.insert(&self.sequences_partition, &seq_name, &next_bytes); + next + } + + fn get_sequence>(&self, seq: S) -> Option { + let tx = self.write_tx(); + let seq_num = seq.into(); + let seq_name = format!("seq_{}", seq_num); + let Some(seq_val) = tx.get(&self.sequences_partition, &seq_name).unwrap() else { + return None; + }; + let mut bytes = [0; 8]; + bytes.copy_from_slice(&seq_val); + Some(u64::from_le_bytes(bytes) as i64) + } + + fn remove_by_domain( + &self, + rel: Relation, + domain: Domain, + ) -> OpResult<()> { + let tx = self.write_tx(); + let table = self.primary_partition(rel); + let domain_bytes = domain.as_bytes().unwrap(); + let result = tx.take(&table, domain_bytes.clone()).unwrap(); + if result.is_none() { + return Err(RelationalError::NotFound); + } + + // Remove from secondary index if it exists. + if let Some(secondary) = self.secondary_partition(rel) { + let codomain_bytes = Bytes::from(result.unwrap()); + self.codomain_remove::(tx, secondary, codomain_bytes, domain_bytes)?; + } + + Ok(()) + } + + fn remove_by_composite_domain< + DomainA: Clone + Eq + PartialEq + AsByteBuffer, + DomainB: Clone + Eq + PartialEq + AsByteBuffer, + >( + &self, + rel: Relation, + domain_a: DomainA, + domain_b: DomainB, + ) -> OpResult<()> { + let tx = self.write_tx(); + let table = &self.primary_partition(rel); + let key = CompositeDomain::composite_key(domain_a, domain_b); + let result = tx.take(&table, key.as_bytes().unwrap()).unwrap(); + if result.is_none() { + return Err(RelationalError::NotFound); + } + // Remove from secondary index if it exists. + if let Some(secondary) = self.secondary_partition(rel) { + let codomain_bytes = Bytes::from(result.unwrap()); + self.codomain_remove::>( + tx, + secondary, + codomain_bytes, + key.as_bytes().unwrap(), + )?; + } + + Ok(()) + } + + fn remove_by_codomain< + Domain: Clone + Eq + PartialEq + AsByteBuffer, + Codomain: Clone + Eq + PartialEq + AsByteBuffer, + >( + &self, + rel: Relation, + codomain: Codomain, + ) -> OpResult<()> { + // Seek the codomain index first to find the domain. + // If we find it, remove it from both places, otherwise return NotFound. + let tx = self.write_tx(); + let secondary = self.secondary_partition(rel).expect("No secondary index"); + let result = tx.get(&secondary, codomain.as_bytes().unwrap()).unwrap(); + if result.is_none() { + return Err(RelationalError::NotFound); + } + + let domain_bytes = Bytes::from(result.unwrap()); + let primary = self.primary_partition(rel); + let result = tx.get(&primary, &domain_bytes).unwrap(); + if result.is_none() { + return Err(RelationalError::NotFound); + } + + self.codomain_remove::(tx, secondary, codomain.as_bytes().unwrap(), domain_bytes) + } + + fn upsert< + Domain: Clone + Eq + PartialEq + AsByteBuffer, + Codomain: Clone + Eq + PartialEq + AsByteBuffer, + >( + &self, + rel: Relation, + domain: Domain, + codomain: Codomain, + ) -> OpResult<()> { + let tx = self.write_tx(); + let table = self.primary_partition(rel); + let key = domain.as_bytes().unwrap(); + let value = codomain.as_bytes().unwrap(); + + // Check for an old value. + let old_value = tx.get(&table, &key).unwrap(); + tx.insert(&table, &key, &value); + if let Some(secondary) = self.secondary_partition(rel) { + // Remove the old value from the secondary index. + if let Some(old_value) = old_value { + self.codomain_remove::(tx, secondary.clone(), old_value.into(), key)?; + } + self.codomain_insert(tx, secondary, codomain, domain)?; + } + Ok(()) + } + + fn insert_tuple< + Domain: Clone + Eq + PartialEq + AsByteBuffer, + Codomain: Clone + Eq + PartialEq + AsByteBuffer, + >( + &self, + rel: Relation, + domain: Domain, + codomain: Codomain, + ) -> OpResult<()> { + // Have to check if the tuple already exists. + let tx = self.write_tx(); + let table = self.primary_partition(rel); + let key = domain.as_bytes().unwrap(); + if tx.get(&table, &key).unwrap().is_some() { + return Err(RelationalError::Duplicate( + "Tuple already exists".to_string(), + )); + } + let value = codomain.as_bytes().unwrap(); + tx.insert(&table, &key, &value); + if let Some(secondary) = self.secondary_partition(rel) { + self.codomain_insert(tx, secondary, codomain, domain)?; + } + Ok(()) + } + + fn scan_with_predicate( + &self, + rel: Relation, + pred: P, + ) -> OpResult> + where + P: Fn(&Domain, &Codomain) -> bool, + Codomain: Clone + Eq + PartialEq + AsByteBuffer, + Domain: Clone + Eq + PartialEq + AsByteBuffer, + { + let tx = self.write_tx(); + let table = self.primary_partition(rel); + let mut results = Vec::new(); + + for entry in tx.iter(&table) { + let (key, value) = entry.unwrap(); + let domain = Domain::from_bytes(key.into()).unwrap(); + let codomain = Codomain::from_bytes(value.into()).unwrap(); + if pred(&domain, &codomain) { + results.push((domain, codomain)); + } + } + Ok(results) + } + + fn seek_unique_by_domain< + Domain: Clone + Eq + PartialEq + AsByteBuffer, + Codomain: Clone + Eq + PartialEq + AsByteBuffer, + >( + &self, + rel: Relation, + domain: Domain, + ) -> OpResult> { + let tx = self.write_tx(); + let table = self.primary_partition(rel); + let key = domain.as_bytes().unwrap(); + let result = tx.get(&table, &key).unwrap(); + match result { + Some(value) => Ok(Some(Codomain::from_bytes(value.into()).unwrap())), + None => Ok(None), + } + } + + fn tuple_size_for_unique_domain( + &self, + rel: Relation, + domain: Domain, + ) -> OpResult> { + let tx = self.write_tx(); + let table = self.primary_partition(rel); + let key = domain.as_bytes().unwrap(); + let result = tx.get(&table, &key).unwrap(); + match result { + Some(value) => Ok(Some(value.len())), + None => Ok(None), + } + } + + fn tuple_size_for_unique_codomain( + &self, + rel: Relation, + codomain: Codomain, + ) -> OpResult> { + let tx = self.write_tx(); + let table = self.secondary_partition(rel).unwrap(); + let key = codomain.as_bytes().unwrap(); + let result = tx.get(&table, &key).unwrap(); + match result { + Some(value) => { + let cset = ValueSet::new(value.into()); + Ok(cset + .find(codomain) + .map(|idx| cset.at(idx).unwrap().size_bytes())) + } + None => Ok(None), + } + } + + fn seek_unique_by_codomain< + Domain: Clone + Eq + PartialEq + AsByteBuffer, + Codomain: Clone + Eq + PartialEq + AsByteBuffer, + >( + &self, + rel: Relation, + codomain: Codomain, + ) -> OpResult { + let cset: ValueSet = self.codomain_lookup( + self.write_tx(), + self.secondary_partition(rel).unwrap(), + codomain, + )?; + let len = cset.len(); + if len == 0 { + return Err(RelationalError::NotFound); + } + if cset.len() != 1 { + return Err(RelationalError::Duplicate(format!( + "Multiple tuples found for codomain. {} values present", + cset.len() + ))); + } + + cset.at(0).map(Ok).unwrap() + } + + fn seek_by_codomain< + Domain: Clone + Eq + PartialEq + AsByteBuffer, + Codomain: Clone + Eq + PartialEq + AsByteBuffer, + ResultSet: ValSet, + >( + &self, + rel: Relation, + codomain: Codomain, + ) -> OpResult { + let cset = match self.codomain_lookup( + self.write_tx(), + self.secondary_partition(rel).unwrap(), + codomain, + ) { + Ok(cset) => cset, + Err(NotFound) => { + return Ok(ResultSet::empty()); + } + Err(e) => { + return Err(e); + } + }; + + let cset_iter = cset.iter(); + Ok(ResultSet::from_iter(cset_iter)) + } + + fn seek_by_unique_composite_domain< + DomainA: Clone + Eq + PartialEq + AsByteBuffer, + DomainB: Clone + Eq + PartialEq + AsByteBuffer, + Codomain: Clone + Eq + PartialEq + AsByteBuffer, + >( + &self, + rel: Relation, + domain_a: DomainA, + domain_b: DomainB, + ) -> OpResult> { + let key = CompositeDomain::composite_key(domain_a, domain_b); + self.seek_unique_by_domain(rel, key) + } + + fn tuple_size_by_composite_domain< + DomainA: Clone + Eq + PartialEq + AsByteBuffer, + DomainB: Clone + Eq + PartialEq + AsByteBuffer, + >( + &self, + rel: Relation, + domain_a: DomainA, + domain_b: DomainB, + ) -> OpResult> { + let key = CompositeDomain::composite_key(domain_a, domain_b); + self.tuple_size_for_unique_domain(rel, key) + } + + fn insert_composite_domain_tuple< + Codomain: Clone + Eq + PartialEq + AsByteBuffer, + DomainA: Clone + Eq + PartialEq + AsByteBuffer, + DomainB: Clone + Eq + PartialEq + AsByteBuffer, + >( + &self, + rel: Relation, + domain_a: DomainA, + domain_b: DomainB, + codomain: Codomain, + ) -> OpResult<()> { + let key = CompositeDomain::composite_key(domain_a, domain_b); + self.insert_tuple(rel, key, codomain) + } + + fn delete_composite_if_exists< + DomainA: Clone + Eq + PartialEq + AsByteBuffer, + DomainB: Clone + Eq + PartialEq + AsByteBuffer, + >( + &self, + rel: Relation, + domain_a: DomainA, + domain_b: DomainB, + ) -> OpResult<()> { + let key = CompositeDomain::composite_key(domain_a, domain_b); + let result = self.remove_by_domain(rel, key); + match result { + Ok(()) => Ok(()), + Err(NotFound) => Ok(()), + Err(e) => Err(e), + } + } + + fn upsert_composite< + DomainA: Clone + Eq + PartialEq + AsByteBuffer, + DomainB: Clone + Eq + PartialEq + AsByteBuffer, + Codomain: Clone + Eq + PartialEq + AsByteBuffer, + >( + &self, + rel: Relation, + domain_a: DomainA, + domain_b: DomainB, + value: Codomain, + ) -> OpResult<()> { + let key = CompositeDomain::composite_key(domain_a, domain_b); + self.upsert(rel, key, value) + } + + fn delete_if_exists( + &self, + rel: Relation, + domain: Domain, + ) -> OpResult<()> { + let result = self.remove_by_domain(rel, domain); + match result { + Ok(()) => Ok(()), + Err(NotFound) => Ok(()), + Err(e) => Err(e), + } + } +} + +#[cfg(test)] +mod tests { + use crate::fjall_db::FjallDb; + use crate::fjall_relation::tests::TestRelation::{ + CompositeToOne, OneToOne, OneToOneSecondaryIndexed, + }; + use moor_db::RelationalTransaction; + use moor_values::model::{ObjSet, ValSet}; + use moor_values::Objid; + use strum::{AsRefStr, Display, EnumCount, EnumIter, EnumProperty}; + + /// The set of binary relations that are used to represent the world state in the moor system. + #[repr(usize)] + #[derive( + Copy, Clone, Debug, Eq, PartialEq, EnumIter, EnumCount, Display, EnumProperty, AsRefStr, + )] + pub enum TestRelation { + /// Object<->Parent + OneToOne = 0, + /// Object<->Location + #[strum(props(SecondaryIndexed = "true"))] + OneToOneSecondaryIndexed = 1, + /// (Object, UUID)->PropertyValue (Var) + #[strum(props(CompositeDomain = "true", Domain_A_Size = "8", Domain_B_Size = "16"))] + CompositeToOne = 8, + /// Set of sequences sequence_id -> current_value + Sequences = 9, + } + + impl Into for TestRelation { + fn into(self) -> usize { + self as usize + } + } + + fn test_db() -> FjallDb { + let (db, _) = FjallDb::open(None); + db + } + + #[test] + fn test_insert_seek_unique() { + let db = test_db(); + let tx = db.new_transaction(); + + tx.insert_tuple(OneToOne, Objid(1), Objid(2)).unwrap(); + tx.insert_tuple(OneToOne, Objid(2), Objid(3)).unwrap(); + tx.insert_tuple(OneToOne, Objid(3), Objid(4)).unwrap(); + assert_eq!( + tx.seek_unique_by_domain::(OneToOne, Objid(1)) + .unwrap(), + Some(Objid(2)) + ); + assert_eq!( + tx.seek_unique_by_domain::(OneToOne, Objid(2)) + .unwrap(), + Some(Objid(3)) + ); + assert_eq!( + tx.seek_unique_by_domain::(OneToOne, Objid(3)) + .unwrap(), + Some(Objid(4)) + ); + } + + #[test] + fn test_composite_insert_seek_unique() { + let db = test_db(); + let tx = db.new_transaction(); + + tx.insert_composite_domain_tuple(CompositeToOne, Objid(1), Objid(2), Objid(3)) + .unwrap(); + tx.insert_composite_domain_tuple(CompositeToOne, Objid(2), Objid(3), Objid(4)) + .unwrap(); + tx.insert_composite_domain_tuple(CompositeToOne, Objid(3), Objid(4), Objid(5)) + .unwrap(); + + assert_eq!( + tx.seek_by_unique_composite_domain::( + CompositeToOne, + Objid(1), + Objid(2) + ) + .unwrap(), + Some(Objid(3)) + ); + assert_eq!( + tx.seek_by_unique_composite_domain::( + CompositeToOne, + Objid(2), + Objid(3) + ) + .unwrap(), + Some(Objid(4)) + ); + assert_eq!( + tx.seek_by_unique_composite_domain::( + CompositeToOne, + Objid(3), + Objid(4) + ) + .unwrap(), + Some(Objid(5)) + ); + + // Now upsert an existing value... + tx.upsert_composite(CompositeToOne, Objid(1), Objid(2), Objid(4)) + .unwrap(); + assert_eq!( + tx.seek_by_unique_composite_domain::( + CompositeToOne, + Objid(1), + Objid(2) + ) + .unwrap(), + Some(Objid(4)) + ); + + // And insert a new using upsert + tx.upsert_composite(CompositeToOne, Objid(4), Objid(5), Objid(6)) + .unwrap(); + assert_eq!( + tx.seek_by_unique_composite_domain::( + CompositeToOne, + Objid(4), + Objid(5) + ) + .unwrap(), + Some(Objid(6)) + ); + } + + #[test] + fn test_codomain_index() { + let tmpdir = tempfile::tempdir().unwrap(); + let db = test_db(); + let tx = db.new_transaction(); + tx.insert_tuple(OneToOneSecondaryIndexed, Objid(3), Objid(2)) + .unwrap(); + tx.insert_tuple(OneToOneSecondaryIndexed, Objid(2), Objid(1)) + .unwrap(); + tx.insert_tuple(OneToOneSecondaryIndexed, Objid(1), Objid(0)) + .unwrap(); + tx.insert_tuple(OneToOneSecondaryIndexed, Objid(4), Objid(0)) + .unwrap(); + + assert_eq!( + tx.seek_unique_by_domain::(OneToOneSecondaryIndexed, Objid(3)) + .unwrap(), + Some(Objid(2)) + ); + assert_eq!( + tx.seek_unique_by_domain::(OneToOneSecondaryIndexed, Objid(2)) + .unwrap(), + Some(Objid(1)) + ); + assert_eq!( + tx.seek_unique_by_domain::(OneToOneSecondaryIndexed, Objid(1)) + .unwrap(), + Some(Objid(0)) + ); + + assert_eq!( + tx.seek_by_codomain::(OneToOneSecondaryIndexed, Objid(0)) + .unwrap(), + ObjSet::from_items(&[Objid(1), Objid(4)]) + ); + assert_eq!( + tx.seek_unique_by_codomain::(OneToOneSecondaryIndexed, Objid(1)) + .unwrap(), + Objid(2) + ); + assert_eq!( + tx.seek_unique_by_codomain::(OneToOneSecondaryIndexed, Objid(2)) + .unwrap(), + Objid(3) + ); + + assert_eq!( + tx.seek_by_codomain::(OneToOneSecondaryIndexed, Objid(3)) + .unwrap(), + ObjSet::empty() + ); + assert_eq!( + tx.seek_by_codomain::(OneToOneSecondaryIndexed, Objid(0)) + .unwrap(), + ObjSet::from_items(&[Objid(1), Objid(4)]) + ); + assert_eq!( + tx.seek_by_codomain::(OneToOneSecondaryIndexed, Objid(1)) + .unwrap(), + ObjSet::from_items(&[Objid(2)]) + ); + assert_eq!( + tx.seek_by_codomain::(OneToOneSecondaryIndexed, Objid(2)) + .unwrap(), + ObjSet::from_items(&[Objid(3)]) + ); + + // Now commit and re-verify. + assert_eq!(tx.commit(), super::CommitResult::Success); + let tx = db.new_transaction(); + + assert_eq!( + tx.seek_unique_by_domain::(OneToOneSecondaryIndexed, Objid(3)) + .unwrap(), + Some(Objid(2)) + ); + assert_eq!( + tx.seek_unique_by_domain::(OneToOneSecondaryIndexed, Objid(2)) + .unwrap(), + Some(Objid(1)) + ); + assert_eq!( + tx.seek_unique_by_domain::(OneToOneSecondaryIndexed, Objid(1)) + .unwrap(), + Some(Objid(0)) + ); + + assert_eq!( + tx.seek_by_codomain::(OneToOneSecondaryIndexed, Objid(3)) + .unwrap(), + ObjSet::empty(), + ); + assert_eq!( + tx.seek_by_codomain::(OneToOneSecondaryIndexed, Objid(2)) + .unwrap(), + ObjSet::from_items(&[Objid(3)]) + ); + assert_eq!( + tx.seek_by_codomain::(OneToOneSecondaryIndexed, Objid(1)) + .unwrap(), + ObjSet::from_items(&[Objid(2)]) + ); + assert_eq!( + tx.seek_by_codomain::(OneToOneSecondaryIndexed, Objid(0)) + .unwrap(), + ObjSet::from_items(&[Objid(1), Objid(4)]) + ); + + // And then update a value and verify. + tx.upsert::(OneToOneSecondaryIndexed, Objid(1), Objid(2)) + .unwrap(); + assert_eq!( + tx.seek_unique_by_codomain::(OneToOneSecondaryIndexed, Objid(1)) + .unwrap(), + Objid(2) + ); + // Verify that the secondary index is updated... First check for new value. + let children: ObjSet = tx + .seek_by_codomain::(OneToOneSecondaryIndexed, Objid(2)) + .unwrap(); + assert_eq!(children.len(), 2); + assert!( + children.contains(Objid(1)), + "Expected children of 2 to contain 1" + ); + assert!( + !children.contains(Objid(0)), + "Expected children of 2 to not contain 0" + ); + // Now check the old value. + let children = tx + .seek_by_codomain::(OneToOneSecondaryIndexed, Objid(0)) + .unwrap(); + assert_eq!(children, ObjSet::from_items(&[Objid(4)])); + } +} diff --git a/crates/db-fjall/src/fjall_worldstate.rs b/crates/db-fjall/src/fjall_worldstate.rs new file mode 100644 index 00000000..e68d7e90 --- /dev/null +++ b/crates/db-fjall/src/fjall_worldstate.rs @@ -0,0 +1,181 @@ +// Copyright (C) 2024 Ryan Daum +// +// This program is free software: you can redistribute it and/or modify it under +// the terms of the GNU General Public License as published by the Free Software +// Foundation, version 3. +// +// This program is distributed in the hope that it will be useful, but WITHOUT +// ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS +// FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License along with +// this program. If not, see . +// + +use crate::fjall_db::FjallDb; +use fjall::PersistMode; +use moor_db::db_worldstate::DbTxWorldState; +use moor_db::loader::LoaderInterface; +use moor_db::{Database, RelationalWorldStateTransaction, WorldStateTable}; +use moor_values::model::{WorldState, WorldStateError, WorldStateSource}; + +impl WorldStateSource for FjallDb { + fn new_world_state(&self) -> Result, WorldStateError> { + let tx = self.new_transaction(); + let rel_tx = Box::new(RelationalWorldStateTransaction { tx: Some(tx) }); + Ok(Box::new(DbTxWorldState { tx: rel_tx })) + } + + fn checkpoint(&self) -> Result<(), WorldStateError> { + self.keyspace.persist(PersistMode::SyncAll).unwrap(); + Ok(()) + } +} + +impl Database for FjallDb { + fn loader_client(&self) -> Result, WorldStateError> { + let tx = self.new_transaction(); + let rel_tx = Box::new(RelationalWorldStateTransaction { tx: Some(tx) }); + Ok(Box::new(DbTxWorldState { tx: rel_tx })) + } +} + +#[cfg(test)] +mod tests { + use crate::fjall_db::FjallDb; + use crate::FjallTransaction; + use moor_db::{ + perform_reparent_props, perform_test_create_object, perform_test_create_object_fixed_id, + perform_test_descendants, perform_test_location_contents, perform_test_max_object, + perform_test_object_move_commits, perform_test_parent_children, + perform_test_recycle_object, perform_test_regression_properties, + perform_test_rename_property, perform_test_simple_property, + perform_test_transitive_property_resolution, + perform_test_transitive_property_resolution_clear_property, perform_test_verb_add_update, + perform_test_verb_resolve, perform_test_verb_resolve_inherited, + perform_test_verb_resolve_wildcard, RelationalWorldStateTransaction, WorldStateTable, + }; + use std::path::Path; + + fn test_db() -> super::FjallDb { + let (db, _) = super::FjallDb::open(None); + db + } + pub fn begin_tx( + db: &FjallDb, + ) -> RelationalWorldStateTransaction> { + RelationalWorldStateTransaction { + tx: Some(db.new_transaction()), + } + } + + #[test] + fn test_create_object() { + let db = test_db(); + perform_test_create_object(|| begin_tx(&db)); + } + + #[test] + fn test_create_object_fixed_id() { + let db = test_db(); + perform_test_create_object_fixed_id(|| begin_tx(&db)); + } + + #[test] + fn test_parent_children() { + let db = test_db(); + perform_test_parent_children(|| begin_tx(&db)); + } + + #[test] + fn test_descendants() { + let db = test_db(); + perform_test_descendants(|| begin_tx(&db)); + } + + #[test] + fn test_location_contents() { + let db = test_db(); + perform_test_location_contents(|| begin_tx(&db)); + } + + /// Test data integrity of object moves between commits. + #[test] + fn test_object_move_commits() { + let db = test_db(); + perform_test_object_move_commits(|| begin_tx(&db)); + } + + #[test] + fn test_simple_property() { + let db = test_db(); + perform_test_simple_property(|| begin_tx(&db)); + } + + /// Regression test for updating-verbs failing. + #[test] + fn test_verb_add_update() { + let db = test_db(); + perform_test_verb_add_update(|| begin_tx(&db)); + } + + #[test] + fn test_transitive_property_resolution() { + let db = test_db(); + perform_test_transitive_property_resolution(|| begin_tx(&db)); + } + + #[test] + fn test_transitive_property_resolution_clear_property() { + let db = test_db(); + perform_test_transitive_property_resolution_clear_property(|| begin_tx(&db)); + } + + #[test] + fn test_rename_property() { + let db = test_db(); + perform_test_rename_property(|| begin_tx(&db)); + } + + #[test] + fn test_regression_properties() { + let db = test_db(); + perform_test_regression_properties(|| begin_tx(&db)); + } + + #[test] + fn test_verb_resolve() { + let db = test_db(); + perform_test_verb_resolve(|| begin_tx(&db)); + } + + #[test] + fn test_verb_resolve_inherited() { + let db = test_db(); + perform_test_verb_resolve_inherited(|| begin_tx(&db)); + } + + #[test] + fn test_verb_resolve_wildcard() { + let db = test_db(); + perform_test_verb_resolve_wildcard(|| begin_tx(&db)); + } + + #[test] + fn test_reparent() { + let db = test_db(); + perform_reparent_props(|| begin_tx(&db)); + } + + #[test] + fn test_recycle_object() { + let db = test_db(); + perform_test_recycle_object(|| begin_tx(&db)); + } + + #[test] + fn test_max_object() { + let db = test_db(); + perform_test_max_object(|| begin_tx(&db)); + } +} diff --git a/crates/db-fjall/src/lib.rs b/crates/db-fjall/src/lib.rs new file mode 100644 index 00000000..c8e6dc5f --- /dev/null +++ b/crates/db-fjall/src/lib.rs @@ -0,0 +1,24 @@ +// Copyright (C) 2024 Ryan Daum +// +// This program is free software: you can redistribute it and/or modify it under +// the terms of the GNU General Public License as published by the Free Software +// Foundation, version 3. +// +// This program is distributed in the hope that it will be useful, but WITHOUT +// ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS +// FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License along with +// this program. If not, see . +// + +pub use crate::fjall_db::FjallDb; +pub use crate::fjall_relation::FjallTransaction; +use moor_db::WorldStateTable; +mod fjall_db; +mod fjall_relation; +mod fjall_worldstate; +mod value_set; + +pub type FjallDbWorldStateSource = FjallDb; +pub type FjallDbTransaction = FjallTransaction; diff --git a/crates/db-fjall/src/value_set.rs b/crates/db-fjall/src/value_set.rs new file mode 100644 index 00000000..0365458e --- /dev/null +++ b/crates/db-fjall/src/value_set.rs @@ -0,0 +1,176 @@ +// Copyright (C) 2024 Ryan Daum +// +// This program is free software: you can redistribute it and/or modify it under +// the terms of the GNU General Public License as published by the Free Software +// Foundation, version 3. +// +// This program is distributed in the hope that it will be useful, but WITHOUT +// ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS +// FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License along with +// this program. If not, see . +// + +use bytes::Bytes; +use moor_values::AsByteBuffer; + +pub struct ValueSet +where + Value: Clone + Eq + PartialEq + AsByteBuffer, +{ + contents: Bytes, + _phantom: std::marker::PhantomData, +} + +pub(crate) struct ValueSetIterator +where + Value: Clone + Eq + PartialEq + AsByteBuffer, +{ + contents: Bytes, + offset: usize, + _phantom: std::marker::PhantomData, +} + +impl Iterator for ValueSetIterator +where + Value: Clone + Eq + PartialEq + AsByteBuffer, +{ + type Item = Value; + + fn next(&mut self) -> Option { + if self.offset >= self.contents.len() { + return None; + } + let len_bytes = &self.contents[self.offset..self.offset + 4]; + let len_bytes: [u8; 4] = len_bytes.try_into().unwrap(); + let len = u32::from_le_bytes(len_bytes) as usize; + self.offset += 4; + let value_bytes = &self.contents[self.offset..self.offset + len]; + self.offset += len; + Value::from_bytes(Bytes::copy_from_slice(value_bytes)).ok() + } +} + +impl ValueSet +where + Value: Clone + Eq + PartialEq + AsByteBuffer, +{ + // contents is encoded as: + // num values u32 + // [ value_len u32, value_bytes ] ... + pub(crate) fn new(contents: Bytes) -> Self { + Self { + contents, + _phantom: Default::default(), + } + } + + pub(crate) fn iter(&self) -> ValueSetIterator { + ValueSetIterator { + contents: self.contents.clone(), + offset: 4, + _phantom: Default::default(), + } + } + + pub(crate) fn data(&self) -> Bytes { + self.contents.clone() + } + + pub(crate) fn len(&self) -> usize { + let bytes = self.contents.as_ref(); + let mut len_bytes = [0; 4]; + len_bytes.copy_from_slice(&bytes[..4]); + u32::from_le_bytes(len_bytes) as usize + } + + pub(crate) fn at(&self, idx: usize) -> Option { + let bytes = self.contents.as_ref(); + let len = self.len(); + if idx >= len { + return None; + } + // linear scan. + let mut offset = 4; + for _ in 0..idx { + let len_bytes_slice = &bytes[offset..offset + 4]; + let len_bytes = len_bytes_slice.try_into().unwrap(); + let len = u32::from_le_bytes(len_bytes) as usize; + offset += len + 4; + } + + let len_bytes_slice = &bytes[offset..offset + 4]; + let len_bytes = len_bytes_slice.try_into().unwrap(); + let len = u32::from_le_bytes(len_bytes) as usize; + + let value_bytes = &bytes[offset + 4..offset + 4 + len]; + Value::from_bytes(Bytes::copy_from_slice(value_bytes)).ok() + } + + pub(crate) fn without_bytes(&self, value_bytes: &[u8]) -> Self { + // Linear scan until we find a match for `value_bytes`. + let mut num_values = self.len(); + let mut new_values = Vec::with_capacity(self.contents.len()); + + // add the 'num values' field, but empty for now + new_values.extend_from_slice(&[0, 0, 0, 0]); + + let bytes = self.contents.as_ref(); + let mut offset = 4; + let mut found = false; + for _ in 0..num_values { + let len_bytes = &bytes[offset..offset + 4]; + let len_bytes: [u8; 4] = len_bytes.try_into().unwrap(); + let len = u32::from_le_bytes(len_bytes) as usize; + let value = &bytes[offset + 4..offset + 4 + len]; + if value != value_bytes { + new_values.extend_from_slice(&len_bytes); + new_values.extend_from_slice(value); + } else { + found = true; + num_values -= 1; + } + offset += len + 4; + } + if found { + // update the 'num values' field + let num_values_bytes = (num_values as u32).to_le_bytes(); + new_values[..4].copy_from_slice(&num_values_bytes); + Self::new(Bytes::from(new_values)) + } else { + Self::new(self.contents.clone()) + } + } + + pub(crate) fn append(&self, value: Value) -> Self { + let num_values = self.len(); + let mut new_values = Vec::with_capacity(self.contents.len() + value.size_bytes() + 4); + new_values.extend_from_slice(&self.contents); + let len_bytes = (value.size_bytes() as u32).to_le_bytes(); + new_values.extend_from_slice(&len_bytes); + new_values.extend_from_slice(&value.as_bytes().unwrap()); + let num_values_bytes = ((num_values + 1) as u32).to_le_bytes(); + new_values[..4].copy_from_slice(&num_values_bytes); + Self::new(Bytes::from(new_values)) + } + + pub(crate) fn find(&self, value: Value) -> Option { + let bytes = self.contents.as_ref(); + let mut len_bytes = [0; 4]; + len_bytes.copy_from_slice(&bytes[..4]); + let mut offset = 4; + let mut idx = 0; + let v_bytes = value.as_bytes().unwrap(); + while offset < bytes.len() { + let len = u32::from_le_bytes(len_bytes) as usize; + let value_bytes = &bytes[offset..offset + len]; + if value_bytes == v_bytes { + return Some(idx); + } + offset += len + 4; + idx += 1; + } + None + } +} diff --git a/crates/db-relbox/src/rb_worldstate.rs b/crates/db-relbox/src/rb_worldstate.rs index 7053da76..70874358 100644 --- a/crates/db-relbox/src/rb_worldstate.rs +++ b/crates/db-relbox/src/rb_worldstate.rs @@ -85,10 +85,11 @@ mod tests { use moor_db::{ perform_reparent_props, perform_test_create_object, perform_test_create_object_fixed_id, - perform_test_descendants, perform_test_location_contents, perform_test_object_move_commits, - perform_test_parent_children, perform_test_recycle_object, - perform_test_regression_properties, perform_test_rename_property, - perform_test_simple_property, perform_test_transitive_property_resolution, + perform_test_descendants, perform_test_location_contents, perform_test_max_object, + perform_test_object_move_commits, perform_test_parent_children, + perform_test_recycle_object, perform_test_regression_properties, + perform_test_rename_property, perform_test_simple_property, + perform_test_transitive_property_resolution, perform_test_transitive_property_resolution_clear_property, perform_test_verb_add_update, perform_test_verb_resolve, perform_test_verb_resolve_inherited, perform_test_verb_resolve_wildcard, RelationalWorldStateTransaction, WorldStateSequence, @@ -214,4 +215,10 @@ mod tests { let db = test_db(); perform_test_recycle_object(|| begin_tx(&db)); } + + #[test] + fn test_max_object() { + let db = test_db(); + perform_test_max_object(|| begin_tx(&db)); + } } diff --git a/crates/db-relbox/src/rel_transaction.rs b/crates/db-relbox/src/rel_transaction.rs index a325abf5..be47ddd3 100644 --- a/crates/db-relbox/src/rel_transaction.rs +++ b/crates/db-relbox/src/rel_transaction.rs @@ -101,17 +101,17 @@ where } fn increment_sequence>(&self, seq: S) -> i64 { - self.tx.increment_sequence(seq.into() as usize) as i64 + self.tx.increment_sequence(seq.into() as usize) + 1 } fn update_sequence_max>(&self, seq: S, value: i64) -> i64 { let seq_num = seq.into() as usize; - self.tx.update_sequence_max(seq_num, value as u64); - self.tx.sequence_current(seq_num) as i64 + self.tx.update_sequence_max(seq_num, value); + self.tx.sequence_current(seq_num) } - fn get_sequence>(&self, seq: S) -> i64 { - self.tx.sequence_current(seq.into() as usize) as i64 + fn get_sequence>(&self, seq: S) -> Option { + Some(self.tx.sequence_current(seq.into() as usize)) } fn remove_by_domain( @@ -141,7 +141,10 @@ where .map_err(err_map) } - fn remove_by_codomain( + fn remove_by_codomain< + Domain: Clone + Eq + PartialEq + AsByteBuffer, + Codomain: Clone + Eq + PartialEq + AsByteBuffer, + >( &self, _rel: T, _codomain: Codomain, @@ -168,8 +171,8 @@ where } fn insert_tuple< - Domain: Clone + Eq + PartialEq + AsByteBuffer + Debug, - Codomain: Clone + Eq + PartialEq + AsByteBuffer + Debug, + Domain: Clone + Eq + PartialEq + AsByteBuffer, + Codomain: Clone + Eq + PartialEq + AsByteBuffer, >( &self, rel: T, @@ -321,8 +324,8 @@ where } fn seek_by_codomain< - Domain: Clone + Eq + PartialEq + AsByteBuffer + Debug, - Codomain: Clone + Eq + PartialEq + AsByteBuffer + Debug, + Domain: Clone + Eq + PartialEq + AsByteBuffer, + Codomain: Clone + Eq + PartialEq + AsByteBuffer, ResultSet: ValSet, >( &self, diff --git a/crates/db-wiredtiger/src/worldstate/wt_worldstate.rs b/crates/db-wiredtiger/src/worldstate/wt_worldstate.rs index 6259e247..abc37a92 100644 --- a/crates/db-wiredtiger/src/worldstate/wt_worldstate.rs +++ b/crates/db-wiredtiger/src/worldstate/wt_worldstate.rs @@ -106,10 +106,11 @@ mod tests { use crate::WiredTigerRelTransaction; use moor_db::{ perform_reparent_props, perform_test_create_object, perform_test_create_object_fixed_id, - perform_test_descendants, perform_test_location_contents, perform_test_object_move_commits, - perform_test_parent_children, perform_test_recycle_object, - perform_test_regression_properties, perform_test_rename_property, - perform_test_simple_property, perform_test_transitive_property_resolution, + perform_test_descendants, perform_test_location_contents, perform_test_max_object, + perform_test_object_move_commits, perform_test_parent_children, + perform_test_recycle_object, perform_test_regression_properties, + perform_test_rename_property, perform_test_simple_property, + perform_test_transitive_property_resolution, perform_test_transitive_property_resolution_clear_property, perform_test_verb_add_update, perform_test_verb_resolve, perform_test_verb_resolve_inherited, perform_test_verb_resolve_wildcard, RelationalWorldStateTransaction, WorldStateTable, @@ -235,4 +236,10 @@ mod tests { let db = test_db(); perform_test_recycle_object(|| begin_tx(&db)); } + + #[test] + fn test_max_object() { + let db = test_db(); + perform_test_max_object(|| begin_tx(&db)); + } } diff --git a/crates/db-wiredtiger/src/wtrel/rel_db.rs b/crates/db-wiredtiger/src/wtrel/rel_db.rs index bb89de16..881986ee 100644 --- a/crates/db-wiredtiger/src/wtrel/rel_db.rs +++ b/crates/db-wiredtiger/src/wtrel/rel_db.rs @@ -69,7 +69,7 @@ where } let connection = Connection::open(path, options).unwrap(); - let sequences = Arc::new([(); MAX_NUM_SEQUENCES].map(|_| AtomicI64::new(0))); + let sequences = Arc::new([(); MAX_NUM_SEQUENCES].map(|_| AtomicI64::new(-1))); Arc::new(WiredTigerRelDb { connection, diff --git a/crates/db-wiredtiger/src/wtrel/rel_transaction.rs b/crates/db-wiredtiger/src/wtrel/rel_transaction.rs index 81df6567..25456961 100644 --- a/crates/db-wiredtiger/src/wtrel/rel_transaction.rs +++ b/crates/db-wiredtiger/src/wtrel/rel_transaction.rs @@ -12,7 +12,6 @@ // this program. If not, see . // -use std::fmt::Debug; use std::sync::atomic::AtomicI64; use std::sync::Arc; @@ -107,7 +106,7 @@ where } fn increment_sequence>(&self, seq: S) -> i64 { - self.sequences[seq.into() as usize].fetch_add(1, std::sync::atomic::Ordering::Relaxed) + self.sequences[seq.into() as usize].fetch_add(1, std::sync::atomic::Ordering::Relaxed) + 1 } /// Update the given sequence to `value` iff `value` is greater than the current value. @@ -115,10 +114,14 @@ where let sequence = &self.sequences[seq.into() as usize]; loop { let current = sequence.load(std::sync::atomic::Ordering::SeqCst); + let max = std::cmp::max(current, value); + if max <= current { + return current; + } if sequence .compare_exchange( current, - std::cmp::max(current, value), + max, std::sync::atomic::Ordering::SeqCst, std::sync::atomic::Ordering::SeqCst, ) @@ -129,8 +132,8 @@ where } } - fn get_sequence>(&self, seq: S) -> i64 { - self.sequences[seq.into() as usize].load(std::sync::atomic::Ordering::Relaxed) + fn get_sequence>(&self, seq: S) -> Option { + Some(self.sequences[seq.into() as usize].load(std::sync::atomic::Ordering::Relaxed)) } fn remove_by_domain( @@ -173,7 +176,10 @@ where Ok(()) } - fn remove_by_codomain( + fn remove_by_codomain< + Domain: Clone + Eq + PartialEq + AsByteBuffer, + Codomain: Clone + Eq + PartialEq + AsByteBuffer, + >( &self, rel: Tables, codomain: Codomain, @@ -230,8 +236,8 @@ where Ok(()) } fn insert_tuple< - Domain: Clone + Eq + PartialEq + AsByteBuffer + Debug, - Codomain: Clone + Eq + PartialEq + AsByteBuffer + Debug, + Domain: Clone + Eq + PartialEq + AsByteBuffer, + Codomain: Clone + Eq + PartialEq + AsByteBuffer, >( &self, rel: Tables, @@ -254,8 +260,8 @@ where Ok(_) => {} Err(Error::DuplicateKey) => { return Err(RelationalError::Duplicate(format!( - "Duplicate key {:?} for relation {}", - domain, rel + "Duplicate key for relation {}", + rel ))); } Err(e) => panic!("Unexpected error: {:?}", e), @@ -383,8 +389,8 @@ where } fn seek_by_codomain< - Domain: Clone + Eq + PartialEq + AsByteBuffer + Debug, - Codomain: Clone + Eq + PartialEq + AsByteBuffer + Debug, + Domain: Clone + Eq + PartialEq + AsByteBuffer, + Codomain: Clone + Eq + PartialEq + AsByteBuffer, ResultSet: ValSet, >( &self, diff --git a/crates/db/src/lib.rs b/crates/db/src/lib.rs index 39f843bf..1e213357 100644 --- a/crates/db/src/lib.rs +++ b/crates/db/src/lib.rs @@ -46,6 +46,8 @@ pub enum DatabaseFlavour { /// WiredTiger, a high-performance, scalable, transactional storage engine, also used in MongoDB. /// Adaptation still under development. WiredTiger, + /// Fjall, and LSM-based storage engine (https://github.com/fjall-rs/fjall) + Fjall, /// In-house in-memory MVCC transactional store based on copy-on-write hashes and trees and /// custom buffer pool management. Consider experimental. #[cfg(feature = "relbox")] @@ -58,6 +60,7 @@ impl From<&str> for DatabaseFlavour { "wiredtiger" => DatabaseFlavour::WiredTiger, #[cfg(feature = "relbox")] "relbox" => DatabaseFlavour::RelBox, + "fjall" => DatabaseFlavour::Fjall, _ => panic!("Unknown database flavour: {}", s), } } diff --git a/crates/db/src/relational_transaction.rs b/crates/db/src/relational_transaction.rs index 9abf4197..f3f9fa67 100644 --- a/crates/db/src/relational_transaction.rs +++ b/crates/db/src/relational_transaction.rs @@ -45,7 +45,7 @@ pub trait RelationalTransaction: Send { fn increment_sequence>(&self, seq: S) -> i64; fn update_sequence_max>(&self, seq: S, value: i64) -> i64; - fn get_sequence>(&self, seq: S) -> i64; + fn get_sequence>(&self, seq: S) -> Option; fn remove_by_domain( &self, @@ -61,7 +61,10 @@ pub trait RelationalTransaction: Send { domain_a: DomainA, domain_b: DomainB, ) -> Result<()>; - fn remove_by_codomain( + fn remove_by_codomain< + Domain: Clone + Eq + PartialEq + AsByteBuffer, + Codomain: Clone + Eq + PartialEq + AsByteBuffer, + >( &self, rel: Relation, codomain: Codomain, @@ -76,8 +79,8 @@ pub trait RelationalTransaction: Send { codomain: Codomain, ) -> Result<()>; fn insert_tuple< - Domain: Clone + Eq + PartialEq + AsByteBuffer + Debug, - Codomain: Clone + Eq + PartialEq + AsByteBuffer + Debug, + Domain: Clone + Eq + PartialEq + AsByteBuffer, + Codomain: Clone + Eq + PartialEq + AsByteBuffer, >( &self, rel: Relation, @@ -121,8 +124,8 @@ pub trait RelationalTransaction: Send { ) -> Result; fn seek_by_codomain< - Domain: Clone + Eq + PartialEq + AsByteBuffer + Debug, - Codomain: Clone + Eq + PartialEq + AsByteBuffer + Debug, + Domain: Clone + Eq + PartialEq + AsByteBuffer, + Codomain: Clone + Eq + PartialEq + AsByteBuffer, ResultSet: ValSet, >( &self, diff --git a/crates/db/src/relational_worldstate.rs b/crates/db/src/relational_worldstate.rs index c46a32bc..19c6722f 100644 --- a/crates/db/src/relational_worldstate.rs +++ b/crates/db/src/relational_worldstate.rs @@ -125,7 +125,8 @@ impl> WorldStateTransaction self.tx .as_ref() .unwrap() - .get_sequence(WorldStateSequence::MaximumObject), + .get_sequence(WorldStateSequence::MaximumObject) + .unwrap_or(-1), )) } @@ -222,7 +223,7 @@ impl> WorldStateTransaction self.tx .as_ref() .unwrap() - .update_sequence_max(WorldStateSequence::MaximumObject, id.0 + 1); + .update_sequence_max(WorldStateSequence::MaximumObject, id.0); Ok(id) } diff --git a/crates/db/src/worldstate_tests.rs b/crates/db/src/worldstate_tests.rs index a94b67c8..89f5cbba 100644 --- a/crates/db/src/worldstate_tests.rs +++ b/crates/db/src/worldstate_tests.rs @@ -150,7 +150,7 @@ where let a = tx .create_object( - None, + Some(Objid(0)), ObjAttrs::new(NOTHING, NOTHING, NOTHING, BitEnum::new(), "test"), ) .unwrap(); @@ -1155,3 +1155,22 @@ where WorldStateError::PropertyNotFound(d, "test2".to_string()) ); } + +// Verify that 'max_object' is the highest object id in the database, not one higher. +pub fn perform_test_max_object(begin_tx: F) +where + F: Fn() -> RelationalWorldStateTransaction, + TX: RelationalTransaction, +{ + let tx = begin_tx(); + // Max object in a virgin DB should return #-1 + let max_obj = tx.get_max_object().unwrap(); + assert_eq!(max_obj, Objid(-1)); + let obj = tx + .create_object( + None, + ObjAttrs::new(NOTHING, NOTHING, NOTHING, BitEnum::new(), "test"), + ) + .unwrap(); + assert_eq!(tx.get_max_object().unwrap(), obj); +} diff --git a/crates/kernel/Cargo.toml b/crates/kernel/Cargo.toml index 51660e5c..9ab24958 100644 --- a/crates/kernel/Cargo.toml +++ b/crates/kernel/Cargo.toml @@ -14,6 +14,7 @@ description = "The actual implementation of most of the moor system; virtual mac [dev-dependencies] moor-db-wiredtiger = { path = "../db-wiredtiger" } +moor-db-fjall = { path = "../db-fjall" } criterion.workspace = true eyre.workspace = true diff --git a/crates/kernel/src/tasks/tasks_db.rs b/crates/kernel/src/tasks/tasks_db.rs index bd9445e8..b4f2e7a8 100644 --- a/crates/kernel/src/tasks/tasks_db.rs +++ b/crates/kernel/src/tasks/tasks_db.rs @@ -19,6 +19,10 @@ use moor_values::tasks::TaskId; pub enum TasksDbError { #[error("Could not load tasks")] CouldNotLoadTasks, + #[error("Could not save task")] + CouldNotSaveTask, + #[error("Could not delete task")] + CouldNotDeleteTask, #[error("Task not found: {0}")] TaskNotFound(TaskId), } diff --git a/crates/kernel/testsuite/common/mod.rs b/crates/kernel/testsuite/common/mod.rs index 64397dbd..79313004 100644 --- a/crates/kernel/testsuite/common/mod.rs +++ b/crates/kernel/testsuite/common/mod.rs @@ -22,6 +22,7 @@ use EncodingMode::UTF8; use moor_compiler::Program; use moor_compiler::{compile, CompileOptions}; use moor_db::Database; +use moor_db_fjall::FjallDb; #[cfg(feature = "relbox")] use moor_db_relbox::RelBoxWorldState; use moor_db_wiredtiger::WiredTigerDB; @@ -71,6 +72,13 @@ pub fn create_wiredtiger_db() -> Box { db } +pub fn create_fjall_db() -> Box { + let (db, _) = FjallDb::open(None); + let db = Box::new(db); + load_textdump(db.as_ref()); + db +} + #[allow(dead_code)] pub fn compile_verbs(db: &dyn Database, verbs: &[(&str, &Program)]) { let mut tx = db.new_world_state().unwrap(); diff --git a/crates/kernel/testsuite/moot_suite.rs b/crates/kernel/testsuite/moot_suite.rs index 7eadba40..4883397e 100644 --- a/crates/kernel/testsuite/moot_suite.rs +++ b/crates/kernel/testsuite/moot_suite.rs @@ -20,6 +20,7 @@ use std::{path::Path, sync::Arc}; use eyre::Context; +use crate::common::create_fjall_db; #[cfg(feature = "relbox")] use common::create_relbox_db; use common::{create_wiredtiger_db, testsuite_dir}; @@ -123,6 +124,11 @@ fn test_wiredtiger(path: &Path) { } test_each_file::test_each_path! { in "./crates/kernel/testsuite/moot" as wiredtiger => test_wiredtiger } +fn test_fjall(path: &Path) { + test(create_fjall_db(), path); +} +test_each_file::test_each_path! { in "./crates/kernel/testsuite/moot" as fjall => test_fjall } + struct NoopSessionFactory {} impl SessionFactory for NoopSessionFactory { fn mk_background_session( @@ -170,5 +176,5 @@ fn test(db: Box, path: &Path) { fn test_single() { // cargo test -p moor-kernel --test moot-suite test_single -- --ignored // CARGO_PROFILE_RELEASE_DEBUG=true cargo flamegraph --test moot-suite -- test_single --ignored - test_wiredtiger(&testsuite_dir().join("moot/truthiness.moot")); + test_fjall(&testsuite_dir().join("moot/truthiness.moot")); } diff --git a/crates/kernel/testsuite/regression_suite.rs b/crates/kernel/testsuite/regression_suite.rs index 614881c9..1d26a7cd 100644 --- a/crates/kernel/testsuite/regression_suite.rs +++ b/crates/kernel/testsuite/regression_suite.rs @@ -12,10 +12,10 @@ // this program. If not, see . // -use common::{create_wiredtiger_db, AssertRunAsVerb}; - +use crate::common::create_fjall_db; #[cfg(feature = "relbox")] use crate::common::create_relbox_db; +use common::{create_wiredtiger_db, AssertRunAsVerb}; mod common; @@ -35,3 +35,11 @@ fn test_testhelper_verb_redefinition_wiredtiger() { db.assert_run_as_verb("return create(#2).name;", Ok("".into())); db.assert_run_as_verb("return 200;", Ok(200.into())); } + +#[test] +fn test_testhelper_verb_redefinition_fjall() { + let db = create_fjall_db(); + db.assert_run_as_verb("return 42;", Ok(42.into())); + db.assert_run_as_verb("return create(#2).name;", Ok("".into())); + db.assert_run_as_verb("return 200;", Ok(200.into())); +}