From d742febc8332f195efe35385a3b01f3b19c1e8b3 Mon Sep 17 00:00:00 2001 From: Michael Gardner Date: Mon, 16 Dec 2024 17:37:43 -0500 Subject: [PATCH] HPCC4J-672 Include hpcc-spark source code into the hpcc4j project Signed-off-by: Michael Gardner --- spark-hpcc/DataAccess/README.md | 6 + spark-hpcc/DataAccess/pom.xml | 512 ++++++++++++++ .../spark/FileFilterConverter.java | 233 +++++++ .../spark/GenericRowRecordAccessor.java | 127 ++++ .../spark/GenericRowRecordBuilder.java | 119 ++++ .../java/org/hpccsystems/spark/HpccFile.java | 196 ++++++ .../org/hpccsystems/spark/HpccFileWriter.java | 656 ++++++++++++++++++ .../java/org/hpccsystems/spark/HpccRDD.java | 264 +++++++ .../org/hpccsystems/spark/PySparkField.java | 38 + .../spark/PySparkFieldConstructor.java | 39 ++ .../org/hpccsystems/spark/RowConstructor.java | 40 ++ .../spark/SparkSchemaTranslator.java | 191 +++++ .../java/org/hpccsystems/spark/Utils.java | 127 ++++ .../spark/datasource/HpccOptions.java | 154 ++++ .../spark/datasource/HpccRelation.java | 251 +++++++ .../datasource/HpccRelationProvider.java | 111 +++ .../spark/datasource/package-info.java | 6 + .../org/hpccsystems/spark/package-info.java | 38 + .../DataAccess/src/main/javadoc/overview.html | 7 + ...pache.spark.sql.sources.DataSourceRegister | 1 + .../src/main/resources/log4j.properties | 27 + .../spark/BaseIntegrationTest.java | 167 +++++ .../spark/DataframeIntegrationTest.java | 179 +++++ .../hpccsystems/spark/FileFilterTests.java | 111 +++ .../spark/HpccRelationIntegrationTest.java | 192 +++++ spark-hpcc/Examples/PySparkExample.ipynb | 321 +++++++++ spark-hpcc/LICENSE | 201 ++++++ spark-hpcc/README.md | 48 ++ 28 files changed, 4362 insertions(+) create mode 100644 spark-hpcc/DataAccess/README.md create mode 100644 spark-hpcc/DataAccess/pom.xml create mode 100644 spark-hpcc/DataAccess/src/main/java/org/hpccsystems/spark/FileFilterConverter.java create mode 100644 spark-hpcc/DataAccess/src/main/java/org/hpccsystems/spark/GenericRowRecordAccessor.java 
create mode 100644 spark-hpcc/DataAccess/src/main/java/org/hpccsystems/spark/GenericRowRecordBuilder.java create mode 100644 spark-hpcc/DataAccess/src/main/java/org/hpccsystems/spark/HpccFile.java create mode 100644 spark-hpcc/DataAccess/src/main/java/org/hpccsystems/spark/HpccFileWriter.java create mode 100644 spark-hpcc/DataAccess/src/main/java/org/hpccsystems/spark/HpccRDD.java create mode 100644 spark-hpcc/DataAccess/src/main/java/org/hpccsystems/spark/PySparkField.java create mode 100644 spark-hpcc/DataAccess/src/main/java/org/hpccsystems/spark/PySparkFieldConstructor.java create mode 100644 spark-hpcc/DataAccess/src/main/java/org/hpccsystems/spark/RowConstructor.java create mode 100644 spark-hpcc/DataAccess/src/main/java/org/hpccsystems/spark/SparkSchemaTranslator.java create mode 100644 spark-hpcc/DataAccess/src/main/java/org/hpccsystems/spark/Utils.java create mode 100644 spark-hpcc/DataAccess/src/main/java/org/hpccsystems/spark/datasource/HpccOptions.java create mode 100644 spark-hpcc/DataAccess/src/main/java/org/hpccsystems/spark/datasource/HpccRelation.java create mode 100644 spark-hpcc/DataAccess/src/main/java/org/hpccsystems/spark/datasource/HpccRelationProvider.java create mode 100644 spark-hpcc/DataAccess/src/main/java/org/hpccsystems/spark/datasource/package-info.java create mode 100644 spark-hpcc/DataAccess/src/main/java/org/hpccsystems/spark/package-info.java create mode 100644 spark-hpcc/DataAccess/src/main/javadoc/overview.html create mode 100644 spark-hpcc/DataAccess/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister create mode 100644 spark-hpcc/DataAccess/src/main/resources/log4j.properties create mode 100644 spark-hpcc/DataAccess/src/test/java/org/hpccsystems/spark/BaseIntegrationTest.java create mode 100644 spark-hpcc/DataAccess/src/test/java/org/hpccsystems/spark/DataframeIntegrationTest.java create mode 100644 spark-hpcc/DataAccess/src/test/java/org/hpccsystems/spark/FileFilterTests.java create mode 
100644 spark-hpcc/DataAccess/src/test/java/org/hpccsystems/spark/HpccRelationIntegrationTest.java create mode 100644 spark-hpcc/Examples/PySparkExample.ipynb create mode 100644 spark-hpcc/LICENSE create mode 100644 spark-hpcc/README.md diff --git a/spark-hpcc/DataAccess/README.md b/spark-hpcc/DataAccess/README.md new file mode 100644 index 000000000..f61198279 --- /dev/null +++ b/spark-hpcc/DataAccess/README.md @@ -0,0 +1,6 @@ +# Spark-HPCC/DataAccess +Spark-based classes for HPCC Systems/ Spark interoperability + +HPCC Systems platform target runtime must be 7.x or newer + +HPCC4J wsclient, dfsclient, and commons-hpcc build dependencies must be 7.x or newer. diff --git a/spark-hpcc/DataAccess/pom.xml b/spark-hpcc/DataAccess/pom.xml new file mode 100644 index 000000000..6ff084679 --- /dev/null +++ b/spark-hpcc/DataAccess/pom.xml @@ -0,0 +1,512 @@ + + 4.0.0 + org.hpccsystems + spark-hpcc + 9.9.0-0-SNAPSHOT + spark-hpcc + Spark connector for reading files residing in an HPCC cluster environment + https://hpccsystems.com + + + Apache License, Version 2.0 + https://www.apache.org/licenses/LICENSE-2.0.txt + repo + + + + 1.6.8 + false + 3.1.2 + 3.1.2 + 1.6 + true + 2.2.1 + 3.1.0 + 3.8.0 + 8 + 2.4.6 + 2.11 + 2.11 + UTF-8 + net.razorvine.*:org.apache.*:org.hpccsystems.commons.*:org.hpccsystems.generated.*:org.hpccsystems.dfs.*:org.hpccsystems.ws.*:org.hpccsystems.ws.client.antlr.* + + + scm:git:https://github.com/hpcc-systems/Spark-HPCC.git + scm:git:https://github.com/hpcc-systems/Spark-HPCC.git + scm:git:https://github.com/hpcc-systems/Spark-HPCC.git + HEAD + + + + + + ossrh + https://oss.sonatype.org/content/repositories/snapshots + + + + + + + ossrh + Ossrh Snapshot Repository + https://oss.sonatype.org/content/repositories/snapshots + default + + false + never + + + true + always + warn + + + + + + + org.apache.spark + spark-core_${scala.binary.version} + ${spark.runtime.version} + + + xerces + xercesImpl + + + xerces + xmlParserAPIs + + + provided + + + 
org.apache.spark + spark-sql_${scala.binary.version} + ${spark.runtime.version} + + + xerces + xercesImpl + + + xerces + xmlParserAPIs + + + provided + + + org.apache.logging.log4j + log4j-core + 2.17.1 + pom + + + org.hpccsystems + commons-hpcc + ${project.version} + + + org.antlr + antlr4 + + + org.antlr + antlr4-runtime + + + + + org.hpccsystems + wsclient + ${project.version} + + + org.antlr + antlr4-runtime + + + org.antlr + antlr4 + + + + + org.hpccsystems + dfsclient + ${project.version} + + + org.antlr + antlr4-runtime + + + org.antlr + antlr4 + + + + + net.razorvine + pyrolite + 4.13 + provided + + + + + + + maven-compiler-plugin + ${maven.compiler.version} + + ${maven.compiler.release} + + + + org.apache.maven.plugins + maven-gpg-plugin + ${maven.gpg.version} + + + sign-artifacts + verify + + sign + + + + + + org.apache.maven.plugins + maven-source-plugin + ${maven.source.version} + + + attach-sources + + jar-no-fork + + + + + + org.apache.maven.plugins + maven-javadoc-plugin + ${maven.javadoc.version} + + + attach-javadocs + + jar + + + + + none + ${javadoc.excludePackageNames} + -Xdoclint:none +
Copyright © 2021 HPCC Systems®. All rights reserved
+
Copyright © 2021 HPCC Systems®. All rights reserved
+ true +
+
+
+
+ + + maven-compiler-plugin + ${maven.compiler.version} + + ${maven.compiler.release} + + + + maven-surefire-plugin + ${maven.surefire.version} + + ${argLine} + ${groups} + + *IntegrationTest.java + + + + + org.apache.maven.plugins + maven-failsafe-plugin + ${maven.failsafe.version} + + + **/*IntegrationTest.java + + + + + + integration-test + verify + + + + + + org.sonatype.plugins + nexus-staging-maven-plugin + ${sonatype.staging.version} + true + + ossrh + https://oss.sonatype.org/ + ${sonatype.staging.autorelease} + + + + org.apache.maven.plugins + maven-assembly-plugin + 2.4.1 + + + jar-with-dependencies + + + **/log4j.properties + + + + + spark-hpcc + package + + single + + + + + + maven-jar-plugin + 3.0.2 + + + **/log4j.properties + + + + +
+ + + spark33 + + 2.12 + 2.12.11 + 3.3.2 + + + + spark34 + + 2.12 + 2.12.11 + 3.4.1 + + + + release + + true + true + + + + + org.apache.maven.plugins + maven-source-plugin + ${maven.source.version} + + + attach-sources + + jar-no-fork + + + + + + org.apache.maven.plugins + maven-javadoc-plugin + ${maven.javadoc.version} + + + attach-javadocs + + jar + + + + + + org.apache.maven.plugins + maven-gpg-plugin + ${maven.gpg.version} + + + sign-artifacts + verify + + sign + + + + + + + + + jenkins-on-demand + + false + org.hpccsystems.commons.annotations.BaseTests,org.hpccsystems.commons.annotations.RemoteTests + + + + + org.apache.maven.plugins + maven-javadoc-plugin + ${maven.javadoc.version} + + ${javadoc.excludePackageNames} + + + + org.apache.maven.plugins + maven-source-plugin + ${maven.source.version} + + + org.sonatype.plugins + nexus-staging-maven-plugin + ${sonatype.staging.version} + + + org.apache.maven.plugins + maven-gpg-plugin + ${maven.gpg.version} + + true + + --batch + --pinentry-mode + loopback + + ${maven.gpg.skip} + + + + maven-surefire-plugin + ${maven.surefire.version} + + ${argLine} + ${groups} + + + + + + + jenkins-release + + true + true + false + org.hpccsystems.commons.annotations.BaseTests + + + + + org.apache.maven.plugins + maven-javadoc-plugin + ${maven.javadoc.version} + + ${javadoc.excludePackageNames} + + + + org.apache.maven.plugins + maven-source-plugin + ${maven.source.version} + + + org.sonatype.plugins + nexus-staging-maven-plugin + ${sonatype.staging.version} + + + org.apache.maven.plugins + maven-gpg-plugin + ${maven.gpg.version} + + true + + --batch + --pinentry-mode + loopback + + ${maven.gpg.skip} + + + + maven-surefire-plugin + ${maven.surefire.version} + + ${argLine} + ${groups} + + + + org.apache.maven.plugins + maven-failsafe-plugin + ${maven.failsafe.version} + + + **/*IntegrationTest.java + + + + + + + + + + johnholt + John Holt + john.holt@lexisnexis.com + LexisNexis Risk + https://hpccsystems.com + + architect + 
developer + + America/New_York + + + rpastrana + Rodrigo Pastrana + rodrigo.pastrana@lexisnexis.com + LexisNexis Risk + https://hpccsystems.com + + architect + developer + + America/New_York + + + + Github + https://github.com/hpcc-systems/Spark-HPCC/issues + +
diff --git a/spark-hpcc/DataAccess/src/main/java/org/hpccsystems/spark/FileFilterConverter.java b/spark-hpcc/DataAccess/src/main/java/org/hpccsystems/spark/FileFilterConverter.java new file mode 100644 index 000000000..a082ced3c --- /dev/null +++ b/spark-hpcc/DataAccess/src/main/java/org/hpccsystems/spark/FileFilterConverter.java @@ -0,0 +1,233 @@ +/******************************************************************************* + * HPCC SYSTEMS software Copyright (C) 2019 HPCC Systems. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ *******************************************************************************/ +package org.hpccsystems.spark; + +import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.LogManager; +import org.apache.spark.sql.sources.And; +import org.apache.spark.sql.sources.EqualNullSafe; +import org.apache.spark.sql.sources.EqualTo; +import org.apache.spark.sql.sources.Filter; +import org.apache.spark.sql.sources.GreaterThan; +import org.apache.spark.sql.sources.GreaterThanOrEqual; +import org.apache.spark.sql.sources.In; +import org.apache.spark.sql.sources.IsNotNull; +import org.apache.spark.sql.sources.IsNull; +import org.apache.spark.sql.sources.LessThan; +import org.apache.spark.sql.sources.LessThanOrEqual; +import org.apache.spark.sql.sources.Not; +import org.apache.spark.sql.sources.Or; +import org.apache.spark.sql.sources.StringContains; +import org.apache.spark.sql.sources.StringEndsWith; +import org.apache.spark.sql.sources.StringStartsWith; +import org.hpccsystems.commons.ecl.FieldFilter; +import org.hpccsystems.commons.ecl.FieldFilterRange; +import org.hpccsystems.commons.ecl.FileFilter; + +/** + * A helper class that translates Spark Filters into an HPCC Systems FileFilter. + * + * HPCC Systems FileFilter does not support all possible Spark Filters. In these cases an exception will be thrown. 
+ */ +public class FileFilterConverter +{ + private static final Logger log = LogManager.getLogger(FileFilterConverter.class); + private static final long serialVersionUID = 1L; + + public static FileFilter CovertToHPCCFileFilter(Filter [] sparkfilters) throws Exception + { + FileFilter hpccFilters = new FileFilter(); + + for (Filter sparkfilter : sparkfilters) + { + hpccFilters.andFilter(ConvertToHPCCFileFilterString(sparkfilter)); + } + + return hpccFilters; + } + + public static FileFilter ConvertToHPCCFileFilterString(Filter sparkfilter) throws Exception + { + FileFilter hpccfilter = new FileFilter(); + + //https://spark.apache.org/docs/2.3.0/api/java/org/apache/spark/sql + //And, EqualNullSafe, EqualTo, GreaterThan, GreaterThanOrEqual, In, IsNotNull, IsNull, LessThan, LessThanOrEqual, Not, Or, StringContains, StringEndsWith, StringStartsWith + if (sparkfilter instanceof EqualTo) + { + //https://spark.apache.org/docs/2.3.0/api/java/org/apache/spark/sql/sources/EqualTo.html + //A filter that evaluates to true iff the attribute evaluates to a value equal to value. + + final EqualTo eqtofilter = (EqualTo) sparkfilter; + hpccfilter = new FileFilter(new FieldFilter(eqtofilter.attribute(),FieldFilterRange.makeEq(eqtofilter.value()))); + } + else if (sparkfilter instanceof EqualNullSafe) + { + //https://spark.apache.org/docs/2.3.0/api/java/org/apache/spark/sql/sources/EqualNullSafe.html + //Performs equality comparison, similar to EqualTo. + //However, this differs from EqualTo in that it returns true (rather than NULL) if both inputs are NULL, + //and false (rather than NULL) if one of the input is NULL and the other is not NULL. 
+ // + //No concept of NULL in HPCC/ECL + final EqualNullSafe eqnullsafefilter = (EqualNullSafe) sparkfilter; + hpccfilter = new FileFilter(new FieldFilter(eqnullsafefilter.attribute(),FieldFilterRange.makeEq(eqnullsafefilter.value()))); + } + else if (sparkfilter instanceof And) + { + //https://spark.apache.org/docs/2.3.0/api/java/org/apache/spark/sql/sources/And.html + //A filter that evaluates to true iff both left or right evaluate to true. + + final And andfilter = (And) sparkfilter; + hpccfilter = new FileFilter(ConvertToHPCCFileFilterString(andfilter.left())); + hpccfilter.andFilter(ConvertToHPCCFileFilterString(andfilter.right())); + } + else if (sparkfilter instanceof GreaterThan) + { + //https://spark.apache.org/docs/2.3.0/api/java/org/apache/spark/sql/sources/GreaterThan.html + //A filter that evaluates to true iff the attribute evaluates to a value greater than value. + + final GreaterThan gtfilter = (GreaterThan) sparkfilter; + hpccfilter = new FileFilter(new FieldFilter(gtfilter.attribute(),FieldFilterRange.makeGT(gtfilter.value()))); + } + else if (sparkfilter instanceof GreaterThanOrEqual) + { + //https://spark.apache.org/docs/2.3.0/api/java/org/apache/spark/sql/sources/GreaterThanOrEqual.html + //A filter that evaluates to true iff the attribute evaluates to a value greater than or equal to value. + final GreaterThanOrEqual gtefilter = (GreaterThanOrEqual) sparkfilter; + hpccfilter = new FileFilter(new FieldFilter(gtefilter.attribute(),FieldFilterRange.makeGE(gtefilter.value()))); + } + else if (sparkfilter instanceof In) + { + //https://spark.apache.org/docs/2.3.0/api/java/org/apache/spark/sql/sources/In.html + //A filter that evaluates to true iff the attribute evaluates to one of the values in the array. 
+ // + //A in B,C A=[B],[C] + final In infilter = (In) sparkfilter; + hpccfilter = new FileFilter(new FieldFilter(infilter.attribute(), FieldFilterRange.makeIn(infilter.values()))); + } + else if (sparkfilter instanceof IsNotNull) + { + //https://spark.apache.org/docs/2.3.0/api/java/org/apache/spark/sql/sources/IsNotNull.html + //A filter that evaluates to true iff the attribute evaluates to a non-null value. + // + //No concept of NULL in hpcc/ecl, this filter should always resolve to false/emptystring/0? + + final IsNotNull wildfilter = (IsNotNull) sparkfilter; + hpccfilter = new FileFilter(new FieldFilter(wildfilter.attribute()));//RODRIGO not sure about this one + log.info("Filter 'IsNotNull' ignored -- No concept of NULL in HPCC/ECL"); + } + else if (sparkfilter instanceof IsNull) + { + //https://spark.apache.org/docs/2.3.0/api/java/org/apache/spark/sql/sources/IsNull.html + //A filter that evaluates to true iff the attribute evaluates to null. + // + //No concept of NULL in hpcc/ecl, this filter should always resolve to false/emptystring/0? + throw new UnsupportedOperationException("Filter 'IsNull' not supported"); + } + else if (sparkfilter instanceof LessThan) + { + //https://spark.apache.org/docs/2.3.0/api/java/org/apache/spark/sql/sources/LessThan.html + //A filter that evaluates to true iff the attribute evaluates to a value less than value. + final LessThan ltfilter = (LessThan) sparkfilter; + hpccfilter = new FileFilter(new FieldFilter(ltfilter.attribute(),FieldFilterRange.makeLT(ltfilter.value()))); + } + else if (sparkfilter instanceof LessThanOrEqual) + { + //https://spark.apache.org/docs/2.3.0/api/java/org/apache/spark/sql/sources/LessThanOrEqual.html + //A filter that evaluates to true iff the attribute evaluates to a value less than or equal to value. 
+ final LessThan ltefilter = (LessThan) sparkfilter; + hpccfilter = new FileFilter(new FieldFilter(ltefilter.attribute(),FieldFilterRange.makeLE(ltefilter.value()))); + } + else if (sparkfilter instanceof Not) + { + //https://spark.apache.org/docs/2.3.0/api/java/org/apache/spark/sql/sources/Not.html + //A filter that evaluates to true iff child is evaluated to false. + final Not notfilter = (Not) sparkfilter; + Filter child = notfilter.child(); + if (child instanceof EqualTo) + { + final EqualTo eqfilter = (EqualTo) child; + hpccfilter = new FileFilter(new FieldFilter(eqfilter.attribute(),FieldFilterRange.makeNE(eqfilter.value()))); + } + else if (child instanceof EqualNullSafe) + { + final EqualNullSafe filter = (EqualNullSafe) child; + hpccfilter = new FileFilter(new FieldFilter(filter.attribute(),FieldFilterRange.makeNE(filter.value()))); + } + else if (child instanceof GreaterThan) + { + final GreaterThan filter = (GreaterThan) child; + hpccfilter = new FileFilter(new FieldFilter(filter.attribute(),FieldFilterRange.makeLE(filter.value()))); + } + else if (child instanceof GreaterThanOrEqual) + { + final GreaterThanOrEqual filter = (GreaterThanOrEqual) child; + hpccfilter = new FileFilter(new FieldFilter(filter.attribute(),FieldFilterRange.makeLT(filter.value()))); + } + else if (child instanceof LessThan) + { + final LessThan filter = (LessThan) child; + hpccfilter = new FileFilter(new FieldFilter(filter.attribute(),FieldFilterRange.makeGE(filter.value()))); + } + else if (child instanceof LessThanOrEqual) + { + final LessThanOrEqual filter = (LessThanOrEqual) child; + hpccfilter = new FileFilter(new FieldFilter(filter.attribute(),FieldFilterRange.makeGT(filter.value()))); + } + else + throw new UnsupportedOperationException("Filter 'Not' not supported"); + } + else if (sparkfilter instanceof Or) + { + //https://spark.apache.org/docs/2.3.0/api/java/org/apache/spark/sql/sources/Or.html + //A filter that evaluates to true iff at least one of left or right 
evaluates to true. + // + //A < B or A in [C..D] or A >= E A=(,B),[C,D],[E,) + + final Or orfilter = (Or) sparkfilter; + hpccfilter = new FileFilter(ConvertToHPCCFileFilterString(orfilter.left())); + hpccfilter.orFilter(ConvertToHPCCFileFilterString(orfilter.right())); + } + else if (sparkfilter instanceof StringContains) + { + //https://spark.apache.org/docs/2.3.0/api/java/org/apache/spark/sql/sources/StringContains.html + //A filter that evaluates to true iff the attribute evaluates to a string that contains the string value. + throw new UnsupportedOperationException("Filter 'StringContains' not supported"); + } + else if (sparkfilter instanceof StringEndsWith) + { + //https://spark.apache.org/docs/2.3.0/api/java/org/apache/spark/sql/sources/StringEndsWith.html + //A filter that evaluates to true iff the attribute evaluates to a string that starts with value. + throw new UnsupportedOperationException("Filter 'StringEndsWith' not supported"); + } + else if (sparkfilter instanceof StringStartsWith) + { + //https://spark.apache.org/docs/2.3.0/api/java/org/apache/spark/sql/sources/StringStartsWith.html + //A filter that evaluates to true iff the attribute evaluates to a string that starts with value. 
+ // + //:= - substring match + //The format of the set is an optional comma-separated sequence of ranges + final StringStartsWith strstartswithfilter = (StringStartsWith) sparkfilter; + final String value = strstartswithfilter.value(); + final FieldFilterRange substring = FieldFilterRange.makeStartsWith(value, (short)value.length()); + + hpccfilter = new FileFilter(new FieldFilter(strstartswithfilter.attribute(), new FieldFilterRange[] {substring})); + } + + log.debug("Converted SPARK filter: '" + sparkfilter.toString() + "' to HPCC filter: '" + hpccfilter); + return hpccfilter; + } +} + diff --git a/spark-hpcc/DataAccess/src/main/java/org/hpccsystems/spark/GenericRowRecordAccessor.java b/spark-hpcc/DataAccess/src/main/java/org/hpccsystems/spark/GenericRowRecordAccessor.java new file mode 100644 index 000000000..7f88b16bc --- /dev/null +++ b/spark-hpcc/DataAccess/src/main/java/org/hpccsystems/spark/GenericRowRecordAccessor.java @@ -0,0 +1,127 @@ +/******************************************************************************* + * HPCC SYSTEMS software Copyright (C) 2019 HPCC Systems®. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ *******************************************************************************/ + +package org.hpccsystems.spark; + +import org.hpccsystems.dfs.client.IRecordAccessor; + +import org.hpccsystems.commons.ecl.FieldDef; +import org.hpccsystems.commons.ecl.FieldType; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema; +import org.apache.spark.sql.types.*; + +/** + * An implementation of IRecordAccessor that allows IRecordWriter to access Spark GenericRows. + */ +public class GenericRowRecordAccessor implements IRecordAccessor +{ + private GenericRowWithSchema row = null; + private FieldDef fieldDef = null; + private GenericRowRecordAccessor[] childRecordAccessors = null; + + public GenericRowRecordAccessor(StructType schema) throws IllegalArgumentException + { + try + { + this.fieldDef = SparkSchemaTranslator.toHPCCRecordDef(schema); + } + catch(Exception e) + { + throw new IllegalArgumentException(e.getMessage()); + } + } + + public GenericRowRecordAccessor(FieldDef fieldDef) + { + this.fieldDef = fieldDef; + this.childRecordAccessors = new GenericRowRecordAccessor[this.fieldDef.getNumDefs()]; + for (int i = 0; i < this.fieldDef.getNumDefs(); i++) + { + FieldDef fd = this.fieldDef.getDef(i); + boolean needsChildRecordAccessor = (fd.getFieldType() == FieldType.RECORD) + || (fd.getFieldType() == FieldType.DATASET && fd.getDef(0).getFieldType() == FieldType.RECORD); + + if (needsChildRecordAccessor) + { + FieldDef subFd = fd; + if (fd.getFieldType() == FieldType.DATASET) + { + subFd = fd.getDef(0); + } + childRecordAccessors[i] = new GenericRowRecordAccessor(subFd); + } + else + { + childRecordAccessors[i] = null; + } + } + + } + + public IRecordAccessor setRecord(Object rd) + { + this.row = (GenericRowWithSchema) rd; + return this; + } + + public int getNumFields() + { + return this.fieldDef.getNumDefs(); + } + + public Object getFieldValue(int index) + { + Object 
val = this.row.get(index); + + FieldDef fd = this.fieldDef.getDef(index); + if (fd.getFieldType() == FieldType.DATASET || fd.getFieldType() == FieldType.SET) + { + if (val instanceof List) + { + return val; + } + else if (val instanceof Object[]) + { + return Arrays.asList((Object[]) val); + } + else if (val instanceof scala.collection.Seq) + { + return scala.collection.JavaConversions.seqAsJavaList((scala.collection.Seq)val); + } + + return new ArrayList(); + } + else + { + return val; + } + } + + public FieldDef getFieldDefinition(int index) + { + return this.fieldDef.getDef(index); + } + + public IRecordAccessor getChildRecordAccessor(int index) + { + return this.childRecordAccessors[index]; + } +} diff --git a/spark-hpcc/DataAccess/src/main/java/org/hpccsystems/spark/GenericRowRecordBuilder.java b/spark-hpcc/DataAccess/src/main/java/org/hpccsystems/spark/GenericRowRecordBuilder.java new file mode 100644 index 000000000..0b1215ca2 --- /dev/null +++ b/spark-hpcc/DataAccess/src/main/java/org/hpccsystems/spark/GenericRowRecordBuilder.java @@ -0,0 +1,119 @@ +/* + * ############################################################################## + * + * HPCC SYSTEMS software Copyright (C) 2019 HPCC Systems®. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. 
+ * ##############################################################################
+ */
+
+package org.hpccsystems.spark;
+
+import org.hpccsystems.dfs.client.IRecordBuilder;
+
+import org.hpccsystems.commons.ecl.FieldDef;
+import org.hpccsystems.commons.ecl.FieldType;
+
+import java.util.List;
+
+import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema;
+import org.apache.spark.sql.types.*;
+
+/**
+ * An implementation of IRecordBuilder that allows IRecordReader to create Spark GenericRows.
+ */
+public class GenericRowRecordBuilder implements IRecordBuilder
+{
+    private Object[] fields = null;
+    private FieldDef fieldDef = null;
+    private StructType schema = null;
+
+    private GenericRowRecordBuilder[] childRecordBuilders = null;
+
+    /**
+     * Creates a builder for the given record definition.
+     *
+     * @param recordDef the HPCC record definition of the rows to build
+     * @throws IllegalArgumentException if the definition cannot be translated to a Spark schema
+     */
+    public GenericRowRecordBuilder(FieldDef recordDef) throws IllegalArgumentException
+    {
+        setRecordDefinition(recordDef);
+    }
+
+    /**
+     * Sets the record definition, derives the matching Spark schema and creates a child builder for
+     * every field that is a record or a dataset of records.
+     *
+     * @param fieldDef the HPCC record definition
+     * @throws IllegalArgumentException if SparkSchemaTranslator.toSparkSchema fails; the failure is
+     *                                  attached as the cause
+     */
+    public void setRecordDefinition(FieldDef fieldDef) throws IllegalArgumentException
+    {
+        this.fieldDef = fieldDef;
+        try
+        {
+            this.schema = SparkSchemaTranslator.toSparkSchema(fieldDef);
+        }
+        catch (Exception e)
+        {
+            // Same message as before, but keep the original exception as the cause.
+            throw new IllegalArgumentException(e.getMessage(), e);
+        }
+
+        final int numFields = this.fieldDef.getNumDefs();
+        this.childRecordBuilders = new GenericRowRecordBuilder[numFields];
+        for (int i = 0; i < numFields; i++)
+        {
+            FieldDef fd = this.fieldDef.getDef(i);
+            boolean needsChildRecordBuilder = (fd.getFieldType() == FieldType.RECORD)
+                || (fd.getFieldType() == FieldType.DATASET && fd.getDef(0).getFieldType() == FieldType.RECORD);
+
+            if (needsChildRecordBuilder)
+            {
+                // For a dataset the builder describes the dataset's row type, not the dataset field itself.
+                FieldDef subFd = fd;
+                if (fd.getFieldType() == FieldType.DATASET)
+                {
+                    subFd = fd.getDef(0);
+                }
+                childRecordBuilders[i] = new GenericRowRecordBuilder(subFd);
+            }
+            else
+            {
+                childRecordBuilders[i] = null;
+            }
+        }
+    }
+
+    /**
+     * @return the record definition this builder was configured with
+     */
+    public FieldDef getRecordDefinition()
+    {
+        return this.fieldDef;
+    }
+
+    /**
+     * Starts a new record by allocating an empty value slot for every field of the record definition.
+     *
+     * @throws java.lang.InstantiationException declared by the signature; not thrown by this implementation
+     */
+    public void startRecord() throws java.lang.InstantiationException
+    {
+        fields = new Object[this.fieldDef.getNumDefs()];
+    }
+
+    /**
+     * Finishes the current record.
+     *
+     * @return a GenericRowWithSchema holding the values set since startRecord(); the internal value
+     *         array is released afterwards, so startRecord() must be called before the next record
+     * @throws java.lang.InstantiationException declared by the signature; not thrown by this implementation
+     */
+    public Object finalizeRecord() throws java.lang.InstantiationException
+    {
+        GenericRowWithSchema row = new GenericRowWithSchema(fields, this.schema);
+        fields = null;
+        return row;
+    }
+
+    /**
+     * Stores the value of one field of the current record.
+     *
+     * For DATASET and SET fields a List value is stored as an Object[] copy of its elements and any
+     * other value (including null) is stored as an empty Object[]. Values of all other field types
+     * are stored unchanged.
+     *
+     * @param index the field index
+     * @param value the field value
+     * @throws IllegalArgumentException declared by the signature
+     * @throws IllegalAccessException declared by the signature
+     */
+    public void setFieldValue(int index, Object value) throws IllegalArgumentException, IllegalAccessException
+    {
+        FieldDef fd = this.fieldDef.getDef(index);
+        if (fd.getFieldType() == FieldType.DATASET || fd.getFieldType() == FieldType.SET)
+        {
+            if (value instanceof List)
+            {
+                List listVal = (List) value;
+                this.fields[index] = listVal.toArray();
+            }
+            else
+            {
+                this.fields[index] = new Object[0];
+            }
+        }
+        else
+        {
+            this.fields[index] = value;
+        }
+    }
+
+    /**
+     * @param index the field index
+     * @return the builder for the nested record type of the field, or null when the field is neither
+     *         a record nor a dataset of records
+     */
+    public IRecordBuilder getChildRecordBuilder(int index)
+    {
+        return this.childRecordBuilders[index];
+    }
+}
diff --git a/spark-hpcc/DataAccess/src/main/java/org/hpccsystems/spark/HpccFile.java b/spark-hpcc/DataAccess/src/main/java/org/hpccsystems/spark/HpccFile.java
new file mode 100644
index 000000000..45b2bb289
--- /dev/null
+++ b/spark-hpcc/DataAccess/src/main/java/org/hpccsystems/spark/HpccFile.java
@@ -0,0 +1,196 @@
+/*******************************************************************************
+ * HPCC SYSTEMS software Copyright (C) 2018 HPCC Systems®.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/ +package org.hpccsystems.spark; + +import java.io.Serializable; +import java.net.MalformedURLException; + + +import org.apache.spark.SparkContext; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.types.StructType; +import org.apache.spark.sql.execution.python.EvaluatePython; +import org.hpccsystems.dfs.cluster.RemapInfo; +import org.hpccsystems.dfs.client.DataPartition; +import org.hpccsystems.commons.ecl.FieldDef; +import org.hpccsystems.commons.errors.HpccFileException; +import org.hpccsystems.ws.client.utils.Connection; + +import io.opentelemetry.api.trace.Span; + +/** + * Access to file content on a collection of one or more HPCC + * clusters. + * + */ +public class HpccFile extends org.hpccsystems.dfs.client.HPCCFile implements Serializable +{ + static private final long serialVersionUID = 1L; + private String parentTraceID = ""; + private String parentSpanID = ""; + + private int recordLimit = -1; + + // Make sure Python picklers have been registered + static { EvaluatePython.registerPicklers(); } + + /** + * Constructor for the HpccFile. + * Captures HPCC logical file information from the DALI Server + * for the clusters behind the ESP named by the Connection. + * + * @param fileName The HPCC file name + * @param espconninfo The ESP connection info (protocol,address,port,user,pass) + * @throws HpccFileException hpcc file exection + */ + public HpccFile(String fileName, Connection espconninfo) throws HpccFileException + { + super(fileName,espconninfo); + } + + /** + * Constructor for the HpccFile. + * Captures HPCC logical file information from the DALI Server + * for the clusters behind the ESP named by the Connection. + * + * @param fileName The HPCC file name + * @param connectionString to eclwatch. Format: {http|https}://{HOST}:{PORT}. 
+ * @param user username + * @param pass password + * @throws MalformedURLException Malformed URL exception + * @throws HpccFileException hpcc file exception + */ + public HpccFile(String fileName, String connectionString, String user, String pass) throws MalformedURLException, HpccFileException + { + super(fileName,connectionString,user,pass); + } + + /** + * Constructor for the HpccFile. + * Captures HPCC logical file information from the DALI Server for the + * clusters behind the ESP named by the IP address and re-maps + * the address information for the THOR nodes to visible addresses + * when the THOR clusters are virtual. + * @param fileName The HPCC file name + * @param espconninfo esp connection information object + * @param targetColumnList a comma separated list of column names in dotted + * notation for columns within compound columns. + * @param filter a file filter to select records of interest + * @param remap_info address and port re-mapping info for THOR cluster + * @param maxParts optional the maximum number of partitions or zero for no max + * @param targetfilecluster optional - the hpcc cluster the target file resides in + * @throws HpccFileException hpcc file exception + */ + public HpccFile(String fileName, Connection espconninfo, String targetColumnList, String filter, RemapInfo remap_info, int maxParts, String targetfilecluster) throws HpccFileException + { + super(fileName,espconninfo,targetColumnList,filter,remap_info,maxParts,targetfilecluster); + } + + /** + * Set the opentelemetry trace context + * @param span the span to use for the trace context + */ + public void setTraceContext(Span span) + { + this.parentTraceID = span.getSpanContext().getTraceId(); + this.parentSpanID = span.getSpanContext().getSpanId(); + } + + /** + * Set the opentelemetry trace context + * @param parentTraceID hexadecimal trace id string + * @param parentSpanID hexadecimal span id string + */ + public void setTraceContext(String parentTraceID, String 
parentSpanID) + { + this.parentTraceID = parentTraceID; + this.parentSpanID = parentSpanID; + } + + /** + * Set file part record limit + * @param limit file part record limit + */ + public void setFilePartRecordLimit(int limit) + { + this.recordLimit = limit; + } + + /** + * Returns the current file part record limit + * @return the file part record limit + */ + public int getFilePartRecordLimit() + { + return this.recordLimit; + } + + /** + * Make a Spark Resilient Distributed Dataset (RDD) that provides access + * to THOR based datasets. Uses existing SparkContext, allows this function + * to be used from PySpark. + * @return An RDD of THOR data. + * @throws HpccFileException When there are errors reaching the THOR data + */ + public HpccRDD getRDD() throws HpccFileException + { + return getRDD(SparkContext.getOrCreate()); + } + + /** + * Make a Spark Resilient Distributed Dataset (RDD) that provides access + * to THOR based datasets. + * @param sc Spark Context + * @return An RDD of THOR data. + * @throws HpccFileException When there are errors reaching the THOR data + */ + public HpccRDD getRDD(SparkContext sc) throws HpccFileException + { + HpccRDD rdd = new HpccRDD(sc, getFileParts(), this.getRecordDefinition(), this.getProjectedRecordDefinition(), this.getFileAccessExpirySecs(), this.recordLimit); + rdd.setTraceContext(parentTraceID, parentSpanID); + return rdd; + } + /** + * Make a Spark Dataframe (Dataset (Row)) of THOR data available. + * @param session the Spark Session object + * @return a Dataframe of THOR data + * @throws HpccFileException when there are errors reaching the THOR data.
+ */ + public Dataset getDataframe(SparkSession session) throws HpccFileException{ + FieldDef originalRD = this.getRecordDefinition(); + FieldDef projectedRD = this.getProjectedRecordDefinition(); + DataPartition[] fp = this.getFileParts(); + + HpccRDD hpccRDD = new HpccRDD(session.sparkContext(), fp, originalRD, projectedRD, this.getFileAccessExpirySecs(), this.recordLimit); + hpccRDD.setTraceContext(parentTraceID, parentSpanID); + JavaRDD rdd = (hpccRDD).toJavaRDD(); + + StructType schema = null; + try + { + schema = SparkSchemaTranslator.toSparkSchema(projectedRD); + } + catch(Exception e) + { + throw new HpccFileException(e.getMessage()); + } + + return session.createDataFrame(rdd, schema); + } +} diff --git a/spark-hpcc/DataAccess/src/main/java/org/hpccsystems/spark/HpccFileWriter.java b/spark-hpcc/DataAccess/src/main/java/org/hpccsystems/spark/HpccFileWriter.java new file mode 100644 index 000000000..c0fcd739d --- /dev/null +++ b/spark-hpcc/DataAccess/src/main/java/org/hpccsystems/spark/HpccFileWriter.java @@ -0,0 +1,656 @@ +/******************************************************************************* + * HPCC SYSTEMS software Copyright (C) 2018 HPCC Systems®. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ *******************************************************************************/ +package org.hpccsystems.spark; + +import java.io.Serializable; +import java.util.List; + +import java.util.Arrays; +import java.util.Iterator; +import java.util.regex.Pattern; + +import java.math.BigDecimal; + +import java.util.regex.Matcher; + +import scala.reflect.ClassTag$; + +import org.hpccsystems.commons.errors.HpccFileException; +import org.hpccsystems.dfs.client.CompressionAlgorithm; +import org.hpccsystems.dfs.client.DataPartition; +import org.hpccsystems.dfs.client.HPCCRemoteFileWriter; +import org.hpccsystems.dfs.cluster.RemapInfo; +import org.hpccsystems.dfs.cluster.NullRemapper; +import org.hpccsystems.commons.ecl.FieldDef; +import org.hpccsystems.commons.ecl.RecordDefinitionTranslator; +import org.hpccsystems.ws.client.HPCCWsDFUClient; +import org.hpccsystems.ws.client.utils.Connection; +import org.hpccsystems.ws.client.wrappers.ArrayOfEspExceptionWrapper; +import org.hpccsystems.ws.client.wrappers.wsdfu.DFUCreateFileWrapper; +import org.hpccsystems.ws.client.wrappers.wsdfu.DFUFilePartWrapper; +import org.hpccsystems.ws.client.wrappers.wsdfu.DFUFileTypeWrapper; + +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.StatusCode; + +import org.hpccsystems.spark.SparkSchemaTranslator; +import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.LogManager; +import org.apache.spark.api.java.function.Function2; +import org.apache.spark.SparkContext; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.rdd.RDD; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.types.*; +import org.apache.spark.sql.execution.python.EvaluatePython; + +import net.razorvine.pickle.Unpickler; + +/** + * A helper class that creates a job in Spark that writes a given RDD to HPCC Systems. 
+ */ +public class HpccFileWriter implements Serializable +{ + static private final long serialVersionUID = 1L; + static private final int DefaultExpiryTimeSecs = 300; + static private final Logger log = LogManager.getLogger(HpccFileWriter.class); + + // Transient so Java serialization does not try to serialize this + private transient HPCCWsDFUClient dfuClient = null; + private transient Connection connectionInfo = null; + + private String parentTraceID = ""; + private String parentSpanID = ""; + + private static void registerPicklingFunctions() + { + EvaluatePython.registerPicklers(); + Unpickler.registerConstructor("pyspark.sql.types", "Row", new RowConstructor()); + Unpickler.registerConstructor("pyspark.sql.types", "_create_row", new RowConstructor()); + Unpickler.registerConstructor("org.hpccsystems", "PySparkField", new PySparkFieldConstructor()); + } + + static + { + registerPicklingFunctions(); + } + + public HpccFileWriter(Connection espconninfo) throws HpccFileException + { + this.connectionInfo = espconninfo; + } + + /** + * HpccFileWriter Constructor + * Attempts to open a connection to the specified HPCC cluster and validates the user. + * @param connectionString of format {http|https}://{HOST}:{PORT}. Host and port are the same as the ecl watch host and port. + * @param user a valid ecl watch account + * @param pass the password for the provided user + * @throws Exception general exception + */ + public HpccFileWriter(String connectionString, String user, String pass) throws Exception + { + // Make sure we setup opentelemetry before HPCC4j + Utils.getOpenTelemetry(); + + // Verify connection & password + final Pattern connectionRegex = Pattern.compile("(http|https)://([^:]+):([0-9]+)", Pattern.CASE_INSENSITIVE); + Matcher matches = connectionRegex.matcher(connectionString); + if (matches.find() == false) + { + throw new Exception("Invalid connection string. 
Expected format: {http|https}://{HOST}:{PORT}"); + } + + this.connectionInfo = new Connection(matches.group(1), matches.group(2), matches.group(3)); + this.connectionInfo.setUserName(user); + this.connectionInfo.setPassword(pass); + } + + /** + * Set the trace context for the current job + * @param parentTraceID trace id of the parent span, stored and later passed to Utils.createChildSpan + * @param parentSpanID span id of the parent span, stored and later passed to Utils.createChildSpan + */ + public void setTraceContext(String parentTraceID, String parentSpanID) + { + this.parentTraceID = parentTraceID; + this.parentSpanID = parentSpanID; + } + + private void abortFileCreation() + { + log.error("Abort file creation was called. This is currently a stub."); + } + + private class FilePartWriteResults implements Serializable + { + static private final long serialVersionUID = 1L; + + public long numRecords = 0; + public long dataLength = 0; + public boolean successful = true; // Default to true for empty partitions + public String errorMessage = null; + } + + /** + * Saves the provided RDD to the specified file within the specified cluster. Will use HPCC default file compression. + * Note: PySpark datasets can be written to HPCC by first calling inferSchema to generate a valid Java Schema + * and converting the PySpark RDD to a JavaRDD via the _py2java() helper + * @param scalaRDD The RDD to save to HPCC + * @param clusterName The name of the cluster to save to. + * @param fileName The name of the logical file in HPCC to create. Follows HPCC file name conventions. + * @return Returns the number of records written + * @throws Exception general exception + * @throws ArrayOfEspExceptionWrapper array of esp exception wrapper + */ + public long saveToHPCC(RDD scalaRDD, String clusterName, String fileName) throws Exception, ArrayOfEspExceptionWrapper + { + JavaRDD rdd = JavaRDD.fromRDD(scalaRDD, ClassTag$.MODULE$.apply(Row.class)); + return this.saveToHPCC(rdd, clusterName, fileName); + } + + /** + * Saves the provided RDD to the specified file within the specified cluster. Will use HPCC default file compression.
+ * Note: PySpark datasets can be written to HPCC by first calling inferSchema to generate a valid Java Schema + * and converting the PySpark RDD to a JavaRDD via the _py2java() helper + * @param schema The Schema of the provided RDD + * @param scalaRDD The RDD to save to HPCC + * @param clusterName The name of the cluster to save to. + * @param fileName The name of the logical file in HPCC to create. Follows HPCC file name conventions. + * @return Returns the number of records written + * @throws Exception general exception + * @throws ArrayOfEspExceptionWrapper array of esp exception wrapper + */ + public long saveToHPCC(StructType schema, RDD scalaRDD, String clusterName, String fileName) throws Exception, ArrayOfEspExceptionWrapper + { + JavaRDD rdd = JavaRDD.fromRDD(scalaRDD, ClassTag$.MODULE$.apply(Row.class)); + return this.saveToHPCC(schema, rdd, clusterName, fileName); + } + + /** + * Saves the provided RDD to the specified file within the specified cluster. Will use HPCC default file compression. + * Note: PySpark datasets can be written to HPCC by first calling inferSchema to generate a valid Java Schema + * and converting the PySpark RDD to a JavaRDD via the _py2java() helper + * @param javaRDD The RDD to save to HPCC + * @param clusterName The name of the cluster to save to. + * @param fileName The name of the logical file in HPCC to create. Follows HPCC file name conventions. + * @return Returns the number of records written + * @throws Exception general exception + * @throws ArrayOfEspExceptionWrapper array of esp exception wrapper + */ + public long saveToHPCC(JavaRDD javaRDD, String clusterName, String fileName) throws Exception, ArrayOfEspExceptionWrapper + { + return this.saveToHPCC(SparkContext.getOrCreate(), null, javaRDD, clusterName, fileName, CompressionAlgorithm.DEFAULT, false); + } + + /** + * Saves the provided RDD to the specified file within the specified cluster. Will use HPCC default file compression. 
+ * Note: PySpark datasets can be written to HPCC by first calling inferSchema to generate a valid Java Schema + * and converting the PySpark RDD to a JavaRDD via the _py2java() helper + * @param schema The Schema of the provided RDD + * @param javaRDD The RDD to save to HPCC + * @param clusterName The name of the cluster to save to. + * @param fileName The name of the logical file in HPCC to create. Follows HPCC file name conventions. + * @return Returns the number of records written + * @throws Exception general exception + * @throws ArrayOfEspExceptionWrapper array of esp exception + */ + public long saveToHPCC(StructType schema, JavaRDD javaRDD, String clusterName, String fileName) throws Exception, ArrayOfEspExceptionWrapper + { + return this.saveToHPCC(SparkContext.getOrCreate(), schema, javaRDD, clusterName, fileName, CompressionAlgorithm.DEFAULT, false); + } + + /** + * Saves the provided RDD to the specified file within the specified cluster + * Note: PySpark datasets can be written to HPCC by first calling inferSchema to generate a valid Java Schema + * and converting the PySpark RDD to a JavaRDD via the _py2java() helper + * @param scalaRDD The RDD to save to HPCC + * @param clusterName The name of the cluster to save to. + * @param fileName The name of the logical file in HPCC to create. Follows HPCC file name conventions. 
+ * @param fileCompression compression algorithm to use on files + * @param overwrite overwrite flag + * @return Returns the number of records written + * @throws Exception general exception + * @throws ArrayOfEspExceptionWrapper array of esp exception wrapper + */ + public long saveToHPCC(RDD scalaRDD, String clusterName, String fileName, CompressionAlgorithm fileCompression, boolean overwrite) + throws Exception, ArrayOfEspExceptionWrapper + { + JavaRDD rdd = JavaRDD.fromRDD(scalaRDD, ClassTag$.MODULE$.apply(Row.class)); + return this.saveToHPCC(SparkContext.getOrCreate(), null, rdd, clusterName, fileName, fileCompression, overwrite); + } + + /** + * Saves the provided RDD to the specified file within the specified cluster + * Note: PySpark datasets can be written to HPCC by first calling inferSchema to generate a valid Java Schema + * and converting the PySpark RDD to a JavaRDD via the _py2java() helper + * @param schema The Schema of the provided RDD + * @param scalaRDD The RDD to save to HPCC + * @param clusterName The name of the cluster to save to. + * @param fileName The name of the logical file in HPCC to create. Follows HPCC file name conventions. 
+ * @param fileCompression compression algorithm to use on files + * @param overwrite overwrite flag + * @return Returns the number of records written + * @throws Exception general exception + * @throws ArrayOfEspExceptionWrapper array of esp exception wrapper + */ + public long saveToHPCC(StructType schema, RDD scalaRDD, String clusterName, String fileName, CompressionAlgorithm fileCompression, + boolean overwrite) throws Exception, ArrayOfEspExceptionWrapper + { + JavaRDD rdd = JavaRDD.fromRDD(scalaRDD, ClassTag$.MODULE$.apply(Row.class)); + return this.saveToHPCC(SparkContext.getOrCreate(), schema, rdd, clusterName, fileName, fileCompression, overwrite); + } + + /** + * Saves the provided RDD to the specified file within the specified cluster + * Note: PySpark datasets can be written to HPCC by first calling inferSchema to generate a valid Java Schema + * and converting the PySpark RDD to a JavaRDD via the _py2java() helper + * @param javaRDD The RDD to save to HPCC + * @param clusterName The name of the cluster to save to. + * @param fileName The name of the logical file in HPCC to create. Follows HPCC file name conventions. 
+ * @param fileCompression compression algorithm to use on files + * @param overwrite overwrite flag + * @return Returns the number of records written + * @throws Exception general exception + * @throws ArrayOfEspExceptionWrapper array of esp exception wrapper + */ + public long saveToHPCC(JavaRDD javaRDD, String clusterName, String fileName, CompressionAlgorithm fileCompression, boolean overwrite) + throws Exception, ArrayOfEspExceptionWrapper + { + return this.saveToHPCC(SparkContext.getOrCreate(), null, javaRDD, clusterName, fileName, fileCompression, overwrite); + } + + /** + * Saves the provided RDD to the specified file within the specified cluster + * Note: PySpark datasets can be written to HPCC by first calling inferSchema to generate a valid Java Schema + * and converting the PySpark RDD to a JavaRDD via the _py2java() helper + * @param schema The Schema of the provided RDD + * @param javaRDD The RDD to save to HPCC + * @param clusterName The name of the cluster to save to. + * @param fileName The name of the logical file in HPCC to create. Follows HPCC file name conventions. + * @param fileCompression compression algorithm to use on files + * @param overwrite overwrite flag + * @return Returns the number of records written + * @throws Exception general exception + * @throws ArrayOfEspExceptionWrapper array of esp exception wrapper + */ + public long saveToHPCC(StructType schema, JavaRDD javaRDD, String clusterName, String fileName, CompressionAlgorithm fileCompression, + boolean overwrite) throws Exception, ArrayOfEspExceptionWrapper + { + return this.saveToHPCC(SparkContext.getOrCreate(), schema, javaRDD, clusterName, fileName, fileCompression, overwrite); + } + + /** + * Saves the provided RDD to the specified file within the specified cluster. Will use HPCC default file compression. 
+ * Note: PySpark datasets can be written to HPCC by first calling inferSchema to generate a valid Java Schema + * and converting the PySpark RDD to a JavaRDD via the _py2java() helper + * @param sc The current SparkContext + * @param scalaRDD The RDD to save to HPCC + * @param clusterName The name of the cluster to save to. + * @param fileName The name of the logical file in HPCC to create. Follows HPCC file name conventions. + * @return Returns the number of records written + * @throws Exception general exception + * @throws ArrayOfEspExceptionWrapper array of esp exception wrapper + */ + public long saveToHPCC(SparkContext sc, RDD scalaRDD, String clusterName, String fileName) throws Exception, ArrayOfEspExceptionWrapper + { + JavaRDD rdd = JavaRDD.fromRDD(scalaRDD, ClassTag$.MODULE$.apply(Row.class)); + return saveToHPCC(sc, null, rdd, clusterName, fileName, CompressionAlgorithm.DEFAULT, false); + } + + /** + * Saves the provided RDD to the specified file within the specified cluster. Will use HPCC default file compression. + * Note: PySpark datasets can be written to HPCC by first calling inferSchema to generate a valid Java Schema + * and converting the PySpark RDD to a JavaRDD via the _py2java() helper + * @param sc The current SparkContext + * @param javaRDD The RDD to save to HPCC + * @param clusterName The name of the cluster to save to. + * @param fileName The name of the logical file in HPCC to create. Follows HPCC file name conventions. 
+ * @return Returns the number of records written + * @throws Exception general exception + * @throws ArrayOfEspExceptionWrapper array of esp exception wrapper + */ + public long saveToHPCC(SparkContext sc, JavaRDD javaRDD, String clusterName, String fileName) throws Exception, ArrayOfEspExceptionWrapper + { + return saveToHPCC(sc, null, javaRDD, clusterName, fileName, CompressionAlgorithm.DEFAULT, false); + } + + /** + * Saves the provided RDD to the specified file within the specified cluster + * Note: PySpark datasets can be written to HPCC by first calling inferSchema to generate a valid Java Schema + * and converting the PySpark RDD to a JavaRDD via the _py2java() helper + * @param sc The current SparkContext + * @param scalaRDD The RDD to save to HPCC + * @param clusterName The name of the cluster to save to. + * @param fileName The name of the logical file in HPCC to create. Follows HPCC file name conventions. + * @param fileCompression compression algorithm to use on files + * @param overwrite overwrite flag + * @return Returns the number of records written + * @throws Exception general exception + * @throws ArrayOfEspExceptionWrapper array of esp exception wrapper + */ + public long saveToHPCC(SparkContext sc, RDD scalaRDD, String clusterName, String fileName, CompressionAlgorithm fileCompression, + boolean overwrite) throws Exception, ArrayOfEspExceptionWrapper + { + JavaRDD rdd = JavaRDD.fromRDD(scalaRDD, ClassTag$.MODULE$.apply(Row.class)); + return this.saveToHPCC(sc, null, rdd, clusterName, fileName, fileCompression, overwrite); + } + + /** + * Saves the provided RDD to the specified file within the specified cluster + * Note: PySpark datasets can be written to HPCC by first calling inferSchema to generate a valid Java Schema + * and converting the PySpark RDD to a JavaRDD via the _py2java() helper + * @param sc The current SparkContext + * @param rddSchema rdd schema + * @param rdd java rdd row + * @param clusterName The name of the cluster to save 
to. + * @param fileName The name of the logical file in HPCC to create. Follows HPCC file name conventions. + * @param fileCompression compression algorithm to use on files + * @param overwrite overwrite flag + * @return Returns the number of records written + * @throws Exception general exception + * @throws ArrayOfEspExceptionWrapper array of esp exception wrapper + */ + public long saveToHPCC(SparkContext sc, StructType rddSchema, JavaRDD rdd, String clusterName, String fileName, + CompressionAlgorithm fileCompression, boolean overwrite) throws Exception, ArrayOfEspExceptionWrapper + { + this.dfuClient = HPCCWsDFUClient.get(this.connectionInfo); + + if (sc == null || rdd == null) + { + throw new Exception("Aborting write. A valid non-null SparkContext and RDD must be provided."); + } + + StructType schema = rddSchema; + if (schema == null) + { + Row firstRow = rdd.first(); + schema = firstRow.schema(); + } + + FieldDef recordDef = SparkSchemaTranslator.toHPCCRecordDef(schema); + String eclRecordDefn = RecordDefinitionTranslator.toECLRecord(recordDef); + boolean isCompressed = fileCompression != CompressionAlgorithm.NONE; + + Span createFileSpan = Utils.createChildSpan(parentTraceID, parentSpanID, "HpccFileWriter/CreateFile_" + fileName); + createFileSpan.setAttribute("cluster_name", clusterName); + createFileSpan.setAttribute("compressed", isCompressed); + createFileSpan.setAttribute("record_definition", eclRecordDefn); + createFileSpan.setStatus(StatusCode.OK); + + DFUCreateFileWrapper createResult = null; + try + { + createResult = dfuClient.createFile(fileName, clusterName, eclRecordDefn, DefaultExpiryTimeSecs, isCompressed, DFUFileTypeWrapper.Flat, ""); + } + catch (Exception e) + { + createFileSpan.recordException(e); + createFileSpan.setStatus(StatusCode.ERROR); + throw e; + } + finally + { + createFileSpan.end(); + } + + Span repartSpan = Utils.createChildSpan(parentTraceID, parentSpanID, "HpccFileWriter/Repartition_" + fileName); +
repartSpan.setStatus(StatusCode.OK); + + DFUFilePartWrapper[] dfuFileParts = createResult.getFileParts(); + DataPartition[] hpccPartitions = DataPartition.createPartitions(dfuFileParts, + new NullRemapper(new RemapInfo(), createResult.getFileAccessInfo()), dfuFileParts.length, createResult.getFileAccessInfoBlob()); + + if (hpccPartitions.length != rdd.getNumPartitions()) + { + rdd = rdd.repartition(hpccPartitions.length); + if (rdd.getNumPartitions() != hpccPartitions.length) + { + Exception wrappedException = new Exception("Repartitioning RDD failed. Aborting write."); + repartSpan.recordException(wrappedException); + repartSpan.setStatus(StatusCode.ERROR); + repartSpan.end(); + + throw wrappedException; + } + } + + repartSpan.end(); + + //------------------------------------------------------------------------------ + // Write partitions to file parts + //------------------------------------------------------------------------------ + + Function2, Iterator> writeFunc = (Integer partitionIndex, Iterator it) -> + { + HpccFileWriter.registerPicklingFunctions(); + DataPartition thisPart = hpccPartitions[partitionIndex]; + Span filePartWriteSpan = Utils.createChildSpan(parentTraceID, parentSpanID, "HpccFileWriter/WritePart_" + fileName + "_" + partitionIndex); + filePartWriteSpan.setStatus(StatusCode.OK); + + HPCCRemoteFileWriter.FileWriteContext writeContext = new HPCCRemoteFileWriter.FileWriteContext(); + writeContext.recordDef = recordDef; + writeContext.fileCompression = fileCompression; + writeContext.parentSpan = filePartWriteSpan; + + GenericRowRecordAccessor recordAccessor = new GenericRowRecordAccessor(recordDef); + HPCCRemoteFileWriter fileWriter = new HPCCRemoteFileWriter(writeContext, thisPart, recordAccessor); + + FilePartWriteResults result = new FilePartWriteResults(); + try + { + fileWriter.writeRecords(it); + fileWriter.close(); + + result.dataLength = fileWriter.getBytesWritten(); + result.numRecords = fileWriter.getRecordsWritten(); + 
result.successful = true; + } + catch (Exception e) + { + result.successful = false; + result.errorMessage = e.getMessage(); + + filePartWriteSpan.recordException(e); + filePartWriteSpan.setStatus(StatusCode.ERROR); + } + finally + { + filePartWriteSpan.end(); + } + + List resultList = Arrays.asList(result); + return resultList.iterator(); + }; + + // Create Write Job + Span issueWriteSpan = Utils.createChildSpan(parentTraceID, parentSpanID, "HpccFileWriter/WriteDataset_" + fileName); + issueWriteSpan.setStatus(StatusCode.OK); + + JavaRDD writeResultsRDD = rdd.mapPartitionsWithIndex(writeFunc, true); + List writeResultsList = writeResultsRDD.collect(); + + long recordsWritten = 0; + long dataWritten = 0; + for (int i = 0; i < writeResultsList.size(); i++) + { + FilePartWriteResults result = writeResultsList.get(i); + recordsWritten += result.numRecords; + dataWritten += result.dataLength; + + if (result.successful == false) + { + abortFileCreation(); + Exception wrappedException = new Exception("Writing failed with error: " + result.errorMessage); + issueWriteSpan.recordException(wrappedException); + issueWriteSpan.setStatus(StatusCode.ERROR); + issueWriteSpan.end(); + + throw wrappedException; + } + } + + issueWriteSpan.end(); + + //------------------------------------------------------------------------------ + // Publish and finalize the temp file + //------------------------------------------------------------------------------ + + Span publishFileSpan = Utils.createChildSpan(parentTraceID, parentSpanID, "HpccFileWriter/PublishFile_" + fileName); + publishFileSpan.setAttribute("records_written", recordsWritten); + publishFileSpan.setAttribute("data_written", dataWritten); + publishFileSpan.setAttribute("overwrite", overwrite); + publishFileSpan.setStatus(StatusCode.OK); + + try + { + dfuClient.publishFile(createResult.getFileID(), eclRecordDefn, recordsWritten, dataWritten, overwrite); + } + catch (Exception e) + { + Exception wrappedException = new 
Exception("Failed to publish file with error: " + e.getMessage()); + publishFileSpan.recordException(wrappedException); + publishFileSpan.setStatus(StatusCode.ERROR); + throw wrappedException; + } + finally + { + publishFileSpan.end(); + } + + return recordsWritten; + } + + /** + * Generates an inferred schema based on a list of example fields, each carrying a field name and an example value. + * This function is targeted primarily at helping PySpark users write datasets back to HPCC. + * @param exampleFields list of python spark fields + * @return Returns a valid Spark schema based on the example fields + * @throws Exception general exception + */ + public StructType inferSchema(List exampleFields) throws Exception + { + return generateRowSchema(exampleFields); + } + + private StructType generateRowSchema(List exampleFields) throws Exception + { + StructField[] fields = new StructField[exampleFields.size()]; + + int index = 0; + for (PySparkField fieldInfo : exampleFields) + { + fields[index] = generateSchemaField(fieldInfo.getName(), fieldInfo.getValue()); + index++; + } + + return new StructType(fields); + } + + private StructField generateSchemaField(String name, Object obj) throws Exception + { + Metadata empty = Metadata.empty(); + boolean nullable = false; + + DataType type = DataTypes.NullType; + if (obj instanceof String) + { + type = DataTypes.StringType; + } + else if (obj instanceof Byte) + { + type = DataTypes.ByteType; + } + else if (obj instanceof Short) + { + type = DataTypes.ShortType; + } + else if (obj instanceof Integer) + { + type = DataTypes.IntegerType; + } + else if (obj instanceof Long) + { + type = DataTypes.LongType; + } + else if (obj instanceof byte[]) + { + type = DataTypes.BinaryType; + } + else if (obj instanceof Boolean) + { + type = DataTypes.BooleanType; + } + else if (obj instanceof Float) + { + type = DataTypes.FloatType; + } + else if (obj instanceof Double) + { + type = DataTypes.DoubleType; + } + else if (obj instanceof BigDecimal) + { +
BigDecimal decimal = (BigDecimal) obj; + int precision = decimal.precision(); + int scale = decimal.scale(); + + // Spark SQL only supports 38 digits in decimal values + if (precision > DecimalType.MAX_PRECISION()) + { + scale -= (precision - DecimalType.MAX_PRECISION()); + if (scale < 0) + { + scale = 0; + } + + precision = DecimalType.MAX_PRECISION(); + } + + type = DataTypes.createDecimalType(precision, scale); + } + else if (obj instanceof List) + { + List list = (List) obj; + if (list.size() == 0) + { + throw new Exception( + "Unable to infer row schema. Encountered an empty List: " + name + ". All lists must have an example row to infer schema."); + } + + Object firstChild = list.get(0); + if (firstChild instanceof PySparkField) + { + List rowFields = (List) (List) list; + type = generateRowSchema(rowFields); + } + else + { + StructField childField = generateSchemaField("temp", firstChild); + type = DataTypes.createArrayType(childField.dataType()); + nullable = true; + } + } + else + { + throw new Exception("Encountered unsupported type: " + obj.getClass().getName() + + ". Ensure that the entire example row hierarchy has been converted to a Dictionary. Including rows in child datasets."); + } + + return new StructField(name, type, nullable, empty); + } + +} diff --git a/spark-hpcc/DataAccess/src/main/java/org/hpccsystems/spark/HpccRDD.java b/spark-hpcc/DataAccess/src/main/java/org/hpccsystems/spark/HpccRDD.java new file mode 100644 index 000000000..0f8eedcf4 --- /dev/null +++ b/spark-hpcc/DataAccess/src/main/java/org/hpccsystems/spark/HpccRDD.java @@ -0,0 +1,264 @@ +/******************************************************************************* + * HPCC SYSTEMS software Copyright (C) 2018 HPCC Systems®. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+package org.hpccsystems.spark;
+
+import java.io.Serializable;
+import java.util.Iterator;
+
+import java.util.Arrays;
+
+import org.apache.spark.Dependency;
+import org.apache.spark.InterruptibleIterator;
+import org.apache.spark.Partition;
+import org.apache.spark.SparkContext;
+import org.apache.spark.TaskContext;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.rdd.RDD;
+import org.apache.spark.sql.execution.python.EvaluatePython;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.types.StructType;
+import org.apache.logging.log4j.Logger;
+import org.apache.logging.log4j.LogManager;
+
+import org.hpccsystems.dfs.client.DataPartition;
+import org.hpccsystems.dfs.client.HpccRemoteFileReader;
+
+import io.opentelemetry.api.trace.Span;
+import io.opentelemetry.api.trace.StatusCode;
+
+import org.hpccsystems.commons.ecl.FieldDef;
+
+import scala.collection.JavaConverters;
+import scala.collection.Seq;
+import scala.collection.mutable.ArrayBuffer;
+import scala.reflect.ClassTag;
+import scala.reflect.ClassTag$;
+import net.razorvine.pickle.Unpickler;
+
+/**
+ * The implementation of the RDD(GenericRowWithSchema) that allows reading a Dataset from HPCC Systems.
+ *
+ * Every Spark partition of this RDD wraps exactly one HPCC {@link DataPartition}. When a partition is
+ * computed, a remote file reader is opened for that data partition and its rows are exposed to Spark
+ * through an interruptible iterator.
+ */
+public class HpccRDD extends RDD<Row> implements Serializable
+{
+    private static final long serialVersionUID = 1L;
+    private static final Logger log = LogManager.getLogger(HpccRDD.class);
+    private static final ClassTag<Row> CT_RECORD = ClassTag$.MODULE$.apply(Row.class);
+
+    public static int DEFAULT_CONNECTION_TIMEOUT = 120;
+
+    private String parentTraceID = "";
+    private String parentSpanID = "";
+
+    private InternalPartition[] parts;
+    private FieldDef originalRecordDef = null;
+    private FieldDef projectedRecordDef = null;
+    private int connectionTimeout = DEFAULT_CONNECTION_TIMEOUT;
+    private int recordLimit = -1;
+
+    /**
+     * Registers the picklers and the Row constructors used when rows are exchanged with PySpark.
+     */
+    private static void registerPicklingFunctions()
+    {
+        EvaluatePython.registerPicklers();
+        Unpickler.registerConstructor("pyspark.sql.types", "Row", new RowConstructor());
+        Unpickler.registerConstructor("pyspark.sql.types", "_create_row", new RowConstructor());
+    }
+
+    /**
+     * Spark partition backed by a single HPCC data partition.
+     *
+     * Declared static so that an instance does not keep an implicit reference to the enclosing RDD;
+     * a non-static inner class would drag the whole RDD along whenever a partition is serialized.
+     */
+    private static class InternalPartition implements Partition
+    {
+        private static final long serialVersionUID = 1L;
+
+        public DataPartition partition;
+
+        public int hashCode()
+        {
+            return this.index();
+        }
+
+        public int index()
+        {
+            return partition.index();
+        }
+    }
+
+    /**
+     * @param sc spark context
+     * @param dataParts data parts
+     * @param originalRD original record definition
+     */
+    public HpccRDD(SparkContext sc, DataPartition[] dataParts, FieldDef originalRD)
+    {
+        this(sc,dataParts,originalRD,originalRD);
+    }
+
+    /**
+     * @param sc spark context
+     * @param dataParts data parts
+     * @param originalRD original record definition
+     * @param projectedRD projected record definition
+     */
+    public HpccRDD(SparkContext sc, DataPartition[] dataParts, FieldDef originalRD, FieldDef projectedRD)
+    {
+        this(sc,dataParts,originalRD,projectedRD,DEFAULT_CONNECTION_TIMEOUT,-1);
+    }
+
+    /**
+     * @param sc spark context
+     * @param dataParts data parts
+     * @param originalRD original record definition
+     * @param projectedRD projected record definition
+     * @param connectTimeout connection timeout
+     * @param limit file limit
+     */
+    public HpccRDD(SparkContext sc, DataPartition[] dataParts, FieldDef originalRD, FieldDef projectedRD, int connectTimeout, int limit)
+    {
+        super(sc, new ArrayBuffer<Dependency<?>>(), CT_RECORD);
+        this.parts = new InternalPartition[dataParts.length];
+        for (int i = 0; i < dataParts.length; i++)
+        {
+            this.parts[i] = new InternalPartition();
+            this.parts[i].partition = dataParts[i];
+        }
+
+        this.originalRecordDef = originalRD;
+        this.projectedRecordDef = projectedRD;
+        this.connectionTimeout = connectTimeout;
+        this.recordLimit = limit;
+    }
+
+    /**
+     * Set the trace context for this RDD.
+     * @param parentTraceID trace ID of the parent span; passed to Utils.createChildSpan for every partition read
+     * @param parentSpanID span ID of the parent span; passed to Utils.createChildSpan for every partition read
+     */
+    public void setTraceContext(String parentTraceID, String parentSpanID)
+    {
+        this.parentTraceID = parentTraceID;
+        this.parentSpanID = parentSpanID;
+    }
+
+    /**
+     * Wrap this RDD as a JavaRDD so the Java API can be used.
+     * @return a JavaRDD wrapper of the HpccRDD.
+     */
+    public JavaRDD<Row> asJavaRDD()
+    {
+        JavaRDD<Row> jRDD = new JavaRDD<Row>(this, CT_RECORD);
+        return jRDD;
+    }
+
+    /**
+     * Opens a remote file reader for the given partition and exposes its rows to Spark.
+     *
+     * Failures are reported by throwing instead of returning null: a null iterator would only surface
+     * later as an unexplained NullPointerException inside Spark.
+     *
+     * @see org.apache.spark.rdd.RDD#compute(org.apache.spark.Partition, org.apache.spark.TaskContext)
+     * @throws IllegalStateException if the original or projected record definition is missing
+     * @throws RuntimeException if the remote file reader cannot be created
+     */
+    @Override
+    public InterruptibleIterator<Row> compute(Partition p_arg, TaskContext ctx)
+    {
+        HpccRDD.registerPicklingFunctions();
+
+        final InternalPartition this_part = (InternalPartition) p_arg;
+        final FieldDef originalRD = this.originalRecordDef;
+        final FieldDef projectedRD = this.projectedRecordDef;
+
+        if (originalRD == null)
+        {
+            log.error("Original record defintion is null. Aborting.");
+            throw new IllegalStateException("Original record defintion is null. Aborting.");
+        }
+
+        if (projectedRD == null)
+        {
+            log.error("Projected record defintion is null. Aborting.");
+            throw new IllegalStateException("Projected record defintion is null. Aborting.");
+        }
+
+        final Span sparkPartReadSpan = Utils.createChildSpan(parentTraceID, parentSpanID, "HpccRDD.Compute/Read_" + this_part.partition.getThisPart());
+
+        // Add taskID, stageID, stage attempt number, and task attempt number as attributes to the span
+        sparkPartReadSpan.setAttribute("task.id", ctx.taskAttemptId());
+        sparkPartReadSpan.setAttribute("task.attempt", ctx.attemptNumber());
+        sparkPartReadSpan.setAttribute("stage.id", ctx.stageId());
+        sparkPartReadSpan.setAttribute("stage.attempt", ctx.stageAttemptNumber());
+
+        // Defaulting to OK, to reduce state tracking complexity below. If an error occurs this will be overwritten
+        sparkPartReadSpan.setStatus(StatusCode.OK);
+
+        final HpccRemoteFileReader<Row> fileReader;
+        try
+        {
+            HpccRemoteFileReader.FileReadContext context = new HpccRemoteFileReader.FileReadContext();
+            context.originalRD = originalRD;
+            context.connectTimeout = connectionTimeout;
+            context.recordReadLimit = recordLimit;
+            context.parentSpan = sparkPartReadSpan;
+            fileReader = new HpccRemoteFileReader<Row>(context, this_part.partition, new GenericRowRecordBuilder(projectedRD));
+        }
+        catch (Exception e)
+        {
+            log.error("Failed to create remote file reader with error: " + e.getMessage());
+
+            // No completion listener has been registered yet, so the span has to be closed here
+            sparkPartReadSpan.recordException(e);
+            sparkPartReadSpan.setStatus(StatusCode.ERROR);
+            sparkPartReadSpan.end();
+
+            throw new RuntimeException("Failed to create remote file reader with error: " + e.getMessage(), e);
+        }
+
+        // This will be called for both failure & success
+        ctx.addTaskCompletionListener(taskContext ->
+        {
+            try
+            {
+                fileReader.close();
+            }
+            catch(Exception e)
+            {
+                log.error("Error while attempting to close file reader: " + e.getMessage());
+                sparkPartReadSpan.recordException(e);
+            }
+
+            if (taskContext.isInterrupted())
+            {
+                sparkPartReadSpan.setStatus(StatusCode.ERROR);
+            }
+
+            sparkPartReadSpan.end();
+        });
+
+        // This will be called before the above completion listener
+        ctx.addTaskFailureListener((TaskContext taskContext, Throwable error) -> {
+            sparkPartReadSpan.recordException(error);
+            sparkPartReadSpan.setStatus(StatusCode.ERROR);
+        });
+
+        scala.collection.Iterator<Row> iter = JavaConverters.asScalaIteratorConverter(fileReader).asScala();
+        return new InterruptibleIterator<Row>(ctx, iter);
+    }
+
+    /**
+     * Reports the first copy location of the partition as the preferred location.
+     *
+     * @param split partition created by this RDD
+     * @return a sequence holding the first copy location, or an empty sequence if none is reported
+     */
+    @Override
+    public Seq<String> getPreferredLocations(Partition split)
+    {
+        final InternalPartition part = (InternalPartition) split;
+        final String[] copyLocations = part.partition.getCopyLocations();
+        if (copyLocations == null || copyLocations.length == 0)
+        {
+            // No known location: let Spark schedule the task anywhere instead of failing on the index below
+            return JavaConverters.asScalaBufferConverter(Arrays.<String>asList()).asScala().seq();
+        }
+
+        return JavaConverters.asScalaBufferConverter(Arrays.asList(copyLocations[0])).asScala().seq();
+    }
+
+    /* (non-Javadoc)
+     * @see org.apache.spark.rdd.RDD#getPartitions()
+     */
+    @Override
+    public Partition[] getPartitions()
+    {
+        return parts;
+    }
+
+}
diff --git a/spark-hpcc/DataAccess/src/main/java/org/hpccsystems/spark/PySparkField.java b/spark-hpcc/DataAccess/src/main/java/org/hpccsystems/spark/PySparkField.java
new file mode 100644
index 000000000..a34334b4f
--- /dev/null
+++ b/spark-hpcc/DataAccess/src/main/java/org/hpccsystems/spark/PySparkField.java
@@ -0,0 +1,38 @@
+/*******************************************************************************
+ * HPCC SYSTEMS software Copyright (C) 2019 HPCC Systems®.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+package org.hpccsystems.spark;
+
+/**
+ * Immutable name/value pair describing a single field of an example row.
+ */
+public class PySparkField
+{
+    // Assigned once in the constructor and never modified afterwards
+    private final String name;
+    private final Object value;
+
+    /**
+     * @param n field name
+     * @param v field value
+     */
+    public PySparkField(String n, Object v)
+    {
+        name = n;
+        value = v;
+    }
+
+    /**
+     * @return the field name given at construction
+     */
+    public String getName()
+    {
+        return name;
+    }
+
+    /**
+     * @return the field value given at construction
+     */
+    public Object getValue()
+    {
+        return value;
+    }
+}
diff --git a/spark-hpcc/DataAccess/src/main/java/org/hpccsystems/spark/PySparkFieldConstructor.java b/spark-hpcc/DataAccess/src/main/java/org/hpccsystems/spark/PySparkFieldConstructor.java
new file mode 100644
index 000000000..4b6ae4cca
--- /dev/null
+++ b/spark-hpcc/DataAccess/src/main/java/org/hpccsystems/spark/PySparkFieldConstructor.java
@@ -0,0 +1,39 @@
+/*******************************************************************************
+ * HPCC SYSTEMS software Copyright (C) 2019 HPCC Systems®.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+package org.hpccsystems.spark;
+
+import net.razorvine.pickle.IObjectConstructor;
+import net.razorvine.pickle.PickleException;
+
+/**
+ * Pickle object constructor that turns an unpickled (name, value) tuple into a {@link PySparkField}.
+ */
+public class PySparkFieldConstructor implements IObjectConstructor
+{
+
+    public PySparkFieldConstructor() {}
+
+    /**
+     * @param tupleFields unpickled tuple; must hold exactly two entries with a String in the first slot
+     * @return a PySparkField built from the name and value entries
+     * @throws PickleException if the tuple does not have the expected layout
+     */
+    @Override
+    public Object construct(Object[] tupleFields) throws PickleException
+    {
+        // PySparkFields consist of two fields Name & Value
+        final boolean layoutMatches = tupleFields.length == 2 && tupleFields[0] instanceof String;
+        if (!layoutMatches)
+        {
+            throw new PickleException("Unexpected PySparkField data layout.");
+        }
+
+        return new PySparkField((String) tupleFields[0], tupleFields[1]);
+    }
+}
diff --git a/spark-hpcc/DataAccess/src/main/java/org/hpccsystems/spark/RowConstructor.java b/spark-hpcc/DataAccess/src/main/java/org/hpccsystems/spark/RowConstructor.java
new file mode 100644
index 000000000..1651726f3
--- /dev/null
+++ b/spark-hpcc/DataAccess/src/main/java/org/hpccsystems/spark/RowConstructor.java
@@ -0,0 +1,40 @@
+/*******************************************************************************
+ * HPCC SYSTEMS software Copyright (C) 2019 HPCC Systems®.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+package org.hpccsystems.spark;
+
+import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema;
+
+import net.razorvine.pickle.IObjectConstructor;
+import net.razorvine.pickle.PickleException;
+
+/**
+ * Pickle object constructor that turns an unpickled PySpark Row into a schema-less
+ * {@link GenericRowWithSchema}.
+ */
+public class RowConstructor implements IObjectConstructor
+{
+
+    public RowConstructor() {}
+
+    /**
+     * @param tupleFields unpickled row properties; the second entry must be the Object[] of field values
+     * @return a GenericRowWithSchema holding the field values and a null schema
+     * @throws PickleException if the properties do not have the expected layout
+     */
+    @Override
+    public Object construct(Object[] tupleFields) throws PickleException
+    {
+        // PySpark Rows consist of two properties. An ArrayList of field names and Object[] array of field values.
+        if (tupleFields == null || tupleFields.length != 2)
+        {
+            throw new PickleException("Unexpected Row data layout.");
+        }
+
+        // Report an unexpected value container as a pickling problem instead of a bare ClassCastException
+        if (!(tupleFields[1] instanceof Object[]))
+        {
+            String actualType = (tupleFields[1] == null) ? "null" : tupleFields[1].getClass().getName();
+            throw new PickleException("Unexpected Row data layout. Expected an array of field values but found: " + actualType);
+        }
+
+        Object[] rowFields = (Object[]) tupleFields[1];
+        return new GenericRowWithSchema(rowFields,null);
+    }
+}
diff --git a/spark-hpcc/DataAccess/src/main/java/org/hpccsystems/spark/SparkSchemaTranslator.java b/spark-hpcc/DataAccess/src/main/java/org/hpccsystems/spark/SparkSchemaTranslator.java
new file mode 100644
index 000000000..c1199b102
--- /dev/null
+++ b/spark-hpcc/DataAccess/src/main/java/org/hpccsystems/spark/SparkSchemaTranslator.java
@@ -0,0 +1,191 @@
+/*******************************************************************************
+ * HPCC SYSTEMS software Copyright (C) 2019 HPCC Systems®.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+package org.hpccsystems.spark;
+
+import org.hpccsystems.commons.ecl.FieldDef;
+import org.hpccsystems.commons.ecl.FieldType;
+import org.hpccsystems.commons.ecl.HpccSrcType;
+
+import org.apache.spark.sql.types.*;
+
+/**
+ * A helper class that translates an HPCC Systems record definition to a Spark Schema
+ * and a Spark schema back to an HPCC Systems record definition.
+ */
+public class SparkSchemaTranslator
+{
+    /**
+     * Builds a Spark decimal type, reducing precision and scale when they exceed what Spark SQL supports.
+     */
+    private static DataType toSparkDecimalType(int precision, int scale)
+    {
+        // Spark SQL only supports 38 digits in decimal values
+        if (precision > DecimalType.MAX_PRECISION())
+        {
+            // Give up fractional digits first, never going below a scale of zero
+            scale -= (precision - DecimalType.MAX_PRECISION());
+            if (scale < 0)
+            {
+                scale = 0;
+            }
+
+            precision = DecimalType.MAX_PRECISION();
+        }
+
+        return DataTypes.createDecimalType(precision,scale);
+    }
+
+    /**
+     * Translates a single HPCC field definition, including its children, into a Spark StructField.
+     *
+     * @param field HPCC field definition
+     * @return the equivalent Spark field; only SET and DATASET fields are marked nullable
+     * @throws Exception if the field type is unknown or has no Spark equivalent here
+     */
+    private static StructField toSchemaElement(FieldDef field) throws Exception
+    {
+        Metadata empty = Metadata.empty();
+        boolean nullable = false;
+
+        DataType type;
+        switch (field.getFieldType())
+        {
+            case VAR_STRING:
+            case STRING:
+            case CHAR:
+                type = DataTypes.StringType;
+                break;
+            case FILEPOS:
+            case INTEGER:
+                type = DataTypes.LongType;
+                break;
+            case BINARY:
+                type = DataTypes.BinaryType;
+                break;
+            case BOOLEAN:
+                type = DataTypes.BooleanType;
+                break;
+            case REAL:
+                type = DataTypes.DoubleType;
+                break;
+            case DECIMAL:
+                type = toSparkDecimalType(field.getPrecision(), field.getScale());
+                break;
+            case SET:
+            case DATASET:
+            {
+                // The first child definition describes the element type of the collection
+                StructField childField = toSchemaElement(field.getDef(0));
+                type = DataTypes.createArrayType(childField.dataType());
+                nullable = true;
+                break;
+            }
+            case RECORD:
+            {
+                StructField[] childFields = new StructField[field.getNumDefs()];
+                for (int i = 0; i < field.getNumDefs(); i++)
+                {
+                    childFields[i] = toSchemaElement(field.getDef(i));
+                }
+                type = DataTypes.createStructType(childFields);
+                break;
+            }
+            case UNKNOWN:
+                throw new Exception("Conversion from HPCC FieldDef to Spark StructField failed. Encountered unknown type.");
+            default:
+                // Fail loudly rather than silently emitting a NullType column for a type this switch does not list
+                throw new Exception("Conversion from HPCC FieldDef to Spark StructField failed. Encountered unsupported type: "
+                        + field.getFieldType());
+        }
+
+        return new StructField(field.getFieldName(), type, nullable, empty);
+    }
+
+    /**
+     * translate a FieldDef into a StructField object of the schema
+     * @param recordDef field definition
+     * @return the Spark schema, or null if recordDef is null or is not a RECORD definition
+     * @throws Exception general exception
+     */
+    public static StructType toSparkSchema(FieldDef recordDef) throws Exception
+    {
+        if (recordDef == null || recordDef.getFieldType() != FieldType.RECORD)
+        {
+            return null;
+        }
+
+        StructField[] fields = new StructField[recordDef.getNumDefs()];
+        for (int i = 0; i < recordDef.getNumDefs(); i++)
+        {
+            fields[i] = toSchemaElement(recordDef.getDef(i));
+        }
+
+        return DataTypes.createStructType(fields);
+    }
+
+    /**
+     * Translates a Spark StructField, including nested arrays and structs, into an HPCC FieldDef.
+     *
+     * @param sparkField Spark field to translate
+     * @return the equivalent HPCC field definition
+     * @throws Exception if the Spark data type is not one of the types handled below
+     */
+    private static FieldDef toFieldDef(StructField sparkField) throws Exception
+    {
+        DataType type = sparkField.dataType();
+        if (type instanceof ArrayType) {
+            ArrayType array = (ArrayType) sparkField.dataType();
+
+            // Translate the element type through a temporary field carrying the parent's name
+            StructField tempField = new StructField(sparkField.name(),array.elementType(),
+                    false,Metadata.empty());
+
+            FieldDef[] childDef = new FieldDef[1];
+            childDef[0] = toFieldDef(tempField);
+
+            // Arrays of structs become DATASETs, arrays of anything else become SETs
+            if (array.elementType() instanceof StructType) {
+                return new FieldDef(sparkField.name(), FieldType.DATASET, "DATASET", 0, false, false,
+                        HpccSrcType.LITTLE_ENDIAN, childDef);
+            } else {
+                return new FieldDef(sparkField.name(), FieldType.SET, "SET", 0, false, false,
+                        HpccSrcType.LITTLE_ENDIAN, childDef);
+            }
+        } else if (type instanceof BinaryType) {
+            return new FieldDef(sparkField.name(), FieldType.BINARY, "BINARY", 0, false, false,
+                    HpccSrcType.LITTLE_ENDIAN, new FieldDef[0]);
+        } else if (type instanceof BooleanType) {
+            return new FieldDef(sparkField.name(), FieldType.BOOLEAN, "BOOL", 1, true, false,
+                    HpccSrcType.LITTLE_ENDIAN, new FieldDef[0]);
+        } else if (type instanceof ByteType) {
+            return new FieldDef(sparkField.name(), FieldType.INTEGER, "INTEGER1", 1, true, false,
+                    HpccSrcType.LITTLE_ENDIAN, new FieldDef[0]);
+        } else if (type instanceof DecimalType) {
+            FieldDef ret = new FieldDef(sparkField.name(), FieldType.DECIMAL, "DECIMAL", 1, true, false,
+                    HpccSrcType.LITTLE_ENDIAN, new FieldDef[0]);
+            DecimalType decimal = (DecimalType) type;
+            ret.setPrecision(decimal.precision());
+            ret.setScale(decimal.scale());
+            return ret;
+        } else if (type instanceof DoubleType) {
+            return new FieldDef(sparkField.name(), FieldType.REAL, "REAL8", 8, true, false,
+                    HpccSrcType.LITTLE_ENDIAN, new FieldDef[0]);
+        } else if (type instanceof FloatType) {
+            return new FieldDef(sparkField.name(), FieldType.REAL, "REAL4", 4, true, false,
+                    HpccSrcType.LITTLE_ENDIAN, new FieldDef[0]);
+        } else if (type instanceof IntegerType) {
+            return new FieldDef(sparkField.name(), FieldType.INTEGER, "INTEGER4", 4, true, false,
+                    HpccSrcType.LITTLE_ENDIAN, new FieldDef[0]);
+        } else if (type instanceof LongType) {
+            return new FieldDef(sparkField.name(), FieldType.INTEGER, "INTEGER8", 8, true, false,
+                    HpccSrcType.LITTLE_ENDIAN, new FieldDef[0]);
+        } else if (type instanceof ShortType) {
+            return new FieldDef(sparkField.name(), FieldType.INTEGER, "INTEGER2", 2, true, false,
+                    HpccSrcType.LITTLE_ENDIAN, new FieldDef[0]);
+        } else if (type instanceof StringType) {
+            return new FieldDef(sparkField.name(), FieldType.STRING, "UTF8", 0, false, false,
+                    HpccSrcType.UTF8, new FieldDef[0]);
+        } else if (type instanceof StructType) {
+            StructType schema = (StructType) type;
+            StructField schemaFields[] = schema.fields();
+            FieldDef[] children = new FieldDef[schemaFields.length];
+
+            for (int i = 0; i < schemaFields.length; i++) {
+                children[i] = toFieldDef(schemaFields[i]);
+            }
+
+            return new FieldDef(sparkField.name(), FieldType.RECORD, "RECORD", 0, false, false,
+                    HpccSrcType.LITTLE_ENDIAN,children);
+        } else {
+            throw new Exception("Conversion from Spark StuctField to HPCC FieldDef failed. Encountered unexpected type: " + type);
+        }
+    }
+
+    /**
+     * Translates a Spark schema into an HPCC record definition named "RootRecord".
+     *
+     * @param schema Spark schema to translate
+     * @return the equivalent HPCC record definition
+     * @throws Exception if the schema contains a Spark data type that cannot be translated
+     */
+    public static FieldDef toHPCCRecordDef(StructType schema) throws Exception
+    {
+        // Wrap the schema in a field so that the regular field translation can be reused
+        StructField tempField = new StructField("RootRecord",schema,false,Metadata.empty());
+        return toFieldDef(tempField);
+    }
+}
diff --git a/spark-hpcc/DataAccess/src/main/java/org/hpccsystems/spark/Utils.java b/spark-hpcc/DataAccess/src/main/java/org/hpccsystems/spark/Utils.java
new file mode 100644
index 000000000..da45d053e
--- /dev/null
+++ b/spark-hpcc/DataAccess/src/main/java/org/hpccsystems/spark/Utils.java
@@ -0,0 +1,127 @@
+/*******************************************************************************
+ * HPCC SYSTEMS software Copyright (C) 2024 HPCC Systems®.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ *******************************************************************************/
+package org.hpccsystems.spark;
+
+import io.opentelemetry.api.GlobalOpenTelemetry;
+import io.opentelemetry.api.OpenTelemetry;
+import io.opentelemetry.api.trace.Span;
+import io.opentelemetry.api.trace.SpanContext;
+import io.opentelemetry.api.trace.SpanId;
+import io.opentelemetry.api.trace.SpanKind;
+import io.opentelemetry.api.trace.TraceFlags;
+import io.opentelemetry.api.trace.TraceId;
+import io.opentelemetry.api.trace.TraceState;
+import io.opentelemetry.api.trace.Tracer;
+import io.opentelemetry.context.Context;
+import io.opentelemetry.sdk.OpenTelemetrySdk;
+import io.opentelemetry.sdk.autoconfigure.AutoConfiguredOpenTelemetrySdk;
+import org.apache.logging.log4j.Logger;
+import org.apache.logging.log4j.LogManager;
+
+/**
+ * OpenTelemetry helpers shared by the Spark connector: lazy access to the OpenTelemetry instance
+ * and tracer, plus convenience methods for starting spans.
+ */
+public class Utils
+{
+    private static final Logger log = LogManager.getLogger(Utils.class);
+
+    // Both are created lazily; access is serialized by the synchronized accessors below
+    private static OpenTelemetry globalOpenTelemetry = null;
+    private static Tracer dfsClientTracer = null;
+
+    static
+    {
+        System.setProperty("otel.service.name", "org.hpccsystems.spark");
+    }
+
+    /**
+     * Returns the OpenTelemetry instance used by the connector, creating it on first use.
+     *
+     * Synchronized so that concurrent callers cannot both run the initialization.
+     *
+     * @return the auto-configured SDK when the system property
+     *         "otel.java.global-autoconfigure.enabled" is true, otherwise the global instance
+     */
+    public static synchronized OpenTelemetry getOpenTelemetry()
+    {
+        if (globalOpenTelemetry == null)
+        {
+            if (Boolean.getBoolean("otel.java.global-autoconfigure.enabled"))
+            {
+                globalOpenTelemetry = AutoConfiguredOpenTelemetrySdk.initialize().getOpenTelemetrySdk();
+            }
+            else
+            {
+                globalOpenTelemetry = GlobalOpenTelemetry.get();
+            }
+        }
+
+        return globalOpenTelemetry;
+    }
+
+    /**
+     * Returns the tracer named "org.hpccsystems.spark", creating it on first use.
+     *
+     * @return the connector's tracer
+     */
+    public static synchronized Tracer getTelemetryTracer()
+    {
+        if (dfsClientTracer == null)
+        {
+            dfsClientTracer = getOpenTelemetry().getTracer("org.hpccsystems.spark");
+        }
+
+        return dfsClientTracer;
+    }
+
+    /**
+     * Starts a CLIENT span without an explicit parent.
+     *
+     * @param name span name
+     * @return the started span; the caller is responsible for ending it
+     */
+    public static Span createSpan(String name)
+    {
+        return createChildSpan(null, name);
+    }
+
+    /**
+     * Starts a CLIENT span, optionally as a child of the given span.
+     *
+     * @param parentSpan parent span, or null to start a span without an explicit parent
+     * @param name span name
+     * @return the started span; the caller is responsible for ending it
+     */
+    public static Span createChildSpan(Span parentSpan, String name)
+    {
+        Span span = null;
+        if (parentSpan == null)
+        {
+            span = Utils.getTelemetryTracer().spanBuilder(name)
+                        .setSpanKind(SpanKind.CLIENT)
+                        .startSpan();
+        }
+        else
+        {
+            span = Utils.getTelemetryTracer().spanBuilder(name)
+                        .setParent(Context.current().with(parentSpan))
+                        .setSpanKind(SpanKind.CLIENT)
+                        .startSpan();
+        }
+
+        // NOTE(review): the value returned by makeCurrent() is discarded here - confirm this is intended
+        span.makeCurrent();
+        return span;
+    }
+
+    /**
+     * Starts a CLIENT span as a child of a remote parent identified by trace and span ID.
+     *
+     * If either ID is invalid an error is logged and a disconnected span is started instead.
+     *
+     * @param traceID trace ID of the remote parent
+     * @param parentSpanID span ID of the remote parent
+     * @param childName name of the span to start
+     * @return the started span; the caller is responsible for ending it
+     */
+    public static Span createChildSpan(String traceID, String parentSpanID, String childName)
+    {
+        // Check if traceID & parentSpanID are valid
+        if (!TraceId.isValid(traceID))
+        {
+            log.error("Error creating child span, invalid parent traceID: " + traceID + ". Creating a disconnected span.");
+            return createSpan(childName);
+        }
+
+        if (!SpanId.isValid(parentSpanID))
+        {
+            log.error("Error creating child span, invalid parent spanID: " + parentSpanID + ". Creating a disconnected span.");
+            return createSpan(childName);
+        }
+
+        SpanContext parentSpanContext = SpanContext.createFromRemoteParent(
+            traceID,
+            parentSpanID,
+            TraceFlags.getSampled(),
+            TraceState.getDefault()
+        );
+        Context parentContext = Context.current().with(Span.wrap(parentSpanContext));
+
+        Span childSpan = getTelemetryTracer().spanBuilder(childName)
+                                    .setParent(parentContext)
+                                    .setSpanKind(SpanKind.CLIENT)
+                                    .startSpan();
+
+        // NOTE(review): the value returned by makeCurrent() is discarded here - confirm this is intended
+        childSpan.makeCurrent();
+        return childSpan;
+    }
+
+}
diff --git a/spark-hpcc/DataAccess/src/main/java/org/hpccsystems/spark/datasource/HpccOptions.java b/spark-hpcc/DataAccess/src/main/java/org/hpccsystems/spark/datasource/HpccOptions.java
new file mode 100644
index 000000000..cc90aa554
--- /dev/null
+++ b/spark-hpcc/DataAccess/src/main/java/org/hpccsystems/spark/datasource/HpccOptions.java
@@ -0,0 +1,154 @@
+package org.hpccsystems.spark.datasource;
+
+import java.util.TreeMap;
+
+import org.hpccsystems.dfs.client.CompressionAlgorithm;
+import org.hpccsystems.ws.client.utils.Connection;
+
+/**
+ * A helper class that extracts options from the key value pairs provided by Spark.
+ */
+public class HpccOptions
+{
+    public Connection connectionInfo = null;
+    public String clusterName = null;
+    public String fileName = null;
+    public String projectList = null;
+    public CompressionAlgorithm compression = CompressionAlgorithm.DEFAULT;
+    public String filterString = null;
+    public int expirySeconds = 120;
+    public int filePartLimit = -1;
+    public boolean useTLK = false;
+
+    public String traceID = "";
+    public String spanID = "";
+
+    /**
+     * Extracts the connector options from the given parameters.
+     *
+     * Recognized keys: host, username, password, path, cluster, fileaccesstimeout, limitperfilepart,
+     * projectlist, usetlk, compression, filter, traceid and spanid. Keys that are absent leave the
+     * corresponding field at its default value.
+     *
+     * @param parameters option names mapped to their string values
+     * @throws IllegalArgumentException if a numeric option does not hold an integer
+     * @throws Exception if the connection cannot be created from the host option
+     */
+    public HpccOptions(TreeMap<String, String> parameters) throws Exception
+    {
+        // Extract connection
+        String connectionString = null;
+        if (parameters.containsKey("host"))
+        {
+            connectionString = parameters.get("host");
+        }
+
+        String username = null;
+        if (parameters.containsKey("username"))
+        {
+            username = parameters.get("username");
+        }
+
+        String password = null;
+        if (parameters.containsKey("password"))
+        {
+            password = parameters.get("password");
+        }
+
+        connectionInfo = new Connection(connectionString);
+        connectionInfo.setUserName(username);
+        connectionInfo.setPassword(password);
+
+        if (parameters.containsKey("path"))
+        {
+            fileName = parameters.get("path");
+
+            // Remove leading forward slashes
+            fileName = fileName.replaceAll("^/+", "");
+
+            // Accept both '/' and '::' as scope separators and normalize to '::'
+            String[] filePathParts = fileName.split("/|::");
+            fileName = String.join("::", filePathParts);
+        }
+
+        if (parameters.containsKey("cluster"))
+        {
+            clusterName = parameters.get("cluster");
+        }
+
+        // Use default value in HpccOptions
+        if (parameters.containsKey("fileaccesstimeout"))
+        {
+            expirySeconds = parseIntOption("fileaccesstimeout", parameters.get("fileaccesstimeout"));
+        }
+
+        if (parameters.containsKey("limitperfilepart"))
+        {
+            filePartLimit = parseIntOption("limitperfilepart", parameters.get("limitperfilepart"));
+        }
+
+        if (parameters.containsKey("projectlist"))
+        {
+            projectList = parameters.get("projectlist");
+        }
+
+        if (parameters.containsKey("usetlk"))
+        {
+            // parseBoolean ignores case and treats null as false, so no lower-casing is needed
+            useTLK = Boolean.parseBoolean(parameters.get("usetlk"));
+        }
+
+        compression = CompressionAlgorithm.DEFAULT;
+        if (parameters.containsKey("compression"))
+        {
+            String compressionStr = parameters.get("compression");
+
+            // Locale.ROOT keeps the lower-casing independent of the JVM's default locale
+            compressionStr = compressionStr.toLowerCase(java.util.Locale.ROOT);
+
+            switch (compressionStr)
+            {
+                case "none":
+                {
+                    compression = CompressionAlgorithm.NONE;
+                    break;
+                }
+                case "lz4":
+                {
+                    compression = CompressionAlgorithm.LZ4;
+                    break;
+                }
+                case "flz":
+                {
+                    compression = CompressionAlgorithm.FLZ;
+                    break;
+                }
+                case "lzw":
+                {
+                    compression = CompressionAlgorithm.LZW;
+                    break;
+                }
+                default:
+                {
+                    compression = CompressionAlgorithm.DEFAULT;
+                    break;
+                }
+            }
+        }
+
+        if (parameters.containsKey("filter"))
+        {
+            filterString = parameters.get("filter");
+        }
+
+        if (parameters.containsKey("traceid"))
+        {
+            traceID = parameters.get("traceid");
+        }
+
+        if (parameters.containsKey("spanid"))
+        {
+            spanID = parameters.get("spanid");
+        }
+    }
+
+    /**
+     * Parses an integer option and names the offending option in the error message.
+     */
+    private static int parseIntOption(String optionName, String value)
+    {
+        if (value == null)
+        {
+            throw new IllegalArgumentException("Missing integer value for option '" + optionName + "'");
+        }
+
+        try
+        {
+            return Integer.parseInt(value.trim());
+        }
+        catch (NumberFormatException e)
+        {
+            throw new IllegalArgumentException("Invalid integer value '" + value + "' for option '" + optionName + "'", e);
+        }
+    }
+
+    @Override
+    public String toString()
+    {
+        String tostring = "[Connection: '" + connectionInfo + "', " + "clusterName: '" + clusterName;
+        tostring += "', fileName: '" + fileName + "', projectList: '" + projectList + "', compression: '" + compression + "',";
+        tostring += "filterString: '" + filterString + "', " + "expirySeconds: '" + expirySeconds + "', filePartLimit: '" + filePartLimit;
+        tostring += "', useTLK: '" + useTLK + "']";
+
+        return tostring;
+    }
+}
diff --git a/spark-hpcc/DataAccess/src/main/java/org/hpccsystems/spark/datasource/HpccRelation.java b/spark-hpcc/DataAccess/src/main/java/org/hpccsystems/spark/datasource/HpccRelation.java
new file mode 100644
index 000000000..43e26124e
--- /dev/null
+++ b/spark-hpcc/DataAccess/src/main/java/org/hpccsystems/spark/datasource/HpccRelation.java
@@ -0,0 +1,251 @@
+package org.hpccsystems.spark.datasource;
+
+import org.apache.logging.log4j.Logger;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import
org.apache.logging.log4j.LogManager; + +import org.apache.spark.rdd.RDD; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.SQLContext; +import org.apache.spark.sql.sources.BaseRelation; +import org.apache.spark.sql.sources.Filter; +import org.apache.spark.sql.sources.PrunedFilteredScan; +import org.apache.spark.sql.types.StructType; +import org.hpccsystems.commons.ecl.FileFilter; +import org.hpccsystems.spark.FileFilterConverter; +import org.hpccsystems.spark.HpccFile; +import org.hpccsystems.spark.SparkSchemaTranslator; +import org.hpccsystems.spark.Utils; + +import io.opentelemetry.api.trace.Span; + +/** + * Represents a dataset in HPCC Systems with a known schema. + */ +public class HpccRelation extends BaseRelation implements PrunedFilteredScan +{ + private static Logger log = LogManager.getLogger(HpccRelation.class); + + private HpccFile hpccFile = null; + private SQLContext sqlContext = null; + private HpccOptions options = null; + + public HpccRelation(SQLContext context, HpccOptions opts) + { + // Make sure we setup opentelemetry before HPCC4j + Utils.getOpenTelemetry(); + + sqlContext = context; + options = opts; + } + + public HpccFile getFile() + { + if (hpccFile == null) + { + try + { + hpccFile = new HpccFile(options.fileName, options.connectionInfo); + hpccFile.setTargetfilecluster(options.clusterName); + hpccFile.setFileAccessExpirySecs(options.expirySeconds); + hpccFile.setUseTLK(options.useTLK); + + hpccFile.setTraceContext(options.traceID, options.spanID); + + if (options.projectList != null) + { + hpccFile.setProjectList(options.projectList); + } + } + catch (Exception e) + { + String error = "Unable to construct HccFile with error: " + e.getMessage(); + log.error(error); + throw new RuntimeException(error); + } + + hpccFile.setFilePartRecordLimit(options.filePartLimit); + + try + { + if (options.filterString != null) + { + hpccFile.setFilter(options.filterString); + } + } + catch (Exception e) + { + String error = "Unable to set 
filter: " + options.filterString + " on HpccFile with error: " + e.getMessage(); + log.error(error); + throw new RuntimeException(error); + } + + try + { + if (options.projectList != null) + { + hpccFile.setProjectList(options.projectList); + } + } + catch (Exception e) + { + String error = "Unable to set project list: " + options.projectList + " on HpccFile with error: " + e.getMessage(); + log.error(error); + throw new RuntimeException(error); + } + } + + return hpccFile; + } + + @Override + public boolean needConversion() + { + return true; + } + + @Override + public StructType schema() + { + HpccFile file = getFile(); + + StructType schema = null; + try + { + schema = SparkSchemaTranslator.toSparkSchema(file.getProjectedRecordDefinition()); + } + catch (Exception e) + { + String error = "Unable to translate HPCC record defintion to Spark schema:" + e.getMessage(); + log.error(error); + throw new RuntimeException(error); + } + + return schema; + } + + public long sizeInBytes() + { + return super.sizeInBytes(); + } + + @Override + public SQLContext sqlContext() + { + return sqlContext; + } + + @Override + public Filter[] unhandledFilters(Filter[] filters) + { + List unhandledFilters = new ArrayList(); + for (Filter filter : filters) + { + try + { + FileFilter filefilter = FileFilterConverter.ConvertToHPCCFileFilterString(filter); //somewhat expensive action + if (filefilter != null && !filefilter.isEmpty()) + continue; + } + catch (Exception e) + { + log.warn("Unsupported filter: " + filter.toString() + " with error: " + e.getMessage()); + } + unhandledFilters.add(filter); + } + + return unhandledFilters.toArray(new Filter[0]); + } + + @Override + public RDD buildScan(String[] requiredColumns, Filter[] filters) + { + String projectList = String.join(", ", requiredColumns); + + RDD ret = null; + try + { + HpccFile file = getFile(); + + if (filters != null && filters.length != 0) + { + try + { + FileFilter filefilter = 
FileFilterConverter.CovertToHPCCFileFilter(filters); + if (filefilter != null && !filefilter.isEmpty()) + file.setFilter(filefilter); + } + catch (Exception e) + { + log.error("Could not apply filter(s) to File '" + file.getFileName() + "': " + e.getLocalizedMessage() ); + throw new RuntimeException("Could not apply filter(s) to File '" + file.getFileName() + "': " + e.getLocalizedMessage()); + } + } + + if (options.projectList != null) + { + projectList = options.projectList; + } + file.setProjectList(projectList); + + ret = file.getRDD(sqlContext.sparkContext()); + } + catch (Exception e) + { + String error = "Unable to create HpccRDD with error: " + e.getMessage(); + log.error(error); + throw new RuntimeException(error); + } + + return ret; + } + + public boolean equals(Object rhs) + { + if (rhs instanceof HpccRelation) + { + HpccRelation otherRelation = (HpccRelation) rhs; + HpccOptions otherOptions = otherRelation.options; + + boolean nameMatches = otherOptions.fileName.equals(options.fileName); + + boolean projectListMatches = true; + if (otherOptions.projectList != null) + { + if (options.projectList != null) + { + projectListMatches = otherOptions.projectList.equals(options.projectList); + } + else + { + return false; + } + } + + // The filter string can be overriden during buildScan, but this will only lead to + // false negatives. So we don't have to worry about that. 
+ boolean filterStringMatches = true; + if (otherOptions.filterString != null) + { + if (options.filterString != null) + { + otherOptions.filterString.equals(options.filterString); + } + else + { + return false; + } + } + + boolean hostMatches = options.connectionInfo.getHost().equals(otherOptions.connectionInfo.getHost()); + boolean filePartLimitMatches = otherOptions.filePartLimit == options.filePartLimit; + + return hostMatches && nameMatches && projectListMatches && filterStringMatches && filePartLimitMatches; + } + + return false; + } +} diff --git a/spark-hpcc/DataAccess/src/main/java/org/hpccsystems/spark/datasource/HpccRelationProvider.java b/spark-hpcc/DataAccess/src/main/java/org/hpccsystems/spark/datasource/HpccRelationProvider.java new file mode 100644 index 000000000..bf5df9e35 --- /dev/null +++ b/spark-hpcc/DataAccess/src/main/java/org/hpccsystems/spark/datasource/HpccRelationProvider.java @@ -0,0 +1,111 @@ +package org.hpccsystems.spark.datasource; + +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.SQLContext; +import org.apache.spark.sql.SaveMode; + +import org.apache.spark.sql.sources.BaseRelation; +import org.apache.spark.sql.sources.CreatableRelationProvider; +import org.apache.spark.sql.sources.DataSourceRegister; +import org.apache.spark.sql.sources.RelationProvider; + +import org.hpccsystems.ws.client.wrappers.ArrayOfEspExceptionWrapper; +import org.hpccsystems.spark.HpccFileWriter; + +import scala.collection.JavaConversions; +import scala.collection.immutable.Map; + +import java.util.TreeMap; + +import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.LogManager; + +/** + * Registers HPCC Systems as a DataSource. Allows reading of a dataset in HPCC Systems and writing a new dataset to HPCC Systems. 
+ */ +public class HpccRelationProvider implements RelationProvider, CreatableRelationProvider, DataSourceRegister +{ + private static final Logger log = LogManager.getLogger(HpccRelationProvider.class); + 
+ /** + * @return "hpcc", the short name under which this data source is registered + */ + @Override + public String shortName() + { + return "hpcc"; + } + 
+ /** + * Creates a relation for reading an existing HPCC file. + * + * @param sqlContext the SQLContext handed to the new relation + * @param parameters the data source options + * @return a new HpccRelation built from the extracted options + * @throws RuntimeException if the options cannot be extracted; the original exception is kept as the cause + */ + @Override + public BaseRelation createRelation(SQLContext sqlContext, Map<String, String> parameters) + { + HpccOptions options = null; + try + { + options = extractOptions(parameters); + } + catch (Exception e) + { + String error = "Error while attempting to extract HpccOptions: " + e.getMessage(); + log.error(error); + throw new RuntimeException(error, e); + } + return new HpccRelation(sqlContext, options); + } + 
+ /** + * Writes the given dataset to HPCC and returns a relation for the newly written file. + * SaveMode.Append is rejected, SaveMode.Overwrite enables overwriting, and every other mode + * writes with overwrite disabled. + * + * @param sqlContext the SQLContext whose SparkContext is used for the write + * @param mode the requested save mode + * @param parameters the data source options + * @param data the dataset to write + * @return a relation opened on the written file + * @throws RuntimeException if Append is requested, the options cannot be extracted or the write fails + */ + @Override + public BaseRelation createRelation(SQLContext sqlContext, SaveMode mode, Map<String, String> parameters, Dataset<Row> data) + { + boolean overwrite = false; + if (mode == SaveMode.Append) + { + String error = "Append mode is not supported in HPCC."; + log.error(error); + throw new RuntimeException(error); + } + else if (mode == SaveMode.Overwrite) + { + overwrite = true; + } + + HpccOptions options = null; + try + { + options = extractOptions(parameters); + } + catch (Exception e) + { + String error = "Error while attempting to extract HpccOptions: " + e.getMessage(); + log.error(error); + throw new RuntimeException(error, e); + } + + try + { + HpccFileWriter fileWriter = new HpccFileWriter(options.connectionInfo); + fileWriter.setTraceContext(options.traceID, options.spanID); + fileWriter.saveToHPCC(sqlContext.sparkContext(), data.schema(), data.rdd().toJavaRDD(), options.clusterName, options.fileName, + options.compression, overwrite); + + // Attempt to open the new file 
+ return createRelation(sqlContext, parameters); + } + catch (Exception e) + { + String error = "Unable to save file to HPCC with error: " + e.getMessage(); + log.error(error); + throw new RuntimeException(error,e); + } + } + 
+ /** + * Copies the Scala option map into a case-insensitive TreeMap and builds the HpccOptions from it. + * + * @param scalaParams the data source options as passed in by Spark + * @return the parsed options + * @throws Exception if HpccOptions rejects the parameters + */ + private HpccOptions extractOptions(Map<String, String> scalaParams) throws Exception + { + java.util.Map<String, String> parameters = JavaConversions.mapAsJavaMap(scalaParams); + + // Use TreeMap to avoid case sensitivity 
+ TreeMap<String, String> paramTreeMap = new TreeMap<>(String.CASE_INSENSITIVE_ORDER); + paramTreeMap.putAll(parameters); + + HpccOptions options = new HpccOptions(paramTreeMap); + return options; + } + +} diff --git a/spark-hpcc/DataAccess/src/main/java/org/hpccsystems/spark/datasource/package-info.java b/spark-hpcc/DataAccess/src/main/java/org/hpccsystems/spark/datasource/package-info.java new file mode 100644 index 000000000..9bf54e0f5 --- /dev/null +++ b/spark-hpcc/DataAccess/src/main/java/org/hpccsystems/spark/datasource/package-info.java @@ -0,0 +1,6 @@ +/** + *
+ * Provides a mechanism to stream HPCC Systems data via a Spark Relation.
+ * 
+ */ +package org.hpccsystems.spark.datasource; diff --git a/spark-hpcc/DataAccess/src/main/java/org/hpccsystems/spark/package-info.java b/spark-hpcc/DataAccess/src/main/java/org/hpccsystems/spark/package-info.java new file mode 100644 index 000000000..5f612d7f1 --- /dev/null +++ b/spark-hpcc/DataAccess/src/main/java/org/hpccsystems/spark/package-info.java @@ -0,0 +1,38 @@ +/******************************************************************************* + * HPCC SYSTEMS software Copyright (C) 2018 HPCC Systems®. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + *******************************************************************************/ +/** + * Provides access to data residing in HPCC Systems or Spark environments. + * + * The DFSClient from HPCC Systems is used to access + * HPCC Systems data files' metadata including the location and layout of the file, and + * also requests data file access privileges. + * An RDD is provided to read the file in parallel by file part. + * + * The main classes are: + *
    + *
  • Content is the abstract class defining field content. There are concrete + * classes for each of the different content types.
  • + *
  • FieldType is an enumeration type listing the types of content.
  • + *
  • HpccPart implements the Spark Partition interface.
  • + *
  • HpccFile is the metadata for a file on an HPCC THOR cluster.
  • + *
  • HpccFileException is the general exception class.
  • + *
  • HpccRDD extends RDD(Record) class for Spark.
  • + *
  • HpccRemoteFileReader is the facade for the type of file reader.
  • + *
  • Record is the container class holding the data for a record from THOR.
  • + *
+ * + */ +package org.hpccsystems.spark; diff --git a/spark-hpcc/DataAccess/src/main/javadoc/overview.html b/spark-hpcc/DataAccess/src/main/javadoc/overview.html new file mode 100644 index 000000000..5828b7418 --- /dev/null +++ b/spark-hpcc/DataAccess/src/main/javadoc/overview.html @@ -0,0 +1,7 @@ + +
+This project enables HPCC Systems / Spark interoperability.
+
+The DataAccess project contains the classes which expose distributed streaming of HPCC based data via Spark constructs. In addition, the HPCC data is exposed as a Dataframe for the convenience of the Spark developer.
+    
+ \ No newline at end of file diff --git a/spark-hpcc/DataAccess/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister b/spark-hpcc/DataAccess/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister new file mode 100644 index 000000000..570936da3 --- /dev/null +++ b/spark-hpcc/DataAccess/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister @@ -0,0 +1 @@ +org.hpccsystems.spark.datasource.HpccRelationProvider diff --git a/spark-hpcc/DataAccess/src/main/resources/log4j.properties b/spark-hpcc/DataAccess/src/main/resources/log4j.properties new file mode 100644 index 000000000..b8747f7c2 --- /dev/null +++ b/spark-hpcc/DataAccess/src/main/resources/log4j.properties @@ -0,0 +1,27 @@ +# Set everything to be logged to the console +log4j.rootCategory=INFO, console +log4j.appender.console=org.apache.log4j.ConsoleAppender +log4j.appender.console.target=System.err +log4j.appender.console.layout=org.apache.log4j.PatternLayout +log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n + +# Set the default spark-shell log level to WARN. When running the spark-shell, the +# log level for this class is used to overwrite the root logger's log level, so that +# the user can have different defaults for the shell and regular Spark apps. 
+log4j.logger.org.apache.spark.repl.Main=WARN + +# Settings to quiet third party logs that are too verbose +log4j.logger.org.spark_project.jetty=WARN +log4j.logger.org.spark_project.jetty.util.component.AbstractLifeCycle=ERROR +log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=INFO +log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=INFO +log4j.logger.org.apache.parquet=ERROR +log4j.logger.parquet=ERROR +log4j.logger.org.apache.axis2.enterprise=FATAL +log4j.logger.de.hunsicker.jalopy.io=FATAL +log4j.logger.httpclient.wire.header=FATAL +log4j.logger.org.apache.commons.httpclient=FATAL + +# SPARK-9183: Settings to avoid annoying messages when looking up nonexistent UDFs in SparkSQL with Hive support +log4j.logger.org.apache.hadoop.hive.metastore.RetryingHMSHandler=FATAL +log4j.logger.org.apache.hadoop.hive.ql.exec.FunctionRegistry=ERROR \ No newline at end of file diff --git a/spark-hpcc/DataAccess/src/test/java/org/hpccsystems/spark/BaseIntegrationTest.java b/spark-hpcc/DataAccess/src/test/java/org/hpccsystems/spark/BaseIntegrationTest.java new file mode 100644 index 000000000..b7d621f0f --- /dev/null +++ b/spark-hpcc/DataAccess/src/test/java/org/hpccsystems/spark/BaseIntegrationTest.java @@ -0,0 +1,167 @@ +/******************************************************************************* + * HPCC SYSTEMS software Copyright (C) 2023 HPCC Systems®. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ *******************************************************************************/ +package org.hpccsystems.spark; + +import java.io.File; +import java.io.FilenameFilter; +import java.net.URL; +import java.nio.file.Path; +import java.nio.file.Paths; + +import org.apache.spark.SparkConf; +import org.apache.spark.SparkContext; +import org.apache.spark.sql.SparkSession; + +class BaseIntegrationTest +{ + static SparkContext sparkContext = null; + + public File findRecentlyBuiltSparkJar() + { + try + { + URL url = BaseIntegrationTest.class.getProtectionDomain().getCodeSource().getLocation(); + Path parentPath = Paths.get(url.toURI()).getParent(); + + FilenameFilter filter = new FilenameFilter() + { + @Override + public boolean accept(File dir, String name) + { + return name.matches("spark-hpcc.*-jar-with-dependencies\\.jar"); + } + }; + + File[] files = parentPath.toFile().listFiles(filter); + if (files != null && files.length > 0) + { + // Return the mostly recently modified Spark jar. This should always be the correct jar + // as the integration tests will run right after the build step is complete. 
+ File mostRecentlyModifiedFile = null; + long lastModifiedTime = Long.MIN_VALUE; + + for (File file : files) + { + long modifiedTime = file.lastModified(); + if (modifiedTime > lastModifiedTime) + { + mostRecentlyModifiedFile = file; + lastModifiedTime = modifiedTime; + } + } + + return mostRecentlyModifiedFile; + } + } catch (Exception e) + { + System.out.println("Error finding spark jar file with exception: " + e.getMessage()); + } + + return null; + } + + public SparkConf getDefaultSparkConf() + { + File sparkJar = findRecentlyBuiltSparkJar(); + + String sparkJarPath = ""; + if (sparkJar != null) + { + sparkJarPath = sparkJar.getAbsolutePath(); + System.out.println("Spark jar: " + sparkJarPath); + } + else + { + System.out.println("Unable to find spark jar matching pattern: spark-hpcc.*-jar-with-dependencies.jar, " + + "in directory [PROJECT_ROOT]/DataAccess/target/, check maven package / verify output for errors."); + } + + String[] jars = { + sparkJarPath + }; + + return new SparkConf() + .setMaster("local") + .setAppName("Spark-HPCC-Connector-Test") + .set("spark.driver.allowMultipleContexts", "false") + .set("spark.sql.allowMultipleContexts", "false") + .setJars(jars); + } + + public SparkContext getOrCreateSparkContext() + { + if (sparkContext != null) + { + return sparkContext; + } + + return getOrCreateSparkContext(getDefaultSparkConf()); + } + + public SparkContext getOrCreateSparkContext(SparkConf conf) + { + if (sparkContext != null) + { + sparkContext.stop(); + SparkSession.clearActiveSession(); + SparkSession.clearDefaultSession(); + + sparkContext = new SparkContext(conf); + } + + return sparkContext; + } + + public SparkSession getOrCreateSparkSession() + { + SparkSession spark = SparkSession + .builder() + .appName("Spark-HPCC-Connector-Test") + .config(getDefaultSparkConf()) + .getOrCreate(); + return spark; + } + + public SparkSession getOrCreateSparkSession(SparkConf conf) + { + SparkSession spark = SparkSession + .builder() + 
.appName("Spark-HPCC-Connector-Test") + .config(conf) + .getOrCreate(); + return spark; + } + + public String getHPCCClusterURL() + { + return System.getProperty("hpccconn", "https://eclwatch.default:8010"); + } + + public String getHPCCClusterUser() + { + return System.getProperty("hpccuser", ""); + } + + public String getHPCCClusterPass() + { + return System.getProperty("hpccpass", ""); + } + + public String getThorCluster() + { + return System.getProperty("thorclustername", "data"); + } +} diff --git a/spark-hpcc/DataAccess/src/test/java/org/hpccsystems/spark/DataframeIntegrationTest.java b/spark-hpcc/DataAccess/src/test/java/org/hpccsystems/spark/DataframeIntegrationTest.java new file mode 100644 index 000000000..46123424a --- /dev/null +++ b/spark-hpcc/DataAccess/src/test/java/org/hpccsystems/spark/DataframeIntegrationTest.java @@ -0,0 +1,179 @@ +/******************************************************************************* + * HPCC SYSTEMS software Copyright (C) 2023 HPCC Systems®. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ *******************************************************************************/ +package org.hpccsystems.spark; + +import java.util.List; +import java.math.BigDecimal; +import java.util.ArrayList; + +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema; +import org.apache.spark.sql.types.DataTypes; +import org.apache.spark.sql.types.StructField; +import org.apache.spark.sql.types.StructType; + +import org.junit.Assert; +import org.junit.Test; + 
+/** + * Round-trip tests: write a DataFrame to HPCC through the "hpcc" data source, read it back and + * check that no written row is missing from the data read. + */ +public class DataframeIntegrationTest extends BaseIntegrationTest +{ + 
+ /** + * Writes 1000 (key, value) rows of longs and verifies they can be read back unchanged. + */ + @Test + public void integerKeyValueWriteReadTest() + { + SparkSession spark = getOrCreateSparkSession(); + + // Create the schema 
+ StructType schema = DataTypes.createStructType(new StructField[] { + DataTypes.createStructField("key", DataTypes.LongType, false), + DataTypes.createStructField("value", DataTypes.LongType, false) + }); + + // Write dataset to HPCC 
+ List<Row> rows = new ArrayList<>(); + for (int i = 0; i < 1000; i++) { + Object[] fields = new Object[2]; + fields[0] = Long.valueOf(i); + fields[1] = Long.valueOf(i); + rows.add(new GenericRowWithSchema(fields, schema)); + } + + Dataset<Row> writtenDataSet = spark.createDataFrame(rows, schema); + + String datasetPath = "spark::test::integer_kv"; + writtenDataSet.write() + .format("hpcc") + .mode("overwrite") + .option("cluster", getThorCluster()) + .option("host", getHPCCClusterURL()) + .option("username", getHPCCClusterUser()) + .option("password", getHPCCClusterPass()) + .save(datasetPath); + + // Read dataset from HPCC 
+ Dataset<Row> readDataSet = spark.read() + .format("hpcc") + .option("cluster", getThorCluster()) + .option("host", getHPCCClusterURL()) + .option("username", getHPCCClusterUser()) + .option("password", getHPCCClusterPass()) + .load(datasetPath); + + StructType readSchema = readDataSet.schema(); + System.out.println(readSchema); + + Dataset<Row> diff = writtenDataSet.exceptAll(readDataSet); + Assert.assertTrue("Difference found between written and read datasets", diff.isEmpty()); + } + 
+ /** + * Writes 1000 rows covering every field type in the schema below (numeric, decimal, string, binary, + * set, inline record and child dataset) and verifies they can be read back unchanged. + */ + @Test + public void allTypesWriteReadTest() + { + SparkSession spark = getOrCreateSparkSession(); + + StructType inlineSchema = DataTypes.createStructType(new StructField[] { + DataTypes.createStructField("key", DataTypes.IntegerType, false), + DataTypes.createStructField("val", DataTypes.IntegerType, false) + }); + + StructType childSchema = DataTypes.createStructType(new StructField[] { + DataTypes.createStructField("test", DataTypes.IntegerType, false), + DataTypes.createStructField("test2", DataTypes.IntegerType, false) + }); + + // Create the schema 
+ StructType schema = DataTypes.createStructType(new StructField[] { + DataTypes.createStructField("byteVal", DataTypes.ByteType, false), + DataTypes.createStructField("shortVal", DataTypes.ShortType, false), + DataTypes.createStructField("intVal", DataTypes.IntegerType, false), + DataTypes.createStructField("longVal", DataTypes.LongType, false), + DataTypes.createStructField("floatVal", DataTypes.FloatType, false), + DataTypes.createStructField("doubleVal", DataTypes.DoubleType, false), + DataTypes.createStructField("decimalVal", DataTypes.createDecimalType(16, 8), false), + DataTypes.createStructField("stringVal", DataTypes.StringType, false), + DataTypes.createStructField("binaryVal", DataTypes.BinaryType, false), + DataTypes.createStructField("setVal", DataTypes.createArrayType(DataTypes.IntegerType), false), + DataTypes.createStructField("inlineRec", inlineSchema, false), + DataTypes.createStructField("childDataset", DataTypes.createArrayType(childSchema), false), + }); + + // Write dataset to HPCC 
+ List<Row> rows = new ArrayList<>(); + for (int i = 0; i < 1000; i++) + { + Object[] fields = new Object[12]; + fields[0] = Byte.valueOf((byte) i); + fields[1] = Short.valueOf((short) i); + fields[2] = Integer.valueOf((int) i); + fields[3] = Long.valueOf((long) i); + fields[4] = Float.valueOf(0); + fields[5] = Double.valueOf(10.42); + // String constructor keeps the decimal exact; the double constructor carries binary rounding noise. 
+ fields[6] = new BigDecimal("10.42"); + fields[7] = "TestString"; + fields[8] = "BinaryVal".getBytes(java.nio.charset.StandardCharsets.UTF_8); + + Integer[] set = new Integer[2]; + set[0] = Integer.valueOf(i); + set[1] = Integer.valueOf(i); + fields[9] = set; + + Object[] inlineRec = new Object[2]; + inlineRec[0] = Integer.valueOf(i); + inlineRec[1] = Integer.valueOf(i); + // The inline record column is declared with inlineSchema, so its row must carry that schema too. 
+ fields[10] = new GenericRowWithSchema(inlineRec, inlineSchema); + + int numChildRows = 10; + List<Row> childDataset = new ArrayList<>(); + for (int j = 0; j < numChildRows; j++) + { + Object[] childRec = new Object[2]; + childRec[0] = Integer.valueOf(j); + childRec[1] = Integer.valueOf(j); + + childDataset.add(new GenericRowWithSchema(childRec, childSchema)); + } + fields[11] = childDataset.toArray(); + + rows.add(new GenericRowWithSchema(fields, schema)); + } + + Dataset<Row> writtenDataSet = spark.createDataFrame(rows, schema); + + String datasetPath = "spark::test::all_types"; + writtenDataSet.write() + .format("hpcc") + .mode("overwrite") + .option("cluster", getThorCluster()) + .option("host", getHPCCClusterURL()) + .option("username", getHPCCClusterUser()) + .option("password", getHPCCClusterPass()) + .save(datasetPath); + + // Read dataset from HPCC 
+ Dataset<Row> readDataSet = spark.read() + .format("hpcc") + .option("cluster", getThorCluster()) + .option("host", getHPCCClusterURL()) + .option("username", getHPCCClusterUser()) + .option("password", getHPCCClusterPass()) + .load(datasetPath); + + Dataset<Row> diff = writtenDataSet.exceptAll(readDataSet); + Assert.assertTrue("Difference found between written and read datasets", diff.isEmpty()); + } +} diff --git a/spark-hpcc/DataAccess/src/test/java/org/hpccsystems/spark/FileFilterTests.java b/spark-hpcc/DataAccess/src/test/java/org/hpccsystems/spark/FileFilterTests.java new file mode 100644 index 000000000..bedb8ab6a --- /dev/null +++ b/spark-hpcc/DataAccess/src/test/java/org/hpccsystems/spark/FileFilterTests.java @@ -0,0 +1,111 @@ 
+/******************************************************************************* + * HPCC SYSTEMS software Copyright (C) 2019 HPCC Systems. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + *******************************************************************************/ +package org.hpccsystems.spark; + +import org.junit.Assert; +import org.apache.spark.sql.sources.EqualTo; +import org.apache.spark.sql.sources.Filter; +import org.apache.spark.sql.sources.GreaterThan; +import org.apache.spark.sql.sources.GreaterThanOrEqual; +import org.apache.spark.sql.sources.In; +import org.apache.spark.sql.sources.LessThan; +import org.apache.spark.sql.sources.Not; +import org.apache.spark.sql.sources.Or; +import org.apache.spark.sql.sources.StringStartsWith; +import org.hpccsystems.commons.ecl.FileFilter; +import org.junit.Before; +import org.junit.Test; + +import org.junit.experimental.categories.Category; + 
+/** + * Unit tests for the conversion of Spark source filters to HPCC file filters. + */ +@Category(org.hpccsystems.commons.annotations.BaseTests.class) +public class FileFilterTests +{ + + @Before + public void setUp() throws Exception + { + } + 
+ /** + * Verifies that Not(LessThan) converts to the same HPCC filter as GreaterThanOrEqual. + * Conversion errors propagate and fail the test instead of being swallowed. + */ + @Test + public void testNotSparkFilterstoHPCCFilters() throws Exception + { + System.out.println("\n----------Spark 'Not' filter to HPCC Tests----------"); + + Filter child = new LessThan("field1", 8); + Not notlessthan = new Not(child); + FileFilter hpccnotlessthan = FileFilterConverter.ConvertToHPCCFileFilterString(notlessthan); + Assert.assertNotNull(hpccnotlessthan); + + GreaterThanOrEqual gte = new GreaterThanOrEqual("field1", 8); + FileFilter hpccgte = FileFilterConverter.ConvertToHPCCFileFilterString(gte); + Assert.assertNotNull(hpccgte); + + Assert.assertEquals(hpccnotlessthan.toJson(), hpccgte.toJson()); + } + 
+ /** + * Converts a mixed array of Spark filters in one call and prints the resulting HPCC filter. + * Conversion errors propagate and fail the test instead of only being printed. + */ + @Test + public void testSparkFilterstoHPCCFilters() throws Exception + { + + System.out.println("\n----------Spark to HPCC filter Tests----------"); + + org.apache.spark.sql.sources.Filter [] sparkfilters = new org.apache.spark.sql.sources.Filter[8]; + StringStartsWith ssw = new StringStartsWith("Fname", "Rod"); + LessThan lt = new LessThan("field1", 12); + GreaterThan gt = new GreaterThan("field1", 8); + Or or = new Or(lt, gt); + sparkfilters[0] = ssw; + sparkfilters[1] = or; + + In in = new In("field1", new Object [] { "str", "values", "etc"}); + sparkfilters[2] = in; + + In innumber = new In("field1", new Object [] { 1, 2, 3, 4, 5.6}); + sparkfilters[3] = innumber; + + LessThan lta = new LessThan("alphafield", "XYZ"); + sparkfilters[4] = lta; + + Filter child = new EqualTo("field1", "true"); + org.apache.spark.sql.sources.Not n = new org.apache.spark.sql.sources.Not(child ); + sparkfilters[5] = n; + + Filter eq5 = new EqualTo("field1", 5); + sparkfilters[6] = eq5; + + child = new LessThan("field1", -3.2); + n = new Not(child); + sparkfilters[7] = n; + + FileFilter hpccfilters = FileFilterConverter.CovertToHPCCFileFilter(sparkfilters); + Assert.assertNotNull(hpccfilters); + System.out.println("\n----------Converting Spark to HPCC filter output----------"); + System.out.println(hpccfilters.toJson()); + } +} diff --git a/spark-hpcc/DataAccess/src/test/java/org/hpccsystems/spark/HpccRelationIntegrationTest.java b/spark-hpcc/DataAccess/src/test/java/org/hpccsystems/spark/HpccRelationIntegrationTest.java new file mode 100644 index 000000000..6d1590f15 --- /dev/null +++ b/spark-hpcc/DataAccess/src/test/java/org/hpccsystems/spark/HpccRelationIntegrationTest.java @@ -0,0 +1,192 @@ 
+package org.hpccsystems.spark; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.TreeMap; + +import javax.xml.validation.Schema; + +import org.apache.spark.SparkConf; +import org.apache.spark.SparkContext; +import org.apache.spark.rdd.RDD; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.SQLContext; +import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema; +import org.apache.spark.sql.sources.EqualTo; +import org.apache.spark.sql.sources.Filter; +import org.apache.spark.sql.sources.GreaterThan; +import org.apache.spark.sql.sources.In; +import org.apache.spark.sql.sources.IsNull; +import org.apache.spark.sql.sources.LessThan; +import org.apache.spark.sql.sources.Not; +import org.apache.spark.sql.sources.Or; +import org.apache.spark.sql.sources.StringContains; +import org.apache.spark.sql.sources.StringEndsWith; +import org.apache.spark.sql.sources.StringStartsWith; +import org.apache.spark.sql.types.DataTypes; +import org.apache.spark.sql.types.StructField; +import org.apache.spark.sql.types.StructType; +import org.hpccsystems.dfs.client.CompressionAlgorithm; +import org.hpccsystems.spark.datasource.HpccOptions; +import org.hpccsystems.spark.datasource.HpccRelation; +import org.junit.After; +import org.junit.Assert; +import org.junit.Assume; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import scala.collection.JavaConverters; +import scala.collection.Seq; + 
+/** + * Tests for HpccRelation: filtered scans, option pass-through to HpccFile, and unhandled filter reporting. + */ +@Category(org.hpccsystems.commons.annotations.BaseTests.class) +public class HpccRelationIntegrationTest extends BaseIntegrationTest +{ + 
+ /** + * Writes 1000 (key, value) rows, then scans with a set of supported filters whose conjunction + * only matches key == 5 and expects exactly one row. + */ + @Test + public void testbuildScanAllValid() throws Exception + { + SparkSession spark = getOrCreateSparkSession(); + SQLContext sqlcontext = new SQLContext(spark); + + // Create the schema 
+ StructType schema = DataTypes.createStructType(new StructField[] { + DataTypes.createStructField("key", DataTypes.LongType, false), + DataTypes.createStructField("value", DataTypes.LongType, false) + }); + + // Write dataset to HPCC 
+ List<Row> rows = new ArrayList<>(); + for (int i = 0; i < 1000; i++) { + Object[] fields = new Object[2]; + fields[0] = Long.valueOf(i); + fields[1] = Long.valueOf(i); + rows.add(new GenericRowWithSchema(fields, schema)); + } + + Dataset<Row> writtenDataSet = spark.createDataFrame(rows, schema); + + String testDataset = "spark::test::integer_kv"; + writtenDataSet.write() + .format("hpcc") + .mode("overwrite") + .option("cluster", getThorCluster()) + .option("host", getHPCCClusterURL()) + .option("username", getHPCCClusterUser()) + .option("password", getHPCCClusterPass()) + .save(testDataset); + + TreeMap<String, String> paramTreeMap = new TreeMap<>(String.CASE_INSENSITIVE_ORDER); + paramTreeMap.put("host", getHPCCClusterURL()); + paramTreeMap.put("path", testDataset); + paramTreeMap.put("cluster", getThorCluster()); + paramTreeMap.put("username", getHPCCClusterUser()); + paramTreeMap.put("password", getHPCCClusterPass()); + + HpccOptions hpccopts = new HpccOptions(paramTreeMap); + HpccRelation hpccRelation = new HpccRelation(sqlcontext, hpccopts); + + Filter[] supportedSparkFilters = { + new Or(new LessThan("key", 12), new GreaterThan("key", 8)), + new In("key", new Object [] { 1, 2, 3, 4, 5}), + new EqualTo("key", 5), + new Not(new LessThan("key", 3)), + }; + + RDD<Row> rdd = hpccRelation.buildScan(new String[]{"key"}, supportedSparkFilters); + // assertEquals reports the actual count on failure, unlike assertTrue(a == b). 
+ Assert.assertEquals("Unexpected filter result count", 1L, rdd.count()); + } + 
+ /** + * Verifies that the data source options end up in HpccOptions and in the HpccFile created by the relation. + */ + @Test + public void testOptionsPassThrough() throws Exception + { + SparkSession spark = getOrCreateSparkSession(); + SQLContext sqlcontext = new SQLContext(spark); + + TreeMap<String, String> paramTreeMap = new TreeMap<>(String.CASE_INSENSITIVE_ORDER); + + String url = getHPCCClusterURL(); + String user = "user"; + String pass = "pass"; + paramTreeMap.put("host", url); + paramTreeMap.put("username", user); + paramTreeMap.put("password", pass); + + String path = "spark::test::integer_kv"; + paramTreeMap.put("path", path); + + paramTreeMap.put("cluster", getThorCluster()); + paramTreeMap.put("useTLK", "True"); // Defaults to false, also should be case insensitive 
+ paramTreeMap.put("fileAccessTimeout", "120000"); + paramTreeMap.put("limitPerFilePart", "100"); + + String projectList = "key, value"; + paramTreeMap.put("projectList", projectList); + + String filterStr = "key > 5"; + paramTreeMap.put("filter", filterStr); + + paramTreeMap.put("compression", "LZ4"); + + HpccOptions hpccopts = new HpccOptions(paramTreeMap); + + // These options don't currently have accessors in HPCCFile 
+ Assert.assertEquals(url, hpccopts.connectionInfo.getUrl()); + Assert.assertEquals(user, hpccopts.connectionInfo.getUserName()); + Assert.assertEquals(pass, hpccopts.connectionInfo.getPassword()); + Assert.assertEquals(filterStr, hpccopts.filterString); + Assert.assertEquals(CompressionAlgorithm.LZ4, hpccopts.compression); + + HpccRelation hpccRelation = new HpccRelation(sqlcontext, hpccopts); + + Assert.assertEquals(true, hpccRelation.getFile().getUseTLK()); + Assert.assertEquals(getThorCluster(), hpccRelation.getFile().getTargetfilecluster()); + Assert.assertEquals(path, hpccRelation.getFile().getFileName()); + Assert.assertEquals(120000, hpccRelation.getFile().getFileAccessExpirySecs()); + Assert.assertEquals(100, hpccRelation.getFile().getFilePartRecordLimit()); + Assert.assertEquals(projectList, hpccRelation.getFile().getProjectList()); + } + 
+ /** + * Every filter in this set is expected to be convertible, so none may be reported as unhandled. + */ + @Test + public void testUnhandledFiltersAllValid() throws Exception + { + HpccRelation hpccRelation = new HpccRelation(null, null); + + Filter[] supportedSparkFilters = { + new StringStartsWith("fixstr8", "Rod"), + new Or(new LessThan("int8", 12), new GreaterThan("int8", 8)), + new In("int8", new Object [] { "str", "values", "etc"}), + new In("int8", new Object [] { 1, 2, 3, 4, 5.6}), + new LessThan("fixstr8", "XYZ"), + new Not(new EqualTo("fixstr8", "true")), + new EqualTo("int8", 5), + new Not(new LessThan("int8", 3)) + }; + + Filter [] unhandledsparkfilters = hpccRelation.unhandledFilters(supportedSparkFilters); + + Assert.assertEquals("Unexpected unhandled filters detected", 0, unhandledsparkfilters.length); + } + 
+ /** + * No filter in this set is expected to be convertible, so all of them must be reported as unhandled. + */ + @Test + public void testUnhandledFiltersNoneValid() throws Exception + { + HpccRelation hpccRelation = new HpccRelation(null, null); + + Filter[] unsupportedSparkFilters = { + new IsNull("something"), + new Or(new LessThan("int8", 12), new GreaterThan("int4", 8)), + new Not(new Or(new LessThan("int8", 12), new GreaterThan("int8", 8))), + new Not(new In("int8", new Object [] { 1, 2, 3, 4, 5.6})), + new StringContains("somestring", "some"), + new StringEndsWith("somestring", "ing") + }; + + Filter[] unhandledsparkfilters = hpccRelation.unhandledFilters(unsupportedSparkFilters); + + Assert.assertEquals("Expected every unsupported filter to be reported as unhandled", unsupportedSparkFilters.length, unhandledsparkfilters.length); + } +} diff --git a/spark-hpcc/Examples/PySparkExample.ipynb b/spark-hpcc/Examples/PySparkExample.ipynb new file mode 100644 index 000000000..d540893b9 --- /dev/null +++ b/spark-hpcc/Examples/PySparkExample.ipynb @@ -0,0 +1,321 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "id": "497761f5", + "metadata": {}, + "source": [ + "# Spark-HPCC Connector for HPCC Systems Platform and Spark Connectivity\n", + "\n", + "This example demonstrates how to use the Spark-HPCC Connector to read and write data from / to HPCC Systems clusters, as well as providing basic setup information for the Spark-HPCC connector.\n", + "\n", + "## Spark-HPCC Connector Installation:\n", + "\n", + "---\n", + "\n", + "The Spark-HPCC Connector jar and its dependencies need to be made available to all Spark worker nodes and the Spark driver application. 
This can be done by adding the Spark-HPCC connector jar to the classpath on every node in the Spark cluster and to the classpath for the Spark driver, or by using the ```--jars``` option when executing spark-submit or pyspark.\n", + "\n", + "Download the Spark-HPCC jar with dependencies from Maven Central: https://mvnrepository.com/artifact/org.hpccsystems/spark-hpcc\n", + "\n", + "### Example of using the jars option:\n", + "```\n", + "pyspark --jars spark-hpcc-9.2.2-1-jar-with-dependencies.jar\n", + "```\n", + "\n", + "### Adding Spark-HPCC jar to classpath\n", + "The Spark-HPCC jar can also be added to the classpath through various means depending on the configuration of your Spark cluster, more information about updating the classpath can be found within the Spark documentation: https://spark.apache.org/docs/latest/configuration.html" + ] + }, + { + "cell_type": "markdown", + "id": "eb1182be", + "metadata": {}, + "source": [ + "# Creating a test dataset\n", + "\n", + "The following code will create a dataframe with two columns, key and fill, that will be used to demonstrate the reading and writing functionality of the Spark-HPCC connector.\n", + "\n", + "---" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "7103a826", + "metadata": {}, + "outputs": [], + "source": [ + "from pyspark.sql import SparkSession\n", + "import random" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "44c6d7e4", + "metadata": {}, + "outputs": [], + "source": [ + "data = [(i, int(1e10 * random.random())) for i in range(1000)]\n", + "df = spark.createDataFrame(data, [\"key\", \"fill\"])\n", + "df.show()" + ] + }, + { + "cell_type": "markdown", + "id": "2668405b", + "metadata": {}, + "source": [ + "# Writing Data to HPCC Systems\n", + "\n", + "---\n", + "\n", + "A Spark Dataframe can be written to HPCC using the Spark DataSource API.\n", + "- **Mode**: This is the Spark SaveMode, the Spark-HPCC Connector supports: *[ErrorIfExists, Ignore, 
Overwrite]*\n", + " - Defaults to ErrorIfExists\n", + "- **Host**: The URL of an ESP running on the target HPCC Systems cluster.\n", + "- **Username / Password**: Credentials for an HPCC Systems cluster user, can be empty or null if security isn't enabled on the target cluster.\n", + "- **Cluster**: The name of the underlying Thor cluster storage plane, this will change based on the target HPCC Systems cluster configuration, but will default to \"mythor\" on bare-metal and \"data\" on containerized systems.\n", + "- **Path**: The file path for the dataset within the HPCC Systems cluster. **Note** Spark-HPCC versions [9.2.110, 9.4.84, 9.6.36, 9.8.10] and above allow for paths to be defined with the \"/\" path delimiter instead of the HPCC \"::\" delimiter; this fixes URI formatting errors on Databricks.\n", + "- **Compression**: The compression algorithm to use when writing the file to the HPCC Systems cluster.\n", + " - Options: *[default, none, lz4, flz, lzw]*\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "05ba80cb", + "metadata": {}, + "outputs": [], + "source": [ + "df.write.save(format=\"hpcc\",\n", + " mode=\"overwrite\",\n", + " host=\"http://127.0.0.1:8010\",\n", + " username=\"\",\n", + " password=\"\",\n", + " cluster=\"mythor\",\n", + " #path=\"spark::test::dataset\", Old path format not supported on Databricks\n", + " path=\"/spark/test/dataset\",\n", + " compression=\"default\")" + ] + }, + { + "cell_type": "markdown", + "id": "1c4d4c9f", + "metadata": {}, + "source": [ + "# Reading Data from HPCC Systems\n", + "\n", + "---\n", + "\n", + "A dataset from within an HPCC Systems cluster can be read via the Spark Datasource API.\n", + "\n", + "- **Host**: The URL of an ESP running on the target HPCC Systems cluster.\n", + "- **Username / Password**: Credentials for an HPCC Systems cluster user, can be empty or null if security isn't enabled on the target cluster.\n", + "- **Cluster**: The name of the underlying Thor cluster storage 
plane, this will change based on the target HPCC Systems cluster configuration, but will default to \"mythor\" on bare-metal and \"data\" on containerized systems.\n", + "- **Path**: The file path for the dataset within the HPCC Systems cluster. **Note** Spark-HPCC versions [9.2.110, 9.4.84, 9.6.36, 9.8.10] and above allow for paths to be defined with the \"/\" path delimiter instead of the HPCC \"::\" delimiter; this fixes URI formatting errors on Databricks.\n", + "- **limitPerFilePart**: *Optional* Limit on the number of records to be read per file part / partition within the HPCC Systems dataset.\n", + "- **projectList**: *Optional* The columns that should be read from the HPCC Systems dataset.\n", + "- **useTLK**: *Optional* Defaults to false, determines whether or not the TLK (Top Level Key) should be used when reading index files. \n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "e8d49d8f", + "metadata": {}, + "outputs": [], + "source": [ + "readDf = spark.read.load(format=\"hpcc\",\n", + " host=\"http://127.0.0.1:8010\",\n", + " username=\"\",\n", + " password=\"\",\n", + " useTLK=\"false\",\n", + " cluster=\"mythor\",\n", + " #path=\"spark::test::dataset\", Old path format not supported on Databricks\n", + " path=\"/spark/test/dataset\",\n", + " limitPerFilePart=100,\n", + " projectList=\"key, fill\")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "c16a758c", + "metadata": {}, + "outputs": [], + "source": [ + "readDf.show()" + ] + }, + { + "cell_type": "markdown", + "id": "731e0dfd", + "metadata": {}, + "source": [ + "# OpenTelemetry Support\n", + "\n", + "---\n", + "\n", + "Spark-HPCC after 9.8.12 supports OpenTelemetry tracing. 
In order to utilize tracing with PySpark, OpenTelemetry will need to be enabled and configured within your PySpark code, exporter jars will need to be added to the Spark Java class path, and finally tracing information needs to be passed from Python into the Spark-HPCC APIs.\n", + "\n", + "## Python Setup\n", + "The following python libraries need to be installed:\n", + "```\n", + "!pip install opentelemetry-api\n", + "!pip install opentelemetry-sdk\n", + "!pip install opentelemetry-exporter-otlp-proto-grpc\n", + "```\n", + "\n", + "See: https://opentelemetry.io/docs/zero-code/python/configuration for more information on Python OpenTelemetry configuration\n", + "\n", + "\n", + "## Java Setup\n", + "The following jars will need to be available on the classpath in Spark:\n", + "```\n", + "opentelemetry-exporter-otlp-1.38.0.jar\n", + "opentelemetry-exporter-sender-okhttp-1.38.0.jar\n", + "```\n", + "The Java OpenTelemetry SDK is auto-configured based on environment variables. By default all tracing will be exported to logging. 
In order to correctly export logs to an external aggregator changing environment variables is required; See https://opentelemetry.io/docs/languages/java/configuration/ for more information on available configuration.\n", + "\n", + "Example Java environment variables to configure the otlp grpc exporter:\n", + "```\n", + "'OTEL_TRACES_EXPORTER' = 'otlp'\n", + "'OTEL_LOGS_EXPORTER' = 'logging'\n", + "'OTEL_METRICS_EXPORTER' = 'logging'\n", + "'OTEL_EXPORTER_OTLP_PROTOCOL' = 'grpc'\n", + "'OTEL_EXPORTER_OTLP_ENDPOINT' = 'http://localhost:4317'\n", + "'OTEL_JAVA_GLOBAL_AUTOCONFIGURE_ENABLED' = 'true'\n", + "```\n", + "\n", + "## Example PySpark Command:\n", + "```bash\n", + "pyspark \\\n", + " --jars ./spark-hpcc-9.8.12-0-jar-with-dependencies.jar,./opentelemetry-exporter-otlp-1.38.0.jar,./opentelemetry-exporter-sender-okhttp-1.38.0.jar \\\n", + " --conf \"spark.driver.extraJavaOptions=-Dotel.java.global-autoconfigure.enabled=true \\\n", + " -Dotel.traces.exporter=otlp \\\n", + " -Dotel.logs.exporter=logging \\\n", + " -Dotel.metrics.exporter=logging \\\n", + " -Dotel.exporter.otlp.protocol=http/protobuf \\\n", + " -Dotel.exporter.otlp.endpoint=http://localhost:4318\" \\\n", + " --conf \"spark.executor.extraJavaOptions=-Dotel.java.global-autoconfigure.enabled=true \\\n", + " -Dotel.traces.exporter=otlp \\\n", + " -Dotel.logs.exporter=logging \\\n", + " -Dotel.metrics.exporter=logging \\\n", + " -Dotel.exporter.otlp.protocol=http/protobuf \\\n", + " -Dotel.exporter.otlp.endpoint=http://localhost:4318\"\n", + "```" + ] + }, + { + "cell_type": "markdown", + "id": "341f267a", + "metadata": {}, + "source": [ + "# OpenTelemetry Example\n", + "\n", + "---\n", + "\n", + "Spark-HPCC APIs now support the ability to pass in the OpenTelemetry TraceID & SpanID to propagate tracing.\n", + "\n", + "- **traceID**: *Optional* The hexadecimal string representing the current trace.\n", + "- **spanID** *Optional* The hexadecimal string representing the current span." 
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "d195e46a", + "metadata": {}, + "outputs": [], + "source": [ + "from pyspark.sql import SparkSession\n", + "import os\n", + "\n", + "from opentelemetry import trace\n", + "from opentelemetry.sdk.trace import TracerProvider\n", + "from opentelemetry.sdk.trace.export import (\n", + " BatchSpanProcessor,\n", + ")\n", + "from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter\n", + "\n", + "# Configure Python OpenTelemetry\n", + "# Note: this needs to be done separately from the Java configuration\n", + "otlp_exporter = OTLPSpanExporter(\n", + " endpoint=\"http://localhost:4317\",\n", + ")\n", + "\n", + "provider = TracerProvider()\n", + "processor = BatchSpanProcessor(otlp_exporter)\n", + "provider.add_span_processor(processor)\n", + "\n", + "trace.set_tracer_provider(provider)\n", + "tracer = trace.get_tracer(\"spark.example.tracer\")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "dd4552d1", + "metadata": {}, + "outputs": [], + "source": [ + "data = [(i, int(1e10 * random.random())) for i in range(1000)]\n", + "df = spark.createDataFrame(data, [\"key\", \"fill\"])\n", + "\n", + "# Example Spark-HPCC Write with OpenTelemetry Tracing\n", + "with tracer.start_as_current_span(\"PySpark.WriteSpan\") as writeSpan:\n", + "\n", + " # Convert trace & span IDs to hex string\n", + " trace_id = format(writeSpan.get_span_context().trace_id, '032x')\n", + " span_id = format(writeSpan.get_span_context().span_id, '016x')\n", + "\n", + " df.write.save(format=\"hpcc\",\n", + " mode=\"overwrite\",\n", + " host=\"http://127.0.0.1:8010\",\n", + " cluster=\"mythor\",\n", + " path=\"spark::test::dataset\",\n", + " compression=\"default\",\n", + " traceID=trace_id,\n", + " spanID=span_id)\n", + "\n", + "# Example Spark-HPCC Read with OpenTelemetry Tracing\n", + "with tracer.start_as_current_span(\"PySpark.ReadSpan\") as readSpan:\n", + "\n", + " # Convert trace & span 
IDs to hex string\n", + " trace_id = format(readSpan.get_span_context().trace_id, '032x')\n", + " span_id = format(readSpan.get_span_context().span_id, '016x')\n", + "\n", + " readDf = spark.read.load(format=\"hpcc\",\n", + " host=\"http://127.0.0.1:8010\",\n", + " cluster=\"mythor\",\n", + " path=\"spark::test::dataset\",\n", + " traceID=trace_id,\n", + " spanID=span_id)\n", + " # Note: Spark won't read a dataset until it is used, therefore the count needs to be part of the above SparkReadSpan\n", + " readDf.count()" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3 (ipykernel)", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.11.4" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} diff --git a/spark-hpcc/LICENSE b/spark-hpcc/LICENSE new file mode 100644 index 000000000..8dada3eda --- /dev/null +++ b/spark-hpcc/LICENSE @@ -0,0 +1,201 @@ + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. 
For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. 
For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. 
If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. 
You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. 
Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "{}" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. 
+ + Copyright {yyyy} {name of copyright owner} + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. diff --git a/spark-hpcc/README.md b/spark-hpcc/README.md new file mode 100644 index 000000000..049faf292 --- /dev/null +++ b/spark-hpcc/README.md @@ -0,0 +1,48 @@ +[![Maven Central](https://maven-badges.herokuapp.com/maven-central/org.hpccsystems/spark-hpcc/badge.svg?subject=spark-hpcc)](https://maven-badges.herokuapp.com/maven-central/org.hpccsystems/spark-hpcc) + + + + + + + + + + + + + +
+ :zap: Note: This project references log4j which has been reported to include security vulnerabilities in versions prior to v2.15.0
+
    +
  • The Spark-HPCC project no longer references the offending log4j versions
  • +
  • Users of Spark-HPCC are strongly encouraged to update to the latest version
  • +
  • Learn more about the vulnerability: https://github.com/advisories/GHSA-jfh8-c2jp-5v3q
  • +
+
+ +# Spark-HPCC +Spark classes for HPCC Systems / Spark interoperability + +### DataAccess +The DataAccess project contains the classes which expose distributed +streaming of HPCC based data via Spark constructs. In addition, +the HPCC data is exposed as a Dataframe for the convenience of the Spark developer. + +### Dependencies +The spark-hpcc target jar does not package any of the Spark libraries it depends on. +If using a standard Spark submission pipeline such as spark-submit these dependencies will be provided as part of the Spark installation. +However, if your pipeline executes a jar directly you may need to add the Spark libraries from your $SPARK_HOME to the classpath. + +### Examples & Documentation +See: [Examples](https://github.com/hpcc-systems/Spark-HPCC/tree/master/Examples) for example usage of the connector as well as API documentation for the reading and writing APIs. + +## Please note: +##### As reported by github: + +"In all versions of Apache Spark, its standalone resource manager accepts code to execute on a 'master' host, that then runs that code on 'worker' hosts. The master itself does not, by design, execute user code. A specially-crafted request to the master can, however, cause the master to execute code too. Note that this does not affect standalone clusters with authentication enabled. While the master host typically has less outbound access to other resources than a worker, the execution of code on the master is nevertheless unexpected. +Mitigation + +Enable authentication on any Spark standalone cluster that is not otherwise secured from unwanted access, for example by network-level restrictions. Use spark.authenticate and related security properties described at https://spark.apache.org/docs/latest/security.html" +