Skip to content

Commit

Permalink
Merge pull request #211 from wolfboys/main
Browse files Browse the repository at this point in the history
streamx 1.1.0 release
  • Loading branch information
wolfboys authored Jun 27, 2021
2 parents d9b39bd + 901df28 commit 90988a8
Show file tree
Hide file tree
Showing 46 changed files with 259 additions and 267 deletions.
33 changes: 24 additions & 9 deletions streamx-common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -69,13 +69,13 @@
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<scope>provided</scope>
<optional>true</optional>
</dependency>

<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<scope>provided</scope>
<optional>true</optional>
</dependency>

<dependency>
Expand Down Expand Up @@ -134,36 +134,51 @@
<artifactId>commons-cli</artifactId>
</dependency>

<!--flink base-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-core</artifactId>
<version>${flink.version}</version>
<optional>true</optional>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<optional>true</optional>
</dependency>

<!--hbase-->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<scope>provided</scope>
<optional>true</optional>
</dependency>

<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<scope>provided</scope>
<optional>true</optional>
</dependency>

<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-api</artifactId>
<scope>provided</scope>
<optional>true</optional>
</dependency>

<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-client</artifactId>
<scope>provided</scope>
<optional>true</optional>
</dependency>

<dependency>
<groupId>org.mongodb</groupId>
<artifactId>mongo-java-driver</artifactId>
<version>3.12.2</version>
<scope>provided</scope>
<optional>true</optional>
</dependency>

<dependency>
Expand All @@ -175,14 +190,14 @@
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>2.6.0</version>
<scope>provided</scope>
<optional>true</optional>
</dependency>

<dependency>
<groupId>org.apache.ivy</groupId>
<artifactId>ivy</artifactId>
<version>2.4.0</version>
<scope>provided</scope>
<optional>true</optional>
</dependency>

</dependencies>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ package com.streamxhub.streamx.common.util
import java.util;


