From c66ad8b92c63fb5e9e2710e94048bf7ca4caf632 Mon Sep 17 00:00:00 2001 From: benjobs Date: Fri, 25 Jun 2021 16:10:20 +0800 Subject: [PATCH 1/2] [Feature] add default welcome page. --- .../streamx/common/conf/ConfigConst.scala | 2 + .../streamx/common/util/HadoopUtils.scala | 179 +++++++++++------- .../streamx/common/util/HdfsUtils.scala | 44 ++--- .../db/migration/V1_1__init_data.sql | 2 +- 4 files changed, 130 insertions(+), 97 deletions(-) diff --git a/streamx-common/src/main/scala/com/streamxhub/streamx/common/conf/ConfigConst.scala b/streamx-common/src/main/scala/com/streamxhub/streamx/common/conf/ConfigConst.scala index cce46e78f0..fbe62c79f2 100644 --- a/streamx-common/src/main/scala/com/streamxhub/streamx/common/conf/ConfigConst.scala +++ b/streamx-common/src/main/scala/com/streamxhub/streamx/common/conf/ConfigConst.scala @@ -77,6 +77,8 @@ object ConfigConst { val KEY_JAVA_SECURITY_KRB5_CONF = "java.security.krb5.conf" + val KEY_SECURITY_KERBEROS_EXPIRE = "security.kerberos.expire" + //spark val KEY_SPARK_USER_ARGS = "spark.user.args" diff --git a/streamx-common/src/main/scala/com/streamxhub/streamx/common/util/HadoopUtils.scala b/streamx-common/src/main/scala/com/streamxhub/streamx/common/util/HadoopUtils.scala index bd98b9f586..ad41b9f14a 100644 --- a/streamx-common/src/main/scala/com/streamxhub/streamx/common/util/HadoopUtils.scala +++ b/streamx-common/src/main/scala/com/streamxhub/streamx/common/util/HadoopUtils.scala @@ -21,11 +21,12 @@ package com.streamxhub.streamx.common.util +import com.google.common.cache.{CacheBuilder, CacheLoader} import com.streamxhub.streamx.common.conf.ConfigConst._ import org.apache.commons.collections.CollectionUtils import org.apache.commons.lang.StringUtils import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs.{CommonConfigurationKeys, FileSystem, LocalFileSystem, Path} +import org.apache.hadoop.fs._ import org.apache.hadoop.hdfs.DistributedFileSystem import org.apache.hadoop.net.NetUtils import org.apache.hadoop.security.UserGroupInformation @@ -42,7 +43,7 @@ import org.apache.http.impl.client.HttpClients import java.io.{File, IOException} import java.net.InetAddress import java.util -import java.util.concurrent.ConcurrentHashMap +import java.util.concurrent.{ConcurrentHashMap, TimeUnit} import java.util.{HashMap => JavaHashMap} import scala.collection.JavaConversions._ import scala.util.control.Breaks._ @@ -57,24 +58,26 @@ object HadoopUtils extends Logger { private[this] val HADOOP_CONF_DIR: String = "HADOOP_CONF_DIR" private[this] val CONF_SUFFIX: String = "/etc/hadoop" - private[this] var innerYarnClient: YarnClient = _ + private[this] var reusableYarnClient: YarnClient = _ + private[this] var reusableConf: Configuration = _ + + private[util] var hdfs: FileSystem = getFileSystem(hadoopConf) + private[this] var rmHttpURL: String = _ private[this] val configurationCache: util.Map[String, Configuration] = new ConcurrentHashMap[String, Configuration] - lazy val kerberosConf: Map[String, String] = { - SystemPropertyUtils.get("app.home", null) match { - case null => - val inputStream = getClass.getResourceAsStream("/kerberos.yml") - if (inputStream == null) null else { - PropertiesUtils.fromYamlFile(inputStream) - } - case f => - val file = new File(s"$f/conf/kerberos.yml") - if (file.exists() && file.isFile) { - PropertiesUtils.fromYamlFile(file.getAbsolutePath) - } else null - } + lazy val kerberosConf: Map[String, String] = SystemPropertyUtils.get("app.home", null) match { + case null => + 
getClass.getResourceAsStream("/kerberos.yml") match {
+      case x if x != null => PropertiesUtils.fromYamlFile(x)
+      case _ => null
+    }
+    case f =>
+      val file = new File(s"$f/conf/kerberos.yml")
+      if (file.exists() && file.isFile) {
+        PropertiesUtils.fromYamlFile(file.getAbsolutePath)
+      } else null
   }
 
   private[this] val kerberosEnable: Boolean = if (kerberosConf == null) false else {
@@ -82,12 +85,21 @@
     Try(enableString.trim.toBoolean).getOrElse(false)
   }
 
-  private[this] lazy val hadoopConfDir: String = Try(FileUtils.getPathFromEnv(HADOOP_CONF_DIR)) match {
     case Failure(_) => FileUtils.resolvePath(FileUtils.getPathFromEnv(HADOOP_HOME), CONF_SUFFIX)
     case Success(value) => value
   }
 
+  private[this] def getFileSystem(hadoopConf: Configuration): FileSystem = {
+    Try(FileSystem.get(hadoopConf)) match {
+      case Success(fs) => fs
+      case Failure(e) => throw new IllegalArgumentException(s"[StreamX] access hdfs error.$e")
+    }
+  }
+
+
   def getConfigurationFromHadoopConfDir(confDir: String = hadoopConfDir): Configuration = {
     if (!configurationCache.containsKey(confDir)) {
       FileUtils.exists(confDir)
@@ -111,68 +123,92 @@
    * The second approach is recommended; there is no need to copy the configuration files.
* */ - lazy val conf: Configuration = { - val conf = getConfigurationFromHadoopConfDir(hadoopConfDir) - //add hadoopConfDir to classpath...you know why??? - ClassLoaderUtils.loadResource(hadoopConfDir) - - if (StringUtils.isBlank(conf.get("hadoop.tmp.dir"))) { - conf.set("hadoop.tmp.dir", "/tmp") - } - if (StringUtils.isBlank(conf.get("hbase.fs.tmp.dir"))) { - conf.set("hbase.fs.tmp.dir", "/tmp") + def hadoopConf: Configuration = { + def initHadoopConf(): Configuration = { + val conf = getConfigurationFromHadoopConfDir(hadoopConfDir) + //add hadoopConfDir to classpath...you know why??? + ClassLoaderUtils.loadResource(hadoopConfDir) + + if (StringUtils.isBlank(conf.get("hadoop.tmp.dir"))) { + conf.set("hadoop.tmp.dir", "/tmp") + } + if (StringUtils.isBlank(conf.get("hbase.fs.tmp.dir"))) { + conf.set("hbase.fs.tmp.dir", "/tmp") + } + // disable timeline service as we only query yarn app here. + // Otherwise we may hit this kind of ERROR: + // java.lang.ClassNotFoundException: com.sun.jersey.api.client.config.ClientConfig + conf.set("yarn.timeline-service.enabled", "false") + conf.set("fs.hdfs.impl", classOf[DistributedFileSystem].getName) + conf.set("fs.file.impl", classOf[LocalFileSystem].getName) + conf.set("fs.hdfs.impl.disable.cache", "true") + conf } - // disable timeline service as we only query yarn app here. - // Otherwise we may hit this kind of ERROR: - // java.lang.ClassNotFoundException: com.sun.jersey.api.client.config.ClientConfig - conf.set("yarn.timeline-service.enabled", "false") - conf.set("fs.hdfs.impl", classOf[DistributedFileSystem].getName) - conf.set("fs.file.impl", classOf[LocalFileSystem].getName) - conf.set("fs.hdfs.impl.disable.cache", "true") - kerberosLogin(conf) - conf - } - private[this] def kerberosLogin(conf: Configuration): Unit = if (kerberosEnable) { - logInfo("kerberos login starting....") - val principal = kerberosConf.getOrElse(KEY_SECURITY_KERBEROS_PRINCIPAL, "").trim - val keytab = kerberosConf.getOrElse(KEY_SECURITY_KERBEROS_KEYTAB, "").trim - require( - principal.nonEmpty && keytab.nonEmpty, - s"${KEY_SECURITY_KERBEROS_PRINCIPAL} and ${KEY_SECURITY_KERBEROS_KEYTAB} must not be empty" - ) - - val krb5 = kerberosConf.getOrElse( - KEY_SECURITY_KERBEROS_KRB5_CONF, - kerberosConf.getOrElse(KEY_JAVA_SECURITY_KRB5_CONF, "") - ).trim - - if (krb5.nonEmpty) { - System.setProperty("java.security.krb5.conf", krb5) - System.setProperty("java.security.krb5.conf.path", krb5) + def kerberosLogin(conf: Configuration): Unit = { + logInfo("kerberos login starting....") + val principal = kerberosConf.getOrElse(KEY_SECURITY_KERBEROS_PRINCIPAL, "").trim + val keytab = kerberosConf.getOrElse(KEY_SECURITY_KERBEROS_KEYTAB, "").trim + require( + principal.nonEmpty && keytab.nonEmpty, + s"$KEY_SECURITY_KERBEROS_PRINCIPAL and $KEY_SECURITY_KERBEROS_KEYTAB must not be empty" + ) + + val krb5 = kerberosConf.getOrElse( + KEY_SECURITY_KERBEROS_KRB5_CONF, + kerberosConf.getOrElse(KEY_JAVA_SECURITY_KRB5_CONF, "") + ).trim + + if (krb5.nonEmpty) { + System.setProperty("java.security.krb5.conf", krb5) + System.setProperty("java.security.krb5.conf.path", krb5) + } + System.setProperty("sun.security.spnego.debug", "true") + System.setProperty("sun.security.krb5.debug", "true") + + conf.set(KEY_HADOOP_SECURITY_AUTHENTICATION, KEY_KERBEROS) + try { + UserGroupInformation.setConfiguration(conf) + UserGroupInformation.loginUserFromKeytab(principal, keytab) + logInfo("kerberos authentication successful") + } catch { + case e: IOException => + logInfo(s"kerberos login 
failed,${e.getLocalizedMessage}") + throw e + } } - conf.set(KEY_HADOOP_SECURITY_AUTHENTICATION, KEY_KERBEROS) - try { - UserGroupInformation.setConfiguration(conf) - UserGroupInformation.loginUserFromKeytab(principal, keytab) - logInfo("kerberos authentication successful") - } catch { - case e: IOException => - logInfo(s"kerberos login failed,${e.getLocalizedMessage}") - throw e + if (kerberosEnable) { + val expire = kerberosConf.getOrElse(KEY_SECURITY_KERBEROS_EXPIRE, "2").trim + CacheBuilder.newBuilder.expireAfterWrite(expire.toLong, TimeUnit.HOURS) + .build[String, Configuration](new CacheLoader[String, Configuration]() { + override def load(key: String): Configuration = { + val conf = initHadoopConf() + kerberosLogin(conf) + if (reusableYarnClient != null) { + reusableYarnClient.close() + reusableYarnClient = null + } + hdfs = getFileSystem(conf) + conf + } + }).asMap().entrySet().head.getValue + } else { + if (reusableConf == null) { + reusableConf = initHadoopConf() + } + reusableConf } } def yarnClient: YarnClient = { - if (innerYarnClient == null || !innerYarnClient.isInState(STATE.STARTED)) { - innerYarnClient = YarnClient.createYarnClient - val yarnConf = new YarnConfiguration(conf) - kerberosLogin(yarnConf) - innerYarnClient.init(yarnConf) - innerYarnClient.start() + if (reusableYarnClient == null || !reusableYarnClient.isInState(STATE.STARTED)) { + reusableYarnClient = YarnClient.createYarnClient + val yarnConf = new YarnConfiguration(hadoopConf) + reusableYarnClient.init(yarnConf) + reusableYarnClient.start() } - innerYarnClient + reusableYarnClient } /** @@ -190,6 +226,8 @@ object HadoopUtils extends Logger { synchronized { if (rmHttpURL == null || getLatest) { + val conf = hadoopConf + val useHttps = YarnConfiguration.useHttps(conf) val (addressPrefix, defaultPort, protocol) = useHttps match { case x if x => (YarnConfiguration.RM_WEBAPP_HTTPS_ADDRESS, "8090", "https://") @@ -299,5 +337,4 @@ object HadoopUtils extends Logger { fs.copyToLocalFile(sourcePath, destPath) new File(destPath.toString).getAbsolutePath } - } diff --git a/streamx-common/src/main/scala/com/streamxhub/streamx/common/util/HdfsUtils.scala b/streamx-common/src/main/scala/com/streamxhub/streamx/common/util/HdfsUtils.scala index f1b1f53503..b21b15cce6 100644 --- a/streamx-common/src/main/scala/com/streamxhub/streamx/common/util/HdfsUtils.scala +++ b/streamx-common/src/main/scala/com/streamxhub/streamx/common/util/HdfsUtils.scala @@ -32,37 +32,31 @@ import scala.util.{Failure, Success, Try} object HdfsUtils extends Logger { - lazy val hdfs: FileSystem = Try(FileSystem.get(HadoopUtils.conf)) match { - case Success(fs) => fs - case Failure(e) => new IllegalArgumentException(s"[StreamX] access hdfs error.$e") - null - } - - def getDefaultFS: String = HadoopUtils.conf.get(FileSystem.FS_DEFAULT_NAME_KEY) + def getDefaultFS: String = HadoopUtils.hadoopConf.get(FileSystem.FS_DEFAULT_NAME_KEY) - def list(src: String): List[FileStatus] = hdfs.listStatus(getPath(src)).toList + def list(src: String): List[FileStatus] = HadoopUtils.hdfs.listStatus(getPath(src)).toList - def movie(src: String, dst: String): Unit = hdfs.rename(getPath(src), getPath(dst)) + def movie(src: String, dst: String): Unit = HadoopUtils.hdfs.rename(getPath(src), getPath(dst)) - def mkdirs(path: String): Unit = hdfs.mkdirs(getPath(path)) + def mkdirs(path: String): Unit = HadoopUtils.hdfs.mkdirs(getPath(path)) def copyHdfs(src: String, dst: String, delSrc: Boolean = false, overwrite: Boolean = true): Unit = - FileUtil.copy(hdfs, getPath(src), hdfs, 
getPath(dst), delSrc, overwrite, HadoopUtils.conf)
+    FileUtil.copy(HadoopUtils.hdfs, getPath(src), HadoopUtils.hdfs, getPath(dst), delSrc, overwrite, HadoopUtils.hadoopConf)
 
   def copyHdfsDir(src: String, dst: String, delSrc: Boolean = false, overwrite: Boolean = true): Unit = {
-    list(src).foreach(x => FileUtil.copy(hdfs, x, hdfs, getPath(dst), delSrc, overwrite, HadoopUtils.conf))
+    list(src).foreach(x => FileUtil.copy(HadoopUtils.hdfs, x, HadoopUtils.hdfs, getPath(dst), delSrc, overwrite, HadoopUtils.hadoopConf))
   }
 
   def upload(src: String, dst: String, delSrc: Boolean = false, overwrite: Boolean = true): Unit =
-    hdfs.copyFromLocalFile(delSrc, overwrite, getPath(src), getPath(dst))
+    HadoopUtils.hdfs.copyFromLocalFile(delSrc, overwrite, getPath(src), getPath(dst))
 
   def upload2(srcs: Array[String], dst: String, delSrc: Boolean = false, overwrite: Boolean = true): Unit =
-    hdfs.copyFromLocalFile(delSrc, overwrite, srcs.map(getPath), getPath(dst))
+    HadoopUtils.hdfs.copyFromLocalFile(delSrc, overwrite, srcs.map(getPath), getPath(dst))
 
   def download(src: String, dst: String, delSrc: Boolean = false, useRawLocalFileSystem: Boolean = false): Unit =
-    hdfs.copyToLocalFile(delSrc, getPath(src), getPath(dst), useRawLocalFileSystem)
+    HadoopUtils.hdfs.copyToLocalFile(delSrc, getPath(src), getPath(dst), useRawLocalFileSystem)
 
-  def getNameNode: String = Try(getAddressOfActive(hdfs).getHostString) match {
+  def getNameNode: String = Try(getAddressOfActive(HadoopUtils.hdfs).getHostString) match {
     case Success(value) => value
     case Failure(exception) => throw exception
   }
@@ -76,19 +70,19 @@
    */
   def create(fileName: String, content: String): Unit = {
     val path: Path = getPath(fileName)
-    require(hdfs.exists(path), s"[StreamX] hdfs $fileName is exists!! ")
-    val outputStream: FSDataOutputStream = hdfs.create(path)
+    require(!HadoopUtils.hdfs.exists(path), s"[StreamX] hdfs $fileName already exists!! 
") + val outputStream: FSDataOutputStream = HadoopUtils.hdfs.create(path) outputStream.writeUTF(content) outputStream.flush() outputStream.close() } - def exists(path: String): Boolean = hdfs.exists(getPath(path)) + def exists(path: String): Boolean = HadoopUtils.hdfs.exists(getPath(path)) def read(fileName: String): String = { val path: Path = getPath(fileName) - require(hdfs.exists(path) && !hdfs.isDirectory(path), s"[StreamX] path:$fileName not exists or isDirectory ") - val in = hdfs.open(path) + require(HadoopUtils.hdfs.exists(path) && !HadoopUtils.hdfs.isDirectory(path), s"[StreamX] path:$fileName not exists or isDirectory ") + val in = HadoopUtils.hdfs.open(path) val out = new ByteArrayOutputStream() IOUtils.copyBytes(in, out, 4096, false) out.flush() @@ -99,8 +93,8 @@ object HdfsUtils extends Logger { def delete(src: String): Unit = { val path: Path = getPath(src) - if (hdfs.exists(path)) { - hdfs.delete(path, true) + if (HadoopUtils.hdfs.exists(path)) { + HadoopUtils.hdfs.delete(path, true) } else { logWarn(s"hdfs delete $src,but file $src is not exists!") } @@ -108,7 +102,7 @@ object HdfsUtils extends Logger { def fileMd5(fileName: String): String = { val path = getPath(fileName) - val in = hdfs.open(path) + val in = HadoopUtils.hdfs.open(path) Try(DigestUtils.md5Hex(in)) match { case Success(s) => in.close() @@ -121,7 +115,7 @@ object HdfsUtils extends Logger { def downToLocal(hdfsPath: String, localPath: String): Unit = { val path: Path = getPath(hdfsPath) - val input: FSDataInputStream = hdfs.open(path) + val input: FSDataInputStream = HadoopUtils.hdfs.open(path) val content: String = input.readUTF val fw: FileWriter = new FileWriter(localPath) fw.write(content) diff --git a/streamx-console/streamx-console-service/src/main/resources/db/migration/V1_1__init_data.sql b/streamx-console/streamx-console-service/src/main/resources/db/migration/V1_1__init_data.sql index a4cc4e370d..d0a3199458 100644 --- a/streamx-console/streamx-console-service/src/main/resources/db/migration/V1_1__init_data.sql +++ b/streamx-console/streamx-console-service/src/main/resources/db/migration/V1_1__init_data.sql @@ -136,7 +136,7 @@ INSERT INTO `t_role_menu` VALUES (2, 34); -- ---------------------------- INSERT INTO `t_setting` VALUES (1, 'env.flink.home', NULL, 'Flink Home', 'Flink Home', 1); INSERT INTO `t_setting` VALUES (2, 'maven.central.repository', NULL, 'Maven Central Repository', 'Maven 私服地址', 1); -INSERT INTO `t_setting` VALUES (3, 'streamx.console.webapp.address', NULL, 'StreamX Webapp address', 'StreamX Console Web 应用程序 HTTP 端口', 1); +INSERT INTO `t_setting` VALUES (3, 'streamx.console.webapp.address', NULL, 'StreamX Webapp address', 'StreamX Console Web 应用程序HTTP URL', 1); INSERT INTO `t_setting` VALUES (4, 'streamx.console.workspace', '/streamx/workspace', 'StreamX Console Workspace', 'StreamX Console 的工作空间,用于存放项目源码,编译后的项目等', 1); INSERT INTO `t_setting` VALUES (5, 'alert.email.host', NULL, 'Alert Email Smtp Host', '告警邮箱Smtp Host', 1); INSERT INTO `t_setting` VALUES (6, 'alert.email.port', NULL, 'Alert Email Smtp Port', '告警邮箱的Smtp Port', 1); From f2f5ee2da18cc49a47c9dbb95b8d37a02146cb38 Mon Sep 17 00:00:00 2001 From: benjobs Date: Fri, 25 Jun 2021 19:23:35 +0800 Subject: [PATCH 2/2] Update version to 1.1.0 --- pom.xml | 2 +- streamx-common/pom.xml | 8 ++- .../streamx/common/util/HadoopUtils.scala | 48 +++++++++++------- streamx-console/pom.xml | 2 +- .../streamx-console-service/pom.xml | 6 +-- .../console/core/task/FlinkTrackingTask.java | 8 +-- .../src/main/resources/kerberos.yml | 1 + 
.../src/test/java/RefreshCacheTest.java | 50 +++++++++++++++++++ .../streamx-console-webapp/pom.xml | 2 +- streamx-flink/pom.xml | 2 +- streamx-flink/streamx-flink-core/pom.xml | 2 +- streamx-flink/streamx-flink-shims/pom.xml | 2 +- .../streamx-flink-shims-base/pom.xml | 2 +- .../streamx-flink-shims_flink-1.12/pom.xml | 2 +- .../streamx-flink-shims_flink-1.13/pom.xml | 2 +- streamx-flink/streamx-flink-test/pom.xml | 2 +- streamx-parent/pom.xml | 14 +++++- streamx-plugin/pom.xml | 2 +- streamx-plugin/streamx-flink-repl/pom.xml | 2 +- .../streamx-flink-sqlclient/pom.xml | 2 +- streamx-plugin/streamx-flink-submit/pom.xml | 2 +- streamx-plugin/streamx-flink-udf/pom.xml | 2 +- streamx-plugin/streamx-jvm-profiler/pom.xml | 2 +- streamx-spark/pom.xml | 2 +- streamx-spark/streamx-spark-cli/pom.xml | 2 +- streamx-spark/streamx-spark-core/pom.xml | 2 +- streamx-spark/streamx-spark-test/pom.xml | 6 +-- 27 files changed, 130 insertions(+), 49 deletions(-) create mode 100644 streamx-console/streamx-console-service/src/test/java/RefreshCacheTest.java diff --git a/pom.xml b/pom.xml index ebe2c77ff9..e606a0f4c5 100644 --- a/pom.xml +++ b/pom.xml @@ -6,7 +6,7 @@ 4.0.0 com.streamxhub.streamx streamx - 1.1.0-SNAPSHOT + 1.1.0 pom StreamX The flink and spark development scaffolding diff --git a/streamx-common/pom.xml b/streamx-common/pom.xml index 80b278b5ef..d349039f8f 100644 --- a/streamx-common/pom.xml +++ b/streamx-common/pom.xml @@ -7,7 +7,7 @@ com.streamxhub.streamx streamx-parent - 1.1.0-SNAPSHOT + 1.1.0 ../streamx-parent/pom.xml @@ -15,12 +15,18 @@ StreamX : Common + com.beachape enumeratum_${scala.binary.version} + + com.github.ben-manes.caffeine + caffeine + + ch.qos.logback diff --git a/streamx-common/src/main/scala/com/streamxhub/streamx/common/util/HadoopUtils.scala b/streamx-common/src/main/scala/com/streamxhub/streamx/common/util/HadoopUtils.scala index ad41b9f14a..13f49ab103 100644 --- a/streamx-common/src/main/scala/com/streamxhub/streamx/common/util/HadoopUtils.scala +++ b/streamx-common/src/main/scala/com/streamxhub/streamx/common/util/HadoopUtils.scala @@ -21,7 +21,7 @@ package com.streamxhub.streamx.common.util -import com.google.common.cache.{CacheBuilder, CacheLoader} +import com.github.benmanes.caffeine.cache.{Cache, CacheLoader, Caffeine} import com.streamxhub.streamx.common.conf.ConfigConst._ import org.apache.commons.collections.CollectionUtils import org.apache.commons.lang.StringUtils @@ -43,7 +43,7 @@ import org.apache.http.impl.client.HttpClients import java.io.{File, IOException} import java.net.InetAddress import java.util -import java.util.concurrent.{ConcurrentHashMap, TimeUnit} +import java.util.concurrent.ConcurrentHashMap import java.util.{HashMap => JavaHashMap} import scala.collection.JavaConversions._ import scala.util.control.Breaks._ @@ -54,9 +54,9 @@ import scala.util.{Failure, Success, Try} */ object HadoopUtils extends Logger { - private[this] val HADOOP_HOME: String = "HADOOP_HOME" - private[this] val HADOOP_CONF_DIR: String = "HADOOP_CONF_DIR" - private[this] val CONF_SUFFIX: String = "/etc/hadoop" + private[this] lazy val HADOOP_HOME: String = "HADOOP_HOME" + private[this] lazy val HADOOP_CONF_DIR: String = "HADOOP_CONF_DIR" + private[this] lazy val CONF_SUFFIX: String = "/etc/hadoop" private[this] var reusableYarnClient: YarnClient = _ private[this] var reusableConf: Configuration = _ @@ -65,7 +65,9 @@ object HadoopUtils extends Logger { private[this] var rmHttpURL: String = _ - private[this] val configurationCache: util.Map[String, Configuration] = new 
ConcurrentHashMap[String, Configuration]
+  private[this] lazy val configurationCache: util.Map[String, Configuration] = new ConcurrentHashMap[String, Configuration]()
+
+  private[this] var caffeine: Cache[String, Configuration] = _
 
   lazy val kerberosConf: Map[String, String] = SystemPropertyUtils.get("app.home", null) match {
     case null =>
@@ -178,21 +180,31 @@
     }
   }
 
+  def refreshConf(): Configuration = {
+    val conf = initHadoopConf()
+    kerberosLogin(conf)
+    if (reusableYarnClient != null) {
+      reusableYarnClient.close()
+      reusableYarnClient = null
+    }
+    hdfs = getFileSystem(conf)
+    conf
+  }
+
     if (kerberosEnable) {
       val expire = kerberosConf.getOrElse(KEY_SECURITY_KERBEROS_EXPIRE, "2").trim
-      CacheBuilder.newBuilder.expireAfterWrite(expire.toLong, TimeUnit.HOURS)
-        .build[String, Configuration](new CacheLoader[String, Configuration]() {
-          override def load(key: String): Configuration = {
-            val conf = initHadoopConf()
-            kerberosLogin(conf)
-            if (reusableYarnClient != null) {
-              reusableYarnClient.close()
-              reusableYarnClient = null
+      val (duration, unit) = DateUtils.getTimeUnit(expire)
+      if (caffeine == null) {
+        caffeine = Caffeine.newBuilder().refreshAfterWrite(duration.toLong, unit)
+          .build[String, Configuration](new CacheLoader[String, Configuration]() {
+            override def load(key: String): Configuration = {
+              logInfo(s"kerberos re-authentication: time:${DateUtils.now(DateUtils.fullFormat)}, duration:$expire")
+              refreshConf()
             }
-            hdfs = getFileSystem(conf)
-            conf
-          }
-        }).asMap().entrySet().head.getValue
+          })
+        caffeine.put("config", refreshConf())
+      }
+      caffeine.getIfPresent("config")
     } else {
       if (reusableConf == null) {
         reusableConf = initHadoopConf()
diff --git a/streamx-console/pom.xml b/streamx-console/pom.xml
index b9fbad9bd0..86d5f5d973 100644
--- a/streamx-console/pom.xml
+++ b/streamx-console/pom.xml
@@ -6,7 +6,7 @@
 4.0.0
 com.streamxhub.streamx
 streamx-console
- 1.1.0-SNAPSHOT
+ 1.1.0
 StreamX : Console Parent
 pom
diff --git a/streamx-console/streamx-console-service/pom.xml b/streamx-console/streamx-console-service/pom.xml
index f9649c97f7..077f4f352e 100644
--- a/streamx-console/streamx-console-service/pom.xml
+++ b/streamx-console/streamx-console-service/pom.xml
@@ -9,7 +9,7 @@
 streamx-console-service
- 1.1.0-SNAPSHOT
+ 1.1.0
 StreamX : Console Service
@@ -412,13 +412,13 @@
 com.streamxhub.streamx
 streamx-flink-shims_flink-1.12
- 1.1.0-SNAPSHOT
+ 1.1.0
 
 com.streamxhub.streamx
 streamx-flink-shims_flink-1.13
- 1.1.0-SNAPSHOT
+ 1.1.0
 
diff --git a/streamx-console/streamx-console-service/src/main/java/com/streamxhub/streamx/console/core/task/FlinkTrackingTask.java b/streamx-console/streamx-console-service/src/main/java/com/streamxhub/streamx/console/core/task/FlinkTrackingTask.java
index 8fb1adde16..a277ff0691 100644
--- a/streamx-console/streamx-console-service/src/main/java/com/streamxhub/streamx/console/core/task/FlinkTrackingTask.java
+++ b/streamx-console/streamx-console-service/src/main/java/com/streamxhub/streamx/console/core/task/FlinkTrackingTask.java
@@ -168,17 +168,17 @@ public void ending() {
     @Scheduled(fixedDelay = 1000)
     public void execute() {
         // normally fetch the status once every 5 seconds
-        long track_interval = 1000L * 5;
+        long trackInterval = 1000L * 5;
         // within 10 seconds
-        long option_interval = 1000L * 10;
+        long optionInterval = 1000L * 10;
 
        //1) first execution right after startup, or the front end is performing an operation (start/stop): status must be returned immediately.
if (lastTrackTime == null || !OPTIONING.isEmpty()) {
            tracking();
-        } else if (System.currentTimeMillis() - lastOptionTime <= option_interval) {
+        } else if (System.currentTimeMillis() - lastOptionTime <= optionInterval) {
            //2) within 10 seconds of an operation from the console (runs once per second)
            tracking();
-        } else if (System.currentTimeMillis() - lastTrackTime >= track_interval) {
+        } else if (System.currentTimeMillis() - lastTrackTime >= trackInterval) {
            //3) regular fetch: check whether 5 seconds have elapsed since the last run (normal monitoring, once every 5 seconds)
            tracking();
        }
    }
diff --git a/streamx-console/streamx-console-service/src/main/resources/kerberos.yml b/streamx-console/streamx-console-service/src/main/resources/kerberos.yml
index 39729394a7..aef84dfef1 100644
--- a/streamx-console/streamx-console-service/src/main/resources/kerberos.yml
+++ b/streamx-console/streamx-console-service/src/main/resources/kerberos.yml
@@ -5,6 +5,7 @@ security:
     principal:
     krb5:
     keytab:
+    expire: 2 h  # expires after 2 hours
 java:
   security:
diff --git a/streamx-console/streamx-console-service/src/test/java/RefreshCacheTest.java b/streamx-console/streamx-console-service/src/test/java/RefreshCacheTest.java
new file mode 100644
index 0000000000..8b19de748d
--- /dev/null
+++ b/streamx-console/streamx-console-service/src/test/java/RefreshCacheTest.java
@@ -0,0 +1,50 @@
+/*
+ * Copyright (c) 2019 The StreamX Project
+ *

+ * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import com.github.benmanes.caffeine.cache.Cache; +import com.github.benmanes.caffeine.cache.Caffeine; +import org.junit.Test; + +import java.util.UUID; +import java.util.concurrent.TimeUnit; + +public class RefreshCacheTest { + + Cache caffeine = null; + + @Test + public void cache() throws Exception { + if (caffeine == null) { + caffeine = Caffeine.newBuilder() + .refreshAfterWrite(10, TimeUnit.SECONDS) + .build(this::refresh); + } + caffeine.put("config", "hadoop"); + while (true) { + System.out.println(caffeine.getIfPresent("config")); + Thread.sleep(2000); + } + } + + public String refresh(String value) { + return UUID.randomUUID() + "@" + value; + } +} diff --git a/streamx-console/streamx-console-webapp/pom.xml b/streamx-console/streamx-console-webapp/pom.xml index 53ec145e24..5ce6e6911b 100644 --- a/streamx-console/streamx-console-webapp/pom.xml +++ b/streamx-console/streamx-console-webapp/pom.xml @@ -3,7 +3,7 @@ 4.0.0 com.streamxhub.streamx streamx-console-webapp - 1.1.0-SNAPSHOT + 1.1.0 StreamX : Console Webapp diff --git a/streamx-flink/pom.xml b/streamx-flink/pom.xml index 3cf719aeda..7a013f9b5a 100644 --- a/streamx-flink/pom.xml +++ b/streamx-flink/pom.xml @@ -6,7 +6,7 @@ com.streamxhub.streamx streamx-parent - 1.1.0-SNAPSHOT + 1.1.0 ../streamx-parent/pom.xml diff --git a/streamx-flink/streamx-flink-core/pom.xml b/streamx-flink/streamx-flink-core/pom.xml index 7692c9c220..5286f9da79 100644 --- a/streamx-flink/streamx-flink-core/pom.xml +++ b/streamx-flink/streamx-flink-core/pom.xml @@ -6,7 +6,7 @@ com.streamxhub.streamx streamx-flink - 1.1.0-SNAPSHOT + 1.1.0 streamx-flink-core diff --git a/streamx-flink/streamx-flink-shims/pom.xml b/streamx-flink/streamx-flink-shims/pom.xml index 81a27587f8..24146e5e8b 100644 --- a/streamx-flink/streamx-flink-shims/pom.xml +++ b/streamx-flink/streamx-flink-shims/pom.xml @@ -7,7 +7,7 @@ com.streamxhub.streamx streamx-flink - 1.1.0-SNAPSHOT + 1.1.0 streamx-flink-shims diff --git a/streamx-flink/streamx-flink-shims/streamx-flink-shims-base/pom.xml b/streamx-flink/streamx-flink-shims/streamx-flink-shims-base/pom.xml index 2acff56c9a..2b7a2d0b30 100644 --- a/streamx-flink/streamx-flink-shims/streamx-flink-shims-base/pom.xml +++ b/streamx-flink/streamx-flink-shims/streamx-flink-shims-base/pom.xml @@ -7,7 +7,7 @@ com.streamxhub.streamx streamx-flink-shims - 1.1.0-SNAPSHOT + 1.1.0 streamx-flink-shims-base diff --git a/streamx-flink/streamx-flink-shims/streamx-flink-shims_flink-1.12/pom.xml b/streamx-flink/streamx-flink-shims/streamx-flink-shims_flink-1.12/pom.xml index fd8d9896ff..5433b678cb 100644 --- a/streamx-flink/streamx-flink-shims/streamx-flink-shims_flink-1.12/pom.xml +++ b/streamx-flink/streamx-flink-shims/streamx-flink-shims_flink-1.12/pom.xml @@ -7,7 +7,7 @@ com.streamxhub.streamx streamx-flink-shims - 1.1.0-SNAPSHOT + 1.1.0 streamx-flink-shims_flink-1.12 diff --git a/streamx-flink/streamx-flink-shims/streamx-flink-shims_flink-1.13/pom.xml b/streamx-flink/streamx-flink-shims/streamx-flink-shims_flink-1.13/pom.xml index 53d51ed448..38dfc1e37d 100644 --- a/streamx-flink/streamx-flink-shims/streamx-flink-shims_flink-1.13/pom.xml +++ b/streamx-flink/streamx-flink-shims/streamx-flink-shims_flink-1.13/pom.xml @@ -7,7 
+7,7 @@ com.streamxhub.streamx streamx-flink-shims - 1.1.0-SNAPSHOT + 1.1.0 streamx-flink-shims_flink-1.13 diff --git a/streamx-flink/streamx-flink-test/pom.xml b/streamx-flink/streamx-flink-test/pom.xml index f08b50b10e..72fa2db1e3 100644 --- a/streamx-flink/streamx-flink-test/pom.xml +++ b/streamx-flink/streamx-flink-test/pom.xml @@ -6,7 +6,7 @@ com.streamxhub.streamx streamx-flink - 1.1.0-SNAPSHOT + 1.1.0 streamx-flink-test StreamX : Flink Test diff --git a/streamx-parent/pom.xml b/streamx-parent/pom.xml index 9c64905d26..78b33a3a44 100644 --- a/streamx-parent/pom.xml +++ b/streamx-parent/pom.xml @@ -6,7 +6,7 @@ 4.0.0 com.streamxhub.streamx streamx-parent - 1.1.0-SNAPSHOT + 1.1.0 pom StreamX Parent A magical framework that makes Flink development easier @@ -88,6 +88,18 @@ + + com.github.ben-manes.caffeine + caffeine + 2.8.6 + + + com.google.errorprone + error_prone_annotations + + + + mysql mysql-connector-java diff --git a/streamx-plugin/pom.xml b/streamx-plugin/pom.xml index f0c3273078..59dddd89d9 100644 --- a/streamx-plugin/pom.xml +++ b/streamx-plugin/pom.xml @@ -6,7 +6,7 @@ com.streamxhub.streamx streamx-parent - 1.1.0-SNAPSHOT + 1.1.0 ../streamx-parent/pom.xml streamx-plugin diff --git a/streamx-plugin/streamx-flink-repl/pom.xml b/streamx-plugin/streamx-flink-repl/pom.xml index a9d1523cde..09a21164d2 100644 --- a/streamx-plugin/streamx-flink-repl/pom.xml +++ b/streamx-plugin/streamx-flink-repl/pom.xml @@ -5,7 +5,7 @@ com.streamxhub.streamx streamx-plugin - 1.1.0-SNAPSHOT + 1.1.0 streamx-flink-repl diff --git a/streamx-plugin/streamx-flink-sqlclient/pom.xml b/streamx-plugin/streamx-flink-sqlclient/pom.xml index 6055fcd126..c49a215652 100644 --- a/streamx-plugin/streamx-flink-sqlclient/pom.xml +++ b/streamx-plugin/streamx-flink-sqlclient/pom.xml @@ -5,7 +5,7 @@ com.streamxhub.streamx streamx-plugin - 1.1.0-SNAPSHOT + 1.1.0 streamx-flink-sqlclient diff --git a/streamx-plugin/streamx-flink-submit/pom.xml b/streamx-plugin/streamx-flink-submit/pom.xml index 108e74b351..65fe31aca2 100644 --- a/streamx-plugin/streamx-flink-submit/pom.xml +++ b/streamx-plugin/streamx-flink-submit/pom.xml @@ -6,7 +6,7 @@ com.streamxhub.streamx streamx-plugin - 1.1.0-SNAPSHOT + 1.1.0 streamx-flink-submit diff --git a/streamx-plugin/streamx-flink-udf/pom.xml b/streamx-plugin/streamx-flink-udf/pom.xml index 98e11b9e32..3ddd2f1945 100644 --- a/streamx-plugin/streamx-flink-udf/pom.xml +++ b/streamx-plugin/streamx-flink-udf/pom.xml @@ -6,7 +6,7 @@ com.streamxhub.streamx streamx-plugin - 1.1.0-SNAPSHOT + 1.1.0 streamx-flink-udf StreamX : Flink Udf diff --git a/streamx-plugin/streamx-jvm-profiler/pom.xml b/streamx-plugin/streamx-jvm-profiler/pom.xml index a5af69e2c9..5b16526a1a 100644 --- a/streamx-plugin/streamx-jvm-profiler/pom.xml +++ b/streamx-plugin/streamx-jvm-profiler/pom.xml @@ -4,7 +4,7 @@ com.streamxhub.streamx streamx-plugin - 1.1.0-SNAPSHOT + 1.1.0 streamx-jvm-profiler StreamX : JVM Profiler diff --git a/streamx-spark/pom.xml b/streamx-spark/pom.xml index b29a724916..9616f747ee 100644 --- a/streamx-spark/pom.xml +++ b/streamx-spark/pom.xml @@ -6,7 +6,7 @@ com.streamxhub.streamx streamx-parent - 1.1.0-SNAPSHOT + 1.1.0 ../streamx-parent/pom.xml diff --git a/streamx-spark/streamx-spark-cli/pom.xml b/streamx-spark/streamx-spark-cli/pom.xml index 2f3366273f..ef8f5e5658 100644 --- a/streamx-spark/streamx-spark-cli/pom.xml +++ b/streamx-spark/streamx-spark-cli/pom.xml @@ -6,7 +6,7 @@ com.streamxhub.streamx streamx-spark - 1.1.0-SNAPSHOT + 1.1.0 streamx-spark-cli StreamX : Spark Cli diff --git 
a/streamx-spark/streamx-spark-core/pom.xml b/streamx-spark/streamx-spark-core/pom.xml index aee33901b1..d9537a0d0a 100644 --- a/streamx-spark/streamx-spark-core/pom.xml +++ b/streamx-spark/streamx-spark-core/pom.xml @@ -6,7 +6,7 @@ com.streamxhub.streamx streamx-spark - 1.1.0-SNAPSHOT + 1.1.0 streamx-spark-core StreamX : Spark Core diff --git a/streamx-spark/streamx-spark-test/pom.xml b/streamx-spark/streamx-spark-test/pom.xml index 36f8dbc872..6c8d7230be 100644 --- a/streamx-spark/streamx-spark-test/pom.xml +++ b/streamx-spark/streamx-spark-test/pom.xml @@ -6,18 +6,18 @@ com.streamxhub.streamx streamx-spark - 1.1.0-SNAPSHOT + 1.1.0 streamx-spark-test StreamX : Spark Test - 1.1.0-SNAPSHOT + 1.1.0 com.streamxhub.streamx streamx-spark-core - 1.1.0-SNAPSHOT + 1.1.0
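
Note on the caching change in this series: commit 1 cached the Kerberos-enabled Hadoop Configuration with Guava's expireAfterWrite, which evicts the entry and makes the next caller block on a full re-login (and, as written, rebuilt the cache on every hadoopConf call and read from its still-empty asMap view, which commit 2 fixes); commit 2 switches to a single Caffeine cache with refreshAfterWrite, which keeps serving the stale entry and reloads it asynchronously, so callers of hadoopConf never block on re-authentication. Below is a minimal, self-contained Scala sketch of that semantics; the object and value names are illustrative only, not part of the patch, and it assumes Caffeine 2.x on the classpath (as added to streamx-parent/pom.xml):

    import com.github.benmanes.caffeine.cache.{CacheLoader, Caffeine, LoadingCache}

    import java.util.concurrent.TimeUnit
    import java.util.concurrent.atomic.AtomicInteger

    object RefreshAfterWriteDemo extends App {

      private val reloads = new AtomicInteger(0)

      // An entry older than 2 seconds is reloaded in the background on the next
      // read; until that reload completes, readers keep receiving the stale
      // value, so no caller ever blocks on the (simulated) re-login.
      private val cache: LoadingCache[String, String] = Caffeine.newBuilder()
        .refreshAfterWrite(2, TimeUnit.SECONDS)
        .build[String, String](new CacheLoader[String, String]() {
          override def load(key: String): String = s"conf-${reloads.incrementAndGet()}"
        })

      println(cache.get("config")) // conf-1: loaded synchronously on first access
      Thread.sleep(3000)
      println(cache.get("config")) // still conf-1: stale value served, refresh scheduled
      Thread.sleep(500)
      println(cache.get("config")) // conf-2, once the background reload has landed
    }

Because refreshAfterWrite only fires on a read that happens after the window has elapsed, an idle entry is never reloaded in the background; that fits this usage, where every YARN/HDFS call goes through hadoopConf and therefore touches the cache.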