Skip to content

Commit

Permalink
Merge pull request #207 from wolfboys/main
Browse files Browse the repository at this point in the history
Update version to 1.1.0
  • Loading branch information
wolfboys authored Jun 25, 2021
2 parents 569f4f3 + f2f5ee2 commit d9b39bd
Show file tree
Hide file tree
Showing 30 changed files with 244 additions and 130 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>com.streamxhub.streamx</groupId>
<artifactId>streamx</artifactId>
<version>1.1.0-SNAPSHOT</version>
<version>1.1.0</version>
<packaging>pom</packaging>
<name>StreamX</name>
<description>The flink and spark development scaffolding</description>
Expand Down
8 changes: 7 additions & 1 deletion streamx-common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -7,20 +7,26 @@
<parent>
<groupId>com.streamxhub.streamx</groupId>
<artifactId>streamx-parent</artifactId>
<version>1.1.0-SNAPSHOT</version>
<version>1.1.0</version>
<relativePath>../streamx-parent/pom.xml</relativePath>
</parent>

<artifactId>streamx-common</artifactId>
<name>StreamX : Common</name>

<dependencies>

<!-- enumeratum -->
<dependency>
<groupId>com.beachape</groupId>
<artifactId>enumeratum_${scala.binary.version}</artifactId>
</dependency>

<dependency>
<groupId>com.github.ben-manes.caffeine</groupId>
<artifactId>caffeine</artifactId>
</dependency>

