Skip to content

Commit

Permalink
Merge pull request #378 from nacos-group/develop
Browse files Browse the repository at this point in the history
Develop
  • Loading branch information
paderlol authored Sep 28, 2024
2 parents 91999fb + 6585b21 commit 26c8b30
Show file tree
Hide file tree
Showing 22 changed files with 258 additions and 336 deletions.
4 changes: 2 additions & 2 deletions nacossync-worker/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -164,8 +164,8 @@
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>11</source>
<target>11</target>
<source>17</source>
<target>17</target>
</configuration>
</plugin>
</plugins>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import com.alibaba.nacossync.util.SkyWalkerUtil;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.SneakyThrows;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;
Expand All @@ -43,14 +42,17 @@
@Service
public class SkyWalkerCacheServices {

private static final Map<String, FinishedTask> finishedTaskMap = new ConcurrentHashMap<>();
private static final Map<String, FinishedTask> FINISHED_TASK_MAP = new ConcurrentHashMap<>();

@Autowired
private ClusterAccessService clusterAccessService;

@Autowired
private ObjectMapper objectMapper;
private final ClusterAccessService clusterAccessService;

private final ObjectMapper objectMapper;

public SkyWalkerCacheServices(ClusterAccessService clusterAccessService, ObjectMapper objectMapper) {
this.clusterAccessService = clusterAccessService;
this.objectMapper = objectMapper;
}

public String getClusterConnectKey(String clusterId) {
List<String> allClusterConnectKey = getAllClusterConnectKey(clusterId);
return allClusterConnectKey.get(ThreadLocalRandom.current().nextInt(allClusterConnectKey.size()));
Expand Down Expand Up @@ -83,7 +85,7 @@ public void addFinishedTask(TaskDO taskDO) {
FinishedTask finishedTask = new FinishedTask();
finishedTask.setOperationId(operationId);

finishedTaskMap.put(operationId, finishedTask);
FINISHED_TASK_MAP.put(operationId, finishedTask);
}

public FinishedTask getFinishedTask(TaskDO taskDO) {
Expand All @@ -94,20 +96,20 @@ public FinishedTask getFinishedTask(TaskDO taskDO) {
return null;
}

return finishedTaskMap.get(operationId);
return FINISHED_TASK_MAP.get(operationId);
}


public void removeFinishedTask(String operationId) {
if (!StringUtils.hasLength(operationId)) {
return;
}
finishedTaskMap.remove(operationId);
FINISHED_TASK_MAP.remove(operationId);
}

public Map<String, FinishedTask> getFinishedTaskMap() {

return finishedTaskMap;
return FINISHED_TASK_MAP;
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.alibaba.nacossync.dao;

import com.alibaba.nacossync.constant.SkyWalkerConstants;
Expand All @@ -39,82 +40,86 @@
*/
@Service
public class TaskAccessService implements PageQueryService<TaskDO> {

private final TaskRepository taskRepository;

public TaskAccessService(TaskRepository taskRepository) {
this.taskRepository = taskRepository;
}

public TaskDO findByTaskId(String taskId) {

return taskRepository.findByTaskId(taskId);
}

public void deleteTaskById(String taskId) {
taskRepository.deleteByTaskId(taskId);
}

/**
* batch delete tasks by taskIds
* @author yongchao9
*
* @param taskIds
* @author yongchao9
*/
public void deleteTaskInBatch(List<String> taskIds) {
List<TaskDO> tds=taskRepository.findAllByTaskIdIn(taskIds);
List<TaskDO> tds = taskRepository.findAllByTaskIdIn(taskIds);
taskRepository.deleteAllInBatch(tds);
}

public Iterable<TaskDO> findAll() {

return taskRepository.findAll();
}

public void addTask(TaskDO taskDO) {

taskRepository.save(taskDO);

}


public int countByDestClusterIdOrSourceClusterId(String destClusterId, String sourceClusterId) {
return taskRepository.countByDestClusterIdOrSourceClusterId(destClusterId, sourceClusterId);
}

private Predicate getPredicate(CriteriaBuilder criteriaBuilder, List<Predicate> predicates) {
Predicate[] p = new Predicate[predicates.size()];
return criteriaBuilder.and(predicates.toArray(p));
}

private List<Predicate> getPredicates(Root<TaskDO> root, CriteriaBuilder criteriaBuilder, QueryCondition queryCondition) {


private List<Predicate> getPredicates(Root<TaskDO> root, CriteriaBuilder criteriaBuilder,
QueryCondition queryCondition) {

List<Predicate> predicates = new ArrayList<>();
predicates.add(criteriaBuilder.like(root.get("serviceName"), "%" + queryCondition.getServiceName() + "%"));

return predicates;
}

@Override
public Page<TaskDO> findPageNoCriteria(Integer pageNum, Integer size) {

Pageable pageable = PageRequest.of(pageNum, size, Sort.Direction.DESC, "id");

return taskRepository.findAll(pageable);
}

@Override
public Page<TaskDO> findPageCriteria(Integer pageNum, Integer size, QueryCondition queryCondition) {

Pageable pageable = PageRequest.of(pageNum, size, Sort.Direction.DESC, "id");

return getTaskDOS(queryCondition, pageable);
}

private Page<TaskDO> getTaskDOS(QueryCondition queryCondition, Pageable pageable) {
return taskRepository.findAll(
(Specification<TaskDO>) (root, criteriaQuery, criteriaBuilder) -> {

List<Predicate> predicates = getPredicates(root,
criteriaBuilder, queryCondition);

return getPredicate(criteriaBuilder, predicates);

}, pageable);
return taskRepository.findAll((Specification<TaskDO>) (root, criteriaQuery, criteriaBuilder) -> {

List<Predicate> predicates = getPredicates(root, criteriaBuilder, queryCondition);

return getPredicate(criteriaBuilder, predicates);

}, pageable);
}

public List<TaskDO> findAllByServiceNameEqualAll() {
Expand All @@ -124,5 +129,5 @@ public List<TaskDO> findAllByServiceNameEqualAll() {
public List<TaskDO> findAllByServiceNameNotEqualAll() {
return taskRepository.findAllByServiceNameNotIgnoreCase(SkyWalkerConstants.NACOS_ALL_SERVICE_NAME);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,13 @@
*/
package com.alibaba.nacossync.dao.repository;

import javax.transaction.Transactional;

import com.alibaba.nacossync.pojo.model.ClusterDO;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.data.jpa.repository.JpaSpecificationExecutor;
import org.springframework.data.repository.CrudRepository;

import com.alibaba.nacossync.pojo.model.ClusterDO;
import javax.transaction.Transactional;


/**
* @author NacosSync
Expand All @@ -34,6 +34,6 @@ public interface ClusterRepository extends CrudRepository<ClusterDO, Integer>, J
ClusterDO findByClusterId(String clusterId);

@Transactional
int deleteByClusterId(String clusterId);
void deleteByClusterId(String clusterId);

}
Original file line number Diff line number Diff line change
Expand Up @@ -34,16 +34,15 @@ public interface TaskRepository extends CrudRepository<TaskDO, Integer>, JpaRepo
TaskDO findByTaskId(String taskId);

@Transactional
int deleteByTaskId(String taskId);
void deleteByTaskId(String taskId);

List<TaskDO> findAllByTaskIdIn(List<String> taskIds);

List<TaskDO> getAllByWorkerIp(String workerIp);

/**
* query service is all,use ns leven sync data
*/
List<TaskDO> findAllByServiceNameEqualsIgnoreCase(String serviceName);
List<TaskDO> findAllByServiceNameNotIgnoreCase(String serviceName);

int countByDestClusterIdOrSourceClusterId(String destClusterId,String sourceClusterId);

}

This file was deleted.

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package com.alibaba.nacossync.pojo.model;


import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.Setter;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
*/
package com.alibaba.nacossync.pojo.model;


import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.Setter;
Expand Down
Loading

0 comments on commit 26c8b30

Please sign in to comment.