From 4919459d6f549cc0573e8f4178221d0272d30022 Mon Sep 17 00:00:00 2001
From: YANGDB
Date: Wed, 8 Jan 2025 19:31:26 -0800
Subject: [PATCH] update docker and opensearch to support flint acceleration indices

Signed-off-by: YANGDB
---
 docker/apache-spark-iceberg/.env              |   5 +-
 .../apache-spark-iceberg/docker-compose.yml   |  62 ++-
 ...n Introduction to the Flint Java API.ipynb | 469 ++++++++++++++++++
 .../opensearch/opensearch.yml                 |  25 +
 .../security/config/internal_users.yml        |   4 +
 .../opensearch/security/config/roles.yml      |   7 +
 .../security/config/roles_mapping.yml         |   3 +
 .../apache-spark-iceberg/spark-defaults.conf  |  17 +-
 .../tutorial/Flint - Getting Started.md       | 153 ++++++
 .../tutorial/Using-Flint-API.md               |   4 +
 10 files changed, 737 insertions(+), 12 deletions(-)
 create mode 100644 docker/apache-spark-iceberg/notebooks/Flint - An Introduction to the Flint Java API.ipynb
 create mode 100644 docker/apache-spark-iceberg/opensearch/opensearch.yml
 create mode 100644 docker/apache-spark-iceberg/opensearch/security/config/internal_users.yml
 create mode 100644 docker/apache-spark-iceberg/opensearch/security/config/roles.yml
 create mode 100644 docker/apache-spark-iceberg/opensearch/security/config/roles_mapping.yml
 create mode 100644 docker/apache-spark-iceberg/tutorial/Flint - Getting Started.md
 create mode 100644 docker/apache-spark-iceberg/tutorial/Using-Flint-API.md

diff --git a/docker/apache-spark-iceberg/.env b/docker/apache-spark-iceberg/.env
index 35d83f178..4e8ff2122 100644
--- a/docker/apache-spark-iceberg/.env
+++ b/docker/apache-spark-iceberg/.env
@@ -2,7 +2,10 @@ MASTER_UI_PORT=8080
 MASTER_PORT=7077
 UI_PORT=4040
 PPL_JAR=../../sparkPPLCosmetic/target/scala-2.12/opensearch-spark-ppl-assembly-0.7.0-SNAPSHOT.jar
-FLINT_JAR=../../sparkSqlApplicationCosmetic/target/scala-2.12/opensearch-spark-sql-application-assembly-0.7.0-SNAPSHOT.jar
+FLINT_JAR=../../flint-spark-integration/target/scala-2.12/flint-spark-integration-assembly-0.7.0-SNAPSHOT.jar
+OPENSEARCH_VERSION=latest
+DASHBOARDS_VERSION=latest
+
 OPENSEARCH_NODE_MEMORY=512m
 OPENSEARCH_ADMIN_PASSWORD=C0rrecthorsebatterystaple.
 OPENSEARCH_PORT=9200
diff --git a/docker/apache-spark-iceberg/docker-compose.yml b/docker/apache-spark-iceberg/docker-compose.yml
index ec6f9d310..b61b27847 100644
--- a/docker/apache-spark-iceberg/docker-compose.yml
+++ b/docker/apache-spark-iceberg/docker-compose.yml
@@ -1,9 +1,9 @@
 version: "3"
 
 services:
-  spark-iceberg:
+  spark-tutorial:
     image: tabulario/spark-iceberg
-    container_name: spark-iceberg
+    container_name: spark-tutorial
     networks:
       iceberg_net:
     depends_on:
@@ -11,7 +11,7 @@ services:
       - minio
     volumes:
       - ./warehouse:/home/iceberg/warehouse
-      - ./notebooks:/home/iceberg/notebooks/notebooks
+      - ./notebooks:/home/iceberg/notebooks/PPL
       - type: bind
         source: ./spark-defaults.conf
         target: /opt/spark/conf/spark-defaults.conf
@@ -51,6 +51,8 @@
       - MINIO_ROOT_USER=admin
       - MINIO_ROOT_PASSWORD=password
      - MINIO_DOMAIN=minio
+    volumes:
+      - minio-data:/data
     networks:
       iceberg_net:
         aliases:
@@ -78,5 +80,57 @@
       /usr/bin/mc policy set public minio/warehouse;
       tail -f /dev/null
       "
+  opensearch:
+    image: opensearchproject/opensearch:${OPENSEARCH_VERSION:-latest}
+    container_name: opensearch
+    environment:
+      - cluster.name=opensearch-cluster
+      - node.name=opensearch
+      - discovery.seed_hosts=opensearch
+      - cluster.initial_cluster_manager_nodes=opensearch
+      - bootstrap.memory_lock=true
+      - plugins.security.ssl.http.enabled=false
+      - OPENSEARCH_JAVA_OPTS=-Xms${OPENSEARCH_NODE_MEMORY:-512m} -Xmx${OPENSEARCH_NODE_MEMORY:-512m}
+      - OPENSEARCH_INITIAL_ADMIN_PASSWORD=${OPENSEARCH_ADMIN_PASSWORD}
+    ulimits:
+      memlock:
+        soft: -1
+        hard: -1
+      nofile:
+        soft: 65536
+        hard: 65536
+    volumes:
+      - opensearch-data:/usr/share/opensearch/data
+      - ./opensearch/opensearch.yml:/usr/share/opensearch/config/opensearch.yml
+      - ./opensearch/security/config:/usr/share/opensearch/plugins/opensearch-security/securityconfig
+    ports:
+      - ${OPENSEARCH_PORT:-9200}:9200
+      - 9600:9600
+    expose:
+      - "${OPENSEARCH_PORT:-9200}"
+    healthcheck:
+      test: ["CMD", "curl", "-f", "-u", "admin:${OPENSEARCH_ADMIN_PASSWORD}", "http://localhost:9200/_cluster/health"]
+      interval: 1m
+      timeout: 5s
+      retries: 3
+    networks:
+      iceberg_net:
+  opensearch-dashboards:
+    image: opensearchproject/opensearch-dashboards:${DASHBOARDS_VERSION:-latest}
+    container_name: opensearch-dashboards
+    ports:
+      - ${OPENSEARCH_DASHBOARDS_PORT:-5601}:5601
+    expose:
+      - "${OPENSEARCH_DASHBOARDS_PORT:-5601}"
+    environment:
+      OPENSEARCH_HOSTS: '["http://opensearch:9200"]'
+    depends_on:
+      - opensearch
+    networks:
+      iceberg_net:
 networks:
-  iceberg_net:
\ No newline at end of file
+  iceberg_net:
+
+volumes:
+  opensearch-data:
+  minio-data:
\ No newline at end of file
diff --git a/docker/apache-spark-iceberg/notebooks/Flint - An Introduction to the Flint Java API.ipynb b/docker/apache-spark-iceberg/notebooks/Flint - An Introduction to the Flint Java API.ipynb
new file mode 100644
index 000000000..dd85508ec
--- /dev/null
+++ b/docker/apache-spark-iceberg/notebooks/Flint - An Introduction to the Flint Java API.ipynb
@@ -0,0 +1,469 @@
+{
+ "cells": [
+  {
+   "cell_type": "markdown",
+   "id": "16f6bb49",
+   "metadata": {},
+   "source": [
+    "![opensearch-logo](https://us1.discourse-cdn.com/flex019/uploads/mauve_hedgehog/original/2X/0/0b01dfaa45d68486cf767973cc5a34c731d5467d.png)"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "id": "c82657e9",
+   "metadata": {},
+   "source": [
+    "# An Introduction to the Flint Java API"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "id": "3ee90ad2",
+   "metadata": {},
+   "source": [
+    "## [Part 1 - Loading a Catalog and Creating a Table](https://tabular.io/blog/java-api-part-1/)"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "72e68c62",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "import org.apache.iceberg.catalog.Catalog;\n",
+    "import org.apache.hadoop.conf.Configuration;\n",
+    "import org.apache.iceberg.CatalogProperties;\n",
+    "import org.apache.iceberg.rest.RESTCatalog;\n",
+    "import org.apache.iceberg.aws.s3.S3FileIOProperties;\n",
+    "\n",
+    "Map<String, String> properties = new HashMap<>();\n",
+    "\n",
+    "properties.put(CatalogProperties.CATALOG_IMPL, \"org.apache.iceberg.rest.RESTCatalog\");\n",
+    "properties.put(CatalogProperties.URI, \"http://rest:8181\");\n",
+    "properties.put(CatalogProperties.WAREHOUSE_LOCATION, \"s3a://warehouse/wh\");\n",
+    "properties.put(CatalogProperties.FILE_IO_IMPL, \"org.apache.iceberg.aws.s3.S3FileIO\");\n",
+    "properties.put(S3FileIOProperties.ENDPOINT, \"http://minio:9000\");\n",
+    "\n",
+    "RESTCatalog catalog = new RESTCatalog();\n",
+    "Configuration conf = new Configuration();\n",
+    "catalog.setConf(conf);\n",
+    "catalog.initialize(\"demo\", properties);\n",
+    "catalog.name();"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "4be615e7",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "import org.apache.iceberg.Schema;\n",
+    "import org.apache.iceberg.types.Types;\n",
+    "\n",
+    "Schema schema = new Schema(\n",
+    "      Types.NestedField.required(1, \"level\", Types.StringType.get()),\n",
+    "      Types.NestedField.required(2, \"event_time\", Types.TimestampType.withZone()),\n",
+    "      Types.NestedField.required(3, \"message\", Types.StringType.get()),\n",
+    "      Types.NestedField.optional(4, \"call_stack\", Types.ListType.ofRequired(5, Types.StringType.get()))\n",
+    "    );\n",
+    "schema"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "b7299d16",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "import org.apache.iceberg.PartitionSpec;\n",
+    "\n",
+    "PartitionSpec spec = PartitionSpec.builderFor(schema)\n",
+    "      .hour(\"event_time\")\n",
+    "      .identity(\"level\")\n",
+    "      .build();\n",
+    "spec"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "4d900c97",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "import org.apache.iceberg.catalog.TableIdentifier;\n",
+    "import org.apache.iceberg.catalog.Namespace;\n",
+    "\n",
+    "Namespace nyc = Namespace.of(\"nyc\");\n",
+    "TableIdentifier name = TableIdentifier.of(nyc, \"logs\");\n",
+    "name"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "8a4d8a6e",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "catalog.createTable(name, schema, spec)"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "7d8c46df",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "catalog.dropTable(name)"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "id": "fe62e0a9",
+   "metadata": {},
+   "source": [
+    "## [Part 2 - Table Scans](https://tabular.io/blog/java-api-part-2/)"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "c1e7aa7a",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "catalog.createTable(name, schema, spec)"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "78c95e06",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "import org.apache.spark.sql.SparkSession;\n",
+    "\n",
+    "SparkSession spark = SparkSession\n",
+    "  .builder()\n",
+    "  .master(\"local[*]\")\n",
+    "  .appName(\"Java API Demo\")\n",
+    "  .config(\"spark.sql.extensions\", \"org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions\")\n",
+    "  .config(\"spark.sql.catalog.demo\", \"org.apache.iceberg.spark.SparkCatalog\")\n",
+    "  .config(\"spark.sql.catalog.demo.catalog-impl\", \"org.apache.iceberg.rest.RESTCatalog\")\n",
+    "  .config(\"spark.sql.catalog.demo.uri\", \"http://rest:8181\")\n",
+    "  .config(\"spark.sql.catalog.demo.io-impl\", \"org.apache.iceberg.aws.s3.S3FileIO\")\n",
+    "  .config(\"spark.sql.catalog.demo.s3.endpoint\", \"http://minio:9000\")\n",
+    "  .config(\"spark.sql.defaultCatalog\", \"demo\")\n",
+    "  .config(\"spark.eventLog.enabled\", \"true\")\n",
+    "  .config(\"spark.eventLog.dir\", \"/home/iceberg/spark-events\")\n",
+    "  .config(\"spark.history.fs.logDirectory\", \"/home/iceberg/spark-events\")\n",
+    "  .getOrCreate();\n",
+    "\n",
+    "spark.sparkContext().setLogLevel(\"ERROR\");"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "0b17f820",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "String query = \"INSERT INTO demo.nyc.logs \"\n",
+    "  + \"VALUES \"\n",
+    "  + \"('info', timestamp 'today', 'Just letting you know!', array('stack trace line 1', 'stack trace line 2', 'stack trace line 3')), \"\n",
+    "  + \"('warning', timestamp 'today', 'You probably should not do this!', array('stack trace line 1', 'stack trace line 2', 'stack trace line 3')), \"\n",
+    "  + \"('error', timestamp 'today', 'This was a fatal application error!', array('stack trace line 1', 'stack trace line 2', 'stack trace line 3'))\";\n",
+    "\n",
+    "spark.sql(query).show()"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "15ca1822",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "import org.apache.iceberg.catalog.Catalog;\n",
+    "import org.apache.hadoop.conf.Configuration;\n",
+    "import org.apache.iceberg.CatalogProperties;\n",
+    "import org.apache.iceberg.rest.RESTCatalog;\n",
+    "\n",
+    "Map<String, String> properties = new HashMap<>();\n",
+    "\n",
+    "properties.put(CatalogProperties.CATALOG_IMPL, \"org.apache.iceberg.rest.RESTCatalog\");\n",
+    "properties.put(CatalogProperties.URI, \"http://rest:8181\");\n",
+    "properties.put(CatalogProperties.WAREHOUSE_LOCATION, \"s3a://warehouse/wh/\");\n",
+    "properties.put(CatalogProperties.FILE_IO_IMPL, \"org.apache.iceberg.aws.s3.S3FileIO\");\n",
+    "properties.put(S3FileIOProperties.ENDPOINT, \"http://minio:9000\");\n",
+    "\n",
+    "RESTCatalog catalog = new RESTCatalog();\n",
+    "Configuration conf = new Configuration();\n",
+    "catalog.setConf(conf);\n",
+    "catalog.initialize(\"demo\", properties);"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "3a5cf423",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "import org.apache.iceberg.Table;\n",
+    "import org.apache.iceberg.TableScan;\n",
+    "import org.apache.iceberg.catalog.Namespace;\n",
+    "import org.apache.iceberg.catalog.TableIdentifier;\n",
+    "\n",
+    "Namespace nyc = Namespace.of(\"nyc\");\n",
+    "TableIdentifier name = TableIdentifier.of(nyc, \"logs\");\n",
+    "Table table = catalog.loadTable(name);"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "e472d6a1",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "import org.apache.iceberg.io.CloseableIterable;\n",
+    "import org.apache.iceberg.data.Record;\n",
+    "import org.apache.iceberg.data.IcebergGenerics;\n",
+    "\n",
+    "CloseableIterable<Record> result = IcebergGenerics.read(table).build();"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "0d32f41c",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "for (Record r: result) {\n",
+    "    System.out.println(r);\n",
+    "}"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "7dffc238",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "import org.apache.iceberg.expressions.Expressions;\n",
+    "\n",
+    "CloseableIterable<Record> result = IcebergGenerics.read(table)\n",
+    "  .where(Expressions.equal(\"level\", \"error\"))\n",
+    "  .build();"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "ec2b0431",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "import org.apache.iceberg.CombinedScanTask;\n",
+    "import org.apache.iceberg.TableScan;\n",
+    "\n",
+    "TableScan scan = table.newScan();"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "09d13c6b",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "import org.apache.iceberg.expressions.Expressions;\n",
+    "\n",
+    "TableScan filteredScan = scan.filter(Expressions.equal(\"level\", \"info\")).select(\"message\")"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "1857c10f",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "Iterable<CombinedScanTask> result = filteredScan.planTasks();"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "ea206ec7",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "import org.apache.iceberg.DataFile;\n",
+    "\n",
+    "CombinedScanTask task = result.iterator().next();\n",
+    "DataFile dataFile = task.files().iterator().next().file();\n",
+    "System.out.println(dataFile);"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "id": "41e9e10f",
+   "metadata": {},
+   "source": [
+    "## [Part 3 - Appending Data](https://tabular.io/blog/java-api-part-3/)"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "81033412",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "import org.apache.iceberg.Schema;\n",
+    "import org.apache.iceberg.types.Types;\n",
+    "import org.apache.iceberg.catalog.Namespace;\n",
+    "import org.apache.iceberg.catalog.TableIdentifier;\n",
+    "import org.apache.iceberg.PartitionSpec;\n",
+    "\n",
+    "Schema schema = new Schema(\n",
+    "      Types.NestedField.optional(1, \"event_id\", Types.StringType.get()),\n",
+    "      Types.NestedField.optional(2, \"username\", Types.StringType.get()),\n",
+    "      Types.NestedField.optional(3, \"userid\", Types.IntegerType.get()),\n",
+    "      Types.NestedField.optional(4, \"api_version\", Types.StringType.get()),\n",
+    "      Types.NestedField.optional(5, \"command\", Types.StringType.get())\n",
+    "    );\n",
+    "\n",
+    "Namespace webapp = Namespace.of(\"webapp\");\n",
+    "TableIdentifier name = TableIdentifier.of(webapp, \"user_events\");\n",
+    "Table table = catalog.createTable(name, schema, PartitionSpec.unpartitioned());"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "12c45c6b",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "import java.util.UUID;\n",
+    "import com.google.common.collect.ImmutableList;\n",
+    "import com.google.common.collect.ImmutableMap;\n",
+    "import org.apache.iceberg.data.GenericRecord;\n",
+    "\n",
+    "GenericRecord record = GenericRecord.create(schema);\n",
+    "ImmutableList.Builder<GenericRecord> builder = ImmutableList.builder();\n",
+    "builder.add(record.copy(ImmutableMap.of(\"event_id\", UUID.randomUUID().toString(), \"username\", \"Bruce\", \"userid\", 1, \"api_version\", \"1.0\", \"command\", \"grapple\")));\n",
+    "builder.add(record.copy(ImmutableMap.of(\"event_id\", UUID.randomUUID().toString(), \"username\", \"Wayne\", \"userid\", 1, \"api_version\", \"1.0\", \"command\", \"glide\")));\n",
+    "builder.add(record.copy(ImmutableMap.of(\"event_id\", UUID.randomUUID().toString(), \"username\", \"Clark\", \"userid\", 1, \"api_version\", \"2.0\", \"command\", \"fly\")));\n",
+    "builder.add(record.copy(ImmutableMap.of(\"event_id\", UUID.randomUUID().toString(), \"username\", \"Kent\", \"userid\", 1, \"api_version\", \"1.0\", \"command\", \"land\")));\n",
+    "ImmutableList<GenericRecord> records = builder.build();"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "83bc5319",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "import org.apache.iceberg.Files;\n",
+    "import org.apache.iceberg.io.DataWriter;\n",
+    "import org.apache.iceberg.io.OutputFile;\n",
+    "import org.apache.iceberg.parquet.Parquet;\n",
+    "import org.apache.iceberg.data.parquet.GenericParquetWriter;\n",
+    "\n",
+    "String filepath = table.location() + \"/\" + UUID.randomUUID().toString();\n",
+    "OutputFile file = table.io().newOutputFile(filepath);\n",
+    "DataWriter<GenericRecord> dataWriter =\n",
+    "  Parquet.writeData(file)\n",
+    "    .schema(schema)\n",
+    "    .createWriterFunc(GenericParquetWriter::buildWriter)\n",
+    "    .overwrite()\n",
+    "    .withSpec(PartitionSpec.unpartitioned())\n",
+    "    .build();\n",
+    "try {\n",
+    "  for (GenericRecord record : records) {\n",
+    "    dataWriter.write(record);\n",
+    "  }\n",
+    "} finally {\n",
+    "  dataWriter.close();\n",
+    "}"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "469e6af4",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "import org.apache.iceberg.DataFile;\n",
+    "\n",
+    "DataFile dataFile = dataWriter.toDataFile();"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "142b6ed1",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "import org.apache.iceberg.catalog.Namespace;\n",
+    "import org.apache.iceberg.catalog.TableIdentifier;\n",
+    "import org.apache.iceberg.Table;\n",
+    "\n",
+    "Namespace webapp = Namespace.of(\"webapp\");\n",
+    "TableIdentifier name = TableIdentifier.of(webapp, \"user_events\");\n",
+    "Table tbl = catalog.loadTable(name);\n",
+    "tbl.newAppend().appendFile(dataFile).commit()"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "c61e9e79",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "import org.apache.iceberg.io.CloseableIterable;\n",
+    "import org.apache.iceberg.data.Record;\n",
+    "import org.apache.iceberg.data.IcebergGenerics;\n",
+    "\n",
+    "CloseableIterable<Record> result = IcebergGenerics.read(tbl).build();\n",
+    "for (Record r: result) {\n",
+    "    System.out.println(r);\n",
+    "}"
+   ]
+  }
+ ],
+ "metadata": {
+  "kernelspec": {
+   "display_name": "Java",
+   "language": "java",
+   "name": "java"
+  },
+  "language_info": {
+   "codemirror_mode": "java",
+   "file_extension": ".jshell",
+   "mimetype": "text/x-java-source",
+   "name": "Java",
+   "pygments_lexer": "java",
+   "version": "11.0.15+10-post-Debian-1deb11u1"
+  }
+ },
+ "nbformat": 4,
+ "nbformat_minor": 5
+}
diff --git a/docker/apache-spark-iceberg/opensearch/opensearch.yml b/docker/apache-spark-iceberg/opensearch/opensearch.yml
new file mode 100644
index 000000000..94ed1de00
--- /dev/null
+++ b/docker/apache-spark-iceberg/opensearch/opensearch.yml
@@ -0,0 +1,25 @@
+# Network and plugin configuration
+network.host: 0.0.0.0
+plugins.security.ssl.http.enabled: false
+plugins.security.ssl.transport.enabled: false
+
+# Enable security plugin
+plugins.security.enabled: true
+plugins.security.audit.type: internal_opensearch
+plugins.security.allow_default_init_securityindex: true
+
+# Allow the bundled demo certificates (do not use in production)
+plugins.security.allow_unsafe_democertificates: true
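+# Note: the DN below belongs to the demo admin client certificate ("kirk")
+# bundled with the security plugin; requests presenting that certificate
+# (e.g. securityadmin.sh runs) are granted security-admin privileges.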
+plugins.security.authcz.admin_dn:
+  - "CN=kirk,OU=client,O=client,L=test,C=de"
+
+# Cluster settings
+cluster.name: opensearch-cluster
+node.name: opensearch-node1
+discovery.type: single-node
+
+# Enable CORS (if needed)
+http.cors.enabled: true
+http.cors.allow-origin: "*"
+http.cors.allow-methods: GET, POST, PUT, DELETE, OPTIONS
+http.cors.allow-headers: X-Requested-With, X-Auth-Token, Content-Type, Content-Length, Authorization, Access-Control-Allow-Headers, Accept
\ No newline at end of file
diff --git a/docker/apache-spark-iceberg/opensearch/security/config/internal_users.yml b/docker/apache-spark-iceberg/opensearch/security/config/internal_users.yml
new file mode 100644
index 000000000..e590f6356
--- /dev/null
+++ b/docker/apache-spark-iceberg/opensearch/security/config/internal_users.yml
@@ -0,0 +1,4 @@
+admin:
+  hash: "{SHA256}8ea37aedc8c37a3d54383e5abe289b65b74b585357353e4a8a9ac7f13dca3104"
+  roles:
+    - "flint_role"
diff --git a/docker/apache-spark-iceberg/opensearch/security/config/roles.yml b/docker/apache-spark-iceberg/opensearch/security/config/roles.yml
new file mode 100644
index 000000000..c606f8036
--- /dev/null
+++ b/docker/apache-spark-iceberg/opensearch/security/config/roles.yml
@@ -0,0 +1,7 @@
+flint_role:
+  cluster:
+    - "all"
+  indices:
+    - names: [".query_execution_request*"]
+      privileges:
+        - "all"
diff --git a/docker/apache-spark-iceberg/opensearch/security/config/roles_mapping.yml b/docker/apache-spark-iceberg/opensearch/security/config/roles_mapping.yml
new file mode 100644
index 000000000..b0b59bb9a
--- /dev/null
+++ b/docker/apache-spark-iceberg/opensearch/security/config/roles_mapping.yml
@@ -0,0 +1,3 @@
+flint_role:
+  users:
+    - "admin"
diff --git a/docker/apache-spark-iceberg/spark-defaults.conf b/docker/apache-spark-iceberg/spark-defaults.conf
index ecc260aa1..5afcc186b 100644
--- a/docker/apache-spark-iceberg/spark-defaults.conf
+++ b/docker/apache-spark-iceberg/spark-defaults.conf
@@ -27,13 +27,16 @@
 # spark.executor.extraJavaOptions -XX:+PrintGCDetails -Dkey=value -Dnumbers="one two three"
 spark.sql.extensions org.opensearch.flint.spark.FlintPPLSparkExtensions, org.opensearch.flint.spark.FlintSparkExtensions, org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
-spark.sql.catalog.demo org.apache.spark.opensearch.catalog.OpenSearchCatalog, org.apache.iceberg.spark.SparkCatalog
-spark.sql.catalog.demo.type rest
-spark.sql.catalog.demo.uri http://rest:8181
-spark.sql.catalog.demo.io-impl org.apache.iceberg.aws.s3.S3FileIO
-spark.sql.catalog.demo.warehouse s3://warehouse/wh/
-spark.sql.catalog.demo.s3.endpoint http://minio:9000
-spark.sql.defaultCatalog demo
+spark.sql.catalog.demo org.apache.spark.opensearch.catalog.OpenSearchCatalog
+
+spark.sql.catalog.iceberg org.apache.iceberg.spark.SparkCatalog
+spark.sql.catalog.iceberg.type rest
+spark.sql.catalog.iceberg.uri http://rest:8181
+spark.sql.catalog.iceberg.io-impl org.apache.iceberg.aws.s3.S3FileIO
+spark.sql.catalog.iceberg.warehouse s3://warehouse/wh/
+spark.sql.catalog.iceberg.s3.endpoint http://minio:9000
+
+spark.sql.defaultCatalog iceberg
 spark.eventLog.enabled true
 spark.eventLog.dir /home/iceberg/spark-events
 spark.history.fs.logDirectory /home/iceberg/spark-events
diff --git a/docker/apache-spark-iceberg/tutorial/Flint - Getting Started.md b/docker/apache-spark-iceberg/tutorial/Flint - Getting Started.md
new file mode 100644
index 000000000..1a8e6ca59
--- /dev/null
+++ b/docker/apache-spark-iceberg/tutorial/Flint - Getting Started.md
@@ -0,0 +1,153 @@
+## Flint - Getting Started
+This tutorial introduces Flint as a caching and acceleration platform on top of Spark.
+
+
+### Catalog
+A Spark catalog is the pluggable component that tracks the databases and tables a Spark session can see and resolves the table identifiers used in queries.
+
+We created two catalogs:
+ - `iceberg` - for the Iceberg table format demonstration
+ - `demo` - for the OpenSearch index acceleration demonstration
+
+````yaml
+spark.sql.extensions org.opensearch.flint.spark.FlintPPLSparkExtensions, org.opensearch.flint.spark.FlintSparkExtensions, org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
+spark.sql.catalog.demo org.apache.spark.opensearch.catalog.OpenSearchCatalog
+
+spark.sql.catalog.iceberg org.apache.iceberg.spark.SparkCatalog
+spark.sql.catalog.iceberg.type rest
+spark.sql.catalog.iceberg.uri http://rest:8181
+spark.sql.catalog.iceberg.io-impl org.apache.iceberg.aws.s3.S3FileIO
+spark.sql.catalog.iceberg.warehouse s3://warehouse/wh/
+spark.sql.catalog.iceberg.s3.endpoint http://minio:9000
+
+spark.sql.defaultCatalog iceberg
+````
+Here is the full [spark-defaults.conf](../spark-defaults.conf).
+
+### SQL Console
+Now we can list the available catalogs:
+```sql
+% SHOW CATALOGS
+catalog
+-------
+demo
+spark_catalog
+
+```
+
+#### Databases & Tables in a Catalog
+Each catalog contains databases, where the default one is called `default`:
+
+```sql
+% SHOW DATABASES IN iceberg;
+-------
+namespace
+default
+```
+
+Each `database` contains tables:
+```sql
+% SHOW TABLES IN iceberg.default;
+namespace tableName isTemporary
+------- -------- -----------
+default iceberg_table False
+```
+
+Here we see an `iceberg_table` that was created previously.
+
+### Creating the NYC database with its taxis table
+```sql
+CREATE TABLE IF NOT EXISTS iceberg.nyc.taxis (
+    VendorID bigint,
+    tpep_pickup_datetime timestamp,
+    tpep_dropoff_datetime timestamp,
+    passenger_count double,
+    trip_distance double,
+    RatecodeID double,
+    store_and_fwd_flag string,
+    PULocationID bigint,
+    DOLocationID bigint,
+    payment_type bigint,
+    fare_amount double,
+    extra double,
+    mta_tax double,
+    tip_amount double,
+    tolls_amount double,
+    improvement_surcharge double,
+    total_amount double,
+    congestion_surcharge double,
+    airport_fee double
+)
+USING iceberg
+PARTITIONED BY (days(tpep_pickup_datetime))
+```
+
+### Populating the `nyc.taxis` table using the dataset
+
+The following code populates the table with the NYC yellow-taxi trip-data Parquet files that ship with the tutorial image:
+
+```python
+from pyspark.sql import SparkSession
+spark = SparkSession.builder.appName("Jupyter").getOrCreate()
+
+for filename in [
+    "yellow_tripdata_2022-04.parquet",
+    "yellow_tripdata_2022-03.parquet",
+    "yellow_tripdata_2022-02.parquet",
+    "yellow_tripdata_2022-01.parquet",
+    "yellow_tripdata_2021-12.parquet",
+]:
+    df = spark.read.parquet(f"/home/iceberg/data/{filename}")
+    df.write.mode("append").saveAsTable("nyc.taxis")
+```
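+
+Before querying, it is worth sanity-checking the load. A minimal sketch (the `snapshots` metadata table is standard Iceberg; the exact values depend on which files were loaded):
+
+```sql
+-- total rows appended from the five Parquet files
+SELECT COUNT(*) FROM iceberg.nyc.taxis;
+
+-- Iceberg records one snapshot per commit, so one append per file
+-- should show five snapshots here
+SELECT committed_at, snapshot_id, operation
+FROM iceberg.nyc.taxis.snapshots;
+```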
+
+### Querying the Data
+
+Using SQL:
+
+```sql
+select * from iceberg.nyc.taxis limit 10;
+----------------------------------------------------------------------------------------------------------------------------------------
+VendorID tpep_pickup_datetime tpep_dropoff_datetime passenger_count trip_distance RatecodeID store_and_fwd_flag PULocationID DOLocationID payment_type fare_amount extra mta_tax tip_amount tolls_amount improvement_surcharge total_amount congestion_surcharge airport_fee
+2 2022-02-25 08:12:28 2022-02-25 09:02:31 1.0 12.46 1.0 N 138 186 1 42.5 0.5 0.5 10.82 6.55 0.3 64.92 2.5 1.25
+2 2022-02-25 09:04:15 2022-02-25 09:09:12 1.0 0.71 1.0 N 186 90 2 5.0 0.5 0.5 0.0 0.0 0.3 8.8 2.5 0.0
+2 2022-02-25 09:14:20 2022-02-25 09:22:25 1.0 1.17 1.0 N 234 161 1 7.0 0.5 0.5 2.7 0.0 0.3 13.5 2.5 0.0
+2 2022-02-25 10:13:47 2022-02-25 10:52:18 1.0 9.71 1.0 N 138 68 1 35.5 0.5 0.5 9.42 6.55 0.3 56.52 2.5 1.25
+2 2022-02-25 10:59:43 2022-02-25 11:14:48 1.0 1.58 1.0 N 100 163 1 10.5 0.5 0.5 4.29 0.0 0.3 18.59 2.5 0.0
+2 2022-02-25 12:26:44 2022-02-25 12:50:31 1.0 10.81 1.0 N 138 230 1 31.0 0.5 0.5 8.52 6.55 0.3 51.12 2.5 1.25
+2 2022-02-25 12:51:53 2022-02-25 12:59:59 1.0 0.82 1.0 N 230 237 1 6.5 0.5 0.5 2.06 0.0 0.3 12.36 2.5 0.0
+2 2022-02-25 13:03:07 2022-02-25 13:14:07 1.0 1.87 1.0 N 237 50 1 9.0 0.5 0.5 3.2 0.0 0.3 16.0 2.5 0.0
+2 2022-02-25 16:02:40 2022-02-25 16:36:45 1.0 10.47 1.0 N 138 163 1 33.0 0.0 0.5 8.82 6.55 0.3 52.92 2.5 1.25
+2 2022-02-25 16:43:33 2022-02-25 16:53:51 1.0 1.95 1.0 N 162 107 1 9.0 0.0 0.5 2.46 0.0 0.3 14.76 2.5 0.0
+```
+
+Using PPL:
+
+```sql
+source=`nyc`.`taxis` | head 10;
+----------------------------------------------------------------------------------------------------------------------------------------
+VendorID tpep_pickup_datetime tpep_dropoff_datetime passenger_count trip_distance RatecodeID store_and_fwd_flag PULocationID DOLocationID payment_type fare_amount extra mta_tax tip_amount tolls_amount improvement_surcharge total_amount congestion_surcharge airport_fee
+2 2022-02-25 08:12:28 2022-02-25 09:02:31 1.0 12.46 1.0 N 138 186 1 42.5 0.5 0.5 10.82 6.55 0.3 64.92 2.5 1.25
+2 2022-02-25 09:04:15 2022-02-25 09:09:12 1.0 0.71 1.0 N 186 90 2 5.0 0.5 0.5 0.0 0.0 0.3 8.8 2.5 0.0
+2 2022-02-25 09:14:20 2022-02-25 09:22:25 1.0 1.17 1.0 N 234 161 1 7.0 0.5 0.5 2.7 0.0 0.3 13.5 2.5 0.0
+2 2022-02-25 10:13:47 2022-02-25 10:52:18 1.0 9.71 1.0 N 138 68 1 35.5 0.5 0.5 9.42 6.55 0.3 56.52 2.5 1.25
+2 2022-02-25 10:59:43 2022-02-25 11:14:48 1.0 1.58 1.0 N 100 163 1 10.5 0.5 0.5 4.29 0.0 0.3 18.59 2.5 0.0
+2 2022-02-25 12:26:44 2022-02-25 12:50:31 1.0 10.81 1.0 N 138 230 1 31.0 0.5 0.5 8.52 6.55 0.3 51.12 2.5 1.25
+2 2022-02-25 12:51:53 2022-02-25 12:59:59 1.0 0.82 1.0 N 230 237 1 6.5 0.5 0.5 2.06 0.0 0.3 12.36 2.5 0.0
+2 2022-02-25 13:03:07 2022-02-25 13:14:07 1.0 1.87 1.0 N 237 50 1 9.0 0.5 0.5 3.2 0.0 0.3 16.0 2.5 0.0
+2 2022-02-25 16:02:40 2022-02-25 16:36:45 1.0 10.47 1.0 N 138 163 1 33.0 0.0 0.5 8.82 6.55 0.3 52.92 2.5 1.25
+2 2022-02-25 16:43:33 2022-02-25 16:53:51 1.0 1.95 1.0 N 162 107 1 9.0 0.0 0.5 2.46 0.0 0.3 14.76 2.5 0.0
+
+```
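+
+PPL commands compose into pipelines, so the same table can also be filtered and aggregated without SQL. A sketch, assuming the core PPL `where` and `stats` commands shipped with the PPL extension (output omitted):
+
+```sql
+source=`nyc`.`taxis`
+| where trip_distance > 10
+| stats count() as trips, avg(total_amount) as avg_total by payment_type;
+```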
+
+### Creating an MV using OpenSearch Index Acceleration
+
+This example shows how to create a materialized view whose results are materialized into an OpenSearch acceleration index:
+
+```sql
+CREATE MATERIALIZED VIEW nyc_taxi_mv
+AS select * from iceberg.nyc.taxis limit 10
+WITH (
+  auto_refresh = false
+)
+```
+
+With `auto_refresh = false`, the view is populated on demand, for example with Flint's `REFRESH MATERIALIZED VIEW nyc_taxi_mv` statement.
\ No newline at end of file
diff --git a/docker/apache-spark-iceberg/tutorial/Using-Flint-API.md b/docker/apache-spark-iceberg/tutorial/Using-Flint-API.md
new file mode 100644
index 000000000..16b246f15
--- /dev/null
+++ b/docker/apache-spark-iceberg/tutorial/Using-Flint-API.md
@@ -0,0 +1,4 @@
+![opensearch-logo](https://us1.discourse-cdn.com/flex019/uploads/mauve_hedgehog/original/2X/0/0b01dfaa45d68486cf767973cc5a34c731d5467d.png)
+
+## Using The Flint Java API
+This tutorial helps users learn and operate Flint's Java API.