From 7e8e957e50bd53ec7e7b24f1dcbe107f08998625 Mon Sep 17 00:00:00 2001 From: bravo-zhang Date: Thu, 28 Jun 2018 19:35:11 -0700 Subject: [PATCH] Spark: correctly log records received. --- spark/src/main/scala/com/metamx/tranquility/spark/BeamRDD.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/spark/src/main/scala/com/metamx/tranquility/spark/BeamRDD.scala b/spark/src/main/scala/com/metamx/tranquility/spark/BeamRDD.scala index 976f018..25e7075 100644 --- a/spark/src/main/scala/com/metamx/tranquility/spark/BeamRDD.scala +++ b/spark/src/main/scala/com/metamx/tranquility/spark/BeamRDD.scala @@ -48,6 +48,7 @@ class BeamRDD[T: ClassTag](rdd: RDD[T]) extends Logging with Serializable val exception = new AtomicReference[Throwable] for (record <- partitionOfRecords) { + received.incrementAndGet() sender.send(record) respond { case Return(_) => sent.incrementAndGet() case Throw(e: MessageDroppedException) => // Suppress