Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce hybrid (CPU) scan for Parquet read #11720

Open
wants to merge 27 commits into
base: branch-25.02
Choose a base branch
from

Conversation

res-life
Copy link
Collaborator

@res-life res-life commented Nov 13, 2024

Introduce hybrid (CPU) scan for Parquet read
This PR leverages Gluten/Velox to do scan on CPU.

hybrid feature contains

  • Gluten repo: In internal gitlab repo gluten-public
  • Hybrid MR: In internal gitlab repo rapids-hybrid-execution, branch 1.2
  • This Spark-Rapids PR

This PR

Add Shims

build for all shims: 320-324, 330-334, 340-344, 350-353, CDHs, Databricks, throw runtime error if it's CDH or Databricks runtime.

Checks

  • In Hybrid MR: Gluten bundle version
  • Scala version is 2.12
  • Java version is 1.8
  • Hybrid MR: Arch is amd64, OS is Ubuntu 22.04 or Ubuntu 20.04
  • Spark is not Databricks or CDH
  • Hybrid jar is in the classpath if Hybrid is enabled.
  • Scan runs properly when Hybrid jar is not in the classpath and Hybrid is disabled.

Call to Hybrid JNI to do Parquet scan

Limitations

supports more Spark versions than Gluten official supports

The Gluten official doc says only support Spark 322, 331, 342, 351.

Support Spark 3.2.2, 3.3.1, 3.4.2, and 3.5.1 with all UTs passed(if data type supported)

Hybrid supports totally 19 Spark versions(320-324, 330-334, 340-344, 350-353 ), and add doc to config HYBRID_PARQUET_READER that other versions except Gluten official supports are not fully tested.

tests

config jars exists ? result comment
Hybrid enabled Hybrid/Gluten jar are exist pass
Hybrid enabled Hybrid/Gluten jar are not exist pass Report Jar is not in the classpath
Hybrid disabled Hybrid/Gluten jar are exist pass no error reported
Hybrid disabled Hybrid/Gluten jar are not exist pass no error reported

Signed-off-by: sperlingxx [email protected]
Signed-off-by: Chong Gao [email protected]

@res-life
Copy link
Collaborator Author

res-life commented Nov 13, 2024

It's draft, may missed some code change, will double check later.
This can not pass building, because Gluten backends-velox 1.2.0 jar is not deployed to public maven repo by Gluten community.
The building will pass if the Gluten jars are installed locally by maven install

@res-life res-life requested review from jlowe and sperlingxx November 14, 2024 01:13
Copy link
Member

@jlowe jlowe left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please elaborate in the headline and description what this PR is doing. C2C is not a well-known acronym in the project and is not very descriptive.

@sameerz sameerz added the performance A performance related task/issue label Nov 16, 2024
Copy link
Collaborator

@revans2 revans2 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just a quick look at the code. Nothing too in depth.

@res-life res-life changed the base branch from branch-24.12 to branch-25.02 November 25, 2024 09:53
@res-life res-life marked this pull request as ready for review November 25, 2024 10:25
@res-life
Copy link
Collaborator Author

Passed IT. Tested conventional Spark-Rapids jar and regular Spark-Rapids jar.
Passed NDS test.
Will fix comments later.
Will push commits related to make a uber jar for all spark versions.

Copy link
Collaborator

@revans2 revans2 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I need to do some manual testing on my own to try and understand what is happening here and how this is all working. It may take a while.

sql-plugin/pom.xml Show resolved Hide resolved
case MapType(kt, vt, _) if kt.isInstanceOf[MapType] || vt.isInstanceOf[MapType] => false
// For the time being, BinaryType is not supported yet
case _: BinaryType => false
case _ => true
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

facebookincubator/velox#9560 I am not an expert, and I don't even know what version of velox we will end up using. It sounds like it is plugable. But according to this, even the latest version of velox cannot handle bytes/TINYINT. We are not looking for spaces in the names of columns, among other issues. I know that other implementations fall back for even more things. Should we be concerned about this?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Gluten uses another velox repo, code link

