From 99fa47b43c39ed791bd2d75519c00f15cfcb3f36 Mon Sep 17 00:00:00 2001 From: Jolanrensen Date: Sat, 17 Jul 2021 17:12:59 +0200 Subject: [PATCH 1/9] trying to add some stdlib functions, still working on it --- .../org/jetbrains/kotlinx/spark/api/ApiV1.kt | 157 ++++++++++++++++++ .../jetbrains/kotlinx/spark/api/ApiTest.kt | 10 ++ 2 files changed, 167 insertions(+) diff --git a/kotlin-spark-api/3.0/src/main/kotlin/org/jetbrains/kotlinx/spark/api/ApiV1.kt b/kotlin-spark-api/3.0/src/main/kotlin/org/jetbrains/kotlinx/spark/api/ApiV1.kt index 2dde48cb..63db2cce 100644 --- a/kotlin-spark-api/3.0/src/main/kotlin/org/jetbrains/kotlinx/spark/api/ApiV1.kt +++ b/kotlin-spark-api/3.0/src/main/kotlin/org/jetbrains/kotlinx/spark/api/ApiV1.kt @@ -185,6 +185,163 @@ private fun kotlinClassEncoder(schema: DataType, kClass: KClass<*>): Encoder ) } +/** + * Allows `for (element in dataset)`. + */ +operator fun Dataset.iterator(): Iterator = toLocalIterator() + +/** + * Returns `true` if [element] is found in the collection. + */ +operator fun Dataset.contains(element: T): Boolean { + return indexOf(element) >= 0L +} + +/** + * Returns an element at the given [index] or throws an [IndexOutOfBoundsException] if the [index] is out of bounds of this collection. + */ +fun Dataset.elementAt(index: Long): T { + return elementAtOrElse(index) { throw IndexOutOfBoundsException("Collection doesn't contain element at index $index.") } +} + +/** + * Returns an element at the given [index] or the result of calling the [defaultValue] function if the [index] is out of bounds of this collection. + */ +fun Dataset.elementAtOrElse(index: Long, defaultValue: (Long) -> T): T { + if (index < 0L) + return defaultValue(index) + val iterator = iterator() + var count = 0L + while (iterator.hasNext()) { + val element = iterator.next() + if (index == count++) + return element + } + return defaultValue(index) +} + +/** + * Returns an element at the given [index] or `null` if the [index] is out of bounds of this collection. + */ +fun Dataset.elementAtOrNull(index: Long): T? { + if (index < 0L) + return null + val iterator = iterator() + var count = 0L + while (iterator.hasNext()) { + val element = iterator.next() + if (index == count++) + return element + } + return null +} + +/** + * Returns the first element matching the given [predicate], or `null` if no such element was found. + */ +inline fun Dataset.find(predicate: (T) -> Boolean): T? { + return firstOrNull(predicate) +} + +/** + * Returns the last element matching the given [predicate], or `null` if no such element was found. + */ +inline fun Dataset.findLast(predicate: (T) -> Boolean): T? { + return TODO()//lastOrNull(predicate) +} + +/** + * Returns the first element matching the given [predicate]. + * @throws [NoSuchElementException] if no such element is found. + */ +inline fun Dataset.first(predicate: (T) -> Boolean): T { + for (element in toLocalIterator()) if (predicate(element)) return element + throw NoSuchElementException("Collection contains no element matching the predicate.") +} + +/** + * Returns the first non-null value produced by [transform] function being applied to elements of this collection in iteration order, + * or throws [NoSuchElementException] if no non-null value was produced. 
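+ *
+ * A minimal usage sketch (the input values are illustrative assumptions):
+ * ```
+ * listOf("a", "1", "2").toDS().firstNotNullOf { it.toIntOrNull() } // 1
+ * ```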
+ */ +inline fun Dataset.firstNotNullOf(transform: (T) -> R?): R { + return firstNotNullOfOrNull(transform) ?: throw NoSuchElementException("No element of the collection was transformed to a non-null value.") +} + +/** + * Returns the first non-null value produced by [transform] function being applied to elements of this collection in iteration order, + * or `null` if no non-null value was produced. + */ +inline fun Dataset.firstNotNullOfOrNull(transform: (T) -> R?): R? { + for (element in this) { + val result = transform(element) + if (result != null) { + return result + } + } + return null +} + +/** + * Returns the first element, or `null` if the collection is empty. + */ + fun Dataset.firstOrNull(): T? { + val iterator = iterator() + if (!iterator.hasNext()) + return null + return iterator.next() +} + +/** + * Returns the first element matching the given [predicate], or `null` if element was not found. + */ +inline fun Dataset.firstOrNull(predicate: (T) -> Boolean): T? { + for (element in this) if (predicate(element)) return element + return null +} + +/** + * Returns first index of [element], or -1 if the collection does not contain element. + */ +fun Dataset.indexOf(element: T): Long { + var index = 0L + for (item in iterator()) { + if (element == item) + return index + index++ + } + return -1L +} + +/** + * Returns index of the first element matching the given [predicate], or -1 if the collection does not contain such element. + */ +inline fun Dataset.indexOfFirst(predicate: (T) -> Boolean): Long { + var index = 0L + for (item in this) { + if (predicate(item)) + return index + index++ + } + return -1L +} + +/** + * Returns index of the last element matching the given [predicate], or -1 if the collection does not contain such element. + */ +inline fun Dataset.indexOfLast(predicate: (T) -> Boolean): Long { + // TODO might be able to improve + var lastIndex = -1L + var index = 0L + for (item in this) { + if (predicate(item)) + lastIndex = index + index++ + } + return lastIndex +} + + + inline fun Dataset.map(noinline func: (T) -> R): Dataset = map(MapFunction(func), encoder()) diff --git a/kotlin-spark-api/3.0/src/test/kotlin/org/jetbrains/kotlinx/spark/api/ApiTest.kt b/kotlin-spark-api/3.0/src/test/kotlin/org/jetbrains/kotlinx/spark/api/ApiTest.kt index bff38ac1..5fefc020 100644 --- a/kotlin-spark-api/3.0/src/test/kotlin/org/jetbrains/kotlinx/spark/api/ApiTest.kt +++ b/kotlin-spark-api/3.0/src/test/kotlin/org/jetbrains/kotlinx/spark/api/ApiTest.kt @@ -557,6 +557,16 @@ class ApiTest : ShouldSpec({ first.someEnumArray shouldBe arrayOf(SomeEnum.A, SomeEnum.B) first.someOtherArray shouldBe arrayOf(SomeOtherEnum.C, SomeOtherEnum.D) first.enumMap shouldBe mapOf(SomeEnum.A to SomeOtherEnum.C) + } + should("Have more stdlib functions for Datasets") { + val dataset = listOf(1, 2, 3).toDS() + (1 in dataset) shouldBe true + dataset.indexOf(1) shouldBe 0L + + dataset.first() + + + } } } From 2b15645b7be77b6908223145b3caf9dd8e92339d Mon Sep 17 00:00:00 2001 From: Jolanrensen Date: Mon, 19 Jul 2021 00:05:57 +0200 Subject: [PATCH 2/9] adding more and more functions from stdlib built on spark functions --- .../spark/extensions/KSparkExtensions.scala | 1 + .../org/jetbrains/kotlinx/spark/api/ApiV1.kt | 193 +++++++++--------- .../jetbrains/kotlinx/spark/api/ApiTest.kt | 4 +- 3 files changed, 95 insertions(+), 103 deletions(-) diff --git a/core/3.0/src/main/scala/org/jetbrains/kotlinx/spark/extensions/KSparkExtensions.scala 
b/core/3.0/src/main/scala/org/jetbrains/kotlinx/spark/extensions/KSparkExtensions.scala index 6b4935f1..3ee379fc 100644 --- a/core/3.0/src/main/scala/org/jetbrains/kotlinx/spark/extensions/KSparkExtensions.scala +++ b/core/3.0/src/main/scala/org/jetbrains/kotlinx/spark/extensions/KSparkExtensions.scala @@ -34,6 +34,7 @@ object KSparkExtensions { def collectAsList[T](ds: Dataset[T]): util.List[T] = JavaConverters.seqAsJavaList(ds.collect()) + def tailAsList[T](ds: Dataset[T], n: Int): util.List[T] = util.Arrays.asList(ds.tail(n) : _*) def debugCodegen(df: Dataset[_]): Unit = { import org.apache.spark.sql.execution.debug._ diff --git a/kotlin-spark-api/3.0/src/main/kotlin/org/jetbrains/kotlinx/spark/api/ApiV1.kt b/kotlin-spark-api/3.0/src/main/kotlin/org/jetbrains/kotlinx/spark/api/ApiV1.kt index 63db2cce..d093d624 100644 --- a/kotlin-spark-api/3.0/src/main/kotlin/org/jetbrains/kotlinx/spark/api/ApiV1.kt +++ b/kotlin-spark-api/3.0/src/main/kotlin/org/jetbrains/kotlinx/spark/api/ApiV1.kt @@ -74,6 +74,7 @@ import kotlin.Unit import kotlin.also import kotlin.apply import kotlin.invoke +import kotlin.random.Random import kotlin.reflect.* import kotlin.reflect.full.findAnnotation import kotlin.reflect.full.isSubclassOf @@ -188,160 +189,151 @@ private fun kotlinClassEncoder(schema: DataType, kClass: KClass<*>): Encoder /** * Allows `for (element in dataset)`. */ +@Deprecated( + message = "Note that this creates an iterator which can consume lots of memory. `.forEach {}` might be more efficient.", + level = DeprecationLevel.WARNING +) operator fun Dataset.iterator(): Iterator = toLocalIterator() /** * Returns `true` if [element] is found in the collection. */ -operator fun Dataset.contains(element: T): Boolean { - return indexOf(element) >= 0L -} - -/** - * Returns an element at the given [index] or throws an [IndexOutOfBoundsException] if the [index] is out of bounds of this collection. - */ -fun Dataset.elementAt(index: Long): T { - return elementAtOrElse(index) { throw IndexOutOfBoundsException("Collection doesn't contain element at index $index.") } -} - -/** - * Returns an element at the given [index] or the result of calling the [defaultValue] function if the [index] is out of bounds of this collection. - */ -fun Dataset.elementAtOrElse(index: Long, defaultValue: (Long) -> T): T { - if (index < 0L) - return defaultValue(index) - val iterator = iterator() - var count = 0L - while (iterator.hasNext()) { - val element = iterator.next() - if (index == count++) - return element - } - return defaultValue(index) -} - -/** - * Returns an element at the given [index] or `null` if the [index] is out of bounds of this collection. - */ -fun Dataset.elementAtOrNull(index: Long): T? { - if (index < 0L) - return null - val iterator = iterator() - var count = 0L - while (iterator.hasNext()) { - val element = iterator.next() - if (index == count++) - return element - } - return null -} +inline operator fun Dataset.contains(element: T): Boolean = + !filter { it == element }.isEmpty /** * Returns the first element matching the given [predicate], or `null` if no such element was found. */ -inline fun Dataset.find(predicate: (T) -> Boolean): T? { +fun Dataset.find(predicate: (T) -> Boolean): T? { return firstOrNull(predicate) } /** * Returns the last element matching the given [predicate], or `null` if no such element was found. */ -inline fun Dataset.findLast(predicate: (T) -> Boolean): T? { - return TODO()//lastOrNull(predicate) +fun Dataset.findLast(predicate: (T) -> Boolean): T? 
{ + return lastOrNull(predicate) } /** * Returns the first element matching the given [predicate]. * @throws [NoSuchElementException] if no such element is found. */ -inline fun Dataset.first(predicate: (T) -> Boolean): T { - for (element in toLocalIterator()) if (predicate(element)) return element - throw NoSuchElementException("Collection contains no element matching the predicate.") -} +fun Dataset.first(predicate: (T) -> Boolean): T = + filter(predicate).first() /** * Returns the first non-null value produced by [transform] function being applied to elements of this collection in iteration order, * or throws [NoSuchElementException] if no non-null value was produced. */ -inline fun Dataset.firstNotNullOf(transform: (T) -> R?): R { - return firstNotNullOfOrNull(transform) ?: throw NoSuchElementException("No element of the collection was transformed to a non-null value.") -} +inline fun Dataset.firstNotNullOf(noinline transform: (T) -> R?): R = + map(transform) + .filterNotNull() + .first() /** * Returns the first non-null value produced by [transform] function being applied to elements of this collection in iteration order, * or `null` if no non-null value was produced. */ -inline fun Dataset.firstNotNullOfOrNull(transform: (T) -> R?): R? { - for (element in this) { - val result = transform(element) - if (result != null) { - return result - } - } - return null -} +inline fun Dataset.firstNotNullOfOrNull(noinline transform: (T) -> R?): R? = + map(transform) + .filterNotNull() + .firstOrNull() /** * Returns the first element, or `null` if the collection is empty. */ - fun Dataset.firstOrNull(): T? { - val iterator = iterator() - if (!iterator.hasNext()) - return null - return iterator.next() -} +fun Dataset.firstOrNull(): T? = if (isEmpty) null else first() /** * Returns the first element matching the given [predicate], or `null` if element was not found. */ -inline fun Dataset.firstOrNull(predicate: (T) -> Boolean): T? { - for (element in this) if (predicate(element)) return element - return null -} +fun Dataset.firstOrNull(predicate: (T) -> Boolean): T? = filter(predicate).firstOrNull() /** - * Returns first index of [element], or -1 if the collection does not contain element. + * Returns the last element. + * + * @throws NoSuchElementException if the collection is empty. */ -fun Dataset.indexOf(element: T): Long { - var index = 0L - for (item in iterator()) { - if (element == item) - return index - index++ - } - return -1L +fun Dataset.last(): T = tailAsList(1).first() + +/** + * Returns the last element matching the given [predicate]. + * + * @throws NoSuchElementException if no such element is found. + */ +fun Dataset.last(predicate: (T) -> Boolean): T = filter(predicate).last() + +/** + * Returns the last element, or `null` if the collection is empty. + */ +fun Dataset.lastOrNull(): T? = if (isEmpty) null else last() + +/** + * Returns the last element matching the given [predicate], or `null` if no such element was found. + */ +fun Dataset.lastOrNull(predicate: (T) -> Boolean): T? = filter(predicate).lastOrNull() + +/** + * Returns the last `n` rows in the Dataset as a list. + * + * Running tail requires moving data into the application's driver process, and doing so with + * a very large `n` can crash the driver process with OutOfMemoryError. + */ +fun Dataset.tailAsList(n: Int): List = KSparkExtensions.tailAsList(this, n) + +/** + * Returns a random element from this Dataset using the specified source of randomness. 
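+ *
+ * A minimal usage sketch (illustrative data; any of 1, 2 or 3 may be returned):
+ * ```
+ * listOf(1, 2, 3).toDS().random(seed = 42L)
+ * ```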
+ *
+ * @param seed seed for the random number generator
+ *
+ * @throws NoSuchElementException if this collection is empty.
+ */
+fun <T> Dataset<T>.random(seed: Long = Random.nextLong()): T =
+    randomOrNull(seed) ?: throw NoSuchElementException("Collection is empty.")
+
+/**
+ * Returns a random element from this collection using the specified source of randomness, or `null` if this collection is empty.
+ * @param seed seed for the random number generator
+ */
+fun <T> Dataset<T>.randomOrNull(seed: Long = Random.nextLong()): T? {
+    if (isEmpty)
+        return null
+
+    return toJavaRDD()
+        .takeSample(false, 1, seed)
+        .first()
+}
+
+/**
+ * Returns the single element, or throws an exception if the Dataset is empty or has more than one element.
+ */
+fun <T> Dataset<T>.single(): T {
+    if (isEmpty)
+        throw NoSuchElementException("Dataset is empty.")
+
+    val firstTwo: List<T> = takeAsList(2) // less heavy than count()
+    return when (firstTwo.size) {
+        1 -> firstTwo.first()
+        else -> throw IllegalArgumentException("Dataset has more than one element.")
+    }
+}
+
+/**
+ * Returns the single element, or `null` if the Dataset is empty or has more than one element.
+ */
+fun <T> Dataset<T>.singleOrNull(): T?
{ + if (isEmpty) + return null + + val firstTwo: List = takeAsList(2) // less heavy than count() + return when (firstTwo.size) { + 1 -> firstTwo.first() + else -> null } - return lastIndex } - inline fun Dataset.map(noinline func: (T) -> R): Dataset = map(MapFunction(func), encoder()) @@ -357,7 +349,8 @@ inline fun Dataset.groupByKey(noinline func: (T) -> R): KeyVal inline fun Dataset.mapPartitions(noinline func: (Iterator) -> Iterator): Dataset = mapPartitions(func, encoder()) -fun Dataset.filterNotNull() = filter { it != null } +@Suppress("UNCHECKED_CAST") +fun Dataset.filterNotNull(): Dataset = filter { it != null } as Dataset inline fun KeyValueGroupedDataset.mapValues(noinline func: (VALUE) -> R): KeyValueGroupedDataset = mapValues(MapFunction(func), encoder()) diff --git a/kotlin-spark-api/3.0/src/test/kotlin/org/jetbrains/kotlinx/spark/api/ApiTest.kt b/kotlin-spark-api/3.0/src/test/kotlin/org/jetbrains/kotlinx/spark/api/ApiTest.kt index 5fefc020..d24c3f43 100644 --- a/kotlin-spark-api/3.0/src/test/kotlin/org/jetbrains/kotlinx/spark/api/ApiTest.kt +++ b/kotlin-spark-api/3.0/src/test/kotlin/org/jetbrains/kotlinx/spark/api/ApiTest.kt @@ -561,9 +561,7 @@ class ApiTest : ShouldSpec({ should("Have more stdlib functions for Datasets") { val dataset = listOf(1, 2, 3).toDS() (1 in dataset) shouldBe true - dataset.indexOf(1) shouldBe 0L - - dataset.first() + dataset.tailAsList(2) shouldBe listOf(2, 3) From e0d9d6b4382ff70e38c0b7e19c9fe1353bb6f8ac Mon Sep 17 00:00:00 2001 From: Jolanrensen Date: Tue, 27 Jul 2021 23:30:45 +0200 Subject: [PATCH 3/9] working on dropWhile etc --- .../org/jetbrains/kotlinx/spark/api/ApiV1.kt | 96 +++++++++++++++++-- .../jetbrains/kotlinx/spark/api/ApiTest.kt | 8 +- 2 files changed, 92 insertions(+), 12 deletions(-) diff --git a/kotlin-spark-api/3.0/src/main/kotlin/org/jetbrains/kotlinx/spark/api/ApiV1.kt b/kotlin-spark-api/3.0/src/main/kotlin/org/jetbrains/kotlinx/spark/api/ApiV1.kt index d093d624..b4695b03 100644 --- a/kotlin-spark-api/3.0/src/main/kotlin/org/jetbrains/kotlinx/spark/api/ApiV1.kt +++ b/kotlin-spark-api/3.0/src/main/kotlin/org/jetbrains/kotlinx/spark/api/ApiV1.kt @@ -28,6 +28,7 @@ import org.apache.spark.broadcast.Broadcast import org.apache.spark.sql.* import org.apache.spark.sql.Encoders.* import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder +import org.apache.spark.sql.functions.* import org.apache.spark.sql.streaming.GroupState import org.apache.spark.sql.streaming.GroupStateTimeout import org.apache.spark.sql.streaming.OutputMode @@ -188,21 +189,29 @@ private fun kotlinClassEncoder(schema: DataType, kClass: KClass<*>): Encoder /** * Allows `for (element in dataset)`. + * + * Note that this creates an iterator which can consume lots of memory. `.forEach {}` might be more efficient. + * TODO: Add plugin inspection hint */ -@Deprecated( - message = "Note that this creates an iterator which can consume lots of memory. `.forEach {}` might be more efficient.", - level = DeprecationLevel.WARNING -) operator fun Dataset.iterator(): Iterator = toLocalIterator() +fun Dataset.toIterable(): Iterable = Iterable { toLocalIterator() } + /** * Returns `true` if [element] is found in the collection. + * + * Note: Converting the dataset to an [Iterable] first might be a faster but potentially more memory + * intensive solution. See [toIterable]. 
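+ *
+ * For example (illustrative data):
+ * ```
+ * (2 in listOf(1, 2, 3).toDS()) // true
+ * ```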
+ * TODO: Add plugin inspection hint */ -inline operator fun Dataset.contains(element: T): Boolean = - !filter { it == element }.isEmpty +inline operator fun Dataset.contains(element: T): Boolean = !filter { it == element }.isEmpty /** * Returns the first element matching the given [predicate], or `null` if no such element was found. + * + * Note: Converting the dataset to an [Iterable] first might be a faster but potentially more memory + * intensive solution. See [toIterable]. + * TODO: Add plugin inspection hint */ fun Dataset.find(predicate: (T) -> Boolean): T? { return firstOrNull(predicate) @@ -210,6 +219,7 @@ fun Dataset.find(predicate: (T) -> Boolean): T? { /** * Returns the last element matching the given [predicate], or `null` if no such element was found. + * TODO: Add plugin inspection hint */ fun Dataset.findLast(predicate: (T) -> Boolean): T? { return lastOrNull(predicate) @@ -218,6 +228,7 @@ fun Dataset.findLast(predicate: (T) -> Boolean): T? { /** * Returns the first element matching the given [predicate]. * @throws [NoSuchElementException] if no such element is found. + * TODO: Add plugin inspection hint */ fun Dataset.first(predicate: (T) -> Boolean): T = filter(predicate).first() @@ -225,6 +236,7 @@ fun Dataset.first(predicate: (T) -> Boolean): T = /** * Returns the first non-null value produced by [transform] function being applied to elements of this collection in iteration order, * or throws [NoSuchElementException] if no non-null value was produced. + * TODO: Add plugin inspection hint */ inline fun Dataset.firstNotNullOf(noinline transform: (T) -> R?): R = map(transform) @@ -234,6 +246,7 @@ inline fun Dataset.firstNotNullOf(noinline trans /** * Returns the first non-null value produced by [transform] function being applied to elements of this collection in iteration order, * or `null` if no non-null value was produced. + * TODO: Add plugin inspection hint */ inline fun Dataset.firstNotNullOfOrNull(noinline transform: (T) -> R?): R? = map(transform) @@ -247,6 +260,7 @@ fun Dataset.firstOrNull(): T? = if (isEmpty) null else first() /** * Returns the first element matching the given [predicate], or `null` if element was not found. + * TODO: Add plugin inspection hint */ fun Dataset.firstOrNull(predicate: (T) -> Boolean): T? = filter(predicate).firstOrNull() @@ -261,16 +275,19 @@ fun Dataset.last(): T = tailAsList(1).first() * Returns the last element matching the given [predicate]. * * @throws NoSuchElementException if no such element is found. + * TODO: Add plugin inspection hint */ fun Dataset.last(predicate: (T) -> Boolean): T = filter(predicate).last() /** * Returns the last element, or `null` if the collection is empty. + * TODO: Add plugin inspection hint */ fun Dataset.lastOrNull(): T? = if (isEmpty) null else last() /** * Returns the last element matching the given [predicate], or `null` if no such element was found. + * TODO: Add plugin inspection hint */ fun Dataset.lastOrNull(predicate: (T) -> Boolean): T? = filter(predicate).lastOrNull() @@ -334,6 +351,71 @@ fun Dataset.singleOrNull(): T? { } +fun Dataset<*>.getUniqueNewColumnName(): String { + val rowKeys = columns() + val alphabet = 'a'..'z' + var colName = alphabet.random().toString() + while (colName in rowKeys) colName += alphabet.random() + + return colName +} + +/** + * Returns a Dataset containing all elements except first [n] elements. + * + * @throws IllegalArgumentException if [n] is negative. 
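+ *
+ * For example (illustrative data, mirroring the test below):
+ * ```
+ * listOf(1, 2, 3).toDS().drop(2) // dataset containing only 3
+ * ```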
+ * + * TODO make more efficient + */ +inline fun Dataset.drop(n: Int): Dataset { + require(n >= 0) { "Requested element count $n is less than zero." } + val index = getUniqueNewColumnName() + return withColumn(index, monotonicallyIncreasingId()) + .orderBy(desc(index)) + .dropLast(n) + .orderBy(index) + .drop(index) + .`as`() +} + +/** + * Returns a Dataset containing all elements except last [n] elements. + * + * @throws IllegalArgumentException if [n] is negative. + */ +fun Dataset.dropLast(n: Int): Dataset { + require(n >= 0) { "Requested element count $n is less than zero." } + return limit( + (count() - n).toInt().coerceAtLeast(0) + ) +} + +/** + * Returns a Dataset containing all elements except last elements that satisfy the given [predicate]. + */ +inline fun Dataset.dropLastWhile(predicate: (T) -> Boolean): Dataset { + val reversedWithIndex = withColumn("index", monotonicallyIncreasingId()) + .orderBy(desc("index")) + + TODO() +} + +/** + * Returns a Dataset containing all elements except first elements that satisfy the given [predicate]. + * + * TODO Can definitely be made more efficient + * TODO Add plugin toIterable warning + */ +inline fun Dataset.dropWhile(noinline predicate: (T) -> Boolean): Dataset { + val dropUntil = map(predicate) + .withColumn(getUniqueNewColumnName(), monotonicallyIncreasingId()) + .firstOrNull { it.getBoolean(0) } + ?.getLong(1) + ?: -1L + + return drop(dropUntil.toInt() + 1) +} + inline fun Dataset.map(noinline func: (T) -> R): Dataset = map(MapFunction(func), encoder()) @@ -924,7 +1006,7 @@ inline fun Dataset.col(column: KProperty1): Type */ @Suppress("UNCHECKED_CAST") inline fun col(column: KProperty1): TypedColumn = - functions.col(column.name).`as`() as TypedColumn + col(column.name).`as`() as TypedColumn /** * Helper function to quickly get a [TypedColumn] (or [Column]) from a dataset in a refactor-safe manner. diff --git a/kotlin-spark-api/3.0/src/test/kotlin/org/jetbrains/kotlinx/spark/api/ApiTest.kt b/kotlin-spark-api/3.0/src/test/kotlin/org/jetbrains/kotlinx/spark/api/ApiTest.kt index d24c3f43..3b15069c 100644 --- a/kotlin-spark-api/3.0/src/test/kotlin/org/jetbrains/kotlinx/spark/api/ApiTest.kt +++ b/kotlin-spark-api/3.0/src/test/kotlin/org/jetbrains/kotlinx/spark/api/ApiTest.kt @@ -559,11 +559,9 @@ class ApiTest : ShouldSpec({ first.enumMap shouldBe mapOf(SomeEnum.A to SomeOtherEnum.C) } should("Have more stdlib functions for Datasets") { - val dataset = listOf(1, 2, 3).toDS() - (1 in dataset) shouldBe true - dataset.tailAsList(2) shouldBe listOf(2, 3) - - + val dataset = listOf(1, 2, 3).toDS().drop(2) + dataset.count() shouldBe 1L + (3 in dataset) shouldBe true } } From d144fd211bbccc20b96064c8ad9c8d604e0f80e7 Mon Sep 17 00:00:00 2001 From: Jolanrensen Date: Thu, 29 Jul 2021 21:03:56 +0200 Subject: [PATCH 4/9] dropwhiles, all/any --- .../org/jetbrains/kotlinx/spark/api/ApiV1.kt | 106 +++++++++++++++--- 1 file changed, 90 insertions(+), 16 deletions(-) diff --git a/kotlin-spark-api/3.0/src/main/kotlin/org/jetbrains/kotlinx/spark/api/ApiV1.kt b/kotlin-spark-api/3.0/src/main/kotlin/org/jetbrains/kotlinx/spark/api/ApiV1.kt index b4695b03..dc08ff6b 100644 --- a/kotlin-spark-api/3.0/src/main/kotlin/org/jetbrains/kotlinx/spark/api/ApiV1.kt +++ b/kotlin-spark-api/3.0/src/main/kotlin/org/jetbrains/kotlinx/spark/api/ApiV1.kt @@ -369,13 +369,20 @@ fun Dataset<*>.getUniqueNewColumnName(): String { */ inline fun Dataset.drop(n: Int): Dataset { require(n >= 0) { "Requested element count $n is less than zero." 
} - val index = getUniqueNewColumnName() - return withColumn(index, monotonicallyIncreasingId()) - .orderBy(desc(index)) - .dropLast(n) - .orderBy(index) - .drop(index) - .`as`() + return when { + isEmpty -> this + n >= count() -> limit(0) + else -> { + val index = getUniqueNewColumnName() + withColumn(index, monotonicallyIncreasingId()) + .orderBy(desc(index)) + .dropLast(n) + .orderBy(index) + .drop(index) + .`as`() + } + } + } /** @@ -385,19 +392,42 @@ inline fun Dataset.drop(n: Int): Dataset { */ fun Dataset.dropLast(n: Int): Dataset { require(n >= 0) { "Requested element count $n is less than zero." } - return limit( - (count() - n).toInt().coerceAtLeast(0) - ) + return when { + isEmpty -> this + n >= count() -> limit(0) + else -> limit( + (count() - n).toInt().coerceAtLeast(0) + ) + } + } /** * Returns a Dataset containing all elements except last elements that satisfy the given [predicate]. + * + * TODO Add plugin toIterable warning */ -inline fun Dataset.dropLastWhile(predicate: (T) -> Boolean): Dataset { - val reversedWithIndex = withColumn("index", monotonicallyIncreasingId()) - .orderBy(desc("index")) +inline fun Dataset.dropLastWhile(noinline predicate: (T) -> Boolean): Dataset { + if (isEmpty) return this - TODO() + val filterApplied = map(predicate) + .withColumn( + getUniqueNewColumnName(), + monotonicallyIncreasingId(), + ) + + if (filterApplied.all { it.getBoolean(0) }) + return limit(0) + + if (filterApplied.all { !it.getBoolean(0) }) + return this + + val dropFrom = filterApplied + .lastOrNull { !it.getBoolean(0) } + ?.getLong(1) + ?: -1L + + return dropLast(count().toInt() - (dropFrom.toInt() + 1)) } /** @@ -407,8 +437,21 @@ inline fun Dataset.dropLastWhile(predicate: (T) -> Boolean): Dataset { * TODO Add plugin toIterable warning */ inline fun Dataset.dropWhile(noinline predicate: (T) -> Boolean): Dataset { - val dropUntil = map(predicate) - .withColumn(getUniqueNewColumnName(), monotonicallyIncreasingId()) + if (isEmpty) return this + + val filterApplied = map(predicate) + .withColumn( + getUniqueNewColumnName(), + monotonicallyIncreasingId(), + ) + + if (filterApplied.all { it.getBoolean(0) }) + return limit(0) + + if (filterApplied.all { !it.getBoolean(0) }) + return this + + val dropUntil = filterApplied .firstOrNull { it.getBoolean(0) } ?.getLong(1) ?: -1L @@ -416,6 +459,37 @@ inline fun Dataset.dropWhile(noinline predicate: (T) -> Boolean): return drop(dropUntil.toInt() + 1) } + +/** + * Returns `true` if collection has at least one element. + */ +fun Dataset<*>.any(): Boolean = !isEmpty + +/** + * Returns `true` if all elements match the given [predicate]. + * + * TODO plugin (!any) + */ +inline fun Dataset.all(noinline predicate: (T) -> Boolean): Boolean { + if (isEmpty) return true + + return map(predicate) + .reduceK { a, b -> a && b } +} + + +/** + * Returns `true` if at least one element matches the given [predicate]. 
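+ *
+ * For example (illustrative data):
+ * ```
+ * listOf(1, 2, 3).toDS().any { it > 2 } // true
+ * ```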
+ * + * TODO plugin note to make it faster + */ +inline fun Dataset.any(noinline predicate: (T) -> Boolean): Boolean { + if (isEmpty) return false + + return map(predicate) + .reduceK { a, b -> a || b } +} + inline fun Dataset.map(noinline func: (T) -> R): Dataset = map(MapFunction(func), encoder()) From 475482085b27773523fe47aeec333c424de40351 Mon Sep 17 00:00:00 2001 From: Jolanrensen Date: Thu, 29 Jul 2021 21:50:46 +0200 Subject: [PATCH 5/9] dropwhiles, all/any --- .../org/jetbrains/kotlinx/spark/api/ApiV1.kt | 38 +++++++++++++++++-- .../jetbrains/kotlinx/spark/api/ApiTest.kt | 4 +- 2 files changed, 36 insertions(+), 6 deletions(-) diff --git a/kotlin-spark-api/3.0/src/main/kotlin/org/jetbrains/kotlinx/spark/api/ApiV1.kt b/kotlin-spark-api/3.0/src/main/kotlin/org/jetbrains/kotlinx/spark/api/ApiV1.kt index dc08ff6b..713c051d 100644 --- a/kotlin-spark-api/3.0/src/main/kotlin/org/jetbrains/kotlinx/spark/api/ApiV1.kt +++ b/kotlin-spark-api/3.0/src/main/kotlin/org/jetbrains/kotlinx/spark/api/ApiV1.kt @@ -459,6 +459,23 @@ inline fun Dataset.dropWhile(noinline predicate: (T) -> Boolean): return drop(dropUntil.toInt() + 1) } +/** + * Returns a list containing only elements matching the given [predicate]. + * @param [predicate] function that takes the index of an element and the element itself + * and returns the result of predicate evaluation on the element. + * + */ +inline fun Dataset.filterIndexed(predicate: (index: Int, T) -> Boolean): Dataset { + val index = getUniqueNewColumnName() + + val indices = withColumn(index, monotonicallyIncreasingId()) + .selectTyped(col(index).`as`()) + + TODO() +// return filterIndexedTo(ArrayList(), predicate) +} + + /** * Returns `true` if collection has at least one element. @@ -949,11 +966,17 @@ operator fun Column.get(key: Any): Column = getItem(key) fun lit(a: Any) = functions.lit(a) /** - * Provides a type hint about the expected return value of this column. This information can + * Provides a type hint about the expected return value of this column. This information can * be used by operations such as `select` on a [Dataset] to automatically convert the * results into the correct JVM types. + * + * ``` + * val df: Dataset = ... + * val typedColumn: Dataset = df.selectTyped( col("a").`as`() ) + * ``` */ -inline fun Column.`as`(): TypedColumn = `as`(encoder()) +@Suppress("UNCHECKED_CAST") +inline fun Column.`as`(): TypedColumn = `as`(encoder()) as TypedColumn /** * Alias for [Dataset.joinWith] which passes "left" argument @@ -1068,7 +1091,7 @@ operator fun Dataset.invoke(colName: String): Column = col(colName) @Suppress("UNCHECKED_CAST") inline fun Dataset.col(column: KProperty1): TypedColumn = - col(column.name).`as`() as TypedColumn + col(column.name).`as`() /** * Returns a [Column] based on the given class attribute, not connected to a dataset. @@ -1080,7 +1103,7 @@ inline fun Dataset.col(column: KProperty1): Type */ @Suppress("UNCHECKED_CAST") inline fun col(column: KProperty1): TypedColumn = - col(column.name).`as`() as TypedColumn + col(column.name).`as`() /** * Helper function to quickly get a [TypedColumn] (or [Column]) from a dataset in a refactor-safe manner. @@ -1108,6 +1131,13 @@ fun Dataset.sort(col: KProperty1, vararg cols: KProperty1): D */ fun Dataset.showDS(numRows: Int = 20, truncate: Boolean = true) = apply { show(numRows, truncate) } +/** + * Returns a new Dataset by computing the given [Column] expressions for each element. 
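+ *
+ * For example (mirroring the test suite; `dataset` with an IntArray column "a" is an assumption):
+ * ```
+ * val arrays: Dataset<IntArray> = dataset.selectTyped(col("a").`as`<IntArray>())
+ * ```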
+ */ +inline fun Dataset.selectTyped( + c1: TypedColumn, +): Dataset = select(c1) + /** * Returns a new Dataset by computing the given [Column] expressions for each element. */ diff --git a/kotlin-spark-api/3.0/src/test/kotlin/org/jetbrains/kotlinx/spark/api/ApiTest.kt b/kotlin-spark-api/3.0/src/test/kotlin/org/jetbrains/kotlinx/spark/api/ApiTest.kt index 3b15069c..b094ed1a 100644 --- a/kotlin-spark-api/3.0/src/test/kotlin/org/jetbrains/kotlinx/spark/api/ApiTest.kt +++ b/kotlin-spark-api/3.0/src/test/kotlin/org/jetbrains/kotlinx/spark/api/ApiTest.kt @@ -362,7 +362,7 @@ class ApiTest : ShouldSpec({ SomeClass(intArrayOf(1, 2, 4), 5), ) - val typedColumnA: TypedColumn = dataset.col("a").`as`(encoder()) + val typedColumnA: TypedColumn = dataset.col("a").`as`() val newDS2 = dataset.selectTyped( col(SomeClass::a), // NOTE: this only works on 3.0, returning a data class with an array in it @@ -454,7 +454,7 @@ class ApiTest : ShouldSpec({ ) dataset.show() - val column = col("b").`as`() + val column = col("b").`as`() val b = dataset.where(column gt 3 and col(SomeOtherClass::c)) b.show() From d9b9067ba13a648232b6dfa731a3f5a05db51e55 Mon Sep 17 00:00:00 2001 From: Jolanrensen Date: Thu, 29 Jul 2021 22:03:41 +0200 Subject: [PATCH 6/9] updated col().`as`() behavior and added single selectTyped() variant --- .../org/jetbrains/kotlinx/spark/api/ApiV1.kt | 22 +++++++++++++++---- .../jetbrains/kotlinx/spark/api/ApiTest.kt | 15 ++++++++----- .../org/jetbrains/kotlinx/spark/api/ApiV1.kt | 21 ++++++++++++++---- .../jetbrains/kotlinx/spark/api/ApiTest.kt | 15 ++++++++----- 4 files changed, 53 insertions(+), 20 deletions(-) diff --git a/kotlin-spark-api/2.4/src/main/kotlin/org/jetbrains/kotlinx/spark/api/ApiV1.kt b/kotlin-spark-api/2.4/src/main/kotlin/org/jetbrains/kotlinx/spark/api/ApiV1.kt index 34152494..de9b7fca 100644 --- a/kotlin-spark-api/2.4/src/main/kotlin/org/jetbrains/kotlinx/spark/api/ApiV1.kt +++ b/kotlin-spark-api/2.4/src/main/kotlin/org/jetbrains/kotlinx/spark/api/ApiV1.kt @@ -647,11 +647,18 @@ operator fun Column.get(key: Any): Column = getItem(key) fun lit(a: Any) = functions.lit(a) /** - * Provides a type hint about the expected return value of this column. This information can + * Provides a type hint about the expected return value of this column. This information can * be used by operations such as `select` on a [Dataset] to automatically convert the * results into the correct JVM types. + * + * ``` + * val df: Dataset = ... + * val typedColumn: Dataset = df.selectTyped( col("a").`as`() ) + * ``` */ -inline fun Column.`as`(): TypedColumn = `as`(encoder()) +@Suppress("UNCHECKED_CAST") +inline fun Column.`as`(): TypedColumn = `as`(encoder()) as TypedColumn + /** * Alias for [Dataset.joinWith] which passes "left" argument @@ -766,7 +773,7 @@ operator fun Dataset.invoke(colName: String): Column = col(colName) @Suppress("UNCHECKED_CAST") inline fun Dataset.col(column: KProperty1): TypedColumn = - col(column.name).`as`() as TypedColumn + col(column.name).`as`() /** * Returns a [Column] based on the given class attribute, not connected to a dataset. @@ -778,7 +785,7 @@ inline fun Dataset.col(column: KProperty1): Type */ @Suppress("UNCHECKED_CAST") inline fun col(column: KProperty1): TypedColumn = - functions.col(column.name).`as`() as TypedColumn + functions.col(column.name).`as`() /** * Helper function to quickly get a [TypedColumn] (or [Column]) from a dataset in a refactor-safe manner. 
@@ -806,6 +813,13 @@ fun Dataset.sort(col: KProperty1, vararg cols: KProperty1): D
  */
 fun Dataset.showDS(numRows: Int = 20, truncate: Boolean = true) = apply { show(numRows, truncate) }
 
+/**
+ * Returns a new Dataset by computing the given [Column] expressions for each element.
+ */
+inline fun Dataset.selectTyped(
+    c1: TypedColumn,
+): Dataset = select(c1)
+
 /**
  * Returns a new Dataset by computing the given [Column] expressions for each element.
  */
diff --git a/kotlin-spark-api/2.4/src/test/kotlin/org/jetbrains/kotlinx/spark/api/ApiTest.kt b/kotlin-spark-api/2.4/src/test/kotlin/org/jetbrains/kotlinx/spark/api/ApiTest.kt
index bae27b2e..66dbb872 100644
--- a/kotlin-spark-api/2.4/src/test/kotlin/org/jetbrains/kotlinx/spark/api/ApiTest.kt
+++ b/kotlin-spark-api/2.4/src/test/kotlin/org/jetbrains/kotlinx/spark/api/ApiTest.kt
@@ -339,23 +339,26 @@ class ApiTest : ShouldSpec({
                 SomeClass(intArrayOf(1, 2, 4), 5),
             )
 
-            val typedColumnA: TypedColumn = dataset.col("a").`as`(encoder())
+            val newDS1WithAs: Dataset = dataset.selectTyped(
+                col("b").`as`(),
+            )
+            newDS1WithAs.show()
 
-            val newDS2 = dataset.selectTyped(
+            val newDS2: Dataset> = dataset.selectTyped(
                 // col(SomeClass::a), NOTE that this doesn't work on 2.4, returning a data class with an array in it
                 col(SomeClass::b),
                 col(SomeClass::b),
             )
             newDS2.show()
 
-            val newDS3 = dataset.selectTyped(
+            val newDS3: Dataset> = dataset.selectTyped(
                 col(SomeClass::b),
                 col(SomeClass::b),
                 col(SomeClass::b),
             )
             newDS3.show()
 
-            val newDS4 = dataset.selectTyped(
+            val newDS4: Dataset> = dataset.selectTyped(
                 col(SomeClass::b),
                 col(SomeClass::b),
                 col(SomeClass::b),
                 col(SomeClass::b),
             )
             newDS4.show()
 
-            val newDS5 = dataset.selectTyped(
+            val newDS5: Dataset> = dataset.selectTyped(
                 col(SomeClass::b),
                 col(SomeClass::b),
                 col(SomeClass::b),
                 col(SomeClass::b),
                 col(SomeClass::b),
             )
             newDS5.show()
@@ -434,7 +437,7 @@ class ApiTest : ShouldSpec({
             )
             dataset.show()
 
-            val column = col("b").`as`()
+            val column = col("b").`as`()
             val b = dataset.where(column gt 3 and col(SomeOtherClass::c))
             b.show()
diff --git a/kotlin-spark-api/3.0/src/main/kotlin/org/jetbrains/kotlinx/spark/api/ApiV1.kt b/kotlin-spark-api/3.0/src/main/kotlin/org/jetbrains/kotlinx/spark/api/ApiV1.kt
index 604e98d8..6b59c272 100644
--- a/kotlin-spark-api/3.0/src/main/kotlin/org/jetbrains/kotlinx/spark/api/ApiV1.kt
+++ b/kotlin-spark-api/3.0/src/main/kotlin/org/jetbrains/kotlinx/spark/api/ApiV1.kt
@@ -643,11 +643,17 @@ operator fun Column.get(key: Any): Column = getItem(key)
 fun lit(a: Any) = functions.lit(a)
 
 /**
- * Provides a type hint about the expected return value of this column. This information can
+ * Provides a type hint about the expected return value of this column. This information can
  * be used by operations such as `select` on a [Dataset] to automatically convert the
  * results into the correct JVM types.
+ *
+ * ```
+ * val df: Dataset = ...
+ * val typedColumn: Dataset = df.selectTyped( col("a").`as`() )
+ * ```
  */
-inline fun Column.`as`(): TypedColumn = `as`(encoder())
+@Suppress("UNCHECKED_CAST")
+inline fun Column.`as`(): TypedColumn = `as`(encoder()) as TypedColumn
 
 /**
  * Alias for [Dataset.joinWith] which passes "left" argument
@@ -768,7 +768,7 @@ operator fun Dataset.invoke(colName: String): Column = col(colName)
 
 @Suppress("UNCHECKED_CAST")
 inline fun Dataset.col(column: KProperty1): TypedColumn =
-    col(column.name).`as`() as TypedColumn
+    col(column.name).`as`()
 
 /**
 * Returns a [Column] based on the given class attribute, not connected to a dataset.
@@ -774,7 +780,7 @@ inline fun Dataset.col(column: KProperty1): Type */ @Suppress("UNCHECKED_CAST") inline fun col(column: KProperty1): TypedColumn = - functions.col(column.name).`as`() as TypedColumn + functions.col(column.name).`as`() /** * Helper function to quickly get a [TypedColumn] (or [Column]) from a dataset in a refactor-safe manner. @@ -802,6 +808,13 @@ fun Dataset.sort(col: KProperty1, vararg cols: KProperty1): D */ fun Dataset.showDS(numRows: Int = 20, truncate: Boolean = true) = apply { show(numRows, truncate) } +/** + * Returns a new Dataset by computing the given [Column] expressions for each element. + */ +inline fun Dataset.selectTyped( + c1: TypedColumn, +): Dataset = select(c1) + /** * Returns a new Dataset by computing the given [Column] expressions for each element. */ diff --git a/kotlin-spark-api/3.0/src/test/kotlin/org/jetbrains/kotlinx/spark/api/ApiTest.kt b/kotlin-spark-api/3.0/src/test/kotlin/org/jetbrains/kotlinx/spark/api/ApiTest.kt index bff38ac1..a0e5c25f 100644 --- a/kotlin-spark-api/3.0/src/test/kotlin/org/jetbrains/kotlinx/spark/api/ApiTest.kt +++ b/kotlin-spark-api/3.0/src/test/kotlin/org/jetbrains/kotlinx/spark/api/ApiTest.kt @@ -362,22 +362,25 @@ class ApiTest : ShouldSpec({ SomeClass(intArrayOf(1, 2, 4), 5), ) - val typedColumnA: TypedColumn = dataset.col("a").`as`(encoder()) + val newDS1WithAs: Dataset = dataset.selectTyped( + col("a").`as`(), + ) + newDS1WithAs.show() - val newDS2 = dataset.selectTyped( + val newDS2: Dataset> = dataset.selectTyped( col(SomeClass::a), // NOTE: this only works on 3.0, returning a data class with an array in it col(SomeClass::b), ) newDS2.show() - val newDS3 = dataset.selectTyped( + val newDS3: Dataset> = dataset.selectTyped( col(SomeClass::a), col(SomeClass::b), col(SomeClass::b), ) newDS3.show() - val newDS4 = dataset.selectTyped( + val newDS4: Dataset> = dataset.selectTyped( col(SomeClass::a), col(SomeClass::b), col(SomeClass::b), @@ -385,7 +388,7 @@ class ApiTest : ShouldSpec({ ) newDS4.show() - val newDS5 = dataset.selectTyped( + val newDS5: Dataset> = dataset.selectTyped( col(SomeClass::a), col(SomeClass::b), col(SomeClass::b), @@ -454,7 +457,7 @@ class ApiTest : ShouldSpec({ ) dataset.show() - val column = col("b").`as`() + val column = col("b").`as`() val b = dataset.where(column gt 3 and col(SomeOtherClass::c)) b.show() From eae9cb4a65e47de3236deb1b4683ed37abe84b06 Mon Sep 17 00:00:00 2001 From: Jolanrensen Date: Thu, 29 Jul 2021 22:36:30 +0200 Subject: [PATCH 7/9] reverted col().`as`<>() behavior for simplicity. Updated selectTyped to accept any TypedColumn --- .../org/jetbrains/kotlinx/spark/api/ApiV1.kt | 72 +++++++++++------- .../jetbrains/kotlinx/spark/api/ApiTest.kt | 4 +- .../org/jetbrains/kotlinx/spark/api/ApiV1.kt | 74 ++++++++++++------- .../jetbrains/kotlinx/spark/api/ApiTest.kt | 4 +- 4 files changed, 99 insertions(+), 55 deletions(-) diff --git a/kotlin-spark-api/2.4/src/main/kotlin/org/jetbrains/kotlinx/spark/api/ApiV1.kt b/kotlin-spark-api/2.4/src/main/kotlin/org/jetbrains/kotlinx/spark/api/ApiV1.kt index de9b7fca..122e1747 100644 --- a/kotlin-spark-api/2.4/src/main/kotlin/org/jetbrains/kotlinx/spark/api/ApiV1.kt +++ b/kotlin-spark-api/2.4/src/main/kotlin/org/jetbrains/kotlinx/spark/api/ApiV1.kt @@ -653,11 +653,11 @@ fun lit(a: Any) = functions.lit(a) * * ``` * val df: Dataset = ... 
- * val typedColumn: Dataset = df.selectTyped( col("a").`as`() ) + * val typedColumn: Dataset = df.selectTyped( col("a").`as`() ) * ``` */ @Suppress("UNCHECKED_CAST") -inline fun Column.`as`(): TypedColumn = `as`(encoder()) as TypedColumn +inline fun Column.`as`(): TypedColumn = `as`(encoder()) /** @@ -773,7 +773,7 @@ operator fun Dataset.invoke(colName: String): Column = col(colName) @Suppress("UNCHECKED_CAST") inline fun Dataset.col(column: KProperty1): TypedColumn = - col(column.name).`as`() + col(column.name).`as`() as TypedColumn /** * Returns a [Column] based on the given class attribute, not connected to a dataset. @@ -785,7 +785,7 @@ inline fun Dataset.col(column: KProperty1): Type */ @Suppress("UNCHECKED_CAST") inline fun col(column: KProperty1): TypedColumn = - functions.col(column.name).`as`() + functions.col(column.name).`as`() as TypedColumn /** * Helper function to quickly get a [TypedColumn] (or [Column]) from a dataset in a refactor-safe manner. @@ -816,52 +816,74 @@ fun Dataset.showDS(numRows: Int = 20, truncate: Boolean = true) = apply { /** * Returns a new Dataset by computing the given [Column] expressions for each element. */ +@Suppress("UNCHECKED_CAST") inline fun Dataset.selectTyped( - c1: TypedColumn, -): Dataset = select(c1) + c1: TypedColumn, +): Dataset = select(c1 as TypedColumn) /** * Returns a new Dataset by computing the given [Column] expressions for each element. */ +@Suppress("UNCHECKED_CAST") inline fun Dataset.selectTyped( - c1: TypedColumn, - c2: TypedColumn, + c1: TypedColumn, + c2: TypedColumn, ): Dataset> = - select(c1, c2).map { Pair(it._1(), it._2()) } + select( + c1 as TypedColumn, + c2 as TypedColumn, + ).map { Pair(it._1(), it._2()) } /** * Returns a new Dataset by computing the given [Column] expressions for each element. */ +@Suppress("UNCHECKED_CAST") inline fun Dataset.selectTyped( - c1: TypedColumn, - c2: TypedColumn, - c3: TypedColumn, + c1: TypedColumn, + c2: TypedColumn, + c3: TypedColumn, ): Dataset> = - select(c1, c2, c3).map { Triple(it._1(), it._2(), it._3()) } + select( + c1 as TypedColumn, + c2 as TypedColumn, + c3 as TypedColumn, + ).map { Triple(it._1(), it._2(), it._3()) } /** * Returns a new Dataset by computing the given [Column] expressions for each element. */ +@Suppress("UNCHECKED_CAST") inline fun Dataset.selectTyped( - c1: TypedColumn, - c2: TypedColumn, - c3: TypedColumn, - c4: TypedColumn, + c1: TypedColumn, + c2: TypedColumn, + c3: TypedColumn, + c4: TypedColumn, ): Dataset> = - select(c1, c2, c3, c4).map { Arity4(it._1(), it._2(), it._3(), it._4()) } + select( + c1 as TypedColumn, + c2 as TypedColumn, + c3 as TypedColumn, + c4 as TypedColumn, + ).map { Arity4(it._1(), it._2(), it._3(), it._4()) } /** * Returns a new Dataset by computing the given [Column] expressions for each element. 
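+ *
+ * For example (using the test suite's SomeClass, whose `b` is an Int property):
+ * ```
+ * val ds5: Dataset<Arity5<Int, Int, Int, Int, Int>> =
+ *     dataset.selectTyped(col(SomeClass::b), col(SomeClass::b), col(SomeClass::b), col(SomeClass::b), col(SomeClass::b))
+ * ```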
*/ +@Suppress("UNCHECKED_CAST") inline fun Dataset.selectTyped( - c1: TypedColumn, - c2: TypedColumn, - c3: TypedColumn, - c4: TypedColumn, - c5: TypedColumn, + c1: TypedColumn, + c2: TypedColumn, + c3: TypedColumn, + c4: TypedColumn, + c5: TypedColumn, ): Dataset> = - select(c1, c2, c3, c4, c5).map { Arity5(it._1(), it._2(), it._3(), it._4(), it._5()) } - + select( + c1 as TypedColumn, + c2 as TypedColumn, + c3 as TypedColumn, + c4 as TypedColumn, + c5 as TypedColumn, + ).map { Arity5(it._1(), it._2(), it._3(), it._4(), it._5()) } @OptIn(ExperimentalStdlibApi::class) inline fun schema(map: Map = mapOf()) = schema(typeOf(), map) diff --git a/kotlin-spark-api/2.4/src/test/kotlin/org/jetbrains/kotlinx/spark/api/ApiTest.kt b/kotlin-spark-api/2.4/src/test/kotlin/org/jetbrains/kotlinx/spark/api/ApiTest.kt index 66dbb872..ec8a6e14 100644 --- a/kotlin-spark-api/2.4/src/test/kotlin/org/jetbrains/kotlinx/spark/api/ApiTest.kt +++ b/kotlin-spark-api/2.4/src/test/kotlin/org/jetbrains/kotlinx/spark/api/ApiTest.kt @@ -340,7 +340,7 @@ class ApiTest : ShouldSpec({ ) val newDS1WithAs: Dataset = dataset.selectTyped( - col("b").`as`(), + col("b").`as`(), ) newDS1WithAs.show() @@ -437,7 +437,7 @@ class ApiTest : ShouldSpec({ ) dataset.show() - val column = col("b").`as`() + val column = col("b").`as`() val b = dataset.where(column gt 3 and col(SomeOtherClass::c)) b.show() diff --git a/kotlin-spark-api/3.0/src/main/kotlin/org/jetbrains/kotlinx/spark/api/ApiV1.kt b/kotlin-spark-api/3.0/src/main/kotlin/org/jetbrains/kotlinx/spark/api/ApiV1.kt index 604e98d8..6b59c272 100644 --- a/kotlin-spark-api/3.0/src/main/kotlin/org/jetbrains/kotlinx/spark/api/ApiV1.kt +++ b/kotlin-spark-api/3.0/src/main/kotlin/org/jetbrains/kotlinx/spark/api/ApiV1.kt @@ -649,11 +649,11 @@ fun lit(a: Any) = functions.lit(a) * * ``` * val df: Dataset = ... - * val typedColumn: Dataset = df.selectTyped( col("a").`as`() ) + * val typedColumn: Dataset = df.selectTyped( col("a").`as`() ) * ``` */ @Suppress("UNCHECKED_CAST") -inline fun Column.`as`(): TypedColumn = `as`(encoder()) as TypedColumn +inline fun Column.`as`(): TypedColumn = `as`(encoder()) /** * Alias for [Dataset.joinWith] which passes "left" argument @@ -768,19 +768,18 @@ operator fun Dataset.invoke(colName: String): Column = col(colName) @Suppress("UNCHECKED_CAST") inline fun Dataset.col(column: KProperty1): TypedColumn = - col(column.name).`as`() + col(column.name).`as`() as TypedColumn /** * Returns a [Column] based on the given class attribute, not connected to a dataset. * ```kotlin * val dataset: Dataset = ... - * val new: Dataset> = dataset.select( col(YourClass::a), col(YourClass::b) ) + * val new: Dataset> = dataset.select( col(YourClass::a), col(YourClass::b) ) * ``` - * TODO: change example to [Pair]s when merged */ @Suppress("UNCHECKED_CAST") inline fun col(column: KProperty1): TypedColumn = - functions.col(column.name).`as`() + functions.col(column.name).`as`() as TypedColumn /** * Helper function to quickly get a [TypedColumn] (or [Column]) from a dataset in a refactor-safe manner. @@ -811,51 +810,74 @@ fun Dataset.showDS(numRows: Int = 20, truncate: Boolean = true) = apply { /** * Returns a new Dataset by computing the given [Column] expressions for each element. */ +@Suppress("UNCHECKED_CAST") inline fun Dataset.selectTyped( - c1: TypedColumn, -): Dataset = select(c1) + c1: TypedColumn, +): Dataset = select(c1 as TypedColumn) /** * Returns a new Dataset by computing the given [Column] expressions for each element. 
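+ *
+ * For example (using the test suite's SomeClass, whose `a` is an IntArray and `b` an Int):
+ * ```
+ * val pairs: Dataset<Pair<IntArray, Int>> = dataset.selectTyped(col(SomeClass::a), col(SomeClass::b))
+ * ```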
*/ +@Suppress("UNCHECKED_CAST") inline fun Dataset.selectTyped( - c1: TypedColumn, - c2: TypedColumn, + c1: TypedColumn, + c2: TypedColumn, ): Dataset> = - select(c1, c2).map { Pair(it._1(), it._2()) } + select( + c1 as TypedColumn, + c2 as TypedColumn, + ).map { Pair(it._1(), it._2()) } /** * Returns a new Dataset by computing the given [Column] expressions for each element. */ +@Suppress("UNCHECKED_CAST") inline fun Dataset.selectTyped( - c1: TypedColumn, - c2: TypedColumn, - c3: TypedColumn, + c1: TypedColumn, + c2: TypedColumn, + c3: TypedColumn, ): Dataset> = - select(c1, c2, c3).map { Triple(it._1(), it._2(), it._3()) } + select( + c1 as TypedColumn, + c2 as TypedColumn, + c3 as TypedColumn, + ).map { Triple(it._1(), it._2(), it._3()) } /** * Returns a new Dataset by computing the given [Column] expressions for each element. */ +@Suppress("UNCHECKED_CAST") inline fun Dataset.selectTyped( - c1: TypedColumn, - c2: TypedColumn, - c3: TypedColumn, - c4: TypedColumn, + c1: TypedColumn, + c2: TypedColumn, + c3: TypedColumn, + c4: TypedColumn, ): Dataset> = - select(c1, c2, c3, c4).map { Arity4(it._1(), it._2(), it._3(), it._4()) } + select( + c1 as TypedColumn, + c2 as TypedColumn, + c3 as TypedColumn, + c4 as TypedColumn, + ).map { Arity4(it._1(), it._2(), it._3(), it._4()) } /** * Returns a new Dataset by computing the given [Column] expressions for each element. */ +@Suppress("UNCHECKED_CAST") inline fun Dataset.selectTyped( - c1: TypedColumn, - c2: TypedColumn, - c3: TypedColumn, - c4: TypedColumn, - c5: TypedColumn, + c1: TypedColumn, + c2: TypedColumn, + c3: TypedColumn, + c4: TypedColumn, + c5: TypedColumn, ): Dataset> = - select(c1, c2, c3, c4, c5).map { Arity5(it._1(), it._2(), it._3(), it._4(), it._5()) } + select( + c1 as TypedColumn, + c2 as TypedColumn, + c3 as TypedColumn, + c4 as TypedColumn, + c5 as TypedColumn, + ).map { Arity5(it._1(), it._2(), it._3(), it._4(), it._5()) } @OptIn(ExperimentalStdlibApi::class) diff --git a/kotlin-spark-api/3.0/src/test/kotlin/org/jetbrains/kotlinx/spark/api/ApiTest.kt b/kotlin-spark-api/3.0/src/test/kotlin/org/jetbrains/kotlinx/spark/api/ApiTest.kt index a0e5c25f..0b9c01a9 100644 --- a/kotlin-spark-api/3.0/src/test/kotlin/org/jetbrains/kotlinx/spark/api/ApiTest.kt +++ b/kotlin-spark-api/3.0/src/test/kotlin/org/jetbrains/kotlinx/spark/api/ApiTest.kt @@ -363,7 +363,7 @@ class ApiTest : ShouldSpec({ ) val newDS1WithAs: Dataset = dataset.selectTyped( - col("a").`as`(), + col("a").`as`(), ) newDS1WithAs.show() @@ -457,7 +457,7 @@ class ApiTest : ShouldSpec({ ) dataset.show() - val column = col("b").`as`() + val column = col("b").`as`() val b = dataset.where(column gt 3 and col(SomeOtherClass::c)) b.show() From 65ecd39f3d0dd5043ed4072b6af0cc97ea5750d7 Mon Sep 17 00:00:00 2001 From: Jolanrensen Date: Thu, 29 Jul 2021 22:57:25 +0200 Subject: [PATCH 8/9] updated readme --- README.md | 3 +++ 1 file changed, 3 insertions(+) diff --git a/README.md b/README.md index c6b11f71..3ab69c55 100644 --- a/README.md +++ b/README.md @@ -192,6 +192,9 @@ to create `TypedColumn`s and with those a new Dataset from pieces of another usi ```kotlin val dataset: Dataset = ... 
val newDataset: Dataset> = dataset.selectTyped(col(YourClass::colA), col(YourClass::colB))
+
+// Alternatively, for instance when working with a Dataset<Row>
+val typedDataset: Dataset> = otherDataset.selectTyped(col("a").`as`(), col("b").`as`())
```

### Overload resolution ambiguity

From 24c48ba3f50365a75cde6aca4bc0248586957753 Mon Sep 17 00:00:00 2001
From: Jolanrensen
Date: Fri, 30 Jul 2021 14:30:19 +0200
Subject: [PATCH 9/9] working on filterIndexed

---
 .../org/jetbrains/kotlinx/spark/api/ApiV1.kt | 15 ++++++++-------
 1 file changed, 8 insertions(+), 7 deletions(-)

diff --git a/kotlin-spark-api/3.0/src/main/kotlin/org/jetbrains/kotlinx/spark/api/ApiV1.kt b/kotlin-spark-api/3.0/src/main/kotlin/org/jetbrains/kotlinx/spark/api/ApiV1.kt
index c2bbe6a8..695d9c32 100644
--- a/kotlin-spark-api/3.0/src/main/kotlin/org/jetbrains/kotlinx/spark/api/ApiV1.kt
+++ b/kotlin-spark-api/3.0/src/main/kotlin/org/jetbrains/kotlinx/spark/api/ApiV1.kt
@@ -465,14 +465,15 @@ inline fun Dataset.dropWhile(noinline predicate: (T) -> Boolean):
  * and returns the result of predicate evaluation on the element.
  *
  */
-inline fun Dataset.filterIndexed(predicate: (index: Int, T) -> Boolean): Dataset {
-    val index = getUniqueNewColumnName()
-
-    val indices = withColumn(index, monotonicallyIncreasingId())
-        .selectTyped(col(index).`as`())
-
-    TODO()
-//    return filterIndexedTo(ArrayList(), predicate)
+inline fun <reified T> Dataset<T>.filterIndexed(crossinline predicate: (index: Long, T) -> Boolean): Dataset<T> {
+    // Sketch: zip rather than join. RDD.zipWithIndex pairs every element with a
+    // stable, gap-free position, so the predicate can be applied to (index, value)
+    // pairs and the index dropped again afterwards.
+    val filtered = toJavaRDD()
+        .zipWithIndex()                        // JavaPairRDD<T, Long>
+        .filter { predicate(it._2, it._1) }    // predicate(index, element)
+        .keys()
+    return sparkSession().createDataset(filtered.rdd(), encoder<T>())
 }
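+
+// Usage sketch (hypothetical data): keeps the elements at even positions,
+// i.e. 1 and 3 here:
+// listOf(1, 2, 3, 4).toDS().filterIndexed { index, _ -> index % 2 == 0L }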