diff --git a/spark/src/main/scala/com/metamx/tranquility/spark/BeamRDD.scala b/spark/src/main/scala/com/metamx/tranquility/spark/BeamRDD.scala index 976f018..25e7075 100644 --- a/spark/src/main/scala/com/metamx/tranquility/spark/BeamRDD.scala +++ b/spark/src/main/scala/com/metamx/tranquility/spark/BeamRDD.scala @@ -48,6 +48,7 @@ class BeamRDD[T: ClassTag](rdd: RDD[T]) extends Logging with Serializable val exception = new AtomicReference[Throwable] for (record <- partitionOfRecords) { + received.incrementAndGet() sender.send(record) respond { case Return(_) => sent.incrementAndGet() case Throw(e: MessageDroppedException) => // Suppress