Mike Slinn

Getting Results from Futures & Signaling with Promises

— Draft —

Published 2014-10-01. Last modified 2019-05-01.
Time to read: 8 minutes.

Scala offers multiple options for multi-threading. This lecture continues the discussion of Scala Futures.

  • How to obtain results from Futures
  • How to use Promises to signal across threads
  • Canceling Futures

The sample code for this lecture can be found in courseNotes/src/main/scala/multi/futures/FutureAwaitSignal.scala.

Mechanics

As I mentioned in the Futures & Promises lecture, Futures have two states: completed and not completed, or pending. The isCompleted method lets you know the state. Once the Future has completed, you can retrieve its value in several ways.

A code fragment that suspends execution until a Future completes is said to block execution, block on a Future, or simply block. Code that blocks on every Future is synchronous, and would provide no benefit because the purpose of Futures is to provide the ability to write asynchronous code.

Given an instance of Future, the value method returns the state of the Future at that instant as an Option[Try[T]]: None while the Future is pending, and Some wrapping the result once it has completed. The problem is that you don't know when the Future will be fulfilled, and it is quite possible that the Future might not be fulfilled by the time the main thread is ready to exit.
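The two states can be observed without any timing uncertainty by driving a Future from a Promise. This is only a minimal sketch; the object name FutureStates and the method observe are made up for illustration and are not part of the course code.

```scala
import scala.concurrent.{Future, Promise}
import scala.util.Try

// Hypothetical helper, not part of the course code
object FutureStates {
  /** Returns isCompleted and value as observed before and after the Future completes. */
  def observe(): (Boolean, Option[Try[Int]], Boolean, Option[Try[Int]]) = {
    val promise = Promise[Int]()               // lets us decide exactly when the Future completes
    val future: Future[Int] = promise.future
    val completedBefore = future.isCompleted   // false: still pending
    val valueBefore = future.value             // None: no result yet
    promise.success(42)                        // completes the Future
    (completedBefore, valueBefore, future.isCompleted, future.value)
  }
}
```

Calling FutureStates.observe() should show a pending Future with no value, followed by a completed Future whose value is Some(Success(42)).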

First I'm going to talk about keeping the main thread alive until no longer needed, then I'll show three ways of doing that. Along the way we'll discuss how you can write code that blocks for a result, and we'll explore what results look like.

We'll also discuss how to clean up code written in a callback style. We'll postpone the topic of using Future combinators to obtain results in a non-blocking, asynchronous fashion until the Future Combinators lecture.

Keeping the Main Thread Alive

The MultiThreading lecture discussed how the main thread executes a program from beginning to end, and only stops when all regular threads spawned by the main thread also stop.

When the main thread terminates it also terminates any daemon threads that it may have spawned. Recall that the default threadpool provided by scala.concurrent.ExecutionContext.Implicits.global uses daemon threads. It is often the case that the main thread will terminate before the daemon threads that it spawned are finished. This can happen when working with Futures because unlike Scala parallel collections, Futures execute asynchronously.

This means you need a way to put the main thread to sleep just prior to termination, and possibly wake it up once all daemon threads are finished, thereby ending the program in a graceful manner. In this lecture I'll show you three ways of accomplishing this.

  1. Blocking for completion of each thread, or a collection of threads – normally not suggested for production code, but useful for experimenting and teaching purposes.
  2. Sleeping the main thread and calling System.exit when complete – works fine but is not flexible, and may not be appropriate in most circumstances.
  3. Sleeping the main thread and using cross-context signaling – recommended.

The remainder of this lecture explores each of the above in order.
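As a quick check of the earlier statement that the default threadpool uses daemon threads, a Future can simply ask the thread it runs on. This is a small sketch; DaemonCheck and poolThreadIsDaemon are hypothetical names chosen for this example.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Hypothetical helper, not part of the course code
object DaemonCheck {
  /** Runs a Future on the global ExecutionContext and reports whether its thread is a daemon. */
  def poolThreadIsDaemon(): Boolean =
    Await.result(Future(Thread.currentThread.isDaemon), 10.seconds)
}
```

Because the pool threads are daemons, the JVM will not wait for them once the main thread ends, which is why the main thread has to be kept alive by one of the techniques listed above.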

Utility Methods

To prepare for the exploration we'll need a few imports and a couple of utility methods. These utility methods are defined in multi/package.scala because they are used in several lectures.

Here are the imports:

Scala REPL
scala> import scala.concurrent.duration._
import scala.concurrent.duration._ 

The scala.concurrent.duration.Duration object defines a convenient DSL for time periods, allowing you to write time periods in terms of nanoseconds (which you can also write as nanos or nanosecond), microseconds (which you can also write as micro, micros, and microsecond), milliseconds (which you can also write as milli, millis or millisecond), second or seconds, minute or minutes, hour or hours, day or days, and infinity (Inf). For example, you could specify Durations in an English-language syntax as 20 seconds, 1 minute, 3 hours or Duration.Inf.
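The same values can also be written with dot notation, which needs no extra language import. The sketch below only illustrates the DSL; the object name DurationExamples and its members are invented for this example.

```scala
import scala.concurrent.duration._

// Hypothetical holder object, not part of the course code
object DurationExamples {
  val twentySeconds: FiniteDuration = 20.seconds
  val oneMinute: FiniteDuration = 1.minute
  val threeHours: FiniteDuration = 3.hours
  val forever: Duration = Duration.Inf

  // Durations can be converted to other units and added together
  val minuteInSeconds: Long = oneMinute.toSeconds
  val total: FiniteDuration = oneMinute + twentySeconds
}
```

Durations that describe the same length of time compare equal regardless of the unit used to write them, so DurationExamples.total is equal to 80.seconds.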

Scala REPL
scala> import scala.language.postfixOps
import scala.language.postfixOps 

Scala's postfixOps language feature is handy when defining a domain-specific language, or DSL. This language feature is what enables us to write Durations in an English-like syntax.

Scala REPL
scala> import scala.concurrent.Await
import scala.concurrent.Await

Await can be used to block for execution and possibly obtain a result from a Future. As we shall see, indiscriminate use of Await can defeat the purpose of Futures.

The first utility method that I wrote for you fetches a URL and returns up to the first 500 characters of the web page, followed by an ellipsis to let us know that the page was truncated. io.Source.fromURL causes the Scala runtime to read the contents of the web page pointed to by the value of url, then I call mkString on the Iterator that is returned by fromURL, thereby creating a String from the results of the iteration. The String.trim method removes any leading or trailing whitespace in the string. I then limit the length of the String so that it is less than or equal to maxChars in length. An ellipsis is appended to the String if it was chopped.

Scala REPL
scala> def readUrl(url: String, maxChars: Int=500): String = {
     |   val contents = io.Source.fromURL(url).mkString.trim
     |   contents.substring(0, math.min(contents.length, maxChars)) +
     |     (if (contents.length>maxChars) "..." else "")
     | }
readUrl: (url: String, maxChars: Int)String
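The truncation step can be tried without a network connection by applying the same logic to a String that is already in memory. This is a sketch only; Truncate.truncate is a hypothetical helper that mirrors the body of readUrl.

```scala
// Hypothetical helper that mirrors the truncation logic of readUrl
object Truncate {
  /** Trims the text, keeps at most maxChars characters, and appends "..." if anything was cut. */
  def truncate(contents: String, maxChars: Int = 500): String = {
    val trimmed = contents.trim
    trimmed.substring(0, math.min(trimmed.length, maxChars)) +
      (if (trimmed.length > maxChars) "..." else "")
  }
}
```

For example, Truncate.truncate("  hello world  ", 5) yields "hello...". Note that the ellipsis is added after the cut, so a truncated result is three characters longer than maxChars.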

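For convenience, let's also declare a method that returns a Future of the truncated web page contents. Notice how the readUrl method invocation is wrapped into a Future. Creating a Future this way requires scala.concurrent.Future to be imported and an implicit ExecutionContext, such as scala.concurrent.ExecutionContext.Implicits.global, to be in scope.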

Scala REPL
scala> def readUrlFuture(urlStr: String, maxChars: Int=500): Future[String] =
     |   Future(readUrl(urlStr, maxChars))
readUrlFuture: (urlStr: String, maxChars: Int)scala.concurrent.Future[String]

Getting Results from Futures

(1) Blocking for Completion

You can block for a result, or block until completion with the Await methods. It is often reasonable to use these methods at one or two points in a program, to provide synchronization, but this should not be your normal way of working with a Future's value. If you block for every Future, you should either use Scala Parallel Collections, or accept the fact that your solution is best expressed as a single thread of execution.

(a) Awaiting Completion

You can block until a future completes by invoking Await.ready, passing in the Future you are waiting for, and the maximum length of time to wait. This causes the code to be executed in a linear fashion instead of asynchronously.

Notice that the maximum duration to block is specified in an English-language syntax as 20 seconds.

Scala REPL
scala> Await.ready(readUrlFuture("https://www.scalacourses.com"), 20 seconds)
res2: f.type = scala.concurrent.impl.Promise$DefaultPromise@2f0ee71d 

We could also block for up to one hour:

Scala REPL
scala> Await.ready(readUrlFuture("https://www.micronauticsresearch.com"), 1 hour)
res6: f.type = scala.concurrent.impl.Promise$DefaultPromise@2f0ee71d 

Or you can potentially block forever for a Future to complete (this is generally not advisable):

Scala REPL
scala> Await.ready(readUrlFuture("https://www.scalacourses.com"), Duration.Inf)
res5: f.type = scala.concurrent.impl.Promise$DefaultPromise@2f0ee71d 

Finally, you can block for a zero duration; this is generally only done if you want the timeout to always fail for testing purposes.

Any Future that does not complete within the specified duration will cause a java.util.concurrent.TimeoutException to be thrown.

Scala REPL
scala> Await.ready(readUrlFuture("https://www.scalacourses.com"), Duration.Zero)
java.util.concurrent.TimeoutException: Futures timed out after [0 days]
  at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
  at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:153)
  at scala.concurrent.Await$$anonfun$ready$1.apply(package.scala:95)
  at scala.concurrent.Await$$anonfun$ready$1.apply(package.scala:95)
  at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
  at scala.concurrent.Await$.ready(package.scala:95)
  ... 

You can run this code by typing:

Shell
$ sbt "runMain multi.futures.FutureAwait"
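The timeout can also be handled instead of being left to propagate. The following sketch waits briefly on a Future that never completes and turns the outcome into a String; TimeoutDemo and awaitNeverCompleted are hypothetical names, and the 100 millisecond limit is an arbitrary choice.

```scala
import java.util.concurrent.TimeoutException
import scala.concurrent.{Await, Promise}
import scala.concurrent.duration._
import scala.util.{Failure, Success, Try}

// Hypothetical helper, not part of the course code
object TimeoutDemo {
  /** Waits up to 100 ms on a Future that is never completed and reports what happened. */
  def awaitNeverCompleted(): String = {
    val never = Promise[String]().future   // nothing ever completes this Promise
    Try(Await.ready(never, 100.millis)) match {
      case Success(_)                   => "completed"
      case Failure(_: TimeoutException) => "timed out"
      case Failure(other)               => s"failed: $other"
    }
  }
}
```

Wrapping the call in Try keeps the TimeoutException from unwinding the caller, which is convenient when a timeout is an expected outcome rather than a programming error.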

Awaiting the Result

Await.result is invoked just the same as Await.ready, but in addition to blocking until the Future completes, Await.result also returns the value extracted from the Future. This code looks very similar to the preceding example, and it reuses the readUrlFuture method from that example.

Scala code
import multi.futures.FutureAwait._
import scala.concurrent.Await
import scala.concurrent.duration._
import scala.language.postfixOps

val result = Await.result(readUrlFuture("https://www.scalacourses.com"), 2 hours)
println(s"The future completed within 2 hours with a result of ${result.length} characters.")

val result2 = Await.result(readUrlFuture("https://www.scalacourses.com"), 1 hour)
println(s"The future has completed within 1 hour with a result of ${result2.length} characters.")

You can run this code by typing:

Shell
$ sbt "runMain multi.futures.FutureResult"
The future completed within 2 hours with a result of 503 characters.
The future has completed within 1 hour with a result of 503 characters. 

Again, if this is how you wrote all your code that had Futures, there would be limited benefit to using Futures.
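One practical difference between Await.ready and Await.result shows up when the Future has failed: Await.ready hands back the completed Future with the failure still wrapped inside, while Await.result rethrows the exception. The sketch below uses an already-failed Future so no threads are involved; ReadyVersusResult and its methods are hypothetical names.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.util.Try

// Hypothetical helper, not part of the course code
object ReadyVersusResult {
  /** Await.ready returns the completed Future itself; the failure stays inside it. */
  def viaReady(): Option[Try[Int]] = {
    val failed: Future[Int] = Future.failed(new IllegalStateException("boom"))
    Await.ready(failed, 1.second).value
  }

  /** Await.result unwraps the Future, so the exception is rethrown and captured here by Try. */
  def viaResult(): Try[Int] = {
    val failed: Future[Int] = Future.failed(new IllegalStateException("boom"))
    Try(Await.result(failed, 1.second))
  }
}
```

In both cases the caller ends up with a Failure containing the IllegalStateException, but only Await.result forces the caller to deal with it as a thrown exception.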

Awaiting a Pre-Completed Future

Future.successful() creates a special kind of Future called a KeptPromise. As I mentioned earlier, creating a Future in this way does not require a new thread, because the computation is performed immediately. Calling Await.result on a KeptPromise does not require a context switch or other overhead.
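A short sketch of a pre-completed Future follows. PreCompleted and demo are hypothetical names; note that no ExecutionContext needs to be imported, because nothing is scheduled on another thread.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

// Hypothetical helper, not part of the course code
object PreCompleted {
  /** Returns whether the Future was complete right after creation, and its awaited value. */
  def demo(): (Boolean, Int) = {
    val future: Future[Int] = Future.successful(6 * 7)  // the argument is evaluated on the calling thread
    (future.isCompleted, Await.result(future, 1.second))
  }
}
```

PreCompleted.demo() returns (true, 42): the Future is already complete when it is created, so Await.result returns without waiting.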

Warning: Bad Habits!

This is an example of how NOT to obtain the value of a Future. The problem with this code is that it will fail if the future has not completed, and you'll be tempted to write a lot of code testing for completion, then blocking for a result, which defeats the purpose of a Future.

Scala REPL
scala> val future: Future[String] = readUrlFuture("https://www.scalacourses.com")

The next statement will only succeed if you wait for the preceding future to complete.

Scala REPL
scala> val fValueTry: Try[String] = future.value.get
fValueTry: scala.util.Try[String] = Success(<!DOCTYPE html> <html lang="en"> <head> <title>Welcome to ScalaCourses.com</title> ...)

scala> val futureValue: String = future.value.get.get
futureValue: String = "<!DOCTYPE html> <html lang="en"> <head> <title>Welcome to ScalaCourses.com</title> ..."

scala> val futureValue: String = future.value.get.get.mkString
futureValue: String = "<!DOCTYPE html> <html lang="en"> <head> <title>Welcome to ScalaCourses.com</title> <meta http-equiv="Content-Type" content="text/html; charset=UTF-8" /> <script src='/webjars/jquery/1.9.1/jquery.min.js' type='text/javascript'></script> <script src='/webjars/jquery-ui/1.10.2/ui/minified/jquery-ui.min.js' type='text/javascript'></script> <script src='/webjars/bootstrap/2.3.2/js/bootstr"

You can run this code by typing:

Shell
$ sbt "runMain multi.futures.FutureBadHabits"
Exception in thread "main" java.util.NoSuchElementException: None.get 

This program will throw an exception because the first two expressions will execute right away, and the program will attempt to retrieve the value of the Future before it has completed.

Obviously this is not how you should retrieve the value from a Future, but it does show you some useful information.

  • future.value returns None until the Future has completed.
  • future.value.get returns a Try object which can hold one of two types of results: a Success or a Failure.
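  • future.value.get.get unpacks the Success and returns the value that the Future completed with; for readUrlFuture that is the String containing the contents of the URL.
  • future.value.get.get.mkString also returns a String; the mkString call is only needed when the Future wraps an iterator such as a BufferedSource.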
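If you do need to inspect future.value directly, pattern matching handles all three cases without ever calling get, so there is no NoSuchElementException to trip over. This is a sketch; InspectFuture, describe and examples are hypothetical names.

```scala
import scala.concurrent.{Future, Promise}
import scala.util.{Failure, Success}

// Hypothetical helper, not part of the course code
object InspectFuture {
  /** Describes the current state of a Future without calling .get on the Option or the Try. */
  def describe[T](future: Future[T]): String = future.value match {
    case None                 => "pending"
    case Some(Success(value)) => s"succeeded with $value"
    case Some(Failure(ex))    => s"failed with ${ex.getMessage}"
  }

  /** One example for each of the three possible states. */
  def examples(): List[String] = List(
    describe(Promise[Int]().future),                            // never completed
    describe(Future.successful(3)),                             // already succeeded
    describe(Future.failed[Int](new RuntimeException("boom")))  // already failed
  )
}
```

This only reports the state at the instant of the call; it does not wait, so it is a diagnostic aid rather than a way of obtaining a result.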

There are better ways of retrieving the value from a Future:

  1. We could block for a result as we discussed above, but this limits parallelism.
  2. We could use callbacks as described in the Future Callbacks lecture, but this is a messy way to write multithreaded code.
  3. We could use combinators as described in the Future Combinators lecture. This is often the best approach for writing asynchronous code.

(2) Sleeping the Main Thread and Calling System.exit

You can put a thread to sleep by writing:

Scala code
synchronized { wait() }

If called from the main thread, then the main thread will go to sleep. It then becomes the responsibility of the daemon threads to terminate the program gracefully. Here is an example:

Scala code
readUrlFuture("https://www.scalacourses.com") onComplete {
  case Success(value) =>
    println(s"\nFirst 500 characters of https://scalacourses.com:\n$value")
    System.exit(0)

  case Failure(throwable) =>
    println("\n" + throwable.getMessage)
    System.exit(-1)
}

println("End of mainline: suspending main thread in case any futures still need to complete.")
synchronized { wait() }

As we will see in the Future Callbacks lecture, Future.onComplete accepts a partial function, into which it passes a Try. We discussed partial functions in the Partial Functions lecture. You should recall from the Try and try/catch/finally lecture of the Introduction to Scala course that Try has two subclasses: Success and Failure.

Each of the cases that make up the partial function passed to onComplete extracts the value of the appropriate subclass of Try. We discussed how unapply extracts values in the Unapply lecture of the Introduction to Scala course.

For the Success case, the first 500 characters of the web page are printed out; for the Failure case, the message of the exception contained in the Failure is printed out. Both cases terminate all threads and also the program by calling System.exit.

You can run this code by typing:

Shell
$ sbt "runMain multi.futures.WaitExitDemo"
End of mainline: suspending main thread in case any futures still need to complete.
First 500 characters of https://scalacourses.com:
<!DOCTYPE html>
<html lang="en">
<head>
  <title>Online Scala Training</title>
  <meta http-equiv="Content-Type" content="text/html; charset=UTF-8" />
  <script src='https://cdn.jsdelivr.net/webjars/jquery/1.11.1/jquery.min.js'></script>
  <script src='https://cdn.jsdelivr.net/webjars/jquery-cookie/1.4.1/jquery.cookie.js'></script>
  <script src='https://cdn.jsdelivr.net/webjars/jquery-ui/1.11.1/jquery-ui.min.js'></script>
  <script src='https://cdn.jsdelivr.net/webjars/bootstrap/2.3.2/js/bootstrap. 

(3) Sleeping the Main Thread and Using Signaling

Let's modify the previous example to use cross-context signaling with a Promise.

Scala code
import scala.concurrent._
import scala.io.Source
import scala.util.{Failure, Success}

val signal = Promise[String]()
readUrlFuture("https://www.scalacourses.com") onComplete {
  case Success(value) =>
    println(s"\nFirst 500 characters of https://scalacourses.com:\n$value")
    signal.complete(Success("All done"))

  case Failure(throwable) =>
    println("\n" + throwable.getMessage)
    signal.complete(Failure(throwable))
}
Await.ready(signal.future, duration.Duration.Inf)

You can run this program by typing:

Shell
$ sbt "runMain multi.futures.SignalDemo"

Output is the same as before.
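The same signaling pattern can be tried without a network connection. In this sketch an arbitrary computation stands in for the web fetch, and a finite timeout replaces Duration.Inf; SignalSketch and runAndWait are hypothetical names.

```scala
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.{Failure, Success, Try}

// Hypothetical helper, not part of the course code
object SignalSketch {
  /** Runs `work` asynchronously; the caller blocks on the signal rather than on the work itself. */
  def runAndWait(work: => Int): Option[Try[String]] = {
    val signal = Promise[String]()
    Future(work).onComplete {
      case Success(value) =>
        println(s"Work finished with $value")
        signal.success("All done")   // equivalent to signal.complete(Success("All done"))
      case Failure(throwable) =>
        println(s"Work failed: ${throwable.getMessage}")
        signal.failure(throwable)    // equivalent to signal.complete(Failure(throwable))
    }
    Await.ready(signal.future, 10.seconds).value
  }
}
```

SignalSketch.runAndWait(1 + 1) returns Some(Success("All done")) once the callback has run, while a computation that throws produces Some(Failure(...)). Because the signal is completed at the end of the callback, the waiting thread resumes only after the callback's own work, such as printing, has been done.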

Cleaning up Callback Hell

Java code can have a lot of callbacks; Java was invented in the 1990s, when that sort of coding style was popular. Here is a method that uses the Java Sound API to play a sound.

I designed the method to be synchronous; it won't return until the sound has finished playing. The three lines that create the Promise, complete it inside the listener, and await its Future follow the same pattern as the previous example in order to turn this asynchronous code into a synchronous method.

Scala code
import concurrent.{Await, Promise, duration}

def playSound(fileName: String): Unit = {
  try {
    import javax.sound.sampled._
    val audioIn: AudioInputStream = {
      if (fileName.startsWith("https:")) {
        AudioSystem.getAudioInputStream(new java.net.URL(fileName))
      } else {
        val stream = getClass.getClassLoader.getResourceAsStream(fileName)
        AudioSystem.getAudioInputStream(new java.io.BufferedInputStream(stream))
      }
    }
    val clip = AudioSystem.getClip
    clip.open(audioIn)
    clip.start()
    val promise = Promise[String]()
    clip.addLineListener(new LineListener { // wait until sound has finished playing
      def update(event: LineEvent): Unit = {
        if (event.getType == LineEvent.Type.STOP) {
          event.getLine.close()
          promise.success("done")
          ()
        }
      }
    })
    Await.result(promise.future, duration.Duration.Inf)
    ()
  } catch {
    case e: Exception => println(e)
  }
}

Initially, execution flows from top to bottom. A Promise called promise is created, then a callback is registered by calling Clip.addLineListener, and then Await.result pauses execution until the Promise's Future is complete. When the sound stops, the update callback executes and fulfills the Promise by calling success, at which point Await.result stops waiting.
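The pattern is not specific to the Java Sound API. The sketch below applies it to a made-up callback-style method: legacyApi is a hypothetical stand-in for any API that reports completion through a callback, and asFuture and blocking are equally hypothetical wrappers.

```scala
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.duration._

// Hypothetical example, not part of the course code
object CallbackToFuture {
  /** Stand-in for a callback-style API: works on another thread, then calls onDone. */
  def legacyApi(input: String)(onDone: String => Unit): Unit = {
    val worker = new Thread(new Runnable {
      def run(): Unit = onDone(input.toUpperCase)
    })
    worker.setDaemon(true)
    worker.start()
  }

  /** Adapts the callback into a Future by completing a Promise inside the callback. */
  def asFuture(input: String): Future[String] = {
    val promise = Promise[String]()
    legacyApi(input)(result => promise.success(result))
    promise.future
  }

  /** Synchronous wrapper in the style of playSound: blocks until the callback has fired. */
  def blocking(input: String): String = Await.result(asFuture(input), 10.seconds)
}
```

Returning the Future from asFuture leaves the decision about blocking to the caller; blocking is shown only to mirror what playSound does.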

This code is provided in the onCreate project as part of onCreate/src/main/scala/OnCreate.scala, which is stuck at Scala 2.12.x. The Swatcher dependency has not been maintained and is probably abandoned. Swatcher is not available for Scala 2.13, so we'll probably have to come up with another code example eventually.

Canceling Futures

Scala Futures normally run to completion, which can be wasteful of computing resources. Here is a method that creates an interruptible Future.

Scala code
import java.util.concurrent.CancellationException
import scala.concurrent.{ExecutionContext, Future, Promise}

/** @param block is lazily evaluated and is used to create the Future.
  * @param ex is the usual ExecutionContext for the Future to run
  * @return Tuple containing the Future and a Function1[String, Option[CancellationException]].
  *         The Function1 returns None if the Future has already completed, otherwise it cancels the Future
  *         and returns Some(CancellationException) that contains the String passed to the function
  *         when the future was canceled.
  * @author Eric Pederson (@sourcedelica) https://stackoverflow.com/questions/16020964/cancellation-with-future-and-promise-in-scala
  * @author Mike Slinn updated to Scala 2.13 */
def interruptibleFuture[T](block: =>T)
                          (implicit ex: ExecutionContext): (Future[T], String => Option[CancellationException]) = {
  val p = Promise[T]()
  val future = p.future
  val atomicReference = new java.util.concurrent.atomic.AtomicReference[Thread](null)
  p completeWith Future {
    val thread = Thread.currentThread
    atomicReference.synchronized { atomicReference.set(thread) }
    try block finally {
      atomicReference.synchronized { atomicReference getAndSet null } ne thread
      ()
    }
  }

  /* This method can be called multiple times without any problem */
  val cancel = (msg: String) => {
    if (p.isCompleted) {
      None
    } else {
      // Interrupt the thread that is running the block, if it has started
      atomicReference.synchronized { Option(atomicReference getAndSet null) foreach { _.interrupt() } }
      val ex = new CancellationException(msg)
      p.tryFailure(ex)
      Some(ex)
    }
  }
  (future, cancel)
}

As you can see, Futures that were canceled before they completed contain a java.util.concurrent.CancellationException.

Here is an example of how to use interruptibleFuture. I am using a list of the slowest web sites found on internetsupervision.com. You might need to update the list of urls if those sites improve.

val (future, cancel) = interruptibleFuture(readUrl("https://scalacourses.com"))
val wasCancelled = cancel("Die!")
cancel("Die again!")
println(s"Fetch of ScalaCourses.com wasCanceled: ${wasCancelled.isDefined}\n")

You can run this code example by typing:

Shell
$ sbt "runMain multi.futures.FutureCancel1"
Fetch of ScalaCourses.com wasCancelled: true 
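Cancellation can also be tried without a network connection by letting Thread.sleep stand in for a slow download. The sketch below is self-contained, so it carries its own condensed variant of interruptibleFuture rather than the lecture's version; unlike the original it fails the Promise before interrupting the worker, so that the outcome of the demonstration is predictable. CancelSketch and demo are hypothetical names, and this is not a hardened implementation.

```scala
import java.util.concurrent.{CancellationException, CountDownLatch}
import java.util.concurrent.atomic.AtomicReference
import scala.concurrent.{Await, ExecutionContext, Future, Promise}
import scala.concurrent.duration._
import scala.util.Try

// Hypothetical example; a condensed variant of interruptibleFuture, not the course code
object CancelSketch {
  def interruptibleFuture[T](block: => T)
                            (implicit ec: ExecutionContext): (Future[T], String => Option[CancellationException]) = {
    val p = Promise[T]()
    val running = new AtomicReference[Thread](null)
    p.completeWith(Future {
      running.synchronized { running.set(Thread.currentThread) } // remember the worker thread
      try block finally running.synchronized { running.set(null) }
    })
    val cancel: String => Option[CancellationException] = msg =>
      if (p.isCompleted) None
      else {
        val ex = new CancellationException(msg)
        p.tryFailure(ex)                     // fail the Promise first ...
        running.synchronized {               // ... then interrupt the worker, if it has started
          Option(running.getAndSet(null)).foreach(_.interrupt())
        }
        Some(ex)
      }
    (p.future, cancel)
  }

  /** Starts a slow task, cancels it twice once it is running, and reports what happened. */
  def demo()(implicit ec: ExecutionContext): (Boolean, Boolean, Try[String]) = {
    val started = new CountDownLatch(1)
    val (future, cancel) = interruptibleFuture {
      started.countDown()
      Thread.sleep(10000)                    // stands in for a slow download
      "finished"
    }
    started.await()                          // make sure the task is actually running
    val first = cancel("Die!")               // cancels the task
    val second = cancel("Die again!")        // the Future is already completed by now
    (first.isDefined, second.isDefined, Try(Await.result(future, 1.second)))
  }
}
```

With an ExecutionContext in scope, CancelSketch.demo() returns true for the first cancellation and false for the second, and the Future fails with a CancellationException whose message is "Die!".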

Here is a more complex example, which accepts the results of the first future to complete a task, then cancels all remaining futures. We can best understand how this program works by examining the type of the iFutures variable. It consists of a collection of some scary-looking tuples, one tuple for each url to process.

The first element of each tuple is a now-familiar Future of an inner tuple of url and web page contents. The second element of the outer tuple is a Function1 from String to Option[CancellationException]; let's call this the cancelation object. The Future returned in the first element of the outer tuple can be canceled by invoking the Function1 in the second element of the outer tuple. If the Future was already complete by the time the cancelation object is invoked, nothing happens and the Function1 returns None. Otherwise, the Future is canceled and Some(CancellationException) is returned; the exception contains the string that was specified when the cancelation object was invoked.

val urls = List("https://magarihub.com", "https://vitarak.com", "https://www.firstpersonmedical.com")
val iFutures: List[(Future[(String, String)], (String) => Option[CancellationException])] =
  urls.map(url => interruptibleFuture((url, readUrl(url))))

Future.firstCompletedOf(iFutures.map(_._1)).andThen { // cancel all the remaining futures
  case _ =>
    iFutures.foreach {
      case iFuture if !iFuture._1.isCompleted =>
        val i = iFutures.indexOf(iFuture)
        iFuture._2(s"Canceled fetch of ${urls(i)}") // this invokes the cancelation object
        println(s"${iFuture._1.value.get.failed.get.getMessage}")

      case iFuture =>
        val i = iFutures.indexOf(iFuture)
        println(s"Fetched ${urls(i)} successfully")
    }
}

You can run this code example by typing:

Shell
$ sbt "runMain multi.futures.FutureCancel2"

Typical output is:

Fetched https://magarihub.com successfully
java.util.concurrent.CancellationException: Canceled fetch of https://vitarak.com
java.util.concurrent.CancellationException: Canceled fetch of https://www.firstpersonmedical.com

The version of this program in the courseNotes project uses signaling with promises to control when the program exits.
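The behavior of Future.firstCompletedOf can be seen in isolation by letting Promises stand in for the web fetches, so that the example controls which one finishes first. This is a sketch only; FirstCompleted and demo are hypothetical names.

```scala
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Hypothetical example, not part of the course code
object FirstCompleted {
  /** Completes only the second of three Promises and returns what firstCompletedOf yields. */
  def demo(): String = {
    val promises = List.fill(3)(Promise[String]())
    val winner: Future[String] = Future.firstCompletedOf(promises.map(_.future))
    promises(1).success("second")   // the other two Futures stay pending
    Await.result(winner, 10.seconds)
  }
}
```

FirstCompleted.demo() returns "second". The Futures that lose the race are simply ignored by firstCompletedOf; they are not canceled, which is why the program above invokes the cancelation objects explicitly.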

