No Encoder found for java.time.Instant #88

Open · Lihe-Ma opened this issue Aug 16, 2019 · 5 comments

Lihe-Ma commented Aug 16, 2019

I ran into some trouble when upgrading my code to the latest SDK version 0.4.2; it was fine with 0.2.1 before. Here is my code:

    val events = spark.read.textFile(mySource)
      .map(line => Event.parse(line))
      .filter(_.isValid)
      .flatMap(_.toOption)    
      .map(event => event.toJson(true).noSpaces)
    val dataframe = spark.read.json(events)
    dataframe.show(100, false)

Then I got the error below:

Exception in thread "main" java.lang.UnsupportedOperationException: No Encoder found for java.time.Instant
option value class: "java.time.Instant"
field (class: "scala.Option", name: "etl_tstamp")
root class: "com.snowplowanalytics.snowplow.analytics.scalasdk.Event"
	at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor$1.apply(ScalaReflection.scala:650)
	at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor$1.apply(ScalaReflection.scala:452)
	at scala.reflect.internal.tpe.TypeConstraints$UndoLog.undo(TypeConstraints.scala:56)
	at org.apache.spark.sql.catalyst.ScalaReflection$class.cleanUpReflectionObjects(ScalaReflection.scala:906)
	at org.apache.spark.sql.catalyst.ScalaReflection$.cleanUpReflectionObjects(ScalaReflection.scala:46)
	at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor(ScalaReflection.scala:452)
	at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor$1.apply(ScalaReflection.scala:494)
	at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor$1.apply(ScalaReflection.scala:452)
	at scala.reflect.internal.tpe.TypeConstraints$UndoLog.undo(TypeConstraints.scala:56)
	at org.apache.spark.sql.catalyst.ScalaReflection$class.cleanUpReflectionObjects(ScalaReflection.scala:906)
	at org.apache.spark.sql.catalyst.ScalaReflection$.cleanUpReflectionObjects(ScalaReflection.scala:46)
	at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor(ScalaReflection.scala:452)
	at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor$1$$anonfun$8.apply(ScalaReflection.scala:644)
	at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor$1$$anonfun$8.apply(ScalaReflection.scala:632)
	at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
	at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
	at scala.collection.immutable.List.foreach(List.scala:392)
	at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
	at scala.collection.immutable.List.flatMap(List.scala:355)
	at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor$1.apply(ScalaReflection.scala:632)
	at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor$1.apply(ScalaReflection.scala:452)
	at scala.reflect.internal.tpe.TypeConstraints$UndoLog.undo(TypeConstraints.scala:56)
	at org.apache.spark.sql.catalyst.ScalaReflection$class.cleanUpReflectionObjects(ScalaReflection.scala:906)
	at org.apache.spark.sql.catalyst.ScalaReflection$.cleanUpReflectionObjects(ScalaReflection.scala:46)
	at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor(ScalaReflection.scala:452)
	at org.apache.spark.sql.catalyst.ScalaReflection$.serializerFor(ScalaReflection.scala:441)
	at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$.apply(ExpressionEncoder.scala:71)
	at org.apache.spark.sql.Encoders$.product(Encoders.scala:275)
	at org.apache.spark.sql.LowPrioritySQLImplicits$class.newProductEncoder(SQLImplicits.scala:248)
	at org.apache.spark.sql.SQLImplicits.newProductEncoder(SQLImplicits.scala:34)
	at org.mlh.hello.myTest$.main(myTest.scala:16)
	at org.mlh.hello.myTest.main(myTest.scala)

Line 16 is .flatMap(_.toOption); it seems that the Instant type in the Event class cannot be handled correctly.
Any ideas?


chuwy commented Aug 16, 2019

Hello @take-it-out,

What version of Scala/Spark are you using? The exception is quite strange, because it says Spark cannot find its own encoder for Instant, whereas Instant is encoded into a string by the Analytics SDK and circe (the JSON library) themselves, so no Spark serialization should be involved.

(Just in case - you don't need to use .isValid because .toOption will throw away invalid values anyway)
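
To illustrate that point (a minimal sketch, assuming Event.parse returns a cats Validated, as it does in SDK 0.4.x):

    import cats.data.Validated

    val ok: Validated[String, Int]  = Validated.valid(42)
    val bad: Validated[String, Int] = Validated.invalid("boom")

    // .toOption keeps Valid values and drops Invalid ones,
    // so filtering on .isValid first adds nothing:
    List(ok, bad).flatMap(_.toOption)  // List(42)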


Lihe-Ma commented Aug 18, 2019

Thanks for your reply. I am using Scala 2.12.8 and Spark 2.4.0.


chuwy commented Aug 19, 2019

Thanks, we'll try out these versions (I don't think we have yet). Meanwhile, could you please try this code:

val events = spark.read.textFile(mySource)
  .flatMap(line => Event.parse(line).toOption.map(_.toJson(true).noSpaces))

I was wrong when I said that "Spark cannot find its own encoder for Instant whereas Instant is encoded into a string by the Analytics SDK": in your code an Instant exists between the map invocations and hence has to be serialized by Spark.
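
If you ever do need the typed Dataset[Event] between stages, one possible workaround (just a sketch, untested here, and eventEncoder is a name I'm making up) is to supply an explicit Kryo-based encoder, since Spark 2.x has no built-in encoder for java.time.Instant:

    import org.apache.spark.sql.{Dataset, Encoder, Encoders}
    import com.snowplowanalytics.snowplow.analytics.scalasdk.Event
    import spark.implicits._  // assuming spark: SparkSession is in scope

    // Opaque Kryo encoder: lets a Dataset[Event] exist between stages,
    // at the cost of a single binary column instead of a typed schema.
    implicit val eventEncoder: Encoder[Event] = Encoders.kryo[Event]

    val events: Dataset[Event] = spark.read.textFile(mySource)
      .flatMap(line => Event.parse(line).toOption)

    val jsons: Dataset[String] = events.map(_.toJson(true).noSpaces)

Whether that's worth it over the plain Dataset[String] one-liner above depends on whether you need the Event fields downstream.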


Lihe-Ma commented Aug 19, 2019

@chuwy thx, it works.

Lihe-Ma closed this as completed Aug 22, 2019

chuwy commented Aug 23, 2019

Hey @take-it-out, hope you don't mind if it stays open for a bit. My concern is that our production systems use the Analytics SDK in the way you showed, with Spark 2.3, and I'm wondering why it doesn't work on 2.4.

chuwy reopened this Aug 23, 2019