-
Notifications
You must be signed in to change notification settings - Fork 24
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #780 from Michael-Gardner/HPCC4J-673
HPCC4J-673 Fix Poms and directory structure to include spark-hpcc as a module of the hpcc4j project
- Loading branch information
Showing
30 changed files
with
4,263 additions
and
2 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,4 +1,5 @@ | ||
*/target | ||
target/ | ||
bin/ | ||
.project | ||
.settings/ | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,321 @@ | ||
{ | ||
"cells": [ | ||
{ | ||
"cell_type": "markdown", | ||
"id": "497761f5", | ||
"metadata": {}, | ||
"source": [ | ||
"# Spark-HPCC Connector for HPCC Systems Platform and Spark Connectivity\n", | ||
"\n", | ||
"This example demonstrates how to use the Spark-HPCC Connector to read and write data from / to HPCC Systems clusters, as well as providing basic setup information for the Spark-HPCC connector.\n", | ||
"\n", | ||
"## Spark-HPCC Connector Installation:\n", | ||
"\n", | ||
"---\n", | ||
"\n", | ||
"The Spark-HPCC Connector jar and its dependencies need to be made available to all Spark worker nodes and the Spark driver application. This can be done by adding the Spark-HPCC connector jar to the classpath on every node in the Spark cluster and to the classpath for the Spark driver, or by using the ```--jars``` option when executing spark-submit or pyspark.\n", | ||
"\n", | ||
"Download the Spark-HPCC jar with dependencies from Maven Central: https://mvnrepository.com/artifact/org.hpccsystems/spark-hpcc\n", | ||
"\n", | ||
"### Example of using the jars option:\n", | ||
"```\n", | ||
"pyspark --jars spark-hpcc-9.2.2-1-jar-with-dependencies.jar\n", | ||
"```\n", | ||
"\n", | ||
"### Adding Spark-HPCC jar to classpath\n", | ||
"The Spark-HPCC jar can also be added to the classpath through various means depending on the configuration of your Spark cluster, more information about updating the classpath can be found within the Spark documentation: https://spark.apache.org/docs/latest/configuration.html" | ||
] | ||
}, | ||
{ | ||
"cell_type": "markdown", | ||
"id": "eb1182be", | ||
"metadata": {}, | ||
"source": [ | ||
"# Creating a test dataset\n", | ||
"\n", | ||
"The following code will create a dataframe with two columns, key and fill, that will be used to demonstrate the reading and writing functionality of the Spark-HPCC connector.\n", | ||
"\n", | ||
"---" | ||
] | ||
}, | ||
{ | ||
"cell_type": "code", | ||
"execution_count": null, | ||
"id": "7103a826", | ||
"metadata": {}, | ||
"outputs": [], | ||
"source": [ | ||
"from pyspark.sql import SparkSession\n", | ||
"import random" | ||
] | ||
}, | ||
{ | ||
"cell_type": "code", | ||
"execution_count": null, | ||
"id": "44c6d7e4", | ||
"metadata": {}, | ||
"outputs": [], | ||
"source": [ | ||
"data = [(i, int(1e10 * random.random())) for i in range(1000)]\n", | ||
"df = spark.createDataFrame(data, [\"key\", \"fill\"])\n", | ||
"df.show()" | ||
] | ||
}, | ||
{ | ||
"cell_type": "markdown", | ||
"id": "2668405b", | ||
"metadata": {}, | ||
"source": [ | ||
"# Writing Data to HPCC Systems\n", | ||
"\n", | ||
"---\n", | ||
"\n", | ||
"A Spark Dataframe can be written to HPCC using the Spark DataSource API.\n", | ||
"- **Mode**: This is the Spark SaveMode, the Spark-HPCC Connector supports: *[ErrorIfExists, Ignore, Overwrite]*\n", | ||
" - Defaults to ErrorIfExists\n", | ||
"- **Host**: The URL of an ESP running on the target HPCC Systems cluster.\n", | ||
"- **Username / Password**: Credentials for an HPCC Systems cluster user, can be empty or null if security isn't enabled on the target cluster.\n", | ||
"- **Cluster**: The name of the underlying Thor cluster storage plane, this will change based on the target HPCC Systems cluster configuration, but will default to \"mythor\" on bare-metal and \"data\" on containerized systems.\n", | ||
"- **Path**: The file path for the dataset within the HPCC Systems cluster. **Note** Spark-HPCC versions [9.2.110, 9.4.84, 9.6.36, 9.8.10] and above allows for paths to be defined with \"/\" path delimiter instead of the HPCC \"::\" delimiter this fixes URI formatting errors on Databricks.\n", | ||
"- **Compression**: The compression algorithm to use when writing the file to the HPCC Systems cluster.\n", | ||
" - Options: *[default, none, lz4, flz, lzw]*\n" | ||
] | ||
}, | ||
{ | ||
"cell_type": "code", | ||
"execution_count": null, | ||
"id": "05ba80cb", | ||
"metadata": {}, | ||
"outputs": [], | ||
"source": [ | ||
"df.write.save(format=\"hpcc\",\n", | ||
" mode=\"overwrite\",\n", | ||
" host=\"http://127.0.0.1:8010\",\n", | ||
" username=\"\",\n", | ||
" password=\"\",\n", | ||
" cluster=\"mythor\",\n", | ||
" #path=\"spark::test::dataset\", Old path format not supported on Databricks\n", | ||
" path=\"/spark/test/dataset\",\n", | ||
" compression=\"default\")" | ||
] | ||
}, | ||
{ | ||
"cell_type": "markdown", | ||
"id": "1c4d4c9f", | ||
"metadata": {}, | ||
"source": [ | ||
"# Reading Data from HPCC Systems\n", | ||
"\n", | ||
"---\n", | ||
"\n", | ||
"A dataset from within an HPCC Systems cluster can be read via the Spark Datasource API.\n", | ||
"\n", | ||
"- **Host**: The URL of an ESP running on the target HPCC Systems cluster.\n", | ||
"- **Username / Password**: Credentials for an HPCC Systems cluster user, can be empty or null if security isn't enabled on the target cluster.\n", | ||
"- **Cluster**: The name of the underlying Thor cluster storage plane, this will change based on the target HPCC Systems cluster configuration, but will default to \"mythor\" on bare-metal and \"data\" on containerized systems.\n", | ||
"- **Path**: The file path for the dataset within the HPCC Systems cluster. **Note** Spark-HPCC versions [9.2.110, 9.4.84, 9.6.36, 9.8.10] and above allows for paths to be defined with \"/\" path delimiter instead of the HPCC \"::\" delimiter this fixes URI formatting errors on Databricks.\n", | ||
"- **limitPerFilePart**: *Optional* Limit on the number of records to be read per file part / partition within the HPCC Systems dataset.\n", | ||
"- **projectList**: *Optional* The columns that should be read from the HPCC Systems dataset.\n", | ||
"- **useTLK** *Optional* Defaults to false, determines whether or not the TLK (Top Level Key) should be used when reading index files. \n" | ||
] | ||
}, | ||
{ | ||
"cell_type": "code", | ||
"execution_count": null, | ||
"id": "e8d49d8f", | ||
"metadata": {}, | ||
"outputs": [], | ||
"source": [ | ||
"readDf = spark.read.load(format=\"hpcc\",\n", | ||
" host=\"http://127.0.0.1:8010\",\n", | ||
" username=\"\",\n", | ||
" password=\"\",\n", | ||
" useTLK=\"false\",\n", | ||
" cluster=\"mythor\",\n", | ||
" #path=\"spark::test::dataset\", Old path format not supported on Databricks\n", | ||
" path=\"/spark/test/dataset\",\n", | ||
" limitPerFilePart=100,\n", | ||
" projectList=\"key, fill\")" | ||
] | ||
}, | ||
{ | ||
"cell_type": "code", | ||
"execution_count": null, | ||
"id": "c16a758c", | ||
"metadata": {}, | ||
"outputs": [], | ||
"source": [ | ||
"readDf.show()" | ||
] | ||
}, | ||
{ | ||
"cell_type": "markdown", | ||
"id": "731e0dfd", | ||
"metadata": {}, | ||
"source": [ | ||
"# OpenTelemetry Support\n", | ||
"\n", | ||
"---\n", | ||
"\n", | ||
"Spark-HPCC after 9.8.12 supports OpenTelemetry tracing. In order to utilize tracing with PySpark OpenTelemetry will need to be enabled and configured within your PySpark code, exporter jars will need to be added to the Spark Java class path, and finally tracing information needs to passed from Python into the Spark-HPCC APIs.\n", | ||
"\n", | ||
"## Python Setup\n", | ||
"The following python libraries need to be installed:\n", | ||
"```\n", | ||
"!pip install opentelemetry-api\n", | ||
"!pip install opentelemetry-sdk\n", | ||
"!pip install opentelemetry-exporter-otlp-proto-grpc\n", | ||
"```\n", | ||
"\n", | ||
"See: https://opentelemetry.io/docs/zero-code/python/configuration for more information on Python OpenTelemetry configuration\n", | ||
"\n", | ||
"\n", | ||
"## Java Setup\n", | ||
"The following jars will need to be available on the classpath in Spark:\n", | ||
"```\n", | ||
"opentelemetry-exporter-otlp-1.38.0.jar\n", | ||
"opentelemetry-exporter-sender-okhttp-1.38.0.jar\n", | ||
"```\n", | ||
"The Java OpenTelemetry SDK is auto-configured based on environment variables. By default all tracing will be exported to logging. In order to correctly export logs to an external aggregator changing environment variables is required; See https://opentelemetry.io/docs/languages/java/configuration/ for more information on available configuration.\n", | ||
"\n", | ||
"Example Java environment variables to configure the otlp grpc exporter:\n", | ||
"```\n", | ||
"'OTEL_TRACES_EXPORTER' = 'otlp'\n", | ||
"'OTEL_LOGS_EXPORTER' = 'logging'\n", | ||
"'OTEL_METRICS_EXPORTER' = 'logging'\n", | ||
"'OTEL_EXPORTER_OTLP_PROTOCOL' = 'grpc'\n", | ||
"'OTEL_EXPORTER_OTLP_ENDPOINT' = 'http://localhost:4317'\n", | ||
"'OTEL_JAVA_GLOBAL_AUTOCONFIGURE_ENABLED' = 'true'\n", | ||
"```\n", | ||
"\n", | ||
"## Example PySpark Command:\n", | ||
"```bash\n", | ||
"pyspark \\\n", | ||
" --jars ./spark-hpcc-9.8.12-0-jar-with-dependencies.jar,./opentelemetry-exporter-otlp-1.38.0.jar,./opentelemetry-exporter-sender-okhttp-1.38.0.jar \\\n", | ||
" --conf \"spark.driver.extraJavaOptions=-Dotel.java.global-autoconfigure.enabled=true \\\n", | ||
" -Dotel.traces.exporter=otlp \\\n", | ||
" -Dotel.logs.exporter=logging \\\n", | ||
" -Dotel.metrics.exporter=logging \\\n", | ||
" -Dotel.exporter.otlp.protocol=http/protobuf \\\n", | ||
" -Dotel.exporter.otlp.endpoint=http://localhost:4318\" \\\n", | ||
" --conf \"spark.executor.extraJavaOptions=-Dotel.java.global-autoconfigure.enabled=true \\\n", | ||
" -Dotel.traces.exporter=otlp \\\n", | ||
" -Dotel.logs.exporter=logging \\\n", | ||
" -Dotel.metrics.exporter=logging \\\n", | ||
" -Dotel.exporter.otlp.protocol=http/protobuf \\\n", | ||
" -Dotel.exporter.otlp.endpoint=http://localhost:4318\"\n", | ||
"```" | ||
] | ||
}, | ||
{ | ||
"cell_type": "markdown", | ||
"id": "341f267a", | ||
"metadata": {}, | ||
"source": [ | ||
"# OpenTelemetry Example\n", | ||
"\n", | ||
"---\n", | ||
"\n", | ||
"Spark-HPCC APIs now support the ability to pass in the OpenTelemetry TraceID & SpanID to propagate tracing.\n", | ||
"\n", | ||
"- **traceID**: *Optional* The hexadecimal string representing the current trace.\n", | ||
"- **spanID** *Optional* The hexadecimal string representing the current span." | ||
] | ||
}, | ||
{ | ||
"cell_type": "code", | ||
"execution_count": null, | ||
"id": "d195e46a", | ||
"metadata": {}, | ||
"outputs": [], | ||
"source": [ | ||
"from pyspark.sql import SparkSession\n", | ||
"import os\n", | ||
"\n", | ||
"from opentelemetry import trace\n", | ||
"from opentelemetry.sdk.trace import TracerProvider\n", | ||
"from opentelemetry.sdk.trace.export import (\n", | ||
" BatchSpanProcessor,\n", | ||
")\n", | ||
"from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter\n", | ||
"\n", | ||
"# Configure Python OpenTelemetry\n", | ||
"# Note: this needs to be done seperately from the Java configuration\n", | ||
"otlp_exporter = OTLPSpanExporter(\n", | ||
" endpoint=\"http://localhost:4317\",\n", | ||
")\n", | ||
"\n", | ||
"provider = TracerProvider()\n", | ||
"processor = BatchSpanProcessor(otlp_exporter)\n", | ||
"provider.add_span_processor(processor)\n", | ||
"\n", | ||
"trace.set_tracer_provider(provider)\n", | ||
"tracer = trace.get_tracer(\"spark.example.tracer\")" | ||
] | ||
}, | ||
{ | ||
"cell_type": "code", | ||
"execution_count": null, | ||
"id": "dd4552d1", | ||
"metadata": {}, | ||
"outputs": [], | ||
"source": [ | ||
"data = [(i, int(1e10 * random.random())) for i in range(1000)]\n", | ||
"df = spark.createDataFrame(data, [\"key\", \"fill\"])\n", | ||
"\n", | ||
"# Example Spark-HPCC Write with OpenTelemetry Tracing\n", | ||
"with tracer.start_as_current_span(\"PySpark.WriteSpan\") as writeSpan:\n", | ||
"\n", | ||
" # Convert trace & span IDs to hex string\n", | ||
" trace_id = format(writeSpan.get_span_context().trace_id, '032x')\n", | ||
" span_id = format(writeSpan.get_span_context().span_id, '016x')\n", | ||
"\n", | ||
" df.write.save(format=\"hpcc\",\n", | ||
" mode=\"overwrite\",\n", | ||
" host=\"http://127.0.01:8010\",\n", | ||
" cluster=\"mythor\",\n", | ||
" path=\"spark::test::dataset\",\n", | ||
" compression=\"default\",\n", | ||
" traceID=trace_id,\n", | ||
" spanID=span_id)\n", | ||
"\n", | ||
"# Example Spark-HPCC Read with OpenTelemetry Tracing\n", | ||
"with tracer.start_as_current_span(\"PySpark.ReadSpan\") as readSpan:\n", | ||
"\n", | ||
" # Convert trace & span IDs to hex string\n", | ||
" trace_id = format(readSpan.get_span_context().trace_id, '032x')\n", | ||
" span_id = format(readSpan.get_span_context().span_id, '016x')\n", | ||
"\n", | ||
" readDf = spark.read.load(format=\"hpcc\",\n", | ||
" host=\"http://127.0.0.1:8010\",\n", | ||
" cluster=\"mythor\",\n", | ||
" path=\"spark::test::dataset\",\n", | ||
" traceID=trace_id,\n", | ||
" spanID=span_id)\n", | ||
" # Note: Spark won't read a dataset until it is used, therefore the count needs to be part of the above SparkReadSpan\n", | ||
" readDf.count()" | ||
] | ||
} | ||
], | ||
"metadata": { | ||
"kernelspec": { | ||
"display_name": "Python 3 (ipykernel)", | ||
"language": "python", | ||
"name": "python3" | ||
}, | ||
"language_info": { | ||
"codemirror_mode": { | ||
"name": "ipython", | ||
"version": 3 | ||
}, | ||
"file_extension": ".py", | ||
"mimetype": "text/x-python", | ||
"name": "python", | ||
"nbconvert_exporter": "python", | ||
"pygments_lexer": "ipython3", | ||
"version": "3.11.4" | ||
} | ||
}, | ||
"nbformat": 4, | ||
"nbformat_minor": 5 | ||
} |
Oops, something went wrong.