Skip to content

Commit

Permalink
feat: support select outfile to openmldb online tables (4paradigm#3616)
Browse files Browse the repository at this point in the history
* Support select outfile to openmldb online tables

* Format scala code

* Remove debug code
  • Loading branch information
tobegit3hub authored Nov 28, 2023
1 parent 957095a commit 6e68677
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
package com._4paradigm.openmldb.batch.nodes

import com._4paradigm.hybridse.vm.PhysicalSelectIntoNode
import com._4paradigm.openmldb.batch.utils.HybridseUtil
import com._4paradigm.openmldb.batch.utils.{HybridseUtil, OpenmldbTableUtil}
import com._4paradigm.openmldb.batch.{PlanContext, SparkInstance}
import org.slf4j.LoggerFactory

Expand Down Expand Up @@ -45,6 +45,25 @@ object SelectIntoPlan {
val dbt = HybridseUtil.hiveDest(outPath)
logger.info(s"offline select into: hive way, write mode[${mode}], out table ${dbt}")
input.getDf().write.format("hive").mode(mode).saveAsTable(dbt)
} else if (format == "openmldb") {

val (db, table) = HybridseUtil.getOpenmldbDbAndTable(outPath)

val createIfNotExists = extra.get("create_if_not_exists").get.toBoolean
if (createIfNotExists) {
logger.info("Try to create openmldb output table: " + table)

OpenmldbTableUtil.createOpenmldbTableFromDf(ctx.getOpenmldbSession, input.getDf(), db, table)
}

val writeOptions = Map(
"db" -> db,
"table" -> table,
"zkCluster" -> ctx.getConf.openmldbZkCluster,
"zkPath" -> ctx.getConf.openmldbZkRootPath)

input.getDf().write.options(writeOptions).format("openmldb").mode(mode).save()

} else {
logger.info("offline select into: format[{}], options[{}], write mode[{}], out path {}", format, options,
mode, outPath)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,8 @@ object HybridseUtil {
// load data: read format, select into: write format
val format = if (file.toLowerCase().startsWith("hive://")) {
"hive"
} else if (file.toLowerCase().startsWith("openmldb://")) {
"openmldb"
} else {
parseOption(getOptionFromNode(node, "format"), "csv", getStringOrDefault).toLowerCase
}
Expand Down Expand Up @@ -252,7 +254,11 @@ object HybridseUtil {
// only for select into, "" means N/A
extraOptions += ("coalesce" -> parseOption(getOptionFromNode(node, "coalesce"), "0", getIntOrDefault))
extraOptions += ("sql" -> parseOption(getOptionFromNode(node, "sql"), "", getStringOrDefault))
extraOptions += ("writer_type") -> parseOption(getOptionFromNode(node, "writer_type"), "single", getStringOrDefault)
extraOptions += ("writer_type") -> parseOption(getOptionFromNode(node, "writer_type"), "single",
getStringOrDefault)

extraOptions += ("create_if_not_exists" -> parseOption(getOptionFromNode(node, "create_if_not_exists"),
"true", getBoolOrDefault))

(format, options.toMap, mode, extraOptions.toMap)
}
Expand Down Expand Up @@ -451,6 +457,19 @@ object HybridseUtil {
path.substring(tableStartPos)
}

def getOpenmldbDbAndTable(path: String): (String, String) = {
require(path.toLowerCase.startsWith("openmldb://"))
// openmldb://<table_pattern>
val tableStartPos = 11
val dbAndTableString = path.substring(tableStartPos)

require(dbAndTableString.split("\\.").size == 2)

val db = dbAndTableString.split("\\.")(0)
val table = dbAndTableString.split("\\.")(1)
(db, table)
}

private def hiveLoad(openmldbSession: OpenmldbSession, file: String, columns: util.List[Common.ColumnDesc],
loadDataSql: String = ""): DataFrame = {
if (logger.isDebugEnabled()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ object OpenmldbTableUtil {

val schema = df.schema

var createTableSql = s"CREATE TABLE $tableName ("
var createTableSql = s"CREATE TABLE IF NOT EXISTS $tableName ("
schema.map(structField => {
val colName = structField.name
val colType = DataTypeUtil.sparkTypeToString(structField.dataType)
Expand Down

0 comments on commit 6e68677

Please sign in to comment.