Skip to content

Commit

Permalink
SNOW-1333980 Support Read Structured Type Values (#104)
Browse files Browse the repository at this point in the history
* fix type name

* array v2

* map type

* object

* update JDBC

* support structured array

* map type

* support map type

* struct type

* structure type

* tmp

* tmp

* support date

* time

* complete show

* decimal

* reorg

* add test

* add test

* show string

* result test

* handle object

* show string

* tmp

* fix test

* fix getObject

* fix test

* row get array

* support map

* add scala doc

* java array

* java map

* java api

* fix checker

* remove useless code

* fix java test

* fix time zone

* fix time zone
  • Loading branch information
sfc-gh-bli authored May 13, 2024
1 parent fcc29b2 commit c0f9785
Show file tree
Hide file tree
Showing 9 changed files with 633 additions and 48 deletions.
75 changes: 62 additions & 13 deletions src/main/java/com/snowflake/snowpark_java/Row.java
Original file line number Diff line number Diff line change
Expand Up @@ -132,26 +132,42 @@ public int hashCode() {
* @return The value of the column at the given index
*/
public Object get(int index) {
Object result = scalaRow.get(index);
if (result instanceof com.snowflake.snowpark.types.Variant) {
return InternalUtils.createVariant((com.snowflake.snowpark.types.Variant) result);
} else if (result instanceof com.snowflake.snowpark.types.Geography) {
return Geography.fromGeoJSON(((com.snowflake.snowpark.types.Geography) result).asGeoJSON());
} else if (result instanceof com.snowflake.snowpark.types.Geometry) {
return Geometry.fromGeoJSON(result.toString());
} else if (result instanceof com.snowflake.snowpark.types.Variant[]) {
return toJavaValue(scalaRow.get(index));
}

private static Object toJavaValue(Object value) {
if (value instanceof com.snowflake.snowpark.types.Variant) {
return InternalUtils.createVariant((com.snowflake.snowpark.types.Variant) value);
} else if (value instanceof com.snowflake.snowpark.types.Geography) {
return Geography.fromGeoJSON(((com.snowflake.snowpark.types.Geography) value).asGeoJSON());
} else if (value instanceof com.snowflake.snowpark.types.Geometry) {
return Geometry.fromGeoJSON(value.toString());
} else if (value instanceof com.snowflake.snowpark.types.Variant[]) {
com.snowflake.snowpark.types.Variant[] scalaVariantArray =
(com.snowflake.snowpark.types.Variant[]) result;
(com.snowflake.snowpark.types.Variant[]) value;
Variant[] resultArray = new Variant[scalaVariantArray.length];
for (int idx = 0; idx < scalaVariantArray.length; idx++) {
resultArray[idx] = InternalUtils.createVariant(scalaVariantArray[idx]);
}
return resultArray;
} else if (result instanceof scala.collection.immutable.Map<?, ?>) {
return JavaUtils.scalaMapToJavaWithVariantConversion(
(scala.collection.immutable.Map<Object, Object>) result);
} else if (value instanceof scala.collection.immutable.Map<?, ?>) {
scala.collection.immutable.Map<?, ?> input = (scala.collection.immutable.Map<?, ?>) value;
Map<Object, Object> result = new HashMap<>();
// key is either Long or String, no need to convert values
input.foreach(x -> result.put(x._1, toJavaValue(x._2)));
return result;
} else if (value instanceof Object[]) {
Object[] arr = (Object[]) value;
List<Object> result = new ArrayList<>(arr.length);
for (Object x : arr) {
result.add(toJavaValue(x));
}
return result;
} else if (value instanceof com.snowflake.snowpark.Row) {
return new Row((com.snowflake.snowpark.Row) value);
} else {
return value;
}
return result;
}

/**
Expand Down Expand Up @@ -376,6 +392,39 @@ public Map<String, Variant> getMapOfVariant(int index) {
return result;
}

/**
 * Returns the column at the given index viewed as a {@code java.util.List}.
 *
 * @param index The index of target column
 * @return The column value cast to a list of Object
 * @throws ClassCastException if the column value is not a list
 * @since 1.13.0
 */
public List<?> getList(int index) {
  final Object column = get(index);
  return (List<?>) column;
}

/**
 * Returns the column at the given index viewed as a {@code java.util.Map}.
 *
 * @param index The index of target column
 * @return The column value cast to a Java Map
 * @throws ClassCastException if the column value is not a map
 * @since 1.13.0
 */
public Map<?, ?> getMap(int index) {
  final Object column = get(index);
  return (Map<?, ?>) column;
}

/**
 * Returns the column at the given index viewed as a nested {@code Row}.
 *
 * @param index The index of target column
 * @return The column value cast to a Row
 * @throws ClassCastException if the column value is not a Row
 * @since 1.13.0
 */
public Row getObject(int index) {
  final Object column = get(index);
  return (Row) column;
}

/**
* Generates a string value to represent the content of this row.
*
Expand Down
26 changes: 21 additions & 5 deletions src/main/scala/com/snowflake/snowpark/DataFrame.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2369,17 +2369,33 @@ class DataFrame private[snowpark] (
lines
}

// Renders one cell value as text for the tabular output.
//  - maps become `{key:value,...}` and arrays become `[a,b,...]`, rendered recursively
//  - binary values become a quoted hex string
//  - anything else falls back to `toString`
def convertValueToString(value: Any): String =
  value match {
    // Values nested inside maps and arrays can be null even when the top-level cell is not;
    // without this case they would reach `toString` below and throw a NullPointerException.
    case null => "NULL"
    case map: Map[_, _] =>
      map
        .map {
          // `k`/`v` rather than `key`/`value` so the method parameter is not shadowed
          case (k, v) => s"${convertValueToString(k)}:${convertValueToString(v)}"
        }
        .mkString("{", ",", "}")
    case ba: Array[Byte] => s"'${DatatypeConverter.printHexBinary(ba)}'"
    case bytes: Array[java.lang.Byte] =>
      // unbox to primitive bytes so the same hex printer can be used
      s"'${DatatypeConverter.printHexBinary(bytes.map(_.toByte))}'"
    case arr: Array[String] =>
      // strings are joined as-is, without recursing into each element
      arr.mkString("[", ",", "]")
    case arr: Array[_] =>
      arr.map(convertValueToString).mkString("[", ",", "]")
    case arr: java.sql.Array =>
      arr.getArray().asInstanceOf[Array[_]].map(convertValueToString).mkString("[", ",", "]")
    case other => other.toString
  }

val body: Seq[Seq[String]] = result.flatMap(row => {
// Value may contain multiple lines
val lines: Seq[Seq[String]] = row.toSeq.zipWithIndex.map {
case (value, index) =>
val texts: Seq[String] = if (value != null) {
val str = value match {
case ba: Array[Byte] => s"'${DatatypeConverter.printHexBinary(ba)}'"
case _ => value.toString
}
// if the result contains multiple lines, split result string
splitLines(str)
splitLines(convertValueToString(value))
} else {
Seq("NULL")
}
Expand Down
73 changes: 67 additions & 6 deletions src/main/scala/com/snowflake/snowpark/Row.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import java.sql.{Date, Time, Timestamp}
import com.snowflake.snowpark.internal.ErrorMessage
import com.snowflake.snowpark.types.{Geography, Geometry, Variant}

import scala.reflect.ClassTag
import scala.util.hashing.MurmurHash3

/**
Expand All @@ -28,6 +29,15 @@ object Row {
* @since 0.2.0
*/
def fromArray(values: Array[Any]): Row = new Row(values)

// Builds a Row from a field-name -> value map by wrapping the map in a SnowflakeObject.
private[snowpark] def fromMap(map: Map[String, Any]): Row =
new SnowflakeObject(map)
}

/**
 * A [[Row]] that also keeps the field-name to value map it was created from.
 *
 * The positional values handed to [[Row]] are `map.values.toArray`, so their order is the
 * iteration order of `map`.
 */
private[snowpark] class SnowflakeObject private[snowpark] (
private[snowpark] val map: Map[String, Any])
extends Row(map.values.toArray) {
// Delegates to convertValueToString, passing this object itself as the value to render.
override def toString: String = convertValueToString(this)
}

/**
Expand All @@ -37,7 +47,7 @@ object Row {
* @groupname utl Utility Functions
* @since 0.1.0
*/
class Row private (values: Array[Any]) extends Serializable {
class Row protected (values: Array[Any]) extends Serializable {

/**
* Converts this [[Row]] to a Seq
Expand Down Expand Up @@ -325,18 +335,69 @@ class Row private (values: Array[Any]) extends Serializable {
def getMapOfVariant(index: Int): Map[String, Variant] =
new Variant(getString(index)).asMap()

/**
 * Returns the value of the column at the given index as a [[Row]], which is how a
 * Snowflake Object value is represented.
 *
 * @since 1.13.0
 * @group getter
 */
def getObject(index: Int): Row = get(index).asInstanceOf[Row]

/**
 * Returns the value of the column at the given index as a Seq value.
 *
 * The column value is read as an array and every element is cast to `T`. Because `T` is
 * erased at runtime the cast is unchecked: an element of the wrong type is only detected
 * when the caller uses it as a `T`.
 *
 * @tparam T the expected element type
 * @param index the index of the target column
 * @return the elements of the array stored in the column, in order
 * @since 1.13.0
 * @group getter
 */
def getSeq[T](index: Int): Seq[T] = {
  val elements = getAs[Array[_]](index)
  // An explicit cast replaces the former `case x: T` pattern: that pattern was unchecked
  // (T is erased) and, being a partial function with no fallback, could raise a MatchError
  // for null elements, which type patterns do not match.
  elements.toSeq.map(_.asInstanceOf[T])
}

/**
 * Returns the value of the column at the given index as a Map value.
 *
 * The key and value types are supplied by the caller and are not verified by this method.
 *
 * @since 1.13.0
 * @group getter
 */
def getMap[T, U](index: Int): Map[T, U] = get(index).asInstanceOf[Map[T, U]]

// Renders a single value for `toString`: strings are double-quoted, binary values are
// printed as `Binary(...)`, and maps, arrays and Snowflake objects are rendered recursively
// as `Map(...)`, `Array(...)` and `Object(...)`.
protected def convertValueToString(value: Any): String =
  value match {
    case null => "null"
    case text: String => s""""$text""""
    // must stay ahead of the generic array case so bytes are not rendered one by one
    case bytes: Array[Byte] => s"Binary(${bytes.mkString(",")})"
    case items: Array[_] =>
      items.iterator.map(convertValueToString).mkString("Array(", ",", ")")
    case entries: Map[_, _] =>
      val rendered = entries.map {
        case (k, v) => s"${convertValueToString(k)}:${convertValueToString(v)}"
      }
      rendered.mkString("Map(", ",", ")")
    case obj: SnowflakeObject =>
      // field names are printed verbatim, only the field values are rendered recursively
      val fields = obj.map.map {
        case (name, fieldValue) => s"$name:${convertValueToString(fieldValue)}"
      }
      fields.mkString("Object(", ",", ")")
    case other => other.toString
  }

/**
* Returns a string value to represent the content of this row
* @since 0.1.0
* @group utl
*/
override def toString: String =
values
.map {
case null => "null"
case binary: Array[Byte] => s"Binary(${binary.mkString(",")})"
case other => other.toString
}
.map(convertValueToString)
.mkString("Row[", ",", "]")

// Reads the column at `index` and casts it to the caller-supplied type. `T` is erased, so the
// cast here is unchecked; a mismatching value is only detected where the result is used.
private def getAs[T](index: Int): T = get(index).asInstanceOf[T]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ object JavaDataTypeUtils {
case TimestampType => JDataTypes.TimestampType
case TimeType => JDataTypes.TimeType
case VariantType => JDataTypes.VariantType
case st: StructType =>
com.snowflake.snowpark_java.types.InternalUtils.createStructType(st)
}

def javaTypeToScalaType(jDataType: JDataType): DataType =
Expand Down
Loading

0 comments on commit c0f9785

Please sign in to comment.