From f5b6c7f9ad412302ef68f5e43cc82ef4ed18d013 Mon Sep 17 00:00:00 2001
From: Solomon <108011288+abcpro1@users.noreply.github.com>
Date: Fri, 25 Aug 2023 06:52:28 +0000
Subject: [PATCH] feat: make probabilistic optimizations optional and tunable in the YAML config (#1912)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Probabilistic optimization sacrifices accuracy in order to reduce memory consumption. In certain parts of the pipeline, a Bloom filter is used ([set_processor](https://github.com/getdozer/dozer/blob/2e3ba96c3f4bdf9a691747191ab15617564d8ca2/dozer-sql/src/pipeline/product/set/set_processor.rs#L20)), while other parts use hash tables that store only the hash of the keys instead of the full keys ([aggregation_processor](https://github.com/getdozer/dozer/blob/2e3ba96c3f4bdf9a691747191ab15617564d8ca2/dozer-sql/src/pipeline/aggregation/processor.rs#L59) and [join_processor](https://github.com/getdozer/dozer/blob/2e3ba96c3f4bdf9a691747191ab15617564d8ca2/dozer-sql/src/pipeline/product/join/operator.rs#L57-L58)).

This commit disables these optimizations by default and adds user-configurable flags to enable each of them separately. The following example shows how to turn on probabilistic optimizations for each processor in the Dozer configuration:

```
flags:
  enable_probabilistic_optimizations:
    in_sets: true # enable probabilistic optimizations in set operations (UNION, EXCEPT, INTERSECT); Default: false
    in_joins: true # enable probabilistic optimizations in JOIN operations; Default: false
    in_aggregations: true # enable probabilistic optimizations in aggregations (SUM, COUNT, MIN, etc.); Default: false
```

---
 dozer-cli/src/lib.rs | 2 +-
 dozer-cli/src/live/state.rs | 10 +-
 dozer-cli/src/pipeline/builder.rs | 8 +-
 dozer-cli/src/pipeline/tests/builder.rs | 2 +
 dozer-cli/src/simple/executor.rs | 3 +
 dozer-cli/src/simple/orchestrator.rs | 4 +-
 dozer-cli/src/ui_helper.rs | 12 +-
 dozer-core/src/app.rs | 47 +++-
 dozer-core/src/tests/app.rs | 4 +-
 dozer-sql/src/pipeline/aggregation/factory.rs | 10 +-
 .../src/pipeline/aggregation/processor.rs | 72 ++++---
 .../tests/aggregation_test_planner.rs | 1 +
 .../tests/aggregation_tests_utils.rs | 1 +
 dozer-sql/src/pipeline/builder.rs | 34 ++-
 dozer-sql/src/pipeline/mod.rs | 1 +
 .../pipeline/pipeline_builder/join_builder.rs | 1 +
 .../src/pipeline/product/join/factory.rs | 13 +-
 .../src/pipeline/product/join/operator.rs | 203 ++++++++++--------
 dozer-sql/src/pipeline/product/set/mod.rs | 1 +
 .../src/pipeline/product/set/operator.rs | 14 +-
 .../src/pipeline/product/set/record_map.rs | 158 ++++++++++++++
 .../src/pipeline/product/set/set_factory.rs | 14 +-
 .../src/pipeline/product/set/set_processor.rs | 29 +--
 dozer-sql/src/pipeline/tests/builder_test.rs | 2 +-
 dozer-sql/src/pipeline/utils/mod.rs | 1 +
 .../pipeline/utils/record_hashtable_key.rs | 32 +++
 dozer-tests/src/sql_tests/helper/pipeline.rs | 2 +-
 dozer-types/protos/cloud_types.proto | 7 +
 dozer-types/src/models/flags.rs | 23 ++
 .../tests/flags_config_yaml_deserialize.rs | 15 +-
 30 files changed, 536 insertions(+), 190 deletions(-)
 create mode 100644 dozer-sql/src/pipeline/product/set/record_map.rs
 create mode 100644 dozer-sql/src/pipeline/utils/mod.rs
 create mode 100644 dozer-sql/src/pipeline/utils/record_hashtable_key.rs

diff --git a/dozer-cli/src/lib.rs b/dozer-cli/src/lib.rs
index f00dfa64ea..a7af6791d5 100644
--- a/dozer-cli/src/lib.rs
+++ b/dozer-cli/src/lib.rs
@@ -64,7 +64,7 @@ pub use dozer_ingestion::{ pub use
dozer_sql::pipeline::builder::QueryContext; pub use ui_helper::config_to_ui_dag; pub fn wrapped_statement_to_pipeline(sql: &str) -> Result { - let mut pipeline = AppPipeline::new(); + let mut pipeline = AppPipeline::new_with_default_flags(); statement_to_pipeline(sql, &mut pipeline, None) } diff --git a/dozer-cli/src/live/state.rs b/dozer-cli/src/live/state.rs index 458dbd46fd..d67347ccd1 100644 --- a/dozer-cli/src/live/state.rs +++ b/dozer-cli/src/live/state.rs @@ -14,6 +14,7 @@ use dozer_types::{ models::{ api_config::{ApiConfig, AppGrpcOptions}, api_endpoint::ApiEndpoint, + flags::Flags, }, }; use tokio::{runtime::Runtime, sync::RwLock}; @@ -292,6 +293,7 @@ async fn create_dag( dozer.config.sql.as_deref(), endpoint_and_logs, MultiProgress::new(), + Flags::default(), ); let (_shutdown_sender, shutdown_receiver) = shutdown::new(&dozer.runtime); builder.build(&dozer.runtime, shutdown_receiver).await @@ -324,8 +326,12 @@ fn get_dozer_run_instance( ) -> Result { match req.request { Some(dozer_types::grpc_types::live::run_request::Request::Sql(req)) => { - let context = statement_to_pipeline(&req.sql, &mut AppPipeline::new(), None) - .map_err(LiveError::PipelineError)?; + let context = statement_to_pipeline( + &req.sql, + &mut AppPipeline::new(dozer.config.flags.clone().unwrap_or_default().into()), + None, + ) + .map_err(LiveError::PipelineError)?; //overwrite sql dozer.config.sql = Some(req.sql); diff --git a/dozer-cli/src/pipeline/builder.rs b/dozer-cli/src/pipeline/builder.rs index 476d89b765..ffba23d451 100644 --- a/dozer-cli/src/pipeline/builder.rs +++ b/dozer-cli/src/pipeline/builder.rs @@ -16,6 +16,7 @@ use dozer_types::indicatif::MultiProgress; use dozer_types::log::debug; use dozer_types::models::api_endpoint::ApiEndpoint; use dozer_types::models::connection::Connection; +use dozer_types::models::flags::Flags; use dozer_types::models::source::Source; use dozer_types::parking_lot::Mutex; use std::hash::Hash; @@ -57,6 +58,7 @@ pub struct PipelineBuilder<'a> { /// `ApiEndpoint` and its log. 
endpoint_and_logs: Vec<(ApiEndpoint, OptionLog)>, progress: MultiProgress, + flags: Flags, } impl<'a> PipelineBuilder<'a> { @@ -66,6 +68,7 @@ impl<'a> PipelineBuilder<'a> { sql: Option<&'a str>, endpoint_and_logs: Vec<(ApiEndpoint, OptionLog)>, progress: MultiProgress, + flags: Flags, ) -> Self { Self { connections, @@ -73,6 +76,7 @@ impl<'a> PipelineBuilder<'a> { sql, endpoint_and_logs, progress, + flags, } } @@ -148,7 +152,7 @@ impl<'a> PipelineBuilder<'a> { let mut original_sources = vec![]; let mut query_ctx = None; - let mut pipeline = AppPipeline::new(); + let mut pipeline = AppPipeline::new((&self.flags).into()); let mut transformed_sources = vec![]; @@ -205,7 +209,7 @@ impl<'a> PipelineBuilder<'a> { let mut pipelines: Vec> = vec![]; - let mut pipeline = AppPipeline::new(); + let mut pipeline = AppPipeline::new(self.flags.into()); let mut available_output_tables: HashMap = HashMap::new(); diff --git a/dozer-cli/src/pipeline/tests/builder.rs b/dozer-cli/src/pipeline/tests/builder.rs index d973d97524..50b5833c93 100644 --- a/dozer-cli/src/pipeline/tests/builder.rs +++ b/dozer-cli/src/pipeline/tests/builder.rs @@ -7,6 +7,7 @@ use dozer_types::models::config::Config; use dozer_types::indicatif::MultiProgress; use dozer_types::models::connection::{Connection, ConnectionConfig}; +use dozer_types::models::flags::Flags; use dozer_types::models::source::Source; fn get_default_config() -> Config { @@ -66,6 +67,7 @@ fn load_multi_sources() { .map(|endpoint| (endpoint, None)) .collect(), MultiProgress::new(), + Flags::default(), ); let runtime = tokio::runtime::Builder::new_current_thread() diff --git a/dozer-cli/src/simple/executor.rs b/dozer-cli/src/simple/executor.rs index 9c8f78d32b..af6a0f621d 100644 --- a/dozer-cli/src/simple/executor.rs +++ b/dozer-cli/src/simple/executor.rs @@ -5,6 +5,7 @@ use dozer_cache::dozer_log::replication::Log; use dozer_core::checkpoint::{CheckpointFactory, CheckpointFactoryOptions}; use dozer_core::processor_record::ProcessorRecordStore; use dozer_types::models::api_endpoint::ApiEndpoint; +use dozer_types::models::flags::Flags; use dozer_types::parking_lot::Mutex; use tokio::runtime::Runtime; @@ -84,6 +85,7 @@ impl<'a> Executor<'a> { runtime: &Arc, executor_options: ExecutorOptions, shutdown: ShutdownReceiver, + flags: Flags, ) -> Result { let builder = PipelineBuilder::new( self.connections, @@ -94,6 +96,7 @@ impl<'a> Executor<'a> { .map(|(endpoint, log)| (endpoint.clone(), Some(log.log.clone()))) .collect(), self.multi_pb.clone(), + flags, ); let dag = builder.build(runtime, shutdown).await?; diff --git a/dozer-cli/src/simple/orchestrator.rs b/dozer-cli/src/simple/orchestrator.rs index 3c56ccfa86..bdb56b0337 100644 --- a/dozer-cli/src/simple/orchestrator.rs +++ b/dozer-cli/src/simple/orchestrator.rs @@ -188,6 +188,7 @@ impl SimpleOrchestrator { &self.runtime, get_executor_options(&self.config), shutdown.clone(), + self.config.flags.clone().unwrap_or_default(), ))?; let app_grpc_config = get_app_grpc_config(&self.config); @@ -289,6 +290,7 @@ impl SimpleOrchestrator { self.config.sql.as_deref(), endpoint_and_logs, self.multi_pb.clone(), + self.config.flags.clone().unwrap_or_default(), ); let dag = self .runtime @@ -374,7 +376,7 @@ impl SimpleOrchestrator { } pub fn validate_sql(sql: String) -> Result<(), PipelineError> { - statement_to_pipeline(&sql, &mut AppPipeline::new(), None).map_or_else( + statement_to_pipeline(&sql, &mut AppPipeline::new_with_default_flags(), None).map_or_else( |e| { error!( "[sql][{}] Transforms validation error: {}", diff --git 
a/dozer-cli/src/ui_helper.rs b/dozer-cli/src/ui_helper.rs index 228b76dfae..3a00e752ec 100644 --- a/dozer-cli/src/ui_helper.rs +++ b/dozer-cli/src/ui_helper.rs @@ -10,7 +10,7 @@ use dozer_core::{ use dozer_sql::pipeline::builder::{statement_to_pipeline, SchemaSQLContext}; use dozer_types::{ grpc_types::cloud::{QueryEdge, QueryGraph, QueryNode, QueryNodeType}, - models::{config::Config, connection::Connection, source::Source}, + models::{config::Config, connection::Connection, flags::Flags, source::Source}, }; use crate::{errors::OrchestrationError, pipeline::source_builder::SourceBuilder}; @@ -53,8 +53,9 @@ fn prepare_pipeline_dag( sql: String, connection_sources: HashMap>, connection_source_ports: HashMap<(&str, &str), u16>, + flags: Flags, ) -> Result, OrchestrationError> { - let mut pipeline = AppPipeline::new(); + let mut pipeline = AppPipeline::new(flags.into()); let mut asm: AppSourceManager = AppSourceManager::new(); connection_sources.iter().for_each(|cs| { @@ -169,6 +170,11 @@ pub fn config_to_ui_dag(config: Config) -> Result { processors: Vec<(NodeHandle, Box>)>, sinks: Vec<(NodeHandle, Box>)>, entry_points: Vec<(NodeHandle, PipelineEntryPoint)>, -} - -impl Default for AppPipeline { - fn default() -> Self { - Self::new() - } + flags: PipelineFlags, } impl AppPipeline { @@ -79,21 +75,58 @@ impl AppPipeline { self.edges.push(edge); } - pub fn new() -> Self { + pub fn new(flags: PipelineFlags) -> Self { Self { processors: Vec::new(), sinks: Vec::new(), edges: Vec::new(), entry_points: Vec::new(), + flags, } } + pub fn new_with_default_flags() -> Self { + Self::new(Default::default()) + } + pub fn get_entry_points_sources_names(&self) -> Vec { self.entry_points .iter() .map(|(_, p)| p.source_name().to_string()) .collect() } + + pub fn flags(&self) -> &PipelineFlags { + &self.flags + } +} + +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct PipelineFlags { + pub enable_probabilistic_optimizations: EnableProbabilisticOptimizations, +} + +impl From<&Flags> for PipelineFlags { + fn from(flags: &Flags) -> Self { + Self { + enable_probabilistic_optimizations: flags + .enable_probabilistic_optimizations + .clone() + .unwrap_or_default(), + } + } +} + +impl From for PipelineFlags { + fn from(flags: Flags) -> Self { + Self::from(&flags) + } +} + +impl Default for PipelineFlags { + fn default() -> Self { + Flags::default().into() + } } pub struct App { diff --git a/dozer-core/src/tests/app.rs b/dozer-core/src/tests/app.rs index 4d31e9e95d..1e902cf2b0 100644 --- a/dozer-core/src/tests/app.rs +++ b/dozer-core/src/tests/app.rs @@ -174,7 +174,7 @@ async fn test_app_dag() { let mut app = App::new(asm); - let mut p1 = AppPipeline::new(); + let mut p1 = AppPipeline::new_with_default_flags(); p1.add_processor( Box::new(NoopJoinProcessorFactory {}), "join", @@ -197,7 +197,7 @@ async fn test_app_dag() { app.add_pipeline(p1); - let mut p2 = AppPipeline::new(); + let mut p2 = AppPipeline::new_with_default_flags(); p2.add_processor( Box::new(NoopJoinProcessorFactory {}), "join", diff --git a/dozer-sql/src/pipeline/aggregation/factory.rs b/dozer-sql/src/pipeline/aggregation/factory.rs index 65fb3c9c68..c4d58a652f 100644 --- a/dozer-sql/src/pipeline/aggregation/factory.rs +++ b/dozer-sql/src/pipeline/aggregation/factory.rs @@ -17,14 +17,21 @@ pub struct AggregationProcessorFactory { id: String, projection: Select, _stateful: bool, + enable_probabilistic_optimizations: bool, } impl AggregationProcessorFactory { - pub fn new(id: String, projection: Select, stateful: bool) -> Self { + pub fn new( + id: String, 
+ projection: Select, + stateful: bool, + enable_probabilistic_optimizations: bool, + ) -> Self { Self { id, projection, _stateful: stateful, + enable_probabilistic_optimizations, } } @@ -90,6 +97,7 @@ impl ProcessorFactory for AggregationProcessorFactory { planner.having, input_schema.clone(), planner.post_aggregation_schema, + self.enable_probabilistic_optimizations, )?) }; Ok(processor) diff --git a/dozer-sql/src/pipeline/aggregation/processor.rs b/dozer-sql/src/pipeline/aggregation/processor.rs index 8cab82e9b8..4840386a70 100644 --- a/dozer-sql/src/pipeline/aggregation/processor.rs +++ b/dozer-sql/src/pipeline/aggregation/processor.rs @@ -1,6 +1,7 @@ #![allow(clippy::too_many_arguments)] use crate::pipeline::errors::PipelineError; +use crate::pipeline::utils::record_hashtable_key::{get_record_hash, RecordKey}; use crate::pipeline::{aggregation::aggregator::Aggregator, expression::execution::Expression}; use dozer_core::channels::ProcessorChannelForwarder; use dozer_core::executor_operation::ProcessorOperation; @@ -9,15 +10,13 @@ use dozer_core::processor_record::ProcessorRecordStore; use dozer_core::DEFAULT_PORT_HANDLE; use dozer_types::errors::internal::BoxedError; use dozer_types::types::{Field, FieldType, Operation, Record, Schema}; -use std::hash::{Hash, Hasher}; +use std::collections::HashMap; use crate::pipeline::aggregation::aggregator::{ get_aggregator_from_aggregator_type, get_aggregator_type_from_aggregation_expression, AggregatorEnum, AggregatorType, }; -use ahash::AHasher; use dozer_core::epoch::Epoch; -use hashbrown::HashMap; const DEFAULT_SEGMENT_KEY: &str = "DOZER_DEFAULT_SEGMENT_KEY"; @@ -56,9 +55,10 @@ pub struct AggregationProcessor { having: Option, input_schema: Schema, aggregation_schema: Schema, - states: HashMap, - default_segment_key: u64, + states: HashMap, + default_segment_key: RecordKey, having_eval_schema: Schema, + accurate_keys: bool, } enum AggregatorOperation { @@ -76,6 +76,7 @@ impl AggregationProcessor { having: Option, input_schema: Schema, aggregation_schema: Schema, + enable_probabilistic_optimizations: bool, ) -> Result { let mut aggr_types = Vec::new(); let mut aggr_measures = Vec::new(); @@ -89,12 +90,11 @@ impl AggregationProcessor { aggr_measures_ret_types.push(measure.get_type(&input_schema)?.return_type) } - let mut hasher = AHasher::default(); - DEFAULT_SEGMENT_KEY.hash(&mut hasher); - let mut having_eval_schema_fields = input_schema.fields.clone(); having_eval_schema_fields.extend(aggregation_schema.fields.clone()); + let accurate_keys = !enable_probabilistic_optimizations; + Ok(Self { _id: id, dimensions, @@ -106,11 +106,19 @@ impl AggregationProcessor { having, measures_types: aggr_types, measures_return_types: aggr_measures_ret_types, - default_segment_key: hasher.finish(), + default_segment_key: { + let fields = vec![Field::String(DEFAULT_SEGMENT_KEY.into())]; + if accurate_keys { + RecordKey::Accurate(fields) + } else { + RecordKey::Hash(get_record_hash(fields.iter())) + } + }, having_eval_schema: Schema { fields: having_eval_schema_fields, primary_index: vec![], }, + accurate_keys, }) } @@ -177,12 +185,13 @@ impl AggregationProcessor { let mut out_rec_insert: Vec = Vec::with_capacity(self.measures.len()); let key = if !self.dimensions.is_empty() { - get_key(&self.input_schema, old, &self.dimensions)? + Some(self.get_key(old)?) 
} else { - self.default_segment_key + None }; + let key = key.as_ref().unwrap_or(&self.default_segment_key); - let curr_state_opt = self.states.get_mut(&key); + let curr_state_opt = self.states.get_mut(key); assert!( curr_state_opt.is_some(), "Unable to find aggregator state during DELETE operation" @@ -220,7 +229,7 @@ impl AggregationProcessor { }; let res = if curr_state.count == 1 { - self.states.remove(&key); + self.states.remove(key); if out_rec_delete_having_satisfied { vec![Operation::Delete { old: Self::build_projection( @@ -256,9 +265,9 @@ impl AggregationProcessor { let mut out_rec_insert: Vec = Vec::with_capacity(self.measures.len()); let key = if !self.dimensions.is_empty() { - get_key(&self.input_schema, new, &self.dimensions)? + self.get_key(new)? } else { - self.default_segment_key + self.default_segment_key.clone() }; let curr_state = self.states.entry(key).or_insert(AggregationState::new( @@ -407,7 +416,7 @@ impl AggregationProcessor { &mut self, old: &mut Record, new: &mut Record, - key: u64, + key: RecordKey, ) -> Result, PipelineError> { let mut out_rec_delete: Vec = Vec::with_capacity(self.measures.len()); let mut out_rec_insert: Vec = Vec::with_capacity(self.measures.len()); @@ -519,12 +528,12 @@ impl AggregationProcessor { ref mut new, } => { let (old_record_hash, new_record_hash) = if self.dimensions.is_empty() { - (self.default_segment_key, self.default_segment_key) - } else { ( - get_key(&self.input_schema, old, &self.dimensions)?, - get_key(&self.input_schema, new, &self.dimensions)?, + self.default_segment_key.clone(), + self.default_segment_key.clone(), ) + } else { + (self.get_key(old)?, self.get_key(new)?) }; if old_record_hash == new_record_hash { @@ -538,21 +547,18 @@ impl AggregationProcessor { } } } -} -fn get_key( - schema: &Schema, - record: &Record, - dimensions: &[Expression], -) -> Result { - let mut key = Vec::::with_capacity(dimensions.len()); - for dimension in dimensions.iter() { - key.push(dimension.evaluate(record, schema)?); + fn get_key(&self, record: &Record) -> Result { + let mut key = Vec::::with_capacity(self.dimensions.len()); + for dimension in self.dimensions.iter() { + key.push(dimension.evaluate(record, &self.input_schema)?); + } + if self.accurate_keys { + Ok(RecordKey::Accurate(key)) + } else { + Ok(RecordKey::Hash(get_record_hash(key.iter()))) + } } - let mut hasher = AHasher::default(); - key.hash(&mut hasher); - let v = hasher.finish(); - Ok(v) } impl Processor for AggregationProcessor { diff --git a/dozer-sql/src/pipeline/aggregation/tests/aggregation_test_planner.rs b/dozer-sql/src/pipeline/aggregation/tests/aggregation_test_planner.rs index b14d0c63bb..061af211ed 100644 --- a/dozer-sql/src/pipeline/aggregation/tests/aggregation_test_planner.rs +++ b/dozer-sql/src/pipeline/aggregation/tests/aggregation_test_planner.rs @@ -84,6 +84,7 @@ fn test_planner_with_aggregator() { projection_planner.having, schema, projection_planner.post_aggregation_schema, + false, ) .unwrap(); diff --git a/dozer-sql/src/pipeline/aggregation/tests/aggregation_tests_utils.rs b/dozer-sql/src/pipeline/aggregation/tests/aggregation_tests_utils.rs index 2e1f65ff08..8b82f3363c 100644 --- a/dozer-sql/src/pipeline/aggregation/tests/aggregation_tests_utils.rs +++ b/dozer-sql/src/pipeline/aggregation/tests/aggregation_tests_utils.rs @@ -36,6 +36,7 @@ pub(crate) fn init_processor( projection_planner.having, input_schema.clone(), projection_planner.post_aggregation_schema, + false, ) .unwrap_or_else(|e| panic!("{}", e.to_string())); diff --git 
a/dozer-sql/src/pipeline/builder.rs b/dozer-sql/src/pipeline/builder.rs index cafe737f9c..dae1923917 100644 --- a/dozer-sql/src/pipeline/builder.rs +++ b/dozer-sql/src/pipeline/builder.rs @@ -281,8 +281,15 @@ fn select_to_pipeline( } } - let aggregation = - AggregationProcessorFactory::new(gen_agg_name.clone(), select.clone(), stateful); + let aggregation = AggregationProcessorFactory::new( + gen_agg_name.clone(), + select.clone(), + stateful, + pipeline + .flags() + .enable_probabilistic_optimizations + .in_aggregations, + ); pipeline.add_processor(Box::new(aggregation), &gen_agg_name, vec![]); @@ -468,7 +475,11 @@ fn set_to_pipeline( gen_set_name = table_info.override_name.to_owned().unwrap(); } - let set_proc_fac = SetProcessorFactory::new(gen_set_name.clone(), set_quantifier); + let set_proc_fac = SetProcessorFactory::new( + gen_set_name.clone(), + set_quantifier, + pipeline.flags().enable_probabilistic_optimizations.in_sets, + ); pipeline.add_processor(Box::new(set_proc_fac), &gen_set_name, vec![]); @@ -629,7 +640,7 @@ mod tests { #[should_panic] fn disallow_zero_outgoing_ndes() { let sql = "select * from film"; - statement_to_pipeline(sql, &mut AppPipeline::new(), None).unwrap(); + statement_to_pipeline(sql, &mut AppPipeline::new_with_default_flags(), None).unwrap(); } #[test] fn parse_sql_pipeline() { @@ -688,7 +699,8 @@ mod tests { from stocks join tbl on tbl.id = stocks.id; "#; - let context = statement_to_pipeline(sql, &mut AppPipeline::new(), None).unwrap(); + let context = + statement_to_pipeline(sql, &mut AppPipeline::new_with_default_flags(), None).unwrap(); // Should create as many output tables as into statements let mut output_keys = context.output_tables_map.keys().collect::>(); @@ -709,7 +721,7 @@ mod tests { #[test] fn test_missing_into_in_simple_from_clause() { let sql = r#"SELECT a FROM B "#; - let result = statement_to_pipeline(sql, &mut AppPipeline::new(), None); + let result = statement_to_pipeline(sql, &mut AppPipeline::new_with_default_flags(), None); //check if the result is an error assert!(matches!(result, Err(PipelineError::MissingIntoClause))) } @@ -717,7 +729,7 @@ fn test_missing_into_in_simple_from_clause() { #[test] fn test_correct_into_clause() { let sql = r#"SELECT a INTO C FROM B"#; - let result = statement_to_pipeline(sql, &mut AppPipeline::new(), None); + let result = statement_to_pipeline(sql, &mut AppPipeline::new_with_default_flags(), None); //check if the result is ok assert!(result.is_ok()); } @@ -725,7 +737,7 @@ fn test_correct_into_clause() { #[test] fn test_missing_into_in_nested_from_clause() { let sql = r#"SELECT a FROM (SELECT a from b)"#; - let result = statement_to_pipeline(sql, &mut AppPipeline::new(), None); + let result = statement_to_pipeline(sql, &mut AppPipeline::new_with_default_flags(), None); //check if the result is an error assert!(matches!(result, Err(PipelineError::MissingIntoClause))) } @@ -733,7 +745,7 @@ fn test_missing_into_in_nested_from_clause() { #[test] fn test_correct_into_in_nested_from() { let sql = r#"SELECT a INTO c FROM (SELECT a from b)"#; - let result = statement_to_pipeline(sql, &mut AppPipeline::new(), None); + let result = statement_to_pipeline(sql, &mut AppPipeline::new_with_default_flags(), None); //check if the result is ok assert!(result.is_ok()); } @@ -743,7 +755,7 @@ fn test_missing_into_in_with_clause() { let sql = r#"WITH tbl as (select a from B) select B from tbl;"#; - let result = statement_to_pipeline(sql, &mut AppPipeline::new(), None); + let result = statement_to_pipeline(sql, &mut 
AppPipeline::new_with_default_flags(), None); //check if the result is an error assert!(matches!(result, Err(PipelineError::MissingIntoClause))) } @@ -754,7 +766,7 @@ fn test_correct_into_in_with_clause() { select B into C from tbl;"#; - let result = statement_to_pipeline(sql, &mut AppPipeline::new(), None); + let result = statement_to_pipeline(sql, &mut AppPipeline::new_with_default_flags(), None); //check if the result is ok assert!(result.is_ok()); } diff --git a/dozer-sql/src/pipeline/mod.rs b/dozer-sql/src/pipeline/mod.rs index d807f13894..e7b5bada0a 100644 --- a/dozer-sql/src/pipeline/mod.rs +++ b/dozer-sql/src/pipeline/mod.rs @@ -8,6 +8,7 @@ mod product; mod projection; mod selection; mod table_operator; +mod utils; mod window; #[cfg(test)] diff --git a/dozer-sql/src/pipeline/pipeline_builder/join_builder.rs b/dozer-sql/src/pipeline/pipeline_builder/join_builder.rs index 260666d3f0..eb4f5042dc 100644 --- a/dozer-sql/src/pipeline/pipeline_builder/join_builder.rs +++ b/dozer-sql/src/pipeline/pipeline_builder/join_builder.rs @@ -55,6 +55,7 @@ pub(crate) fn insert_join_to_pipeline( left_name_or_alias.clone(), right_name_or_alias, join.join_operator.clone(), + pipeline.flags().enable_probabilistic_optimizations.in_joins, ); let mut pipeline_entry_points = vec![]; diff --git a/dozer-sql/src/pipeline/product/join/factory.rs b/dozer-sql/src/pipeline/product/join/factory.rs index 72360680a1..95b47966a0 100644 --- a/dozer-sql/src/pipeline/product/join/factory.rs +++ b/dozer-sql/src/pipeline/product/join/factory.rs @@ -32,6 +32,7 @@ pub struct JoinProcessorFactory { left: Option, right: Option, join_operator: SqlJoinOperator, + enable_probabilistic_optimizations: bool, } impl JoinProcessorFactory { @@ -40,12 +41,14 @@ impl JoinProcessorFactory { left: Option, right: Option, join_operator: SqlJoinOperator, + enable_probabilistic_optimizations: bool, ) -> Self { Self { id, left, right, join_operator, + enable_probabilistic_optimizations, } } } @@ -180,12 +183,10 @@ impl ProcessorFactory for JoinProcessorFactory { let join_operator = JoinOperator::new( join_type, - left_join_key_indexes, - right_join_key_indexes, - left_primary_key_indexes, - right_primary_key_indexes, - left_default_record, - right_default_record, + (left_join_key_indexes, right_join_key_indexes), + (left_primary_key_indexes, right_primary_key_indexes), + (left_default_record, right_default_record), + self.enable_probabilistic_optimizations, ); Ok(Box::new(ProductProcessor::new( diff --git a/dozer-sql/src/pipeline/product/join/operator.rs b/dozer-sql/src/pipeline/product/join/operator.rs index 9776dbc477..2723d815d9 100644 --- a/dozer-sql/src/pipeline/product/join/operator.rs +++ b/dozer-sql/src/pipeline/product/join/operator.rs @@ -1,15 +1,11 @@ -use ahash::AHasher; +use crate::pipeline::utils::record_hashtable_key::{get_record_hash, RecordKey}; use dozer_core::processor_record::{ProcessorRecord, ProcessorRecordStore}; use dozer_types::{ chrono, - types::{Lifetime, Record, Timestamp}, + types::{Field, Lifetime, Record, Timestamp}, }; use linked_hash_map::LinkedHashMap; -use std::{ - collections::HashMap, - fmt::Debug, - hash::{Hash, Hasher}, -}; +use std::{collections::HashMap, fmt::Debug}; use crate::pipeline::errors::JoinError; @@ -39,7 +35,8 @@ pub enum JoinAction { Delete, } -type IndexKey = (u64, u64); // (join_key, primary_key) +type JoinKey = RecordKey; +type IndexKey = (JoinKey, u64); // (join_key, primary_key) #[derive(Debug, Clone)] pub struct JoinOperator { @@ -54,23 +51,24 @@ pub struct JoinOperator { 
left_default_record: ProcessorRecord, right_default_record: ProcessorRecord, - left_map: HashMap>>, - right_map: HashMap>>, + left_map: HashMap>>, + right_map: HashMap>>, left_lifetime_map: LinkedHashMap>, right_lifetime_map: LinkedHashMap>, + + accurate_keys: bool, } impl JoinOperator { pub fn new( join_type: JoinType, - left_join_key_indexes: Vec, - right_join_key_indexes: Vec, - left_primary_key_indexes: Vec, - right_primary_key_indexes: Vec, - left_default_record: ProcessorRecord, - right_default_record: ProcessorRecord, + (left_join_key_indexes, right_join_key_indexes): (Vec, Vec), + (left_primary_key_indexes, right_primary_key_indexes): (Vec, Vec), + (left_default_record, right_default_record): (ProcessorRecord, ProcessorRecord), + enable_probabilistic_optimizations: bool, ) -> Self { + let accurate_keys = !enable_probabilistic_optimizations; Self { join_type, left_join_key_indexes, @@ -83,6 +81,7 @@ impl JoinOperator { right_map: HashMap::new(), left_lifetime_map: LinkedHashMap::new(), right_lifetime_map: LinkedHashMap::new(), + accurate_keys, } } @@ -97,7 +96,7 @@ impl JoinOperator { fn inner_join_from_left( &self, action: &JoinAction, - join_key: u64, + join_key: &JoinKey, left_record: ProcessorRecord, ) -> JoinResult> { let right_records = get_join_records(&self.right_map, join_key); @@ -118,7 +117,7 @@ impl JoinOperator { fn inner_join_from_right( &self, action: &JoinAction, - join_key: u64, + join_key: &JoinKey, right_record: ProcessorRecord, ) -> JoinResult> { let left_records = get_join_records(&self.left_map, join_key); @@ -139,7 +138,7 @@ impl JoinOperator { fn left_join_from_left( &self, action: &JoinAction, - join_key: u64, + join_key: &JoinKey, left_record: ProcessorRecord, ) -> JoinResult> { let right_records = get_join_records(&self.right_map, join_key); @@ -166,7 +165,7 @@ impl JoinOperator { fn left_join_from_right( &self, action: &JoinAction, - join_key: u64, + join_key: &JoinKey, record_store: &ProcessorRecordStore, right_record: ProcessorRecord, ) -> JoinResult> { @@ -215,7 +214,7 @@ impl JoinOperator { fn right_join_from_left( &self, action: &JoinAction, - join_key: u64, + join_key: &JoinKey, record_store: &ProcessorRecordStore, left_record: ProcessorRecord, ) -> JoinResult> { @@ -264,7 +263,7 @@ impl JoinOperator { fn right_join_from_right( &self, action: &JoinAction, - join_key: u64, + join_key: &JoinKey, right_record: ProcessorRecord, ) -> JoinResult> { let left_records = get_join_records(&self.left_map, join_key); @@ -289,9 +288,9 @@ impl JoinOperator { } fn get_left_matching_count(&self, action: &JoinAction, record: &Record) -> JoinResult { - let join_key = get_record_key(record, &self.right_join_key_indexes); + let join_key = self.get_join_key(record, &self.right_join_key_indexes); - let mut matching_count = get_join_records(&self.left_map, join_key).len(); + let mut matching_count = get_join_records(&self.left_map, &join_key).len(); if action == &JoinAction::Insert { matching_count -= 1; } @@ -299,9 +298,9 @@ impl JoinOperator { } fn get_right_matching_count(&self, action: &JoinAction, record: &Record) -> JoinResult { - let join_key = get_record_key(record, &self.left_join_key_indexes); + let join_key = self.get_join_key(record, &self.left_join_key_indexes); - let mut matching_count = get_join_records(&self.right_map, join_key).len(); + let mut matching_count = get_join_records(&self.right_map, &join_key).len(); if action == &JoinAction::Insert { matching_count -= 1; } @@ -319,7 +318,7 @@ impl JoinOperator { if eviction_instant <= now { 
old_instants.push(*eviction_instant); for (join_key, primary_key) in join_index_keys { - evict_join_record(join_index, *join_key, *primary_key); + evict_join_record(join_index, join_key, *primary_key); } } else { break; @@ -332,7 +331,7 @@ impl JoinOperator { &mut self, from_branch: &JoinBranch, lifetime: Lifetime, - join_key: u64, + join_key: JoinKey, primary_key: u64, ) -> JoinResult<()> { let eviction_index = match from_branch { @@ -383,75 +382,75 @@ impl JoinOperator { ) -> JoinResult> { match (&self.join_type, from) { (JoinType::Inner, JoinBranch::Left) => { - let join_key = get_record_key(&old_decoded, &self.left_join_key_indexes); + let join_key = self.get_join_key(&old_decoded, &self.left_join_key_indexes); remove_join_record( &mut self.left_map, &self.left_primary_key_indexes, - join_key, + &join_key, &old_decoded, ); - let records = self.inner_join_from_left(&JoinAction::Delete, join_key, old)?; + let records = self.inner_join_from_left(&JoinAction::Delete, &join_key, old)?; Ok(records) } (JoinType::Inner, JoinBranch::Right) => { - let join_key = get_record_key(&old_decoded, &self.right_join_key_indexes); + let join_key = self.get_join_key(&old_decoded, &self.right_join_key_indexes); remove_join_record( &mut self.right_map, &self.right_primary_key_indexes, - join_key, + &join_key, &old_decoded, ); - let records = self.inner_join_from_right(&JoinAction::Delete, join_key, old)?; + let records = self.inner_join_from_right(&JoinAction::Delete, &join_key, old)?; Ok(records) } (JoinType::LeftOuter, JoinBranch::Left) => { - let join_key = get_record_key(&old_decoded, &self.left_join_key_indexes); + let join_key = self.get_join_key(&old_decoded, &self.left_join_key_indexes); remove_join_record( &mut self.left_map, &self.left_primary_key_indexes, - join_key, + &join_key, &old_decoded, ); - let records = self.left_join_from_left(&JoinAction::Delete, join_key, old)?; + let records = self.left_join_from_left(&JoinAction::Delete, &join_key, old)?; Ok(records) } (JoinType::LeftOuter, JoinBranch::Right) => { - let join_key = get_record_key(&old_decoded, &self.right_join_key_indexes); + let join_key = self.get_join_key(&old_decoded, &self.right_join_key_indexes); remove_join_record( &mut self.right_map, &self.right_primary_key_indexes, - join_key, + &join_key, &old_decoded, ); let records = - self.left_join_from_right(&JoinAction::Delete, join_key, record_store, old)?; + self.left_join_from_right(&JoinAction::Delete, &join_key, record_store, old)?; Ok(records) } (JoinType::RightOuter, JoinBranch::Left) => { - let join_key = get_record_key(&old_decoded, &self.left_join_key_indexes); + let join_key = self.get_join_key(&old_decoded, &self.left_join_key_indexes); remove_join_record( &mut self.left_map, &self.left_primary_key_indexes, - join_key, + &join_key, &old_decoded, ); let records = - self.right_join_from_left(&JoinAction::Delete, join_key, record_store, old)?; + self.right_join_from_left(&JoinAction::Delete, &join_key, record_store, old)?; Ok(records) } (JoinType::RightOuter, JoinBranch::Right) => { - let join_key = get_record_key(&old_decoded, &self.right_join_key_indexes); + let join_key = self.get_join_key(&old_decoded, &self.right_join_key_indexes); remove_join_record( &mut self.right_map, &self.right_primary_key_indexes, - join_key, + &join_key, &old_decoded, ); - let records = self.right_join_from_right(&JoinAction::Delete, join_key, old)?; + let records = self.right_join_from_right(&JoinAction::Delete, &join_key, old)?; Ok(records) } } @@ -466,97 +465,108 @@ impl JoinOperator { ) -> 
JoinResult> { match (&self.join_type, from) { (JoinType::Inner, JoinBranch::Left) => { - let join_key = get_record_key(&new_decoded, &self.left_join_key_indexes); - let primary_key = get_record_key(&new_decoded, &self.left_primary_key_indexes); + let join_key = self.get_join_key(&new_decoded, &self.left_join_key_indexes); + let primary_key = get_record_key_hash(&new_decoded, &self.left_primary_key_indexes); - add_join_record(&mut self.left_map, join_key, primary_key, &new); + add_join_record(&mut self.left_map, join_key.clone(), primary_key, &new); if let Some(lifetime) = new.get_lifetime() { - self.insert_evict_index(from, lifetime, join_key, primary_key)? + self.insert_evict_index(from, lifetime, join_key.clone(), primary_key)? } - let records = self.inner_join_from_left(&JoinAction::Insert, join_key, new)?; + let records = self.inner_join_from_left(&JoinAction::Insert, &join_key, new)?; Ok(records) } (JoinType::Inner, JoinBranch::Right) => { - let join_key = get_record_key(&new_decoded, &self.right_join_key_indexes); - let primary_key = get_record_key(&new_decoded, &self.right_primary_key_indexes); + let join_key = self.get_join_key(&new_decoded, &self.right_join_key_indexes); + let primary_key = + get_record_key_hash(&new_decoded, &self.right_primary_key_indexes); - add_join_record(&mut self.right_map, join_key, primary_key, &new); + add_join_record(&mut self.right_map, join_key.clone(), primary_key, &new); if let Some(lifetime) = new.get_lifetime() { - self.insert_evict_index(from, lifetime, join_key, primary_key)? + self.insert_evict_index(from, lifetime, join_key.clone(), primary_key)? } - let records = self.inner_join_from_right(&JoinAction::Insert, join_key, new)?; + let records = self.inner_join_from_right(&JoinAction::Insert, &join_key, new)?; Ok(records) } (JoinType::LeftOuter, JoinBranch::Left) => { - let join_key = get_record_key(&new_decoded, &self.left_join_key_indexes); - let primary_key = get_record_key(&new_decoded, &self.left_primary_key_indexes); + let join_key = self.get_join_key(&new_decoded, &self.left_join_key_indexes); + let primary_key = get_record_key_hash(&new_decoded, &self.left_primary_key_indexes); - add_join_record(&mut self.left_map, join_key, primary_key, &new); + add_join_record(&mut self.left_map, join_key.clone(), primary_key, &new); if let Some(lifetime) = new.get_lifetime() { - self.insert_evict_index(from, lifetime, join_key, primary_key)? + self.insert_evict_index(from, lifetime, join_key.clone(), primary_key)? } - let records = self.left_join_from_left(&JoinAction::Insert, join_key, new)?; + let records = self.left_join_from_left(&JoinAction::Insert, &join_key, new)?; Ok(records) } (JoinType::LeftOuter, JoinBranch::Right) => { - let join_key = get_record_key(&new_decoded, &self.right_join_key_indexes); - let primary_key = get_record_key(&new_decoded, &self.right_primary_key_indexes); + let join_key = self.get_join_key(&new_decoded, &self.right_join_key_indexes); + let primary_key = + get_record_key_hash(&new_decoded, &self.right_primary_key_indexes); - add_join_record(&mut self.right_map, join_key, primary_key, &new); + add_join_record(&mut self.right_map, join_key.clone(), primary_key, &new); if let Some(lifetime) = new.get_lifetime() { - self.insert_evict_index(from, lifetime, join_key, primary_key)? + self.insert_evict_index(from, lifetime, join_key.clone(), primary_key)? 
} let records = - self.left_join_from_right(&JoinAction::Insert, join_key, record_store, new)?; + self.left_join_from_right(&JoinAction::Insert, &join_key, record_store, new)?; Ok(records) } (JoinType::RightOuter, JoinBranch::Left) => { - let join_key = get_record_key(&new_decoded, &self.left_join_key_indexes); - let primary_key = get_record_key(&new_decoded, &self.left_primary_key_indexes); + let join_key = self.get_join_key(&new_decoded, &self.left_join_key_indexes); + let primary_key = get_record_key_hash(&new_decoded, &self.left_primary_key_indexes); - add_join_record(&mut self.left_map, join_key, primary_key, &new); + add_join_record(&mut self.left_map, join_key.clone(), primary_key, &new); if let Some(lifetime) = new.get_lifetime() { - self.insert_evict_index(from, lifetime, join_key, primary_key)? + self.insert_evict_index(from, lifetime, join_key.clone(), primary_key)? } let records = - self.right_join_from_left(&JoinAction::Insert, join_key, record_store, new)?; + self.right_join_from_left(&JoinAction::Insert, &join_key, record_store, new)?; Ok(records) } (JoinType::RightOuter, JoinBranch::Right) => { - let join_key = get_record_key(&new_decoded, &self.right_join_key_indexes); - let primary_key = get_record_key(&new_decoded, &self.right_primary_key_indexes); + let join_key = self.get_join_key(&new_decoded, &self.right_join_key_indexes); + let primary_key = + get_record_key_hash(&new_decoded, &self.right_primary_key_indexes); - add_join_record(&mut self.right_map, join_key, primary_key, &new); + add_join_record(&mut self.right_map, join_key.clone(), primary_key, &new); if let Some(lifetime) = new.get_lifetime() { - self.insert_evict_index(from, lifetime, join_key, primary_key)? + self.insert_evict_index(from, lifetime, join_key.clone(), primary_key)? 
} - let records = self.right_join_from_right(&JoinAction::Insert, join_key, new)?; + let records = self.right_join_from_right(&JoinAction::Insert, &join_key, new)?; Ok(records) } } } + + fn get_join_key(&self, record: &Record, key_indexes: &[usize]) -> JoinKey { + if self.accurate_keys { + JoinKey::Accurate(get_record_key_fields(record, key_indexes)) + } else { + JoinKey::Hash(get_record_key_hash(record, key_indexes)) + } + } } fn add_join_record( - join_map: &mut HashMap>>, - join_key: u64, + join_map: &mut HashMap>>, + join_key: JoinKey, record_key: u64, record: &ProcessorRecord, ) { @@ -574,13 +584,13 @@ fn add_join_record( } fn remove_join_record( - join_map: &mut HashMap>>, + join_map: &mut HashMap>>, primary_key_indexes: &[usize], - join_key: u64, + join_key: &JoinKey, record: &Record, ) { - if let Some(record_map) = join_map.get_mut(&join_key) { - let record_key = get_record_key(record, primary_key_indexes); + if let Some(record_map) = join_map.get_mut(join_key) { + let record_key = get_record_key_hash(record, primary_key_indexes); if let Some(record_vec) = record_map.get_mut(&record_key) { record_vec.pop(); } @@ -588,31 +598,34 @@ fn remove_join_record( } fn evict_join_record( - join_map: &mut HashMap>>, - join_key: u64, + join_map: &mut HashMap>>, + join_key: &JoinKey, primary_key: u64, ) { - if let Some(record_map) = join_map.get_mut(&join_key) { + if let Some(record_map) = join_map.get_mut(join_key) { if let Some(record_vec) = record_map.get_mut(&primary_key) { record_vec.pop(); } } } -fn get_record_key(record: &Record, key_indexes: &[usize]) -> u64 { - let mut hasher = AHasher::default(); - for index in key_indexes.iter() { - let val = &record.values[*index]; - val.hash(&mut hasher); - } - hasher.finish() +fn get_record_key_hash(record: &Record, key_indexes: &[usize]) -> u64 { + let key_fields = key_indexes.iter().map(|i| &record.values[*i]); + get_record_hash(key_fields) +} + +fn get_record_key_fields(record: &Record, key_indexes: &[usize]) -> Vec { + key_indexes + .iter() + .map(|i| record.values[*i].clone()) + .collect() } fn get_join_records( - join_map: &HashMap>>, - join_key: u64, + join_map: &HashMap>>, + join_key: &JoinKey, ) -> Vec { - let join_map = join_map.get(&join_key); + let join_map = join_map.get(join_key); if let Some(records_map) = join_map { records_map.values().flatten().cloned().collect() diff --git a/dozer-sql/src/pipeline/product/set/mod.rs b/dozer-sql/src/pipeline/product/set/mod.rs index 9debac1051..9da13861bf 100644 --- a/dozer-sql/src/pipeline/product/set/mod.rs +++ b/dozer-sql/src/pipeline/product/set/mod.rs @@ -2,3 +2,4 @@ pub mod set_factory; mod set_processor; pub(crate) mod operator; +pub(crate) mod record_map; diff --git a/dozer-sql/src/pipeline/product/set/operator.rs b/dozer-sql/src/pipeline/product/set/operator.rs index 6f7bfb679e..861ed47dde 100644 --- a/dozer-sql/src/pipeline/product/set/operator.rs +++ b/dozer-sql/src/pipeline/product/set/operator.rs @@ -1,5 +1,5 @@ +use super::record_map::{CountingRecordMap, CountingRecordMapEnum}; use crate::pipeline::errors::PipelineError; -use bloom::{CountingBloomFilter, ASMS}; use dozer_core::processor_record::ProcessorRecord; use sqlparser::ast::{SetOperator, SetQuantifier}; @@ -28,7 +28,7 @@ impl SetOperation { &self, action: SetAction, record: ProcessorRecord, - record_map: &mut CountingBloomFilter, + record_map: &mut CountingRecordMapEnum, ) -> Result, PipelineError> { match (self.op, self.quantifier) { (SetOperator::Union, SetQuantifier::All) => Ok(vec![(action, record)]), @@ -43,7 +43,7 @@ 
impl SetOperation { &self, action: SetAction, record: ProcessorRecord, - record_map: &mut CountingBloomFilter, + record_map: &mut CountingRecordMapEnum, ) -> Result, PipelineError> { match action { SetAction::Insert => self.union_insert(action, record, record_map), @@ -55,7 +55,7 @@ impl SetOperation { &self, action: SetAction, record: ProcessorRecord, - record_map: &mut CountingBloomFilter, + record_map: &mut CountingRecordMapEnum, ) -> Result, PipelineError> { let _count = self.update_map(record.clone(), false, record_map); if _count == 1 { @@ -69,7 +69,7 @@ impl SetOperation { &self, action: SetAction, record: ProcessorRecord, - record_map: &mut CountingBloomFilter, + record_map: &mut CountingRecordMapEnum, ) -> Result, PipelineError> { let _count = self.update_map(record.clone(), true, record_map); if _count == 0 { @@ -83,8 +83,8 @@ impl SetOperation { &self, record: ProcessorRecord, decr: bool, - record_map: &mut CountingBloomFilter, - ) -> u32 { + record_map: &mut CountingRecordMapEnum, + ) -> u64 { if decr { record_map.remove(&record); } else { diff --git a/dozer-sql/src/pipeline/product/set/record_map.rs b/dozer-sql/src/pipeline/product/set/record_map.rs new file mode 100644 index 0000000000..afa1e6fc11 --- /dev/null +++ b/dozer-sql/src/pipeline/product/set/record_map.rs @@ -0,0 +1,158 @@ +use bloom::{CountingBloomFilter, ASMS}; +use dozer_core::processor_record::ProcessorRecord; +use enum_dispatch::enum_dispatch; +use std::collections::HashMap; + +#[enum_dispatch(CountingRecordMap)] +pub enum CountingRecordMapEnum { + AccurateCountingRecordMap, + ProbabilisticCountingRecordMap, +} + +#[enum_dispatch] +pub trait CountingRecordMap { + /// Inserts a record, or increases its insertion count if it already exists in the map. + fn insert(&mut self, record: &ProcessorRecord); + + /// Decreases the insertion count of a record, and removes it if the count reaches zero. + fn remove(&mut self, record: &ProcessorRecord); + + /// Returns an estimate of the number of times this record has been inserted into the filter. + /// Depending on the implementation, this number may not be accurate. + fn estimate_count(&self, record: &ProcessorRecord) -> u64; + + /// Clears the map, removing all records.
+ fn clear(&mut self); +} + +#[derive(Clone, Debug, PartialEq, Eq)] +pub struct AccurateCountingRecordMap { + map: HashMap, +} + +impl AccurateCountingRecordMap { + pub fn new() -> Self { + Self { + map: HashMap::new(), + } + } +} + +impl CountingRecordMap for AccurateCountingRecordMap { + fn insert(&mut self, record: &ProcessorRecord) { + let count = self.map.entry(record.clone()).or_insert(0); + if *count < u64::max_value() { + *count += 1; + } + } + + fn remove(&mut self, record: &ProcessorRecord) { + if let Some(count) = self.map.get_mut(record) { + *count -= 1; + if *count == 0 { + self.map.remove(record); + } + } + } + + fn estimate_count(&self, record: &ProcessorRecord) -> u64 { + self.map.get(record).copied().unwrap_or(0) + } + + fn clear(&mut self) { + self.map.clear(); + } +} + +pub struct ProbabilisticCountingRecordMap { + map: CountingBloomFilter, +} + +impl ProbabilisticCountingRecordMap { + const BITS_PER_ENTRY: usize = 8; + const FALSE_POSITIVE_RATE: f32 = 0.01; + const EXPECTED_NUM_ITEMS: u32 = 10000000; + + pub fn new() -> Self { + Self { + map: CountingBloomFilter::with_rate( + Self::BITS_PER_ENTRY, + Self::FALSE_POSITIVE_RATE, + Self::EXPECTED_NUM_ITEMS, + ), + } + } +} + +impl CountingRecordMap for ProbabilisticCountingRecordMap { + fn insert(&mut self, record: &ProcessorRecord) { + self.map.insert(record); + } + + fn remove(&mut self, record: &ProcessorRecord) { + self.map.remove(record); + } + + fn estimate_count(&self, record: &ProcessorRecord) -> u64 { + self.map.estimate_count(record) as u64 + } + + fn clear(&mut self) { + self.map.clear(); + } +} + +#[cfg(test)] +mod tests { + use dozer_core::processor_record::{ProcessorRecord, ProcessorRecordStore}; + use dozer_types::types::{Field, Record}; + + use super::{ + AccurateCountingRecordMap, CountingRecordMap, CountingRecordMapEnum, + ProbabilisticCountingRecordMap, + }; + + fn test_map(mut map: CountingRecordMapEnum) { + let record_store = ProcessorRecordStore::new().unwrap(); + let make_record = |fields: Vec| -> ProcessorRecord { + record_store.create_record(&Record::new(fields)).unwrap() + }; + + let a = make_record(vec![Field::String('a'.into())]); + let b = make_record(vec![Field::String('b'.into())]); + + assert_eq!(map.estimate_count(&a), 0); + assert_eq!(map.estimate_count(&b), 0); + + map.insert(&a); + map.insert(&b); + assert_eq!(map.estimate_count(&a), 1); + assert_eq!(map.estimate_count(&b), 1); + + map.insert(&b); + map.insert(&b); + assert_eq!(map.estimate_count(&a), 1); + assert_eq!(map.estimate_count(&b), 3); + + map.remove(&b); + assert_eq!(map.estimate_count(&a), 1); + assert_eq!(map.estimate_count(&b), 2); + + map.remove(&a); + assert_eq!(map.estimate_count(&a), 0); + assert_eq!(map.estimate_count(&b), 2); + + map.clear(); + assert_eq!(map.estimate_count(&a), 0); + assert_eq!(map.estimate_count(&b), 0); + } + + #[test] + fn test_maps() { + let accurate_map = AccurateCountingRecordMap::new().into(); + test_map(accurate_map); + + let probabilistic_map = ProbabilisticCountingRecordMap::new().into(); + test_map(probabilistic_map); + } +} diff --git a/dozer-sql/src/pipeline/product/set/set_factory.rs b/dozer-sql/src/pipeline/product/set/set_factory.rs index 6d3952f7b4..f02c930e15 100644 --- a/dozer-sql/src/pipeline/product/set/set_factory.rs +++ b/dozer-sql/src/pipeline/product/set/set_factory.rs @@ -20,12 +20,21 @@ use super::set_processor::SetProcessor; pub struct SetProcessorFactory { id: String, set_quantifier: SetQuantifier, + enable_probabilistic_optimizations: bool, } impl SetProcessorFactory { 
/// Creates a new [`FromProcessorFactory`]. - pub fn new(id: String, set_quantifier: SetQuantifier) -> Self { - Self { id, set_quantifier } + pub fn new( + id: String, + set_quantifier: SetQuantifier, + enable_probabilistic_optimizations: bool, + ) -> Self { + Self { + id, + set_quantifier, + enable_probabilistic_optimizations, + } } } @@ -81,6 +90,7 @@ impl ProcessorFactory for SetProcessorFactory { op: SetOperator::Union, quantifier: self.set_quantifier, }, + self.enable_probabilistic_optimizations, )?)) } } diff --git a/dozer-sql/src/pipeline/product/set/set_processor.rs b/dozer-sql/src/pipeline/product/set/set_processor.rs index a906520f6c..3bdfa1a15f 100644 --- a/dozer-sql/src/pipeline/product/set/set_processor.rs +++ b/dozer-sql/src/pipeline/product/set/set_processor.rs @@ -1,5 +1,8 @@ +use super::operator::{SetAction, SetOperation}; +use super::record_map::{ + AccurateCountingRecordMap, CountingRecordMapEnum, ProbabilisticCountingRecordMap, +}; use crate::pipeline::errors::{PipelineError, ProductError}; -use bloom::CountingBloomFilter; use dozer_core::channels::ProcessorChannelForwarder; use dozer_core::epoch::Epoch; use dozer_core::executor_operation::ProcessorOperation; @@ -10,32 +13,30 @@ use dozer_types::errors::internal::BoxedError; use std::collections::hash_map::RandomState; use std::fmt::{Debug, Formatter}; -use super::operator::{SetAction, SetOperation}; - pub struct SetProcessor { _id: String, /// Set operations operator: SetOperation, /// Hashmap containing records with its occurrence - record_map: CountingBloomFilter, + record_map: CountingRecordMapEnum, } -const BITS_PER_ENTRY: usize = 8; -const FALSE_POSITIVE_RATE: f32 = 0.01; -const EXPECTED_NUM_ITEMS: u32 = 10000000; - impl SetProcessor { /// Creates a new [`SetProcessor`]. - pub fn new(id: String, operator: SetOperation) -> Result { + pub fn new( + id: String, + operator: SetOperation, + enable_probabilistic_optimizations: bool, + ) -> Result { let _s = RandomState::new(); Ok(Self { _id: id, operator, - record_map: CountingBloomFilter::with_rate( - BITS_PER_ENTRY, - FALSE_POSITIVE_RATE, - EXPECTED_NUM_ITEMS, - ), + record_map: if enable_probabilistic_optimizations { + ProbabilisticCountingRecordMap::new().into() + } else { + AccurateCountingRecordMap::new().into() + }, }) } diff --git a/dozer-sql/src/pipeline/tests/builder_test.rs b/dozer-sql/src/pipeline/tests/builder_test.rs index 53913560a4..923e829aaa 100644 --- a/dozer-sql/src/pipeline/tests/builder_test.rs +++ b/dozer-sql/src/pipeline/tests/builder_test.rs @@ -190,7 +190,7 @@ impl Sink for TestSink { #[tokio::test] async fn test_pipeline_builder() { - let mut pipeline = AppPipeline::new(); + let mut pipeline = AppPipeline::new_with_default_flags(); let context = statement_to_pipeline( "SELECT COUNT(Spending), users.Country \ FROM users \ diff --git a/dozer-sql/src/pipeline/utils/mod.rs b/dozer-sql/src/pipeline/utils/mod.rs new file mode 100644 index 0000000000..488633d492 --- /dev/null +++ b/dozer-sql/src/pipeline/utils/mod.rs @@ -0,0 +1 @@ +pub mod record_hashtable_key; diff --git a/dozer-sql/src/pipeline/utils/record_hashtable_key.rs b/dozer-sql/src/pipeline/utils/record_hashtable_key.rs new file mode 100644 index 0000000000..e3694dad81 --- /dev/null +++ b/dozer-sql/src/pipeline/utils/record_hashtable_key.rs @@ -0,0 +1,32 @@ +use ahash::AHasher; +use dozer_types::types::Field; +use std::hash::{Hash, Hasher}; + +#[derive(Clone, Debug, Hash, PartialEq, Eq)] +pub enum RecordKey { + Accurate(Vec), + Hash(u64), +} + +pub fn get_record_hash<'a, I>(fields_iter: I) 
-> u64 +where + I: Iterator, +{ + let mut hasher = AHasher::default(); + for field in fields_iter { + field.hash(&mut hasher); + } + hasher.finish() +} + +#[test] +fn test_record_hash() { + let record_a = vec![Field::Int(1), Field::String("a".into())]; + let record_b = vec![Field::Int(2), Field::String("b".into())]; + + let hash_a = get_record_hash(record_a.iter()); + let hash_b = get_record_hash(record_b.iter()); + + assert_ne!(record_a, record_b); + assert_ne!(hash_a, hash_b); +} diff --git a/dozer-tests/src/sql_tests/helper/pipeline.rs b/dozer-tests/src/sql_tests/helper/pipeline.rs index 932e3ecb7a..35f1cca911 100644 --- a/dozer-tests/src/sql_tests/helper/pipeline.rs +++ b/dozer-tests/src/sql_tests/helper/pipeline.rs @@ -291,7 +291,7 @@ impl TestPipeline { schemas: HashMap, ops: Vec<(String, Operation)>, ) -> Result { - let mut pipeline = AppPipeline::new(); + let mut pipeline = AppPipeline::new_with_default_flags(); let transform_response = statement_to_pipeline(&sql, &mut pipeline, Some("results".to_string())).unwrap(); diff --git a/dozer-types/protos/cloud_types.proto b/dozer-types/protos/cloud_types.proto index 547de4da3c..d88fc1bbce 100644 --- a/dozer-types/protos/cloud_types.proto +++ b/dozer-types/protos/cloud_types.proto @@ -18,6 +18,13 @@ message Flags { bool grpc_web = 2; bool push_events = 3; bool authenticate_server_reflection = 4; + EnableProbabilisticOptimizations enable_probabilistic_optimizations = 5; +} + +message EnableProbabilisticOptimizations { + bool in_sets = 1; + bool in_joins = 2; + bool in_aggregations = 3; } message Connection { diff --git a/dozer-types/src/models/flags.rs b/dozer-types/src/models/flags.rs index 58dbcae3a3..b2f0f07101 100644 --- a/dozer-types/src/models/flags.rs +++ b/dozer-types/src/models/flags.rs @@ -19,6 +19,29 @@ pub struct Flags { #[prost(bool, tag = "4", default = false)] #[serde(default = "default_false")] pub authenticate_server_reflection: bool, + + /// probabilistic optimizations reduce memory consumption at the expense of accuracy.
+ #[serde(skip_serializing_if = "Option::is_none")] + #[prost(message, optional, tag = "5")] + pub enable_probabilistic_optimizations: Option, +} + +#[derive(Serialize, Deserialize, PartialEq, Eq, Clone, prost::Message)] +pub struct EnableProbabilisticOptimizations { + /// enable probabilistic optimizations in set operations (UNION, EXCEPT, INTERSECT); Default: false + #[prost(bool, tag = "1", default = false)] + #[serde(default = "default_false")] + pub in_sets: bool, + + /// enable probabilistic optimizations in JOIN operations; Default: false + #[prost(bool, tag = "2", default = false)] + #[serde(default = "default_false")] + pub in_joins: bool, + + /// enable probabilistic optimizations in aggregations (SUM, COUNT, MIN, etc.); Default: false + #[prost(bool, tag = "3", default = false)] + #[serde(default = "default_false")] + pub in_aggregations: bool, } fn default_true() -> bool { diff --git a/dozer-types/src/tests/flags_config_yaml_deserialize.rs b/dozer-types/src/tests/flags_config_yaml_deserialize.rs index 98c89cd2c0..920570401d 100644 --- a/dozer-types/src/tests/flags_config_yaml_deserialize.rs +++ b/dozer-types/src/tests/flags_config_yaml_deserialize.rs @@ -1,4 +1,7 @@ -use crate::models::{config::Config, flags::Flags}; +use crate::models::{ + config::Config, + flags::{EnableProbabilisticOptimizations, Flags}, +}; #[test] fn test_partial_flag_config_input() { @@ -41,6 +44,16 @@ fn test_flags_default() { grpc_web: true, push_events: true, authenticate_server_reflection: false, + enable_probabilistic_optimizations: None, } ); + + assert_eq!( + EnableProbabilisticOptimizations::default(), + EnableProbabilisticOptimizations { + in_sets: false, + in_joins: false, + in_aggregations: false + } + ) }
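
Reviewer note (not part of the patch): the sketch below illustrates, under stated assumptions, the trade-off the new `RecordKey` type encodes. The accurate variant keeps the full key fields and never collides, while the probabilistic variant keeps only a 64-bit `ahash` digest, shrinking each hash-table entry at the cost of a small collision risk. The `Field` enum here is a simplified stand-in for `dozer_types::types::Field`, and the snippet assumes only the `ahash` crate as a dependency.

```
use ahash::AHasher;
use std::hash::{Hash, Hasher};

// Simplified stand-in for dozer_types::types::Field, for illustration only.
#[derive(Clone, Debug, Hash, PartialEq, Eq)]
enum Field {
    Int(i64),
    String(String),
}

// Mirrors the shape of the patch's RecordKey: either the full key fields
// (accurate, collision-free) or just their 64-bit hash (memory-saving).
#[derive(Clone, Debug, Hash, PartialEq, Eq)]
enum RecordKey {
    Accurate(Vec<Field>),
    Hash(u64),
}

// Hash a sequence of fields with AHasher, in the spirit of the patch's
// record_hashtable_key helper.
fn record_hash<'a>(fields: impl Iterator<Item = &'a Field>) -> u64 {
    let mut hasher = AHasher::default();
    for field in fields {
        field.hash(&mut hasher);
    }
    hasher.finish()
}

fn main() {
    let key_fields = vec![Field::Int(42), Field::String("US".into())];

    // With the flag disabled (the default), the full fields are stored per entry.
    let accurate = RecordKey::Accurate(key_fields.clone());

    // With the flag enabled, only 8 bytes are stored per entry; two distinct
    // keys may (rarely) hash to the same value and be counted together.
    let probabilistic = RecordKey::Hash(record_hash(key_fields.iter()));

    println!("accurate key:      {accurate:?}");
    println!("probabilistic key: {probabilistic:?}");
}
```

In the patch itself, the choice between the two variants is made once per processor from the new `in_sets`, `in_joins`, and `in_aggregations` flags, so a single map never mixes accurate and hashed keys.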