Published 2014-10-01.
Last modified 2019-06-30.
Time to read: 9 minutes.
This lecture focuses on working with collections of Scala Futures.
- Processing Futures as they complete.
- Mixing monads in a for-comprehension.
- Coping with failure
- Failure Propagation
Setup for the Lecture
If you are writing a web crawler your program will need to fetch many URLs at once.
We can create a List[String] to hold the URLs to fetch by invoking the urls method defined in the multi package object.
The urls method looks like this (you've seen this before, so I won't explain it).
lazy val goodUrlStr1       = "https://www.scalacourses.com"
lazy val goodUrlStr2       = "https://www.micronauticsresearch.com"
lazy val badHostUrlStr     = "https://www.not-really-here.com"
lazy val badPageUrlStr     = "https://scalacourses.com/noPageHere"
lazy val badProtocolUrlStr = "blah://scalacourses.com"

def urls(includeBad: Boolean=false): List[String] =
  List(goodUrlStr1, goodUrlStr2) :::
    (if (includeBad) List(badHostUrlStr, badPageUrlStr, badProtocolUrlStr) else Nil)
The sample code needs a few more definitions, provided in the FutureArtifacts object.
The readUrl function is a Function1[String, String] which uses the Function placeholder syntax that we learned in the Functions are First Class lecture of the Introduction to Scala course.
The Lambda Review & Drill lecture of the same course gives you practice with the syntax.
readUrl accepts a string containing the URL of the web page to fetch, and it returns the contents of the web page.
We'll also need to transform the list of URLs into a list of Futures of the corresponding web page contents throughout this lecture; in other words we will need to create a List[Future[String]].
The futureContents function is a Function1[List[String], List[Future[String]]]; it walks through the collection of URLs and returns a collection of Futures of the web page contents.
scala> val readUrl: String => String = io.Source.fromURL(_: String).mkString
readUrl: String => String = <function1>

scala> val futureContents: List[String] => List[Future[String]] =
     |   (_: List[String]).map { url => Future(readUrl(url)) }
futureContents: List[String] => List[scala.concurrent.Future[String]] = <function1>
build.sbt contains the following, so the sbt console command already imports everything we need to work with the REPL:
initialCommands := """
  import java.io.File
  import java.net.URL
  import scala.sys.process._
  import concurrent._
  import concurrent.ExecutionContext.Implicits.global
  import scala.util.control.NoStackTrace
  import scala.util.{Try,Success,Failure}
  import concurrent.{Await, Future}
  import concurrent.duration._
  import multi._
  import multi.futures._
  import multi.futures.FutureArtifacts._
  """
Let's try out futureContents by just passing in URLs that work:
$ sbt console
...
scala> futureContents(urls())
res0: List[scala.concurrent.Future[String]] = List(scala.concurrent.impl.Promise$DefaultPromise@33ab97ee, scala.concurrent.impl.Promise$DefaultPromise@6edba8a5)
Let's get a List of Futures of the contents of all web pages, including those referenced by bad URLs:
scala> futureContents(urls(includeBad=true))
res1: List[scala.concurrent.Future[String]] = List(scala.concurrent.impl.Promise$DefaultPromise@7b295e3, scala.concurrent.impl.Promise$DefaultPromise@6e5afac3, scala.concurrent.impl.Promise$DefaultPromise@f384c50, scala.concurrent.impl.Promise$DefaultPromise@40be9cf9, scala.concurrent.impl.Promise$DefaultPromise@53300480)
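The REPL only shows Promise references above, not the page contents. As a minimal sketch of one way to look inside, the following waits for each Future and exposes its outcome as a Try. The names InspectFutures, fakeReadUrl, fakeFutureContents and outcomes are hypothetical and not part of the course code; fakeReadUrl is a stand-in for readUrl so that the sketch runs without network access.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.{Failure, Success, Try}

object InspectFutures {
  // Hypothetical stand-in for readUrl: no network access, fails on a bad protocol
  def fakeReadUrl(url: String): String =
    if (url.startsWith("https://")) s"<html>contents of $url</html>"
    else throw new java.net.MalformedURLException(s"unknown protocol in $url")

  // Same shape as futureContents in the lecture, but built on fakeReadUrl
  def fakeFutureContents(urls: List[String]): List[Future[String]] =
    urls.map(url => Future(fakeReadUrl(url)))

  // Await.ready waits for completion without throwing on failure;
  // value.get then yields the Success or Failure held by the completed Future
  def outcomes(futures: List[Future[String]]): List[Try[String]] =
    futures.map(f => Await.ready(f, 10.seconds).value.get)

  def main(args: Array[String]): Unit =
    outcomes(fakeFutureContents(List("https://example.com", "blah://example.com"))).foreach {
      case Success(contents) => println(s"OK: $contents")
      case Failure(ex)       => println(s"Failed: ${ex.getMessage}")
    }
}
```

Blocking like this is only meant for poking around in the REPL; the rest of the lecture deals with handling results without waiting on each Future in turn.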

Associating Inputs With Their Futures
A common scenario when working with Futures is the requirement to be able to associate completed Futures with the original data or requests that initiated them.
So that the URLs can be associated with the contents of their web pages, the urlSearch method will internally need a collection of Tuple2[String, Future[String]].
The first element of the Tuple2 will be the URL to fetch, and the second will be a Future[String] which will hold the web page contents corresponding to that URL.
Here is how we can make the List[Tuple2].
Very similar code is provided in the FutureWord object defined in FutureWord.scala.
scala> val u = urls(includeBad=true)
u: List[String] = List(https://www.scalacourses.com, https://www.micronauticsresearch.com, https://www.not-really-here.com, https://scalacourses.com/noPageHere, blah://scalacourses.com)

scala> u zip futureContents(u)
res4: List[(String, scala.concurrent.Future[String])] = List((https://www.scalacourses.com,scala.concurrent.impl.Promise$DefaultPromise@32d02adc), (https://www.micronauticsresearch.com,scala.concurrent.impl.Promise$DefaultPromise@7eb66cc5), (https://www.not-really-here.com,scala.concurrent.impl.Promise$DefaultPromise@31c13f78), (https://scalacourses.com/noPageHere,scala.concurrent.impl.Promise$DefaultPromise@332c87f2), (blah://scalacourses.com,scala.concurrent.impl.Promise$DefaultPromise@19cd81))
Remember that each newly created Future starts executing as soon as the execution context has a free thread.
The urlSearch method uses List's zip method in exactly this way.
urlSearch identifies web pages that have a certain word in them.
It also returns a List[Future[String]] so blocks of the sample code can periodically synchronize – execution will be blocked until all the Futures complete.
You can see what I mean by examining the FutureWord console application in FutureWord.scala.
import multi.futures.FutureArtifacts._

def urlSearch(word: String, urls: List[String]): List[Future[String]] = {
  val futures = futureContents(urls)
  for {
    (url, future) <- urls zip futures
    contents <- future if contents.toLowerCase.contains(word)
  } println(s"$url contains '$word'")
  futures
}
The urlSearch method begins by creating a List[Future[String]] called futures so that all the web pages are fetched in parallel.
If this were done in the for-loop instead, the web pages would have been fetched in series, and there would have been no benefit to using Futures.
zip is used to build a list of tuples that associate URLs to the appropriate Future of web page contents.
The next generator evaluates each Future as it completes and stores it into the temporary immutable variable contents.
If that future is not ready, the for-loop blocks until it completes.
If contents contains the word that we are looking for, the associated URL is printed.
If the Future fails the loop ends and the next iteration is begun, if there is one.
Notice that there is a for-loop within urlSearch, not a for-comprehension, because there is no yield keyword.
If you attempt to insert the yield keyword there, the program will not compile because the monadic types do not match: the first generator draws from a List[(String, Future[String])] and the second generator draws from a Future[String].
I will show code that yields a result later in this lecture.
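To make the for-loop less mysterious, here is a rough sketch of what the compiler turns a two-generator for-loop without yield into: a foreach over the list, and a withFilter followed by a foreach on each Future. The names ForLoopDesugared and fakeFetch are hypothetical; fakeFetch stands in for Future(readUrl(url)) so the sketch runs without network access.

```scala
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

object ForLoopDesugared {
  // Hypothetical stand-in for Future(readUrl(url)); no network access
  def fakeFetch(url: String): Future[String] =
    if (url.startsWith("https://")) Future(s"Page at $url about Scala")
    else Future.failed(new java.net.MalformedURLException(url))

  def urlSearch(word: String, urls: List[String]): List[Future[String]] = {
    val futures = urls.map(fakeFetch)
    // Roughly what the compiler generates for:
    //   for { (url, future) <- urls zip futures
    //         contents <- future if contents.toLowerCase.contains(word)
    //   } println(s"$url contains '$word'")
    (urls zip futures).foreach { case (url, future) =>
      future
        .withFilter(contents => contents.toLowerCase.contains(word)) // the guard
        .foreach(_ => println(s"$url contains '$word'"))             // the loop body
    }
    futures
  }

  def main(args: Array[String]): Unit = {
    urlSearch("scala", List("https://a.example", "blah://b.example"))
    Thread.sleep(500) // crude pause so the printing can happen before the JVM exits
  }
}
```

foreach on a Future runs its body only when the Future succeeds, and withFilter turns a value that fails the guard into a failed Future, which is why failed fetches and non-matching pages print nothing.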
For now, we'll just print the value of each completed Future in the order of the contents of the urls parameter, but we won't return any results.
If each Future requires significant processing in the body of the for-loop, and the slowest Future happens to be first in the list, then this code will spend more time blocking than it should.
We will learn how to process Futures as soon as they complete later in this lecture.
Let's see urlSearch at work:
scala> FutureWord.urlSearch("scala", urls(includeBad=true))
List[scala.concurrent.Future[String]] = List(scala.concurrent.impl.Promise$DefaultPromise@1e87f3c8, scala.concurrent.impl.Promise$DefaultPromise@6be478e6, scala.concurrent.impl.Promise$DefaultPromise@271b40bc, scala.concurrent.impl.Promise$DefaultPromise@77684e89, scala.concurrent.impl.Promise$DefaultPromise@56d0fa1f)
urlSearch: https://www.micronauticsresearch.com contains 'scala'
urlSearch: https://www.scalacourses.com contains 'scala'
You can run this program by typing:
$ sbt "runMain multi.futures.FutureWord"

Future for-Loop in Detail
This is a great program to see how Futures behave in a for-loop.
The urlSearch method walks through a list of URL strings and reports the web pages that contain the desired word.
You can see that I've slipped in AtomicInteger, which uses a set of relatively modern CPU instructions to perform thread-safe basic arithmetic on integer quantities.
This means that count can be safely manipulated from multiple threads without fear of race conditions.
The count variable is used to determine when all the Futures have completed, and therefore the program can be shut down.
The Atomic classes are part of the Java runtime library, so they are all available to Scala, and they are: AtomicBoolean, AtomicInteger, AtomicIntegerArray, AtomicIntegerFieldUpdater, AtomicLong, AtomicLongArray, AtomicLongFieldUpdater, AtomicMarkableReference, AtomicReference, AtomicReferenceArray, AtomicReferenceFieldUpdater, and AtomicStampedReference.
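As a minimal sketch of the counting idea, assuming nothing about the course's actual report method, the following decrements an AtomicInteger from each Future's completion callback and completes a Promise when the count reaches zero. CountdownSketch, whenAllDone and run are hypothetical names used only for illustration.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object CountdownSketch {
  /** Completes with the number of tasks once every task has finished,
    * whether it succeeded or failed. */
  def whenAllDone(tasks: List[() => Int]): Future[Int] = {
    val remaining = new AtomicInteger(tasks.size)
    val allDone = Promise[Int]()
    if (tasks.isEmpty) allDone.success(0)
    tasks.foreach { task =>
      Future(task()).onComplete { _ =>
        // decrementAndGet is atomic, so exactly one callback observes zero
        if (remaining.decrementAndGet() == 0) allDone.success(tasks.size)
      }
    }
    allDone.future
  }

  // Runs three tasks, one of which throws, and waits for the countdown to finish
  def run(): Int = {
    val tasks: List[() => Int] = List(() => 1, () => throw new RuntimeException("boom"), () => 3)
    Await.result(whenAllDone(tasks), 10.seconds)
  }

  def main(args: Array[String]): Unit = println(run())
}
```

A plain var decremented from several threads could lose updates; the atomic decrement is what makes the zero check reliable.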
urlSearch creates a tuple of a url and an index i using zipWithIndex, and both values are passed to the report method.
Because the andThen combinator is chained to the report method, any failed Futures are reported.
Failed Future of a Collection For the Unwary
The Future Combinators lecture showed how to handle a single Future's failure appropriately, and we'll see more ideas throughout this lecture.
Now I want to show an example of how Future.sequence and its cousins (foldLeft, reduceLeft and traverse) can get you into trouble when failures are encountered.
First, let's define a method that displays the portion of the web page content that contains the word that was searched for. There is nothing new to learn about this method, it's just handy for what we'll do next.
/** @return String of the form [...]blah blah word blah blah [...] */
def snippet(word: String, string: String): String = {
  val m = string.trim.toLowerCase
  val i = math.max(0, m.indexOf(word) - 50)
  val j = math.min(m.length, i + 100)
  val snippet = (if (i == 0) "" else "...") + m.substring(i, j).trim + (if (j == m.length) "" else "...")
  snippet
}
Now let's consider the difference between a Future of a collection, and a collection of Futures.
We already know that each Future in a collection continues executing even when one or more of the other Futures fail.
Recall that Future.sequence converts a List[Future[T]] to a Future[List[T]].
When one Future in the original List[Future[T]] fails, the entire Future[List[T]] fails, even though they all run to completion.
This means we will get no results from code like the following if any Future fails.
val brokenUrlSearch: (String, List[String]) => Future[List[String]] =
  (word: String, urls: List[String]) =>
    Future.sequence(futureContents(urls)).collect {
      case list: List[String] =>
        val resultList = list.filter(_.toLowerCase.contains(word))
        resultList.foreach { contents =>
          println(s"brokenUrlSearch: ${snippet(word, contents)}")
        }
        resultList
    } andThen {
      case Success(list) => println("brokenUrlSearch succeeded!")
      case Failure(ex)   => println("brokenUrlSearch failed on URL " + ex.getMessage)
    }
BTW, in order to make brokenUrlSearch operate as part of a console App I had to wrap it in a class due to the now-familiar initialization issues of console Apps, which we first encountered in the Parallel Collections lecture.
object FutureFailed extends App {
  import FutureArtifacts._

  class FailureTest {
    val brokenUrlSearch: (String, List[String]) => Future[List[String]] =
      (word: String, urls: List[String]) =>
        Future.sequence(futureContents(urls)).collect {
          case list: List[String] =>
            val resultList = list.filter(_.toLowerCase.contains(word))
            resultList.foreach { contents =>
              println(s"brokenUrlSearch: ${snippet(word, contents)}")
            }
            resultList
        } andThen {
          case Success(list) => println("brokenUrlSearch succeeded!")
          case Failure(ex)   => println("brokenUrlSearch failed on URL " + ex.getMessage)
        }
  }

  Await.ready(new FailureTest().brokenUrlSearch("scala", urls(includeBad=true)), Duration.Inf)
}
Now let's try calling brokenUrlSearch from the REPL.
$ sbt console
...
scala> new FutureFailed.FailureTest().brokenUrlSearch("scala", urls(includeBad=true))
res2: scala.concurrent.Future[List[String]] = scala.concurrent.impl.Promise$DefaultPromise@4e65049b
brokenUrlSearch failed on URL www.not-really-here.com
We did not get the desired output because one Future in the collection failed, which meant that the new Future returned by Future.sequence also failed, and so the partial function passed to collect was never invoked.
Instead, the Failure case of the andThen expression was evaluated, which caused the name of the failing URL to be printed.
Not only is this code wrong, it consumes computing resources needlessly because all threads run to completion, even though no output is generated.
You can run this program by typing:
$ sbt "runMain multi.futures.FutureFailed"
Output is as shown above.
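A minimal sketch of the difference, using simulated fetches instead of real web requests: the first method sequences the Futures directly and fails as a whole, while the second lifts each outcome into a Try with Future.transform before sequencing so that the successful results survive. SequenceWithTry, fakeFetch, allOrNothing, keepPartialResults and demo are hypothetical names; this is one possible approach, not the one used in the course code.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.{Success, Try}

object SequenceWithTry {
  // Hypothetical stand-in for Future(readUrl(url)); no network access
  def fakeFetch(url: String): Future[String] =
    if (url.startsWith("https://")) Future(s"contents of $url")
    else Future.failed(new java.net.MalformedURLException(s"unknown protocol: $url"))

  // Fails as a whole as soon as any element fails
  def allOrNothing(urls: List[String]): Future[List[String]] =
    Future.sequence(urls.map(fakeFetch))

  // Lift every outcome into a Try first, so the sequenced Future always succeeds
  def keepPartialResults(urls: List[String]): Future[List[Try[String]]] =
    Future.sequence(urls.map { url =>
      fakeFetch(url).transform((outcome: Try[String]) => Success(outcome))
    })

  // Returns (did the plain sequence fail?, number of successful fetches kept)
  def demo(): (Boolean, Int) = {
    val urls = List("https://a.example", "blah://b.example", "https://c.example")
    val wholeFailed = Await.ready(allOrNothing(urls), 10.seconds).value.get.isFailure
    val successes = Await.result(keepPartialResults(urls), 10.seconds).count(_.isSuccess)
    (wholeFailed, successes)
  }

  def main(args: Array[String]): Unit = println(demo())
}
```

The single-argument Future.transform used here requires Scala 2.12 or later.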

Flowing Data as a River
The following code returns the same results as the original urlSearch, but it provides more flexibility in how Exceptions are handled.
I've inserted some extra printlns and made the program more complex so you can see data flowing as a river, which is an exciting phenomenon to be able to observe.
This code uses recoverWith to return a Future[(String, Option[String])] whose second element is None whenever an Exception is encountered.
Using recoverWith means that the Future for every URL completes successfully; the list never contains a failed Future.
We use Option[String] to know whether each Future actually contains a String result to process.
object FutureRecovering extends App {
  def urlSearch(word: String, urls: List[String]): Unit = {
    val count = new java.util.concurrent.atomic.AtomicInteger(urls.size-1)
    val futures: List[Future[(String, Option[String])]] = urls.map { url =>
      Future((url, Some(readUrl(url)))).recoverWith {
        case e: Exception => Future.successful((url, None))
      }.andThen {
        case _ => println(s"Completed $url")
      }
    }

    for {
      future <- futures
      (url, maybeContents) <- future
    } {
      println(s"Failed: ${maybeContents.isEmpty} count: ${count.get}")
      if (maybeContents.isEmpty && count.getAndDecrement==1) System.exit(0)
      for {
        contents <- maybeContents if contents.toLowerCase.contains(word)
      } {
        println(s"Succeeded: '$word' was found in $url")
        if (count.getAndDecrement==1) System.exit(0)
        println(s"Count: ${count.get}")
      }
    }
  }

  urlSearch("scala", urls(includeBad=true))
  synchronized { wait() }
}
Notice there are two main loops and the second loop is compound.
The first loop is controlled by the map combinator, and it creates the futures variable.
The first loop always creates successful Futures because of the recoverWith combinator, which translates Exceptions into tuples of the failed URL and None.
The andThen clause in the first loop always prints out Completed whenever a Future completes.
The second loop is activated before all of the Futures created by the first loop complete, however the List[Future[_]] is processed in the order that the URLs were listed in.
The next section in this lecture will remove that restriction.
As each Future completes, its value, which is a tuple, is unpacked into url and maybeContents.
Then a Failed message is printed out that shows whether the fetch failed, and a check is made to see if processing is complete.
If not, the inner loop runs, which examines the contents to see if they contain the desired word, and prints a Succeeded message if so.
Let's try it in the REPL.
Notice that Succeeded and Failed messages from the second loop are mixed with Completed messages from the first loop - the data flows freely.
$ sbt console
...
scala> FutureRecovering.urlSearch("scala", urls(includeBad=true))
Completed blah://scalacourses.com
Failed: true count: 4
Completed https://www.not-really-here.com
Failed: true count: 3
Completed https://www.scalacourses.com
Failed: false count: 2
Succeeded: 'scala' was found in https://www.scalacourses.com
Count: 1
Completed https://www.micronauticsresearch.com
Failed: false count: 1
Completed https://scalacourses.com/noPageHere
Failed: true count: 1
You can run this program by typing:
$ sbt "runMain multi.futures.FutureRecovering"
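A small aside on the recovery step, as a sketch under stated assumptions: recoverWith expects its partial function to return a Future, which is useful when the fallback is itself asynchronous. When the fallback is an already-known value, as with (url, None) above, recover does the same job without wrapping the value in Future.successful. RecoverSketch, fakeReadUrl, fetchOption and demo are hypothetical names, and fakeReadUrl replaces readUrl so no network access is needed.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object RecoverSketch {
  // Hypothetical stand-in for readUrl: no network access, fails on a bad protocol
  def fakeReadUrl(url: String): String =
    if (url.startsWith("https://")) s"contents of $url"
    else throw new java.net.MalformedURLException(s"unknown protocol: $url")

  // recover maps an Exception straight to a fallback value, so the Future always succeeds
  def fetchOption(url: String): Future[(String, Option[String])] =
    Future((url, Option(fakeReadUrl(url)))).recover { case _: Exception => (url, None) }

  // Fetches one good and one bad URL and waits for both outcomes
  def demo(): List[(String, Option[String])] =
    Await.result(
      Future.sequence(List("https://a.example", "blah://b.example").map(fetchOption)),
      10.seconds
    )

  def main(args: Array[String]): Unit = demo().foreach(println)
}
```

Because every Future produced by fetchOption succeeds, Future.sequence is safe to use here.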
Processing Futures As They Complete

Here is a method, modeled after Twitter's Future.select method, that I think should be incorporated into the Scala runtime library.
It solves the problem we encountered throughout the preceding portion of this lecture by immediately making each Future available for further processing as it completes, without blocking.
/** @return the first future to complete, with the remainder of the Futures as a sequence.
  * @param futures a scala.collection.Seq
  * @author Viktor Klang (https://gist.github.com/4488970)
  * @author Mike Slinn Updated to Scala 2.13 */
def select[A](futures: Seq[Future[A]])
             (implicit ec: ExecutionContext): Future[(Try[A], Seq[Future[A]])] = {
  import scala.annotation.tailrec
  import scala.concurrent.Promise

  @tailrec
  def stripe(promise: Promise[(Try[A], Seq[Future[A]])],
             head: Seq[Future[A]],
             thisFuture: Future[A],
             tail: Seq[Future[A]]): Future[(Try[A], Seq[Future[A]])] = {
    thisFuture onComplete { result =>
      if (!promise.isCompleted) promise.trySuccess((result, head ++ tail))
    }
    if (tail.isEmpty) promise.future
    else stripe(promise, head :+ thisFuture, tail.head, tail.tail)
  }

  if (futures.isEmpty) Future.failed(new IllegalArgumentException("List of futures is empty"))
  else stripe(Promise(), Seq.empty, futures.head, futures.tail) // head starts empty: no futures precede the first one
}
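To see what select hands back, here is a minimal sketch that drives it with Promises so the completion order is under our control. It contains a condensed copy of select whose accumulator of already-visited futures starts empty, which is an assumption about the intended behaviour; SelectDemo and demo are hypothetical names.

```scala
import scala.annotation.tailrec
import scala.concurrent.{Await, ExecutionContext, Future, Promise}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.Try

object SelectDemo {
  // Condensed copy of select; the accumulator (head) starts empty
  def select[A](futures: Seq[Future[A]])
               (implicit ec: ExecutionContext): Future[(Try[A], Seq[Future[A]])] = {
    @tailrec
    def stripe(promise: Promise[(Try[A], Seq[Future[A]])],
               head: Seq[Future[A]],
               thisFuture: Future[A],
               tail: Seq[Future[A]]): Future[(Try[A], Seq[Future[A]])] = {
      // Whichever future completes first wins; trySuccess ignores the later ones
      thisFuture.onComplete(result => promise.trySuccess((result, head ++ tail)))
      if (tail.isEmpty) promise.future
      else stripe(promise, head :+ thisFuture, tail.head, tail.tail)
    }

    if (futures.isEmpty) Future.failed(new IllegalArgumentException("List of futures is empty"))
    else stripe(Promise(), Seq.empty, futures.head, futures.tail)
  }

  // Completes only the middle Future, then reports the winner and how many remain
  def demo(): (Try[String], Int) = {
    val p1 = Promise[String]()
    val p2 = Promise[String]()
    val p3 = Promise[String]()
    val selected = select(Seq(p1.future, p2.future, p3.future))
    p2.success("second")
    val (first, rest) = Await.result(selected, 10.seconds)
    (first, rest.size)
  }

  def main(args: Array[String]): Unit = println(demo())
}
```

The returned sequence holds the two Futures that have not completed yet, so calling select on it again waits for the next one to finish.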
Here is a convenient driver method for invoking select:
/** Apply a function over a sequence of futures as soon as each future completes.
  * @param futures sequence of futures to operate on
  * @param operation function to apply on each Future value as soon as they complete
  * @param whenDone block of code to execute when all futures have been processed */
def asapFutures[T](futures: Seq[Future[T]])
                  (operation: Try[T]=>Unit)
                  (whenDone: =>Unit={})
                  (implicit ec: ExecutionContext): Unit = {

  def jiffyFutures(futures: Seq[Future[T]])
                  (operation: Try[T]=>Unit)
                  (whenDone: =>Unit): Unit = {
    if (futures.nonEmpty) {
      select(futures) andThen {
        case Success((tryResult, remainingFutures)) =>
          operation(tryResult)
          jiffyFutures(remainingFutures)(operation)(whenDone)

        case Failure(throwable) =>
          println("Unexpected exception: " + throwable.getMessage)
      } andThen {
        case _ =>
          if (futures.size==1) whenDone
      }
    }
  }

  if (futures.isEmpty) whenDone
  else jiffyFutures(futures)(operation)(whenDone)
}
The first argument to asapFutures is of course the Futures to operate on as they complete.
The second argument is a Function1[Try[T],Unit], which is the operation that we want to apply to the result of each Future once it completes.
The whenDone parameter is lazily evaluated, as we learned in the More Fun With Functions lecture of the Introduction to Scala course, and has a default value {}, which is effectively a no-op.
The fourth parameter list once again consists of an implicit ExecutionContext.
I have provided select and asapFutures in FuturesUtil.scala so you can incorporate them into your projects.
I will show an example of how to invoke asapFutures next.
First, let's define a Function that transforms a collection of URLs into a collection of Futures, each of which yields a tuple containing a URL and the corresponding web page content.
val futureTuples: List[String] => List[Future[(String, String)]] =
  (_: List[String]).map { url => Future((url, readUrl(url))) }
Now we can define a new version of urlSearch that looks for a word in a collection of web page URLs, and it invokes asapFutures.
The first argument to asapFutures is the collection of Future[(String, String)] to process.
Each Future yields a tuple containing a URL and the corresponding web page content.
I've provided a partial function as the second argument to asapFutures instead of a Function1 because Success and Failure cases can be specified with a partial function.
We discussed why it is legal to supply a partial function whenever a Function1 is expected in the Partial Functions lecture.
The partial function receives the value of each future as it completes, and it handles three cases.
- Successfully fetched the web page and found the word. Notice that the tuple.unapply method is implicitly invoked as discussed in the Unapply lecture of the Introduction to Scala course, so that temporary immutable variables url and contents are defined.
- Successfully fetched the web page but the word was not found. Again, tuple.unapply is implicitly invoked so url and contents are defined.
- The web page was not found.
The second parameter list of urlSearch, whenDone, is lazily evaluated and is merely passed along to asapFutures.
def urlSearch(word: String, urls: List[String])(whenDone: =>Unit={}): Unit = {
  asapFutures(futureTuples(urls)) {
    case Success((url, contents)) if contents.toLowerCase.contains(word) =>
      println(s"Found '$word' in $url:\n${snippet(word, contents)}\n")

    case Success((url, contents)) =>
      println(s"Sorry, $url does not contain '$word'\n")

    case Failure(err) =>
      println(s"Error: ${err.getMessage}\n")
  }(whenDone)
}
Let's employ urlSearch twice, once with a non-empty list of URLs, and once with an empty list so there is nothing to process.
signal1 and signal2 are used in a closure according to the signaling pattern introduced in the Getting Results from Futures & Signaling with Promises lecture; when signal1 is completed the first Await.ready allows execution to continue, then when signal2 completes the program prints "All done" and exits.
These closures are bound to the whenDone parameters of urlSearch and asapFutures.
See the Closures lecture of the Introduction to Scala course to refresh your memory of what a closure is.
val signal1 = Promise[String]()
urlSearch("free", urls) { signal1.success("done") }
Await.ready(signal1.future, duration.Duration.Inf)

val signal2 = Promise[String]()
urlSearch("free", Nil) { signal2.success("done") }
Await.ready(signal2.future, duration.Duration.Inf)

println("All done")
You can run this program by typing:
$ sbt "runMain multi.futures.FutureSelect"
Error: Could not read from not_really_here.com

Found 'free' in https://scalacourses.com:
...<span style="float: right"> 2 / 31 free lectures </span> </div> <...

Sorry, https://micronauticsresearch.com does not contain 'free'

All done
Mixing Monads in a for-Comprehension

So far so good – but we have not yet attempted to return a value from urlSearch.
Instead, that method only has side effects because it returns Unit.
Let's see what happens if we modify urlSearch so it returns a value.
scala> def urlSearch(word: String, urls: List[String]) = {
     |   val futures: List[Future[String]] = urls.map(u => Future(readUrl(u)))
     |   for {
     |     (url, future) <- urls zip futures
     |     contents <- future if contents.toLowerCase.contains(word)
     |   } yield url
     | }
Error:(176, 16) type mismatch;
 found   : scala.concurrent.Future[String]
 required: scala.collection.IterableOnce[?]
    contents <- future if contents.toLowerCase.contains(word)
As we learned in the For-Loops and For-Comprehensions lecture, this doesn't compile because List and Future are different types of monads.
Remember that when you have multiple generators in a for-comprehension, you are flattening the resulting type because of the flatMap / map implementation provided by compiler desugaring.
Although it is easy for the Scala compiler to flatten a Future[Future[T]] to a Future[T], the Scala compiler does not know how to flatten nested monads of different types.
Given List[Future[T]], would you want the flattened type to be List[T] or Future[T]?
Similarly, when flattening a Future[List[T]], would you want the resulting type to be Future[T] or List[T]?
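One way to sidestep the mismatch, sketched here under stated assumptions, is to keep each for-comprehension inside a single monad: the comprehension below has one generator, a Future, and Future.traverse is then used to run it over the list of URLs and gather the answers into one Future[List[String]]. OneMonadAtATime, fakeFetch, matchingUrl and demo are hypothetical names, and fakeFetch simulates the web request so no network access is needed.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object OneMonadAtATime {
  // Hypothetical stand-in for Future(readUrl(url)); the page text simply echoes the URL
  def fakeFetch(url: String): Future[String] =
    if (url.startsWith("https://")) Future(s"Welcome to $url")
    else Future.failed(new java.net.MalformedURLException(url))

  // One generator, one monad: this desugars to a single map on Future
  def matchingUrl(word: String, url: String): Future[Option[String]] =
    for (contents <- fakeFetch(url)) yield {
      if (contents.toLowerCase.contains(word)) Some(url) else None
    }

  // traverse applies the Future-returning function to each URL and yields one Future of a List;
  // recover turns a failed fetch into None so one bad URL does not fail the whole search
  def urlSearch(word: String, urls: List[String]): Future[List[String]] =
    Future
      .traverse(urls)(url => matchingUrl(word, url).recover { case _: Exception => None })
      .map(_.flatten)

  def demo(): List[String] =
    Await.result(
      urlSearch("scala", List("https://scala.example", "https://java.example", "blah://bad.example")),
      10.seconds
    )

  def main(args: Array[String]): Unit = println(demo())
}
```

The caller still has to wait on, or chain from, the resulting Future; the approach that follows in the lecture blocks on each Future instead.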
Let's come at this a bit differently for working with Futures.
Instead of using zip to associate URLs with Futures of web page contents, let's make a Function1 that accepts a list of URLs and returns a List[(String, Future[String])] containing all the URLs and their future contents, or failed Futures for any web pages that fail to fetch.
This function is similar to futureTuples, except that each tuple pairs the URL with a Future of the contents, and it uses collect with a partial function instead of map and a closure.
scala> val futureContents: List[String] => List[(String, Future[String])] =
     |   (_: List[String]).collect { case url => (url, Future(readUrl(url))) }
futureContents: List[String] => List[(String, scala.concurrent.Future[String])] = <function1>
Now we can use futureContents to obtain a list of tuples of URLs and their corresponding Futures of the web page contents.
scala> futureContents(urls)
res15: List[(String, scala.concurrent.Future[String])] = List((https://not_really_here.com,scala.concurrent.impl.Promise$DefaultPromise@49d77e6c), (https://scalacourses.com,scala.concurrent.impl.Promise$DefaultPromise@62ad0a53), (https://micronauticsresearch.com,scala.concurrent.impl.Promise$DefaultPromise@4e8101c1))
To allow us to retrieve the value from the Futures, let's define a Function1 that accepts a Future[String] and returns the contents, or the empty string if any Exception occurs.
val futureString: Future[String] => String =
  (future: Future[String]) =>
    try {
      Await.result(future, duration.Duration.Inf)
    } catch {
      case e: Exception => ""
    }
Now we can define a Function1 that uses List.collect and a partial function that is only defined for Future[String] instances that contain the desired word when completed.
scala> val listOfTuples: String => List[(String, String)] =
     |   (word: String) =>
     |     futureContents(urls).collect {
     |       case (url, future) if futureString(future).contains(word) =>
     |         (snippet(word, futureString(future)), url)
     |     }
listOfTuples: String => List[(String, String)] = <function1>

scala> listOfTuples("free")
res16: List[(String, String)] = List((...<span style="float: right"> 0 / 34 free lectures </span> </div> <...,https://scalacourses.com))
You can run this program by typing:
$ sbt "runMain multi.futures.FutureMixed"
Output is as shown above.
© Copyright 1994-2024 Michael Slinn. All rights reserved.
If you would like to request to use this copyright-protected work in any manner,
please send an email.