VELOX_REPO=https://github.com/oap-project/velox.git
VELOX_BRANCH=gluten-1.2.1

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will be something we should remember once we switch to use facebookincubator/velox directly.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My main concern is that if the gluten/velox version we use is pluggable, then we need to have some clear documentation on exactly which version you need to be based off of.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, Chong has added hybrid-execution.md to clarify the 1.2.0 version of Gluten.

@res-life res-life marked this pull request as draft November 26, 2024 00:59
@winningsix winningsix changed the title Merge C2C code to main Introduce hybrid (CPU) scan for Parquet read Nov 26, 2024
@res-life res-life marked this pull request as ready for review December 11, 2024 08:50
@res-life
Copy link
Collaborator Author

build

@res-life
Copy link
Collaborator Author

Depending on deoloying Hybrid 25.02 jar into Maven repo. @NvTimLiu

@res-life
Copy link
Collaborator Author

build

Comment on lines 187 to 189
"Hybrid jar is not in the classpath, Please add Hybrid jar into the class path, or " +
"Please disable Hybrid feature by setting " +
"spark.rapids.sql.parquet.useHybridReader=false")
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wrong exception message.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did not get the point, could you provide the message

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's in checkJavaVersion. Shouldn't the message be related to Java version? I think you copied the code from other place but forgot to modify.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will remove this check if other Java versions are compatible.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done, passed for Java 11, removed this check.

Comment on lines 166 to 173
try {
Class.forName(HYBRID_JAR_PLUGIN_CLASS_NAME)
} catch {
case e: ClassNotFoundException => throw new RuntimeException(
"Hybrid jar is not in the classpath, Please add Hybrid jar into the class path, or " +
"Please disable Hybrid feature by setting " +
"spark.rapids.sql.parquet.useHybridReader=false", e)
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this way to check the class only works on driver side.
Do we need to check on executor side as well?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this way to check the class only works on driver side.

Yes.

Do we need to check on executor side as well?

Yes. Will check.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

Copy link
Collaborator

@revans2 revans2 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Mostly just more questions for me to understand what is happening. This looks a lot better. I assume a lot of the code that is very picky about getting the exact setup right is here just because that is what this code has been tested with.

integration_tests/src/main/python/parquet_test.py Outdated Show resolved Hide resolved
# MapGen(StringGen(pattern='key_[0-9]', nullable=False), simple_string_to_string_map_gen)
],
]

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add some tests to validate that predicate push down and filtering is working correctly? It would be nice to have

  1. simple filters
  2. complex filters that are not supported by normal parquet predicate push down. (like the ors at the top level instead of ands)
  3. filters that have operators in them that velox does not support, but spark rapids does.

Copy link
Collaborator Author

@res-life res-life Dec 16, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Discussed internally before, the decision is putting into a follow-up PR.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Follow-up issue filed: #11892

case MapType(kt, vt, _) if kt.isInstanceOf[MapType] || vt.isInstanceOf[MapType] => false
// For the time being, BinaryType is not supported yet
case _: BinaryType => false
case _ => true
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My main concern is that if the gluten/velox version we use is pluggable, then we need to have some clear documentation on exactly which version you need to be based off of.

