diff --git a/saturn-console-api/src/main/java/com/vip/saturn/job/console/domain/JobConfig.java b/saturn-console-api/src/main/java/com/vip/saturn/job/console/domain/JobConfig.java index 264793bbe..7d04afd68 100644 --- a/saturn-console-api/src/main/java/com/vip/saturn/job/console/domain/JobConfig.java +++ b/saturn-console-api/src/main/java/com/vip/saturn/job/console/domain/JobConfig.java @@ -4,9 +4,6 @@ import java.io.Serializable; -/** - * @author chembo.huang - */ public class JobConfig implements Serializable { private static final long serialVersionUID = 7366583369937964951L; @@ -38,7 +35,7 @@ public class JobConfig implements Serializable { private Boolean useSerial; private Boolean failover; private String jobMode; // 系统作业等 - private String customContext; + private String customContext; // 仅仅用于动态更新cron时携带的上下文数据 /** * @deprecated replaced by downStream */ @@ -63,6 +60,7 @@ public void setDefaultValues() { cron = getDefaultIfNull(cron, ""); pausePeriodDate = getDefaultIfNull(pausePeriodDate, ""); pausePeriodTime = getDefaultIfNull(pausePeriodTime, ""); + shardingItemParameters = getDefaultIfNull(shardingItemParameters, ""); jobParameter = getDefaultIfNull(jobParameter, ""); processCountIntervalSeconds = getDefaultIfNull(processCountIntervalSeconds, 300); description = getDefaultIfNull(description, ""); @@ -84,6 +82,7 @@ public void setDefaultValues() { useSerial = getDefaultIfNull(useSerial, Boolean.FALSE); failover = getDefaultIfNull(failover, !localMode); // 已经设置localMode jobMode = getDefaultIfNull(jobMode, ""); + customContext = getDefaultIfNull(customContext, "{}"); dependencies = getDefaultIfNull(dependencies, ""); groups = getDefaultIfNull(groups, ""); rerun = getDefaultIfNull(rerun, Boolean.FALSE); diff --git a/saturn-console-api/src/main/java/com/vip/saturn/job/console/repository/zookeeper/CuratorRepository.java b/saturn-console-api/src/main/java/com/vip/saturn/job/console/repository/zookeeper/CuratorRepository.java index 3b70a7cc9..ebdc667e6 100644 --- a/saturn-console-api/src/main/java/com/vip/saturn/job/console/repository/zookeeper/CuratorRepository.java +++ b/saturn-console-api/src/main/java/com/vip/saturn/job/console/repository/zookeeper/CuratorRepository.java @@ -15,7 +15,6 @@ import java.util.Collection; import java.util.List; -import java.util.concurrent.atomic.AtomicInteger; public interface CuratorRepository { @@ -27,21 +26,21 @@ public interface CuratorRepository { interface CuratorFrameworkOp { - boolean checkExists(String znode); + boolean checkExists(String node); - String getData(String znode); + String getData(String node); - List getChildren(String znode); + List getChildren(String node); - void create(String znode); + void create(String node); - void create(final String znode, Object value); + void create(final String node, Object value); - void update(String znode, Object value); + void update(String node, Object value); - void delete(String znode); + void delete(String node); - void deleteRecursive(String znode); + void deleteRecursive(String node); void fillJobNodeIfNotExist(String node, Object value); @@ -57,16 +56,13 @@ interface CuratorFrameworkOp { interface CuratorTransactionOp { - CuratorTransactionOp replace(String znode, Object value) throws Exception; + CuratorTransactionOp replace(String node, Object value) throws Exception; - CuratorTransactionOp replaceIfChanged(String znode, Object value) throws Exception; + CuratorTransactionOp replaceIfChanged(String node, Object value) throws Exception; - CuratorTransactionOp replaceIfChanged(String znode, Object value, AtomicInteger changedCount) - throws Exception; + CuratorTransactionOp create(String node, Object value) throws Exception; - CuratorTransactionOp create(String znode, Object value) throws Exception; - - CuratorTransactionOp delete(String znode) throws Exception; + CuratorTransactionOp delete(String node) throws Exception; Collection commit() throws Exception; } diff --git a/saturn-console-api/src/main/java/com/vip/saturn/job/console/repository/zookeeper/impl/CuratorRepositoryImpl.java b/saturn-console-api/src/main/java/com/vip/saturn/job/console/repository/zookeeper/impl/CuratorRepositoryImpl.java index cd0d577e6..0d92bc4c9 100644 --- a/saturn-console-api/src/main/java/com/vip/saturn/job/console/repository/zookeeper/impl/CuratorRepositoryImpl.java +++ b/saturn-console-api/src/main/java/com/vip/saturn/job/console/repository/zookeeper/impl/CuratorRepositoryImpl.java @@ -36,7 +36,6 @@ import java.util.Collection; import java.util.List; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; @Repository public class CuratorRepositoryImpl implements CuratorRepository { @@ -109,9 +108,9 @@ public CuratorFrameworkOpImpl(CuratorFramework curatorFramework) { } @Override - public boolean checkExists(final String znode) { + public boolean checkExists(final String node) { try { - return null != curatorFramework.checkExists().forPath(znode); + return null != curatorFramework.checkExists().forPath(node); // CHECKSTYLE:OFF } catch (final Exception ex) { // CHECKSTYLE:ON @@ -120,15 +119,15 @@ public boolean checkExists(final String znode) { } @Override - public String getData(final String znode) { + public String getData(final String node) { try { - if (checkExists(znode)) { - byte[] getZnodeData = curatorFramework.getData().forPath(znode); - if (getZnodeData == null) {// executor的分片可能存在全部飘走的情况,sharding节点有可能获取到的是null,需要对null做判断,否则new + if (checkExists(node)) { + byte[] nodeData = curatorFramework.getData().forPath(node); + if (nodeData == null) {// executor的分片可能存在全部飘走的情况,sharding节点有可能获取到的是null,需要对null做判断,否则new // String时会报空指针异常 return null; } - return new String(getZnodeData, Charset.forName("UTF-8")); + return new String(nodeData, Charset.forName("UTF-8")); } else { return null; } @@ -142,9 +141,9 @@ public String getData(final String znode) { } @Override - public List getChildren(final String znode) { + public List getChildren(final String node) { try { - return curatorFramework.getChildren().forPath(znode); + return curatorFramework.getChildren().forPath(node); // CHECKSTYLE:OFF } catch (final NoNodeException ignore) { return null; @@ -156,15 +155,19 @@ public List getChildren(final String znode) { } @Override - public void create(final String znode) { - create(znode, ""); + public void create(final String node) { + create(node, ""); } @Override - public void create(final String znode, Object data) { + public void create(final String node, Object value) { + if (value == null) { + log.info("node value is null, won't create, node: {}", node); + return; + } try { curatorFramework.create().creatingParentsIfNeeded() - .forPath(znode, data.toString().getBytes(Charset.forName("UTF-8"))); + .forPath(node, value.toString().getBytes(Charset.forName("UTF-8"))); } catch (final NodeExistsException ignore) { // CHECKSTYLE:OFF } catch (final Exception ex) { @@ -173,13 +176,18 @@ public void create(final String znode, Object data) { } } - public void update(final String znode, final Object value) { + @Override + public void update(final String node, final Object value) { + if (value == null) { + log.info("node value is null, won't update, node: {}", node); + return; + } try { - if (this.checkExists(znode)) { - curatorFramework.inTransaction().check().forPath(znode).and().setData() - .forPath(znode, value.toString().getBytes(Charset.forName("UTF-8"))).and().commit(); + if (this.checkExists(node)) { + curatorFramework.inTransaction().check().forPath(node).and().setData() + .forPath(node, value.toString().getBytes(Charset.forName("UTF-8"))).and().commit(); } else { - this.create(znode, value); + this.create(node, value); } } catch (final NoNodeException ignore) { // CHECKSTYLE:OFF @@ -190,10 +198,10 @@ public void update(final String znode, final Object value) { } @Override - public void delete(final String znode) { + public void delete(final String node) { try { - if (null != curatorFramework.checkExists().forPath(znode)) { - curatorFramework.delete().forPath(znode); + if (null != curatorFramework.checkExists().forPath(node)) { + curatorFramework.delete().forPath(node); } } catch (final NoNodeException ignore) { // CHECKSTYLE:OFF @@ -204,10 +212,10 @@ public void delete(final String znode) { } @Override - public void deleteRecursive(final String znode) { + public void deleteRecursive(final String node) { try { - if (null != curatorFramework.checkExists().forPath(znode)) { - CuratorUtils.deletingChildrenIfNeeded(curatorFramework, znode); + if (null != curatorFramework.checkExists().forPath(node)) { + CuratorUtils.deletingChildrenIfNeeded(curatorFramework, node); } } catch (final NoNodeException ignore) { // CHECKSTYLE:OFF @@ -225,8 +233,8 @@ public void deleteRecursive(final String znode) { */ @Override public void fillJobNodeIfNotExist(final String node, final Object value) { - if (null == value) { - log.info("job node value is null, node:{}", node); + if (value == null) { + log.info("node value is null, won't fillJobNodeIfNotExist, node: {}", node); return; } if (!checkExists(node)) { @@ -311,22 +319,18 @@ public CuratorTransactionOpImpl(CuratorFramework curatorClient) { } } - private boolean checkExists(String znode) throws Exception { - return curatorClient.checkExists().forPath(znode) != null; + private boolean checkExists(String node) throws Exception { + return curatorClient.checkExists().forPath(node) != null; } - private CuratorTransactionOpImpl create(String znode, byte[] data) throws Exception { + private CuratorTransactionOpImpl create(String node, byte[] data) throws Exception { curatorTransactionFinal = curatorTransactionFinal.create().withMode(CreateMode.PERSISTENT) - .forPath(znode, data).and(); + .forPath(node, data).and(); return this; } - private byte[] getData(String znode) throws Exception { - return curatorClient.getData().forPath(znode); - } - - private byte[] toData(Object value) { - return (value == null ? "" : value.toString()).getBytes(Charset.forName("UTF-8")); + private byte[] getData(String node) throws Exception { + return curatorClient.getData().forPath(node); } private boolean bytesEquals(byte[] a, byte[] b) { @@ -345,44 +349,50 @@ private boolean bytesEquals(byte[] a, byte[] b) { } @Override - public CuratorTransactionOpImpl replace(String znode, Object value) throws Exception { - byte[] data = toData(value); - curatorTransactionFinal = curatorTransactionFinal.setData().forPath(znode, data).and(); + public CuratorTransactionOpImpl replace(String node, Object value) throws Exception { + if (value == null) { + log.info("node value is null, won't replace, node: {}", node); + return this; + } + byte[] data = value.toString().getBytes(Charset.forName("UTF-8")); + curatorTransactionFinal = curatorTransactionFinal.setData().forPath(node, data).and(); return this; } - public CuratorTransactionOpImpl replaceIfChanged(String znode, Object value) throws Exception { - return replaceIfChanged(znode, value, new AtomicInteger(0)); - } - - public CuratorTransactionOpImpl replaceIfChanged(String znode, Object value, AtomicInteger changedCount) - throws Exception { - byte[] newData = toData(value); - if (this.checkExists(znode)) { - byte[] oldData = this.getData(znode); + @Override + public CuratorTransactionOpImpl replaceIfChanged(String node, Object value) throws Exception { + if (value == null) { + log.info("node value is null, won't replaceIfChanged, node: {}", node); + return this; + } + byte[] newData = value.toString().getBytes(Charset.forName("UTF-8")); + if (this.checkExists(node)) { + byte[] oldData = this.getData(node); if (!bytesEquals(newData, oldData)) { - curatorTransactionFinal = curatorTransactionFinal.check().forPath(znode).and().setData() - .forPath(znode, newData).and(); - changedCount.incrementAndGet(); + curatorTransactionFinal = curatorTransactionFinal.check().forPath(node).and().setData() + .forPath(node, newData).and(); } } else { - this.create(znode, newData); - changedCount.incrementAndGet(); + this.create(node, newData); } return this; } @Override - public CuratorTransactionOp create(String znode, Object value) throws Exception { - byte[] data = toData(value); + public CuratorTransactionOp create(String node, Object value) throws Exception { + if (value == null) { + log.info("node value is null, won't create, node: {}", node); + return this; + } + byte[] data = value.toString().getBytes(Charset.forName("UTF-8")); curatorTransactionFinal = curatorTransactionFinal.create().withMode(CreateMode.PERSISTENT) - .forPath(znode, data).and(); + .forPath(node, data).and(); return this; } @Override - public CuratorTransactionOp delete(String znode) throws Exception { - curatorTransactionFinal = curatorTransactionFinal.delete().forPath(znode).and(); + public CuratorTransactionOp delete(String node) throws Exception { + curatorTransactionFinal = curatorTransactionFinal.delete().forPath(node).and(); return this; }