Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add PipelineJobManager #29011

Merged
merged 1 commit into from
Nov 12, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import org.apache.shardingsphere.infra.spi.annotation.SingletonSPI;
import org.apache.shardingsphere.infra.spi.type.typed.TypedSPI;

import java.util.List;
import java.util.Optional;

/**
Expand Down Expand Up @@ -97,11 +96,11 @@ public interface PipelineJobAPI extends TypedSPI {

/**
* Get pipeline job info.
*
* @param contextKey context key
* @return job info list
*
* @param jobId job ID
* @return pipeline job info
*/
List<PipelineJobInfo> list(PipelineContextKey contextKey);
PipelineJobInfo getJobInfo(String jobId);

/**
* Persist job item progress.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.shardingsphere.data.pipeline.core.job.service;

import lombok.RequiredArgsConstructor;
import org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey;
import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobInfo;
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
import org.apache.shardingsphere.elasticjob.lite.lifecycle.domain.JobBriefInfo;

import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/**
* Pipeline job manager.
*/
@RequiredArgsConstructor
public final class PipelineJobManager {

private final PipelineJobAPI pipelineJobAPI;

/**
* Get pipeline jobs info.
*
* @param contextKey context key
* @return jobs info
*/
public List<PipelineJobInfo> getPipelineJobInfos(final PipelineContextKey contextKey) {
return getJobBriefInfos(contextKey, pipelineJobAPI.getType()).map(each -> pipelineJobAPI.getJobInfo(each.getJobName())).collect(Collectors.toList());
}

private Stream<JobBriefInfo> getJobBriefInfos(final PipelineContextKey contextKey, final String jobType) {
return PipelineAPIFactory.getJobStatisticsAPI(contextKey).getAllJobsBriefInfo().stream().filter(each -> !each.getJobName().startsWith("_"))
.filter(each -> jobType.equals(PipelineJobIdUtils.parseJobType(each.getJobName()).getType()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -88,9 +88,6 @@ public PipelineProcessConfiguration showProcessConfiguration(final PipelineConte
return PipelineProcessConfigurationUtils.convertWithDefaultValue(processConfigPersistService.load(contextKey, getType()));
}

@Override
protected abstract TableBasedPipelineJobInfo getJobInfo(String jobId);

@Override
public Map<Integer, InventoryIncrementalJobItemProgress> getJobProgress(final PipelineJobConfiguration jobConfig) {
String jobId = jobConfig.getJobId();
Expand All @@ -111,7 +108,7 @@ public List<InventoryIncrementalJobItemInfo> getJobItemInfos(final String jobId)
List<InventoryIncrementalJobItemInfo> result = new LinkedList<>();
for (Entry<Integer, InventoryIncrementalJobItemProgress> entry : jobProgress.entrySet()) {
int shardingItem = entry.getKey();
TableBasedPipelineJobInfo jobInfo = getJobInfo(jobId);
TableBasedPipelineJobInfo jobInfo = (TableBasedPipelineJobInfo) getJobInfo(jobId);
InventoryIncrementalJobItemProgress jobItemProgress = entry.getValue();
String errorMessage = getJobItemErrorMessage(jobId, shardingItem);
if (null == jobItemProgress) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey;
import org.apache.shardingsphere.data.pipeline.common.listener.PipelineElasticJobListener;
import org.apache.shardingsphere.data.pipeline.common.metadata.node.PipelineMetaDataNode;
import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobInfo;
import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobMetaData;
import org.apache.shardingsphere.data.pipeline.common.registrycenter.repository.GovernanceRepositoryAPI;
import org.apache.shardingsphere.data.pipeline.common.util.PipelineDistributedBarrier;
Expand All @@ -35,18 +34,14 @@
import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineAPIFactory;
import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobAPI;
import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
import org.apache.shardingsphere.elasticjob.lite.lifecycle.domain.JobBriefInfo;
import org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
import org.apache.shardingsphere.infra.util.yaml.YamlEngine;

import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/**
* Abstract pipeline job API impl.
Expand All @@ -56,19 +51,6 @@ public abstract class AbstractPipelineJobAPIImpl implements PipelineJobAPI {

protected static final DateTimeFormatter DATE_TIME_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

@Override
public List<PipelineJobInfo> list(final PipelineContextKey contextKey) {
return getJobBriefInfos(contextKey).map(each -> getJobInfo(each.getJobName())).collect(Collectors.toList());
}

private Stream<JobBriefInfo> getJobBriefInfos(final PipelineContextKey contextKey) {
return PipelineAPIFactory.getJobStatisticsAPI(contextKey).getAllJobsBriefInfo().stream().filter(each -> !each.getJobName().startsWith("_"))
.filter(each -> PipelineJobIdUtils.parseJobType(each.getJobName()).getType().equals(getType()));
}

// TODO Add getJobInfo
protected abstract PipelineJobInfo getJobInfo(String jobId);

protected PipelineJobMetaData buildPipelineJobMetaData(final JobConfigurationPOJO jobConfigPOJO) {
return new PipelineJobMetaData(jobConfigPOJO.getJobName(), !jobConfigPOJO.isDisabled(),
jobConfigPOJO.getShardingTotalCount(), jobConfigPOJO.getProps().getProperty("create_time"), jobConfigPOJO.getProps().getProperty("stop_time"), jobConfigPOJO.getJobParameter());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.shardingsphere.data.pipeline.cdc.api.impl.CDCJobAPI;
import org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey;
import org.apache.shardingsphere.data.pipeline.common.pojo.TableBasedPipelineJobInfo;
import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
import org.apache.shardingsphere.distsql.handler.ral.query.QueryableRALExecutor;
import org.apache.shardingsphere.infra.instance.metadata.InstanceType;
import org.apache.shardingsphere.infra.merge.result.impl.local.LocalDataQueryResultRow;
Expand All @@ -35,11 +36,11 @@
*/
public final class ShowStreamingListExecutor implements QueryableRALExecutor<ShowStreamingListStatement> {

private final CDCJobAPI jobAPI = new CDCJobAPI();
private final PipelineJobManager pipelineJobManager = new PipelineJobManager(new CDCJobAPI());

@Override
public Collection<LocalDataQueryResultRow> getRows(final ShowStreamingListStatement sqlStatement) {
return jobAPI.list(new PipelineContextKey(InstanceType.PROXY)).stream().map(each -> new LocalDataQueryResultRow(each.getJobMetaData().getJobId(),
return pipelineJobManager.getPipelineJobInfos(new PipelineContextKey(InstanceType.PROXY)).stream().map(each -> new LocalDataQueryResultRow(each.getJobMetaData().getJobId(),
((TableBasedPipelineJobInfo) each).getDatabaseName(), ((TableBasedPipelineJobInfo) each).getTable(),
each.getJobMetaData().getJobItemCount(), each.getJobMetaData().isActive() ? Boolean.TRUE.toString() : Boolean.FALSE.toString(),
each.getJobMetaData().getCreateTime(), Optional.ofNullable(each.getJobMetaData().getStopTime()).orElse(""))).collect(Collectors.toList());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey;
import org.apache.shardingsphere.data.pipeline.common.pojo.TableBasedPipelineJobInfo;
import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
import org.apache.shardingsphere.data.pipeline.scenario.migration.api.impl.MigrationJobAPI;
import org.apache.shardingsphere.distsql.handler.ral.query.QueryableRALExecutor;
import org.apache.shardingsphere.infra.instance.metadata.InstanceType;
Expand All @@ -34,11 +35,11 @@
*/
public final class ShowMigrationListExecutor implements QueryableRALExecutor<ShowMigrationListStatement> {

private final MigrationJobAPI jobAPI = new MigrationJobAPI();
private final PipelineJobManager pipelineJobManager = new PipelineJobManager(new MigrationJobAPI());

@Override
public Collection<LocalDataQueryResultRow> getRows(final ShowMigrationListStatement sqlStatement) {
return jobAPI.list(new PipelineContextKey(InstanceType.PROXY)).stream().map(each -> new LocalDataQueryResultRow(each.getJobMetaData().getJobId(),
return pipelineJobManager.getPipelineJobInfos(new PipelineContextKey(InstanceType.PROXY)).stream().map(each -> new LocalDataQueryResultRow(each.getJobMetaData().getJobId(),
((TableBasedPipelineJobInfo) each).getTable(), each.getJobMetaData().getJobItemCount(),
each.getJobMetaData().isActive() ? Boolean.TRUE.toString() : Boolean.FALSE.toString(),
each.getJobMetaData().getCreateTime(), each.getJobMetaData().getStopTime())).collect(Collectors.toList());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -300,7 +300,7 @@ protected YamlPipelineJobConfiguration swapToYamlJobConfiguration(final Pipeline
}

@Override
protected TableBasedPipelineJobInfo getJobInfo(final String jobId) {
public TableBasedPipelineJobInfo getJobInfo(final String jobId) {
JobConfigurationPOJO jobConfigPOJO = getElasticJobConfigPOJO(jobId);
PipelineJobMetaData jobMetaData = buildPipelineJobMetaData(jobConfigPOJO);
CDCJobConfiguration jobConfig = getJobConfiguration(jobConfigPOJO);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -385,7 +385,7 @@ public PipelineProcessContext buildPipelineProcessContext(final PipelineJobConfi
}

@Override
protected PipelineJobInfo getJobInfo(final String jobId) {
public PipelineJobInfo getJobInfo(final String jobId) {
throw new UnsupportedOperationException();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ private Map<String, String> buildTargetTableSchemaMap(final Map<String, List<Dat
}

@Override
protected TableBasedPipelineJobInfo getJobInfo(final String jobId) {
public TableBasedPipelineJobInfo getJobInfo(final String jobId) {
JobConfigurationPOJO jobConfigPOJO = getElasticJobConfigPOJO(jobId);
PipelineJobMetaData jobMetaData = buildPipelineJobMetaData(jobConfigPOJO);
List<String> sourceTables = new LinkedList<>();
Expand Down