Add CDCJobAPI (#29220)
* Refactor MigrationJobOption

* Add CDCJobAPI
terrymanu authored Nov 27, 2023
1 parent bb00c45 commit 0df0325
Showing 8 changed files with 350 additions and 253 deletions.
TransmissionJobOption (inferred; MigrationJobOption below implements it):
@@ -47,6 +47,14 @@ default YamlTransmissionJobItemProgressSwapper getYamlJobItemProgressSwapper() {
*/
PipelineJobInfo getJobInfo(String jobId);

+ /**
+ * Extend YAML job configuration.
+ *
+ * @param contextKey context key
+ * @param yamlJobConfig YAML job configuration
+ */
+ void extendYamlJobConfiguration(PipelineContextKey contextKey, YamlPipelineJobConfiguration yamlJobConfig);
+
/**
* Build task configuration.
*
@@ -65,14 +73,6 @@ default YamlTransmissionJobItemProgressSwapper getYamlJobItemProgressSwapper() {
*/
TransmissionProcessContext buildProcessContext(PipelineJobConfiguration jobConfig);

- /**
- * Extend YAML job configuration.
- *
- * @param contextKey context key
- * @param yamlJobConfig YAML job configuration
- */
- void extendYamlJobConfiguration(PipelineContextKey contextKey, YamlPipelineJobConfiguration yamlJobConfig);
-
/**
* Build pipeline data consistency checker.
*
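These two hunks are a pure move: extendYamlJobConfiguration now precedes buildTaskConfiguration instead of trailing buildProcessContext, grouping it with the other configuration-building declarations. How the affected region reads after the change (signatures copied from the hunks; the enclosing interface is inferred to be TransmissionJobOption, which MigrationJobOption implements below):

    PipelineJobInfo getJobInfo(String jobId);

    void extendYamlJobConfiguration(PipelineContextKey contextKey, YamlPipelineJobConfiguration yamlJobConfig);

    // buildTaskConfiguration(...) comes next, then:

    TransmissionProcessContext buildProcessContext(PipelineJobConfiguration jobConfig);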
DropStreamingUpdater:
@@ -18,8 +18,10 @@
package org.apache.shardingsphere.cdc.distsql.handler.update;

import org.apache.shardingsphere.cdc.distsql.statement.DropStreamingStatement;
- import org.apache.shardingsphere.data.pipeline.cdc.api.impl.CDCJobOption;
+ import org.apache.shardingsphere.data.pipeline.cdc.api.impl.CDCJobAPI;
+ import org.apache.shardingsphere.data.pipeline.core.job.service.TransmissionJobAPI;
import org.apache.shardingsphere.distsql.handler.ral.update.RALUpdater;
+ import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;

import java.sql.SQLException;

@@ -28,11 +30,11 @@
*/
public final class DropStreamingUpdater implements RALUpdater<DropStreamingStatement> {

- private final CDCJobOption jobAPI = new CDCJobOption();
+ private final CDCJobAPI jobAPI = (CDCJobAPI) TypedSPILoader.getService(TransmissionJobAPI.class, "STREAMING");

@Override
public void executeUpdate(final String databaseName, final DropStreamingStatement sqlStatement) throws SQLException {
- jobAPI.dropStreaming(sqlStatement.getJobId());
+ jobAPI.drop(sqlStatement.getJobId());
}

@Override
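The updater no longer constructs CDCJobOption directly; it resolves the job API through ShardingSphere's typed SPI under the key "STREAMING", and dropStreaming is renamed to the terser drop. A self-contained toy of the lookup mechanism, for readers unfamiliar with the pattern (ToyTypedSPI and ToyTypedSPILoader are hypothetical stand-ins; the real TypedSPILoader is richer):

    import java.util.ServiceLoader;

    // Hypothetical miniature of a typed SPI lookup: scan implementations
    // registered via META-INF/services and return the one whose type key matches.
    interface ToyTypedSPI {

        String getType();
    }

    final class ToyTypedSPILoader {

        static <T extends ToyTypedSPI> T getService(final Class<T> spiClass, final String type) {
            for (T each : ServiceLoader.load(spiClass)) {
                if (type.equals(each.getType())) {
                    return each;
                }
            }
            throw new IllegalStateException("No " + spiClass.getSimpleName() + " registered for type: " + type);
        }
    }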

Two changed files are omitted here (large diffs are not rendered by default).

CDCJob:
@@ -20,6 +20,7 @@
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
+ import org.apache.shardingsphere.data.pipeline.cdc.api.impl.CDCJobAPI;
import org.apache.shardingsphere.data.pipeline.cdc.api.impl.CDCJobOption;
import org.apache.shardingsphere.data.pipeline.cdc.config.job.CDCJobConfiguration;
import org.apache.shardingsphere.data.pipeline.cdc.config.task.CDCTaskConfiguration;
@@ -44,10 +45,12 @@
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineAPIFactory;
import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
+ import org.apache.shardingsphere.data.pipeline.core.job.service.TransmissionJobAPI;
import org.apache.shardingsphere.data.pipeline.core.task.PipelineTask;
import org.apache.shardingsphere.data.pipeline.core.task.runner.PipelineTasksRunner;
import org.apache.shardingsphere.elasticjob.api.ShardingContext;
import org.apache.shardingsphere.elasticjob.simple.job.SimpleJob;
+ import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
import org.apache.shardingsphere.infra.util.close.QuietlyCloser;

import java.util.Collection;
@@ -65,9 +68,11 @@ public final class CDCJob extends AbstractPipelineJob implements SimpleJob {
@Getter
private final PipelineSink sink;

- private final CDCJobOption jobAPI = new CDCJobOption();
+ private final CDCJobOption jobOption = new CDCJobOption();

- private final PipelineJobItemManager<TransmissionJobItemProgress> jobItemManager = new PipelineJobItemManager<>(jobAPI.getYamlJobItemProgressSwapper());
+ private final CDCJobAPI jobAPI = (CDCJobAPI) TypedSPILoader.getService(TransmissionJobAPI.class, "STREAMING");
+
+ private final PipelineJobItemManager<TransmissionJobItemProgress> jobItemManager = new PipelineJobItemManager<>(jobOption.getYamlJobItemProgressSwapper());

private final CDCJobPreparer jobPreparer = new CDCJobPreparer();

@@ -109,8 +114,8 @@ public void execute(final ShardingContext shardingContext) {

private CDCJobItemContext buildPipelineJobItemContext(final CDCJobConfiguration jobConfig, final int shardingItem) {
Optional<TransmissionJobItemProgress> initProgress = jobItemManager.getProgress(jobConfig.getJobId(), shardingItem);
- CDCProcessContext jobProcessContext = jobAPI.buildProcessContext(jobConfig);
- CDCTaskConfiguration taskConfig = jobAPI.buildTaskConfiguration(jobConfig, shardingItem, jobProcessContext.getPipelineProcessConfig());
+ CDCProcessContext jobProcessContext = jobOption.buildProcessContext(jobConfig);
+ CDCTaskConfiguration taskConfig = jobOption.buildTaskConfiguration(jobConfig, shardingItem, jobProcessContext.getPipelineProcessConfig());
return new CDCJobItemContext(jobConfig, shardingItem, initProgress.orElse(null), jobProcessContext, taskConfig, dataSourceManager, sink);
}

@@ -131,7 +136,7 @@ private void processFailed(final String jobId, final int shardingItem, final Exc
log.error("job execution failed, {}-{}", jobId, shardingItem, ex);
PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobId)).getJobItemFacade().getErrorMessage().update(jobId, shardingItem, ex);
PipelineJobCenter.stop(jobId);
- jobAPI.updateJobConfigurationDisabled(jobId, true);
+ jobAPI.disable(jobId);
}

private void executeInventoryTasks(final List<CDCJobItemContext> jobItemContexts) {
@@ -212,7 +217,7 @@ public void onFailure(final Throwable throwable) {
cdcSink.getChannel().writeAndFlush(CDCResponseUtils.failed("", "", throwable.getMessage()));
}
PipelineJobCenter.stop(jobId);
- jobAPI.updateJobConfigurationDisabled(jobId, true);
+ jobAPI.disable(jobId);
}
}
}
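The single jobAPI field is split in two, separating concerns: jobOption keeps the configuration-building duties, while the SPI-resolved jobAPI owns lifecycle calls. An annotated recap of the two fields introduced above (comments added here for orientation only):

    // Builds runtime configuration: getYamlJobItemProgressSwapper(), buildProcessContext(), buildTaskConfiguration().
    private final CDCJobOption jobOption = new CDCJobOption();

    // Resolved through the typed SPI; used for lifecycle operations such as disable(jobId) on failure paths.
    private final CDCJobAPI jobAPI = (CDCJobAPI) TypedSPILoader.getService(TransmissionJobAPI.class, "STREAMING");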
CDCBackendHandler:
@@ -21,6 +21,7 @@
import io.netty.channel.Channel;
import io.netty.channel.ChannelId;
import lombok.extern.slf4j.Slf4j;
+ import org.apache.shardingsphere.data.pipeline.cdc.api.impl.CDCJobAPI;
import org.apache.shardingsphere.data.pipeline.cdc.api.impl.CDCJobOption;
import org.apache.shardingsphere.data.pipeline.cdc.api.pojo.StreamDataParameter;
import org.apache.shardingsphere.data.pipeline.cdc.config.job.CDCJobConfiguration;
@@ -48,12 +49,14 @@
import org.apache.shardingsphere.data.pipeline.core.exception.param.PipelineInvalidParameterException;
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobCenter;
import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobConfigurationManager;
+ import org.apache.shardingsphere.data.pipeline.core.job.service.TransmissionJobAPI;
import org.apache.shardingsphere.infra.database.core.metadata.database.DialectDatabaseMetaData;
import org.apache.shardingsphere.infra.database.core.type.DatabaseTypeRegistry;
import org.apache.shardingsphere.infra.database.opengauss.type.OpenGaussDatabaseType;
import org.apache.shardingsphere.infra.datanode.DataNode;
import org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
+ import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;

import java.util.Collection;
import java.util.HashSet;
@@ -70,9 +73,9 @@
@Slf4j
public final class CDCBackendHandler {

- private final CDCJobOption jobAPI = new CDCJobOption();
+ private final CDCJobAPI jobAPI = (CDCJobAPI) TypedSPILoader.getService(TransmissionJobAPI.class, "STREAMING");

- private final PipelineJobConfigurationManager jobConfigManager = new PipelineJobConfigurationManager(jobAPI);
+ private final PipelineJobConfigurationManager jobConfigManager = new PipelineJobConfigurationManager(new CDCJobOption());

/**
* Get database name by job ID.
@@ -115,7 +118,7 @@ public CDCResponse streamData(final String requestId, final StreamDataRequestBod
ShardingSpherePreconditions.checkState(!actualDataNodesMap.isEmpty(), () -> new PipelineInvalidParameterException(String.format("Not find table %s", tableNames)));
boolean decodeWithTx = database.getProtocolType() instanceof OpenGaussDatabaseType;
StreamDataParameter parameter = new StreamDataParameter(requestBody.getDatabase(), new LinkedList<>(schemaTableNames), requestBody.getFull(), actualDataNodesMap, decodeWithTx);
- String jobId = jobAPI.createJob(parameter, CDCSinkType.SOCKET, new Properties());
+ String jobId = jobAPI.create(parameter, CDCSinkType.SOCKET, new Properties());
connectionContext.setJobId(jobId);
startStreaming(jobId, connectionContext, channel);
return CDCResponseUtils.succeed(requestId, ResponseCase.STREAM_DATA_RESULT, StreamDataResult.newBuilder().setStreamingId(jobId).build());
@@ -135,7 +138,7 @@ public void startStreaming(final String jobId, final CDCConnectionContext connec
PipelineJobCenter.stop(jobId);
}
ShardingSphereDatabase database = PipelineContextManager.getProxyContext().getContextManager().getMetaDataContexts().getMetaData().getDatabase(cdcJobConfig.getDatabaseName());
- jobAPI.startJob(jobId, new CDCSocketSink(channel, database, cdcJobConfig.getSchemaTableNames()));
+ jobAPI.start(jobId, new CDCSocketSink(channel, database, cdcJobConfig.getSchemaTableNames()));
connectionContext.setJobId(jobId);
}

@@ -157,7 +160,7 @@ public void stopStreaming(final String jobId, final ChannelId channelId) {
if (job.getSink().identifierMatched(channelId)) {
log.info("close CDC job, channel id: {}", channelId);
PipelineJobCenter.stop(jobId);
- jobAPI.updateJobConfigurationDisabled(jobId, true);
+ jobAPI.disable(jobId);
}
}

@@ -167,7 +170,7 @@ public void stopStreaming(final String jobId, final ChannelId channelId) {
* @param jobId job ID
*/
public void dropStreaming(final String jobId) {
- jobAPI.dropStreaming(jobId);
+ jobAPI.drop(jobId);
}

/**
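Taken together, the handler's call sites trace the renamed lifecycle surface of CDCJobAPI. A condensed sketch of the sequence (method names and the "STREAMING" key are from this diff; parameter, sink, and the omitted setup are placeholders, not real ShardingSphere wiring):

    CDCJobAPI jobAPI = (CDCJobAPI) TypedSPILoader.getService(TransmissionJobAPI.class, "STREAMING");
    String jobId = jobAPI.create(parameter, CDCSinkType.SOCKET, new Properties()); // was createJob
    jobAPI.start(jobId, sink);  // was startJob; attaches the sink that pushes records to the channel
    jobAPI.disable(jobId);      // was updateJobConfigurationDisabled(jobId, true)
    jobAPI.drop(jobId);         // was dropStreaming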
New SPI registration file:
@@ -0,0 +1,18 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

org.apache.shardingsphere.data.pipeline.cdc.api.impl.CDCJobAPI
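This new resource is a Java service-provider configuration file: each non-comment line names an implementation class, and the file itself is conventionally named after the fully qualified SPI interface it registers against (not shown on this page, but presumably TransmissionJobAPI, the type queried by the lookups above). Registration alone is not enough for a typed lookup; the implementation must also report its type key, mirroring how MigrationJobOption below returns "MIGRATION". A hedged sketch of the counterpart inside CDCJobAPI (inferred from the "STREAMING" lookup key; not shown in this diff):

    @Override
    public String getType() {
        return "STREAMING";
    }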
MigrationJobOption:
@@ -58,7 +58,6 @@

import java.util.Collection;
import java.util.LinkedList;
- import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
@@ -70,10 +69,31 @@
@Slf4j
public final class MigrationJobOption implements TransmissionJobOption {

+ @SuppressWarnings("unchecked")
+ @Override
+ public YamlMigrationJobConfigurationSwapper getYamlJobConfigurationSwapper() {
+     return new YamlMigrationJobConfigurationSwapper();
+ }
+
+ @Override
+ public Class<MigrationJob> getJobClass() {
+     return MigrationJob.class;
+ }
+
+ @Override
+ public Optional<String> getToBeStartDisabledNextJobType() {
+     return Optional.of("CONSISTENCY_CHECK");
+ }
+
+ @Override
+ public Optional<String> getToBeStoppedPreviousJobType() {
+     return Optional.of("CONSISTENCY_CHECK");
+ }
+
@Override
public PipelineJobInfo getJobInfo(final String jobId) {
PipelineJobMetaData jobMetaData = new PipelineJobMetaData(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId));
- List<String> sourceTables = new LinkedList<>();
+ Collection<String> sourceTables = new LinkedList<>();
new PipelineJobConfigurationManager(this).<MigrationJobConfiguration>getJobConfiguration(jobId).getJobShardingDataNodes()
.forEach(each -> each.getEntries().forEach(entry -> entry.getDataNodes().forEach(dataNode -> sourceTables.add(DataNodeUtils.formatWithSchema(dataNode)))));
return new PipelineJobInfo(jobMetaData, null, String.join(",", sourceTables));
@@ -87,12 +107,6 @@ public void extendYamlJobConfiguration(final PipelineContextKey contextKey, fina
}
}

- @SuppressWarnings("unchecked")
- @Override
- public YamlMigrationJobConfigurationSwapper getYamlJobConfigurationSwapper() {
-     return new YamlMigrationJobConfigurationSwapper();
- }
-
@Override
public MigrationTaskConfiguration buildTaskConfiguration(final PipelineJobConfiguration pipelineJobConfig, final int jobShardingItem, final PipelineProcessConfiguration processConfig) {
MigrationJobConfiguration jobConfig = (MigrationJobConfiguration) pipelineJobConfig;
@@ -149,21 +163,6 @@ public PipelineDataConsistencyChecker buildDataConsistencyChecker(final Pipeline
return new MigrationDataConsistencyChecker((MigrationJobConfiguration) jobConfig, processContext, progressContext);
}

- @Override
- public Optional<String> getToBeStartDisabledNextJobType() {
-     return Optional.of("CONSISTENCY_CHECK");
- }
-
- @Override
- public Optional<String> getToBeStoppedPreviousJobType() {
-     return Optional.of("CONSISTENCY_CHECK");
- }
-
- @Override
- public Class<MigrationJob> getJobClass() {
-     return MigrationJob.class;
- }
-
@Override
public String getType() {
return "MIGRATION";
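Besides relocating getYamlJobConfigurationSwapper, getJobClass, and the two job-type getters toward the top of the class, the only substantive change here is widening sourceTables from List<String> to Collection<String>. The variable is only appended to and joined, never indexed, so the weaker type states the actual requirement. A minimal illustration (the table name is hypothetical):

    Collection<String> sourceTables = new LinkedList<>(); // java.util.Collection, java.util.LinkedList
    sourceTables.add("t_order");                          // only add(...) is needed
    String display = String.join(",", sourceTables);      // ... and join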
