Mike Slinn

Collections of Futures

— Draft —

Published 2014-10-01. Last modified 2019-06-30.
Time to read: 9 minutes.

This lecture focuses on working with collections of Scala Futures.

  • Processing Futures as they complete.
  • Mixing monads in a for-comprehension.
  • Coping with failure.
  • Failure propagation.

Setup for the Lecture

If you are writing a web crawler, your program will need to fetch many URLs at once. We can create a List[String] to hold the URLs to fetch by invoking the urls method defined in the multi package object. The urls method looks like this (you've seen this before, so I won't explain it).

Scala code
lazy val goodUrlStr1       = "https://www.scalacourses.com"
lazy val goodUrlStr2       = "https://www.micronauticsresearch.com"
lazy val badHostUrlStr     = "https://www.not-really-here.com"
lazy val badPageUrlStr     = "https://scalacourses.com/noPageHere"
lazy val badProtocolUrlStr = "blah://scalacourses.com"

def urls(includeBad: Boolean=false): List[String] =
  List(goodUrlStr1, goodUrlStr2) :::
    (if (includeBad) List(badHostUrlStr, badPageUrlStr, badProtocolUrlStr) else Nil)

The sample code needs a few more definitions, provided in the FutureArtifacts object. The readUrl function is a Function1[String, String] which uses the Function placeholder syntax that we learned in the Functions are First Class lecture of the Introduction to Scala course. The Lambda Review & Drill lecture of the same course gives you practice with the syntax. readUrl accepts a string containing the URL of the web page to fetch, and it returns the contents of the web page. We'll also need to transform the list of URLs into a list of Futures of the corresponding web page contents throughout this lecture; in other words we will need to create a List[Future[String]]. The futureContents function is a Function1[List[String], List[Future[String]]] and it walks through the collection of URLs, and returns a collection of Futures of the web page contents.

Scala REPL
scala> val readUrl: String => String =
  io.Source.fromURL(_: String).mkString
readUrl: String => String = <function1>
scala> val futureContents: List[String] => List[Future[String]] =
  (_: List[String]).map { url => Future(readUrl(url)) }
futureContents: List[String] => List[scala.concurrent.Future[String]] = <function1>

build.sbt contains the following, so the sbt console command already imports everything we need to work with the REPL:

Scala code
initialCommands := """
import java.io.File
import java.net.URL
import scala.sys.process._
import concurrent._
import concurrent.ExecutionContext.Implicits.global
import scala.util.control.NoStackTrace
import scala.util.{Try,Success,Failure}
import concurrent.{Await, Future}
import concurrent.duration._
import multi._
import multi.futures._
import multi.futures.FutureArtifacts._
"""

Let's try out futureContents by just passing in urls that work:

Shell
$ sbt console
...
scala> futureContents(urls())
res0: List[scala.concurrent.Future[String]] = List(scala.concurrent.impl.Promise$DefaultPromise@33ab97ee, scala.concurrent.impl.Promise$DefaultPromise@6edba8a5) 

Let's get a List of Future of the contents of all web pages, including those referenced by bad URLs:

Scala REPL
scala> futureContents(urls(includeBad=true))
res1: List[scala.concurrent.Future[String]] = List(scala.concurrent.impl.Promise$DefaultPromise@7b295e3, scala.concurrent.impl.Promise$DefaultPromise@6e5afac3, scala.concurrent.impl.Promise$DefaultPromise@f384c50, scala.concurrent.impl.Promise$DefaultPromise@40be9cf9, scala.concurrent.impl.Promise$DefaultPromise@53300480) 

Associating Inputs With Their Futures

A common scenario when working with Futures is the requirement to be able to associate completed Futures with the original data or requests that initiated them. So that the URLs can be associated with the contents of their web pages, the urlSearch method will internally need a collection of Tuple2[String, Future[String]]. The first element of the Tuple2 will be the URL to fetch, and the second will be a Future[String] which will hold the web page contents corresponding to that URL.

Here is how we can make the List[Tuple2]. Very similar code is provided in the FutureWord object defined in FutureWord.scala.

Scala REPL
scala> val u = urls(includeBad=true)
u: List[String] = List(https://www.scalacourses.com, https://www.micronauticsresearch.com, https://www.not-really-here.com, https://scalacourses.com/noPageHere, blah://scalacourses.com)
scala> u zip futureContents(u)
res4: List[(String, scala.concurrent.Future[String])] = List((https://www.scalacourses.com,scala.concurrent.impl.Promise$DefaultPromise@32d02adc), (https://www.micronauticsresearch.com,scala.concurrent.impl.Promise$DefaultPromise@7eb66cc5), (https://www.not-really-here.com,scala.concurrent.impl.Promise$DefaultPromise@31c13f78), (https://scalacourses.com/noPageHere,scala.concurrent.impl.Promise$DefaultPromise@332c87f2), (blah://scalacourses.com,scala.concurrent.impl.Promise$DefaultPromise@19cd81))

Remember that each newly created Future starts executing as soon as a thread is available in its ExecutionContext.
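
Here is a minimal sketch of that eager behavior. The object and method names are hypothetical, and a CountDownLatch stands in for real work; the point is only that the body runs without anyone asking for the result.

```scala
import java.util.concurrent.{CountDownLatch, TimeUnit}
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

object EagerFutures {
  /** @return true if the Future body ran even though nobody asked for its result */
  def bodyRanWithoutAwait(): Boolean = {
    val started = new CountDownLatch(1)
    // Constructing the Future is enough to schedule its body on the global ExecutionContext
    val ignored: Future[Unit] = Future { started.countDown() }
    // We never look at `ignored`; we only wait (up to 5 seconds) for the side effect
    started.await(5, TimeUnit.SECONDS)
  }
}
```

This is why futureContents starts fetching every page the moment the list is built.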

The urlSearch method uses List's zip method in exactly this way. urlSearch identifies web pages that contain a certain word. It also returns the List[Future[String]] so the calling code can synchronize on it, blocking until all the Futures complete. You can see what I mean by examining the FutureWord console application in FutureWord.scala.

This for-loop mixes monads
Scala code
import multi.futures.FutureArtifacts._

def urlSearch(word: String, urls: List[String]): List[Future[String]] = {
  val futures = futureContents(urls)
  for {
    (url, future) <- urls zip futures
    contents      <- future if contents.toLowerCase.contains(word)
  } println(s"$url contains '$word'")
  futures
}

The urlSearch method begins by creating a List[Future[String]] called futures, so that all the web pages are fetched in parallel. Creating the Futures up front, rather than one at a time inside the generators, guarantees that no fetch waits for another to finish. zip is used to build a list of tuples that associate each URL with the Future of its web page contents. The next generator binds the value of each Future to the temporary immutable variable contents once that Future completes. If the Future is not ready yet, the loop body for that URL runs later: Future's foreach and withFilter register callbacks instead of blocking the calling thread. If contents contains the word that we are looking for, the associated URL is printed. If the Future fails, nothing is printed for that URL and the loop moves on to the next tuple, if there is one.

Notice that there is a for-loop within urlSearch, not a for-comprehension, because there is no yield keyword. If you attempt to insert yield there, the program will not compile because the monadic types do not match: the first generator draws from a List[(String, Future[String])] and the second from a Future[String]. I will show code that yields a result later in this lecture. For now, we'll just print a message for each matching web page, but we won't return any results.
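
To make the translation concrete, here is a sketch of the same kind of loop next to roughly what the compiler turns it into. Everything in it is an assumption made for illustration: the in-memory pages stand in for real web pages, and the same-thread ExecutionContext exists only so that the callbacks have already run by the time each method returns.

```scala
import java.util.concurrent.ConcurrentLinkedQueue
import scala.concurrent.{ExecutionContext, Future}
import scala.jdk.CollectionConverters._

object ForLoopDesugared {
  // Assumption: callbacks run on the calling thread, purely to make this demo deterministic
  implicit val sameThread: ExecutionContext = new ExecutionContext {
    def execute(runnable: Runnable): Unit = runnable.run()
    def reportFailure(cause: Throwable): Unit = cause.printStackTrace()
  }

  /** Hypothetical in-memory "web": each URL is paired with an already-completed Future */
  def pages: List[(String, Future[String])] = List(
    "https://a.example" -> Future.successful("Scala is fun"),
    "https://b.example" -> Future.successful("Nothing to see"),
    "https://c.example" -> Future.failed[String](new Exception("unreachable"))
  )

  /** A for-loop shaped like the one in urlSearch; it records matches instead of printing */
  def withForLoop(word: String): List[String] = {
    val found = new ConcurrentLinkedQueue[String]()
    for {
      (url, future) <- pages
      contents      <- future if contents.toLowerCase.contains(word)
    } found.add(url)
    found.asScala.toList
  }

  /** Roughly what the compiler generates: List.foreach outside, Future.withFilter.foreach inside */
  def desugared(word: String): List[String] = {
    val found = new ConcurrentLinkedQueue[String]()
    pages.foreach { case (url, future) =>
      future
        .withFilter(contents => contents.toLowerCase.contains(word))
        .foreach(_ => found.add(url))
    }
    found.asScala.toList
  }
}
```

Both methods report only https://a.example for the word "scala": the second page does not match, and the failed Future never reaches the loop body.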

If each Future requires significant processing, and the slowest Future happens to be first in the list, then code that waits for the Futures in list order will spend more time blocking than it should. We will learn how to process Futures as soon as they complete later in this lecture.

Let's see urlSearch at work:

Scala REPL
scala> FutureWord.urlSearch("scala", urls(includeBad=true))
List[scala.concurrent.Future[String]] = List(scala.concurrent.impl.Promise$DefaultPromise@1e87f3c8, scala.concurrent.impl.Promise$DefaultPromise@6be478e6, scala.concurrent.impl.Promise$DefaultPromise@271b40bc, scala.concurrent.impl.Promise$DefaultPromise@77684e89, scala.concurrent.impl.Promise$DefaultPromise@56d0fa1f)
urlSearch: https://www.micronauticsresearch.com contains 'scala'
urlSearch: https://www.scalacourses.com contains 'scala'

You can run this program by typing:

Shell
$ sbt "runMain multi.futures.FutureWord"

Future for-Loop in Detail

This is a great program to see how Futures behave in a for-loop. The urlSearch method walks through a list of URL strings and reports the web pages that contain the desired word. You can see that I've slipped in AtomicInteger, which uses a set of relatively modern CPU instructions to perform thread-safe basic arithmetic on integer quantities. This means that count can be safely manipulated from various threads without fear of race conditions. The count variable is used to determine when all the Futures have completed, and therefore the program can be shut down.

The Atomic classes are part of the Java runtime library, so they are all available to Scala. They include: AtomicBoolean, AtomicInteger, AtomicIntegerArray, AtomicIntegerFieldUpdater, AtomicLong, AtomicLongArray, AtomicLongFieldUpdater, AtomicMarkableReference, AtomicReference, AtomicReferenceArray, AtomicReferenceFieldUpdater, and AtomicStampedReference.

urlSearch creates a tuple of a url and an index i using zipWithIndex, and both parameters are passed to the report method. Because the andThen combinator is chained to the report method, any failed Futures are reported.
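
The following sketch shows the counting idea in isolation. It is not the FutureWord source; the names and the doubling "work" are made up for illustration, and a Promise is used to signal the end instead of shutting the program down.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

object CountingCompletions {
  /** Starts one Future per input and returns how many of them completed (successfully or not). */
  def countCompletions(inputs: List[Int]): Int = {
    val remaining = new AtomicInteger(inputs.size) // decremented from many threads
    val completed = new AtomicInteger(0)
    val allDone   = Promise[Unit]()

    inputs.zipWithIndex.foreach { case (n, i) =>
      Future {
        if (n < 0) throw new IllegalArgumentException(s"input $i is negative") else n * 2
      }.andThen { case _ =>
        // andThen runs for successes and failures alike
        completed.incrementAndGet()
        if (remaining.decrementAndGet() == 0) allDone.success(())
      }
    }

    if (inputs.isEmpty) allDone.success(()) // nothing to wait for
    Await.ready(allDone.future, 10.seconds)
    completed.get
  }
}
```

Because decrementAndGet is atomic, exactly one callback sees zero, so the Promise is completed exactly once no matter how the threads interleave.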

Failed Future of a Collection For the Unwary

The Future Combinators lecture showed how to handle a single Future's failure appropriately, and we'll see more ideas throughout this lecture. Now I want to show an example of how Future.sequence and its cousins (foldLeft, reduceLeft and traverse) can get you into trouble when failures are encountered.

First, let's define a method that displays the portion of the web page content that contains the word that was searched for. There is nothing new to learn about this method, it's just handy for what we'll do next.

Scala code
/** @return String of the form [...]blah blah word blah blah [...] */
def snippet(word: String, string: String): String = {
  val m = string.trim.toLowerCase
  val i = math.max(0, m.indexOf(word) - 50)
  val j = math.min(m.length, i + 100)
  val snippet = (if (i == 0) "" else "...") + m.substring(i, j).trim + (if (j == m.length) "" else "...")
  snippet
}

Now let's consider the difference between a Future of a collection, and a collection of Futures. We already know that each Future in a collection continues executing even when one or more of the other Futures fail. Recall that Future.sequence converts a List[Future[T]] to a Future[List[T]]. When one Future in the original List[Future[T]] fails, the entire Future[List[T]] fails, even though they all run to completion. This means we will get no results from code like the following if any Future fails.

Scala code
val brokenUrlSearch: (String, List[String]) => Future[List[String]] =
  (word: String, urls: List[String]) =>
    Future.sequence(futureContents(urls)).collect {
      case list: List[String] =>
        val resultList = list.filter(_.toLowerCase.contains(word))
        resultList.foreach { contents => println(s"brokenUrlSearch: ${snippet(word, contents)}") }
        resultList
    } andThen {
      case Success(list) =>
        println("brokenUrlSearch succeeded!")

      case Failure(ex) =>
        println("brokenUrlSearch failed on URL " + ex.getMessage)
    }

BTW, in order to make brokenUrlSearch operate as part of a console App I had to wrap it in a class due to the now-familiar initialization issues of console Apps, which we first encountered in the Parallel Collections lecture.

Scala code
object FutureFailed extends App {
  import FutureArtifacts._
  class FailureTest {
    val brokenUrlSearch: (String, List[String]) => Future[List[String]] =
      (word: String, urls: List[String]) =>
        Future.sequence(futureContents(urls)).collect {
          case list: List[String] =>
            val resultList = list.filter(_.toLowerCase.contains(word))
            resultList.foreach { contents => println(s"brokenUrlSearch: ${snippet(word, contents)}")}
            resultList
        } andThen {
          case Success(list) =>
            println("brokenUrlSearch succeeded!")

          case Failure(ex) =>
            println("brokenUrlSearch failed on URL " + ex.getMessage)
        }
  }

  Await.ready(new FailureTest().brokenUrlSearch("scala", urls(includeBad=true)), Duration.Inf)
}

Now let's try calling brokenUrlSearch from the REPL.

Shell
$ sbt console
...
scala> new FutureFailed.FailureTest().brokenUrlSearch("scala", urls(includeBad=true))
res2: scala.concurrent.Future[List[String]] = scala.concurrent.impl.Promise$DefaultPromise@4e65049b
brokenUrlSearch failed on URL www.not-really-here.com

We did not get the desired output because one Future in the collection failed, which meant that the new Future returned by Future.sequence also failed, and so the partial function passed to collect was never invoked. Instead, the Failure case of the andThen expression was evaluated, which caused the name of the failing URL to be printed. Not only is this code wrong, it consumes computing resources needlessly because all threads run to completion, even though no output is generated.
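
Here is a small sketch of the same trap without any network access, together with one possible way around it. fakeRead is a hypothetical stand-in for readUrl, and lifting each result into a Try with transform is my own choice for the illustration, not what the course code does.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.{Success, Try}

object SequenceFailure {
  /** Hypothetical stand-in for readUrl: fails for any "URL" that starts with "bad" */
  def fakeRead(url: String): String =
    if (url.startsWith("bad")) throw new java.io.IOException(url) else s"contents of $url"

  /** Future.sequence alone: one failure fails the whole Future[List[String]] */
  def allOrNothing(urls: List[String]): Try[List[String]] = {
    val combined: Future[List[String]] =
      Future.sequence(urls.map(url => Future(fakeRead(url))))
    Await.ready(combined, 10.seconds).value.get
  }

  /** Lifting each result into a Try first means the combined Future always succeeds */
  def keepSuccesses(urls: List[String]): List[String] = {
    val lifted: List[Future[Try[String]]] =
      urls.map(url => Future(fakeRead(url)).transform(result => Success(result)))
    val combined: Future[List[Try[String]]] = Future.sequence(lifted)
    Await.result(combined, 10.seconds).collect { case Success(contents) => contents }
  }
}
```

With the input List("a", "bad", "c"), allOrNothing yields a Failure while keepSuccesses still returns the contents of a and c.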

You can run this program by typing:

Shell
$ sbt "runMain multi.futures.FutureFailed"

Output is as shown above.

Flowing Data as a River

The following code returns the same results as the original urlSearch, but it provides more flexibility in how Exceptions are handled. I've inserted some extra printlns and made the program more complex so you can see data flowing as a river, which is an exciting phenomenon to be able to observe. This code uses recoverWith to return a Future[(String, Option[String])] whose second element is None whenever an Exception is encountered. Using recoverWith means that the Future for every URL completes successfully; none of the Futures in the list can fail. The Option[String] tells us whether each Future actually contains a String result to process.

Scala code
object FutureRecovering extends App {
  def urlSearch(word: String, urls: List[String]): Unit = {
    val count = new java.util.concurrent.atomic.AtomicInteger(urls.size-1)

    val futures: List[Future[(String, Option[String])]] = urls.map { url =>
      Future((url, Some(readUrl(url)))).recoverWith {
        case e: Exception => Future.successful((url, None))
      }.andThen { case _ => println(s"Completed $url") }
    }
    for {
      future <- futures
      (url, maybeContents) <- future
    } {
      println(s"Failed: ${maybeContents.isEmpty} count: ${count.get}")
      if (maybeContents.isEmpty && count.getAndDecrement==1) System.exit(0)
      for {
        contents <- maybeContents if contents.toLowerCase.contains(word)
      } {
        println(s"Succeeded: '$word' was found in $url")
        if (count.getAndDecrement==1) System.exit(0)
        println(s"Count: ${count.get}")
      }
    }
  }

  urlSearch("scala", urls(includeBad=true))
  synchronized { wait() }
}
List[Future[_]] is always processed in order

Notice there are two main loops and the second loop is compound. The first loop is controlled by the map combinator, and it creates the futures variable. The first loop always creates Futures that complete successfully because of the recoverWith combinator, which translates Exceptions into tuples of the failed URL and None. The andThen clause in the first loop prints out Completed whenever a Future completes. The second loop is activated before all of the Futures created by the first loop complete; however, it walks the List[Future[_]] in the order that the URLs were listed in. The next section in this lecture will remove that restriction. As each Future completes, its value, which is a tuple, is unpacked into url and maybeContents. Then a Succeeded or a Failed message is printed out, and a check is made to see if processing is complete. If not, the inner loop runs, which examines the contents to see if they contain the desired word.
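
When the fallback value is already at hand, recover does the same job as recoverWith combined with Future.successful. This is only a sketch: fakeRead is a hypothetical stand-in for readUrl, and the results are awaited so they can be inspected.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

object RecoverToOption {
  /** Hypothetical stand-in for readUrl: fails for any "URL" that starts with "bad" */
  def fakeRead(url: String): String =
    if (url.startsWith("bad")) throw new java.io.IOException(url) else s"contents of $url"

  /** Every Future succeeds; a failed fetch shows up as (url, None) */
  def fetchAll(urls: List[String]): List[(String, Option[String])] = {
    val futures: List[Future[(String, Option[String])]] = urls.map { url =>
      Future[(String, Option[String])]((url, Some(fakeRead(url))))
        .recover { case _: Exception => (url, None) }
    }
    // Safe to sequence here because none of the Futures can fail any more
    Await.result(Future.sequence(futures), 10.seconds)
  }
}
```

Because the URL travels inside the tuple, a failure can still be traced back to the request that caused it.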

Let's try it in the REPL. Notice that Succeeded and Failed messages from the second loop are mixed with Completed messages from the first loop - the data flows freely.

Shell
$ sbt console
... 
scala> FutureRecovering.urlSearch("scala", urls(includeBad=true))
Completed blah://scalacourses.com
Failed: true count: 4
Completed https://www.not-really-here.com
Failed: true count: 3
Completed https://www.scalacourses.com
Failed: false count: 2
Succeeded: 'scala' was found in https://www.scalacourses.com
Count: 1
Completed https://www.micronauticsresearch.com
Failed: false count: 1
Completed https://scalacourses.com/noPageHere
Failed: true count: 1 

You can run this program by typing:

Shell
$ sbt "runMain multi.futures.FutureRecovering"

Processing Futures As They Complete

Here is a method, modeled after Twitter's Future.select method, that I think should be incorporated into the Scala runtime library. It solves the problem we encountered throughout the preceding portion of this lecture by making each Future available for further processing as soon as it completes, without blocking.

Scala code
/** @return the first future to complete, with the remainder of the Futures as a sequence.
 * @param futures a scala.collection.Seq
 * @author Viktor Klang (https://gist.github.com/4488970)
 * @author Mike Slinn Updated to Scala 2.13 */
  def select[A](futures: Seq[Future[A]])
               (implicit ec: ExecutionContext): Future[(Try[A], Seq[Future[A]])] = {
    import scala.annotation.tailrec
    import scala.concurrent.Promise
    @tailrec
    def stripe(promise: Promise[(Try[A], Seq[Future[A]])],
               head: Seq[Future[A]],
               thisFuture: Future[A],
               tail: Seq[Future[A]]): Future[(Try[A], Seq[Future[A]])] = {
      // Whichever Future completes first wins; head ++ tail is every Future except thisFuture
      thisFuture onComplete { result =>
        if (!promise.isCompleted) promise.trySuccess((result, head ++ tail))
      }
      if (tail.isEmpty) promise.future
      else stripe(promise, head :+ thisFuture, tail.head, tail.tail)
    }

    if (futures.isEmpty) Future.failed(new IllegalArgumentException("List of futures is empty"))
    else stripe(Promise(), Seq.empty[Future[A]], futures.head, futures.tail) // head starts out empty: nothing precedes the first Future
  }

Here is a convenient driver method for invoking select:

Scala code
/** Apply a function over a sequence of futures as soon as each future completes.
 * @param futures sequence of futures to operate on
 * @param operation function to apply on each Future value as soon as they complete
 * @param whenDone block of code to execute when all futures have been processed */
  def asapFutures[T](futures: Seq[Future[T]])
                    (operation: Try[T]=>Unit)
                    (whenDone: =>Unit={})
                    (implicit ec: ExecutionContext): Unit = {
    def jiffyFutures(futures: Seq[Future[T]])
                    (operation: Try[T]=>Unit)
                    (whenDone: =>Unit): Unit = {
      if (futures.nonEmpty) {
        select(futures) andThen {
          case Success((tryResult, remainingFutures)) =>
            operation(tryResult)
            jiffyFutures(remainingFutures)(operation)(whenDone)

          case Failure(throwable) =>
            println("Unexpected exception: " + throwable.getMessage)
        } andThen {
          case _ =>
            if (futures.size==1) whenDone
        }
      }
    }

    if (futures.isEmpty) whenDone
    else jiffyFutures(futures)(operation)(whenDone)
  }

The first argument to asapFutures is of course the Futures to operate on as they complete. The second argument is a Function1[Try[T],Unit], which is the transformation that we want to apply to the Futures once they complete. The whenDone parameter is lazily evaluated, as we learned in the More Fun With Functions lecture of the Introduction to Scala course, and has a default value {}, which is effectively a no-op. The fourth parameter list once again consists of an implicit ExecutionContext.

I have provided select and asapFutures in FuturesUtil.scala so you can incorporate them into your projects.
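
For comparison, the standard library already offers Future.firstCompletedOf, which returns the result of whichever Future completes first. Unlike select, it does not hand back the remaining Futures, so it cannot drive a loop like asapFutures on its own. The sketch below uses made-up names and a Promise that is never completed to play the part of a slow Future.

```scala
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

object FirstCompleted {
  /** @return the value of whichever Future finished first */
  def firstOf(): String = {
    val never = Promise[String]()          // stands in for a Future that is still running
    val fast  = Future.successful("fast")  // already completed
    Await.result(Future.firstCompletedOf(List(never.future, fast)), 10.seconds)
  }
}
```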

I will show an example of how to invoke asapFutures next. First, let's define a Function that transforms a collection of URLs into a collection of Futures, each of which holds a tuple of a URL and the corresponding web page content.

Scala code
val futureTuples: List[String] => List[Future[(String, String)]] =
    (_: List[String]).map { url => Future((url, readUrl(url))) }

Now we can define a new version of urlSearch that looks for a word in a collection of web page URLs, and it invokes asapFutures. The first argument to asapFutures is the List[Future[(String, String)]] to process. Each Future holds a tuple containing a URL and the corresponding web page content. I've provided a partial function as the second argument to asapFutures instead of a Function1 because Success and Failure cases can be specified with a partial function. We discussed why it is legal to supply a partial function whenever a Function1 is expected in the Partial Functions lecture. The partial function receives the value of each Future as it completes, and it handles three cases.

  1. Successfully fetched the web page and found the word. Notice that the tuple.unapply method is implicitly invoked as discussed in the Unapply lecture of the Introduction to Scala course, so that temporary immutable variables url and contents are defined.
  2. Successfully fetched the web page but the word was not found. Again, tuple.unapply is implicitly invoked so url and contents are defined.
  3. The web page was not found.

The second parameter list of urlSearch, whenDone, is lazily evaluated and is merely passed along to asapFutures.

Scala code
def urlSearch(word: String, urls: List[String])(whenDone: =>Unit={}): Unit = {
  asapFutures(futureTuples(urls)) {
    case Success((url, contents)) if contents.toLowerCase.contains(word) =>
      println(s"Found '$word' in $url:\n${snippet(word, contents)}\n")

    case Success((url, contents)) =>
      println(s"Sorry, $url does not contain '$word'\n")

    case Failure(err) =>
      println(s"Error: ${err.getMessage}\n")
  }(whenDone)
}
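
The three cases can be tried out on their own, without any Futures. In this sketch the handler returns a String instead of printing so the result is easy to inspect; the name describe and the sample values are made up.

```scala
import scala.util.{Failure, Success, Try}

object TryHandlers {
  /** A block of case clauses is accepted wherever a Function1 is expected */
  val describe: Try[(String, String)] => String = {
    case Success((url, contents)) if contents.toLowerCase.contains("free") =>
      s"Found 'free' in $url"

    case Success((url, _)) =>
      s"Sorry, $url does not contain 'free'"

    case Failure(err) =>
      s"Error: ${err.getMessage}"
  }
}
```

Note that in the Failure case there is no url to report, because a failed Future carries only the exception.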

Let's employ urlSearch twice, once with a non-empty list of URLs, and once with an empty list so there is nothing to process. signal1 and signal2 are used in a closure according to the signaling pattern introduced in the Getting Results from Futures & Signaling with Promises lecture; when signal1 is completed the first Await.ready allows execution to continue, then when signal2 completes the program prints "All done" and exits. These closures are bound to the whenDone parameters of urlSearch and asapFutures. See the Closures lecture of the Introduction to Scala course to refresh your memory of what a closure is.

Scala code
val signal1 = Promise[String]()
urlSearch("free", urls) { signal1.success("done") }
Await.ready(signal1.future, duration.Duration.Inf)

val signal2 = Promise[String]()
urlSearch("free", Nil) { signal2.success("done") }
Await.ready(signal2.future, duration.Duration.Inf)
println("All done")
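
Here is the signaling pattern reduced to its essentials. The work method is a hypothetical stand-in for urlSearch; all it does is evaluate its by-name whenDone argument once its Future has completed.

```scala
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

object Signaling {
  /** Hypothetical asynchronous task that evaluates whenDone once it has finished */
  def work(whenDone: => Unit): Unit =
    Future(21 * 2).onComplete(_ => whenDone)

  /** @return the value the Promise was completed with */
  def runAndWait(): String = {
    val signal = Promise[String]()
    work { signal.success("done"); () } // the block closes over signal
    Await.result(signal.future, 10.seconds)
  }
}
```

The caller blocks only on the Promise's Future, so it needs no knowledge of how many Futures the task used internally.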

You can run this program by typing:

Shell
$ sbt "runMain multi.futures.FutureSelect"
Error: Could not read from not_really_here.com
Found 'free' in https://scalacourses.com: ...<span style="float: right"> 2 / 31 free lectures </span> </div> <...
Sorry, https://micronauticsresearch.com does not contain 'free'
All done

Mixing Monads in a for-Comprehension

So far so good – but we have not yet attempted to return a value from urlSearch. Instead, that method only has side effects because it returns Unit.

Let's see what happens if we modify urlSearch so it returns a value.

Scala REPL
scala> def urlSearch(word: String, urls: List[String]) = {
         val futures: List[Future[String]] = urls.map(u => Future(readUrl(u)))
         for {
           (url, future) <- urls zip futures
           contents      <- future if contents.toLowerCase.contains(word)
         } yield url
       }
Error:(176, 16) type mismatch;
   found   : scala.concurrent.Future[String]
   required: scala.collection.IterableOnce[?]
        contents <- future if contents.toLowerCase.contains(word) 

As we learned in the For-Loops and For-Comprehensions lecture, this doesn't compile because List and Future are different types of monads. Remember that when you have multiple generators in a for-comprehension, you are flattening the resulting type because of the flatMap / map implementation provided by compiler desugaring. Although it is easy for the Scala compiler to flatten a Future[Future[T]] to a Future[T], the Scala compiler does not know how to flatten nested monads of different types. Given List[Future[T]], would you want the flattened type to be List[T] or Future[T]? Similarly, when flattening a Future[List[T]], would you want the resulting type to be Future[T] or List[T]?
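
The sketch below contrasts the two situations. When both generators are Futures the for-comprehension compiles; when a List is involved, one option is to state the desired shape explicitly with Future.traverse from the standard library. The method names and sample strings are made up, and demo blocks for the results only so they can be inspected.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

object MixedMonads {
  /** Both generators are Futures, so flatMap and map line up and yield compiles */
  def sumLengths(a: Future[String], b: Future[String]): Future[Int] =
    for {
      x <- a
      y <- b
    } yield x.length + y.length

  /** List on the outside, Future on the inside: Future.traverse names the result type we want */
  def lengths(words: List[String]): Future[List[Int]] =
    Future.traverse(words)(word => Future(word.length))

  /** Blocks for the results so they can be inspected; only appropriate in a demo */
  def demo(): (Int, List[Int]) = {
    val total = Await.result(sumLengths(Future("ab"), Future("cde")), 10.seconds)
    val each  = Await.result(lengths(List("a", "bb", "ccc")), 10.seconds)
    (total, each)
  }
}
```

Keep in mind that Future.traverse has the same all-or-nothing behavior as Future.sequence, which is why the rest of this section takes a different route.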

Let's come at this a bit differently for working with Futures. Instead of using zip to associate URLs with Futures of web page contents, let's make a Function1 that accepts a list of URLs and returns a List[(String, Future[String])] containing all the URLs and their future contents, or failed Futures if any web page fails to fetch. This function is similar to futureTuples, except that it returns a list of tuples, each pairing a URL with a Future, and it uses collect with a partial function instead of map and a closure.

Scala REPL
scala> val futureContents: List[String] => List[(String, Future[String])] =
  (_: List[String]).collect { case url => (url, Future(readUrl(url))) }
futureContents: List[String] => List[(String, scala.concurrent.Future[String])] = <function1> 

Now we can use futureContents to obtain a list of tuples of URLs and their corresponding Futures of the web page contents.

Scala REPL
scala> futureContents(urls)
res15: List[(String, scala.concurrent.Future[String])] = List((https://not_really_here.com,scala.concurrent.impl.Promise$DefaultPromise@49d77e6c), (https://scalacourses.com,scala.concurrent.impl.Promise$DefaultPromise@62ad0a53), (https://micronauticsresearch.com,scala.concurrent.impl.Promise$DefaultPromise@4e8101c1)) 

To allow us to retrieve the value from the Futures, let's define a Function1 that accepts a Future[String] and returns the contents, or the empty string if any Exception occurs.

Scala code
val futureString: Future[String] => String = (future: Future[String]) =>
  try {
    Await.result(future, duration.Duration.Inf)
  } catch {
    case e: Exception => ""
  }

Now we can define a Function1 that uses List.collect and a partial function that is only defined for Future[String] instances that contain the desired word when completed.

Scala REPL
scala> val listOfTuples: String => List[(String, String)] = (word: String) =>
  futureContents(urls).collect {
    case (url, future) if futureString(future).contains(word) =>
      (snippet(word, futureString(future)), url)
  }
listOfTuples: String => List[(String, String)] = <function1>

scala> listOfTuples("free")
res16: List[(String, String)] = List((...<span style="float: right"> 0 / 34 free lectures </span> </div> <...,https://scalacourses.com))

You can run this program by typing:

Shell
$ sbt "runMain multi.futures.FutureMixed"

Output is as shown above.