lazy val allSupportedTypes = fsse.requiredSchema.exists { field =>
TrampolineUtil.dataTypeExistsRecursively(field.dataType, {
// For the time being, the native backend may return incorrect results over nestedMap
case MapType(kt, vt, _) if kt.isInstanceOf[MapType] || vt.isInstanceOf[MapType] => false
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What about if it is a MapType, but kt or vt is not directly a map, but might be a LIST of MAP, so a struct with a MAP in it? Do we know the cause of this error so that we can limit things properly? If not then I would rather just stick with a MAP at the top level and any nested maps are not allowed.

Also what happens if the data is a LIST Internally in Parquet a Map is just a LIST<STRUCT<KEY, VALUE>> would we have similar issues if we had one of them be nested?

Copy link
Collaborator

@sperlingxx sperlingxx Dec 17, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @revans2 , I am sorry that I did not audit carefully on which types is unsupported by native backend. Just before, I ran a rather comprehensive test:

hybrid_gens_test = [
     # failed
    [decimal_gen_32bit_neg_scale],
    [decimal_gen_128bit],
    decimal_64_map_gens,
    [MapGen(TimestampGen(nullable=False), ArrayGen(string_gen))],
    [MapGen(RepeatSeqGen(IntegerGen(nullable=False), 10), TimestampGen())],
    [MapGen(RepeatSeqGen(IntegerGen(nullable=False), 10), decimal_gen_32bit)],
    [MapGen(RepeatSeqGen(IntegerGen(nullable=False), 10), decimal_gen_64bit)],
    # failed
    [MapGen(StringGen(pattern='key_[0-9]', nullable=False), decimal_gen_128bit)],
    [MapGen(StringGen(pattern='key_[0-9]', nullable=False), ArrayGen(string_gen))],
    [MapGen(StringGen(pattern='key_[0-9]', nullable=False), ArrayGen(ArrayGen(long_gen)))],
    [MapGen(StringGen(pattern='key_[0-9]', nullable=False), ArrayGen(ArrayGen(string_gen)))],
    [MapGen(StringGen(pattern='key_[0-9]', nullable=False),
            StructGen([['child0', string_gen],
                       ['child1', double_gen],
                       ['child2', int_gen],
                       ['child3', StructGen([['child0', ArrayGen(byte_gen)],
                                             ['child1', byte_gen],
                                             ['child2', float_gen],
                                             ['child3', decimal_gen_64bit]])]]))
     ],
    [MapGen(StringGen(pattern='key_[0-9]', nullable=False),
            StructGen([['child0', ArrayGen(ArrayGen(long_gen))],
                       ['child1', ArrayGen(string_gen)],
                       ['child2', ArrayGen(ArrayGen(string_gen))]]))
     ],
    [MapGen(StringGen(pattern='key_[0-9]', nullable=False),
            ArrayGen(MapGen(LongGen(nullable=False), long_gen)))],
    [MapGen(StringGen(pattern='key_[0-9]', nullable=False),
            ArrayGen(MapGen(IntegerGen(nullable=False), string_gen)))],
    [MapGen(StringGen(pattern='key_[0-9]', nullable=False),
            ArrayGen(ArrayGen(MapGen(IntegerGen(nullable=False), string_gen))))],

    [ArrayGen(ArrayGen(string_gen))],
    [ArrayGen(ArrayGen(long_gen))],
    # failed
    [ArrayGen(MapGen(LongGen(nullable=False), long_gen))],
    # failed
    [ArrayGen(MapGen(StringGen(pattern='key_[0-9]', nullable=False), long_gen))],

     # failed
    [MapGen(StringGen(pattern='key_[0-9]', nullable=False), MapGen(LongGen(nullable=False), long_gen))],
    [MapGen(StringGen(pattern='key_[0-9]', nullable=False), MapGen(LongGen(nullable=False), string_gen))],
    [MapGen(StringGen(pattern='key_[0-9]', nullable=False), simple_string_to_string_map_gen)],
     # failed
    [StructGen([['child0', MapGen(LongGen(nullable=False), long_gen)],
                ['child1', MapGen(StringGen(pattern='key_[0-9]', nullable=False), long_gen)],
                ['child2', MapGen(IntegerGen(nullable=False), decimal_gen_64bit)],
                ['child3', StructGen([["cc", MapGen(IntegerGen(nullable=False), decimal_gen_32bit)]])]
                ]),
     ],
    [StructGen([['cc', MapGen(IntegerGen(nullable=False), decimal_gen_64bit)]])],
     # failed
    [StructGen([['cc', ArrayGen(MapGen(IntegerGen(nullable=False), string_gen))]])],
    [StructGen([['cc', ArrayGen(ArrayGen(MapGen(IntegerGen(nullable=False), string_gen)))]])],
]

The test result suggested the unsupported types are:

  1. Decimal with negative scale is NOT supported
  2. Decimal128 inside nested types is NOT supported
  3. BinaryType is NOT supported
  4. MapType inside nested types (Struct of Map/Array of Map/Map of Map) is NOT fully supported

I reworked the typeCheck function are integration tests according to the new finding.

if (javaVersion == null) {
throw new RuntimeException("Hybrid feature: Can not read java.version, get null")
}
if (!javaVersion.startsWith("1.8")) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why does it only work with java 1.8? Newer versions are supposed to be backwards compatible.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will test other Java version.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done, passed for Java 11, removed this check.

*/
private def checkScalaVersion(): Unit = {
val scalaVersion = scala.util.Properties.versionString
if (!scalaVersion.startsWith("version 2.12")) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We already have shims and a separate jar for scala 2.13. @gerashegalov is there a way for us to have scala 2.13 specific code that would just fail instead of doing a check like this?

(fsse, conf, p, r) => {
// TODO: HybridScan supports DataSourceV2
if (HybridFileSourceScanExecMeta.useHybridScan(conf, fsse)) {
// Check if runtimes are satisfied: Spark is not Databricks or CDH; Java version is 1.8;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not databricks or CDH? Is it just that we have not tested with these yet?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, because have not tested with CDH and Databricks.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Currently do not have customer to use CDH and Databricks; Did not test perf on CDH and Databricks.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't have confidence that the Hybrid thing support Databricks spark totally. So, for first version, we consider not to support Databricks.

@@ -2895,6 +2912,10 @@ class RapidsConf(conf: Map[String, String]) extends Logging {

lazy val avroDebugDumpAlways: Boolean = get(AVRO_DEBUG_DUMP_ALWAYS)

lazy val useHybridParquetReader: Boolean = get(HYBRID_PARQUET_READER)

lazy val loadHybridBackend: Boolean = get(LOAD_HYBRID_BACKEND)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right now, it's only used like it must be true if useHybridParquetReader is true.
Where is the code to check this config then load the backend?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LOAD_HYBRID_BACKEND is a startup config, while HYBRID_PARQUET_READER is not. User can config LOAD_HYBRID_BACKEND as true on the startup time, and enable/disable HYBRID_PARQUET_READER at runtime on the fly. This is more flexible.

Copy link
Collaborator

@GaryShen2008 GaryShen2008 Dec 18, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we have some code to check LOAD_HYBRID_BACKEND then try to load the jar when initializing the driver and executor plugin?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's in Hybrid execution repo. It's like:
HybridPluginWrapper.java:

HybridPluginWrapper {
  DriverPluginWrapper {
     if (LOAD_HYBRID_BACKEND) {
        load_impl()
     }
  }

  ExecutorPluginWrapper {
     if (LOAD_HYBRID_BACKEND) {
        load_impl()
     }
  }
}

Also Hybrid execution repo provides a config file spark-rapids-extra-plugins:

com.nvidia.spark.rapids.hybrid.HybridPluginWrapper

If the Hybrid jar in the classpach, Rapids Plugin uses a reflection approach to load the HybridPluginWrapper and init it. Of course, if LOAD_HYBRID_BACKEND is disable, then the hybrid plugin will be not loaded.

Copy link
Collaborator

@revans2 revans2 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My issues have pretty much all been addressed and my questions answered. I do want to see a follow on issue filed for #11720 (comment)

I also want to understand the plan for documentation. I get that this is still very early and the configs are all marked as internal so I am okay with where it is at right now. I am not going to approve it yet because I want to hear from others on this too.

@GaryShen2008
Copy link
Collaborator

As discussed with Chong, we also need a doc to describe how to build Gluten/Velox jar for the case that the external users want to have a try.

@res-life
Copy link
Collaborator Author

build

@res-life
Copy link
Collaborator Author

TODO: scala 2.13 buiding is blocking.
In order to not implement shim code for scala 2.12 and scala 2.13, we plan to build a hybrid 2.13 jar, artifact name will be rapids-4-spark-hybrid_2.13.

Signed-off-by: Chong Gao <[email protected]>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
performance A performance related task/issue
Projects
None yet
Development

Successfully merging this pull request may close these issues.

7 participants