object AssertUtil {
object AssertUtils {

/**
* Assert a boolean expression, throwing <code>IllegalArgumentException</code> if the test result
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@
*/
package com.streamxhub.streamx.common.util

import java.text.SimpleDateFormat
import java.text.{ParseException, SimpleDateFormat}
import java.time.LocalDateTime
import java.time.format.DateTimeFormatter
import java.util.concurrent.TimeUnit
import java.util.{Calendar, TimeZone, _}
import scala.util._
Expand Down Expand Up @@ -181,6 +183,24 @@ object DateUtils {
}
}

def formatFullTime(localDateTime: LocalDateTime): String = formatFullTime(localDateTime, fullFormat)

def formatFullTime(localDateTime: LocalDateTime, pattern: String): String = {
val dateTimeFormatter = DateTimeFormatter.ofPattern(pattern)
localDateTime.format(dateTimeFormatter)
}

private def getDateFormat(date: Date, dateFormatType: String) = {
val simformat = new SimpleDateFormat(dateFormatType)
simformat.format(date)
}

@throws[ParseException] def formatCSTTime(date: String, format: String): String = {
val sdf = new SimpleDateFormat("EEE MMM dd HH:mm:ss zzz yyyy", Locale.US)
val d = sdf.parse(date)
DateUtils.getDateFormat(d, format)
}

def main(args: Array[String]): Unit = {
println(DateUtils.+-(-1))
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* Copyright (c) 2019 The StreamX Project
* <p>
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package com.streamxhub.streamx.common.util

import org.apache.flink.api.common.Plan
import org.apache.flink.client.program.{PackagedProgram, PackagedProgramUtils, ProgramInvocationException}
import org.apache.flink.configuration.{Configuration, JobManagerOptions}
import org.apache.flink.optimizer.costs.DefaultCostEstimator
import org.apache.flink.optimizer.plan.OptimizedPlan
import org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator
import org.apache.flink.optimizer.{DataStatistics, Optimizer}

import java.net.{InetAddress, InetSocketAddress, ServerSocket}

object FlinkClientUtils {


/**
* getExecutionPlan
*
* @param packagedProgram
* @throws
* @return
*/
@throws[ProgramInvocationException] def getExecutionPlan(packagedProgram: PackagedProgram): String = {
require(packagedProgram != null)
val address: InetAddress = InetAddress.getLocalHost
val jmAddress = new InetSocketAddress(address, new ServerSocket(0).getLocalPort)

val config = new Configuration
config.setString(JobManagerOptions.ADDRESS, jmAddress.getHostName)
config.setInteger(JobManagerOptions.PORT, jmAddress.getPort)

val optimizer = new Optimizer(new DataStatistics, new DefaultCostEstimator, config)
val plan: Plan = PackagedProgramUtils.getPipelineFromProgram(packagedProgram, config, -1, true).asInstanceOf[Plan]
val optimizedPlan: OptimizedPlan = optimizer.compile(plan)
require(optimizedPlan != null)

val dumper = new PlanJSONDumpGenerator
dumper.setEncodeForHTML(true)
dumper.getOptimizerPlanAsJSON(optimizedPlan)
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@
* specific language governing permissions and limitations
* under the License.
*/
package com.streamxhub.streamx.flink.core.scala.util
package com.streamxhub.streamx.common.util


import org.apache.flink.api.common.state.{ListState, ListStateDescriptor}
import org.apache.flink.api.common.typeinfo.TypeInformation
Expand All @@ -30,5 +31,4 @@ object FlinkUtils {
context.getOperatorStateStore.getUnionListState(new ListStateDescriptor(descriptorName, implicitly[TypeInformation[R]].getTypeClass))
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ object HadoopUtils extends Logger {
if (!configurationCache.containsKey(confDir)) {
FileUtils.exists(confDir)
val hadoopConfDir = new File(confDir)
val confName = List("core-site.xml", "hdfs-site.xml", "yarn-site.xml")
val confName = List("core-site.xml", "hdfs-site.xml", "yarn-site.xml","mapred-site.xml")
val files = hadoopConfDir.listFiles().filter(x => x.isFile && confName.contains(x.getName)).toList
val conf = new Configuration()
if (CollectionUtils.isNotEmpty(files)) {
Expand Down Expand Up @@ -175,7 +175,7 @@ object HadoopUtils extends Logger {
logInfo("kerberos authentication successful")
} catch {
case e: IOException =>
logInfo(s"kerberos login failed,${e.getLocalizedMessage}")
logError(s"kerberos login failed,${e.getLocalizedMessage}")
throw e
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
package com.streamxhub.streamx.console.base.config;

import com.p6spy.engine.spy.appender.MessageFormattingStrategy;
import com.streamxhub.streamx.console.base.utils.DateUtil;
import com.streamxhub.streamx.common.util.DateUtils;
import org.apache.commons.lang3.StringUtils;

import java.time.LocalDateTime;
Expand Down Expand Up @@ -49,7 +49,7 @@ public String formatMessage(
return StringUtils.isBlank(sql) ? "" :
String.format(
"%s | 耗时 %d ms | SQL 语句:\n %s;",
DateUtil.formatFullTime(LocalDateTime.now()),
DateUtils.formatFullTime(LocalDateTime.now()),
elapsed,
sql.replaceAll("[\\s]+", StringUtils.SPACE)
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@
* specific language governing permissions and limitations
* under the License.
*/
package com.streamxhub.streamx.console.base.utils;
package com.streamxhub.streamx.console.base.util;

import com.streamxhub.streamx.common.util.AssertUtil;
import com.streamxhub.streamx.common.util.AssertUtils;
import lombok.extern.slf4j.Slf4j;
import org.springframework.cglib.beans.BeanMap;

Expand All @@ -30,7 +30,7 @@
import java.util.*;

@Slf4j
public class CommonUtil implements Serializable {
public class CommonUtils implements Serializable {

private static final long serialVersionUID = 6458428317155311192L;

Expand All @@ -41,7 +41,6 @@ public class CommonUtil implements Serializable {
*
* @param objs 要判断,处理的对象
* @return Boolean
* @author <a href="mailto:[email protected]">Ben</a>
* @see <b>对象为Null返回true,集合的大小为0也返回true,迭代器没有下一个也返回true..</b>
* @since 1.0
*/
Expand Down Expand Up @@ -110,7 +109,6 @@ public static Boolean isEmpty(Object... objs) {
*
* @param obj 要判断,处理的对象
* @return Boolean
* @author <a href="mailto:[email protected]">Ben</a>
* @see <b>与非空相反</b>
* @since 1.0
*/
Expand Down Expand Up @@ -179,14 +177,14 @@ public static Float toFloat(Object val) {
}

public static List arrayToList(Object source) {
return Arrays.asList(ObjectUtil.toObjectArray(source));
return Arrays.asList(ObjectUtils.toObjectArray(source));
}

public static boolean contains(Iterator iterator, Object element) {
if (iterator != null) {
while (iterator.hasNext()) {
Object candidate = iterator.next();
if (ObjectUtil.safeEquals(candidate, element)) {
if (ObjectUtils.safeEquals(candidate, element)) {
return true;
}
}
Expand All @@ -205,7 +203,7 @@ public static boolean contains(Enumeration enumeration, Object element) {
if (enumeration != null) {
while (enumeration.hasMoreElements()) {
Object candidate = enumeration.nextElement();
if (ObjectUtil.safeEquals(candidate, element)) {
if (ObjectUtils.safeEquals(candidate, element)) {
return true;
}
}
Expand Down Expand Up @@ -420,10 +418,10 @@ public static boolean isUnix() {
*/
public static int getPlatform() {
int platform = 0;
if (CommonUtil.isUnix()) {
if (CommonUtils.isUnix()) {
platform = 1;
}
if (CommonUtil.isWindows()) {
if (CommonUtils.isWindows()) {
platform = 2;
}
return platform;
Expand All @@ -440,7 +438,7 @@ public static <K, V extends Comparable<? super V>> Map<K, V> sortMapByValue(Map<
}

public static <T> T[] arrayRemoveElements(T[] array, T... elem) {
AssertUtil.notNull(array);
AssertUtils.notNull(array);
List<T> arrayList = new ArrayList<>(0);
Collections.addAll(arrayList, array);
if (isEmpty(elem)) {
Expand All @@ -453,7 +451,7 @@ public static <T> T[] arrayRemoveElements(T[] array, T... elem) {
}

public static <T> T[] arrayRemoveIndex(T[] array, int... index) {
AssertUtil.notNull(array);
AssertUtils.notNull(array);
for (int j : index) {
if (j < 0 || j > array.length - 1) {
throw new IndexOutOfBoundsException("index error.@" + j);
Expand All @@ -470,7 +468,7 @@ public static <T> T[] arrayRemoveIndex(T[] array, int... index) {
}

public static <T> T[] arrayInsertIndex(T[] array, int index, T t) {
AssertUtil.notNull(array);
AssertUtils.notNull(array);
List<T> arrayList = new ArrayList<T>(array.length + 1);
if (index == 0) {
arrayList.add(t);
Expand Down Expand Up @@ -579,8 +577,8 @@ public static <T> List<Map<String, Object>> objectsToMaps(List<T> objList) {
if (objList != null && objList.size() > 0) {
Map<String, Object> map = null;
T bean = null;
for (int i = 0, size = objList.size(); i < size; i++) {
bean = objList.get(i);
for (T t : objList) {
bean = t;
map = beanToMap(bean);
list.add(map);
}
Expand All @@ -601,10 +599,10 @@ public static <T> List<T> mapsToObjects(List<Map<String, Object>> maps, Class<T>
throws InstantiationException, IllegalAccessException {
List<T> list = new ArrayList<>();
if (maps != null && maps.size() > 0) {
Map<String, Object> map = null;
T bean = null;
for (int i = 0, size = maps.size(); i < size; i++) {
map = maps.get(i);
Map<String, Object> map;
T bean;
for (Map<String, Object> stringObjectMap : maps) {
map = stringObjectMap;
bean = clazz.newInstance();
mapToBean(map, bean);
list.add(bean);
Expand Down
Loading

0 comments on commit 90988a8

Please sign in to comment.