Move PipelineJobAPI.marshalJobId() to PipelineJobId.marshal() (#29011)
terrymanu authored Nov 12, 2023
1 parent c3bea55 commit f8a420f
Showing 9 changed files with 66 additions and 34 deletions.

PipelineJobAPI.java
@@ -30,7 +30,6 @@
 import org.apache.shardingsphere.infra.spi.annotation.SingletonSPI;
 import org.apache.shardingsphere.infra.spi.type.typed.TypedSPI;
 
-import java.util.List;
 import java.util.Optional;
 
 /**
@@ -97,11 +96,11 @@ public interface PipelineJobAPI extends TypedSPI {
 
     /**
      * Get pipeline job info.
-     *
-     * @param contextKey context key
-     * @return job info list
+     *
+     * @param jobId job ID
+     * @return pipeline job info
      */
-    List<PipelineJobInfo> list(PipelineContextKey contextKey);
+    PipelineJobInfo getJobInfo(String jobId);
 
     /**
      * Persist job item progress.

PipelineJobManager.java (new file)
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.core.job.service;
+
+import lombok.RequiredArgsConstructor;
+import org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey;
+import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobInfo;
+import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
+import org.apache.shardingsphere.elasticjob.lite.lifecycle.domain.JobBriefInfo;
+
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * Pipeline job manager.
+ */
+@RequiredArgsConstructor
+public final class PipelineJobManager {
+
+    private final PipelineJobAPI pipelineJobAPI;
+
+    /**
+     * Get pipeline jobs info.
+     *
+     * @param contextKey context key
+     * @return jobs info
+     */
+    public List<PipelineJobInfo> getPipelineJobInfos(final PipelineContextKey contextKey) {
+        return getJobBriefInfos(contextKey, pipelineJobAPI.getType()).map(each -> pipelineJobAPI.getJobInfo(each.getJobName())).collect(Collectors.toList());
+    }
+
+    private Stream<JobBriefInfo> getJobBriefInfos(final PipelineContextKey contextKey, final String jobType) {
+        return PipelineAPIFactory.getJobStatisticsAPI(contextKey).getAllJobsBriefInfo().stream().filter(each -> !each.getJobName().startsWith("_"))
+                .filter(each -> jobType.equals(PipelineJobIdUtils.parseJobType(each.getJobName()).getType()));
+    }
+}
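
For orientation, here is a minimal usage sketch of the new PipelineJobManager (not part of this commit). It mirrors the wiring the DistSQL executors below adopt; the choice of MigrationJobAPI, the PROXY context key, and the ListMigrationJobsExample class name are illustrative assumptions.

import org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey;
import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobInfo;
import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
import org.apache.shardingsphere.data.pipeline.scenario.migration.api.impl.MigrationJobAPI;
import org.apache.shardingsphere.infra.instance.metadata.InstanceType;

import java.util.List;

// Illustrative only: lists migration jobs through the new manager instead of the removed PipelineJobAPI.list().
public final class ListMigrationJobsExample {
    
    public static void main(final String[] args) {
        // The manager filters ElasticJob brief infos by the injected API's job type, then resolves each job's info.
        PipelineJobManager jobManager = new PipelineJobManager(new MigrationJobAPI());
        List<PipelineJobInfo> jobInfos = jobManager.getPipelineJobInfos(new PipelineContextKey(InstanceType.PROXY));
        jobInfos.forEach(each -> System.out.println(each.getJobMetaData().getJobId()));
    }
}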

AbstractInventoryIncrementalJobAPIImpl.java
@@ -88,9 +88,6 @@ public PipelineProcessConfiguration showProcessConfiguration(final PipelineConte
         return PipelineProcessConfigurationUtils.convertWithDefaultValue(processConfigPersistService.load(contextKey, getType()));
     }
 
-    @Override
-    protected abstract TableBasedPipelineJobInfo getJobInfo(String jobId);
-
     @Override
     public Map<Integer, InventoryIncrementalJobItemProgress> getJobProgress(final PipelineJobConfiguration jobConfig) {
         String jobId = jobConfig.getJobId();
@@ -111,7 +108,7 @@ public List<InventoryIncrementalJobItemInfo> getJobItemInfos(final String jobId)
         List<InventoryIncrementalJobItemInfo> result = new LinkedList<>();
         for (Entry<Integer, InventoryIncrementalJobItemProgress> entry : jobProgress.entrySet()) {
             int shardingItem = entry.getKey();
-            TableBasedPipelineJobInfo jobInfo = getJobInfo(jobId);
+            TableBasedPipelineJobInfo jobInfo = (TableBasedPipelineJobInfo) getJobInfo(jobId);
             InventoryIncrementalJobItemProgress jobItemProgress = entry.getValue();
             String errorMessage = getJobItemErrorMessage(jobId, shardingItem);
             if (null == jobItemProgress) {
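
Since getJobInfo() now returns the base PipelineJobInfo type on the interface, the implementation above downcasts to TableBasedPipelineJobInfo. A defensive variant of that lookup could guard the cast, roughly as follows (an illustrative sketch, not code from this commit; the helper name getTableBasedJobInfo is hypothetical):

// Hypothetical helper: fails fast with a descriptive message instead of a bare ClassCastException.
private TableBasedPipelineJobInfo getTableBasedJobInfo(final String jobId) {
    PipelineJobInfo jobInfo = getJobInfo(jobId);
    if (!(jobInfo instanceof TableBasedPipelineJobInfo)) {
        throw new IllegalStateException(String.format("Job info of job `%s` is not table based", jobId));
    }
    return (TableBasedPipelineJobInfo) jobInfo;
}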

AbstractPipelineJobAPIImpl.java
@@ -24,7 +24,6 @@
 import org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey;
 import org.apache.shardingsphere.data.pipeline.common.listener.PipelineElasticJobListener;
 import org.apache.shardingsphere.data.pipeline.common.metadata.node.PipelineMetaDataNode;
-import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobInfo;
 import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobMetaData;
 import org.apache.shardingsphere.data.pipeline.common.registrycenter.repository.GovernanceRepositoryAPI;
 import org.apache.shardingsphere.data.pipeline.common.util.PipelineDistributedBarrier;
@@ -35,18 +34,14 @@
 import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineAPIFactory;
 import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobAPI;
 import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
-import org.apache.shardingsphere.elasticjob.lite.lifecycle.domain.JobBriefInfo;
 import org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
 import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
 
 import java.time.LocalDateTime;
 import java.time.format.DateTimeFormatter;
 import java.util.Collections;
-import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
 
 /**
  * Abstract pipeline job API impl.
@@ -56,19 +51,6 @@ public abstract class AbstractPipelineJobAPIImpl implements PipelineJobAPI {
 
     protected static final DateTimeFormatter DATE_TIME_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
 
-    @Override
-    public List<PipelineJobInfo> list(final PipelineContextKey contextKey) {
-        return getJobBriefInfos(contextKey).map(each -> getJobInfo(each.getJobName())).collect(Collectors.toList());
-    }
-
-    private Stream<JobBriefInfo> getJobBriefInfos(final PipelineContextKey contextKey) {
-        return PipelineAPIFactory.getJobStatisticsAPI(contextKey).getAllJobsBriefInfo().stream().filter(each -> !each.getJobName().startsWith("_"))
-                .filter(each -> PipelineJobIdUtils.parseJobType(each.getJobName()).getType().equals(getType()));
-    }
-
-    // TODO Add getJobInfo
-    protected abstract PipelineJobInfo getJobInfo(String jobId);
-
     protected PipelineJobMetaData buildPipelineJobMetaData(final JobConfigurationPOJO jobConfigPOJO) {
         return new PipelineJobMetaData(jobConfigPOJO.getJobName(), !jobConfigPOJO.isDisabled(),
                 jobConfigPOJO.getShardingTotalCount(), jobConfigPOJO.getProps().getProperty("create_time"), jobConfigPOJO.getProps().getProperty("stop_time"), jobConfigPOJO.getJobParameter());

ShowStreamingListExecutor.java
@@ -21,6 +21,7 @@
 import org.apache.shardingsphere.data.pipeline.cdc.api.impl.CDCJobAPI;
 import org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey;
 import org.apache.shardingsphere.data.pipeline.common.pojo.TableBasedPipelineJobInfo;
+import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
 import org.apache.shardingsphere.distsql.handler.ral.query.QueryableRALExecutor;
 import org.apache.shardingsphere.infra.instance.metadata.InstanceType;
 import org.apache.shardingsphere.infra.merge.result.impl.local.LocalDataQueryResultRow;
@@ -35,11 +36,11 @@
  */
 public final class ShowStreamingListExecutor implements QueryableRALExecutor<ShowStreamingListStatement> {
 
-    private final CDCJobAPI jobAPI = new CDCJobAPI();
+    private final PipelineJobManager pipelineJobManager = new PipelineJobManager(new CDCJobAPI());
 
     @Override
     public Collection<LocalDataQueryResultRow> getRows(final ShowStreamingListStatement sqlStatement) {
-        return jobAPI.list(new PipelineContextKey(InstanceType.PROXY)).stream().map(each -> new LocalDataQueryResultRow(each.getJobMetaData().getJobId(),
+        return pipelineJobManager.getPipelineJobInfos(new PipelineContextKey(InstanceType.PROXY)).stream().map(each -> new LocalDataQueryResultRow(each.getJobMetaData().getJobId(),
                 ((TableBasedPipelineJobInfo) each).getDatabaseName(), ((TableBasedPipelineJobInfo) each).getTable(),
                 each.getJobMetaData().getJobItemCount(), each.getJobMetaData().isActive() ? Boolean.TRUE.toString() : Boolean.FALSE.toString(),
                 each.getJobMetaData().getCreateTime(), Optional.ofNullable(each.getJobMetaData().getStopTime()).orElse(""))).collect(Collectors.toList());

ShowMigrationListExecutor.java
@@ -19,6 +19,7 @@
 
 import org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey;
 import org.apache.shardingsphere.data.pipeline.common.pojo.TableBasedPipelineJobInfo;
+import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
 import org.apache.shardingsphere.data.pipeline.scenario.migration.api.impl.MigrationJobAPI;
 import org.apache.shardingsphere.distsql.handler.ral.query.QueryableRALExecutor;
 import org.apache.shardingsphere.infra.instance.metadata.InstanceType;
@@ -34,11 +35,11 @@
  */
 public final class ShowMigrationListExecutor implements QueryableRALExecutor<ShowMigrationListStatement> {
 
-    private final MigrationJobAPI jobAPI = new MigrationJobAPI();
+    private final PipelineJobManager pipelineJobManager = new PipelineJobManager(new MigrationJobAPI());
 
     @Override
     public Collection<LocalDataQueryResultRow> getRows(final ShowMigrationListStatement sqlStatement) {
-        return jobAPI.list(new PipelineContextKey(InstanceType.PROXY)).stream().map(each -> new LocalDataQueryResultRow(each.getJobMetaData().getJobId(),
+        return pipelineJobManager.getPipelineJobInfos(new PipelineContextKey(InstanceType.PROXY)).stream().map(each -> new LocalDataQueryResultRow(each.getJobMetaData().getJobId(),
                 ((TableBasedPipelineJobInfo) each).getTable(), each.getJobMetaData().getJobItemCount(),
                 each.getJobMetaData().isActive() ? Boolean.TRUE.toString() : Boolean.FALSE.toString(),
                 each.getJobMetaData().getCreateTime(), each.getJobMetaData().getStopTime())).collect(Collectors.toList());

CDCJobAPI.java
@@ -300,7 +300,7 @@ protected YamlPipelineJobConfiguration swapToYamlJobConfiguration(final Pipeline
     }
 
     @Override
-    protected TableBasedPipelineJobInfo getJobInfo(final String jobId) {
+    public TableBasedPipelineJobInfo getJobInfo(final String jobId) {
         JobConfigurationPOJO jobConfigPOJO = getElasticJobConfigPOJO(jobId);
         PipelineJobMetaData jobMetaData = buildPipelineJobMetaData(jobConfigPOJO);
         CDCJobConfiguration jobConfig = getJobConfiguration(jobConfigPOJO);

ConsistencyCheckJobAPI.java
@@ -385,7 +385,7 @@ public PipelineProcessContext buildPipelineProcessContext(final PipelineJobConfi
     }
 
     @Override
-    protected PipelineJobInfo getJobInfo(final String jobId) {
+    public PipelineJobInfo getJobInfo(final String jobId) {
         throw new UnsupportedOperationException();
     }
 

MigrationJobAPI.java
@@ -209,7 +209,7 @@ private Map<String, String> buildTargetTableSchemaMap(final Map<String, List<Dat
     }
 
     @Override
-    protected TableBasedPipelineJobInfo getJobInfo(final String jobId) {
+    public TableBasedPipelineJobInfo getJobInfo(final String jobId) {
         JobConfigurationPOJO jobConfigPOJO = getElasticJobConfigPOJO(jobId);
         PipelineJobMetaData jobMetaData = buildPipelineJobMetaData(jobConfigPOJO);
         List<String> sourceTables = new LinkedList<>();
