Skip to content

Commit

Permalink
Add unit tests for metastore and control plane
Browse files Browse the repository at this point in the history
Also include some comment and naming improvments.
  • Loading branch information
rdettai committed Jan 21, 2025
1 parent d008f8e commit abf7723
Show file tree
Hide file tree
Showing 12 changed files with 252 additions and 32 deletions.
3 changes: 2 additions & 1 deletion quickwit/quickwit-config/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,8 @@ pub trait TestableForRegression: Serialize + DeserializeOwned {
fn assert_equality(&self, other: &Self);
}

pub fn indexing_params_fingerprint(
/// Return a fingerprint of all parameters that should trigger an indexing pipeline restart.
pub fn indexing_pipeline_params_fingerprint(
index_config: &IndexConfig,
source_config: &SourceConfig,
) -> u64 {
Expand Down
73 changes: 71 additions & 2 deletions quickwit/quickwit-control-plane/src/control_plane.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1124,7 +1124,9 @@ mod tests {
use mockall::Sequence;
use quickwit_actors::{AskError, Observe, SupervisorMetrics};
use quickwit_cluster::ClusterChangeStreamFactoryForTest;
use quickwit_config::{IndexConfig, SourceParams, CLI_SOURCE_ID, INGEST_V2_SOURCE_ID};
use quickwit_config::{
IndexConfig, SourceParams, TransformConfig, CLI_SOURCE_ID, INGEST_V2_SOURCE_ID,
};
use quickwit_indexing::IndexingService;
use quickwit_metastore::{
CreateIndexRequestExt, IndexMetadata, ListIndexesMetadataResponseExt,
Expand Down Expand Up @@ -1273,7 +1275,8 @@ mod tests {
assert_eq!(source_config.source_type(), SourceType::Void);
true
})
.returning(|_| Ok(EmptyResponse {}));
.return_once(|_| Ok(EmptyResponse {}));
// the list_indexes_metadata and list_shards calls are made when the control plane starts
mock_metastore
.expect_list_indexes_metadata()
.return_once(move |_| {
Expand Down Expand Up @@ -1312,6 +1315,72 @@ mod tests {
universe.assert_quit().await;
}

#[tokio::test]
async fn test_control_plane_update_source() {
let universe = Universe::with_accelerated_time();
let self_node_id: NodeId = "test-node".into();
let indexer_pool = IndexerPool::default();
let ingester_pool = IngesterPool::default();

let mut index_metadata = IndexMetadata::for_test("test-index", "ram://tata");
index_metadata
.add_source(SourceConfig::ingest_v2())
.unwrap();

let test_source_config = SourceConfig::for_test("test-source", SourceParams::void());
index_metadata.add_source(test_source_config).unwrap();

let mut mock_metastore = MockMetastoreService::new();
mock_metastore
.expect_update_source()
.withf(|update_source_request| {
let source_config: SourceConfig =
serde_json::from_str(&update_source_request.source_config_json).unwrap();
assert_eq!(source_config.source_id, "test-source");
assert_eq!(source_config.source_type(), SourceType::Void);
assert!(source_config.transform_config.is_some());
true
})
.return_once(|_| Ok(EmptyResponse {}));
// the list_indexes_metadata and list_shards calls are made when the control plane starts
mock_metastore
.expect_list_indexes_metadata()
.return_once(move |_| {
Ok(ListIndexesMetadataResponse::for_test(vec![
index_metadata.clone()
]))
});
mock_metastore
.expect_list_shards()
.return_once(move |_| Ok(ListShardsResponse::default()));

let cluster_config = ClusterConfig::for_test();
let cluster_change_stream_factory = ClusterChangeStreamFactoryForTest::default();
let (control_plane_mailbox, _control_plane_handle, _readiness_rx) = ControlPlane::spawn(
&universe,
cluster_config,
self_node_id,
cluster_change_stream_factory,
indexer_pool,
ingester_pool,
MetastoreServiceClient::from_mock(mock_metastore),
);
let index_uid: IndexUid = IndexUid::for_test("test-index", 0);
let mut updated_source_config = SourceConfig::for_test("test-source", SourceParams::void());
updated_source_config.transform_config =
Some(TransformConfig::new("del(.username)".to_string(), None));
let update_source_request = UpdateSourceRequest {
index_uid: Some(index_uid),
source_config_json: serde_json::to_string(&updated_source_config).unwrap(),
};
control_plane_mailbox
.ask_for_res(update_source_request)
.await
.unwrap();

universe.assert_quit().await;
}

#[tokio::test]
async fn test_control_plane_toggle_source() {
let universe = Universe::with_accelerated_time();
Expand Down
6 changes: 4 additions & 2 deletions quickwit/quickwit-control-plane/src/indexing_scheduler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use fnv::{FnvHashMap, FnvHashSet};
use itertools::Itertools;
use once_cell::sync::OnceCell;
use quickwit_common::pretty::PrettySample;
use quickwit_config::{indexing_params_fingerprint, FileSourceParams, SourceParams};
use quickwit_config::{indexing_pipeline_params_fingerprint, FileSourceParams, SourceParams};
use quickwit_proto::indexing::{
ApplyIndexingPlanRequest, CpuCapacity, IndexingService, IndexingTask, PIPELINE_FULL_CAPACITY,
PIPELINE_THROUGHPUT,
Expand Down Expand Up @@ -170,7 +170,9 @@ fn get_sources_to_schedule(model: &ControlPlaneModel) -> Vec<SourceToSchedule> {
}
let params_fingerprint = model
.index_metadata(&source_uid.index_uid)
.map(|index_meta| indexing_params_fingerprint(&index_meta.index_config, source_config))
.map(|index_meta| {
indexing_pipeline_params_fingerprint(&index_meta.index_config, source_config)
})
.unwrap_or_default();
match source_config.source_params {
SourceParams::File(FileSourceParams::Filepath(_))
Expand Down
31 changes: 30 additions & 1 deletion quickwit/quickwit-control-plane/src/model/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -568,7 +568,7 @@ impl ControlPlaneModel {
#[cfg(test)]
mod tests {
use metastore::EmptyResponse;
use quickwit_config::{SourceConfig, SourceParams, INGEST_V2_SOURCE_ID};
use quickwit_config::{SourceConfig, SourceParams, TransformConfig, INGEST_V2_SOURCE_ID};
use quickwit_metastore::IndexMetadata;
use quickwit_proto::ingest::{Shard, ShardState};
use quickwit_proto::metastore::{ListIndexesMetadataResponse, MockMetastoreService};
Expand Down Expand Up @@ -772,6 +772,35 @@ mod tests {
);
}

#[test]
fn test_control_plane_model_update_sources() {
let mut model = ControlPlaneModel::default();
let mut index_metadata = IndexMetadata::for_test("test-index", "ram:///indexes");
let mut my_source = SourceConfig::for_test("my-source", SourceParams::void());
index_metadata.add_source(my_source.clone()).unwrap();
index_metadata
.add_source(SourceConfig::ingest_v2())
.unwrap();
let index_uid = index_metadata.index_uid.clone();
model.add_index(index_metadata.clone());

// Update a source
my_source.transform_config = Some(TransformConfig::new("del(.username)".to_string(), None));
model.update_source(&index_uid, my_source.clone()).unwrap();

assert_eq!(model.index_table.len(), 1);
assert_eq!(
model
.index_table
.get(&index_uid)
.unwrap()
.sources
.get("my-source")
.unwrap(),
&my_source
);
}

#[test]
fn test_control_plane_model_delete_index() {
let mut model = ControlPlaneModel::default();
Expand Down
29 changes: 16 additions & 13 deletions quickwit/quickwit-indexing/src/actors/indexing_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@ use quickwit_common::io::Limiter;
use quickwit_common::pubsub::EventBroker;
use quickwit_common::{io, temp_dir};
use quickwit_config::{
build_doc_mapper, indexing_params_fingerprint, IndexConfig, IndexerConfig, SourceConfig,
INGEST_API_SOURCE_ID,
build_doc_mapper, indexing_pipeline_params_fingerprint, IndexConfig, IndexerConfig,
SourceConfig, INGEST_API_SOURCE_ID,
};
use quickwit_ingest::{
DropQueueRequest, GetPartitionId, IngestApiService, IngesterPool, ListQueuesRequest,
Expand Down Expand Up @@ -324,7 +324,8 @@ impl IndexingService {
let max_concurrent_split_uploads_merge =
(self.max_concurrent_split_uploads - max_concurrent_split_uploads_index).max(1);

let params_fingerprint = indexing_params_fingerprint(&index_config, &source_config);
let params_fingerprint =
indexing_pipeline_params_fingerprint(&index_config, &source_config);
if let Some(expected_params_fingerprint) = expected_params_fingerprint {
if params_fingerprint != expected_params_fingerprint {
warn!(
Expand Down Expand Up @@ -1220,7 +1221,9 @@ mod tests {

#[tokio::test]
async fn test_indexing_service_apply_plan() {
const PARAMS_FINGERPRINT: u64 = 3865067856550546352;
const PARAMS_FINGERPRINT_INGEST_API: u64 = 1637744865450232394;
const PARAMS_FINGERPRINT_SOURCE_1: u64 = 1705211905504908791;
const PARAMS_FINGERPRINT_SOURCE_2: u64 = 8706667372658059428;

quickwit_common::setup_logging_for_tests();
let transport = ChannelTransport::default();
Expand Down Expand Up @@ -1281,14 +1284,14 @@ mod tests {
source_id: "test-indexing-service--source-1".to_string(),
shard_ids: Vec::new(),
pipeline_uid: Some(PipelineUid::for_test(0u128)),
params_fingerprint: PARAMS_FINGERPRINT,
params_fingerprint: PARAMS_FINGERPRINT_SOURCE_1,
},
IndexingTask {
index_uid: Some(metadata.index_uid.clone()),
source_id: "test-indexing-service--source-1".to_string(),
shard_ids: Vec::new(),
pipeline_uid: Some(PipelineUid::for_test(1u128)),
params_fingerprint: PARAMS_FINGERPRINT,
params_fingerprint: PARAMS_FINGERPRINT_SOURCE_1,
},
];
indexing_service
Expand Down Expand Up @@ -1327,28 +1330,28 @@ mod tests {
source_id: INGEST_API_SOURCE_ID.to_string(),
shard_ids: Vec::new(),
pipeline_uid: Some(PipelineUid::for_test(3u128)),
params_fingerprint: PARAMS_FINGERPRINT,
params_fingerprint: PARAMS_FINGERPRINT_INGEST_API,
},
IndexingTask {
index_uid: Some(metadata.index_uid.clone()),
source_id: "test-indexing-service--source-1".to_string(),
shard_ids: Vec::new(),
pipeline_uid: Some(PipelineUid::for_test(1u128)),
params_fingerprint: PARAMS_FINGERPRINT,
params_fingerprint: PARAMS_FINGERPRINT_SOURCE_1,
},
IndexingTask {
index_uid: Some(metadata.index_uid.clone()),
source_id: "test-indexing-service--source-1".to_string(),
shard_ids: Vec::new(),
pipeline_uid: Some(PipelineUid::for_test(2u128)),
params_fingerprint: PARAMS_FINGERPRINT,
params_fingerprint: PARAMS_FINGERPRINT_SOURCE_1,
},
IndexingTask {
index_uid: Some(metadata.index_uid.clone()),
source_id: source_config_2.source_id.clone(),
shard_ids: Vec::new(),
pipeline_uid: Some(PipelineUid::for_test(4u128)),
params_fingerprint: PARAMS_FINGERPRINT,
params_fingerprint: PARAMS_FINGERPRINT_SOURCE_2,
},
];
indexing_service
Expand Down Expand Up @@ -1389,21 +1392,21 @@ mod tests {
source_id: INGEST_API_SOURCE_ID.to_string(),
shard_ids: Vec::new(),
pipeline_uid: Some(PipelineUid::for_test(3u128)),
params_fingerprint: PARAMS_FINGERPRINT,
params_fingerprint: PARAMS_FINGERPRINT_INGEST_API,
},
IndexingTask {
index_uid: Some(metadata.index_uid.clone()),
source_id: "test-indexing-service--source-1".to_string(),
shard_ids: Vec::new(),
pipeline_uid: Some(PipelineUid::for_test(1u128)),
params_fingerprint: PARAMS_FINGERPRINT,
params_fingerprint: PARAMS_FINGERPRINT_SOURCE_1,
},
IndexingTask {
index_uid: Some(metadata.index_uid.clone()),
source_id: source_config_2.source_id.clone(),
shard_ids: Vec::new(),
pipeline_uid: Some(PipelineUid::for_test(4u128)),
params_fingerprint: PARAMS_FINGERPRINT,
params_fingerprint: PARAMS_FINGERPRINT_SOURCE_2,
},
];
indexing_service
Expand Down
2 changes: 1 addition & 1 deletion quickwit/quickwit-metastore/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ pub use metastore::{
IndexMetadataResponseExt, IndexesMetadataResponseExt, ListIndexesMetadataResponseExt,
ListSplitsQuery, ListSplitsRequestExt, ListSplitsResponseExt, MetastoreServiceExt,
MetastoreServiceStreamSplitsExt, PublishSplitsRequestExt, StageSplitsRequestExt,
UpdateIndexRequestExt,
UpdateIndexRequestExt, UpdateSourceRequestExt,
};
pub use metastore_factory::{MetastoreFactory, UnsupportedMetastore};
pub use metastore_resolver::MetastoreResolver;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ impl IndexMetadata {
if entry.get() == &source_config {
return Ok(false);
}
entry.insert(source_config.clone());
entry.insert(source_config);
Ok(true)
}
Entry::Vacant(_) => Err(MetastoreError::NotFound(EntityKind::Source {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1086,7 +1086,7 @@ impl MetastoreService for PostgresqlMetastore {
async fn update_source(&self, request: UpdateSourceRequest) -> MetastoreResult<EmptyResponse> {
let source_config = request.deserialize_source_config()?;
let index_uid: IndexUid = request.index_uid().clone();
run_with_tx!(self.connection_pool, tx, "add source", {
run_with_tx!(self.connection_pool, tx, "update source", {
mutate_index_metadata::<MetastoreError, _>(tx, index_uid, |index_metadata| {
let mutation_occurred = index_metadata.update_source(source_config)?;
Ok(MutationOccurred::from(mutation_occurred))
Expand Down
7 changes: 7 additions & 0 deletions quickwit/quickwit-metastore/src/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -375,6 +375,13 @@ macro_rules! metastore_test_suite {
$crate::tests::source::test_metastore_add_source::<$metastore_type>().await;
}

#[tokio::test]
#[serial_test::file_serial]
async fn test_metastore_update_source() {
let _ = tracing_subscriber::fmt::try_init();
$crate::tests::source::test_metastore_update_source::<$metastore_type>().await;
}

#[tokio::test]
#[serial_test::file_serial]
async fn test_metastore_toggle_source() {
Expand Down
Loading

0 comments on commit abf7723

Please sign in to comment.