Skip to content

Commit

Permalink
Add alter/show streaming rule SQL parse IT
Browse files Browse the repository at this point in the history
  • Loading branch information
azexcy committed Nov 16, 2023
1 parent 57c84ef commit 477bc00
Show file tree
Hide file tree
Showing 15 changed files with 434 additions and 8 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.shardingsphere.cdc.distsql.handler.query;

import org.apache.shardingsphere.cdc.distsql.statement.ShowStreamingRuleStatement;
import org.apache.shardingsphere.data.pipeline.common.config.process.PipelineProcessConfiguration;
import org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey;
import org.apache.shardingsphere.data.pipeline.core.job.service.InventoryIncrementalJobAPI;
import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobAPI;
import org.apache.shardingsphere.distsql.handler.ral.query.QueryableRALExecutor;
import org.apache.shardingsphere.infra.instance.metadata.InstanceType;
import org.apache.shardingsphere.infra.merge.result.impl.local.LocalDataQueryResultRow;
import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
import org.apache.shardingsphere.infra.util.json.JsonUtils;

import java.util.Arrays;
import java.util.Collection;
import java.util.LinkedList;

/**
* Show streaming rule executor.
*/
public final class ShowStreamingRuleExecutor implements QueryableRALExecutor<ShowStreamingRuleStatement> {

@Override
public Collection<LocalDataQueryResultRow> getRows(final ShowStreamingRuleStatement sqlStatement) {
PipelineProcessConfiguration processConfig = ((InventoryIncrementalJobAPI) TypedSPILoader.getService(PipelineJobAPI.class, "STREAMING"))
.showProcessConfiguration(new PipelineContextKey(InstanceType.PROXY));
Collection<LocalDataQueryResultRow> result = new LinkedList<>();
result.add(new LocalDataQueryResultRow(getString(processConfig.getRead()), getString(processConfig.getWrite()), getString(processConfig.getStreamChannel())));
return result;
}

private String getString(final Object obj) {
return null == obj ? "" : JsonUtils.toJsonString(obj);
}

@Override
public Collection<String> getColumnNames() {
return Arrays.asList("read", "write", "stream_channel");
}

@Override
public Class<ShowStreamingRuleStatement> getType() {
return ShowStreamingRuleStatement.class;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,3 +22,4 @@ org.apache.shardingsphere.migration.distsql.handler.query.ShowMigrationSourceSto
org.apache.shardingsphere.migration.distsql.handler.query.ShowMigrationCheckAlgorithmsExecutor
org.apache.shardingsphere.cdc.distsql.handler.query.ShowStreamingListExecutor
org.apache.shardingsphere.cdc.distsql.handler.query.ShowStreamingJobStatusExecutor
org.apache.shardingsphere.cdc.distsql.handler.query.ShowStreamingRuleExecutor
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.shardingsphere.cdc.distsql.statement;

import org.apache.shardingsphere.distsql.statement.ral.pipeline.cdc.QueryableCDCRALStatement;

/**
* Show streaming rule statement.
*/
public final class ShowStreamingRuleStatement extends QueryableCDCRALStatement {
}
Original file line number Diff line number Diff line change
Expand Up @@ -98,8 +98,8 @@ class CDCE2EIT {
void assertCDCDataImportSuccess(final PipelineTestParameter testParam) throws SQLException, InterruptedException {
TimeZone.setDefault(TimeZone.getTimeZone("UTC"));
try (PipelineContainerComposer containerComposer = new PipelineContainerComposer(testParam, new CDCJobType())) {
containerComposer.proxyExecuteWithLog("ALTER STREAMING RULE (READ(WORKER_THREAD=64,BATCH_SIZE=1000,SHARDING_SIZE=10000000,RATE_LIMITER (TYPE(NAME='QPS',PROPERTIES('qps'='10000')))),"
+ "STREAM_CHANNEL (TYPE(NAME='MEMORY',PROPERTIES('block-queue-size'='2000'))));", 0);
containerComposer.proxyExecuteWithLog("ALTER STREAMING RULE (READ(WORKER_THREAD=20,BATCH_SIZE=1000,SHARDING_SIZE=10000000,RATE_LIMITER (TYPE(NAME='QPS',PROPERTIES('qps'='10000')))),"
+ "WRITE(WORKER_THREAD=20, BATCH_SIZE=1000, RATE_LIMITER (TYPE(NAME='TPS',PROPERTIES('tps'='10000')))));", 0);
for (String each : Arrays.asList(PipelineContainerComposer.DS_0, PipelineContainerComposer.DS_1)) {
containerComposer.registerStorageUnit(each);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.shardingsphere.test.it.sql.parser.internal.asserts.statement.ral.impl.pipeline;

import lombok.AccessLevel;
import lombok.NoArgsConstructor;
import org.apache.shardingsphere.distsql.segment.AlgorithmSegment;
import org.apache.shardingsphere.distsql.segment.ReadOrWriteSegment;
import org.apache.shardingsphere.distsql.statement.ral.updatable.AlterInventoryIncrementalRuleStatement;
import org.apache.shardingsphere.test.it.sql.parser.internal.asserts.SQLCaseAssertContext;
import org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.domain.statement.ral.AlterInventoryIncrementalRuleStatementTestCase;
import org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.segment.impl.distsql.ExpectedAlgorithm;
import org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.segment.impl.distsql.ral.ExpectedRead;
import org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.segment.impl.distsql.ral.ExpectedWrite;

import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;

/**
* Alter inventory incremental rule statement assert.
*/
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public class AlterInventoryIncrementalRuleStatementAssert {

/**
* Assert statement is correct with expected parser result.
*
* @param assertContext assert context
* @param actual actual statement
* @param expected expected statement test case
*/
public static void assertIs(final SQLCaseAssertContext assertContext, final AlterInventoryIncrementalRuleStatement actual, final AlterInventoryIncrementalRuleStatementTestCase expected) {
if (null == expected) {
assertNull(actual, assertContext.getText("Actual statement should not exist."));
} else {
assertThat(actual.getJobTypeName(), is(expected.getJobTypeName()));
assertRead(assertContext, actual.getProcessConfigSegment().getReadSegment(), expected.getRule().getRead());
assertWrite(assertContext, actual.getProcessConfigSegment().getWriteSegment(), expected.getRule().getWrite());
assertTypeStrategy(assertContext, actual.getProcessConfigSegment().getStreamChannel(), expected.getRule().getStreamChannel());
}
}

private static void assertRead(final SQLCaseAssertContext assertContext, final ReadOrWriteSegment actual, final ExpectedRead expected) {
if (null == expected) {
assertNull(actual, assertContext.getText("Actual read or write should not exist."));
return;
}
assertThat(actual.getWorkerThread(), is(expected.getWorkerThread()));
assertThat(actual.getBatchSize(), is(expected.getBatchSize()));
assertThat(actual.getShardingSize(), is(expected.getShardingSize()));
assertThat(actual.getRateLimiter().getName(), is(expected.getRateLimiter().getName()));
}

private static void assertWrite(final SQLCaseAssertContext assertContext, final ReadOrWriteSegment actual, final ExpectedWrite expected) {
if (null == expected) {
assertNull(actual, assertContext.getText("Actual read or write should not exist."));
return;
}
assertThat(actual.getWorkerThread(), is(expected.getWorkerThread()));
assertThat(actual.getBatchSize(), is(expected.getBatchSize()));
assertThat(actual.getRateLimiter().getName(), is(expected.getRateLimiter().getName()));
}

private static void assertTypeStrategy(final SQLCaseAssertContext assertContext, final AlgorithmSegment actual, final ExpectedAlgorithm expected) {
if (null == expected) {
assertNull(actual, assertContext.getText("Actual strategy should not exist."));
} else {
assertNotNull(actual, assertContext.getText("Actual strategy should exist."));
assertThat(assertContext.getText("Type assertion error"), actual.getName(), is(expected.getName()));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
import org.apache.shardingsphere.cdc.distsql.statement.ShowStreamingListStatement;
import org.apache.shardingsphere.cdc.distsql.statement.ShowStreamingRuleStatement;
import org.apache.shardingsphere.cdc.distsql.statement.ShowStreamingStatusStatement;
import org.apache.shardingsphere.distsql.statement.ral.pipeline.QueryablePipelineRALStatement;
import org.apache.shardingsphere.migration.distsql.statement.ShowMigrationCheckAlgorithmsStatement;
Expand All @@ -28,10 +29,10 @@
import org.apache.shardingsphere.migration.distsql.statement.ShowMigrationSourceStorageUnitsStatement;
import org.apache.shardingsphere.migration.distsql.statement.ShowMigrationStatusStatement;
import org.apache.shardingsphere.test.it.sql.parser.internal.asserts.SQLCaseAssertContext;
import org.apache.shardingsphere.test.it.sql.parser.internal.asserts.statement.ExistingAssert;
import org.apache.shardingsphere.test.it.sql.parser.internal.asserts.statement.ral.impl.pipeline.cdc.ShowStreamingStatusStatementAssert;
import org.apache.shardingsphere.test.it.sql.parser.internal.asserts.statement.ral.impl.pipeline.migration.query.ShowMigrationCheckStatusStatementAssert;
import org.apache.shardingsphere.test.it.sql.parser.internal.asserts.statement.ral.impl.pipeline.migration.query.ShowMigrationStatusStatementAssert;
import org.apache.shardingsphere.test.it.sql.parser.internal.asserts.statement.ExistingAssert;
import org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.SQLParserTestCase;
import org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.ral.cdc.ShowStreamingStatusStatementTestCase;
import org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.ral.migration.ShowMigrationCheckStatusStatementTestCase;
Expand Down Expand Up @@ -66,6 +67,8 @@ public static void assertIs(final SQLCaseAssertContext assertContext, final Quer
ExistingAssert.assertIs(assertContext, actual, expected);
} else if (actual instanceof ShowStreamingStatusStatement) {
ShowStreamingStatusStatementAssert.assertIs(assertContext, (ShowStreamingStatusStatement) actual, (ShowStreamingStatusStatementTestCase) expected);
} else if (actual instanceof ShowStreamingRuleStatement) {
ExistingAssert.assertIs(assertContext, actual, expected);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import lombok.NoArgsConstructor;
import org.apache.shardingsphere.cdc.distsql.statement.DropStreamingStatement;
import org.apache.shardingsphere.distsql.statement.ral.pipeline.UpdatablePipelineRALStatement;
import org.apache.shardingsphere.distsql.statement.ral.updatable.AlterInventoryIncrementalRuleStatement;
import org.apache.shardingsphere.migration.distsql.statement.CheckMigrationStatement;
import org.apache.shardingsphere.migration.distsql.statement.CommitMigrationStatement;
import org.apache.shardingsphere.migration.distsql.statement.MigrateTableStatement;
Expand All @@ -43,6 +44,7 @@
import org.apache.shardingsphere.test.it.sql.parser.internal.asserts.statement.ral.impl.pipeline.migration.update.StopMigrationCheckStatementAssert;
import org.apache.shardingsphere.test.it.sql.parser.internal.asserts.statement.ral.impl.pipeline.migration.update.StopMigrationStatementAssert;
import org.apache.shardingsphere.test.it.sql.parser.internal.asserts.statement.ral.impl.pipeline.migration.update.UnregisterMigrationSourceStorageUnitStatementAssert;
import org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.domain.statement.ral.AlterInventoryIncrementalRuleStatementTestCase;
import org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.SQLParserTestCase;
import org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.ral.cdc.DropStreamingStatementTestCase;
import org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.ral.migration.CheckMigrationStatementTestCase;
Expand Down Expand Up @@ -93,6 +95,8 @@ public static void assertIs(final SQLCaseAssertContext assertContext, final Upda
StartMigrationCheckStatementAssert.assertIs(assertContext, (StartMigrationCheckStatement) actual, (StartMigrationCheckStatementTestCase) expected);
} else if (actual instanceof StopMigrationCheckStatement) {
StopMigrationCheckStatementAssert.assertIs(assertContext, (StopMigrationCheckStatement) actual, (StopMigrationCheckStatementTestCase) expected);
} else if (actual instanceof AlterInventoryIncrementalRuleStatement) {
AlterInventoryIncrementalRuleStatementAssert.assertIs(assertContext, (AlterInventoryIncrementalRuleStatement) actual, (AlterInventoryIncrementalRuleStatementTestCase) expected);
} else if (actual instanceof DropStreamingStatement) {
DropStreamingStatementAssert.assertIs(assertContext, (DropStreamingStatement) actual, (DropStreamingStatementTestCase) expected);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.domain.statement.ral;

import lombok.Getter;
import lombok.Setter;
import org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.SQLParserTestCase;
import org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.segment.impl.distsql.ral.ExpectedInventoryIncrementalRule;

import javax.xml.bind.annotation.XmlAccessType;
import javax.xml.bind.annotation.XmlAccessorType;
import javax.xml.bind.annotation.XmlElement;

/**
* Alter inventory incremental rule statement test case.
*/
@Getter
@Setter
@XmlAccessorType(XmlAccessType.FIELD)
public final class AlterInventoryIncrementalRuleStatementTestCase extends SQLParserTestCase {

@XmlElement(name = "job-type-name")
private String jobTypeName;

@XmlElement(name = "rule")
private ExpectedInventoryIncrementalRule rule;
}
Loading

0 comments on commit 477bc00

Please sign in to comment.