Commit 9bb4232
[GSProcessing] Fix support for large integer values for noop transformation (#707)

For now we also parse all numerical CSV values as float64. In the future
we'll give users the option to choose the type, defaulting to float32.

*Issue #, if available:* Fixes #706

*Description of changes:*

* For CSV numerical feature input, always parse as float64. We plan to
make this user-configurable in the future.
* For CSV regression label input, always parse as float64. We plan to
make this user-configurable in the future.
* For Parquet numerical feature input to the noop transformation, keep the
input's data type, avoiding the precision loss that a float32 conversion
would cause (see the sketch after this list).
* For CSV numerical input to the noop transformation (which is parsed as
string), output float64.
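To see why float32 is insufficient here: float32 has a 24-bit significand, so it cannot represent every integer above 2**24 (16,777,216), and integer feature values in the tens of millions silently round to nearby representable values. A minimal sketch of the failure mode (illustrative only, not part of this PR; assumes `numpy` is available):

```python
import numpy as np

value = 80_000_001  # above 2**24, in the 80M range used for testing below

# float32 spacing at this magnitude is 8, so the value rounds down to 80_000_000
assert int(np.float32(value)) == 80_000_000

# float64 represents every integer exactly up to 2**53, so it is preserved
assert int(np.float64(value)) == value
```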

*How this was tested*:

* Added a relevant unit test
* Tested on a generated graph dataset with integer feature values up
to 80M; results shown below

![Results screenshot](https://github.com/awslabs/graphstorm/assets/9048995/772dc549-fd09-416c-bf73-71fbf11e634b)


By submitting this pull request, I confirm that you can use, modify,
copy, and redistribute this contribution, under the terms of your
choice.
thvasilo authored Jan 22, 2024
1 parent a34755d commit 9bb4232
Showing 3 changed files with 43 additions and 19 deletions.
```diff
@@ -17,18 +17,18 @@

 from pyspark.sql import DataFrame
 from pyspark.sql import functions as F
-from pyspark.sql.types import ArrayType, FloatType, NumericType
+from pyspark.sql.types import ArrayType, DoubleType, NumericType

 from graphstorm_processing.constants import SPECIAL_CHARACTERS
 from .base_dist_transformation import DistributedTransformation


 class NoopTransformation(DistributedTransformation):
-    """A no-op transformation that parses data as floats or lists of floats
-    and forwards the result withouth any processing.
+    """A no-op transformation that parses data as numerical values
+    and forwards the result without any processing.

     For CSV input that contains numerical array rows that use a separator character,
-    this transformation splits the values into a vector of floats, e.g. "1|2|3"
+    this transformation splits the values into a vector of doubles, e.g. "1|2|3"
     becomes a vector [1.0, 2.0, 3.0].

     Parameters
@@ -56,7 +56,7 @@ def apply(self, input_df: DataFrame) -> DataFrame:
        initialization of the transformation.
        """

-        # If the incoming DataFrame has numerical array rows, just return it.
+        # If the incoming DataFrame has numerical [array] rows, just return it.
        col_datatype = input_df.schema[self.cols[0]].dataType
        if col_datatype.typeName() == "array":
            assert isinstance(col_datatype, ArrayType)
@@ -66,14 +66,18 @@ def apply(self, input_df: DataFrame) -> DataFrame:
                f"for column {self.cols[0]}"
            )
            return input_df
+        elif isinstance(col_datatype, NumericType):
+            return input_df
+
+        # Otherwise we'll try to convert the values from list of strings to list of Doubles

        def str_list_to_float_vec(string_list: Optional[List[str]]) -> Optional[List[float]]:
            if string_list:
                return [float(x) for x in string_list]
            return None

        strvec_to_float_vec_udf = F.udf(
-            str_list_to_float_vec, ArrayType(FloatType(), containsNull=False)
+            str_list_to_float_vec, ArrayType(DoubleType(), containsNull=False)
        )

        if self.separator:
@@ -87,7 +91,7 @@ def str_list_to_float_vec(string_list: Optional[List[str]]) -> Optional[List[float]]:
            return input_df
        else:
            return input_df.select(
-                [F.col(column).cast(FloatType()).alias(column) for column in self.cols]
+                [F.col(column).cast(DoubleType()).alias(column) for column in self.cols]
            )

    @staticmethod
```
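For reference, a minimal standalone sketch of the CSV array path above. The `str_list_to_float_vec` UDF and its `DoubleType` return type are taken directly from the diff; the `SparkSession` setup and the `F.split` call are illustrative assumptions (the actual split happens in the collapsed `if self.separator:` branch):

```python
from typing import List, Optional

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import ArrayType, DoubleType

spark = SparkSession.builder.master("local[1]").getOrCreate()
df = spark.createDataFrame([("1|2|3",)], ["feat"])

def str_list_to_float_vec(string_list: Optional[List[str]]) -> Optional[List[float]]:
    # Same UDF body as in the diff: cast each string element to a Python float
    if string_list:
        return [float(x) for x in string_list]
    return None

strvec_to_float_vec_udf = F.udf(
    str_list_to_float_vec, ArrayType(DoubleType(), containsNull=False)
)

# Assumed usage: split "1|2|3" on the separator, then apply the UDF
out = df.select(strvec_to_float_vec_udf(F.split(F.col("feat"), r"\|")).alias("feat"))
out.show()  # [1.0, 2.0, 3.0], with element type double rather than float
```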
```diff
@@ -19,13 +19,7 @@

 import logging
 from typing import Sequence, List, Type

-from pyspark.sql.types import (
-    StructType,
-    StructField,
-    StringType,
-    DataType,
-    FloatType,
-)
+from pyspark.sql.types import StructType, StructField, StringType, DataType, DoubleType

 from ..config.config_parser import EdgeConfig, NodeConfig
 from ..config.label_config_base import LabelConfig
@@ -99,8 +93,8 @@ def determine_spark_feature_type(feature_type: str) -> Type[DataType]:
        "multi-categorical",
    ] or feature_type.startswith("text"):
        return StringType
-    if feature_type in ["numerical", "bucket-numerical", "none"]:
-        return FloatType
+    if feature_type in ["numerical", "bucket-numerical"]:
+        return DoubleType
    else:
        raise NotImplementedError(f"Unknown feature type: {feature_type}")
@@ -115,7 +109,7 @@ def _parse_edge_labels_schema(edge_labels_objects: Sequence[LabelConfig]) -> Seq
        if target_task_type == "classification":
            field_list.append(StructField(label_col, StringType(), True))
        elif target_task_type == "regression":
-            field_list.append(StructField(label_col, FloatType(), True))
+            field_list.append(StructField(label_col, DoubleType(), True))
        elif target_task_type == "link_prediction" and label_col:
            logging.info(
                "Bypassing edge label %s, as it is only used for link prediction", label_col
@@ -163,6 +157,6 @@ def _parse_node_labels_schema(node_labels_objects: List[LabelConfig]) -> Sequenc
            # Could be ints, would that be an issue?
            field_list.append(StructField(label_col, StringType(), True))
        elif target_task_type == "regression":
-            field_list.append(StructField(label_col, FloatType(), True))
+            field_list.append(StructField(label_col, DoubleType(), True))

    return field_list
```
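The net effect of this file's changes: CSV numerical features and regression labels are now read with a `DoubleType` schema instead of `FloatType`. A hypothetical schema that a parser built this way might produce for a node file with one numerical feature and one regression label (column names are made up for illustration):

```python
from pyspark.sql.types import DoubleType, StringType, StructField, StructType

schema = StructType([
    StructField("node_id", StringType(), True),
    StructField("feat", DoubleType(), True),   # "numerical" feature -> float64
    StructField("score", DoubleType(), True),  # regression label -> float64
])
```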
28 changes: 27 additions & 1 deletion graphstorm-processing/tests/test_dist_noop_transformation.py
```diff
@@ -16,7 +16,7 @@

 from numpy.testing import assert_array_equal
 from pyspark.sql import functions as F, DataFrame, SparkSession

-from pyspark.sql.types import ArrayType, IntegerType, StructField, StructType
+from pyspark.sql.types import ArrayType, IntegerType, LongType, StructField, StructType

 from graphstorm_processing.data_transformations.dist_transformations import NoopTransformation
@@ -74,3 +74,29 @@ def test_noop_floatvector_transformation(spark: SparkSession, check_df_schema):
     transformed_values = [row[col_name] for row in transformed_df.collect()]

     assert_array_equal(expected_values, transformed_values)
+
+
+def test_noop_largeint_transformation(spark: SparkSession, check_df_schema):
+    """No-op transformation for long numerical columns"""
+    large_int = 4 * 10**18
+    data = [
+        ([[large_int, large_int + 1]]),
+        ([[large_int + 2, large_int + 3]]),
+        ([[large_int + 4, large_int + 5]]),
+        ([[large_int + 6, large_int + 7]]),
+        ([[large_int + 8, large_int + 9]]),
+    ]
+
+    col_name = "feat"
+    schema = StructType([StructField("feat", ArrayType(LongType(), True), True)])
+    vec_df = spark.createDataFrame(data, schema=schema)
+
+    noop_transformer = NoopTransformation([col_name])
+
+    transformed_df = noop_transformer.apply(vec_df)
+
+    check_df_schema(transformed_df)
+
+    transformed_values = [row[col_name] for row in transformed_df.collect()]
+
+    assert_array_equal([val[0] for val in data], transformed_values)
```
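Note why the test uses values near 4 * 10**18: they exceed 2**53, the largest magnitude below which float64 represents every integer exactly, so a cast to any float type, even float64, would corrupt them. The values can only survive the round trip because the transformation returns numeric input (here arrays of LongType) unchanged, per the array branch and the new `isinstance(col_datatype, NumericType)` early return in the diff above. A quick plain-Python check of the bound:

```python
large_int = 4 * 10**18
assert large_int > 2**53  # beyond float64's exact-integer range

# A float64 round trip silently drops the low-order bits:
assert int(float(large_int + 1)) != large_int + 1
```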
