From bd1f16d6d3c5975161554a34477130a99c77780e Mon Sep 17 00:00:00 2001
From: Jesse Bakker <jesse@getdozer.io>
Date: Tue, 19 Sep 2023 11:40:33 +0200
Subject: [PATCH] Add compact in-memory representation of RecordRef

---
 Cargo.lock                                    | 183 ++++--
 Cargo.toml                                    |   1 +
 dozer-cli/Cargo.toml                          |   1 +
 dozer-cli/src/pipeline/dummy_sink.rs          |   2 +-
 dozer-cli/src/pipeline/log_sink.rs            |   6 +-
 dozer-core/Cargo.toml                         |   1 +
 dozer-core/src/checkpoint/mod.rs              |   3 +-
 dozer-core/src/epoch/manager.rs               |   2 +-
 dozer-core/src/errors.rs                      |   6 +-
 dozer-core/src/executor/execution_dag.rs      |   2 +-
 dozer-core/src/executor/processor_node.rs     |   2 +-
 dozer-core/src/executor/receiver_loop.rs      |   2 +-
 dozer-core/src/executor_operation.rs          |  40 +-
 dozer-core/src/forwarder.rs                   |   2 +-
 dozer-core/src/lib.rs                         |   1 -
 dozer-core/src/node.rs                        |   2 +-
 dozer-core/src/processor_record/mod.rs        |  51 --
 dozer-core/src/record_store.rs                |   7 +-
 .../src/tests/dag_base_create_errors.rs       |   2 +-
 dozer-core/src/tests/dag_base_errors.rs       |   2 +-
 dozer-core/src/tests/dag_base_run.rs          |   2 +-
 dozer-core/src/tests/dag_ports.rs             |   2 +-
 dozer-core/src/tests/dag_schemas.rs           |   2 +-
 dozer-core/src/tests/processors.rs            |   2 +-
 dozer-core/src/tests/sinks.rs                 |   2 +-
 dozer-recordstore/Cargo.toml                  |  13 +
 dozer-recordstore/fuzz/Cargo.toml             |  37 ++
 .../fuzz/fuzz_targets/fuzz_recordref.rs       |  30 +
 dozer-recordstore/src/lib.rs                  | 542 ++++++++++++++++++
 .../src}/store.rs                             | 102 ++--
 dozer-sql/Cargo.toml                          |   1 +
 dozer-sql/src/aggregation/factory.rs          |   2 +-
 dozer-sql/src/aggregation/processor.rs        |   6 +-
 dozer-sql/src/errors.rs                       |  14 +-
 dozer-sql/src/expression/tests/test_common.rs |   2 +-
 .../pipeline/expression/tests/test_common.rs  | 114 ++++
 dozer-sql/src/product/join/factory.rs         |   3 +-
 dozer-sql/src/product/join/operator/mod.rs    |   6 +-
 dozer-sql/src/product/join/operator/table.rs  |   6 +-
 dozer-sql/src/product/join/processor.rs       |   2 +-
 dozer-sql/src/product/set/operator.rs         |   2 +-
 dozer-sql/src/product/set/record_map/mod.rs   |   8 +-
 dozer-sql/src/product/set/set_factory.rs      |   2 +-
 dozer-sql/src/product/set/set_processor.rs    |   2 +-
 dozer-sql/src/product/table/factory.rs        |   2 +-
 dozer-sql/src/product/table/processor.rs      |   2 +-
 dozer-sql/src/projection/factory.rs           |   2 +-
 dozer-sql/src/projection/processor.rs         |   6 +-
 dozer-sql/src/selection/factory.rs            |   2 +-
 dozer-sql/src/selection/processor.rs          |   2 +-
 dozer-sql/src/table_operator/factory.rs       |   2 +-
 dozer-sql/src/table_operator/lifetime.rs      |   2 +-
 dozer-sql/src/table_operator/operator.rs      |   2 +-
 dozer-sql/src/table_operator/processor.rs     |   2 +-
 .../src/table_operator/tests/operator_test.rs |   2 +-
 dozer-sql/src/tests/builder_test.rs           |   2 +-
 dozer-sql/src/utils/serialize.rs              |   6 +-
 dozer-sql/src/window/factory.rs               |   2 +-
 dozer-sql/src/window/operator.rs              |   2 +-
 dozer-sql/src/window/processor.rs             |   2 +-
 dozer-sql/src/window/tests/operator_test.rs   |   2 +-
 dozer-tests/Cargo.toml                        |   1 +
 dozer-tests/src/sql_tests/helper/pipeline.rs  |   2 +-
 dozer-types/Cargo.toml                        |   2 +
 dozer-types/src/json_types.rs                 |   6 +-
 dozer-types/src/types/field.rs                |  31 +-
 dozer-types/src/types/mod.rs                  |  12 +
 67 files changed, 1074 insertions(+), 244 deletions(-)
 delete mode 100644 dozer-core/src/processor_record/mod.rs
 create mode 100644 dozer-recordstore/Cargo.toml
 create mode 100644 dozer-recordstore/fuzz/Cargo.toml
 create mode 100644 dozer-recordstore/fuzz/fuzz_targets/fuzz_recordref.rs
 create mode 100644 dozer-recordstore/src/lib.rs
 rename {dozer-core/src/processor_record => dozer-recordstore/src}/store.rs (71%)
 create mode 100644 dozer-sql/src/pipeline/expression/tests/test_common.rs

diff --git a/Cargo.lock b/Cargo.lock
index bcff98cebf..3a83358583 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -104,7 +104,7 @@ dependencies = [
  "tokio",
  "tokio-util 0.7.8",
  "tracing",
- "zstd",
+ "zstd 0.12.3+zstd.1.5.2",
 ]
 
 [[package]]
@@ -453,6 +453,15 @@ dependencies = [
  "num-traits",
 ]
 
+[[package]]
+name = "arbitrary"
+version = "1.3.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "e2d098ff73c1ca148721f37baad5ea6a465a13f9573aba8641fbbbae8164a54e"
+dependencies = [
+ "derive_arbitrary",
+]
+
 [[package]]
 name = "arc-swap"
 version = "1.6.0"
@@ -691,13 +700,28 @@ version = "0.10.3"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "619743e34b5ba4e9703bba34deac3427c72507c7159f5fd030aea8cac0cfe341"
 
