Skip to content

Commit

Permalink
feat: Add cancellable future utility (#34)
Browse files Browse the repository at this point in the history
  • Loading branch information
mdedetrich authored Apr 4, 2022
1 parent 01e93ee commit 5aa58ab
Show file tree
Hide file tree
Showing 6 changed files with 279 additions and 0 deletions.
22 changes: 22 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -315,3 +315,25 @@ val result: Future[Int] = retryWithBackOff(5, 5.seconds) {
callThatService(): Future[Int]
}
```

### CancelableFuture

If you need to create a ```Future``` that you want to cancel at a later point in time then
you can use a `CancelableFuture`.

```scala
import markatta.futiles.CancellableFuture

val cancellableFuture = CancellableFuture {
someLongOperation()
}

cancellableFuture.cancel()

```

Note that the `.cancel` method on `CancelableFuture` is a best effort implementation and
it also does not handle cleaning up of resources (such as file handles) since further
computations deriving from `.map`/`.flatMap`/`onComplete` may not execute. If a
`CancelableFuture` was cancelled this way it will fail with a `CancellationException`
exception.
55 changes: 55 additions & 0 deletions src/main/scala-2.11/markatta/futiles/CancellableFutureImpl.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package markatta.futiles

import java.util.concurrent.{Callable, FutureTask}
import scala.concurrent._
import scala.concurrent.duration.Duration
import scala.util.Try

private[futiles] final class CancellableFutureImpl[T](executionContext: ExecutionContext, block: => T)
extends CancellableFuture[T] {
private val promise = Promise[T]()

def delegate: Future[T] = promise.future

private val jf: FutureTask[T] = new FutureTask[T](
new Callable[T] {
override def call(): T = block
}
) {
override def done(): Unit = promise.complete(
Try(
try
get()
catch {
case e: ExecutionException if e.getCause != null =>
// This is here to mirror the same behaviour that Scala's Future has, i.e. if you throw
// an exception in a Scala Future then then Future.failed has that same exception. Java's
// FutureTask however wraps this in an ExecutionException.
throw e.getCause
}
)
)
}

override def onComplete[U](f: Try[T] => U)(implicit executor: ExecutionContext): Unit =
delegate.onComplete(f)

override def isCompleted: Boolean = delegate.isCompleted

override def value: Option[Try[T]] = delegate.value

override def transform[S](s: T => S, f: Throwable => Throwable)(implicit
executor: ExecutionContext
): Future[S] = delegate.transform(s, f)

override def ready(atMost: Duration)(implicit permit: CanAwait): CancellableFutureImpl.this.type = {
delegate.ready(atMost)
this
}

override def result(atMost: Duration)(implicit permit: CanAwait): T = delegate.result(atMost)

override def cancel(): Unit = jf.cancel(true)

executionContext.execute(jf)
}
56 changes: 56 additions & 0 deletions src/main/scala-2.12/markatta/futiles/CancellableFutureImpl.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package markatta.futiles

import java.util.concurrent.{Callable, FutureTask}
import scala.concurrent._
import scala.concurrent.duration.Duration
import scala.util.Try

private[futiles] final class CancellableFutureImpl[T](executionContext: ExecutionContext, block: => T)
extends CancellableFuture[T] {
private val promise = Promise[T]()

def delegate: Future[T] = promise.future

private val jf: FutureTask[T] = new FutureTask[T](
new Callable[T] {
override def call(): T = block
}
) {
override def done(): Unit = promise.complete(
Try(
try
get()
catch {
case e: ExecutionException if e.getCause != null =>
// This is here to mirror the same behaviour that Scala's Future has, i.e. if you throw
// an exception in a Scala Future then then Future.failed has that same exception. Java's
// FutureTask however wraps this in an ExecutionException.
throw e.getCause
}
)
)
}

override def onComplete[U](f: Try[T] => U)(implicit executor: ExecutionContext): Unit =
delegate.onComplete(f)

override def isCompleted: Boolean = delegate.isCompleted

override def value: Option[Try[T]] = delegate.value

override def transform[S](f: Try[T] => Try[S])(implicit executor: ExecutionContext): Future[S] = delegate.transform(f)

override def transformWith[S](f: Try[T] => Future[S])(implicit executor: ExecutionContext): Future[S] =
delegate.transformWith(f)

override def ready(atMost: Duration)(implicit permit: CanAwait): CancellableFutureImpl.this.type = {
delegate.ready(atMost)
this
}

override def result(atMost: Duration)(implicit permit: CanAwait): T = delegate.result(atMost)

override def cancel(): Unit = jf.cancel(true)

executionContext.execute(jf)
}
56 changes: 56 additions & 0 deletions src/main/scala-2.13/markatta/futiles/CancellableFutureImpl.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package markatta.futiles

import java.util.concurrent.{Callable, FutureTask}
import scala.concurrent._
import scala.concurrent.duration.Duration
import scala.util.Try

private[futiles] final class CancellableFutureImpl[T](executionContext: ExecutionContext, block: => T)
extends CancellableFuture[T] {
private val promise = Promise[T]()

def delegate: Future[T] = promise.future

private val jf: FutureTask[T] = new FutureTask[T](
new Callable[T] {
override def call(): T = block
}
) {
override def done(): Unit = promise.complete(
Try(
try
get()
catch {
case e: ExecutionException if e.getCause != null =>
// This is here to mirror the same behaviour that Scala's Future has, i.e. if you throw
// an exception in a Scala Future then then Future.failed has that same exception. Java's
// FutureTask however wraps this in an ExecutionException.
throw e.getCause
}
)
)
}

override def onComplete[U](f: Try[T] => U)(implicit executor: ExecutionContext): Unit =
delegate.onComplete(f)

override def isCompleted: Boolean = delegate.isCompleted

override def value: Option[Try[T]] = delegate.value

override def transform[S](f: Try[T] => Try[S])(implicit executor: ExecutionContext): Future[S] = delegate.transform(f)

override def transformWith[S](f: Try[T] => Future[S])(implicit executor: ExecutionContext): Future[S] =
delegate.transformWith(f)

override def ready(atMost: Duration)(implicit permit: CanAwait): CancellableFutureImpl.this.type = {
delegate.ready(atMost)
this
}

override def result(atMost: Duration)(implicit permit: CanAwait): T = delegate.result(atMost)

override def cancel(): Unit = jf.cancel(true)

executionContext.execute(jf)
}
32 changes: 32 additions & 0 deletions src/main/scala/markatta/futiles/CancellableFuture.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package markatta.futiles

import scala.concurrent._
import scala.util.Try
import java.util.concurrent.{Callable, FutureTask}
import scala.concurrent.duration.Duration

trait CancellableFuture[T] extends Future[T] {

/** Attempts to cancel the underlying [[scala.concurrent.Future]]. Note that this is a best effort attempt
*/
@throws[CancellationException]
def cancel(): Unit
}

object CancellableFuture {

/** Allows you to run a computation inside of a [[scala.concurrent.Future]] which can later be cancelled
*
* @param body
* The computation to run inside of the [[scala.concurrent.Future]]
* @param executionContext
* The [[scala.concurrent.ExecutionContext]] to run the [[scala.concurrent.Future]] on
* @return
* A [[markatta.futiles.CancellableFuture]] providing a `cancel` method allowing you to terminate the
* [[markatta.futiles.CancellableFuture]] at any time
* @see
* Adapted from https://stackoverflow.com/a/39986418/1519631
*/
def apply[T](body: => T)(implicit executionContext: ExecutionContext): CancellableFuture[T] =
new CancellableFutureImpl[T](executionContext, body)
}
58 changes: 58 additions & 0 deletions src/test/scala/markatta/futiles/CancellableFutureSpec.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package markatta.futiles

import java.util.concurrent.atomic.AtomicBoolean
import scala.concurrent.CancellationException

class CancellableFutureSpec extends Spec {
describe("The cancellable utility") {

describe("without cancellation") {

it("works as a normal Future") {
val cancellable = CancellableFuture {
()
}

cancellable.futureValue shouldEqual ()
}

it("throws an Exception correctly") {
val cancellable = CancellableFuture {
throw new IllegalArgumentException
}

val exception = cancellable.failed.futureValue
exception shouldBe an[IllegalArgumentException]
}

}

describe("with cancellation") {

it("prevents Future from completing") {
val atomicBoolean = new AtomicBoolean(true)

val cancellable = CancellableFuture {
Thread.sleep(100)
atomicBoolean.set(false)
}

Thread.sleep(50)
cancellable.cancel()
Thread.sleep(100)
atomicBoolean.get() shouldEqual true
}

it("throws a CancellationException exception") {
val cancellable = CancellableFuture {
Thread.sleep(100)
}
cancellable.cancel()
cancellable.failed.futureValue shouldBe an[CancellationException]
}

}

}

}

0 comments on commit 5aa58ab

Please sign in to comment.