Commit
Add viewfs support in scan validation
JkSelf committed Nov 27, 2024
1 parent 2649fa7 commit a8a33e7
Showing 4 changed files with 45 additions and 10 deletions.
@@ -164,7 +164,8 @@ object CHBackendSettings extends BackendSettingsApi with Logging {
       format: ReadFileFormat,
       fields: Array[StructField],
       rootPaths: Seq[String],
-      properties: Map[String, String]): ValidationResult = {
+      properties: Map[String, String],
+      serializableHadoopConf: Option[SerializableConfiguration] = None): ValidationResult = {

     // Validate if all types are supported.
     def hasComplexType: Boolean = {
@@ -45,8 +45,9 @@ import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, Pa
 import org.apache.spark.sql.hive.execution.HiveFileFormat
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
+import org.apache.spark.util.SerializableConfiguration

-import org.apache.hadoop.fs.Path
+import org.apache.hadoop.fs.{FileSystem, Path}

 import scala.util.control.Breaks.breakable

@@ -96,15 +97,36 @@ object VeloxBackendSettings extends BackendSettingsApi {
       format: ReadFileFormat,
       fields: Array[StructField],
       rootPaths: Seq[String],
-      properties: Map[String, String]): ValidationResult = {
+      properties: Map[String, String],
+      serializableHadoopConf: Option[SerializableConfiguration] = None): ValidationResult = {

     def validateScheme(): Option[String] = {
       val filteredRootPaths = distinctRootPaths(rootPaths)
-      if (
-        filteredRootPaths.nonEmpty && !VeloxFileSystemValidationJniWrapper
-          .allSupportedByRegisteredFileSystems(filteredRootPaths.toArray)
-      ) {
-        Some(s"Scheme of [$filteredRootPaths] is not supported by registered file systems.")
+      if (filteredRootPaths.nonEmpty) {
+        val resolvedPaths =
+          if (
+            GlutenConfig.getConf.enableHdfsViewfs && filteredRootPaths.head.startsWith("viewfs")
+          ) {
+            // Convert the viewfs path to hdfs path.
+            filteredRootPaths.map {
+              viewfsPath =>
+                val viewPath = new Path(viewfsPath)
+                val viewFileSystem =
+                  FileSystem.get(viewPath.toUri, serializableHadoopConf.get.value)
+                viewFileSystem.resolvePath(viewPath).toString
+            }
+          } else {
+            filteredRootPaths
+          }
+
+        if (
+          !VeloxFileSystemValidationJniWrapper.allSupportedByRegisteredFileSystems(
+            resolvedPaths.toArray)
+        ) {
+          Some(s"Scheme of [$filteredRootPaths] is not supported by registered file systems.")
+        } else {
+          None
+        }
       } else {
         None
       }
@@ -27,6 +27,7 @@ import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.execution.command.CreateDataSourceTableAsSelectCommand
 import org.apache.spark.sql.execution.datasources.{FileFormat, InsertIntoHadoopFsRelationCommand}
 import org.apache.spark.sql.types.StructField
+import org.apache.spark.util.SerializableConfiguration

 trait BackendSettingsApi {

@@ -37,7 +38,9 @@ trait BackendSettingsApi {
       format: ReadFileFormat,
       fields: Array[StructField],
       rootPaths: Seq[String],
-      properties: Map[String, String]): ValidationResult = ValidationResult.succeeded
+      properties: Map[String, String],
+      serializableHadoopConf: Option[SerializableConfiguration] = None): ValidationResult =
+    ValidationResult.succeeded

   def supportWriteFilesExec(
       format: FileFormat,
@@ -31,6 +31,7 @@ import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.connector.read.InputPartition
 import org.apache.spark.sql.hive.HiveTableScanExecTransformer
 import org.apache.spark.sql.types.{BooleanType, StringType, StructField, StructType}
+import org.apache.spark.util.SerializableConfiguration

 import com.google.protobuf.StringValue
 import io.substrait.proto.NamedStruct
@@ -77,6 +78,9 @@ trait BasicScanExecTransformer extends LeafTransformSupport with BaseDataSource
         getProperties))
   }

+  val serializableHadoopConf: SerializableConfiguration = new SerializableConfiguration(
+    sparkContext.hadoopConfiguration)
+
   override protected def doValidateInternal(): ValidationResult = {
     var fields = schema.fields

@@ -91,7 +95,12 @@
     }

     val validationResult = BackendsApiManager.getSettings
-      .validateScanExec(fileFormat, fields, getRootFilePaths, getProperties)
+      .validateScanExec(
+        fileFormat,
+        fields,
+        getRootFilePaths,
+        getProperties,
+        Some(serializableHadoopConf))
     if (!validationResult.ok()) {
       return validationResult
     }
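
Hadoop's Configuration is not Java-serializable, which is why the transformer hands it to the backend through Spark's SerializableConfiguration wrapper rather than as a raw Configuration. A minimal sketch of the wrapper usage follows, assuming SerializableConfiguration is accessible from the calling code (as it is in the diff above); the fs.defaultFS value is hypothetical.

import org.apache.hadoop.conf.Configuration
import org.apache.spark.util.SerializableConfiguration

// Wrap a Hadoop conf so it can travel with a serializable object such as a plan node.
val hadoopConf = new Configuration()
hadoopConf.set("fs.defaultFS", "hdfs://nameservice1") // hypothetical value
val wrapped = new SerializableConfiguration(hadoopConf)

// Consumers unwrap it with .value, exactly as validateScheme does via
// serializableHadoopConf.get.value when resolving viewfs paths.
val recovered: Configuration = wrapped.value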
