diff --git a/pom.xml b/pom.xml index b3ce429..c893d98 100644 --- a/pom.xml +++ b/pom.xml @@ -91,6 +91,7 @@ sql/xsql-hbase sql/xsql-shell sql/xsql-monitor + sql/xsql-jdbc assembly diff --git a/sql/xsql-jdbc/pom.xml b/sql/xsql-jdbc/pom.xml new file mode 100644 index 0000000..4d4fcbe --- /dev/null +++ b/sql/xsql-jdbc/pom.xml @@ -0,0 +1,147 @@ + + + + spark-parent_2.11 + org.apache.spark + 0.7.0-SNAPSHOT + ../../pom.xml + + 4.0.0 + Spark Project XSQL JDBC Connector + xsql-jdbc_2.11 + + none + + + + + org.apache.spark + xsql_${scala.binary.version} + ${project.version} + provided + + + org.apache.spark + xsql-sql_${scala.binary.version} + ${project.version} + provided + + + org.apache.spark + spark-core_${scala.binary.version} + provided + + + org.apache.spark + xsql-hive_${scala.binary.version} + ${project.version} + provided + + + + org.apache.spark + spark-core_${scala.binary.version} + test-jar + test + + + * + * + + + + + org.apache.spark + xsql-sql_${scala.binary.version} + ${project.version} + test-jar + test + + + * + * + + + + + org.apache.spark + xsql-catalyst_${scala.binary.version} + ${project.version} + test-jar + test + + + * + * + + + + + + org.scala-lang + scala-compiler + test + + + org.scalacheck + scalacheck_${scala.binary.version} + test + + + + + mysql + mysql-connector-java + + + + com.oracle + ojdbc6 + 11.2.0.1.0 + + + + + + + org.apache.maven.plugins + maven-dependency-plugin + + + copy-module-dependencies + package + + copy-dependencies + + + unused + runtime + ${jars.target.dir} + + + + + + + org.apache.maven.plugins + maven-jar-plugin + + ${jars.target.dir} + + + + org.apache.maven.plugins + maven-source-plugin + + + create-source-jar + none + + + + + + + \ No newline at end of file diff --git a/sql/xsql-jdbc/src/main/resources/META-INF/services/org.apache.spark.sql.xsql.DataSourceManager b/sql/xsql-jdbc/src/main/resources/META-INF/services/org.apache.spark.sql.xsql.DataSourceManager new file mode 100644 index 0000000..d03da12 --- /dev/null +++ b/sql/xsql-jdbc/src/main/resources/META-INF/services/org.apache.spark.sql.xsql.DataSourceManager @@ -0,0 +1 @@ +org.apache.spark.sql.xsql.manager.JDBCManager \ No newline at end of file diff --git a/sql/xsql-jdbc/src/main/scala/org/apache/spark/sql/xsql/manager/JDBCManager.scala b/sql/xsql-jdbc/src/main/scala/org/apache/spark/sql/xsql/manager/JDBCManager.scala new file mode 100644 index 0000000..c31ff7f --- /dev/null +++ b/sql/xsql-jdbc/src/main/scala/org/apache/spark/sql/xsql/manager/JDBCManager.scala @@ -0,0 +1,474 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.xsql.manager + +import java.sql.{Connection, DriverManager, ResultSet, SQLException} + +import scala.collection.JavaConverters._ +import scala.collection.mutable.{ArrayBuffer, HashMap} +import scala.language.implicitConversions + +import org.apache.spark.{SparkConf, SparkException} +import org.apache.spark.internal.Logging +import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, CatalogStorageFormat, CatalogTable, CatalogTableType} +import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec +import org.apache.spark.sql.execution.datasources.jdbc.{JDBCOptions, JdbcUtils} +import org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils.getCommonJDBCType +import org.apache.spark.sql.jdbc.{JdbcDialect, JdbcDialects} +import org.apache.spark.sql.types.{StructType} +import org.apache.spark.sql.xsql.{CatalogDataSource, DataSourceManager, JDBCDataSource} +import org.apache.spark.sql.xsql.DataSourceType.{JDBC} +import org.apache.spark.sql.xsql.internal.config.{XSQL_DEFAULT_DATABASE, XSQL_DEFAULT_DATASOURCE} +import org.apache.spark.sql.xsql.types.{MYSQL_COLUMN_AUTOINC, MYSQL_COLUMN_DEFAULT, PRIMARY_KEY} +import org.apache.spark.sql.xsql.util.Utils +import org.apache.spark.util.{Utils => SparkUtils} + + +class JDBCManager(conf: SparkConf) extends DataSourceManager with Logging{ + + def this() = { + this(null) + } + + import JDBCManager._ + import DataSourceManager._ + + protected var connOpt: Option[Connection] = None + + /** + * the conf of partitons info present in the currently specified file. + */ + protected val partitionsMap = new HashMap[String, HashMap[String, HashMap[String, String]]] + + override def shortName(): String = JDBC.toString + + /** + * JDBCManager has not implemented pushdown yet + */ + override def isPushDown: Boolean = false + + @throws[SQLException] + implicit protected def typeConvertor(rs: ResultSet) = { + val list = new ArrayBuffer[HashMap[String, Object]]() + val md = rs.getMetaData + val columnCount = md.getColumnCount + while (rs.next) { + val rowData = new HashMap[String, Object]() + for (i <- 1 to columnCount) { + rowData.put(md.getColumnName(i), rs.getObject(i)) + } + list.append(rowData) + } + list + } + + /** + * Get a connection to the given database + */ + protected def getConnect(): Connection = { + if (connOpt.isEmpty || !connOpt.get.isValid(0)) { + SparkUtils.classForName(cachedProperties(DRIVER)) + connOpt = Some( + DriverManager.getConnection( + cachedProperties(URL), + cachedProperties(USER), + cachedProperties(PASSWORD))) + } + connOpt.get + } + + protected def setJdbcOptions(dbName: String, tbName: String): JDBCOptions = { + val jdbcOptions = new JDBCOptions( + cachedProperties + .updated(JDBCOptions.JDBC_URL, cachedProperties(URL)) + .updated(JDBCOptions.JDBC_TABLE_NAME, s"$tbName") + .updated(JDBCOptions.JDBC_DRIVER_CLASS, cachedProperties(DRIVER)) + .updated(DATABASE, dbName) + .toMap) + jdbcOptions + } + + /** + * check if the database in whitelist. + */ + protected def isSelectedDatabase(dsName: String, dbMap: HashMap[String, Object]): Boolean = { + // In case the key of some databases is lowercase + val toUpperDbMap = dbMap.map { + case (k, v) => (k.toUpperCase, v) + } + val dbName = toUpperDbMap.get("TABLE_CAT").map(_.toString).getOrElse("") + val defaultSource = conf.get(XSQL_DEFAULT_DATASOURCE) + val isDefault = dsName.equalsIgnoreCase(defaultSource) + isSelectedDatabase(isDefault, dbName, conf.get(XSQL_DEFAULT_DATABASE)) + } + + /** + * Choose a database before execute sql + */ + protected def selectDatabase(conn: Connection, dbName: String): Unit = { + conn.setCatalog(dbName) + } + + /** + * Not support drop table for JDBCManager + */ + protected def dropTableSQLText( + dbName: String, + table: String, + ignoreIfNotExists: Boolean, + purge: Boolean): String = { + throw new UnsupportedOperationException(s"Drop ${shortName()} table not supported!") + } + + override protected def cacheDatabase( + isDefault: Boolean, + dataSourceName: String, + infos: Map[String, String], + dataSourcesByName: HashMap[String, CatalogDataSource], + dataSourceToCatalogDatabase: HashMap[String, HashMap[String, CatalogDatabase]]): Unit = { + val url = cachedProperties.get(URL) + if (url.isEmpty) { + throw new SparkException("Data source is JDBC must have uri!") + } + val partitionFile = cachedProperties.get(PARTITION_CONF) + if (partitionFile != None) { + val partitionFilePath = Utils.getPropertiesFile(file = partitionFile.get) + Utils.getSettingsFromFile(partitionFilePath, partitionsMap, Utils.getPartitonsFromStr) + } + + val ds = new JDBCDataSource( + dataSourceName, + JDBC, + this, + url.get, + cachedProperties(USER), + cachedProperties(PASSWORD), + cachedProperties(VERSION)) + // Get jdbc connection, get databases + val conn = getConnect() + val dbMetaData = conn.getMetaData() + val databases = dbMetaData.getCatalogs() + val xdatabases = + dataSourceToCatalogDatabase.getOrElseUpdate( + ds.getName, + new HashMap[String, CatalogDatabase]) + // Get each database's info, update dataSourceToCatalogDatabase and dbToCatalogTable + databases.filter { isSelectedDatabase(dataSourceName, _) }.foreach { dbMap => + val toUpperDbMap = dbMap.map { + case (k, v) => (k.toUpperCase, v) + } + val dbName = toUpperDbMap.get("TABLE_CAT").map(_.toString.toLowerCase).getOrElse("") + logDebug(s"Parse $dataSourceName's database: $dbName") + val db = CatalogDatabase( + id = newDatabaseId, + dataSourceName = dataSourceName, + name = dbName, + description = null, + locationUri = null, + properties = Map.empty) + xdatabases += ((db.name, db)) + } + dataSourcesByName(ds.getName) = ds + } + + /** + * Do not check if the table exists here + */ + override def tableExists(dbName: String, table: String): Boolean = { + true + } + + override def listTables(dbName: String): Seq[String] = { + val conn = getConnect() + val (whiteTables, blackTables) = getWhiteAndBlackTables(dbName) + val dbMetaData = conn.getMetaData() + dbMetaData + .getTables(dbName, null, "%", Array("TABLE")) + .map { tbMap => + val toUpperTbMap = tbMap.map { + case (k, v) => (k.toUpperCase, v) + } + toUpperTbMap.get("TABLE_NAME").map(_.toString).getOrElse("") + } + .filter(isSelectedTable(whiteTables, blackTables, _)) + } + + override def listDatabases(): Seq[String] = { + val conn = getConnect() + val dbMetaData = conn.getMetaData() + dbMetaData + .getCatalogs() + .filter { isSelectedDatabase(dsName, _) } + .map { dbMap => + val toUpperDbMap = dbMap.map { + case (k, v) => (k.toUpperCase, v) + } + toUpperDbMap.get("TABLE_CAT").map(_.toString).getOrElse("") + } + } + + override def doGetRawTable( + dbName: String, + originDB: String, + table: String): Option[CatalogTable] = { + val conn = getConnect() + selectDatabase(conn, dbName) + val jdbcOptions = setJdbcOptions(dbName, table) + val schema = resolveTableConnnectOnce(conn, jdbcOptions) + cacheSpecialProperties(dsName, dbName, table) + Option( + CatalogTable( + identifier = TableIdentifier(table, Option(dbName), Option(dsName)), + tableType = CatalogTableType.JDBC, + storage = CatalogStorageFormat.empty.copy( + properties = jdbcOptions.asProperties.asScala.toMap ++ + specialProperties + .getOrElse(s"${dsName}.${dbName}.${table}", Map.empty[String, String])), + schema = schema, + provider = Some(FULL_PROVIDER))) + } + + /** + * Similiar to JdbcUtils createTable + */ + override def createTable(tableDefinition: CatalogTable, ignoreIfExists: Boolean): Unit = { + val conn = getConnect() + val dbName = tableDefinition.database + // Must select the database here, as we reuse only one connection + selectDatabase(conn, dbName) + val exists = if (ignoreIfExists) "IF NOT EXISTS" else "" + val dialect = JdbcDialects.get(cachedProperties(URL)) + val strSchema = schemaString(tableDefinition.schema, dialect) + val table = dialect.quoteIdentifier(tableDefinition.identifier.table) + val createTableOptions = tableDefinition.properties.map(a => a._1 + "=" + a._2).mkString(" ") + val sql = s"CREATE TABLE ${exists} $table ($strSchema) $createTableOptions" + + val statement = conn.createStatement + try { + statement.executeUpdate(sql) + val jdbcOptions = setJdbcOptions(dbName, tableDefinition.identifier.table) + val schema = resolveTableConnnectOnce(conn, jdbcOptions) + tableDefinition.copy( + tableType = CatalogTableType.JDBC, + storage = + CatalogStorageFormat.empty.copy(properties = jdbcOptions.asProperties.asScala.toMap), + schema = schema, + provider = Some(FULL_PROVIDER)) + } catch { + case e: Exception => + throw new SparkException(s"Error when execute ${sql}, details:\n${e.getMessage}") + } finally { + statement.close() + } + } + + /** + * Drop 'table' in some data source. + */ + override def dropTable( + dbName: String, + table: String, + ignoreIfNotExists: Boolean, + purge: Boolean): Unit = { + val conn = getConnect() + // Must select the database here, as we reuse only one connection + selectDatabase(conn, dbName) + val statement = conn.createStatement + val sql = dropTableSQLText(dbName, table, ignoreIfNotExists, purge) + try { + statement.executeUpdate(sql) + } catch { + case e: Exception => + throw new SparkException(s"Error when execute ${sql}, details:\n${e.getMessage}") + } finally { + statement.close() + } + } + + /** + * Alter schema of 'table' in some data source. + */ + override def alterTableDataSchema(dbName: String, queryContent: String): Unit = { + val conn = getConnect() + // Must select the database here, as we reuse only one connection + selectDatabase(conn, dbName) + val statement = conn.createStatement + try { + statement.executeUpdate(queryContent) + } catch { + case e: Exception => + throw new SparkException(s"Error when execute ${queryContent}, details:\n${e.getMessage}") + } finally { + statement.close() + } + } + + /** + * Change name of 'table' in some data source + */ + override def renameTable(dbName: String, oldName: String, newName: String): Unit = { + val dialect = JdbcDialects.get(cachedProperties(URL)) + val sql = + s""" + |ALTER TABLE ${dialect.quoteIdentifier(oldName)} + |RENAME TO ${dialect.quoteIdentifier(newName)} + |""".stripMargin + val conn = getConnect() + // Must select the database here, as we reuse only one connection + selectDatabase(conn, dbName) + val statement = conn.createStatement + try { + statement.executeUpdate(sql) + } catch { + case e: Exception => + throw new SparkException(s"Error when execute ${sql}, details:\n${e.getMessage}") + } finally { + statement.close() + } + } + + /** + * Delete all data of the table + */ + override def truncateTable( + table: CatalogTable, + partitionSpec: Option[TablePartitionSpec]): Unit = { + val dbName = table.database + val conn = getConnect() + selectDatabase(conn, dbName) + val tableName = table.identifier.table + val dialect = JdbcDialects.get(cachedProperties(URL)) + val quoteTbName = dialect.quoteIdentifier(tableName) + val sql = s"TRUNCATE TABLE ${quoteTbName}" + val statement = conn.createStatement + try { + statement.executeUpdate(sql) + } catch { + case e: Exception => + throw new SparkException(s"Error when execute ${sql}, details:\n${e.getMessage}") + } finally { + statement.close() + } + } + + override def getDefaultOptions(table: CatalogTable): Map[String, String] = { + val jdbcOptions = setJdbcOptions(table.database, table.identifier.table) + jdbcOptions.asProperties.asScala.toMap ++ + Map(DataSourceManager.TABLETYPE -> CatalogTableType.JDBC.name) + } + + override def stop(): Unit = { + connOpt.foreach(_.close()) + } + + /** + * Cache special properties for the datasource + */ + private def cacheSpecialProperties( + dsName: String, + dbName: String, + tbName: String): Unit = { + val tablePartitionsMap = partitionsMap.get(dbName) + var partitionsParameters = new HashMap[String, String] + if (tablePartitionsMap != None) { + if (tablePartitionsMap.get.get(tbName) != None) { + partitionsParameters = tablePartitionsMap.get.get(tbName).get + } + } + if (partitionsParameters.nonEmpty) { + specialProperties += ((s"${dsName}.${dbName}.${tbName}", partitionsParameters)) + } + } + + /** + * Similar to JDBCUtils schemaString + */ + private def schemaString(schema: StructType, dialect: JdbcDialect): String = { + val sb = new StringBuilder() + val pkColNames = ArrayBuffer[String]() + schema.fields.foreach { field => + val name = dialect.quoteIdentifier(field.name) + val typ = dialect + .getJDBCType(field.dataType) + .orElse(getCommonJDBCType(field.dataType)) + .get + .databaseTypeDefinition + val nullable = if (typ.equalsIgnoreCase("TIMESTAMP")) { + "NULL" + } else { + if (field.nullable) { + "" + } else { + "NOT NULL" + } + } + sb.append(s", $name $typ $nullable ") + if (field.metadata.contains(MYSQL_COLUMN_DEFAULT)) { + sb.append(s"${DEFAULT} ${field.metadata.getString(MYSQL_COLUMN_DEFAULT)} ") + } + if (field.metadata.contains(MYSQL_COLUMN_AUTOINC)) { + sb.append(s"${AUTO_INCREMENT} ") + } + if (field.metadata.contains(COMMENT.toLowerCase)) { + sb.append(s"${COMMENT} '${field.metadata.getString(COMMENT.toLowerCase)}'") + } + if (field.metadata.contains(PRIMARY_KEY)) { + pkColNames.append(name) + } + } + if (pkColNames.size > 0) { + sb.append(s", ${COLUMN_PRIMARY_KEY} (${pkColNames.mkString(",")})") + } + if (sb.length < 2) "" else sb.substring(2) + } + +} + +object JDBCManager{ + + val DRIVER = "driver" + val PARTITION_CONF = "partitionConf" + val AUTO_INCREMENT = "AUTO_INCREMENT" + val COMMENT = "COMMENT" + val COLUMN_PRIMARY_KEY = "PRIMARY KEY" + val FULL_PROVIDER = "jdbc" + val DEFAULT = "default" + val DATABASE = "DATABASE" + + /** + * Similar to JDBCRelation'schema method + * note: database must be selected before call the method + */ + def resolveTableConnnectOnce(conn: Connection, options: JDBCOptions): StructType = { + val url = options.url + val table = options.tableOrQuery + val dialect = JdbcDialects.get(url) + val statement = conn.prepareStatement(dialect.getSchemaQuery(table)) + try { + val rs = statement.executeQuery() + try { + JdbcUtils.getSchema(rs, dialect, alwaysNullable = true) + } finally { + rs.close() + } + } finally { + statement.close() + } + } + +} diff --git a/sql/xsql/src/main/scala/org/apache/spark/sql/xsql/CatalogDataSource.scala b/sql/xsql/src/main/scala/org/apache/spark/sql/xsql/CatalogDataSource.scala index 26435e0..33f2773 100644 --- a/sql/xsql/src/main/scala/org/apache/spark/sql/xsql/CatalogDataSource.scala +++ b/sql/xsql/src/main/scala/org/apache/spark/sql/xsql/CatalogDataSource.scala @@ -64,6 +64,20 @@ class ElasticSearchDataSource( def getPwd: String = passwd } +class JDBCDataSource( + private val name: String, + private val dsType: DataSourceType, + override val dsManager: DataSourceManager, + private val url: String, + private val username: String, + private val passwd: String, + private val version: String) + extends CatalogDataSource(name, dsType, version, dsManager) { + def getUrl: String = url + def getUser: String = username + def getPwd: String = passwd +} + class MysqlDataSource( private val name: String, private val dsType: DataSourceType, diff --git a/sql/xsql/src/main/scala/org/apache/spark/sql/xsql/DataSourceType.scala b/sql/xsql/src/main/scala/org/apache/spark/sql/xsql/DataSourceType.scala index 7327e40..ba44f1e 100644 --- a/sql/xsql/src/main/scala/org/apache/spark/sql/xsql/DataSourceType.scala +++ b/sql/xsql/src/main/scala/org/apache/spark/sql/xsql/DataSourceType.scala @@ -24,7 +24,7 @@ package org.apache.spark.sql.xsql object DataSourceType extends Enumeration { type DataSourceType = Value - val HIVE, ELASTICSEARCH, MONGO, REDIS, MYSQL, KAFKA, HBASE, DRUID, ORACLE, EXTERNAL = Value + val HIVE, ELASTICSEARCH, MONGO, REDIS, JDBC, MYSQL, KAFKA, HBASE, DRUID, EXTERNAL = Value def checkExists(dsType: String): Boolean = this.values.exists(_.toString.equalsIgnoreCase(dsType)) diff --git a/sql/xsql/src/main/scala/org/apache/spark/sql/xsql/XSQLExternalCatalog.scala b/sql/xsql/src/main/scala/org/apache/spark/sql/xsql/XSQLExternalCatalog.scala index f17dc81..3ed7af0 100644 --- a/sql/xsql/src/main/scala/org/apache/spark/sql/xsql/XSQLExternalCatalog.scala +++ b/sql/xsql/src/main/scala/org/apache/spark/sql/xsql/XSQLExternalCatalog.scala @@ -675,7 +675,7 @@ private[xsql] class XSQLExternalCatalog(conf: SparkConf, hadoopConf: Configurati case ELASTICSEARCH => CatalogTableType.TYPE case MONGO => CatalogTableType.COLLECTION case HBASE => CatalogTableType.HBASE - case MYSQL | ORACLE => CatalogTableType.JDBC + case MYSQL | JDBC => CatalogTableType.JDBC case DRUID => CatalogTableType.JDBC case _ => tableDefinition.tableType }