+[[package]]
+name = "async-compression"
+version = "0.3.15"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "942c7cd7ae39e91bde4820d74132e9862e62c2f386c3aa90ccf55949f5bad63a"
+dependencies = [
+ "brotli",
+ "flate2",
+ "futures-core",
+ "memchr",
+ "pin-project-lite",
+ "tokio",
+ "zstd 0.11.2+zstd.1.5.2",
+ "zstd-safe 5.0.2+zstd.1.5.2",
+]
+
 [[package]]
 name = "async-compression"
 version = "0.4.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "62b74f44609f0f91493e3082d3734d98497e094777144380ea4db9f9905dd5b6"
 dependencies = [
- "brotli",
  "bzip2",
  "flate2",
  "futures-core",
@@ -706,8 +730,8 @@ dependencies = [
  "pin-project-lite",
  "tokio",
  "xz2",
- "zstd",
- "zstd-safe",
+ "zstd 0.12.3+zstd.1.5.2",
+ "zstd-safe 6.0.4+zstd.1.5.4",
 ]
 
 [[package]]
@@ -1205,6 +1229,12 @@ version = "0.13.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "9e1b586273c5702936fe7b7d6896644d8be71e6314cfe09d3167c95f712589e8"
 
+[[package]]
+name = "base64"
+version = "0.20.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "0ea22880d78093b0cbe17c89f64a7d457941e65759157ec6cb31a31d652b05e5"
+
 [[package]]
 name = "base64"
 version = "0.21.0"
@@ -1595,6 +1625,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "ec837a71355b28f6556dbd569b37b3f363091c0bd4b2e735674521b4c5fd9bc5"
 dependencies = [
  "android-tzdata",
+ "arbitrary",
  "iana-time-zone",
  "js-sys",
  "num-traits",
@@ -1802,8 +1833,8 @@ version = "0.5.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "c2895653b4d9f1538a83970077cb01dfc77a4810524e51a110944688e916b18e"
 dependencies = [
- "prost 0.11.9",
- "prost-types 0.11.9",
+ "prost 0.11.8",
+ "prost-types 0.11.8",
  "tonic 0.9.2",
  "tracing-core",
 ]
@@ -1820,7 +1851,7 @@ dependencies = [
  "futures",
  "hdrhistogram",
  "humantime",
- "prost-types 0.11.9",
+ "prost-types 0.11.8",
  "serde",
  "serde_json",
  "thread_local",
@@ -2263,7 +2294,7 @@ dependencies = [
  "arrow",
  "arrow-array",
  "arrow-schema",
- "async-compression",
+ "async-compression 0.4.1",
  "async-trait",
  "bytes",
  "bzip2",
@@ -2299,7 +2330,7 @@ dependencies = [
  "url",
  "uuid",
  "xz2",
- "zstd",
+ "zstd 0.12.3+zstd.1.5.2",
 ]
 
 [[package]]
@@ -2413,7 +2444,7 @@ dependencies = [
  "datafusion-common",
  "datafusion-expr",
  "object_store",
- "prost 0.11.9",
+ "prost 0.11.8",
 ]
 
 [[package]]
@@ -2507,6 +2538,17 @@ dependencies = [
  "syn 1.0.109",
 ]
 
+[[package]]
+name = "derive_arbitrary"
+version = "1.3.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "53e0efad4403bfc52dc201159c4b842a246a14b98c64b55dfd0f2d89729dfeb8"
+dependencies = [
+ "proc-macro2",
+ "quote",
+ "syn 2.0.29",
+]
+
 [[package]]
 name = "derive_more"
 version = "0.99.17"
@@ -2642,6 +2684,7 @@ dependencies = [
  "dozer-cache",
  "dozer-core",
  "dozer-ingestion",
+ "dozer-recordstore",
  "dozer-sql",
  "dozer-storage",
  "dozer-tracing",
@@ -2676,6 +2719,7 @@ dependencies = [
  "crossbeam",
  "daggy",
  "dozer-log",
+ "dozer-recordstore",
  "dozer-storage",
  "dozer-tracing",
  "dozer-types",
@@ -2775,12 +2819,21 @@ dependencies = [
  "pyo3-asyncio",
 ]
 
+[[package]]
+name = "dozer-recordstore"
+version = "0.1.0"
+dependencies = [
+ "dozer-types",
+ "triomphe",
+]
+
 [[package]]
 name = "dozer-sql"
 version = "0.1.39"
 dependencies = [
  "ahash 0.8.3",
  "dozer-core",
+ "dozer-recordstore",
  "dozer-sql-expression",
  "dozer-storage",
  "dozer-tracing",
@@ -2838,6 +2891,7 @@ dependencies = [
  "dozer-cache",
  "dozer-cli",
  "dozer-core",
+ "dozer-recordstore",
  "dozer-sql",
  "dozer-tracing",
  "dozer-types",
@@ -2879,6 +2933,7 @@ name = "dozer-types"
 version = "0.1.39"
 dependencies = [
  "ahash 0.8.3",
+ "arbitrary",
  "arrow",
  "arrow-schema",
  "bincode",
@@ -2891,8 +2946,8 @@ dependencies = [
  "ordered-float 3.9.1",
  "parking_lot",
  "prettytable-rs",
- "prost 0.12.0",
- "prost-types 0.12.0",
+ "prost 0.12.1",
+ "prost-types 0.12.1",
  "pyo3",
  "rust_decimal",
  "serde",
@@ -4828,9 +4883,9 @@ dependencies = [
 
 [[package]]
 name = "mime"
-version = "0.3.17"
+version = "0.3.16"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "6877bb514081ee2a7ff5ef9de3281f14a4dd4bceac4c09388074a6b5df8a139a"
+checksum = "2a60c7ce501c71e03a9c9c0d35b861413ae925bd979cc7a4e30d060069aaac8d"
 
 [[package]]
 name = "mime_guess"
@@ -5468,7 +5523,7 @@ dependencies = [
  "opentelemetry-semantic-conventions",
  "opentelemetry_api",
  "opentelemetry_sdk",
- "prost 0.11.9",
+ "prost 0.11.8",
  "thiserror",
  "tokio",
  "tonic 0.9.2",
@@ -5482,7 +5537,7 @@ checksum = "b1e3f814aa9f8c905d0ee4bde026afd3b2577a97c10e1699912e3e44f0c4cbeb"
 dependencies = [
  "opentelemetry_api",
  "opentelemetry_sdk",
- "prost 0.11.9",
+ "prost 0.11.8",
  "tonic 0.9.2",
 ]
 
@@ -5688,7 +5743,7 @@ dependencies = [
  "thrift",
  "tokio",
  "twox-hash",
- "zstd",
+ "zstd 0.12.3+zstd.1.5.2",
 ]
 
 [[package]]
@@ -6143,29 +6198,29 @@ dependencies = [
 
 [[package]]
 name = "prost"
-version = "0.11.9"
+version = "0.11.8"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "0b82eaa1d779e9a4bc1c3217db8ffbeabaae1dca241bf70183242128d48681cd"
+checksum = "e48e50df39172a3e7eb17e14642445da64996989bc212b583015435d39a58537"
 dependencies = [
  "bytes",
- "prost-derive 0.11.9",
+ "prost-derive 0.11.8",
 ]
 
 [[package]]
 name = "prost"
-version = "0.12.0"
+version = "0.12.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "aa8473a65b88506c106c28ae905ca4a2b83a2993640467a41bb3080627ddfd2c"
+checksum = "f4fdd22f3b9c31b53c060df4a0613a1c7f062d4115a2b984dd15b1858f7e340d"
 dependencies = [
  "bytes",
- "prost-derive 0.12.0",
+ "prost-derive 0.12.1",
 ]
 
 [[package]]
 name = "prost-build"
-version = "0.12.0"
+version = "0.12.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "30d3e647e9eb04ddfef78dfee2d5b3fefdf94821c84b710a3d8ebc89ede8b164"
+checksum = "8bdf592881d821b83d471f8af290226c8d51402259e9bb5be7f9f8bdebbb11ac"
 dependencies = [
  "bytes",
  "heck",
@@ -6175,8 +6230,8 @@ dependencies = [
  "once_cell",
  "petgraph 0.6.3 (registry+https://github.com/rust-lang/crates.io-index)",
  "prettyplease",
- "prost 0.12.0",
- "prost-types 0.12.0",
+ "prost 0.12.1",
+ "prost-types 0.12.1",
  "regex",
  "syn 2.0.29",
  "tempfile",
@@ -6185,9 +6240,9 @@ dependencies = [
 
 [[package]]
 name = "prost-derive"
-version = "0.11.9"
+version = "0.11.8"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "e5d2d8d10f3c6ded6da8b05b5fb3b8a5082514344d56c9f871412d29b4e075b4"
+checksum = "4ea9b0f8cbe5e15a8a042d030bd96668db28ecb567ec37d691971ff5731d2b1b"
 dependencies = [
  "anyhow",
  "itertools 0.10.5",
@@ -6198,9 +6253,9 @@ dependencies = [
 
 [[package]]
 name = "prost-derive"
-version = "0.12.0"
+version = "0.12.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "56075c27b20ae524d00f247b8a4dc333e5784f889fe63099f8e626bc8d73486c"
+checksum = "265baba7fabd416cf5078179f7d2cbeca4ce7a9041111900675ea7c4cb8a4c32"
 dependencies = [
  "anyhow",
  "itertools 0.11.0",
@@ -6218,28 +6273,28 @@ dependencies = [
  "base64 0.21.0",
  "logos",
  "once_cell",
- "prost 0.12.0",
- "prost-types 0.12.0",
+ "prost 0.12.1",
+ "prost-types 0.12.1",
  "serde",
  "serde-value",
 ]
 
 [[package]]
 name = "prost-types"
-version = "0.11.9"
+version = "0.11.8"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "213622a1460818959ac1181aaeb2dc9c7f63df720db7d788b3e24eacd1983e13"
+checksum = "379119666929a1afd7a043aa6cf96fa67a6dce9af60c88095a4686dbce4c9c88"
 dependencies = [
- "prost 0.11.9",
+ "prost 0.11.8",
 ]
 
 [[package]]
 name = "prost-types"
-version = "0.12.0"
+version = "0.12.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "cebe0a918c97f86c217b0f76fd754e966f8b9f41595095cf7d74cb4e59d730f6"
+checksum = "e081b29f63d83a4bc75cfc9f3fe424f9156cf92d8a4f0c9407cce9a1b67327cf"
 dependencies = [
- "prost 0.12.0",
+ "prost 0.12.1",
 ]
 
 [[package]]
@@ -6929,6 +6984,7 @@ version = "1.32.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "a4c4216490d5a413bc6d10fa4742bd7d4955941d062c0ef873141d6b0e7b30fd"
 dependencies = [
+ "arbitrary",
  "arrayvec",
  "borsh",
  "bytes",
@@ -8260,7 +8316,7 @@ dependencies = [
  "hyper-timeout",
  "percent-encoding",
  "pin-project",
- "prost 0.11.9",
+ "prost 0.11.8",
  "tokio",
  "tokio-stream",
  "tower",
@@ -8287,7 +8343,7 @@ dependencies = [
  "hyper-timeout",
  "percent-encoding",
  "pin-project",
- "prost 0.12.0",
+ "prost 0.12.1",
  "rustls-native-certs 0.6.3",
  "rustls-pemfile",
  "tokio",
@@ -8318,8 +8374,8 @@ version = "0.10.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "16e61add39c1426d5f21eae2cc196e97e1f5a5ea7bcf491df3885797992a86eb"
 dependencies = [
- "prost 0.12.0",
- "prost-types 0.12.0",
+ "prost 0.12.1",
+ "prost-types 0.12.1",
  "tokio",
  "tokio-stream",
  "tonic 0.10.0",
@@ -8367,13 +8423,13 @@ dependencies = [
 
 [[package]]
 name = "tower-http"
-version = "0.4.4"
+version = "0.4.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "61c5bb1d698276a2443e5ecfabc1008bf15a36c12e6a7176e7bf089ea9131140"
+checksum = "5d1d42a9b3f3ec46ba828e8d376aec14592ea199f70a06a548587ecd1c4ab658"
 dependencies = [
- "async-compression",
- "base64 0.21.0",
- "bitflags 2.4.0",
+ "async-compression 0.3.15",
+ "base64 0.20.0",
+ "bitflags 1.3.2",
  "bytes",
  "futures-core",
  "futures-util",
@@ -8496,6 +8552,16 @@ dependencies = [
  "tracing-log",
 ]
 
+[[package]]
+name = "triomphe"
+version = "0.1.9"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "0eee8098afad3fb0c54a9007aab6804558410503ad676d4633f9c2559a00ac0f"
+dependencies = [
+ "serde",
+ "stable_deref_trait",
+]
+
 [[package]]
 name = "trust-dns-proto"
 version = "0.21.2"
@@ -9326,13 +9392,32 @@ dependencies = [
  "flate2",
 ]
 
+[[package]]
+name = "zstd"
+version = "0.11.2+zstd.1.5.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "20cc960326ece64f010d2d2107537f26dc589a6573a316bd5b1dba685fa5fde4"
+dependencies = [
+ "zstd-safe 5.0.2+zstd.1.5.2",
+]
+
 [[package]]
 name = "zstd"
 version = "0.12.3+zstd.1.5.2"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "76eea132fb024e0e13fd9c2f5d5d595d8a967aa72382ac2f9d39fcc95afd0806"
 dependencies = [
- "zstd-safe",
+ "zstd-safe 6.0.4+zstd.1.5.4",
+]
+
+[[package]]
+name = "zstd-safe"
+version = "5.0.2+zstd.1.5.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "1d2a5585e04f9eea4b2a3d1eca508c4dee9592a89ef6f450c11719da0726f4db"
+dependencies = [
+ "libc",
+ "zstd-sys",
 ]
 
 [[package]]
diff --git a/Cargo.toml b/Cargo.toml
index f90ae5752f..f4475ce534 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -14,6 +14,7 @@ members = [
   "dozer-log-js",
   "dozer-log-python",
   "dozer-utils",
+  "dozer-recordstore",
 ]
 resolver = "2"
 
diff --git a/dozer-cli/Cargo.toml b/dozer-cli/Cargo.toml
index f57a297196..3215ad6ea5 100644
--- a/dozer-cli/Cargo.toml
+++ b/dozer-cli/Cargo.toml
@@ -18,6 +18,7 @@ dozer-sql = { path = "../dozer-sql" }
 dozer-types = { path = "../dozer-types" }
 dozer-tracing = { path = "../dozer-tracing" }
 dozer-storage = { path = "../dozer-storage" }
+dozer-recordstore = { path = "../dozer-recordstore" }
 
 uuid = { version = "1.3.0", features = ["v4", "serde"] }
 tokio = { version = "1", features = ["full"] }
diff --git a/dozer-cli/src/pipeline/dummy_sink.rs b/dozer-cli/src/pipeline/dummy_sink.rs
index e9ebbec45b..9517719cab 100644
--- a/dozer-cli/src/pipeline/dummy_sink.rs
+++ b/dozer-cli/src/pipeline/dummy_sink.rs
@@ -5,9 +5,9 @@ use dozer_core::{
     epoch::Epoch,
     executor_operation::ProcessorOperation,
     node::{PortHandle, Sink, SinkFactory},
-    processor_record::ProcessorRecordStore,
     DEFAULT_PORT_HANDLE,
 };
+use dozer_recordstore::ProcessorRecordStore;
 use dozer_types::{errors::internal::BoxedError, types::Schema};
 
 #[derive(Debug)]
diff --git a/dozer-cli/src/pipeline/log_sink.rs b/dozer-cli/src/pipeline/log_sink.rs
index f6e5234da3..f04bf00952 100644
--- a/dozer-cli/src/pipeline/log_sink.rs
+++ b/dozer-cli/src/pipeline/log_sink.rs
@@ -8,9 +8,9 @@ use dozer_core::{
     epoch::Epoch,
     executor_operation::ProcessorOperation,
     node::{PortHandle, Sink, SinkFactory},
-    processor_record::ProcessorRecordStore,
     DEFAULT_PORT_HANDLE,
 };
+use dozer_recordstore::ProcessorRecordStore;
 use dozer_tracing::LabelsAndProgress;
 use dozer_types::indicatif::ProgressBar;
 use dozer_types::types::Schema;
@@ -105,9 +105,7 @@ impl Sink for LogSink {
         self.log
             .lock()
             .write(dozer_cache::dozer_log::replication::LogOperation::Op {
-                op: record_store
-                    .load_operation(&op)
-                    .map_err(Into::<BoxedError>::into)?,
+                op: op.load(record_store)?,
             });
         self.update_counter();
         Ok(())
diff --git a/dozer-core/Cargo.toml b/dozer-core/Cargo.toml
index 0f090fd84e..9da25e20b1 100644
--- a/dozer-core/Cargo.toml
+++ b/dozer-core/Cargo.toml
@@ -11,6 +11,7 @@ dozer-log = { path = "../dozer-log/" }
 dozer-storage = {path = "../dozer-storage/"}
 dozer-types = {path = "../dozer-types/"}
 dozer-tracing = {path = "../dozer-tracing/"}
+dozer-recordstore = {path = "../dozer-recordstore/"}
 
 uuid = {version = "1.3.0", features = ["v1", "v4", "fast-rng"]}
 crossbeam = "0.8.2"
diff --git a/dozer-core/src/checkpoint/mod.rs b/dozer-core/src/checkpoint/mod.rs
index 4278c5d2c4..8afda79b92 100644
--- a/dozer-core/src/checkpoint/mod.rs
+++ b/dozer-core/src/checkpoint/mod.rs
@@ -7,6 +7,7 @@ use dozer_log::{
     storage::{self, Object, Queue, Storage},
     tokio::task::JoinHandle,
 };
+use dozer_recordstore::ProcessorRecordStore;
 use dozer_types::{
     bincode,
     log::{error, info},
@@ -18,7 +19,7 @@ use dozer_types::{
 };
 use tempdir::TempDir;
 
-use crate::{errors::ExecutionError, processor_record::ProcessorRecordStore};
+use crate::errors::ExecutionError;
 
 #[derive(Debug)]
 pub struct CheckpointFactory {
diff --git a/dozer-core/src/epoch/manager.rs b/dozer-core/src/epoch/manager.rs
index 897285d133..3b6ebfb129 100644
--- a/dozer-core/src/epoch/manager.rs
+++ b/dozer-core/src/epoch/manager.rs
@@ -1,3 +1,4 @@
+use dozer_recordstore::ProcessorRecordStore;
 use dozer_types::node::{NodeHandle, SourceStates, TableState};
 use dozer_types::parking_lot::Mutex;
 use std::collections::HashMap;
@@ -7,7 +8,6 @@ use std::thread::sleep;
 use std::time::{Duration, SystemTime};
 
 use crate::checkpoint::{CheckpointFactory, CheckpointWriter};
-use crate::processor_record::ProcessorRecordStore;
 
 use super::EpochCommonInfo;
 
diff --git a/dozer-core/src/errors.rs b/dozer-core/src/errors.rs
index 6c2a1bb7db..b709c4c791 100644
--- a/dozer-core/src/errors.rs
+++ b/dozer-core/src/errors.rs
@@ -1,7 +1,7 @@
 use std::path::PathBuf;
 
 use crate::node::PortHandle;
-use dozer_storage::errors::StorageError;
+use dozer_recordstore::RecordStoreError;
 use dozer_types::errors::internal::BoxedError;
 use dozer_types::node::NodeHandle;
 use dozer_types::thiserror::Error;
@@ -35,8 +35,8 @@ pub enum ExecutionError {
     Source(#[source] BoxedError),
     #[error("File system error {0:?}: {1}")]
     FileSystemError(PathBuf, #[source] std::io::Error),
-    #[error("Storage error: {0}")]
-    Storage(#[from] StorageError),
+    #[error("Recordstore error: {0}")]
+    RecordStore(#[from] RecordStoreError),
     #[error("Object storage error: {0}")]
     ObjectStorage(#[from] dozer_log::storage::Error),
     #[error("Checkpoint writer thread panicked")]
diff --git a/dozer-core/src/executor/execution_dag.rs b/dozer-core/src/executor/execution_dag.rs
index 68d87138d7..a22e269a15 100644
--- a/dozer-core/src/executor/execution_dag.rs
+++ b/dozer-core/src/executor/execution_dag.rs
@@ -15,7 +15,6 @@ use crate::{
     executor_operation::ExecutorOperation,
     hash_map_to_vec::insert_vec_element,
     node::PortHandle,
-    processor_record::ProcessorRecordStore,
     record_store::{create_record_writer, RecordWriter},
 };
 use crossbeam::channel::{bounded, Receiver, Sender};
@@ -23,6 +22,7 @@ use daggy::petgraph::{
     visit::{EdgeRef, IntoEdges, IntoEdgesDirected, IntoNodeIdentifiers},
     Direction,
 };
+use dozer_recordstore::ProcessorRecordStore;
 use dozer_tracing::LabelsAndProgress;
 
 pub type SharedRecordWriter = Rc<RefCell<Option<Box<dyn RecordWriter>>>>;
diff --git a/dozer-core/src/executor/processor_node.rs b/dozer-core/src/executor/processor_node.rs
index 799a351de4..c8ce1597e0 100644
--- a/dozer-core/src/executor/processor_node.rs
+++ b/dozer-core/src/executor/processor_node.rs
@@ -8,13 +8,13 @@ use dozer_types::node::NodeHandle;
 use crate::epoch::Epoch;
 use crate::error_manager::ErrorManager;
 use crate::executor_operation::{ExecutorOperation, ProcessorOperation};
-use crate::processor_record::ProcessorRecordStore;
 use crate::{
     builder_dag::NodeKind,
     errors::ExecutionError,
     forwarder::ProcessorChannelManager,
     node::{PortHandle, Processor},
 };
+use dozer_recordstore::ProcessorRecordStore;
 
 use super::{execution_dag::ExecutionDag, name::Name, receiver_loop::ReceiverLoop};
 
diff --git a/dozer-core/src/executor/receiver_loop.rs b/dozer-core/src/executor/receiver_loop.rs
index a31a9feea0..bab8004795 100644
--- a/dozer-core/src/executor/receiver_loop.rs
+++ b/dozer-core/src/executor/receiver_loop.rs
@@ -105,7 +105,7 @@ mod tests {
         types::{Field, Record},
     };
 
-    use crate::processor_record::{ProcessorRecord, ProcessorRecordStore};
+    use dozer_recordstore::{ProcessorRecord, ProcessorRecordStore};
 
     use super::*;
 
diff --git a/dozer-core/src/executor_operation.rs b/dozer-core/src/executor_operation.rs
index 5f734cc818..c5f229935c 100644
--- a/dozer-core/src/executor_operation.rs
+++ b/dozer-core/src/executor_operation.rs
@@ -1,4 +1,7 @@
-use crate::{epoch::Epoch, processor_record::ProcessorRecord};
+use dozer_recordstore::{ProcessorRecord, ProcessorRecordStore};
+use dozer_types::types::Operation;
+
+use crate::{epoch::Epoch, errors::ExecutionError};
 
 #[derive(Clone, Debug, PartialEq, Eq)]
 /// A CDC event.
@@ -15,6 +18,41 @@ pub enum ProcessorOperation {
     },
 }
 
+impl ProcessorOperation {
+    pub fn new(
+        op: &Operation,
+        record_store: &ProcessorRecordStore,
+    ) -> Result<Self, ExecutionError> {
+        Ok(match op {
+            Operation::Delete { old } => ProcessorOperation::Delete {
+                old: record_store.create_record(old)?,
+            },
+            Operation::Insert { new } => ProcessorOperation::Insert {
+                new: record_store.create_record(new)?,
+            },
+            Operation::Update { old, new } => ProcessorOperation::Update {
+                old: record_store.create_record(old)?,
+                new: record_store.create_record(new)?,
+            },
+        })
+    }
+
+    pub fn load(&self, record_store: &ProcessorRecordStore) -> Result<Operation, ExecutionError> {
+        Ok(match self {
+            ProcessorOperation::Delete { old } => Operation::Delete {
+                old: record_store.load_record(old)?,
+            },
+            ProcessorOperation::Insert { new } => Operation::Insert {
+                new: record_store.load_record(new)?,
+            },
+            ProcessorOperation::Update { old, new } => Operation::Update {
+                old: record_store.load_record(old)?,
+                new: record_store.load_record(new)?,
+            },
+        })
+    }
+}
+
 #[derive(Clone, Debug)]
 pub enum ExecutorOperation {
     Op { op: ProcessorOperation },
diff --git a/dozer-core/src/forwarder.rs b/dozer-core/src/forwarder.rs
index 8eb9df0dfd..d976c15c5f 100644
--- a/dozer-core/src/forwarder.rs
+++ b/dozer-core/src/forwarder.rs
@@ -234,7 +234,7 @@ impl SourceChannelManager {
                 );
 
                 self.manager.send_op(
-                    self.epoch_manager.record_store().create_operation(&op)?,
+                    ProcessorOperation::new(&op, self.epoch_manager.record_store())?,
                     port,
                 )?;
                 self.num_uncommitted_ops += 1;
diff --git a/dozer-core/src/lib.rs b/dozer-core/src/lib.rs
index 407d8d189b..a691e450a6 100644
--- a/dozer-core/src/lib.rs
+++ b/dozer-core/src/lib.rs
@@ -14,7 +14,6 @@ pub mod executor_operation;
 pub mod forwarder;
 mod hash_map_to_vec;
 pub mod node;
-pub mod processor_record;
 pub mod record_store;
 
 #[cfg(test)]
diff --git a/dozer-core/src/node.rs b/dozer-core/src/node.rs
index c3af258697..fd9765c4ca 100644
--- a/dozer-core/src/node.rs
+++ b/dozer-core/src/node.rs
@@ -1,7 +1,7 @@
 use crate::channels::{ProcessorChannelForwarder, SourceChannelForwarder};
 use crate::epoch::Epoch;
 use crate::executor_operation::ProcessorOperation;
-use crate::processor_record::ProcessorRecordStore;
+use dozer_recordstore::ProcessorRecordStore;
 
 use dozer_log::storage::{Object, Queue};
 use dozer_types::errors::internal::BoxedError;
diff --git a/dozer-core/src/processor_record/mod.rs b/dozer-core/src/processor_record/mod.rs
deleted file mode 100644
index 61fc47a803..0000000000
--- a/dozer-core/src/processor_record/mod.rs
+++ /dev/null
@@ -1,51 +0,0 @@
-use std::hash::Hash;
-use std::sync::Arc;
-
-use dozer_types::{
-    serde::{Deserialize, Serialize},
-    types::{Field, Lifetime},
-};
-
-#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
-#[serde(crate = "dozer_types::serde")]
-pub struct RecordRef(Arc<[Field]>);
-
-#[derive(Debug, Clone, PartialEq, Eq, Hash, Default)]
-pub struct ProcessorRecord {
-    /// All `Field`s in this record. The `Field`s are grouped by `Arc` to reduce memory usage.
-    /// This is a Box<[]> instead of a Vec to save space on storing the vec's capacity
-    values: Box<[RecordRef]>,
-
-    /// Time To Live for this record. If the value is None, the record will never expire.
-    lifetime: Option<Box<Lifetime>>,
-}
-
-impl ProcessorRecord {
-    pub fn new(values: Box<[RecordRef]>) -> Self {
-        Self {
-            values,
-            ..Default::default()
-        }
-    }
-
-    pub fn get_lifetime(&self) -> Option<Lifetime> {
-        self.lifetime.as_ref().map(|lifetime| *lifetime.clone())
-    }
-    pub fn set_lifetime(&mut self, lifetime: Option<Lifetime>) {
-        self.lifetime = lifetime.map(Box::new);
-    }
-
-    pub fn values(&self) -> &[RecordRef] {
-        &self.values
-    }
-
-    pub fn appended(existing: &ProcessorRecord, additional: RecordRef) -> Self {
-        let mut values = Vec::with_capacity(existing.values().len() + 1);
-        values.extend_from_slice(existing.values());
-        values.push(additional);
-        Self::new(values.into_boxed_slice())
-    }
-}
-
-mod store;
-pub use store::ProcessorRecordStore;
diff --git a/dozer-core/src/record_store.rs b/dozer-core/src/record_store.rs
index 7c26041061..4ad29bad0d 100644
--- a/dozer-core/src/record_store.rs
+++ b/dozer-core/src/record_store.rs
@@ -1,7 +1,6 @@
 use crate::executor_operation::ProcessorOperation;
 use crate::node::OutputPortType;
-use crate::processor_record::{ProcessorRecord, ProcessorRecordStore};
-use dozer_storage::errors::StorageError;
+use dozer_recordstore::{ProcessorRecord, ProcessorRecordStore, RecordStoreError};
 use dozer_types::thiserror::{self, Error};
 use dozer_types::types::Schema;
 use std::collections::HashMap;
@@ -14,8 +13,8 @@ use super::node::PortHandle;
 pub enum RecordWriterError {
     #[error("Record not found")]
     RecordNotFound,
-    #[error("Storage error: {0}")]
-    Storage(#[from] StorageError),
+    #[error("Recordstore error: {0}")]
+    RecordStore(#[from] RecordStoreError),
 }
 
 pub trait RecordWriter: Send + Sync {
diff --git a/dozer-core/src/tests/dag_base_create_errors.rs b/dozer-core/src/tests/dag_base_create_errors.rs
index b08c2ef2b4..3b6347161f 100644
--- a/dozer-core/src/tests/dag_base_create_errors.rs
+++ b/dozer-core/src/tests/dag_base_create_errors.rs
@@ -3,8 +3,8 @@ use crate::executor::{DagExecutor, ExecutorOptions};
 use crate::node::{
     OutputPortDef, OutputPortType, PortHandle, Processor, ProcessorFactory, Source, SourceFactory,
 };
-use crate::processor_record::ProcessorRecordStore;
 use crate::{Dag, Endpoint, DEFAULT_PORT_HANDLE};
+use dozer_recordstore::ProcessorRecordStore;
 
 use crate::tests::dag_base_run::NoopProcessorFactory;
 use crate::tests::sinks::{CountingSinkFactory, COUNTING_SINK_INPUT_PORT};
diff --git a/dozer-core/src/tests/dag_base_errors.rs b/dozer-core/src/tests/dag_base_errors.rs
index aa6244fd42..785ed5496b 100644
--- a/dozer-core/src/tests/dag_base_errors.rs
+++ b/dozer-core/src/tests/dag_base_errors.rs
@@ -7,13 +7,13 @@ use crate::node::{
     OutputPortDef, OutputPortType, PortHandle, Processor, ProcessorFactory, Sink, SinkFactory,
     Source, SourceFactory, SourceState,
 };
-use crate::processor_record::ProcessorRecordStore;
 use crate::tests::dag_base_run::NoopProcessorFactory;
 use crate::tests::sinks::{CountingSinkFactory, COUNTING_SINK_INPUT_PORT};
 use crate::tests::sources::{GeneratorSourceFactory, GENERATOR_SOURCE_OUTPUT_PORT};
 use crate::{Dag, Endpoint, DEFAULT_PORT_HANDLE};
 use dozer_log::storage::{Object, Queue};
 use dozer_log::tokio;
+use dozer_recordstore::ProcessorRecordStore;
 use dozer_types::errors::internal::BoxedError;
 use dozer_types::ingestion_types::IngestionMessage;
 use dozer_types::node::{NodeHandle, OpIdentifier};
diff --git a/dozer-core/src/tests/dag_base_run.rs b/dozer-core/src/tests/dag_base_run.rs
index 3a2338bc3d..2433ed9984 100644
--- a/dozer-core/src/tests/dag_base_run.rs
+++ b/dozer-core/src/tests/dag_base_run.rs
@@ -4,7 +4,6 @@ use crate::epoch::Epoch;
 use crate::executor::{DagExecutor, ExecutorOptions};
 use crate::executor_operation::ProcessorOperation;
 use crate::node::{OutputPortDef, OutputPortType, PortHandle, Processor, ProcessorFactory};
-use crate::processor_record::ProcessorRecordStore;
 use crate::tests::sinks::{CountingSinkFactory, COUNTING_SINK_INPUT_PORT};
 use crate::tests::sources::{
     DualPortGeneratorSourceFactory, GeneratorSourceFactory,
@@ -14,6 +13,7 @@ use crate::tests::sources::{
 use crate::{Dag, Endpoint, DEFAULT_PORT_HANDLE};
 use dozer_log::storage::Object;
 use dozer_log::tokio;
+use dozer_recordstore::ProcessorRecordStore;
 use dozer_types::errors::internal::BoxedError;
 use dozer_types::node::NodeHandle;
 use dozer_types::types::Schema;
diff --git a/dozer-core/src/tests/dag_ports.rs b/dozer-core/src/tests/dag_ports.rs
index bf74602c6a..14d52bf26e 100644
--- a/dozer-core/src/tests/dag_ports.rs
+++ b/dozer-core/src/tests/dag_ports.rs
@@ -1,8 +1,8 @@
 use crate::node::{
     OutputPortDef, OutputPortType, PortHandle, Processor, ProcessorFactory, Source, SourceFactory,
 };
-use crate::processor_record::ProcessorRecordStore;
 use crate::{Dag, Endpoint, DEFAULT_PORT_HANDLE};
+use dozer_recordstore::ProcessorRecordStore;
 use dozer_types::errors::internal::BoxedError;
 use dozer_types::{node::NodeHandle, types::Schema};
 use std::collections::HashMap;
diff --git a/dozer-core/src/tests/dag_schemas.rs b/dozer-core/src/tests/dag_schemas.rs
index d6f74e8ecd..4a9e578611 100644
--- a/dozer-core/src/tests/dag_schemas.rs
+++ b/dozer-core/src/tests/dag_schemas.rs
@@ -3,8 +3,8 @@ use crate::node::{
     OutputPortDef, OutputPortType, PortHandle, Processor, ProcessorFactory, SinkFactory, Source,
     SourceFactory,
 };
-use crate::processor_record::ProcessorRecordStore;
 use crate::{Dag, Endpoint, DEFAULT_PORT_HANDLE};
+use dozer_recordstore::ProcessorRecordStore;
 
 use dozer_types::errors::internal::BoxedError;
 use dozer_types::node::NodeHandle;
diff --git a/dozer-core/src/tests/processors.rs b/dozer-core/src/tests/processors.rs
index 8edc74072a..703cf6cc40 100644
--- a/dozer-core/src/tests/processors.rs
+++ b/dozer-core/src/tests/processors.rs
@@ -1,10 +1,10 @@
 use std::collections::HashMap;
 
+use dozer_recordstore::ProcessorRecordStore;
 use dozer_types::{errors::internal::BoxedError, types::Schema};
 
 use crate::{
     node::{OutputPortDef, OutputPortType, PortHandle, Processor, ProcessorFactory},
-    processor_record::ProcessorRecordStore,
     DEFAULT_PORT_HANDLE,
 };
 
diff --git a/dozer-core/src/tests/sinks.rs b/dozer-core/src/tests/sinks.rs
index 8cf3f7ff3a..1310c4fcfc 100644
--- a/dozer-core/src/tests/sinks.rs
+++ b/dozer-core/src/tests/sinks.rs
@@ -1,9 +1,9 @@
 use crate::epoch::Epoch;
 use crate::executor_operation::ProcessorOperation;
 use crate::node::{PortHandle, Sink, SinkFactory};
-use crate::processor_record::ProcessorRecordStore;
 use crate::DEFAULT_PORT_HANDLE;
 use dozer_log::storage::Queue;
+use dozer_recordstore::ProcessorRecordStore;
 use dozer_types::errors::internal::BoxedError;
 use dozer_types::types::Schema;
 
diff --git a/dozer-recordstore/Cargo.toml b/dozer-recordstore/Cargo.toml
new file mode 100644
index 0000000000..f47d595131
--- /dev/null
+++ b/dozer-recordstore/Cargo.toml
@@ -0,0 +1,13 @@
+[package]
+name = "dozer-recordstore"
+version = "0.1.0"
+edition = "2021"
+
+# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
+
+[features]
+fuzz = ["dozer-types/arbitrary"]
+
+[dependencies]
+dozer-types = { path = "../dozer-types" }
+triomphe = "0.1.9"
diff --git a/dozer-recordstore/fuzz/Cargo.toml b/dozer-recordstore/fuzz/Cargo.toml
new file mode 100644
index 0000000000..d904f85951
--- /dev/null
+++ b/dozer-recordstore/fuzz/Cargo.toml
@@ -0,0 +1,37 @@
+[package]
+name = "dozer-recordstore-fuzz"
+version = "0.0.0"
+publish = false
+edition = "2021"
+
+[package.metadata]
+cargo-fuzz = true
+
+[dependencies]
+libfuzzer-sys = "0.4"
+
+# Prevent this from interfering with workspaces
+[workspace]
+members = ["."]
+
+[dependencies.dozer-recordstore]
+path = ".."
+
+[dependencies.dozer-types]
+path = "../../dozer-types"
+features = ["arbitrary"]
+
+[profile.release]
+debug = 1
+
+[[bin]]
+name = "fuzz_recordref"
+path = "fuzz_targets/fuzz_recordref.rs"
+test = false
+doc = false
+
+[patch.crates-io]
+postgres = { git = "https://github.com/getdozer/rust-postgres" }
+postgres-protocol = { git = "https://github.com/getdozer/rust-postgres" }
+postgres-types = { git = "https://github.com/getdozer/rust-postgres" }
+tokio-postgres = { git = "https://github.com/getdozer/rust-postgres" }
diff --git a/dozer-recordstore/fuzz/fuzz_targets/fuzz_recordref.rs b/dozer-recordstore/fuzz/fuzz_targets/fuzz_recordref.rs
new file mode 100644
index 0000000000..1ca60f1ad8
--- /dev/null
+++ b/dozer-recordstore/fuzz/fuzz_targets/fuzz_recordref.rs
@@ -0,0 +1,30 @@
+#![no_main]
+
+use dozer_recordstore::{ProcessorRecordStore, RecordRef};
+use dozer_types::types::Field;
+use libfuzzer_sys::fuzz_target;
+
+// Round-trips arbitrary records through the store and checks that loading a
+// `RecordRef` — and a clone that outlives the original — reproduces the
+// original fields exactly.
+fuzz_target!(|data: Vec<Vec<Field>>| {
+    let store = ProcessorRecordStore::new().unwrap();
+    for record in data {
+        // `create_ref`/`load_ref` return `Result`; the previous version used
+        // the bare `Result` values, which did not compile.
+        let record_ref = store.create_ref(&record).unwrap();
+        let fields: Vec<_> = store
+            .load_ref(&record_ref)
+            .unwrap()
+            .into_iter()
+            .map(|field| field.cloned())
+            .collect();
+        assert_eq!(fields, record);
+
+        let cloned = record_ref.clone();
+
+        drop(record_ref);
+
+        // Check that dropping is sound (e.g. no double-free when a clone is dropped)
+        let fields: Vec<_> = cloned
+            .load()
+            .into_iter()
+            .map(|field| field.cloned())
+            .collect();
+        assert_eq!(fields, record);
+    }
+});
diff --git a/dozer-recordstore/src/lib.rs b/dozer-recordstore/src/lib.rs
new file mode 100644
index 0000000000..174d91ec19
--- /dev/null
+++ b/dozer-recordstore/src/lib.rs
@@ -0,0 +1,542 @@
+//! [`RecordRef`] is a compact representation of a collection of [dozer_types::types::Field]s
+//! There are two principles that make this representation more compact than `[Field]`:
+//!  1. The fields and their types are stored as a Struct of Arrays instead of
+//!     an Array of Structs. This makes it possible to pack the discriminants
+//!     for the field types as a byte per field, instead of taking up a full word,
+//!     which is the case in [Field] (because the variant value must be aligned)
+//!  2. The field values are stored packed. In a `[Field]` representation, each
+//!     field takes as much space as the largest enum variant in [Field] (plus its discriminant,
+//!     see (1.)). Instead, for the compact representation, we pack the values into
+//!     align_of::<Field>() sized slots. This way, a u64 takes only 8 bytes, whereas
+//!     a u128 can still use its 16 bytes.
+use std::alloc::{dealloc, handle_alloc_error, Layout};
+use std::{hash::Hash, ptr::NonNull};
+
+use triomphe::{Arc, HeaderSlice};
+
+use dozer_types::chrono::{DateTime, FixedOffset, NaiveDate};
+use dozer_types::json_types::JsonValue;
+use dozer_types::ordered_float::OrderedFloat;
+use dozer_types::rust_decimal::Decimal;
+use dozer_types::types::{DozerDuration, DozerPoint};
+use dozer_types::{
+    serde::{Deserialize, Serialize},
+    types::{Field, FieldType, Lifetime},
+};
+
+// The alignment of an enum is necessarily the maximum alignment of its variants
+// (otherwise it would be unsound to read from it).
+// So, by using the alignment of `Field` as the alignment of the values in our
+// packed `RecordRef`, we ensure that all accesses are aligned.
+// This wastes a little bit of memory for subsequent fields that have
+// smaller minimum alignment and size (such as `bool`, which has size=1, align=1),
+// but in practice this should be negligible compared to the added effort of
+// packing these fields while keeping everything aligned.
+const ALIGN: usize = std::mem::align_of::<Field>();
+
+// Compile-time guard: if `ALIGN` ever grows beyond 8, this subtraction
+// underflows and const evaluation fails, surfacing the (space-wasting)
+// alignment increase as a build error.
+const _ASSERT_ALIGN: usize = 8 - ALIGN;
+
+/// Size of `T`, rounded up to the next multiple of `ALIGN`.
+/// `ALIGN` is a power of two, so the round-up is a mask of the low bits.
+const fn aligned_size_of<T>() -> usize {
+    (std::mem::size_of::<T>() + ALIGN - 1) & !(ALIGN - 1)
+}
+
+#[repr(transparent)]
+#[derive(Debug)]
+/// `repr(transparent)` inner struct so we can implement drop logic on it.
+/// This is a `triomphe` HeaderSlice so we can make a fat Arc, saving a level
+/// of indirection and a pointer which would otherwise be needed for the field types.
+/// The header points at the packed field payloads; the slice stores one
+/// discriminant per field (`None` encodes NULL, which occupies no payload).
+struct RecordRefInner(HeaderSlice<NonNull<u8>, [Option<FieldType>]>);
+
+/// Cheaply cloneable, immutable handle to a packed record.
+/// Cloning only bumps the `Arc` refcount; the packed payload is shared.
+#[derive(Debug, Clone)]
+pub struct RecordRef(Arc<RecordRefInner>);
+
+impl PartialEq for RecordRef {
+    fn eq(&self, other: &Self) -> bool {
+        // Compare by field values, not by allocation identity.
+        self.load() == other.load()
+    }
+}
+
+impl Eq for RecordRef {}
+
+// SAFETY: `RecordRef` is an `Arc` around immutable, owned data; the raw
+// `NonNull<u8>` header only points into the allocation owned by the same
+// `RecordRefInner`, and there is no interior mutability. This assumes all
+// stored payload types (String, Vec<u8>, JsonValue, ...) are themselves
+// `Send + Sync` — TODO(review): confirm against dozer_types.
+// (The `NonNull` field suppresses the auto impls, hence the manual ones.)
+unsafe impl Send for RecordRef {}
+unsafe impl Sync for RecordRef {}
+
+impl Hash for RecordRef {
+    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
+        // Hash the loaded values so `Hash` stays consistent with `PartialEq`.
+        self.load().hash(state)
+    }
+}
+
+impl<'de> Deserialize<'de> for RecordRef {
+    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
+    where
+        D: dozer_types::serde::Deserializer<'de>,
+    {
+        // Deserialize into `FieldRef`s first, then pack an owned record from
+        // them. Must mirror `Serialize` below, which writes the record as a
+        // sequence of `FieldRef`s.
+        let fields = Vec::<FieldRef>::deserialize(deserializer)?;
+        let owned_fields: Vec<_> = fields.iter().map(FieldRef::cloned).collect();
+        Ok(Self::new(owned_fields))
+    }
+}
+impl Serialize for RecordRef {
+    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
+    where
+        S: dozer_types::serde::Serializer,
+    {
+        // `load()` yields `Vec<FieldRef<'_>>`, serialized as a plain sequence.
+        self.load().serialize(serializer)
+    }
+}
+
+/// Advance `ptr` past one `T` and round up to the next `ALIGN` boundary —
+/// the same stride that `aligned_size_of::<T>()` accounts for.
+///
+/// # Safety
+/// ptr should be valid for reading a `T`
+unsafe fn inc_ptr<T>(ptr: *mut T) -> *mut u8 {
+    // Move the pointer one T forward
+    let ptr = ptr.add(1);
+    // And make sure it is again aligned to `ALIGN` bytes
+    ptr.add(ptr.align_offset(ALIGN)) as *mut u8
+}
+
+/// Write `value` at `ptr`; returns the (aligned) pointer to the next slot.
+///
+/// # Safety
+/// ptr should be valid for writing a `T`,
+/// that is, ptr..ptr + size_of::<T> should be inside a single live allocation
+unsafe fn write<T>(ptr: *mut u8, value: T) -> *mut u8 {
+    let ptr = ptr as *mut T;
+    ptr.write(value);
+    inc_ptr(ptr)
+}
+
+/// Borrow the `T` at `ptr`; returns the pointer to the next slot as well.
+///
+/// # Safety
+/// ptr should be valid for reading a `T`,
+/// that is, ptr..ptr + size_of::<T> should be inside a single live allocation
+/// and the memory read should be initialized.
+/// The returned reference is only valid as long as pointed to memory is valid
+/// for reading.
+unsafe fn read_ref<'a, T>(ptr: *mut u8) -> (*mut u8, &'a T) {
+    let ptr = ptr as *mut T;
+    let result = &*ptr;
+    (inc_ptr(ptr), result)
+}
+
+/// Read the `T` at `ptr` by value; returns the pointer to the next slot as well.
+///
+/// # Safety
+/// ptr should be valid for reading a `T`,
+/// that is, ptr..ptr + size_of::<T> should be inside a single live allocation
+/// and the memory read should be initialized.
+/// This takes ownership of the memory returned as `T`, which means dropping `T`
+/// may make future reads from `ptr` undefined behavior
+unsafe fn read<T>(ptr: *mut u8) -> (*mut u8, T) {
+    let ptr = ptr as *mut T;
+    let result = ptr.read();
+    (inc_ptr(ptr), result)
+}
+
+/// Decode a borrowed [`FieldRef`] of type `field_type` from `ptr`,
+/// returning it together with the pointer to the next (aligned) slot.
+///
+/// # Safety
+/// `ptr` should be valid for reading the contents of a `Field` with the type
+/// corresponding to `field_type`.
+/// See `read_ref`
+unsafe fn read_field_ref<'a>(ptr: *mut u8, field_type: FieldType) -> (*mut u8, FieldRef<'a>) {
+    match field_type {
+        FieldType::UInt => {
+            let (ptr, value) = read_ref(ptr);
+            (ptr, FieldRef::UInt(*value))
+        }
+        FieldType::U128 => {
+            let (ptr, value) = read_ref(ptr);
+            (ptr, FieldRef::U128(*value))
+        }
+
+        FieldType::Int => {
+            let (ptr, value) = read_ref(ptr);
+            (ptr, FieldRef::Int(*value))
+        }
+
+        FieldType::I128 => {
+            let (ptr, value) = read_ref(ptr);
+            (ptr, FieldRef::I128(*value))
+        }
+
+        FieldType::Float => {
+            let (ptr, value) = read_ref(ptr);
+            (ptr, FieldRef::Float(*value))
+        }
+
+        FieldType::Boolean => {
+            let (ptr, value) = read_ref(ptr);
+            (ptr, FieldRef::Boolean(*value))
+        }
+
+        FieldType::String => {
+            let (ptr, value): (_, &String) = read_ref(ptr);
+            (ptr, FieldRef::String(value))
+        }
+        FieldType::Text => {
+            let (ptr, value): (_, &String) = read_ref(ptr);
+            (ptr, FieldRef::Text(value))
+        }
+        FieldType::Binary => {
+            let (ptr, value): (_, &Vec<u8>) = read_ref(ptr);
+            (ptr, FieldRef::Binary(value))
+        }
+        FieldType::Decimal => {
+            let (ptr, value) = read_ref(ptr);
+            (ptr, FieldRef::Decimal(*value))
+        }
+        FieldType::Timestamp => {
+            let (ptr, value) = read_ref(ptr);
+            (ptr, FieldRef::Timestamp(*value))
+        }
+        FieldType::Date => {
+            let (ptr, value) = read_ref(ptr);
+            (ptr, FieldRef::Date(*value))
+        }
+        FieldType::Json => {
+            let (ptr, value) = read_ref::<JsonValue>(ptr);
+            // NOTE: `FieldRef::Json` holds an owned value, so this clones.
+            (ptr, FieldRef::Json(value.to_owned()))
+        }
+        FieldType::Point => {
+            let (ptr, value) = read_ref(ptr);
+            (ptr, FieldRef::Point(*value))
+        }
+        FieldType::Duration => {
+            let (ptr, value) = read_ref(ptr);
+            (ptr, FieldRef::Duration(*value))
+        }
+    }
+}
+/// Read an owned `Field` of type `field_type` out of `ptr`, returning it with
+/// the pointer to the next (aligned) slot. Used by `Drop` to run each field's
+/// destructor.
+///
+/// # Safety
+/// `ptr` must be valid for reading the initialized payload of a field of
+/// `field_type` (see `read`). This takes ownership of the payload: after the
+/// call, the memory at `ptr` must be treated as uninitialized.
+unsafe fn read_field(ptr: *mut u8, field_type: FieldType) -> (*mut u8, Field) {
+    match field_type {
+        FieldType::UInt => {
+            let (ptr, value) = read(ptr);
+            (ptr, Field::UInt(value))
+        }
+        FieldType::U128 => {
+            let (ptr, value) = read(ptr);
+            (ptr, Field::U128(value))
+        }
+
+        FieldType::Int => {
+            let (ptr, value) = read(ptr);
+            (ptr, Field::Int(value))
+        }
+
+        FieldType::I128 => {
+            let (ptr, value) = read(ptr);
+            (ptr, Field::I128(value))
+        }
+
+        FieldType::Float => {
+            let (ptr, value) = read(ptr);
+            (ptr, Field::Float(value))
+        }
+
+        FieldType::Boolean => {
+            let (ptr, value) = read(ptr);
+            (ptr, Field::Boolean(value))
+        }
+
+        FieldType::String => {
+            let (ptr, value) = read(ptr);
+            (ptr, Field::String(value))
+        }
+        FieldType::Text => {
+            let (ptr, value) = read(ptr);
+            // Was `Field::String(value)`, which mislabeled Text payloads.
+            (ptr, Field::Text(value))
+        }
+        FieldType::Binary => {
+            let (ptr, value) = read(ptr);
+            (ptr, Field::Binary(value))
+        }
+        FieldType::Decimal => {
+            let (ptr, value) = read(ptr);
+            (ptr, Field::Decimal(value))
+        }
+        FieldType::Timestamp => {
+            let (ptr, value) = read(ptr);
+            (ptr, Field::Timestamp(value))
+        }
+        FieldType::Date => {
+            let (ptr, value) = read(ptr);
+            (ptr, Field::Date(value))
+        }
+        FieldType::Json => {
+            let (ptr, value) = read::<JsonValue>(ptr);
+            (ptr, Field::Json(value))
+        }
+        FieldType::Point => {
+            let (ptr, value) = read(ptr);
+            (ptr, Field::Point(value))
+        }
+        FieldType::Duration => {
+            let (ptr, value) = read(ptr);
+            (ptr, Field::Duration(value))
+        }
+    }
+}
+
+/// Total payload size in bytes for a record with the given field types.
+/// Every field occupies one `ALIGN`-rounded slot; NULL fields (`None`) take
+/// no space at all.
+fn size(fields: &[Option<FieldType>]) -> usize {
+    let mut total = 0;
+    for field_type in fields.iter().filter_map(|ty| *ty) {
+        total += match field_type {
+            FieldType::UInt => aligned_size_of::<u64>(),
+            FieldType::U128 => aligned_size_of::<u128>(),
+            FieldType::Int => aligned_size_of::<i64>(),
+            FieldType::I128 => aligned_size_of::<i128>(),
+            FieldType::Float => aligned_size_of::<OrderedFloat<f64>>(),
+            FieldType::Boolean => aligned_size_of::<bool>(),
+            FieldType::String | FieldType::Text => aligned_size_of::<String>(),
+            FieldType::Binary => aligned_size_of::<Vec<u8>>(),
+            FieldType::Decimal => aligned_size_of::<Decimal>(),
+            FieldType::Timestamp => aligned_size_of::<DateTime<FixedOffset>>(),
+            FieldType::Date => aligned_size_of::<NaiveDate>(),
+            FieldType::Json => aligned_size_of::<JsonValue>(),
+            FieldType::Point => aligned_size_of::<DozerPoint>(),
+            FieldType::Duration => aligned_size_of::<DozerDuration>(),
+        };
+    }
+    total
+}
+
+/// Borrowed view of a single field value, as produced by `RecordRef::load`.
+/// Variable-length payloads (strings, binary) borrow from the record's packed
+/// storage; small `Copy` payloads are held inline. `Json` is currently cloned
+/// out rather than borrowed (see `read_field_ref`).
+#[derive(Hash, Serialize, Deserialize, Debug, PartialEq, Eq)]
+#[serde(crate = "dozer_types::serde")]
+pub enum FieldRef<'a> {
+    UInt(u64),
+    U128(u128),
+    Int(i64),
+    I128(i128),
+    Float(OrderedFloat<f64>),
+    Boolean(bool),
+    String(&'a str),
+    Text(&'a str),
+    Binary(&'a [u8]),
+    Decimal(Decimal),
+    Timestamp(DateTime<FixedOffset>),
+    Date(NaiveDate),
+    Json(JsonValue),
+    Point(DozerPoint),
+    Duration(DozerDuration),
+    Null,
+}
+
+impl FieldRef<'_> {
+    /// Materialize this borrowed view into an owned [`Field`],
+    /// cloning any borrowed payloads.
+    pub fn cloned(&self) -> Field {
+        match self {
+            Self::UInt(v) => Field::UInt(*v),
+            Self::U128(v) => Field::U128(*v),
+            Self::Int(v) => Field::Int(*v),
+            Self::I128(v) => Field::I128(*v),
+            Self::Float(v) => Field::Float(*v),
+            Self::Boolean(v) => Field::Boolean(*v),
+            Self::String(v) => Field::String((*v).to_owned()),
+            Self::Text(v) => Field::Text((*v).to_owned()),
+            Self::Binary(v) => Field::Binary((*v).to_vec()),
+            Self::Decimal(v) => Field::Decimal(*v),
+            Self::Timestamp(v) => Field::Timestamp(*v),
+            Self::Date(v) => Field::Date(*v),
+            Self::Json(v) => Field::Json(v.clone()),
+            Self::Point(v) => Field::Point(*v),
+            Self::Duration(v) => Field::Duration(*v),
+            Self::Null => Field::Null,
+        }
+    }
+}
+
+impl RecordRef {
+    /// Pack `fields` into a freshly allocated, immutable record.
+    pub fn new(fields: Vec<Field>) -> Self {
+        let field_types = fields
+            .iter()
+            .map(|field| field.ty())
+            .collect::<Box<[Option<FieldType>]>>();
+        let size = size(&field_types);
+
+        let layout = Layout::from_size_align(size, ALIGN).unwrap();
+        let data = if size == 0 {
+            // A record whose fields are all NULL (or an empty record) has no
+            // payload. Calling `alloc` with a zero-sized layout is undefined
+            // behavior, so use a non-null, `ALIGN`-aligned dangling pointer
+            // instead; it is never dereferenced (every field is NULL) and
+            // `Drop` skips `dealloc` when the size is 0.
+            // SAFETY: `ALIGN` is non-zero, so this pointer is non-null.
+            unsafe { NonNull::new_unchecked(ALIGN as *mut u8) }
+        } else {
+            // SAFETY: `layout` has non-zero size and `ALIGN` alignment.
+            let raw = unsafe { std::alloc::alloc(layout) };
+            let Some(data) = NonNull::new(raw) else {
+                handle_alloc_error(layout);
+            };
+            data
+        };
+        let mut ptr = data.as_ptr();
+
+        // SAFETY:
+        // - ptr is non-null (from NonNull above)
+        // - `size` reserved exactly one `ALIGN`ed slot per non-NULL field, and
+        //   `write` advances `ptr` with the same stride, so every write stays
+        //   inside the live allocation.
+        unsafe {
+            for field in fields {
+                match field {
+                    Field::UInt(v) => ptr = write(ptr, v),
+                    Field::U128(v) => ptr = write(ptr, v),
+                    Field::Int(v) => ptr = write(ptr, v),
+                    Field::I128(v) => ptr = write(ptr, v),
+                    Field::Float(v) => ptr = write(ptr, v),
+                    Field::Boolean(v) => ptr = write(ptr, v),
+                    Field::String(v) => ptr = write(ptr, v),
+                    Field::Text(v) => ptr = write(ptr, v),
+                    Field::Binary(v) => ptr = write(ptr, v),
+                    Field::Decimal(v) => ptr = write(ptr, v),
+                    Field::Timestamp(v) => ptr = write(ptr, v),
+                    Field::Date(v) => ptr = write(ptr, v),
+                    Field::Json(v) => ptr = write(ptr, v),
+                    Field::Point(v) => ptr = write(ptr, v),
+                    Field::Duration(v) => ptr = write(ptr, v),
+                    // NULL occupies no payload slot.
+                    Field::Null => (),
+                }
+            }
+        }
+        // SAFETY: This is valid, because inner is `repr(transparent)`
+        let arc = unsafe {
+            let arc = Arc::from_header_and_slice(data, &field_types);
+            std::mem::transmute(arc)
+        };
+        Self(arc)
+    }
+
+    /// Borrow all field values. NULL fields come back as [`FieldRef::Null`].
+    pub fn load(&self) -> Vec<FieldRef<'_>> {
+        self.0
+            .field_types()
+            .iter()
+            .scan(self.0.data().as_ptr(), |ptr, field_type| {
+                let Some(field_type) = field_type else {
+                    return Some(FieldRef::Null);
+                };
+
+                // SAFETY: the constructor wrote an initialized payload of
+                // `field_type` at this offset, and the allocation is kept
+                // alive by `self`.
+                unsafe {
+                    let (new_ptr, value) = read_field_ref(*ptr, *field_type);
+                    *ptr = new_ptr;
+                    Some(value)
+                }
+            })
+            .collect()
+    }
+
+    /// Stable identity of the shared allocation; the record store uses this
+    /// to map a `RecordRef` back to its index.
+    #[inline(always)]
+    pub fn id(&self) -> usize {
+        self.0.as_ptr() as *const () as usize
+    }
+}
+
+impl RecordRefInner {
+    #[inline(always)]
+    fn field_types(&self) -> &[Option<FieldType>] {
+        &self.0.slice
+    }
+
+    #[inline(always)]
+    fn data(&self) -> NonNull<u8> {
+        self.0.header
+    }
+}
+
+impl Drop for RecordRefInner {
+    fn drop(&mut self) {
+        let mut ptr = self.data().as_ptr();
+        for field in self.field_types().iter().flatten() {
+            // SAFETY: every non-NULL field has an initialized payload here;
+            // reading it owned runs each field's destructor exactly once.
+            unsafe {
+                ptr = read_field(ptr, *field).0;
+            }
+        }
+        // Then deallocate the field storage. Mirrors `RecordRef::new`: for a
+        // zero-size record nothing was allocated, so nothing must be freed
+        // (and `dealloc` with a zero-sized layout would be UB).
+        let layout = Layout::from_size_align(size(self.field_types()), ALIGN).unwrap();
+        if layout.size() > 0 {
+            // SAFETY: `data` was allocated in `RecordRef::new` with exactly
+            // this layout and has not been freed before.
+            unsafe { dealloc(self.data().as_ptr(), layout) }
+        }
+    }
+}
+
+#[derive(Debug, Clone, PartialEq, Eq, Hash, Default)]
+pub struct ProcessorRecord {
+    /// All `Field`s in this record. The `Field`s are grouped by `Arc` to reduce memory usage.
+    /// This is a Box<[]> instead of a Vec to save space on storing the vec's capacity
+    values: Box<[RecordRef]>,
+
+    /// Time To Live for this record. If the value is None, the record will never expire.
+    lifetime: Option<Box<Lifetime>>,
+}
+
+impl ProcessorRecord {
+    /// Build a record from its value groups, with no lifetime set.
+    pub fn new(values: Box<[RecordRef]>) -> Self {
+        Self {
+            values,
+            lifetime: None,
+        }
+    }
+
+    /// The record's TTL, if any, cloned out of its box.
+    pub fn get_lifetime(&self) -> Option<Lifetime> {
+        self.lifetime.as_deref().cloned()
+    }
+
+    /// Set or clear the record's TTL.
+    pub fn set_lifetime(&mut self, lifetime: Option<Lifetime>) {
+        self.lifetime = lifetime.map(Box::new);
+    }
+
+    /// The record's value groups.
+    pub fn values(&self) -> &[RecordRef] {
+        &self.values
+    }
+
+    /// A copy of `existing` with `additional` appended as the last group.
+    /// The result does not inherit `existing`'s lifetime.
+    pub fn appended(existing: &ProcessorRecord, additional: RecordRef) -> Self {
+        let values: Vec<RecordRef> = existing
+            .values()
+            .iter()
+            .cloned()
+            .chain(std::iter::once(additional))
+            .collect();
+        Self::new(values.into_boxed_slice())
+    }
+}
+
+mod store;
+pub use store::{ProcessorRecordStore, RecordStoreError};
+
+#[cfg(test)]
+mod tests {
+    use dozer_types::types::Field;
+
+    use crate::RecordRef;
+
+    /// Sample record covering a heap payload (String), inline values,
+    /// and a NULL slot.
+    fn sample_fields() -> Vec<Field> {
+        vec![
+            Field::String("asdf".to_owned()),
+            Field::Int(23),
+            Field::Null,
+            Field::U128(234),
+        ]
+    }
+
+    /// Load a record back into owned `Field`s for comparison.
+    fn load_owned(record: &RecordRef) -> Vec<Field> {
+        record.load().iter().map(|field| field.cloned()).collect()
+    }
+
+    #[test]
+    fn test_store_load() {
+        let fields = sample_fields();
+        let record = RecordRef::new(fields.clone());
+        assert_eq!(load_owned(&record), fields);
+    }
+
+    #[test]
+    fn test_ser_de() {
+        let fields = sample_fields();
+        let record = RecordRef::new(fields.clone());
+
+        let bytes = dozer_types::bincode::serialize(&record).unwrap();
+        let deserialized: RecordRef = dozer_types::bincode::deserialize(&bytes).unwrap();
+        assert_eq!(load_owned(&deserialized), fields);
+    }
+}
diff --git a/dozer-core/src/processor_record/store.rs b/dozer-recordstore/src/store.rs
similarity index 71%
rename from dozer-core/src/processor_record/store.rs
rename to dozer-recordstore/src/store.rs
index de9d383fc4..29160a6b1a 100644
--- a/dozer-core/src/processor_record/store.rs
+++ b/dozer-recordstore/src/store.rs
@@ -1,16 +1,30 @@
 use std::collections::HashMap;
 
-use dozer_storage::errors::StorageError;
 use dozer_types::{
     bincode,
+    errors::internal::BoxedError,
     parking_lot::RwLock,
     serde::{Deserialize, Serialize},
-    types::{Field, Lifetime, Operation, Record},
+    thiserror::Error,
+    types::{Field, Lifetime, Record},
 };
 
-use crate::{errors::ExecutionError, executor_operation::ProcessorOperation};
-
-use super::{ProcessorRecord, RecordRef};
+use super::{FieldRef, ProcessorRecord, RecordRef};
+
+#[derive(Error, Debug)]
+pub enum RecordStoreError {
+    #[error("Unable to deserialize type: {} - Reason: {}", typ, reason.to_string())]
+    DeserializationError {
+        typ: &'static str,
+        reason: BoxedError,
+    },
+
+    #[error("Unable to serialize type: {} - Reason: {}", typ, reason.to_string())]
+    SerializationError {
+        typ: &'static str,
+        reason: BoxedError,
+    },
+}
 
 #[derive(Debug)]
 pub struct ProcessorRecordStore {
@@ -24,7 +38,7 @@ struct ProcessorRecordStoreInner {
 }
 
 impl ProcessorRecordStore {
-    pub fn new() -> Result<Self, ExecutionError> {
+    pub fn new() -> Result<Self, RecordStoreError> {
         Ok(Self {
             inner: RwLock::new(Default::default()),
         })
@@ -34,19 +48,19 @@ impl ProcessorRecordStore {
         self.inner.read().records.len()
     }
 
-    pub fn serialize_slice(&self, start: usize) -> Result<(Vec<u8>, usize), StorageError> {
+    pub fn serialize_slice(&self, start: usize) -> Result<(Vec<u8>, usize), RecordStoreError> {
         let records = &self.inner.read().records;
         let slice = &records[start..];
-        let data = bincode::serialize(slice).map_err(|e| StorageError::SerializationError {
+        let data = bincode::serialize(slice).map_err(|e| RecordStoreError::SerializationError {
             typ: "[RecordRef]",
             reason: Box::new(e),
         })?;
         Ok((data, slice.len()))
     }
 
-    pub fn deserialize_and_extend(&self, data: &[u8]) -> Result<(), StorageError> {
+    pub fn deserialize_and_extend(&self, data: &[u8]) -> Result<(), RecordStoreError> {
         let slice: Vec<RecordRef> =
-            bincode::deserialize(data).map_err(|e| StorageError::DeserializationError {
+            bincode::deserialize(data).map_err(|e| RecordStoreError::DeserializationError {
                 typ: "[RecordRef]",
                 reason: Box::new(e),
             })?;
@@ -64,8 +78,8 @@ impl ProcessorRecordStore {
         Ok(())
     }
 
-    pub fn create_ref(&self, values: &[Field]) -> Result<RecordRef, StorageError> {
-        let record = RecordRef(values.to_vec().into());
+    pub fn create_ref(&self, values: &[Field]) -> Result<RecordRef, RecordStoreError> {
+        let record = RecordRef::new(values.to_vec());
 
         let mut inner = self.inner.write();
 
@@ -76,8 +90,11 @@ impl ProcessorRecordStore {
         Ok(record)
     }
 
-    pub fn load_ref(&self, record_ref: &RecordRef) -> Result<Vec<Field>, StorageError> {
-        Ok(record_ref.0.to_vec())
+    pub fn load_ref<'a>(
+        &self,
+        record_ref: &'a RecordRef,
+    ) -> Result<Vec<FieldRef<'a>>, RecordStoreError> {
+        Ok(record_ref.load())
     }
 
     pub fn serialize_ref(&self, record_ref: &RecordRef) -> u64 {
@@ -85,7 +102,7 @@ impl ProcessorRecordStore {
             .inner
             .read()
             .record_pointer_to_index
-            .get(&(record_ref.0.as_ptr() as usize))
+            .get(&record_ref.id())
             .expect("RecordRef not found in ProcessorRecordStore") as u64
     }
 
@@ -93,18 +110,23 @@ impl ProcessorRecordStore {
         self.inner.read().records[index as usize].clone()
     }
 
-    pub fn create_record(&self, record: &Record) -> Result<ProcessorRecord, StorageError> {
+    pub fn create_record(&self, record: &Record) -> Result<ProcessorRecord, RecordStoreError> {
         let record_ref = self.create_ref(&record.values)?;
         let mut processor_record = ProcessorRecord::new(Box::new([record_ref]));
         processor_record.set_lifetime(record.lifetime.clone());
         Ok(processor_record)
     }
 
-    pub fn load_record(&self, processor_record: &ProcessorRecord) -> Result<Record, StorageError> {
+    pub fn load_record(
+        &self,
+        processor_record: &ProcessorRecord,
+    ) -> Result<Record, RecordStoreError> {
         let mut record = Record::default();
         for record_ref in processor_record.values.iter() {
             let fields = self.load_ref(record_ref)?;
-            record.values.extend(fields.iter().cloned());
+            record
+                .values
+                .extend(fields.iter().map(|field| field.cloned()));
         }
         record.set_lifetime(processor_record.get_lifetime());
         Ok(record)
@@ -131,48 +153,6 @@ impl ProcessorRecordStore {
             .collect();
         Ok(ProcessorRecord { values, lifetime })
     }
-
-    pub fn create_operation(
-        &self,
-        operation: &Operation,
-    ) -> Result<ProcessorOperation, StorageError> {
-        match operation {
-            Operation::Delete { old } => {
-                let old = self.create_record(old)?;
-                Ok(ProcessorOperation::Delete { old })
-            }
-            Operation::Insert { new } => {
-                let new = self.create_record(new)?;
-                Ok(ProcessorOperation::Insert { new })
-            }
-            Operation::Update { old, new } => {
-                let old = self.create_record(old)?;
-                let new = self.create_record(new)?;
-                Ok(ProcessorOperation::Update { old, new })
-            }
-        }
-    }
-
-    pub fn load_operation(
-        &self,
-        operation: &ProcessorOperation,
-    ) -> Result<Operation, StorageError> {
-        match operation {
-            ProcessorOperation::Delete { old } => {
-                let old = self.load_record(old)?;
-                Ok(Operation::Delete { old })
-            }
-            ProcessorOperation::Insert { new } => {
-                let new = self.load_record(new)?;
-                Ok(Operation::Insert { new })
-            }
-            ProcessorOperation::Update { old, new } => {
-                let old = self.load_record(old)?;
-                let new = self.load_record(new)?;
-                Ok(Operation::Update { old, new })
-            }
-        }
-    }
 }
 
 fn insert_record_pointer_to_index(
@@ -180,7 +160,7 @@ fn insert_record_pointer_to_index(
     record: &RecordRef,
     index: usize,
 ) {
-    let previous_index = record_pointer_to_index.insert(record.0.as_ptr() as usize, index);
+    let previous_index = record_pointer_to_index.insert(record.id(), index);
     debug_assert!(previous_index.is_none());
 }
 
diff --git a/dozer-sql/Cargo.toml b/dozer-sql/Cargo.toml
index a7a0aa0f07..caf7c51bc9 100644
--- a/dozer-sql/Cargo.toml
+++ b/dozer-sql/Cargo.toml
@@ -12,6 +12,7 @@ dozer-storage = { path = "../dozer-storage" }
 dozer-core = { path = "../dozer-core" }
 dozer-tracing = { path = "../dozer-tracing" }
 dozer-sql-expression = { path = "expression" }
+dozer-recordstore = {path = "../dozer-recordstore"}
 
 ahash = "0.8.3"
 enum_dispatch = "0.3.11"
diff --git a/dozer-sql/src/aggregation/factory.rs b/dozer-sql/src/aggregation/factory.rs
index 1f09bf804e..76463b6141 100644
--- a/dozer-sql/src/aggregation/factory.rs
+++ b/dozer-sql/src/aggregation/factory.rs
@@ -1,11 +1,11 @@
 use crate::planner::projection::CommonPlanner;
 use crate::projection::processor::ProjectionProcessor;
 use crate::{aggregation::processor::AggregationProcessor, errors::PipelineError};
-use dozer_core::processor_record::ProcessorRecordStore;
 use dozer_core::{
     node::{OutputPortDef, OutputPortType, PortHandle, Processor, ProcessorFactory},
     DEFAULT_PORT_HANDLE,
 };
+use dozer_recordstore::ProcessorRecordStore;
 use dozer_sql_expression::sqlparser::ast::Select;
 use dozer_types::errors::internal::BoxedError;
 use dozer_types::models::udf_config::UdfConfig;
diff --git a/dozer-sql/src/aggregation/processor.rs b/dozer-sql/src/aggregation/processor.rs
index 4bd8772e0b..70cd1773b6 100644
--- a/dozer-sql/src/aggregation/processor.rs
+++ b/dozer-sql/src/aggregation/processor.rs
@@ -7,8 +7,8 @@ use dozer_core::channels::ProcessorChannelForwarder;
 use dozer_core::dozer_log::storage::Object;
 use dozer_core::executor_operation::ProcessorOperation;
 use dozer_core::node::{PortHandle, Processor};
-use dozer_core::processor_record::ProcessorRecordStore;
 use dozer_core::DEFAULT_PORT_HANDLE;
+use dozer_recordstore::ProcessorRecordStore;
 use dozer_sql_expression::execution::Expression;
 use dozer_types::bincode;
 use dozer_types::errors::internal::BoxedError;
@@ -585,10 +585,10 @@ impl Processor for AggregationProcessor {
         op: ProcessorOperation,
         fw: &mut dyn ProcessorChannelForwarder,
     ) -> Result<(), BoxedError> {
-        let op = record_store.load_operation(&op)?;
+        let op = op.load(record_store)?;
         let ops = self.aggregate(op)?;
         for output_op in ops {
-            let output_op = record_store.create_operation(&output_op)?;
+            let output_op = ProcessorOperation::new(&output_op, record_store)?;
             fw.send(output_op, DEFAULT_PORT_HANDLE);
         }
         Ok(())
diff --git a/dozer-sql/src/errors.rs b/dozer-sql/src/errors.rs
index 2f56c676b7..98c6d94c63 100644
--- a/dozer-sql/src/errors.rs
+++ b/dozer-sql/src/errors.rs
@@ -1,7 +1,7 @@
 #![allow(clippy::enum_variant_names)]
 
 use dozer_core::node::PortHandle;
-use dozer_storage::errors::StorageError;
+use dozer_recordstore::RecordStoreError;
 use dozer_types::chrono::RoundingError;
 use dozer_types::errors::internal::BoxedError;
 use dozer_types::errors::types::TypeError;
@@ -193,8 +193,8 @@ pub enum JoinError {
     #[error("Field type error computing the eviction time in the TTL reference field")]
     EvictionTypeOverflow,
 
-    #[error("Storage error: {0}")]
-    Storage(#[from] StorageError),
+    #[error("Recordstore error: {0}")]
+    RecordStore(#[from] RecordStoreError),
 
     #[error("Deserialization error: {0}")]
     Deserialization(#[from] DeserializationError),
@@ -294,8 +294,8 @@ pub enum WindowError {
     #[error("WINDOW functions require alias")]
     NoAlias,
 
-    #[error("Storage error")]
-    Storage(#[from] StorageError),
+    #[error("RecordStore error")]
+    RecordStore(#[from] RecordStoreError),
 }
 
 #[derive(Error, Debug)]
@@ -324,6 +324,6 @@ pub enum TableOperatorError {
     #[error("TTL input must evaluate to timestamp, but it evaluates to {0}")]
     InvalidTtlInputType(Field),
 
-    #[error("Storage error")]
-    Storage(#[from] StorageError),
+    #[error("Recordstore error")]
+    RecordStore(#[from] RecordStoreError),
 }
diff --git a/dozer-sql/src/expression/tests/test_common.rs b/dozer-sql/src/expression/tests/test_common.rs
index c456c7949b..5ec2a24731 100644
--- a/dozer-sql/src/expression/tests/test_common.rs
+++ b/dozer-sql/src/expression/tests/test_common.rs
@@ -2,8 +2,8 @@ use crate::{projection::factory::ProjectionProcessorFactory, tests::utils::get_s
 use dozer_core::channels::ProcessorChannelForwarder;
 use dozer_core::executor_operation::ProcessorOperation;
 use dozer_core::node::ProcessorFactory;
-use dozer_core::processor_record::ProcessorRecordStore;
 use dozer_core::DEFAULT_PORT_HANDLE;
+use dozer_recordstore::ProcessorRecordStore;
 use dozer_types::types::Record;
 use dozer_types::types::{Field, Schema};
 use std::collections::HashMap;
diff --git a/dozer-sql/src/pipeline/expression/tests/test_common.rs b/dozer-sql/src/pipeline/expression/tests/test_common.rs
new file mode 100644
index 0000000000..aae33c661f
--- /dev/null
+++ b/dozer-sql/src/pipeline/expression/tests/test_common.rs
@@ -0,0 +1,114 @@
+use crate::pipeline::{projection::factory::ProjectionProcessorFactory, tests::utils::get_select};
+use dozer_core::channels::ProcessorChannelForwarder;
+use dozer_core::executor_operation::ProcessorOperation;
+use dozer_core::node::ProcessorFactory;
+use dozer_core::DEFAULT_PORT_HANDLE;
+use dozer_recordstore::ProcessorRecordStore;
+use dozer_types::chrono::{
+    DateTime, Datelike, FixedOffset, NaiveDate, NaiveDateTime, NaiveTime, Timelike,
+};
+use dozer_types::rust_decimal::Decimal;
+use dozer_types::types::Record;
+use dozer_types::types::{Field, Schema};
+use proptest::prelude::*;
+use std::collections::HashMap;
+
+struct TestChannelForwarder {
+    operations: Vec<ProcessorOperation>,
+}
+
+impl ProcessorChannelForwarder for TestChannelForwarder {
+    fn send(&mut self, op: ProcessorOperation, _port: dozer_core::node::PortHandle) {
+        self.operations.push(op);
+    }
+}
+
+pub(crate) fn run_fct(sql: &str, schema: Schema, input: Vec<Field>) -> Field {
+    let record_store = ProcessorRecordStore::new().unwrap();
+
+    let select = get_select(sql).unwrap();
+    let processor_factory =
+        ProjectionProcessorFactory::_new("projection_id".to_owned(), select.projection, vec![]);
+    processor_factory
+        .get_output_schema(
+            &DEFAULT_PORT_HANDLE,
+            &[(DEFAULT_PORT_HANDLE, schema.clone())]
+                .into_iter()
+                .collect(),
+        )
+        .unwrap();
+
+    let mut processor = processor_factory
+        .build(
+            HashMap::from([(DEFAULT_PORT_HANDLE, schema)]),
+            HashMap::new(),
+            &record_store,
+            None,
+        )
+        .unwrap();
+
+    let mut fw = TestChannelForwarder { operations: vec![] };
+    let rec = Record::new(input);
+    let rec = record_store.create_record(&rec).unwrap();
+
+    let op = ProcessorOperation::Insert { new: rec };
+
+    processor
+        .process(DEFAULT_PORT_HANDLE, &record_store, op, &mut fw)
+        .unwrap();
+
+    match &fw.operations[0] {
+        ProcessorOperation::Insert { new } => {
+            let mut new = record_store.load_record(new).unwrap();
+            new.values.remove(0)
+        }
+        _ => panic!("Unable to find result value"),
+    }
+}
+
+#[derive(Debug)]
+pub struct ArbitraryDecimal(pub Decimal);
+
+impl Arbitrary for ArbitraryDecimal {
+    type Parameters = ();
+    type Strategy = BoxedStrategy<Self>;
+
+    fn arbitrary_with(_args: Self::Parameters) -> Self::Strategy {
+        (i64::MIN..i64::MAX, u32::MIN..29u32)
+            .prop_map(|(num, scale)| ArbitraryDecimal(Decimal::new(num, scale)))
+            .boxed()
+    }
+}
+
+#[derive(Debug)]
+pub struct ArbitraryDateTime(pub DateTime<FixedOffset>);
+
+impl Arbitrary for ArbitraryDateTime {
+    type Parameters = ();
+    type Strategy = BoxedStrategy<Self>;
+
+    fn arbitrary_with(_args: Self::Parameters) -> Self::Strategy {
+        (
+            NaiveDateTime::MIN.year()..NaiveDateTime::MAX.year(),
+            1..13u32,
+            1..32u32,
+            0..NaiveDateTime::MAX.second(),
+            0..NaiveDateTime::MAX.nanosecond(),
+        )
+            .prop_map(|(year, month, day, secs, nano)| {
+                let timezone_east = FixedOffset::east_opt(8 * 60 * 60).unwrap();
+                let date = NaiveDate::from_ymd_opt(year, month, day);
+                // Some dates cannot be created: February rejects days larger than 28 (or 29 in leap years)
+                if date.is_none() {
+                    return ArbitraryDateTime(DateTime::default());
+                }
+                let time = NaiveTime::from_num_seconds_from_midnight_opt(secs, nano).unwrap();
+                let datetime = DateTime::<FixedOffset>::from_local(
+                    NaiveDateTime::new(date.unwrap(), time),
+                    timezone_east,
+                );
+                ArbitraryDateTime(datetime)
+            })
+            .boxed()
+    }
+}
diff --git a/dozer-sql/src/product/join/factory.rs b/dozer-sql/src/product/join/factory.rs
index b5b6f367dc..c910f977fd 100644
--- a/dozer-sql/src/product/join/factory.rs
+++ b/dozer-sql/src/product/join/factory.rs
@@ -2,7 +2,6 @@ use std::collections::HashMap;
 
 use dozer_core::{
     node::{OutputPortDef, OutputPortType, PortHandle, Processor, ProcessorFactory},
-    processor_record::ProcessorRecordStore,
     DEFAULT_PORT_HANDLE,
 };
 use dozer_sql_expression::{
@@ -12,6 +11,8 @@ use dozer_sql_expression::{
         JoinOperator as SqlJoinOperator,
     },
 };
+
+use dozer_recordstore::ProcessorRecordStore;
 use dozer_types::{
     errors::internal::BoxedError,
     types::{FieldDefinition, Schema},
diff --git a/dozer-sql/src/product/join/operator/mod.rs b/dozer-sql/src/product/join/operator/mod.rs
index 2bd0b1f6cd..93db8a0314 100644
--- a/dozer-sql/src/product/join/operator/mod.rs
+++ b/dozer-sql/src/product/join/operator/mod.rs
@@ -1,7 +1,5 @@
-use dozer_core::{
-    dozer_log::storage::Object,
-    processor_record::{ProcessorRecord, ProcessorRecordStore},
-};
+use dozer_core::dozer_log::storage::Object;
+use dozer_recordstore::{ProcessorRecord, ProcessorRecordStore};
 use dozer_types::types::{Record, Schema, Timestamp};
 
 use crate::{
diff --git a/dozer-sql/src/product/join/operator/table.rs b/dozer-sql/src/product/join/operator/table.rs
index cbd2863b19..74605de7a4 100644
--- a/dozer-sql/src/product/join/operator/table.rs
+++ b/dozer-sql/src/product/join/operator/table.rs
@@ -6,10 +6,8 @@ use std::{
     iter::{once, Flatten, Once},
 };
 
-use dozer_core::{
-    dozer_log::storage::Object,
-    processor_record::{ProcessorRecord, ProcessorRecordStore},
-};
+use dozer_core::dozer_log::storage::Object;
+use dozer_recordstore::{ProcessorRecord, ProcessorRecordStore};
 use dozer_types::{
     chrono,
     types::{Field, Record, Schema, Timestamp},
diff --git a/dozer-sql/src/product/join/processor.rs b/dozer-sql/src/product/join/processor.rs
index 23553f5213..4a98609f9c 100644
--- a/dozer-sql/src/product/join/processor.rs
+++ b/dozer-sql/src/product/join/processor.rs
@@ -3,8 +3,8 @@ use dozer_core::dozer_log::storage::Object;
 use dozer_core::epoch::Epoch;
 use dozer_core::executor_operation::ProcessorOperation;
 use dozer_core::node::{PortHandle, Processor};
-use dozer_core::processor_record::ProcessorRecordStore;
 use dozer_core::DEFAULT_PORT_HANDLE;
+use dozer_recordstore::ProcessorRecordStore;
 use dozer_tracing::Labels;
 use dozer_types::errors::internal::BoxedError;
 use dozer_types::types::Lifetime;
diff --git a/dozer-sql/src/product/set/operator.rs b/dozer-sql/src/product/set/operator.rs
index 0eef1111b4..f0ade01fd9 100644
--- a/dozer-sql/src/product/set/operator.rs
+++ b/dozer-sql/src/product/set/operator.rs
@@ -1,6 +1,6 @@
 use super::record_map::{CountingRecordMap, CountingRecordMapEnum};
 use crate::errors::PipelineError;
-use dozer_core::processor_record::ProcessorRecord;
+use dozer_recordstore::ProcessorRecord;
 use dozer_sql_expression::sqlparser::ast::{SetOperator, SetQuantifier};
 
 #[derive(Clone, Debug, PartialEq, Eq, Copy)]
diff --git a/dozer-sql/src/product/set/record_map/mod.rs b/dozer-sql/src/product/set/record_map/mod.rs
index eeab3e9ddb..6ad0acfbb4 100644
--- a/dozer-sql/src/product/set/record_map/mod.rs
+++ b/dozer-sql/src/product/set/record_map/mod.rs
@@ -1,7 +1,5 @@
-use dozer_core::{
-    dozer_log::storage::Object,
-    processor_record::{ProcessorRecord, ProcessorRecordStore},
-};
+use dozer_core::dozer_log::storage::Object;
+use dozer_recordstore::{ProcessorRecord, ProcessorRecordStore};
 use dozer_types::serde::{Deserialize, Serialize};
 use enum_dispatch::enum_dispatch;
 use std::collections::HashMap;
@@ -163,7 +161,7 @@ mod bloom;
 
 #[cfg(test)]
 mod tests {
-    use dozer_core::processor_record::{ProcessorRecord, ProcessorRecordStore};
+    use dozer_recordstore::{ProcessorRecord, ProcessorRecordStore};
     use dozer_types::types::{Field, Record};
 
     use super::{
diff --git a/dozer-sql/src/product/set/set_factory.rs b/dozer-sql/src/product/set/set_factory.rs
index 04910aa5b6..c5421f6097 100644
--- a/dozer-sql/src/product/set/set_factory.rs
+++ b/dozer-sql/src/product/set/set_factory.rs
@@ -3,11 +3,11 @@ use std::collections::HashMap;
 use crate::errors::PipelineError;
 use crate::errors::SetError;
 
-use dozer_core::processor_record::ProcessorRecordStore;
 use dozer_core::{
     node::{OutputPortDef, OutputPortType, PortHandle, Processor, ProcessorFactory},
     DEFAULT_PORT_HANDLE,
 };
+use dozer_recordstore::ProcessorRecordStore;
 use dozer_sql_expression::sqlparser::ast::{SetOperator, SetQuantifier};
 use dozer_types::errors::internal::BoxedError;
 use dozer_types::types::{FieldDefinition, Schema, SourceDefinition};
diff --git a/dozer-sql/src/product/set/set_processor.rs b/dozer-sql/src/product/set/set_processor.rs
index a9f74034fc..9a6e2e205b 100644
--- a/dozer-sql/src/product/set/set_processor.rs
+++ b/dozer-sql/src/product/set/set_processor.rs
@@ -10,8 +10,8 @@ use dozer_core::dozer_log::storage::Object;
 use dozer_core::epoch::Epoch;
 use dozer_core::executor_operation::ProcessorOperation;
 use dozer_core::node::{PortHandle, Processor};
-use dozer_core::processor_record::{ProcessorRecord, ProcessorRecordStore};
 use dozer_core::DEFAULT_PORT_HANDLE;
+use dozer_recordstore::{ProcessorRecord, ProcessorRecordStore};
 use dozer_types::errors::internal::BoxedError;
 use std::fmt::{Debug, Formatter};
 
diff --git a/dozer-sql/src/product/table/factory.rs b/dozer-sql/src/product/table/factory.rs
index a016f368f9..0f5ab6b8de 100644
--- a/dozer-sql/src/product/table/factory.rs
+++ b/dozer-sql/src/product/table/factory.rs
@@ -2,9 +2,9 @@ use std::collections::HashMap;
 
 use dozer_core::{
     node::{OutputPortDef, OutputPortType, PortHandle, Processor, ProcessorFactory},
-    processor_record::ProcessorRecordStore,
     DEFAULT_PORT_HANDLE,
 };
+use dozer_recordstore::ProcessorRecordStore;
 use dozer_sql_expression::{
     builder::{extend_schema_source_def, NameOrAlias},
     sqlparser::ast::TableFactor,
diff --git a/dozer-sql/src/product/table/processor.rs b/dozer-sql/src/product/table/processor.rs
index e6dfe8edc0..1368b0698b 100644
--- a/dozer-sql/src/product/table/processor.rs
+++ b/dozer-sql/src/product/table/processor.rs
@@ -3,8 +3,8 @@ use dozer_core::dozer_log::storage::Object;
 use dozer_core::epoch::Epoch;
 use dozer_core::executor_operation::ProcessorOperation;
 use dozer_core::node::{PortHandle, Processor};
-use dozer_core::processor_record::ProcessorRecordStore;
 use dozer_core::DEFAULT_PORT_HANDLE;
+use dozer_recordstore::ProcessorRecordStore;
 use dozer_types::errors::internal::BoxedError;
 
 #[derive(Debug)]
diff --git a/dozer-sql/src/projection/factory.rs b/dozer-sql/src/projection/factory.rs
index 54e7df80f0..b9b150ef0f 100644
--- a/dozer-sql/src/projection/factory.rs
+++ b/dozer-sql/src/projection/factory.rs
@@ -2,9 +2,9 @@ use std::collections::HashMap;
 
 use dozer_core::{
     node::{OutputPortDef, OutputPortType, PortHandle, Processor, ProcessorFactory},
-    processor_record::ProcessorRecordStore,
     DEFAULT_PORT_HANDLE,
 };
+use dozer_recordstore::ProcessorRecordStore;
 use dozer_sql_expression::{
     builder::ExpressionBuilder,
     execution::Expression,
diff --git a/dozer-sql/src/projection/processor.rs b/dozer-sql/src/projection/processor.rs
index ec8a03ec43..b715663582 100644
--- a/dozer-sql/src/projection/processor.rs
+++ b/dozer-sql/src/projection/processor.rs
@@ -6,8 +6,8 @@ use dozer_core::dozer_log::storage::Object;
 use dozer_core::epoch::Epoch;
 use dozer_core::executor_operation::ProcessorOperation;
 use dozer_core::node::{PortHandle, Processor};
-use dozer_core::processor_record::ProcessorRecordStore;
 use dozer_core::DEFAULT_PORT_HANDLE;
+use dozer_recordstore::ProcessorRecordStore;
 use dozer_types::errors::internal::BoxedError;
 use dozer_types::types::{Operation, Record, Schema};
 
@@ -82,13 +82,13 @@ impl Processor for ProjectionProcessor {
         op: ProcessorOperation,
         fw: &mut dyn ProcessorChannelForwarder,
     ) -> Result<(), BoxedError> {
-        let op = record_store.load_operation(&op)?;
+        let op = op.load(record_store)?;
         let output_op = match op {
             Operation::Delete { ref old } => self.delete(old)?,
             Operation::Insert { ref new } => self.insert(new)?,
             Operation::Update { ref old, ref new } => self.update(old, new)?,
         };
-        let output_op = record_store.create_operation(&output_op)?;
+        let output_op = ProcessorOperation::new(&output_op, record_store)?;
         fw.send(output_op, DEFAULT_PORT_HANDLE);
         Ok(())
     }
diff --git a/dozer-sql/src/selection/factory.rs b/dozer-sql/src/selection/factory.rs
index 6bcf6de27f..679994ac41 100644
--- a/dozer-sql/src/selection/factory.rs
+++ b/dozer-sql/src/selection/factory.rs
@@ -1,11 +1,11 @@
 use std::collections::HashMap;
 
 use crate::errors::PipelineError;
-use dozer_core::processor_record::ProcessorRecordStore;
 use dozer_core::{
     node::{OutputPortDef, OutputPortType, PortHandle, Processor, ProcessorFactory},
     DEFAULT_PORT_HANDLE,
 };
+use dozer_recordstore::ProcessorRecordStore;
 use dozer_sql_expression::builder::ExpressionBuilder;
 use dozer_sql_expression::sqlparser::ast::Expr as SqlExpr;
 use dozer_types::models::udf_config::UdfConfig;
diff --git a/dozer-sql/src/selection/processor.rs b/dozer-sql/src/selection/processor.rs
index e2677a0cc8..c1ae3901ad 100644
--- a/dozer-sql/src/selection/processor.rs
+++ b/dozer-sql/src/selection/processor.rs
@@ -3,8 +3,8 @@ use dozer_core::dozer_log::storage::Object;
 use dozer_core::epoch::Epoch;
 use dozer_core::executor_operation::ProcessorOperation;
 use dozer_core::node::{PortHandle, Processor};
-use dozer_core::processor_record::ProcessorRecordStore;
 use dozer_core::DEFAULT_PORT_HANDLE;
+use dozer_recordstore::ProcessorRecordStore;
 use dozer_sql_expression::execution::Expression;
 use dozer_types::errors::internal::BoxedError;
 use dozer_types::types::{Field, Schema};
diff --git a/dozer-sql/src/table_operator/factory.rs b/dozer-sql/src/table_operator/factory.rs
index ca3bcd1189..1c1fa30e81 100644
--- a/dozer-sql/src/table_operator/factory.rs
+++ b/dozer-sql/src/table_operator/factory.rs
@@ -2,9 +2,9 @@ use std::{collections::HashMap, time::Duration};
 
 use dozer_core::{
     node::{OutputPortDef, OutputPortType, PortHandle, Processor, ProcessorFactory},
-    processor_record::ProcessorRecordStore,
     DEFAULT_PORT_HANDLE,
 };
+use dozer_recordstore::ProcessorRecordStore;
 use dozer_sql_expression::{
     builder::ExpressionBuilder,
     execution::Expression,
diff --git a/dozer-sql/src/table_operator/lifetime.rs b/dozer-sql/src/table_operator/lifetime.rs
index 975f200b55..4379db1158 100644
--- a/dozer-sql/src/table_operator/lifetime.rs
+++ b/dozer-sql/src/table_operator/lifetime.rs
@@ -1,4 +1,4 @@
-use dozer_core::processor_record::{ProcessorRecord, ProcessorRecordStore};
+use dozer_recordstore::{ProcessorRecord, ProcessorRecordStore};
 use dozer_sql_expression::execution::Expression;
 use dozer_types::types::{Field, Lifetime, Schema};
 
diff --git a/dozer-sql/src/table_operator/operator.rs b/dozer-sql/src/table_operator/operator.rs
index 387b2a9a60..da95c03833 100644
--- a/dozer-sql/src/table_operator/operator.rs
+++ b/dozer-sql/src/table_operator/operator.rs
@@ -1,5 +1,5 @@
 use crate::table_operator::lifetime::LifetimeTableOperator;
-use dozer_core::processor_record::{ProcessorRecord, ProcessorRecordStore};
+use dozer_recordstore::{ProcessorRecord, ProcessorRecordStore};
 use dozer_types::types::Schema;
 use enum_dispatch::enum_dispatch;
 
diff --git a/dozer-sql/src/table_operator/processor.rs b/dozer-sql/src/table_operator/processor.rs
index af334f7a42..12f2ae87e6 100644
--- a/dozer-sql/src/table_operator/processor.rs
+++ b/dozer-sql/src/table_operator/processor.rs
@@ -3,8 +3,8 @@ use dozer_core::dozer_log::storage::Object;
 use dozer_core::epoch::Epoch;
 use dozer_core::executor_operation::ProcessorOperation;
 use dozer_core::node::{PortHandle, Processor};
-use dozer_core::processor_record::ProcessorRecordStore;
 use dozer_core::DEFAULT_PORT_HANDLE;
+use dozer_recordstore::ProcessorRecordStore;
 use dozer_types::errors::internal::BoxedError;
 use dozer_types::types::Schema;
 
diff --git a/dozer-sql/src/table_operator/tests/operator_test.rs b/dozer-sql/src/table_operator/tests/operator_test.rs
index 072e448818..ec9a6f709d 100644
--- a/dozer-sql/src/table_operator/tests/operator_test.rs
+++ b/dozer-sql/src/table_operator/tests/operator_test.rs
@@ -1,6 +1,6 @@
 use std::time::Duration;
 
-use dozer_core::processor_record::ProcessorRecordStore;
+use dozer_recordstore::ProcessorRecordStore;
 use dozer_sql_expression::execution::Expression;
 use dozer_types::{
     chrono::DateTime,
diff --git a/dozer-sql/src/tests/builder_test.rs b/dozer-sql/src/tests/builder_test.rs
index 6df1d62af3..be42d53225 100644
--- a/dozer-sql/src/tests/builder_test.rs
+++ b/dozer-sql/src/tests/builder_test.rs
@@ -10,8 +10,8 @@ use dozer_core::node::{
     OutputPortDef, OutputPortType, PortHandle, Sink, SinkFactory, Source, SourceFactory,
     SourceState,
 };
-use dozer_core::processor_record::ProcessorRecordStore;
 use dozer_core::DEFAULT_PORT_HANDLE;
+use dozer_recordstore::ProcessorRecordStore;
 use dozer_types::errors::internal::BoxedError;
 use dozer_types::ingestion_types::IngestionMessage;
 use dozer_types::log::debug;
diff --git a/dozer-sql/src/utils/serialize.rs b/dozer-sql/src/utils/serialize.rs
index 6192ce7c60..44cceca6a3 100644
--- a/dozer-sql/src/utils/serialize.rs
+++ b/dozer-sql/src/utils/serialize.rs
@@ -1,7 +1,5 @@
-use dozer_core::{
-    dozer_log::{storage::Object, tokio::sync::mpsc::error::SendError},
-    processor_record::{ProcessorRecord, ProcessorRecordStore},
-};
+use dozer_core::dozer_log::{storage::Object, tokio::sync::mpsc::error::SendError};
+use dozer_recordstore::{ProcessorRecord, ProcessorRecordStore};
 use dozer_types::{
     bincode,
     serde::{de::DeserializeOwned, Serialize},
diff --git a/dozer-sql/src/window/factory.rs b/dozer-sql/src/window/factory.rs
index 3fbef96600..9885afb689 100644
--- a/dozer-sql/src/window/factory.rs
+++ b/dozer-sql/src/window/factory.rs
@@ -2,9 +2,9 @@ use std::collections::HashMap;
 
 use dozer_core::{
     node::{OutputPortDef, OutputPortType, PortHandle, Processor, ProcessorFactory},
-    processor_record::ProcessorRecordStore,
     DEFAULT_PORT_HANDLE,
 };
+use dozer_recordstore::ProcessorRecordStore;
 use dozer_types::{errors::internal::BoxedError, types::Schema};
 
 use crate::{
diff --git a/dozer-sql/src/window/operator.rs b/dozer-sql/src/window/operator.rs
index 9172f473f8..e17492b046 100644
--- a/dozer-sql/src/window/operator.rs
+++ b/dozer-sql/src/window/operator.rs
@@ -1,4 +1,4 @@
-use dozer_core::processor_record::{ProcessorRecord, ProcessorRecordStore};
+use dozer_recordstore::{ProcessorRecord, ProcessorRecordStore};
 use dozer_types::{
     chrono::{Duration, DurationRound},
     types::{Field, FieldDefinition, FieldType, Record, Schema, SourceDefinition},
diff --git a/dozer-sql/src/window/processor.rs b/dozer-sql/src/window/processor.rs
index 4439fb88bf..78e5cc9d0f 100644
--- a/dozer-sql/src/window/processor.rs
+++ b/dozer-sql/src/window/processor.rs
@@ -4,8 +4,8 @@ use dozer_core::dozer_log::storage::Object;
 use dozer_core::epoch::Epoch;
 use dozer_core::executor_operation::ProcessorOperation;
 use dozer_core::node::{PortHandle, Processor};
-use dozer_core::processor_record::ProcessorRecordStore;
 use dozer_core::DEFAULT_PORT_HANDLE;
+use dozer_recordstore::ProcessorRecordStore;
 use dozer_types::errors::internal::BoxedError;
 
 use super::operator::WindowType;
diff --git a/dozer-sql/src/window/tests/operator_test.rs b/dozer-sql/src/window/tests/operator_test.rs
index 21d8dd9e4d..4b5fe3b510 100644
--- a/dozer-sql/src/window/tests/operator_test.rs
+++ b/dozer-sql/src/window/tests/operator_test.rs
@@ -1,4 +1,4 @@
-use dozer_core::processor_record::{ProcessorRecord, ProcessorRecordStore};
+use dozer_recordstore::{ProcessorRecord, ProcessorRecordStore};
 use dozer_types::types::Record;
 use dozer_types::{
     chrono::{DateTime, Duration},
diff --git a/dozer-tests/Cargo.toml b/dozer-tests/Cargo.toml
index 8a8632b607..468427412b 100644
--- a/dozer-tests/Cargo.toml
+++ b/dozer-tests/Cargo.toml
@@ -25,6 +25,7 @@ dozer-api = { path = "../dozer-api" }
 dozer-utils = { path = "../dozer-utils" }
 dozer-cli = { path = "../dozer-cli" }
 dozer-tracing = { path = "../dozer-tracing" }
+dozer-recordstore = { path = "../dozer-recordstore" }
 
 reqwest = { version = "0.11.20", features = ["json", "rustls-tls"], default-features = false }
 tokio = { version = "1.25.0", features = ["full", "rt"] }
diff --git a/dozer-tests/src/sql_tests/helper/pipeline.rs b/dozer-tests/src/sql_tests/helper/pipeline.rs
index 2eb7352d08..1a82b7d5c8 100644
--- a/dozer-tests/src/sql_tests/helper/pipeline.rs
+++ b/dozer-tests/src/sql_tests/helper/pipeline.rs
@@ -12,8 +12,8 @@ use dozer_core::node::{
     SourceState,
 };
 
-use dozer_core::processor_record::ProcessorRecordStore;
 use dozer_core::{Dag, DEFAULT_PORT_HANDLE};
+use dozer_recordstore::ProcessorRecordStore;
 
 use dozer_core::executor::{DagExecutor, ExecutorOptions};
 
diff --git a/dozer-types/Cargo.toml b/dozer-types/Cargo.toml
index 5a8af1020d..223b4e7a81 100644
--- a/dozer-types/Cargo.toml
+++ b/dozer-types/Cargo.toml
@@ -36,6 +36,7 @@ tokio-postgres = { version = "0.7.7", features = [
     "with-uuid-1",
 ] }
 serde_bytes = "0.11.12"
+arbitrary = { version = "1", features = ["derive"], optional = true }
 
 [build-dependencies]
 tonic-build = "0.10.0"
@@ -43,3 +44,4 @@ tonic-build = "0.10.0"
 [features]
 python-extension-module = ["dep:pyo3", "pyo3?/extension-module"]
 python-auto-initialize = ["dep:pyo3", "pyo3?/auto-initialize"]
+arbitrary = ["dep:arbitrary", "chrono/arbitrary", "rust_decimal/rust-fuzz"]
diff --git a/dozer-types/src/json_types.rs b/dozer-types/src/json_types.rs
index 4c42d229d1..fbb60ae46e 100644
--- a/dozer-types/src/json_types.rs
+++ b/dozer-types/src/json_types.rs
@@ -13,10 +13,14 @@ use std::fmt::{Display, Formatter};
 use std::str::FromStr;
 
 #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, PartialOrd, Ord, Hash)]
+#[cfg_attr(feature = "arbitrary", derive(arbitrary::Arbitrary))]
 pub enum JsonValue {
     Null,
     Bool(bool),
-    Number(OrderedFloat<f64>),
+    Number(
+        #[cfg_attr(feature="arbitrary", arbitrary(with = crate::types::field::arbitrary_float))]
+        OrderedFloat<f64>,
+    ),
     String(String),
     Array(Vec<JsonValue>),
     Object(BTreeMap<String, JsonValue>),
diff --git a/dozer-types/src/types/field.rs b/dozer-types/src/types/field.rs
index f67c16d2fc..2c5caae647 100644
--- a/dozer-types/src/types/field.rs
+++ b/dozer-types/src/types/field.rs
@@ -16,12 +16,13 @@ use std::time::Duration;
 
 pub const DATE_FORMAT: &str = "%Y-%m-%d";
 #[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize, PartialOrd, Ord, Hash)]
+#[cfg_attr(feature = "arbitrary", derive(arbitrary::Arbitrary))]
 pub enum Field {
     UInt(u64),
     U128(u128),
     Int(i64),
     I128(i128),
-    Float(OrderedFloat<f64>),
+    Float(#[cfg_attr(feature= "arbitrary", arbitrary(with = arbitrary_float))] OrderedFloat<f64>),
     Boolean(bool),
     String(String),
     Text(String),
@@ -35,6 +36,13 @@ pub enum Field {
     Null,
 }
 
+#[cfg(feature = "arbitrary")]
+pub(crate) fn arbitrary_float(
+    arbitrary: &mut arbitrary::Unstructured,
+) -> arbitrary::Result<OrderedFloat<f64>> {
+    Ok(OrderedFloat(arbitrary.arbitrary()?))
+}
+
 impl Field {
     pub(crate) fn data_encoding_len(&self) -> usize {
         match self {
@@ -183,6 +191,27 @@ impl Field {
         }
     }
 
+    pub fn ty(&self) -> Option<FieldType> {
+        match self {
+            Field::UInt(_) => Some(FieldType::UInt),
+            Field::U128(_) => Some(FieldType::U128),
+            Field::Int(_) => Some(FieldType::Int),
+            Field::I128(_) => Some(FieldType::I128),
+            Field::Float(_) => Some(FieldType::Float),
+            Field::Boolean(_) => Some(FieldType::Boolean),
+            Field::String(_) => Some(FieldType::String),
+            Field::Text(_) => Some(FieldType::Text),
+            Field::Binary(_) => Some(FieldType::Binary),
+            Field::Decimal(_) => Some(FieldType::Decimal),
+            Field::Timestamp(_) => Some(FieldType::Timestamp),
+            Field::Date(_) => Some(FieldType::Date),
+            Field::Json(_) => Some(FieldType::Json),
+            Field::Point(_) => Some(FieldType::Point),
+            Field::Duration(_) => Some(FieldType::Duration),
+            Field::Null => None,
+        }
+    }
+
     pub fn as_uint(&self) -> Option<u64> {
         match self {
             Field::UInt(i) => Some(*i),
diff --git a/dozer-types/src/types/mod.rs b/dozer-types/src/types/mod.rs
index 63d27a8d08..a495c8a979 100644
--- a/dozer-types/src/types/mod.rs
+++ b/dozer-types/src/types/mod.rs
@@ -260,6 +260,7 @@ pub enum Operation {
 // Helpful in interacting with external systems during ingestion and querying
 // For example, nanoseconds can overflow.
 #[derive(Clone, Copy, Serialize, Deserialize, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
+#[cfg_attr(feature = "arbitrary", derive(arbitrary::Arbitrary))]
 pub enum TimeUnit {
     Seconds,
     Milliseconds,
@@ -320,6 +321,7 @@ impl TimeUnit {
 }
 
 #[derive(Clone, Copy, Debug, Serialize, Deserialize, Eq, PartialEq, Hash)]
+#[cfg_attr(feature = "arbitrary", derive(arbitrary::Arbitrary))]
 pub struct DozerDuration(pub std::time::Duration, pub TimeUnit);
 
 impl Ord for DozerDuration {
@@ -371,6 +373,16 @@ impl DozerDuration {
 #[derive(Clone, Copy, Debug, Serialize, Deserialize, Eq, PartialEq, Hash)]
 pub struct DozerPoint(pub Point<OrderedFloat<f64>>);
 
+#[cfg(feature = "arbitrary")]
+impl<'a> arbitrary::Arbitrary<'a> for DozerPoint {
+    fn arbitrary(u: &mut arbitrary::Unstructured<'a>) -> arbitrary::Result<Self> {
+        let x = self::field::arbitrary_float(u)?;
+        let y = self::field::arbitrary_float(u)?;
+
+        Ok(Self(geo::Point::new(x, y)))
+    }
+}
+
 impl GeodesicDistance<OrderedFloat<f64>> for DozerPoint {
     fn geodesic_distance(&self, rhs: &Self) -> OrderedFloat<f64> {
         let f = point! { x: self.0.x().0, y: self.0.y().0 };