diff --git a/mode/core/src/main/java/org/apache/shardingsphere/mode/event/builder/MetaDataChangedEventBuilder.java b/mode/core/src/main/java/org/apache/shardingsphere/mode/event/builder/MetaDataChangedEventBuilder.java new file mode 100644 index 0000000000000..a902d1868fc07 --- /dev/null +++ b/mode/core/src/main/java/org/apache/shardingsphere/mode/event/builder/MetaDataChangedEventBuilder.java @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.shardingsphere.mode.event.builder; + +import org.apache.shardingsphere.metadata.persist.node.DatabaseMetaDataNode; +import org.apache.shardingsphere.metadata.persist.node.metadata.DataSourceMetaDataNode; +import org.apache.shardingsphere.metadata.persist.node.metadata.TableMetaDataNode; +import org.apache.shardingsphere.metadata.persist.node.metadata.ViewMetaDataNode; +import org.apache.shardingsphere.mode.event.DataChangedEvent; +import org.apache.shardingsphere.mode.event.DataChangedEvent.Type; +import org.apache.shardingsphere.mode.event.dispatch.DispatchEvent; +import org.apache.shardingsphere.mode.event.dispatch.datasource.node.StorageNodeAlteredEvent; +import org.apache.shardingsphere.mode.event.dispatch.datasource.node.StorageNodeRegisteredEvent; +import org.apache.shardingsphere.mode.event.dispatch.datasource.node.StorageNodeUnregisteredEvent; +import org.apache.shardingsphere.mode.event.dispatch.datasource.unit.StorageUnitAlteredEvent; +import org.apache.shardingsphere.mode.event.dispatch.datasource.unit.StorageUnitRegisteredEvent; +import org.apache.shardingsphere.mode.event.dispatch.datasource.unit.StorageUnitUnregisteredEvent; +import org.apache.shardingsphere.mode.event.dispatch.metadata.schema.SchemaAddedEvent; +import org.apache.shardingsphere.mode.event.dispatch.metadata.schema.SchemaDeletedEvent; +import org.apache.shardingsphere.mode.event.dispatch.metadata.schema.table.TableCreatedOrAlteredEvent; +import org.apache.shardingsphere.mode.event.dispatch.metadata.schema.table.TableDroppedEvent; +import org.apache.shardingsphere.mode.event.dispatch.metadata.schema.view.ViewCreatedOrAlteredEvent; +import org.apache.shardingsphere.mode.event.dispatch.metadata.schema.view.ViewDroppedEvent; + +import java.util.Optional; + +/** + * Meta data changed event builder. + */ +public final class MetaDataChangedEventBuilder { + + /** + * Build meta data changed event. + * + * @param databaseName database name + * @param event data changed event + * @return built event + */ + public Optional build(final String databaseName, final DataChangedEvent event) { + String key = event.getKey(); + Optional schemaName = DatabaseMetaDataNode.getSchemaName(key); + if (schemaName.isPresent()) { + return buildSchemaChangedEvent(databaseName, schemaName.get(), event); + } + schemaName = DatabaseMetaDataNode.getSchemaNameByTableNode(key); + if (schemaName.isPresent() && isTableMetaDataChanged(event.getKey())) { + return buildTableChangedEvent(databaseName, schemaName.get(), event); + } + if (schemaName.isPresent() && isViewMetaDataChanged(event.getKey())) { + return buildViewChangedEvent(databaseName, schemaName.get(), event); + } + if (DataSourceMetaDataNode.isDataSourcesNode(key)) { + return buildDataSourceChangedEvent(databaseName, event); + } + return Optional.empty(); + } + + private Optional buildSchemaChangedEvent(final String databaseName, final String schemaName, final DataChangedEvent event) { + switch (event.getType()) { + case ADDED: + case UPDATED: + return Optional.of(new SchemaAddedEvent(databaseName, schemaName)); + case DELETED: + return Optional.of(new SchemaDeletedEvent(databaseName, schemaName)); + default: + return Optional.empty(); + } + } + + private boolean isTableMetaDataChanged(final String key) { + return TableMetaDataNode.isTableActiveVersionNode(key) || TableMetaDataNode.isTableNode(key); + } + + private Optional buildTableChangedEvent(final String databaseName, final String schemaName, final DataChangedEvent event) { + if ((Type.ADDED == event.getType() || Type.UPDATED == event.getType()) && TableMetaDataNode.isTableActiveVersionNode(event.getKey())) { + String tableName = TableMetaDataNode.getTableNameByActiveVersionNode(event.getKey()).orElseThrow(() -> new IllegalStateException("Table name not found.")); + return Optional.of(new TableCreatedOrAlteredEvent(databaseName, schemaName, tableName, event.getKey(), event.getValue())); + } + if (Type.DELETED == event.getType() && TableMetaDataNode.isTableNode(event.getKey())) { + String tableName = TableMetaDataNode.getTableName(event.getKey()).orElseThrow(() -> new IllegalStateException("Table name not found.")); + return Optional.of(new TableDroppedEvent(databaseName, schemaName, tableName)); + } + return Optional.empty(); + } + + private boolean isViewMetaDataChanged(final String key) { + return ViewMetaDataNode.isViewActiveVersionNode(key) || ViewMetaDataNode.isViewNode(key); + } + + private Optional buildViewChangedEvent(final String databaseName, final String schemaName, final DataChangedEvent event) { + if ((Type.ADDED == event.getType() || Type.UPDATED == event.getType()) && ViewMetaDataNode.isViewActiveVersionNode(event.getKey())) { + String viewName = ViewMetaDataNode.getViewNameByActiveVersionNode(event.getKey()).orElseThrow(() -> new IllegalStateException("View name not found.")); + return Optional.of(new ViewCreatedOrAlteredEvent(databaseName, schemaName, viewName, event.getKey(), event.getValue())); + } + if (Type.DELETED == event.getType() && ViewMetaDataNode.isViewNode(event.getKey())) { + String viewName = ViewMetaDataNode.getViewName(event.getKey()).orElseThrow(() -> new IllegalStateException("View name not found.")); + return Optional.of(new ViewDroppedEvent(databaseName, schemaName, viewName, event.getKey(), event.getValue())); + } + return Optional.empty(); + } + + private Optional buildDataSourceChangedEvent(final String databaseName, final DataChangedEvent event) { + if (DataSourceMetaDataNode.isDataSourceUnitActiveVersionNode(event.getKey()) || DataSourceMetaDataNode.isDataSourceUnitNode(event.getKey())) { + return buildStorageUnitChangedEvent(databaseName, event); + } + if (DataSourceMetaDataNode.isDataSourceNodeActiveVersionNode(event.getKey()) || DataSourceMetaDataNode.isDataSourceNodeNode(event.getKey())) { + return buildStorageNodeChangedEvent(databaseName, event); + } + return Optional.empty(); + } + + private Optional buildStorageUnitChangedEvent(final String databaseName, final DataChangedEvent event) { + Optional dataSourceUnitName = DataSourceMetaDataNode.getDataSourceNameByDataSourceUnitActiveVersionNode(event.getKey()); + if (dataSourceUnitName.isPresent()) { + if (Type.ADDED == event.getType()) { + return Optional.of(new StorageUnitRegisteredEvent(databaseName, dataSourceUnitName.get(), event.getKey(), event.getValue())); + } + if (Type.UPDATED == event.getType()) { + return Optional.of(new StorageUnitAlteredEvent(databaseName, dataSourceUnitName.get(), event.getKey(), event.getValue())); + } + } + dataSourceUnitName = DataSourceMetaDataNode.getDataSourceNameByDataSourceUnitNode(event.getKey()); + if (Type.DELETED == event.getType() && dataSourceUnitName.isPresent()) { + return Optional.of(new StorageUnitUnregisteredEvent(databaseName, dataSourceUnitName.get())); + } + return Optional.empty(); + } + + private Optional buildStorageNodeChangedEvent(final String databaseName, final DataChangedEvent event) { + Optional dataSourceNodeName = DataSourceMetaDataNode.getDataSourceNameByDataSourceNodeActiveVersionNode(event.getKey()); + if (dataSourceNodeName.isPresent()) { + if (Type.ADDED == event.getType()) { + return Optional.of(new StorageNodeRegisteredEvent(databaseName, dataSourceNodeName.get(), event.getKey(), event.getValue())); + } + if (Type.UPDATED == event.getType()) { + return Optional.of(new StorageNodeAlteredEvent(databaseName, dataSourceNodeName.get(), event.getKey(), event.getValue())); + } + } + dataSourceNodeName = DataSourceMetaDataNode.getDataSourceNameByDataSourceNodeNode(event.getKey()); + if (Type.DELETED == event.getType() && dataSourceNodeName.isPresent()) { + return Optional.of(new StorageNodeUnregisteredEvent(databaseName, dataSourceNodeName.get())); + } + return Optional.empty(); + } +} diff --git a/mode/core/src/test/java/org/apache/shardingsphere/mode/event/builder/MetaDataChangedEventBuilderTest.java b/mode/core/src/test/java/org/apache/shardingsphere/mode/event/builder/MetaDataChangedEventBuilderTest.java new file mode 100644 index 0000000000000..60e9320caa9cc --- /dev/null +++ b/mode/core/src/test/java/org/apache/shardingsphere/mode/event/builder/MetaDataChangedEventBuilderTest.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.shardingsphere.mode.event.builder; + +import org.apache.shardingsphere.mode.event.DataChangedEvent; +import org.apache.shardingsphere.mode.event.DataChangedEvent.Type; +import org.apache.shardingsphere.mode.event.dispatch.DispatchEvent; +import org.apache.shardingsphere.mode.event.dispatch.datasource.node.StorageNodeAlteredEvent; +import org.apache.shardingsphere.mode.event.dispatch.datasource.node.StorageNodeRegisteredEvent; +import org.apache.shardingsphere.mode.event.dispatch.datasource.node.StorageNodeUnregisteredEvent; +import org.apache.shardingsphere.mode.event.dispatch.datasource.unit.StorageUnitAlteredEvent; +import org.apache.shardingsphere.mode.event.dispatch.datasource.unit.StorageUnitRegisteredEvent; +import org.apache.shardingsphere.mode.event.dispatch.datasource.unit.StorageUnitUnregisteredEvent; +import org.apache.shardingsphere.mode.event.dispatch.metadata.schema.SchemaAddedEvent; +import org.apache.shardingsphere.mode.event.dispatch.metadata.schema.SchemaDeletedEvent; +import org.apache.shardingsphere.mode.event.dispatch.metadata.schema.table.TableCreatedOrAlteredEvent; +import org.apache.shardingsphere.mode.event.dispatch.metadata.schema.table.TableDroppedEvent; +import org.apache.shardingsphere.mode.event.dispatch.metadata.schema.view.ViewCreatedOrAlteredEvent; +import org.apache.shardingsphere.mode.event.dispatch.metadata.schema.view.ViewDroppedEvent; +import org.junit.jupiter.api.extension.ExtensionContext; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.ArgumentsProvider; +import org.junit.jupiter.params.provider.ArgumentsSource; + +import java.util.Optional; +import java.util.stream.Stream; + +import static org.hamcrest.CoreMatchers.instanceOf; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + +class MetaDataChangedEventBuilderTest { + + @ParameterizedTest(name = "{0}") + @ArgumentsSource(TestCaseArgumentsProvider.class) + void assertBuild(final String name, final String eventKey, final Type type, final Class toBePostedEventType) { + Optional actual = new MetaDataChangedEventBuilder().build("foo_db", new DataChangedEvent(eventKey, "value", type)); + if (null == toBePostedEventType) { + assertFalse(actual.isPresent()); + } else { + assertTrue(actual.isPresent()); + assertThat(actual.get(), instanceOf(toBePostedEventType)); + } + } + + private static class TestCaseArgumentsProvider implements ArgumentsProvider { + + @Override + public final Stream provideArguments(final ExtensionContext extensionContext) { + return Stream.of( + Arguments.of("changeWithoutDatabase", "/metadata", Type.IGNORED, null), + Arguments.of("addSchema", "/metadata/foo_db/schemas/foo_schema", Type.ADDED, SchemaAddedEvent.class), + Arguments.of("updateSchema", "/metadata/foo_db/schemas/foo_schema", Type.UPDATED, SchemaAddedEvent.class), + Arguments.of("deleteSchema", "/metadata/foo_db/schemas/foo_schema", Type.DELETED, SchemaDeletedEvent.class), + Arguments.of("ignoreChangeSchema", "/metadata/foo_db/schemas/foo_schema", Type.IGNORED, null), + Arguments.of("addTable", "/metadata/foo_db/schemas/foo_schema/tables/foo_tbl/active_version/0", Type.ADDED, TableCreatedOrAlteredEvent.class), + Arguments.of("updateTable", "/metadata/foo_db/schemas/foo_schema/tables/foo_tbl/active_version/0", Type.UPDATED, TableCreatedOrAlteredEvent.class), + Arguments.of("deleteTable", "/metadata/foo_db/schemas/foo_schema/tables/foo_tbl", Type.DELETED, TableDroppedEvent.class), + Arguments.of("invalidAddTable", "/metadata/foo_db/schemas/foo_schema/tables/foo_tbl", Type.ADDED, null), + Arguments.of("invalidDeleteTable", "/metadata/foo_db/schemas/foo_schema/tables/foo_tbl/active_version/0", Type.DELETED, null), + Arguments.of("addView", "/metadata/foo_db/schemas/foo_schema/views/foo_view/active_version/0", Type.ADDED, ViewCreatedOrAlteredEvent.class), + Arguments.of("updateView", "/metadata/foo_db/schemas/foo_schema/views/foo_view/active_version/0", Type.UPDATED, ViewCreatedOrAlteredEvent.class), + Arguments.of("deleteView", "/metadata/foo_db/schemas/foo_schema/views/foo_view", Type.DELETED, ViewDroppedEvent.class), + Arguments.of("invalidAddView", "/metadata/foo_db/schemas/foo_schema/views/foo_view", Type.ADDED, null), + Arguments.of("invalidDeleteView", "/metadata/foo_db/schemas/foo_schema/views/foo_view/active_version/0", Type.DELETED, null), + Arguments.of("registerStorageUnit", "/metadata/foo_db/data_sources/units/foo_unit/active_version/0", Type.ADDED, StorageUnitRegisteredEvent.class), + Arguments.of("alterStorageUnit", "/metadata/foo_db/data_sources/units/foo_unit/active_version/0", Type.UPDATED, StorageUnitAlteredEvent.class), + Arguments.of("unregisterStorageUnit", "/metadata/foo_db/data_sources/units/foo_unit", Type.DELETED, StorageUnitUnregisteredEvent.class), + Arguments.of("invalidRegisterStorageNode", "/metadata/foo_db/data_sources/units/foo_unit", Type.ADDED, null), + Arguments.of("invalidUnregisterStorageNode", "/metadata/foo_db/data_sources/units/foo_unit/active_version/0", Type.DELETED, null), + Arguments.of("ignoreChangeStorageUnit", "/metadata/foo_db/data_sources/units/foo_unit", Type.IGNORED, null), + Arguments.of("registerStorageNode", "/metadata/foo_db/data_sources/nodes/foo_node/active_version/0", Type.ADDED, StorageNodeRegisteredEvent.class), + Arguments.of("alterStorageNode", "/metadata/foo_db/data_sources/nodes/foo_node/active_version/0", Type.UPDATED, StorageNodeAlteredEvent.class), + Arguments.of("unregisterStorageNode", "/metadata/foo_db/data_sources/nodes/foo_node", Type.DELETED, StorageNodeUnregisteredEvent.class), + Arguments.of("invalidRegisterStorageNode", "/metadata/foo_db/data_sources/nodes/foo_node", Type.ADDED, null), + Arguments.of("invalidUnregisterStorageNode", "/metadata/foo_db/data_sources/nodes/foo_node/active_version/0", Type.DELETED, null), + Arguments.of("ignoreChangeStorageNode", "/metadata/foo_db/data_sources/nodes/foo_node", Type.IGNORED, null), + Arguments.of("invalidChangeDataSource", "/metadata/foo_db/data_sources/other", Type.ADDED, null)); + } + } +} diff --git a/mode/core/src/test/java/org/apache/shardingsphere/mode/event/builder/RuleConfigurationChangedEventBuilderTest.java b/mode/core/src/test/java/org/apache/shardingsphere/mode/event/builder/RuleConfigurationChangedEventBuilderTest.java index e9a1fbd1ea74d..9d888833df2f4 100644 --- a/mode/core/src/test/java/org/apache/shardingsphere/mode/event/builder/RuleConfigurationChangedEventBuilderTest.java +++ b/mode/core/src/test/java/org/apache/shardingsphere/mode/event/builder/RuleConfigurationChangedEventBuilderTest.java @@ -61,8 +61,7 @@ void assertBuildWithoutRuleNodePathProvider() { @ParameterizedTest(name = "{0}") @ArgumentsSource(TestCaseArgumentsProvider.class) - void assertBuild(final String name, final String eventKey, final String eventValue, final DataChangedEvent.Type type, - final boolean isEventPresent, final Class dispatchEventClass) { + void assertBuild(final String name, final String eventKey, final String eventValue, final Type type, final boolean isEventPresent, final Class dispatchEventClass) { RuleNodePathProvider ruleNodePathProvider = mock(RuleNodePathProvider.class, RETURNS_DEEP_STUBS); when(ruleNodePathProvider.getRuleNodePath()).thenReturn(new RuleNodePath("fixture", Collections.singleton("named"), Collections.singleton("unique"))); when(ShardingSphereServiceLoader.getServiceInstances(RuleNodePathProvider.class)).thenReturn(Collections.singleton(ruleNodePathProvider)); diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/listener/type/DatabaseMetaDataChangedListener.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/listener/type/DatabaseMetaDataChangedListener.java index 50838500209c8..e0d6c1553780b 100644 --- a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/listener/type/DatabaseMetaDataChangedListener.java +++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/listener/type/DatabaseMetaDataChangedListener.java @@ -20,25 +20,10 @@ import lombok.RequiredArgsConstructor; import org.apache.shardingsphere.infra.util.eventbus.EventBusContext; import org.apache.shardingsphere.metadata.persist.node.DatabaseMetaDataNode; -import org.apache.shardingsphere.metadata.persist.node.metadata.DataSourceMetaDataNode; -import org.apache.shardingsphere.metadata.persist.node.metadata.TableMetaDataNode; -import org.apache.shardingsphere.metadata.persist.node.metadata.ViewMetaDataNode; import org.apache.shardingsphere.mode.event.DataChangedEvent; -import org.apache.shardingsphere.mode.event.DataChangedEvent.Type; +import org.apache.shardingsphere.mode.event.builder.MetaDataChangedEventBuilder; import org.apache.shardingsphere.mode.event.builder.RuleConfigurationChangedEventBuilder; import org.apache.shardingsphere.mode.event.dispatch.DispatchEvent; -import org.apache.shardingsphere.mode.event.dispatch.datasource.node.StorageNodeAlteredEvent; -import org.apache.shardingsphere.mode.event.dispatch.datasource.node.StorageNodeRegisteredEvent; -import org.apache.shardingsphere.mode.event.dispatch.datasource.node.StorageNodeUnregisteredEvent; -import org.apache.shardingsphere.mode.event.dispatch.datasource.unit.StorageUnitAlteredEvent; -import org.apache.shardingsphere.mode.event.dispatch.datasource.unit.StorageUnitRegisteredEvent; -import org.apache.shardingsphere.mode.event.dispatch.datasource.unit.StorageUnitUnregisteredEvent; -import org.apache.shardingsphere.mode.event.dispatch.metadata.schema.SchemaAddedEvent; -import org.apache.shardingsphere.mode.event.dispatch.metadata.schema.SchemaDeletedEvent; -import org.apache.shardingsphere.mode.event.dispatch.metadata.schema.table.TableCreatedOrAlteredEvent; -import org.apache.shardingsphere.mode.event.dispatch.metadata.schema.table.TableDroppedEvent; -import org.apache.shardingsphere.mode.event.dispatch.metadata.schema.view.ViewCreatedOrAlteredEvent; -import org.apache.shardingsphere.mode.event.dispatch.metadata.schema.view.ViewDroppedEvent; import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEventListener; import java.util.Optional; @@ -49,10 +34,12 @@ @RequiredArgsConstructor public final class DatabaseMetaDataChangedListener implements DataChangedEventListener { - private final EventBusContext eventBusContext; + private final MetaDataChangedEventBuilder metaDataChangedEventBuilder = new MetaDataChangedEventBuilder(); private final RuleConfigurationChangedEventBuilder ruleConfigChangedEventBuilder = new RuleConfigurationChangedEventBuilder(); + private final EventBusContext eventBusContext; + @Override public void onChange(final DataChangedEvent event) { createDispatchEvent(event).ifPresent(eventBusContext::post); @@ -64,108 +51,7 @@ private Optional createDispatchEvent(final DataChangedEvent event if (!databaseName.isPresent()) { return Optional.empty(); } - Optional schemaName = DatabaseMetaDataNode.getSchemaName(key); - if (schemaName.isPresent()) { - return createSchemaChangedEvent(databaseName.get(), schemaName.get(), event); - } - schemaName = DatabaseMetaDataNode.getSchemaNameByTableNode(key); - if (schemaName.isPresent() && isTableMetaDataChanged(event.getKey())) { - return createTableChangedEvent(databaseName.get(), schemaName.get(), event); - } - if (schemaName.isPresent() && isViewMetaDataChanged(event.getKey())) { - return createViewChangedEvent(databaseName.get(), schemaName.get(), event); - } - if (DataSourceMetaDataNode.isDataSourcesNode(key)) { - return createDataSourceChangedEvent(databaseName.get(), event); - } - return ruleConfigChangedEventBuilder.build(databaseName.get(), event); - } - - private Optional createSchemaChangedEvent(final String databaseName, final String schemaName, final DataChangedEvent event) { - switch (event.getType()) { - case ADDED: - case UPDATED: - return Optional.of(new SchemaAddedEvent(databaseName, schemaName)); - case DELETED: - return Optional.of(new SchemaDeletedEvent(databaseName, schemaName)); - default: - return Optional.empty(); - } - } - - private boolean isTableMetaDataChanged(final String key) { - return TableMetaDataNode.isTableActiveVersionNode(key) || TableMetaDataNode.isTableNode(key); - } - - private Optional createTableChangedEvent(final String databaseName, final String schemaName, final DataChangedEvent event) { - if ((Type.ADDED == event.getType() || Type.UPDATED == event.getType()) && TableMetaDataNode.isTableActiveVersionNode(event.getKey())) { - String tableName = TableMetaDataNode.getTableNameByActiveVersionNode(event.getKey()).orElseThrow(() -> new IllegalStateException("Table name not found.")); - return Optional.of(new TableCreatedOrAlteredEvent(databaseName, schemaName, tableName, event.getKey(), event.getValue())); - } - if (Type.DELETED == event.getType() && TableMetaDataNode.isTableNode(event.getKey())) { - String tableName = TableMetaDataNode.getTableName(event.getKey()).orElseThrow(() -> new IllegalStateException("Table name not found.")); - return Optional.of(new TableDroppedEvent(databaseName, schemaName, tableName)); - } - return Optional.empty(); - } - - private boolean isViewMetaDataChanged(final String key) { - return ViewMetaDataNode.isViewActiveVersionNode(key) || ViewMetaDataNode.isViewNode(key); - } - - private Optional createViewChangedEvent(final String databaseName, final String schemaName, final DataChangedEvent event) { - if ((Type.ADDED == event.getType() || Type.UPDATED == event.getType()) && ViewMetaDataNode.isViewActiveVersionNode(event.getKey())) { - String viewName = ViewMetaDataNode.getViewNameByActiveVersionNode(event.getKey()).orElseThrow(() -> new IllegalStateException("View name not found.")); - return Optional.of(new ViewCreatedOrAlteredEvent(databaseName, schemaName, viewName, event.getKey(), event.getValue())); - } - if (Type.DELETED == event.getType() && ViewMetaDataNode.isViewNode(event.getKey())) { - String viewName = ViewMetaDataNode.getViewName(event.getKey()).orElseThrow(() -> new IllegalStateException("View name not found.")); - return Optional.of(new ViewDroppedEvent(databaseName, schemaName, viewName, event.getKey(), event.getValue())); - } - return Optional.empty(); - } - - private Optional createDataSourceChangedEvent(final String databaseName, final DataChangedEvent event) { - if (DataSourceMetaDataNode.isDataSourceUnitActiveVersionNode(event.getKey()) || DataSourceMetaDataNode.isDataSourceUnitNode(event.getKey())) { - return createStorageUnitChangedEvent(databaseName, event); - } - if (DataSourceMetaDataNode.isDataSourceNodeActiveVersionNode(event.getKey()) || DataSourceMetaDataNode.isDataSourceNodeNode(event.getKey())) { - return createStorageNodeChangedEvent(databaseName, event); - } - return Optional.empty(); - } - - private Optional createStorageUnitChangedEvent(final String databaseName, final DataChangedEvent event) { - Optional dataSourceUnitName = DataSourceMetaDataNode.getDataSourceNameByDataSourceUnitActiveVersionNode(event.getKey()); - if (dataSourceUnitName.isPresent()) { - if (Type.ADDED == event.getType()) { - return Optional.of(new StorageUnitRegisteredEvent(databaseName, dataSourceUnitName.get(), event.getKey(), event.getValue())); - } - if (Type.UPDATED == event.getType()) { - return Optional.of(new StorageUnitAlteredEvent(databaseName, dataSourceUnitName.get(), event.getKey(), event.getValue())); - } - } - dataSourceUnitName = DataSourceMetaDataNode.getDataSourceNameByDataSourceUnitNode(event.getKey()); - if (Type.DELETED == event.getType() && dataSourceUnitName.isPresent()) { - return Optional.of(new StorageUnitUnregisteredEvent(databaseName, dataSourceUnitName.get())); - } - return Optional.empty(); - } - - private Optional createStorageNodeChangedEvent(final String databaseName, final DataChangedEvent event) { - Optional dataSourceNodeName = DataSourceMetaDataNode.getDataSourceNameByDataSourceNodeActiveVersionNode(event.getKey()); - if (dataSourceNodeName.isPresent()) { - if (Type.ADDED == event.getType()) { - return Optional.of(new StorageNodeRegisteredEvent(databaseName, dataSourceNodeName.get(), event.getKey(), event.getValue())); - } - if (Type.UPDATED == event.getType()) { - return Optional.of(new StorageNodeAlteredEvent(databaseName, dataSourceNodeName.get(), event.getKey(), event.getValue())); - } - } - dataSourceNodeName = DataSourceMetaDataNode.getDataSourceNameByDataSourceNodeNode(event.getKey()); - if (Type.DELETED == event.getType() && dataSourceNodeName.isPresent()) { - return Optional.of(new StorageNodeUnregisteredEvent(databaseName, dataSourceNodeName.get())); - } - return Optional.empty(); + Optional metaDataChangedEvent = metaDataChangedEventBuilder.build(databaseName.get(), event); + return metaDataChangedEvent.isPresent() ? metaDataChangedEvent : ruleConfigChangedEventBuilder.build(databaseName.get(), event); } } diff --git a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/listener/type/DatabaseMetaDataChangedListenerTest.java b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/listener/type/DatabaseMetaDataChangedListenerTest.java index d1fda9c64ac84..69e3253b501ef 100644 --- a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/listener/type/DatabaseMetaDataChangedListenerTest.java +++ b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/listener/type/DatabaseMetaDataChangedListenerTest.java @@ -17,115 +17,47 @@ package org.apache.shardingsphere.mode.manager.cluster.event.dispatch.listener.type; -import lombok.SneakyThrows; import org.apache.shardingsphere.infra.util.eventbus.EventBusContext; import org.apache.shardingsphere.mode.event.DataChangedEvent; import org.apache.shardingsphere.mode.event.DataChangedEvent.Type; -import org.apache.shardingsphere.mode.event.builder.RuleConfigurationChangedEventBuilder; -import org.apache.shardingsphere.mode.event.dispatch.DispatchEvent; -import org.apache.shardingsphere.mode.event.dispatch.datasource.node.StorageNodeAlteredEvent; -import org.apache.shardingsphere.mode.event.dispatch.datasource.node.StorageNodeRegisteredEvent; -import org.apache.shardingsphere.mode.event.dispatch.datasource.node.StorageNodeUnregisteredEvent; -import org.apache.shardingsphere.mode.event.dispatch.datasource.unit.StorageUnitAlteredEvent; -import org.apache.shardingsphere.mode.event.dispatch.datasource.unit.StorageUnitRegisteredEvent; -import org.apache.shardingsphere.mode.event.dispatch.datasource.unit.StorageUnitUnregisteredEvent; -import org.apache.shardingsphere.mode.event.dispatch.metadata.schema.SchemaAddedEvent; -import org.apache.shardingsphere.mode.event.dispatch.metadata.schema.SchemaDeletedEvent; -import org.apache.shardingsphere.mode.event.dispatch.metadata.schema.table.TableCreatedOrAlteredEvent; -import org.apache.shardingsphere.mode.event.dispatch.metadata.schema.table.TableDroppedEvent; -import org.apache.shardingsphere.mode.event.dispatch.metadata.schema.view.ViewCreatedOrAlteredEvent; -import org.apache.shardingsphere.mode.event.dispatch.metadata.schema.view.ViewDroppedEvent; -import org.apache.shardingsphere.mode.event.dispatch.rule.alter.AlterUniqueRuleItemEvent; import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; -import org.junit.jupiter.api.extension.ExtensionContext; -import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.Arguments; -import org.junit.jupiter.params.provider.ArgumentsProvider; -import org.junit.jupiter.params.provider.ArgumentsSource; import org.mockito.Mock; -import org.mockito.internal.configuration.plugins.Plugins; import org.mockito.junit.jupiter.MockitoExtension; -import org.mockito.junit.jupiter.MockitoSettings; -import org.mockito.quality.Strictness; - -import java.util.Optional; -import java.util.stream.Stream; import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.eq; -import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; @ExtendWith(MockitoExtension.class) -@MockitoSettings(strictness = Strictness.LENIENT) class DatabaseMetaDataChangedListenerTest { private DatabaseMetaDataChangedListener listener; - + @Mock private EventBusContext eventBusContext; - + @BeforeEach void setUp() { listener = new DatabaseMetaDataChangedListener(eventBusContext); - setMockedBuilder(); } - @SneakyThrows(ReflectiveOperationException.class) - private void setMockedBuilder() { - RuleConfigurationChangedEventBuilder ruleConfigChangedEventBuilder = mock(RuleConfigurationChangedEventBuilder.class); - when(ruleConfigChangedEventBuilder.build(eq("foo_db"), any(DataChangedEvent.class))).thenReturn(Optional.of(new AlterUniqueRuleItemEvent("foo_db", "key", "value", "type"))); - Plugins.getMemberAccessor().set(DatabaseMetaDataChangedListener.class.getDeclaredField("ruleConfigChangedEventBuilder"), listener, ruleConfigChangedEventBuilder); + @Test + void assertOnChangeWithoutDatabase() { + listener.onChange(new DataChangedEvent("/metadata", "value", Type.IGNORED)); + verify(eventBusContext, times(0)).post(any()); } - @ParameterizedTest(name = "{0}") - @ArgumentsSource(TestCaseArgumentsProvider.class) - void assertOnChangeWithMetaData(final String name, final String eventKey, final Type type, final Class toBePostedEventType) { - listener.onChange(new DataChangedEvent(eventKey, "value", type)); - if (null == toBePostedEventType) { - verify(eventBusContext, times(0)).post(any()); - } else { - verify(eventBusContext).post(any(toBePostedEventType)); - } + @Test + void assertOnChangeWithMetaDataChanged() { + listener.onChange(new DataChangedEvent("/metadata/foo_db/schemas/foo_schema", "value", Type.ADDED)); + verify(eventBusContext).post(any()); } - private static class TestCaseArgumentsProvider implements ArgumentsProvider { - - @Override - public final Stream provideArguments(final ExtensionContext extensionContext) { - return Stream.of( - Arguments.of("changeWithoutDatabase", "/metadata", Type.IGNORED, null), - Arguments.of("addSchema", "/metadata/foo_db/schemas/foo_schema", Type.ADDED, SchemaAddedEvent.class), - Arguments.of("updateSchema", "/metadata/foo_db/schemas/foo_schema", Type.UPDATED, SchemaAddedEvent.class), - Arguments.of("deleteSchema", "/metadata/foo_db/schemas/foo_schema", Type.DELETED, SchemaDeletedEvent.class), - Arguments.of("ignoreChangeSchema", "/metadata/foo_db/schemas/foo_schema", Type.IGNORED, null), - Arguments.of("addTable", "/metadata/foo_db/schemas/foo_schema/tables/foo_tbl/active_version/0", Type.ADDED, TableCreatedOrAlteredEvent.class), - Arguments.of("updateTable", "/metadata/foo_db/schemas/foo_schema/tables/foo_tbl/active_version/0", Type.UPDATED, TableCreatedOrAlteredEvent.class), - Arguments.of("deleteTable", "/metadata/foo_db/schemas/foo_schema/tables/foo_tbl", Type.DELETED, TableDroppedEvent.class), - Arguments.of("invalidAddTable", "/metadata/foo_db/schemas/foo_schema/tables/foo_tbl", Type.ADDED, null), - Arguments.of("invalidDeleteTable", "/metadata/foo_db/schemas/foo_schema/tables/foo_tbl/active_version/0", Type.DELETED, null), - Arguments.of("addView", "/metadata/foo_db/schemas/foo_schema/views/foo_view/active_version/0", Type.ADDED, ViewCreatedOrAlteredEvent.class), - Arguments.of("updateView", "/metadata/foo_db/schemas/foo_schema/views/foo_view/active_version/0", Type.UPDATED, ViewCreatedOrAlteredEvent.class), - Arguments.of("deleteView", "/metadata/foo_db/schemas/foo_schema/views/foo_view", Type.DELETED, ViewDroppedEvent.class), - Arguments.of("invalidAddView", "/metadata/foo_db/schemas/foo_schema/views/foo_view", Type.ADDED, null), - Arguments.of("invalidDeleteView", "/metadata/foo_db/schemas/foo_schema/views/foo_view/active_version/0", Type.DELETED, null), - Arguments.of("registerStorageUnit", "/metadata/foo_db/data_sources/units/foo_unit/active_version/0", Type.ADDED, StorageUnitRegisteredEvent.class), - Arguments.of("alterStorageUnit", "/metadata/foo_db/data_sources/units/foo_unit/active_version/0", Type.UPDATED, StorageUnitAlteredEvent.class), - Arguments.of("unregisterStorageUnit", "/metadata/foo_db/data_sources/units/foo_unit", Type.DELETED, StorageUnitUnregisteredEvent.class), - Arguments.of("invalidRegisterStorageNode", "/metadata/foo_db/data_sources/units/foo_unit", Type.ADDED, null), - Arguments.of("invalidUnregisterStorageNode", "/metadata/foo_db/data_sources/units/foo_unit/active_version/0", Type.DELETED, null), - Arguments.of("ignoreChangeStorageUnit", "/metadata/foo_db/data_sources/units/foo_unit", Type.IGNORED, null), - Arguments.of("registerStorageNode", "/metadata/foo_db/data_sources/nodes/foo_node/active_version/0", Type.ADDED, StorageNodeRegisteredEvent.class), - Arguments.of("alterStorageNode", "/metadata/foo_db/data_sources/nodes/foo_node/active_version/0", Type.UPDATED, StorageNodeAlteredEvent.class), - Arguments.of("unregisterStorageNode", "/metadata/foo_db/data_sources/nodes/foo_node", Type.DELETED, StorageNodeUnregisteredEvent.class), - Arguments.of("invalidRegisterStorageNode", "/metadata/foo_db/data_sources/nodes/foo_node", Type.ADDED, null), - Arguments.of("invalidUnregisterStorageNode", "/metadata/foo_db/data_sources/nodes/foo_node/active_version/0", Type.DELETED, null), - Arguments.of("ignoreChangeStorageNode", "/metadata/foo_db/data_sources/nodes/foo_node", Type.IGNORED, null), - Arguments.of("invalidChangeDataSource", "/metadata/foo_db/data_sources/other", Type.ADDED, null), - Arguments.of("changeRule", "/metadata/foo_db/schemas/foo_schema/rule/", Type.ADDED, AlterUniqueRuleItemEvent.class)); - } + @Test + void assertOnChangeWithRuleConfigurationChanged() { + listener.onChange(new DataChangedEvent("/metadata/foo_db/schemas/foo_schema/rule/", "value", Type.ADDED)); + verify(eventBusContext, times(0)).post(any()); } }