
Commit

Merge branch 'master' into dev
jiangML committed Nov 21, 2023
2 parents 2f68b68 + e99e714 commit 741f37d
Showing 20 changed files with 160 additions and 130 deletions.
@@ -39,7 +39,7 @@
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.MatcherAssert.assertThat;
 
-public final class EncryptTest {
+class EncryptTest {
 
     private OrderRepository orderRepository;
 
@@ -48,7 +48,7 @@ public final class EncryptTest {
     private AddressRepository addressRepository;
 
     @Test
-    void testEncryptInLocalTransactions() throws SQLException, IOException {
+    void assertEncryptInLocalTransactions() throws SQLException, IOException {
         DataSource dataSource = YamlShardingSphereDataSourceFactory.createDataSource(FileTestUtils.readFromFileURLString("yaml/encrypt.yaml"));
         orderRepository = new OrderRepository(dataSource);
         orderItemRepository = new OrderItemRepository(dataSource);
@@ -39,7 +39,7 @@
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.MatcherAssert.assertThat;
 
-public final class MaskTest {
+class MaskTest {
 
     private OrderRepository orderRepository;
 
@@ -48,7 +48,7 @@ public final class MaskTest {
     private AddressRepository addressRepository;
 
     @Test
-    void testMaskInLocalTransactions() throws SQLException, IOException {
+    void assertMaskInLocalTransactions() throws SQLException, IOException {
         DataSource dataSource = YamlShardingSphereDataSourceFactory.createDataSource(FileTestUtils.readFromFileURLString("yaml/mask.yaml"));
         orderRepository = new OrderRepository(dataSource);
         orderItemRepository = new OrderItemRepository(dataSource);
@@ -36,7 +36,7 @@
 
 import static org.junit.jupiter.api.Assertions.assertThrows;
 
-public final class ReadWriteSplittingTest {
+class ReadWriteSplittingTest {
 
     private OrderRepository orderRepository;
 
@@ -45,7 +45,7 @@ public final class ReadWriteSplittingTest {
     private AddressRepository addressRepository;
 
     @Test
-    void testReadWriteSplittingInLocalTransactions() throws SQLException, IOException {
+    void assertReadWriteSplittingInLocalTransactions() throws SQLException, IOException {
         DataSource dataSource = YamlShardingSphereDataSourceFactory.createDataSource(FileTestUtils.readFromFileURLString("yaml/readwrite-splitting.yaml"));
         orderRepository = new OrderRepository(dataSource);
         orderItemRepository = new OrderItemRepository(dataSource);
@@ -40,7 +40,7 @@
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.MatcherAssert.assertThat;
 
-public final class ShadowTest {
+class ShadowTest {
 
     private OrderRepository orderRepository;
 
@@ -49,7 +49,7 @@ public final class ShadowTest {
     private AddressRepository addressRepository;
 
     @Test
-    void testShadowInLocalTransactions() throws SQLException, IOException {
+    void assertShadowInLocalTransactions() throws SQLException, IOException {
         DataSource dataSource = YamlShardingSphereDataSourceFactory.createDataSource(FileTestUtils.readFromFileURLString("yaml/shadow.yaml"));
         orderRepository = new OrderRepository(dataSource);
         orderItemRepository = new OrderItemRepository(dataSource);
@@ -40,7 +40,7 @@
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.MatcherAssert.assertThat;
 
-public final class ShardingTest {
+class ShardingTest {
 
     private OrderRepository orderRepository;
 
@@ -49,13 +49,14 @@ public final class ShardingTest {
     private AddressRepository addressRepository;
 
     @Test
-    void testShardingInLocalTransactions() throws SQLException, IOException {
+    void assertShardingInLocalTransactions() throws SQLException, IOException {
         DataSource dataSource = YamlShardingSphereDataSourceFactory.createDataSource(FileTestUtils.readFromFileURLString("yaml/sharding.yaml"));
         orderRepository = new OrderRepository(dataSource);
         orderItemRepository = new OrderItemRepository(dataSource);
         addressRepository = new AddressRepository(dataSource);
         this.initEnvironment();
         this.processSuccess();
+        this.cleanEnvironment();
     }
 
     private void initEnvironment() throws SQLException {
34 changes: 34 additions & 0 deletions infra/nativetest/src/test/resources/logback-test.xml
@@ -0,0 +1,34 @@
<?xml version="1.0"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->

<configuration>
    <statusListener class="ch.qos.logback.core.status.NopStatusListener" />
    <appender name="console" class="ch.qos.logback.core.ConsoleAppender">
        <encoder>
            <pattern>[%-5level] %d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %logger{36} - %msg%n</pattern>
        </encoder>
    </appender>
    <logger name="org.apache.shardingsphere" level="warn" additivity="false">
        <appender-ref ref="console" />
    </logger>
    
    <root>
        <level value="error" />
        <appender-ref ref="console" />
    </root>
</configuration>
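
The new logback-test.xml above keeps test output quiet: org.apache.shardingsphere loggers go to the console at WARN, everything else only at ERROR. As a rough illustration (not part of this commit; the class and logger names below are arbitrary), any test in this module that logs through the standard SLF4J API is governed by that configuration:

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class LoggingSketch {
    
    private static final Logger LOG = LoggerFactory.getLogger("org.apache.shardingsphere.example");
    
    public static void main(final String[] args) {
        // Suppressed: INFO is below the warn threshold configured for org.apache.shardingsphere.
        LOG.info("not printed");
        // Printed by the console appender defined in logback-test.xml.
        LOG.warn("printed");
    }
}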
@@ -17,15 +17,20 @@
 
 package org.apache.shardingsphere.data.pipeline.common.pojo;
 
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+
 /**
- * Pipeline job meta data.
+ * Pipeline job info.
  */
-public interface PipelineJobInfo {
+@RequiredArgsConstructor
+@Getter
+public final class PipelineJobInfo {
 
-    /**
-     * Get job meta data.
-     *
-     * @return job meta data
-     */
-    PipelineJobMetaData getJobMetaData();
+    private final PipelineJobMetaData jobMetaData;
+
+    private final String databaseName;
+
+    // TODO Rename
+    private final String table;
 }
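
With this change PipelineJobInfo is a Lombok value object rather than an interface, so callers read its fields through generated getters instead of casting to TableBasedPipelineJobInfo (removed elsewhere in this commit). A minimal sketch of a consumer; the class and method names here are illustrative only, not part of the diff:

import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobInfo;

public final class PipelineJobInfoSketch {
    
    // No cast needed any more: database name and table come from Lombok-generated getters.
    static String describe(final PipelineJobInfo jobInfo) {
        return jobInfo.getDatabaseName() + "." + jobInfo.getTable();
    }
}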

This file was deleted.

@@ -20,7 +20,7 @@
 import lombok.extern.slf4j.Slf4j;
 import org.apache.shardingsphere.data.pipeline.common.context.PipelineJobItemContext;
 import org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobNotFoundException;
-import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
+import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobIteErrorMessageManager;
 import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
 import org.apache.shardingsphere.data.pipeline.core.task.runner.PipelineTasksRunner;
 import org.apache.shardingsphere.elasticjob.api.ShardingContext;
@@ -49,7 +49,6 @@ protected AbstractSimplePipelineJob(final String jobId) {
     @Override
     public void execute(final ShardingContext shardingContext) {
         PipelineJobManager jobManager = new PipelineJobManager(getJobAPI());
-        PipelineJobItemManager<?> jobItemManager = new PipelineJobItemManager<>(getJobAPI().getYamlJobItemProgressSwapper());
         String jobId = shardingContext.getJobName();
         int shardingItem = shardingContext.getShardingItem();
         log.info("Execute job {}-{}", jobId, shardingItem);
@@ -59,31 +58,31 @@ public void execute(final ShardingContext shardingContext) {
         }
         try {
             PipelineJobItemContext jobItemContext = buildPipelineJobItemContext(shardingContext);
-            execute0(jobItemManager, jobItemContext);
+            execute0(jobItemContext);
             // CHECKSTYLE:OFF
         } catch (final RuntimeException ex) {
             // CHECKSTYLE:ON
-            processFailed(jobManager, jobItemManager, jobId, shardingItem, ex);
+            processFailed(jobManager, jobId, shardingItem, ex);
             throw ex;
         }
     }
 
-    private void execute0(final PipelineJobItemManager<?> jobItemManager, final PipelineJobItemContext jobItemContext) {
+    private void execute0(final PipelineJobItemContext jobItemContext) {
         String jobId = jobItemContext.getJobId();
         int shardingItem = jobItemContext.getShardingItem();
         PipelineTasksRunner tasksRunner = buildPipelineTasksRunner(jobItemContext);
         if (!addTasksRunner(shardingItem, tasksRunner)) {
             return;
         }
-        jobItemManager.cleanErrorMessage(jobId, shardingItem);
+        new PipelineJobIteErrorMessageManager(jobId, shardingItem).cleanErrorMessage();
         prepare(jobItemContext);
         log.info("start tasks runner, jobId={}, shardingItem={}", jobId, shardingItem);
         tasksRunner.start();
     }
 
-    private void processFailed(final PipelineJobManager jobManager, final PipelineJobItemManager<?> jobItemManager, final String jobId, final int shardingItem, final Exception ex) {
+    private void processFailed(final PipelineJobManager jobManager, final String jobId, final int shardingItem, final Exception ex) {
         log.error("job execution failed, {}-{}", jobId, shardingItem, ex);
-        jobItemManager.updateErrorMessage(jobId, shardingItem, ex);
+        new PipelineJobIteErrorMessageManager(jobId, shardingItem).updateErrorMessage(ex);
         try {
             jobManager.stop(jobId);
         } catch (final PipelineJobNotFoundException ignored) {
@@ -25,7 +25,7 @@
 import org.apache.shardingsphere.data.pipeline.common.job.JobStatus;
 import org.apache.shardingsphere.data.pipeline.common.job.progress.InventoryIncrementalJobItemProgress;
 import org.apache.shardingsphere.data.pipeline.common.pojo.InventoryIncrementalJobItemInfo;
-import org.apache.shardingsphere.data.pipeline.common.pojo.TableBasedPipelineJobInfo;
+import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobInfo;
 import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
 import org.apache.shardingsphere.data.pipeline.core.metadata.PipelineProcessConfigurationPersistService;
 import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
@@ -80,12 +80,11 @@ public List<InventoryIncrementalJobItemInfo> getJobItemInfos(final String jobId)
         long startTimeMillis = Long.parseLong(Optional.ofNullable(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId).getProps().getProperty("start_time_millis")).orElse("0"));
         Map<Integer, InventoryIncrementalJobItemProgress> jobProgress = getJobProgress(jobConfig);
         List<InventoryIncrementalJobItemInfo> result = new LinkedList<>();
-        PipelineJobItemManager<InventoryIncrementalJobItemProgress> jobItemManager = new PipelineJobItemManager<>(jobAPI.getYamlJobItemProgressSwapper());
+        PipelineJobInfo jobInfo = jobAPI.getJobInfo(jobId);
         for (Entry<Integer, InventoryIncrementalJobItemProgress> entry : jobProgress.entrySet()) {
             int shardingItem = entry.getKey();
-            TableBasedPipelineJobInfo jobInfo = (TableBasedPipelineJobInfo) jobAPI.getJobInfo(jobId);
             InventoryIncrementalJobItemProgress jobItemProgress = entry.getValue();
-            String errorMessage = jobItemManager.getErrorMessage(jobId, shardingItem);
+            String errorMessage = new PipelineJobIteErrorMessageManager(jobId, shardingItem).getErrorMessage();
             if (null == jobItemProgress) {
                 result.add(new InventoryIncrementalJobItemInfo(shardingItem, jobInfo.getTable(), null, startTimeMillis, 0, errorMessage));
                 continue;
@@ -0,0 +1,72 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.shardingsphere.data.pipeline.core.job.service;

import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.shardingsphere.data.pipeline.common.metadata.node.PipelineMetaDataNode;
import org.apache.shardingsphere.data.pipeline.common.registrycenter.repository.GovernanceRepositoryAPI;
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;

import java.util.Optional;

/**
 * Pipeline job item error message manager.
 */
public final class PipelineJobIteErrorMessageManager {
    
    private final String jobId;
    
    private final int shardingItem;
    
    private final GovernanceRepositoryAPI governanceRepositoryAPI;
    
    public PipelineJobIteErrorMessageManager(final String jobId, final int shardingItem) {
        this.jobId = jobId;
        this.shardingItem = shardingItem;
        governanceRepositoryAPI = PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId));
    }
    
    /**
     * Get job item error message.
     *
     * @return job item error message, empty string if absent
     */
    public String getErrorMessage() {
        return Optional.ofNullable(governanceRepositoryAPI.getJobItemErrorMessage(jobId, shardingItem)).orElse("");
    }
    
    /**
     * Update job item error message.
     *
     * @param error error
     */
    public void updateErrorMessage(final Object error) {
        governanceRepositoryAPI.update(PipelineMetaDataNode.getJobItemErrorMessagePath(jobId, shardingItem), null == error ? "" : buildErrorMessage(error));
    }
    
    private String buildErrorMessage(final Object error) {
        return error instanceof Throwable ? ExceptionUtils.getStackTrace((Throwable) error) : error.toString();
    }
    
    /**
     * Clean job item error message.
     */
    public void cleanErrorMessage() {
        governanceRepositoryAPI.persist(PipelineMetaDataNode.getJobItemErrorMessagePath(jobId, shardingItem), "");
    }
}
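
The new manager is constructed per (jobId, shardingItem) and resolves the governance repository itself, which is why AbstractSimplePipelineJob and getJobItemInfos above no longer pass a PipelineJobItemManager around for error messages. A minimal usage sketch, assuming the pipeline governance center is reachable for the given job ID; the wrapper class name and the job ID literal are illustrative only:

import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobIteErrorMessageManager;

public final class ErrorMessageUsageSketch {
    
    public static void main(final String[] args) {
        // One manager per (jobId, shardingItem); the constructor resolves the governance repository from the job ID.
        PipelineJobIteErrorMessageManager errorMessageManager = new PipelineJobIteErrorMessageManager("j0101test", 0);
        // Clear any stale message before the tasks runner starts, as AbstractSimplePipelineJob#execute0 does.
        errorMessageManager.cleanErrorMessage();
        // Persist the stack trace when execution fails, as AbstractSimplePipelineJob#processFailed does.
        errorMessageManager.updateErrorMessage(new RuntimeException("inventory task failed"));
        // Read it back when assembling job item infos; an empty string means nothing is recorded.
        String errorMessage = errorMessageManager.getErrorMessage();
        System.out.println(errorMessage);
    }
}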