Skip to content

Commit

Permalink
Add PipelineJobExecutor (apache#32762)
Browse files Browse the repository at this point in the history
* Add PipelineJobExecutor

* Add PipelineJobExecutor

* Add PipelineJobExecutor

* Add PipelineJobExecutor
  • Loading branch information
terrymanu authored Sep 1, 2024
1 parent 5be9973 commit d801647
Show file tree
Hide file tree
Showing 7 changed files with 277 additions and 121 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,10 @@
* limitations under the License.
*/

package org.apache.shardingsphere.data.pipeline.core.job;
package org.apache.shardingsphere.data.pipeline.core.job.executor;

import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.data.pipeline.core.context.PipelineContextKey;
import org.apache.shardingsphere.data.pipeline.core.context.PipelineJobItemContext;
Expand All @@ -41,25 +42,25 @@
import java.sql.SQLException;

/**
* Abstract separable pipeline job.
*
* @param <T> type of pipeline job configuration
* @param <I> type of pipeline job item context
* @param <P> type of pipeline job item progress
* Distributed pipeline job executor.
*/
@Getter
@RequiredArgsConstructor
@Slf4j
public abstract class AbstractSeparablePipelineJob<T extends PipelineJobConfiguration, I extends PipelineJobItemContext, P extends PipelineJobItemProgress> implements PipelineJob {
public final class DistributedPipelineJobExecutor {

private final PipelineJobRunnerManager jobRunnerManager;
@SuppressWarnings("rawtypes")
private final DistributedPipelineJobExecutorCallback callback;

protected AbstractSeparablePipelineJob() {
jobRunnerManager = new PipelineJobRunnerManager();
}
@Getter
private final PipelineJobRunnerManager jobRunnerManager = new PipelineJobRunnerManager();

/**
* Execute job.
*
* @param shardingContext sharding context
*/
@SuppressWarnings("unchecked")
@Override
public final void execute(final ShardingContext shardingContext) {
public void execute(final ShardingContext shardingContext) {
String jobId = shardingContext.getJobName();
int shardingItem = shardingContext.getShardingItem();
log.info("Execute job {}-{}.", jobId, shardingItem);
Expand All @@ -69,14 +70,14 @@ public final void execute(final ShardingContext shardingContext) {
}
PipelineJobType jobType = PipelineJobIdUtils.parseJobType(jobId);
PipelineContextKey contextKey = PipelineJobIdUtils.parseContextKey(jobId);
T jobConfig = (T) jobType.getYamlJobConfigurationSwapper().swapToObject(shardingContext.getJobParameter());
PipelineJobItemManager<P> jobItemManager = new PipelineJobItemManager<>(jobType.getYamlJobItemProgressSwapper());
P jobItemProgress = jobItemManager.getProgress(shardingContext.getJobName(), shardingItem).orElse(null);
PipelineJobConfiguration jobConfig = jobType.getYamlJobConfigurationSwapper().swapToObject(shardingContext.getJobParameter());
PipelineJobItemManager<PipelineJobItemProgress> jobItemManager = new PipelineJobItemManager<>(jobType.getYamlJobItemProgressSwapper());
PipelineJobItemProgress jobItemProgress = jobItemManager.getProgress(shardingContext.getJobName(), shardingItem).orElse(null);
TransmissionProcessContext jobProcessContext = createTransmissionProcessContext(jobId, jobType, contextKey);
PipelineGovernanceFacade governanceFacade = PipelineAPIFactory.getPipelineGovernanceFacade(contextKey);
boolean started = false;
try {
started = execute(buildJobItemContext(jobConfig, shardingItem, jobItemProgress, jobProcessContext), governanceFacade);
started = execute(callback.buildJobItemContext(jobConfig, shardingItem, jobItemProgress, jobProcessContext, jobRunnerManager.getDataSourceManager()), governanceFacade);
if (started) {
PipelineJobProgressPersistService.persistNow(jobId, shardingItem);
}
Expand All @@ -95,9 +96,10 @@ public final void execute(final ShardingContext shardingContext) {
}
}

private boolean execute(final I jobItemContext, final PipelineGovernanceFacade governanceFacade) {
@SuppressWarnings("unchecked")
private boolean execute(final PipelineJobItemContext jobItemContext, final PipelineGovernanceFacade governanceFacade) {
int shardingItem = jobItemContext.getShardingItem();
PipelineTasksRunner tasksRunner = buildTasksRunner(jobItemContext);
PipelineTasksRunner tasksRunner = callback.buildTasksRunner(jobItemContext);
if (!jobRunnerManager.addTasksRunner(shardingItem, tasksRunner)) {
return false;
}
Expand All @@ -117,19 +119,14 @@ private TransmissionProcessContext createTransmissionProcessContext(final String
return new TransmissionProcessContext(jobId, processConfig);
}

protected abstract I buildJobItemContext(T jobConfig, int shardingItem, P jobItemProgress, TransmissionProcessContext jobProcessContext);

protected abstract PipelineTasksRunner buildTasksRunner(I jobItemContext);

protected final void prepare(final I jobItemContext) {
@SuppressWarnings("unchecked")
private void prepare(final PipelineJobItemContext jobItemContext) {
try {
doPrepare(jobItemContext);
callback.prepare(jobItemContext);
// CHECKSTYLE:OFF
} catch (final SQLException ex) {
// CHECKSTYLE:ON
throw new PipelineInternalException(ex);
}
}

protected abstract void doPrepare(I jobItemContext) throws SQLException;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.shardingsphere.data.pipeline.core.job.executor;

import org.apache.shardingsphere.data.pipeline.core.context.PipelineJobItemContext;
import org.apache.shardingsphere.data.pipeline.core.context.TransmissionProcessContext;
import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
import org.apache.shardingsphere.data.pipeline.core.job.config.PipelineJobConfiguration;
import org.apache.shardingsphere.data.pipeline.core.job.progress.PipelineJobItemProgress;
import org.apache.shardingsphere.data.pipeline.core.task.runner.PipelineTasksRunner;

import java.sql.SQLException;

/**
* Distributed pipeline job executor callback.
*
* @param <T> type of pipeline job configuration
* @param <I> type of pipeline job item context
* @param <P> type of pipeline job item progress
*/
public interface DistributedPipelineJobExecutorCallback<T extends PipelineJobConfiguration, I extends PipelineJobItemContext, P extends PipelineJobItemProgress> {

/**
* Build job item context.
*
* @param jobConfig job configuration
* @param shardingItem sharding item
* @param jobItemProgress job item progress
* @param jobProcessContext job process context
* @param dataSourceManager pipeline data source manager
* @return built job item context
*/
I buildJobItemContext(T jobConfig, int shardingItem, P jobItemProgress, TransmissionProcessContext jobProcessContext, PipelineDataSourceManager dataSourceManager);

/**
* Build tasks runner.
*
* @param jobItemContext job item context
* @return built tasks runner
*/
PipelineTasksRunner buildTasksRunner(I jobItemContext);

/**
* Prepare.
*
* @param jobItemContext job item context
* @throws SQLException SQL exception
*/
void prepare(I jobItemContext) throws SQLException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,32 +17,29 @@

package org.apache.shardingsphere.data.pipeline.scenario.consistencycheck;

import org.apache.shardingsphere.data.pipeline.core.context.TransmissionProcessContext;
import org.apache.shardingsphere.data.pipeline.core.job.AbstractSeparablePipelineJob;
import org.apache.shardingsphere.data.pipeline.core.job.JobStatus;
import org.apache.shardingsphere.data.pipeline.core.job.progress.ConsistencyCheckJobItemProgress;
import org.apache.shardingsphere.data.pipeline.core.task.runner.PipelineTasksRunner;
import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.config.ConsistencyCheckJobConfiguration;
import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.context.ConsistencyCheckJobItemContext;
import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.task.ConsistencyCheckTasksRunner;
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJob;
import org.apache.shardingsphere.data.pipeline.core.job.engine.PipelineJobRunnerManager;
import org.apache.shardingsphere.data.pipeline.core.job.executor.DistributedPipelineJobExecutor;
import org.apache.shardingsphere.elasticjob.api.ShardingContext;

/**
* Consistency check job.
*/
public final class ConsistencyCheckJob extends AbstractSeparablePipelineJob<ConsistencyCheckJobConfiguration, ConsistencyCheckJobItemContext, ConsistencyCheckJobItemProgress> {
public final class ConsistencyCheckJob implements PipelineJob {

@Override
public ConsistencyCheckJobItemContext buildJobItemContext(final ConsistencyCheckJobConfiguration jobConfig,
final int shardingItem, final ConsistencyCheckJobItemProgress jobItemProgress, final TransmissionProcessContext jobProcessContext) {
return new ConsistencyCheckJobItemContext(jobConfig, shardingItem, JobStatus.RUNNING, jobItemProgress);
private final DistributedPipelineJobExecutor executor;

public ConsistencyCheckJob() {
executor = new DistributedPipelineJobExecutor(new ConsistencyCheckJobExecutorCallback());
}

@Override
protected PipelineTasksRunner buildTasksRunner(final ConsistencyCheckJobItemContext jobItemContext) {
return new ConsistencyCheckTasksRunner(jobItemContext);
public PipelineJobRunnerManager getJobRunnerManager() {
return executor.getJobRunnerManager();
}

@Override
protected void doPrepare(final ConsistencyCheckJobItemContext jobItemContext) {
public void execute(final ShardingContext shardingContext) {
executor.execute(shardingContext);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.shardingsphere.data.pipeline.scenario.consistencycheck;

import org.apache.shardingsphere.data.pipeline.core.context.TransmissionProcessContext;
import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
import org.apache.shardingsphere.data.pipeline.core.job.JobStatus;
import org.apache.shardingsphere.data.pipeline.core.job.executor.DistributedPipelineJobExecutorCallback;
import org.apache.shardingsphere.data.pipeline.core.job.progress.ConsistencyCheckJobItemProgress;
import org.apache.shardingsphere.data.pipeline.core.task.runner.PipelineTasksRunner;
import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.config.ConsistencyCheckJobConfiguration;
import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.context.ConsistencyCheckJobItemContext;
import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.task.ConsistencyCheckTasksRunner;

/**
* Consistency check job executor callback.
*/
public final class ConsistencyCheckJobExecutorCallback
implements
DistributedPipelineJobExecutorCallback<ConsistencyCheckJobConfiguration, ConsistencyCheckJobItemContext, ConsistencyCheckJobItemProgress> {

@Override
public ConsistencyCheckJobItemContext buildJobItemContext(final ConsistencyCheckJobConfiguration jobConfig, final int shardingItem,
final ConsistencyCheckJobItemProgress jobItemProgress, final TransmissionProcessContext jobProcessContext,
final PipelineDataSourceManager dataSourceManager) {
return new ConsistencyCheckJobItemContext(jobConfig, shardingItem, JobStatus.RUNNING, jobItemProgress);
}

@Override
public PipelineTasksRunner buildTasksRunner(final ConsistencyCheckJobItemContext jobItemContext) {
return new ConsistencyCheckTasksRunner(jobItemContext);
}

@Override
public void prepare(final ConsistencyCheckJobItemContext jobItemContext) {
}
}
Loading

0 comments on commit d801647

Please sign in to comment.