diff --git a/streamx-common/pom.xml b/streamx-common/pom.xml
index d349039f8f..b92b76040f 100644
--- a/streamx-common/pom.xml
+++ b/streamx-common/pom.xml
@@ -69,13 +69,13 @@
mysql
mysql-connector-java
- provided
+ true
redis.clients
jedis
- provided
+ true
@@ -134,36 +134,51 @@
commons-cli
+
+
+ org.apache.flink
+ flink-core
+ ${flink.version}
+ true
+
+
+
+ org.apache.flink
+ flink-clients_${scala.binary.version}
+ ${flink.version}
+ true
+
+
org.apache.hadoop
hadoop-client
- provided
+ true
org.apache.hbase
hbase-client
- provided
+ true
org.apache.hadoop
hadoop-yarn-api
- provided
+ true
org.apache.hadoop
hadoop-yarn-client
- provided
+ true
org.mongodb
mongo-java-driver
3.12.2
- provided
+ true
@@ -175,14 +190,14 @@
org.apache.curator
curator-framework
2.6.0
- provided
+ true
org.apache.ivy
ivy
2.4.0
- provided
+ true
diff --git a/streamx-common/src/main/scala/com/streamxhub/streamx/common/util/AssertUtil.scala b/streamx-common/src/main/scala/com/streamxhub/streamx/common/util/AssertUtils.scala
similarity index 99%
rename from streamx-common/src/main/scala/com/streamxhub/streamx/common/util/AssertUtil.scala
rename to streamx-common/src/main/scala/com/streamxhub/streamx/common/util/AssertUtils.scala
index 5ff6a7e565..852e47ef2e 100644
--- a/streamx-common/src/main/scala/com/streamxhub/streamx/common/util/AssertUtil.scala
+++ b/streamx-common/src/main/scala/com/streamxhub/streamx/common/util/AssertUtils.scala
@@ -24,7 +24,7 @@ package com.streamxhub.streamx.common.util
import java.util;
-object AssertUtil {
+object AssertUtils {
/**
* Assert a boolean expression, throwing IllegalArgumentException
if the test result
diff --git a/streamx-common/src/main/scala/com/streamxhub/streamx/common/util/DateUtils.scala b/streamx-common/src/main/scala/com/streamxhub/streamx/common/util/DateUtils.scala
index 140e424180..c1e5b02828 100644
--- a/streamx-common/src/main/scala/com/streamxhub/streamx/common/util/DateUtils.scala
+++ b/streamx-common/src/main/scala/com/streamxhub/streamx/common/util/DateUtils.scala
@@ -20,7 +20,9 @@
*/
package com.streamxhub.streamx.common.util
-import java.text.SimpleDateFormat
+import java.text.{ParseException, SimpleDateFormat}
+import java.time.LocalDateTime
+import java.time.format.DateTimeFormatter
import java.util.concurrent.TimeUnit
import java.util.{Calendar, TimeZone, _}
import scala.util._
@@ -181,6 +183,24 @@ object DateUtils {
}
}
+ def formatFullTime(localDateTime: LocalDateTime): String = formatFullTime(localDateTime, fullFormat)
+
+ def formatFullTime(localDateTime: LocalDateTime, pattern: String): String = {
+ val dateTimeFormatter = DateTimeFormatter.ofPattern(pattern)
+ localDateTime.format(dateTimeFormatter)
+ }
+
+ private def getDateFormat(date: Date, dateFormatType: String) = {
+ val simformat = new SimpleDateFormat(dateFormatType)
+ simformat.format(date)
+ }
+
+ @throws[ParseException] def formatCSTTime(date: String, format: String): String = {
+ val sdf = new SimpleDateFormat("EEE MMM dd HH:mm:ss zzz yyyy", Locale.US)
+ val d = sdf.parse(date)
+ DateUtils.getDateFormat(d, format)
+ }
+
def main(args: Array[String]): Unit = {
println(DateUtils.+-(-1))
}
diff --git a/streamx-common/src/main/scala/com/streamxhub/streamx/common/util/FlinkClientUtils.scala b/streamx-common/src/main/scala/com/streamxhub/streamx/common/util/FlinkClientUtils.scala
new file mode 100644
index 0000000000..4150ebcdba
--- /dev/null
+++ b/streamx-common/src/main/scala/com/streamxhub/streamx/common/util/FlinkClientUtils.scala
@@ -0,0 +1,62 @@
+/*
+ * Copyright (c) 2019 The StreamX Project
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.streamxhub.streamx.common.util
+
+import org.apache.flink.api.common.Plan
+import org.apache.flink.client.program.{PackagedProgram, PackagedProgramUtils, ProgramInvocationException}
+import org.apache.flink.configuration.{Configuration, JobManagerOptions}
+import org.apache.flink.optimizer.costs.DefaultCostEstimator
+import org.apache.flink.optimizer.plan.OptimizedPlan
+import org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator
+import org.apache.flink.optimizer.{DataStatistics, Optimizer}
+
+import java.net.{InetAddress, InetSocketAddress, ServerSocket}
+
+object FlinkClientUtils {
+
+
+ /**
+ * getExecutionPlan
+ *
+ * @param packagedProgram
+ * @throws
+ * @return
+ */
+ @throws[ProgramInvocationException] def getExecutionPlan(packagedProgram: PackagedProgram): String = {
+ require(packagedProgram != null)
+ val address: InetAddress = InetAddress.getLocalHost
+ val jmAddress = new InetSocketAddress(address, new ServerSocket(0).getLocalPort)
+
+ val config = new Configuration
+ config.setString(JobManagerOptions.ADDRESS, jmAddress.getHostName)
+ config.setInteger(JobManagerOptions.PORT, jmAddress.getPort)
+
+ val optimizer = new Optimizer(new DataStatistics, new DefaultCostEstimator, config)
+ val plan: Plan = PackagedProgramUtils.getPipelineFromProgram(packagedProgram, config, -1, true).asInstanceOf[Plan]
+ val optimizedPlan: OptimizedPlan = optimizer.compile(plan)
+ require(optimizedPlan != null)
+
+ val dumper = new PlanJSONDumpGenerator
+ dumper.setEncodeForHTML(true)
+ dumper.getOptimizerPlanAsJSON(optimizedPlan)
+ }
+
+}
diff --git a/streamx-flink/streamx-flink-core/src/main/scala/com/streamxhub/streamx/flink/core/scala/util/FlinkUtils.scala b/streamx-common/src/main/scala/com/streamxhub/streamx/common/util/FlinkUtils.scala
similarity index 96%
rename from streamx-flink/streamx-flink-core/src/main/scala/com/streamxhub/streamx/flink/core/scala/util/FlinkUtils.scala
rename to streamx-common/src/main/scala/com/streamxhub/streamx/common/util/FlinkUtils.scala
index 09f5e16ffa..ed01f8bcf7 100644
--- a/streamx-flink/streamx-flink-core/src/main/scala/com/streamxhub/streamx/flink/core/scala/util/FlinkUtils.scala
+++ b/streamx-common/src/main/scala/com/streamxhub/streamx/common/util/FlinkUtils.scala
@@ -18,7 +18,8 @@
* specific language governing permissions and limitations
* under the License.
*/
-package com.streamxhub.streamx.flink.core.scala.util
+package com.streamxhub.streamx.common.util
+
import org.apache.flink.api.common.state.{ListState, ListStateDescriptor}
import org.apache.flink.api.common.typeinfo.TypeInformation
@@ -30,5 +31,4 @@ object FlinkUtils {
context.getOperatorStateStore.getUnionListState(new ListStateDescriptor(descriptorName, implicitly[TypeInformation[R]].getTypeClass))
}
-
}
diff --git a/streamx-common/src/main/scala/com/streamxhub/streamx/common/util/HadoopUtils.scala b/streamx-common/src/main/scala/com/streamxhub/streamx/common/util/HadoopUtils.scala
index 13f49ab103..ac1d0dcdd8 100644
--- a/streamx-common/src/main/scala/com/streamxhub/streamx/common/util/HadoopUtils.scala
+++ b/streamx-common/src/main/scala/com/streamxhub/streamx/common/util/HadoopUtils.scala
@@ -106,7 +106,7 @@ object HadoopUtils extends Logger {
if (!configurationCache.containsKey(confDir)) {
FileUtils.exists(confDir)
val hadoopConfDir = new File(confDir)
- val confName = List("core-site.xml", "hdfs-site.xml", "yarn-site.xml")
+ val confName = List("core-site.xml", "hdfs-site.xml", "yarn-site.xml","mapred-site.xml")
val files = hadoopConfDir.listFiles().filter(x => x.isFile && confName.contains(x.getName)).toList
val conf = new Configuration()
if (CollectionUtils.isNotEmpty(files)) {
@@ -175,7 +175,7 @@ object HadoopUtils extends Logger {
logInfo("kerberos authentication successful")
} catch {
case e: IOException =>
- logInfo(s"kerberos login failed,${e.getLocalizedMessage}")
+ logError(s"kerberos login failed,${e.getLocalizedMessage}")
throw e
}
}
diff --git a/streamx-console/streamx-console-service/src/main/java/com/streamxhub/streamx/console/base/config/P6spySqlFormatConfig.java b/streamx-console/streamx-console-service/src/main/java/com/streamxhub/streamx/console/base/config/P6spySqlFormatConfig.java
index 3ee8aa9768..1c09ac061a 100644
--- a/streamx-console/streamx-console-service/src/main/java/com/streamxhub/streamx/console/base/config/P6spySqlFormatConfig.java
+++ b/streamx-console/streamx-console-service/src/main/java/com/streamxhub/streamx/console/base/config/P6spySqlFormatConfig.java
@@ -21,7 +21,7 @@
package com.streamxhub.streamx.console.base.config;
import com.p6spy.engine.spy.appender.MessageFormattingStrategy;
-import com.streamxhub.streamx.console.base.utils.DateUtil;
+import com.streamxhub.streamx.common.util.DateUtils;
import org.apache.commons.lang3.StringUtils;
import java.time.LocalDateTime;
@@ -49,7 +49,7 @@ public String formatMessage(
return StringUtils.isBlank(sql) ? "" :
String.format(
"%s | 耗时 %d ms | SQL 语句:\n %s;",
- DateUtil.formatFullTime(LocalDateTime.now()),
+ DateUtils.formatFullTime(LocalDateTime.now()),
elapsed,
sql.replaceAll("[\\s]+", StringUtils.SPACE)
);
diff --git a/streamx-console/streamx-console-service/src/main/java/com/streamxhub/streamx/console/base/utils/CommonUtil.java b/streamx-console/streamx-console-service/src/main/java/com/streamxhub/streamx/console/base/util/CommonUtils.java
similarity index 94%
rename from streamx-console/streamx-console-service/src/main/java/com/streamxhub/streamx/console/base/utils/CommonUtil.java
rename to streamx-console/streamx-console-service/src/main/java/com/streamxhub/streamx/console/base/util/CommonUtils.java
index 47f59dbcd3..141bdb6695 100755
--- a/streamx-console/streamx-console-service/src/main/java/com/streamxhub/streamx/console/base/utils/CommonUtil.java
+++ b/streamx-console/streamx-console-service/src/main/java/com/streamxhub/streamx/console/base/util/CommonUtils.java
@@ -18,9 +18,9 @@
* specific language governing permissions and limitations
* under the License.
*/
-package com.streamxhub.streamx.console.base.utils;
+package com.streamxhub.streamx.console.base.util;
-import com.streamxhub.streamx.common.util.AssertUtil;
+import com.streamxhub.streamx.common.util.AssertUtils;
import lombok.extern.slf4j.Slf4j;
import org.springframework.cglib.beans.BeanMap;
@@ -30,7 +30,7 @@
import java.util.*;
@Slf4j
-public class CommonUtil implements Serializable {
+public class CommonUtils implements Serializable {
private static final long serialVersionUID = 6458428317155311192L;
@@ -41,7 +41,6 @@ public class CommonUtil implements Serializable {
*
* @param objs 要判断,处理的对象
* @return Boolean
- * @author Ben
* @see 对象为Null返回true,集合的大小为0也返回true,迭代器没有下一个也返回true..
* @since 1.0
*/
@@ -110,7 +109,6 @@ public static Boolean isEmpty(Object... objs) {
*
* @param obj 要判断,处理的对象
* @return Boolean
- * @author Ben
* @see 与非空相反
* @since 1.0
*/
@@ -179,14 +177,14 @@ public static Float toFloat(Object val) {
}
public static List arrayToList(Object source) {
- return Arrays.asList(ObjectUtil.toObjectArray(source));
+ return Arrays.asList(ObjectUtils.toObjectArray(source));
}
public static boolean contains(Iterator iterator, Object element) {
if (iterator != null) {
while (iterator.hasNext()) {
Object candidate = iterator.next();
- if (ObjectUtil.safeEquals(candidate, element)) {
+ if (ObjectUtils.safeEquals(candidate, element)) {
return true;
}
}
@@ -205,7 +203,7 @@ public static boolean contains(Enumeration enumeration, Object element) {
if (enumeration != null) {
while (enumeration.hasMoreElements()) {
Object candidate = enumeration.nextElement();
- if (ObjectUtil.safeEquals(candidate, element)) {
+ if (ObjectUtils.safeEquals(candidate, element)) {
return true;
}
}
@@ -420,10 +418,10 @@ public static boolean isUnix() {
*/
public static int getPlatform() {
int platform = 0;
- if (CommonUtil.isUnix()) {
+ if (CommonUtils.isUnix()) {
platform = 1;
}
- if (CommonUtil.isWindows()) {
+ if (CommonUtils.isWindows()) {
platform = 2;
}
return platform;
@@ -440,7 +438,7 @@ public static > Map sortMapByValue(Map<
}
public static T[] arrayRemoveElements(T[] array, T... elem) {
- AssertUtil.notNull(array);
+ AssertUtils.notNull(array);
List arrayList = new ArrayList<>(0);
Collections.addAll(arrayList, array);
if (isEmpty(elem)) {
@@ -453,7 +451,7 @@ public static T[] arrayRemoveElements(T[] array, T... elem) {
}
public static T[] arrayRemoveIndex(T[] array, int... index) {
- AssertUtil.notNull(array);
+ AssertUtils.notNull(array);
for (int j : index) {
if (j < 0 || j > array.length - 1) {
throw new IndexOutOfBoundsException("index error.@" + j);
@@ -470,7 +468,7 @@ public static T[] arrayRemoveIndex(T[] array, int... index) {
}
public static T[] arrayInsertIndex(T[] array, int index, T t) {
- AssertUtil.notNull(array);
+ AssertUtils.notNull(array);
List arrayList = new ArrayList(array.length + 1);
if (index == 0) {
arrayList.add(t);
@@ -579,8 +577,8 @@ public static List