<!--log4j -->
<dependency>
<groupId>ch.qos.logback</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,8 @@ object ConfigConst {

val KEY_JAVA_SECURITY_KRB5_CONF = "java.security.krb5.conf"

val KEY_SECURITY_KERBEROS_EXPIRE = "security.kerberos.expire"

//spark

val KEY_SPARK_USER_ARGS = "spark.user.args"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,12 @@

package com.streamxhub.streamx.common.util

import com.github.benmanes.caffeine.cache.{Cache, CacheLoader, Caffeine}
import com.streamxhub.streamx.common.conf.ConfigConst._
import org.apache.commons.collections.CollectionUtils
import org.apache.commons.lang.StringUtils
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{CommonConfigurationKeys, FileSystem, LocalFileSystem, Path}
import org.apache.hadoop.fs._
import org.apache.hadoop.hdfs.DistributedFileSystem
import org.apache.hadoop.net.NetUtils
import org.apache.hadoop.security.UserGroupInformation
Expand Down Expand Up @@ -53,41 +54,54 @@ import scala.util.{Failure, Success, Try}
*/
object HadoopUtils extends Logger {

private[this] val HADOOP_HOME: String = "HADOOP_HOME"
private[this] val HADOOP_CONF_DIR: String = "HADOOP_CONF_DIR"
private[this] val CONF_SUFFIX: String = "/etc/hadoop"
private[this] lazy val HADOOP_HOME: String = "HADOOP_HOME"
private[this] lazy val HADOOP_CONF_DIR: String = "HADOOP_CONF_DIR"
private[this] lazy val CONF_SUFFIX: String = "/etc/hadoop"

private[this] var reusableYarnClient: YarnClient = _
private[this] var reusableConf: Configuration = _

private[util] var hdfs: FileSystem = getFileSystem(hadoopConf)

private[this] var innerYarnClient: YarnClient = _
private[this] var rmHttpURL: String = _

private[this] val configurationCache: util.Map[String, Configuration] = new ConcurrentHashMap[String, Configuration]
private[this] lazy val configurationCache: util.Map[String, Configuration] = new ConcurrentHashMap[String, Configuration]()

lazy val kerberosConf: Map[String, String] = {
SystemPropertyUtils.get("app.home", null) match {
case null =>
val inputStream = getClass.getResourceAsStream("/kerberos.yml")
if (inputStream == null) null else {
PropertiesUtils.fromYamlFile(inputStream)
}
case f =>
val file = new File(s"$f/conf/kerberos.yml")
if (file.exists() && file.isFile) {
PropertiesUtils.fromYamlFile(file.getAbsolutePath)
} else null
}
private[this] var caffeine: Cache[String, Configuration] = _

lazy val kerberosConf: Map[String, String] = SystemPropertyUtils.get("app.home", null) match {
case null =>
getClass.getResourceAsStream("/kerberos.yml") match {
case x if x != null => PropertiesUtils.fromYamlFile(x)
case _ => null
}
case f =>
val file = new File(s"$f/conf/kerberos.yml")
if (file.exists() && file.isFile) {
PropertiesUtils.fromYamlFile(file.getAbsolutePath)
} else null
}

private[this] val kerberosEnable: Boolean = if (kerberosConf == null) false else {
val enableString = kerberosConf.getOrElse(KEY_SECURITY_KERBEROS_ENABLE, "false")
Try(enableString.trim.toBoolean).getOrElse(false)
}


private[this] lazy val hadoopConfDir: String = Try(FileUtils.getPathFromEnv(HADOOP_CONF_DIR)) match {
case Failure(_) => FileUtils.resolvePath(FileUtils.getPathFromEnv(HADOOP_HOME), CONF_SUFFIX)
case Success(value) => value
}

private[this] def getFileSystem(hadoopConf: Configuration): FileSystem = {
Try(FileSystem.get(hadoopConf)) match {
case Success(fs) => fs
case Failure(e) =>
new IllegalArgumentException(s"[StreamX] access hdfs error.$e")
null
}
}


def getConfigurationFromHadoopConfDir(confDir: String = hadoopConfDir): Configuration = {
if (!configurationCache.containsKey(confDir)) {
FileUtils.exists(confDir)
Expand All @@ -111,68 +125,102 @@ object HadoopUtils extends Logger {
* 推荐第二种方法,不用copy配置文件.<br>
* </pre>
*/
lazy val conf: Configuration = {
val conf = getConfigurationFromHadoopConfDir(hadoopConfDir)
//add hadoopConfDir to classpath...you know why???
ClassLoaderUtils.loadResource(hadoopConfDir)

if (StringUtils.isBlank(conf.get("hadoop.tmp.dir"))) {
conf.set("hadoop.tmp.dir", "/tmp")
def hadoopConf: Configuration = {
def initHadoopConf(): Configuration = {
val conf = getConfigurationFromHadoopConfDir(hadoopConfDir)
//add hadoopConfDir to classpath...you know why???
ClassLoaderUtils.loadResource(hadoopConfDir)

if (StringUtils.isBlank(conf.get("hadoop.tmp.dir"))) {
conf.set("hadoop.tmp.dir", "/tmp")
}
if (StringUtils.isBlank(conf.get("hbase.fs.tmp.dir"))) {
conf.set("hbase.fs.tmp.dir", "/tmp")
}
// disable timeline service as we only query yarn app here.
// Otherwise we may hit this kind of ERROR:
// java.lang.ClassNotFoundException: com.sun.jersey.api.client.config.ClientConfig
conf.set("yarn.timeline-service.enabled", "false")
conf.set("fs.hdfs.impl", classOf[DistributedFileSystem].getName)
conf.set("fs.file.impl", classOf[LocalFileSystem].getName)
conf.set("fs.hdfs.impl.disable.cache", "true")
conf
}
if (StringUtils.isBlank(conf.get("hbase.fs.tmp.dir"))) {
conf.set("hbase.fs.tmp.dir", "/tmp")

def kerberosLogin(conf: Configuration): Unit = {
logInfo("kerberos login starting....")
val principal = kerberosConf.getOrElse(KEY_SECURITY_KERBEROS_PRINCIPAL, "").trim
val keytab = kerberosConf.getOrElse(KEY_SECURITY_KERBEROS_KEYTAB, "").trim
require(
principal.nonEmpty && keytab.nonEmpty,
s"$KEY_SECURITY_KERBEROS_PRINCIPAL and $KEY_SECURITY_KERBEROS_KEYTAB must not be empty"
)

val krb5 = kerberosConf.getOrElse(
KEY_SECURITY_KERBEROS_KRB5_CONF,
kerberosConf.getOrElse(KEY_JAVA_SECURITY_KRB5_CONF, "")
).trim

if (krb5.nonEmpty) {
System.setProperty("java.security.krb5.conf", krb5)
System.setProperty("java.security.krb5.conf.path", krb5)
}
System.setProperty("sun.security.spnego.debug", "true")
System.setProperty("sun.security.krb5.debug", "true")

conf.set(KEY_HADOOP_SECURITY_AUTHENTICATION, KEY_KERBEROS)
try {
UserGroupInformation.setConfiguration(conf)
UserGroupInformation.loginUserFromKeytab(principal, keytab)
logInfo("kerberos authentication successful")
} catch {
case e: IOException =>
logInfo(s"kerberos login failed,${e.getLocalizedMessage}")
throw e
}
}
// disable timeline service as we only query yarn app here.
// Otherwise we may hit this kind of ERROR:
// java.lang.ClassNotFoundException: com.sun.jersey.api.client.config.ClientConfig
conf.set("yarn.timeline-service.enabled", "false")
conf.set("fs.hdfs.impl", classOf[DistributedFileSystem].getName)
conf.set("fs.file.impl", classOf[LocalFileSystem].getName)
conf.set("fs.hdfs.impl.disable.cache", "true")
kerberosLogin(conf)
conf
}

private[this] def kerberosLogin(conf: Configuration): Unit = if (kerberosEnable) {
logInfo("kerberos login starting....")
val principal = kerberosConf.getOrElse(KEY_SECURITY_KERBEROS_PRINCIPAL, "").trim
val keytab = kerberosConf.getOrElse(KEY_SECURITY_KERBEROS_KEYTAB, "").trim
require(
principal.nonEmpty && keytab.nonEmpty,
s"${KEY_SECURITY_KERBEROS_PRINCIPAL} and ${KEY_SECURITY_KERBEROS_KEYTAB} must not be empty"
)

val krb5 = kerberosConf.getOrElse(
KEY_SECURITY_KERBEROS_KRB5_CONF,
kerberosConf.getOrElse(KEY_JAVA_SECURITY_KRB5_CONF, "")
).trim

if (krb5.nonEmpty) {
System.setProperty("java.security.krb5.conf", krb5)
System.setProperty("java.security.krb5.conf.path", krb5)
def refreshConf(): Configuration = {
val conf = initHadoopConf()
kerberosLogin(conf)
if (reusableYarnClient != null) {
reusableYarnClient.close()
reusableYarnClient = null
}
hdfs = getFileSystem(conf)
conf
}

conf.set(KEY_HADOOP_SECURITY_AUTHENTICATION, KEY_KERBEROS)
try {
UserGroupInformation.setConfiguration(conf)
UserGroupInformation.loginUserFromKeytab(principal, keytab)
logInfo("kerberos authentication successful")
} catch {
case e: IOException =>
logInfo(s"kerberos login failed,${e.getLocalizedMessage}")
throw e
if (kerberosEnable) {
val expire = kerberosConf.getOrElse(KEY_SECURITY_KERBEROS_EXPIRE, "2").trim
val (duration, unit) = DateUtils.getTimeUnit(expire)
if (caffeine == null) {
caffeine = Caffeine.newBuilder().refreshAfterWrite(duration.toLong, unit)
.build[String, Configuration](new CacheLoader[String, Configuration]() {
override def load(key: String): Configuration = {
logInfo(s"recertification kerberos: time:${DateUtils.now(DateUtils.fullFormat)}, duration:$expire")
refreshConf()
}
})
caffeine.put("config", refreshConf())
}
caffeine.getIfPresent("config")
} else {
if (reusableConf == null) {
reusableConf = initHadoopConf()
}
reusableConf
}
}

def yarnClient: YarnClient = {
if (innerYarnClient == null || !innerYarnClient.isInState(STATE.STARTED)) {
innerYarnClient = YarnClient.createYarnClient
val yarnConf = new YarnConfiguration(conf)
kerberosLogin(yarnConf)
innerYarnClient.init(yarnConf)
innerYarnClient.start()
if (reusableYarnClient == null || !reusableYarnClient.isInState(STATE.STARTED)) {
reusableYarnClient = YarnClient.createYarnClient
val yarnConf = new YarnConfiguration(hadoopConf)
reusableYarnClient.init(yarnConf)
reusableYarnClient.start()
}
innerYarnClient
reusableYarnClient
}

/**
Expand All @@ -190,6 +238,8 @@ object HadoopUtils extends Logger {
synchronized {
if (rmHttpURL == null || getLatest) {

val conf = hadoopConf

val useHttps = YarnConfiguration.useHttps(conf)
val (addressPrefix, defaultPort, protocol) = useHttps match {
case x if x => (YarnConfiguration.RM_WEBAPP_HTTPS_ADDRESS, "8090", "https://")
Expand Down Expand Up @@ -299,5 +349,4 @@ object HadoopUtils extends Logger {
fs.copyToLocalFile(sourcePath, destPath)
new File(destPath.toString).getAbsolutePath
}

}
Loading

0 comments on commit d9b39bd

Please sign in to comment.