Skip to content

Commit

Permalink
Refactor PipelineGovernanceFacade (#29142)
Browse files Browse the repository at this point in the history
  • Loading branch information
terrymanu authored Nov 23, 2023
1 parent 095a9d4 commit 56ebfb1
Show file tree
Hide file tree
Showing 27 changed files with 198 additions and 93 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@
import lombok.AccessLevel;
import lombok.Getter;
import org.apache.shardingsphere.data.pipeline.common.metadata.node.PipelineNodePath;
import org.apache.shardingsphere.data.pipeline.common.registrycenter.repository.item.PipelineJobItemFacade;
import org.apache.shardingsphere.data.pipeline.common.registrycenter.repository.job.PipelineJobFacade;
import org.apache.shardingsphere.data.pipeline.common.registrycenter.repository.metadata.PipelineMetaDataFacade;
import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEventListener;

Expand All @@ -32,32 +35,17 @@ public final class PipelineGovernanceFacade {
@Getter(AccessLevel.NONE)
private final ClusterPersistRepository repository;

private final PipelineJobConfigurationGovernanceRepository jobConfigurationGovernanceRepository;
private final PipelineJobFacade jobFacade;

private final PipelineJobOffsetGovernanceRepository jobOffsetGovernanceRepository;
private final PipelineJobItemFacade jobItemFacade;

private final PipelineJobItemProcessGovernanceRepository jobItemProcessGovernanceRepository;

private final PipelineJobItemErrorMessageGovernanceRepository jobItemErrorMessageGovernanceRepository;

private final PipelineJobCheckGovernanceRepository jobCheckGovernanceRepository;

private final PipelineJobGovernanceRepository jobGovernanceRepository;

private final PipelineMetaDataDataSourceGovernanceRepository metaDataDataSourceGovernanceRepository;

private final PipelineMetaDataProcessConfigurationGovernanceRepository metaDataProcessConfigurationGovernanceRepository;
private final PipelineMetaDataFacade metaDataFacade;

public PipelineGovernanceFacade(final ClusterPersistRepository repository) {
this.repository = repository;
jobConfigurationGovernanceRepository = new PipelineJobConfigurationGovernanceRepository(repository);
jobOffsetGovernanceRepository = new PipelineJobOffsetGovernanceRepository(repository);
jobItemProcessGovernanceRepository = new PipelineJobItemProcessGovernanceRepository(repository);
jobItemErrorMessageGovernanceRepository = new PipelineJobItemErrorMessageGovernanceRepository(repository);
jobCheckGovernanceRepository = new PipelineJobCheckGovernanceRepository(repository);
jobGovernanceRepository = new PipelineJobGovernanceRepository(repository);
metaDataDataSourceGovernanceRepository = new PipelineMetaDataDataSourceGovernanceRepository(repository);
metaDataProcessConfigurationGovernanceRepository = new PipelineMetaDataProcessConfigurationGovernanceRepository(repository);
jobFacade = new PipelineJobFacade(repository);
jobItemFacade = new PipelineJobItemFacade(repository);
metaDataFacade = new PipelineMetaDataFacade(repository);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.shardingsphere.data.pipeline.common.registrycenter.repository;
package org.apache.shardingsphere.data.pipeline.common.registrycenter.repository.item;

import lombok.RequiredArgsConstructor;
import org.apache.shardingsphere.data.pipeline.common.metadata.node.PipelineMetaDataNode;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.shardingsphere.data.pipeline.common.registrycenter.repository.item;

import lombok.Getter;
import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;

/**
* Pipeline job item facade.
*/
@Getter
public final class PipelineJobItemFacade {

private final PipelineJobItemProcessGovernanceRepository process;

private final PipelineJobItemErrorMessageGovernanceRepository errorMessage;

public PipelineJobItemFacade(final ClusterPersistRepository repository) {
process = new PipelineJobItemProcessGovernanceRepository(repository);
errorMessage = new PipelineJobItemErrorMessageGovernanceRepository(repository);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.shardingsphere.data.pipeline.common.registrycenter.repository;
package org.apache.shardingsphere.data.pipeline.common.registrycenter.repository.item;

import com.google.common.base.Strings;
import lombok.RequiredArgsConstructor;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.shardingsphere.data.pipeline.common.registrycenter.repository;
package org.apache.shardingsphere.data.pipeline.common.registrycenter.repository.job;

import com.google.common.base.Strings;
import lombok.RequiredArgsConstructor;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.shardingsphere.data.pipeline.common.registrycenter.repository;
package org.apache.shardingsphere.data.pipeline.common.registrycenter.repository.job;

import lombok.RequiredArgsConstructor;
import org.apache.shardingsphere.data.pipeline.common.metadata.node.PipelineMetaDataNode;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.shardingsphere.data.pipeline.common.registrycenter.repository.job;

import lombok.Getter;
import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;

/**
* Pipeline job facade.
*/
@Getter
public final class PipelineJobFacade {

private final PipelineJobGovernanceRepository job;

private final PipelineJobConfigurationGovernanceRepository configuration;

private final PipelineJobOffsetGovernanceRepository offset;

private final PipelineJobCheckGovernanceRepository check;

public PipelineJobFacade(final ClusterPersistRepository repository) {
job = new PipelineJobGovernanceRepository(repository);
configuration = new PipelineJobConfigurationGovernanceRepository(repository);
offset = new PipelineJobOffsetGovernanceRepository(repository);
check = new PipelineJobCheckGovernanceRepository(repository);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.shardingsphere.data.pipeline.common.registrycenter.repository;
package org.apache.shardingsphere.data.pipeline.common.registrycenter.repository.job;

import lombok.RequiredArgsConstructor;
import org.apache.shardingsphere.data.pipeline.common.job.PipelineJob;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.shardingsphere.data.pipeline.common.registrycenter.repository;
package org.apache.shardingsphere.data.pipeline.common.registrycenter.repository.job;

import com.google.common.base.Strings;
import lombok.RequiredArgsConstructor;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.shardingsphere.data.pipeline.common.registrycenter.repository;
package org.apache.shardingsphere.data.pipeline.common.registrycenter.repository.metadata;

import lombok.RequiredArgsConstructor;
import org.apache.shardingsphere.data.pipeline.common.metadata.node.PipelineMetaDataNode;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.shardingsphere.data.pipeline.common.registrycenter.repository.metadata;

import lombok.Getter;
import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;

/**
* Pipeline job item facade.
*/
@Getter
public final class PipelineMetaDataFacade {

private final PipelineMetaDataDataSourceGovernanceRepository dataSource;

private final PipelineMetaDataProcessConfigurationGovernanceRepository processConfiguration;

public PipelineMetaDataFacade(final ClusterPersistRepository repository) {
dataSource = new PipelineMetaDataDataSourceGovernanceRepository(repository);
processConfiguration = new PipelineMetaDataProcessConfigurationGovernanceRepository(repository);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.shardingsphere.data.pipeline.common.registrycenter.repository;
package org.apache.shardingsphere.data.pipeline.common.registrycenter.repository.metadata;

import lombok.RequiredArgsConstructor;
import org.apache.shardingsphere.data.pipeline.common.metadata.node.PipelineMetaDataNode;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public PipelineJobIteErrorMessageManager(final String jobId, final int shardingI
* @return map, key is sharding item, value is error message
*/
public String getErrorMessage() {
return Optional.ofNullable(governanceFacade.getJobItemErrorMessageGovernanceRepository().load(jobId, shardingItem)).orElse("");
return Optional.ofNullable(governanceFacade.getJobItemFacade().getErrorMessage().load(jobId, shardingItem)).orElse("");
}

/**
Expand All @@ -55,7 +55,7 @@ public String getErrorMessage() {
* @param error error
*/
public void updateErrorMessage(final Object error) {
governanceFacade.getJobItemErrorMessageGovernanceRepository().update(jobId, shardingItem, null == error ? "" : buildErrorMessage(error));
governanceFacade.getJobItemFacade().getErrorMessage().update(jobId, shardingItem, null == error ? "" : buildErrorMessage(error));
}

private String buildErrorMessage(final Object error) {
Expand All @@ -66,6 +66,6 @@ private String buildErrorMessage(final Object error) {
* Clean job item error message.
*/
public void cleanErrorMessage() {
governanceFacade.getJobItemErrorMessageGovernanceRepository().update(jobId, shardingItem, "");
governanceFacade.getJobItemFacade().getErrorMessage().update(jobId, shardingItem, "");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public void updateStatus(final String jobId, final int shardingItem, final JobSt
}
jobItemProgress.get().setStatus(status);
PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobId))
.getJobItemProcessGovernanceRepository().update(jobId, shardingItem, YamlEngine.marshal(swapper.swapToYamlConfiguration(jobItemProgress.get())));
.getJobItemFacade().getProcess().update(jobId, shardingItem, YamlEngine.marshal(swapper.swapToYamlConfiguration(jobItemProgress.get())));
}

/**
Expand All @@ -66,7 +66,7 @@ public void updateStatus(final String jobId, final int shardingItem, final JobSt
* @return job item progress
*/
public Optional<T> getProgress(final String jobId, final int shardingItem) {
return PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobId)).getJobItemProcessGovernanceRepository().load(jobId, shardingItem)
return PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobId)).getJobItemFacade().getProcess().load(jobId, shardingItem)
.map(optional -> swapper.swapToObject(YamlEngine.unmarshal(optional, swapper.getYamlProgressClass(), true)));
}

Expand All @@ -77,7 +77,7 @@ public Optional<T> getProgress(final String jobId, final int shardingItem) {
*/
public void persistProgress(final PipelineJobItemContext jobItemContext) {
PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobItemContext.getJobId()))
.getJobItemProcessGovernanceRepository().persist(jobItemContext.getJobId(), jobItemContext.getShardingItem(), convertProgressYamlContent(jobItemContext));
.getJobItemFacade().getProcess().persist(jobItemContext.getJobId(), jobItemContext.getShardingItem(), convertProgressYamlContent(jobItemContext));
}

/**
Expand All @@ -87,7 +87,7 @@ public void persistProgress(final PipelineJobItemContext jobItemContext) {
*/
public void updateProgress(final PipelineJobItemContext jobItemContext) {
PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobItemContext.getJobId()))
.getJobItemProcessGovernanceRepository().update(jobItemContext.getJobId(), jobItemContext.getShardingItem(), convertProgressYamlContent(jobItemContext));
.getJobItemFacade().getProcess().update(jobItemContext.getJobId(), jobItemContext.getShardingItem(), convertProgressYamlContent(jobItemContext));
}

@SuppressWarnings("unchecked")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,12 +75,12 @@ public Optional<String> start(final PipelineJobConfiguration jobConfig) {
String jobId = jobConfig.getJobId();
ShardingSpherePreconditions.checkState(0 != jobConfig.getJobShardingCount(), () -> new PipelineJobCreationWithInvalidShardingCountException(jobId));
PipelineGovernanceFacade governanceFacade = PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobId));
if (governanceFacade.getJobConfigurationGovernanceRepository().isExisted(jobId)) {
if (governanceFacade.getJobFacade().getConfiguration().isExisted(jobId)) {
log.warn("jobId already exists in registry center, ignore, job id is `{}`", jobId);
return Optional.of(jobId);
}
governanceFacade.getJobGovernanceRepository().create(jobId, jobAPI.getJobClass());
governanceFacade.getJobConfigurationGovernanceRepository().persist(jobId, jobConfig.convertToJobConfigurationPOJO());
governanceFacade.getJobFacade().getJob().create(jobId, jobAPI.getJobClass());
governanceFacade.getJobFacade().getConfiguration().persist(jobId, jobConfig.convertToJobConfigurationPOJO());
return Optional.of(jobId);
}

Expand Down Expand Up @@ -119,7 +119,7 @@ private void startCurrentDisabledJob(final String jobId) {
}

private void startNextDisabledJob(final String jobId, final String toBeStartDisabledNextJobType) {
PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobId)).getJobCheckGovernanceRepository().getLatestCheckJobId(jobId).ifPresent(optional -> {
PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobId)).getJobFacade().getCheck().getLatestCheckJobId(jobId).ifPresent(optional -> {
try {
new PipelineJobManager(TypedSPILoader.getService(PipelineJobAPI.class, toBeStartDisabledNextJobType)).startDisabledJob(optional);
// CHECKSTYLE:OFF
Expand All @@ -141,7 +141,7 @@ public void stop(final String jobId) {
}

private void stopPreviousJob(final String jobId, final String toBeStoppedPreviousJobType) {
PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobId)).getJobCheckGovernanceRepository().getLatestCheckJobId(jobId).ifPresent(optional -> {
PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobId)).getJobFacade().getCheck().getLatestCheckJobId(jobId).ifPresent(optional -> {
try {
new PipelineJobManager(TypedSPILoader.getService(PipelineJobAPI.class, toBeStoppedPreviousJobType)).stop(optional);
// CHECKSTYLE:OFF
Expand Down Expand Up @@ -176,7 +176,7 @@ private void stopCurrentJob(final String jobId) {
public void drop(final String jobId) {
PipelineContextKey contextKey = PipelineJobIdUtils.parseContextKey(jobId);
PipelineAPIFactory.getJobOperateAPI(contextKey).remove(String.valueOf(jobId), null);
PipelineAPIFactory.getPipelineGovernanceFacade(contextKey).getJobGovernanceRepository().delete(jobId);
PipelineAPIFactory.getPipelineGovernanceFacade(contextKey).getJobFacade().getJob().delete(jobId);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public final class PipelineDataSourcePersistService implements PipelineMetaDataP
@Override
@SuppressWarnings("unchecked")
public Map<String, DataSourcePoolProperties> load(final PipelineContextKey contextKey, final String jobType) {
String dataSourcesProps = PipelineAPIFactory.getPipelineGovernanceFacade(contextKey).getMetaDataDataSourceGovernanceRepository().load(jobType);
String dataSourcesProps = PipelineAPIFactory.getPipelineGovernanceFacade(contextKey).getMetaDataFacade().getDataSource().load(jobType);
if (Strings.isNullOrEmpty(dataSourcesProps)) {
return Collections.emptyMap();
}
Expand All @@ -55,6 +55,6 @@ public void persist(final PipelineContextKey contextKey, final String jobType, f
for (Entry<String, DataSourcePoolProperties> entry : propsMap.entrySet()) {
dataSourceMap.put(entry.getKey(), swapper.swapToMap(entry.getValue()));
}
PipelineAPIFactory.getPipelineGovernanceFacade(contextKey).getMetaDataDataSourceGovernanceRepository().persist(jobType, YamlEngine.marshal(dataSourceMap));
PipelineAPIFactory.getPipelineGovernanceFacade(contextKey).getMetaDataFacade().getDataSource().persist(jobType, YamlEngine.marshal(dataSourceMap));
}
}
Loading

0 comments on commit 56ebfb1

Please sign in to comment.