Mike Slinn

Future Bad Habits and Exercise

— Draft —

Published 2014-09-22. Last modified 2015-01-29.
Time to read: 3 minutes.

This lecture contains a section on bad habits, followed by an exercise to help you apply what you have learned regarding Scala Futures and Promises.

The sample code for this lecture can be found in courseNotes/src/main/scala/futures/FutureRace.scala.

Accessing Command-Line Parameters

The command-line arguments that the user typed when launching a console application are available in the predefined args Array[String]. Here I obtain the token that the user typed and attempt to convert it to an Int. If the user did not specify exactly one value, threadCount is assigned the value 1, and a value less than 1 is raised to 1. If the conversion fails, the program prints a usage message and exits.

Scala code
val threadCount: Int = try {
  if (args.length==1) math.max(1, args(0).toInt) else 1
} catch {
  case e: Exception =>
    println("You can specify the number of threads in the threadpool on the command line; default value is 1")
    System.exit(-1)
    1
}
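
As a sketch of an alternative, the same parsing can be written as a pure function with scala.util.Try, which keeps the fallback logic free of side effects and easy to test. The names ThreadCountParser and parseThreadCount are hypothetical and do not appear in FutureRace.scala. Unlike the code above, this version looks only at the first argument and quietly falls back to 1 on bad input instead of exiting.

```scala
import scala.util.Try

object ThreadCountParser {
  /** Returns the thread count taken from the first argument,
    * or 1 when the argument is absent, malformed, or less than 1. */
  def parseThreadCount(args: Array[String]): Int =
    args.headOption
      .flatMap(arg => Try(arg.trim.toInt).toOption) // None if not a valid Int
      .map(n => math.max(1, n))                     // never fewer than one thread
      .getOrElse(1)
}
```

Whether to exit or to fall back silently is a design choice; exiting, as the lecture code does, makes a typing mistake visible to the user.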

Bad Habit: Accessing Shared Mutable State From A Future

Futures normally run on a thread other than the one that created them, and are intended for parallelism, not concurrency. This means that they should not share mutable state. The following code sets up a race condition, in which you cannot be sure what value the shared variable will have when the Future evaluates it. Just to make things interesting, let's set up an ExecutionContext that has only as many threads as were specified on the command line.

Scala code
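import java.util.concurrent.{ExecutorService, Executors}
import scala.concurrent.ExecutionContext
val pool: ExecutorService = Executors.newFixedThreadPool(threadCount)
implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(pool)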

In this example, the arithmetic computations 2 + 3 + offset and 2 + 3 + accessor each run asynchronously on the execution context defined above. Notice that offset has the value 6 when the Futures are created, and is changed to 42 afterwards.

Scala code
var offset = 6 // @volatile does not affect race conditions
def accessor = offset
val f1 = Future {
  2 + 3 + offset // will be executed asynchronously
} andThen {
  case Success(result) => println("Result 1: " + result)
  case Failure(exception) => println("Exception 1: " + exception.getMessage)
}

val f2 = Future {
  2 + 3 + accessor // will be executed asynchronously
} andThen {
  case Success(result) => println("Result 2: " + result)
  case Failure(exception) => println("Exception 2: " + exception.getMessage)
}

offset = 42
println("End of mainline, offset = " + offset)

The last two lines of the program are worthy of note.

Scala code
Await.ready(Future.sequence(List(f1, f2)), Duration.Inf)
pool.shutdown()

Once the two Futures have completed, pool.shutdown() is invoked, which terminates the threadpool and allows the program to exit.
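
The same lifecycle can be sketched with try/finally, so that the pool is shut down even if waiting for the Futures throws an exception. PoolLifecycleDemo and run are hypothetical names used only for illustration, and the 10 second timeout is an arbitrary assumption.

```scala
import java.util.concurrent.{ExecutorService, Executors}
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object PoolLifecycleDemo {
  /** Runs two Futures on a private fixed-size pool and always shuts the pool down.
    * Returns the results together with the pool's shutdown flag. */
  def run(threadCount: Int): (List[Int], Boolean) = {
    val pool: ExecutorService = Executors.newFixedThreadPool(threadCount)
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(pool)
    val results =
      try Await.result(Future.sequence(List(Future(1 + 1), Future(2 + 2))), 10.seconds)
      finally pool.shutdown() // without this, the pool's non-daemon threads keep the JVM alive
    (results, pool.isShutdown)
  }
}
```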

What do you think that the printed value should be for Result 1 and Result 2? 11 and 47, or 11 and 11? The result depends upon whether there is more than one thread available, the type of Executor used, and possibly the workload of other threads managed by the Executor.

Let's run the program with one thread in pool. The command below passes 1 on the command line, which is parsed from args(0) and passed to Executors.newFixedThreadPool, so pool is an ExecutorService with only one thread. The two Futures must then run one after the other on that thread, and the result is clearly not what you would want.

Shell
$ sbt "runMain multi.futures.FutureRace 1"
...output ...
Result 1: 11
End of mainline, offset = 42
Result 2: 47 

The program generates different results when run with 2 threads.

Shell
$ sbt "runMain multi.futures.FutureRace 2"
...output ...
Result 1: 11
Result 2: 11
End of mainline, offset = 42 

This problem arises because Futures normally execute on a different thread than the invoking thread, so the body of a Future might read offset before or after the main thread assigns 42 to it. Similar problems will ensue even if the Future only reaches the variable indirectly, by calling a method such as accessor that is defined in the enclosing scope. For the full program, see FutureRace.scala in the courseNotes project.

The Future is computed in a closure that runs as an anonymous class on another execution context. See the Closures lecture of the Introduction to Scala course to remind yourself of what a closure is. The closure captures a reference to the variable offset, not a copy of the value it held when the Future was created. When the body of the Future runs on another thread, it reads whatever value is visible to that thread at that moment, which might or might not reflect the most recent write made by the original thread.
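
One way to avoid the race, sketched here under the assumption that the Future only needs the value the variable held when the Future was created, is to copy the var into an immutable val first. SnapshotDemo and run are hypothetical names, and the global ExecutionContext is used only to keep the sketch short.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object SnapshotDemo {
  /** Copies the mutable variable into an immutable val before creating the Future. */
  def run(): Int = {
    var offset = 6
    val snapshot = offset // immutable copy, taken on the calling thread
    val f = Future { 2 + 3 + snapshot } // the closure refers only to the val
    offset = 42 // a later write cannot affect the Future
    Await.result(f, 5.seconds)
  }
}
```

Because the closure never refers to offset, this computation yields 11 regardless of how many threads the execution context has.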

Memory consistency issues can arise when objects are passed unsafely between execution contexts. These code examples use integers, so there cannot be any partial writes that would lead to internal inconsistency. If a code example were to use an object containing non-final fields, and the calling thread were to update a field while code running on the Future's execution context was reading the object, then the Future could see the object in an inconsistent state, that is, partially updated. These problems can be hard to detect, and examining variable values on the heap or the main program's stack can be misleading. As I discussed in the Working With Collections of Futures lecture, the Java Atomic objects can be useful for this scenario.
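
As a minimal sketch of that idea, the following shares a java.util.concurrent.atomic.AtomicInteger between many Futures. AtomicCounterDemo and run are hypothetical names, and this is not the code from the Working With Collections of Futures lecture.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object AtomicCounterDemo {
  /** Increments one shared counter from `tasks` Futures and returns the final count. */
  def run(tasks: Int): Int = {
    val counter = new AtomicInteger(0)
    // incrementAndGet is a single atomic read-modify-write, so no update is lost
    val futures = (1 to tasks).map(_ => Future { counter.incrementAndGet() })
    Await.ready(Future.sequence(futures), 10.seconds)
    counter.get
  }
}
```

Replacing the AtomicInteger with a plain var and counter += 1 would allow increments to be lost whenever two threads interleave their reads and writes.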

Interaction with shared mutable state or other external resources outside the Future is allowed if the code is threadsafe. That means obeying the ordinary concurrency rules by using mechanisms such as transactors, or java.util.concurrent (j.u.c.) facilities and Java primitives such as synchronized, volatile, latches and locks. The simple rule is to only access immutable data when defining a Future. This rule is why Scala Futures can support parallelism and composition.

j.u.c. has some handy and efficient thread-safe counterparts of the mutable collections, such as CopyOnWriteArrayList, ConcurrentMap, and other concurrent members of the Java collections framework.
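
A brief sketch of one of these collections in use: several Futures append to a shared CopyOnWriteArrayList. CopyOnWriteDemo and squares are hypothetical names chosen for illustration.

```scala
import java.util.concurrent.CopyOnWriteArrayList
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object CopyOnWriteDemo {
  /** Each Future appends the square of its index to a shared thread-safe list. */
  def squares(n: Int): Vector[Int] = {
    val results = new CopyOnWriteArrayList[Int]()
    val futures = (1 to n).map(i => Future { results.add(i * i) })
    Await.ready(Future.sequence(futures), 10.seconds)
    // The order of insertion depends on scheduling, so sort before returning
    (0 until results.size).map(i => results.get(i)).toVector.sorted
  }
}
```

CopyOnWriteArrayList copies its backing array on every write, so it suits collections that are read far more often than they are modified.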

j.u.c. mechanisms only work within a single Java virtual machine; they do not work across multiple JVMs. For explicitly created Futures such as those found in this course, this restriction is not a problem. Distributed Akka Actors can create Futures on remote JVMs, however, and transactors are necessary to handle that scenario. Exploring transactors is beyond the scope of this course.

Exercise – Typing Monkeys Using Futures

This exercise should take at least an hour. Modify the monkey simulator program you wrote for the Parallel Collections lecture so it uses Futures.

Solution

This solution can be found in solutions/MonkeyFutures.scala.

Scala code
package solutions

import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global
import scala.collection.parallel._

trait Timeable[T] {
  def time(block: => T): T = {
    val t0 = System.nanoTime
    val result: T = block
    val elapsedMs = (System.nanoTime - t0) / 1000000
    println("Elapsed time: " + elapsedMs + "ms")
    result
  }
}

/** Solution to https://www.scalacourses.com/showLecture/17/90
  * This program uses more memory as the number of iterations increases.
  * For 13000000L iterations, specify -Xmx2g.
  * For that many iterations, you should only expect 3 characters to match in about a minute on a fast machine.
  * This naive implementation's computational effort increases geometrically with the number of iterations. */
object FutureMonkey extends App with Timeable[Future[String]] {
  val iterations: Long = 13000000L
  val singleFutureTimeout = Duration(20, "seconds")

  val stringToMatch = "I thought I saw a lolcat! I did, I did see a lolcat!"
  val random = new scala.util.Random
  val allowableChars = """ !.,;'""" + (('a' to 'z').toList ::: ('A' to 'Z').toList ::: (0 to 9).toList).mkString

  try {
    val result = Await.result(time(parFun), singleFutureTimeout)
    println(s"Best match is: '$result'")
  } catch {
    case exception: Throwable => println(exception.getMessage)
  }

  /** Generate a random string of length n from the given alphabet */
  def randomString(alphabet: String, n: Int): String =
    Stream.continually(random.nextInt(alphabet.size)).map(alphabet).take(n).mkString

  def matchSubstring(str1: String, str2: String): String =
    str1.view.zip(str2).takeWhile(Function.tupled(_ == _)).map(_._1).mkString

  def checkIt: Future[String] = Future {
    val item = randomString(allowableChars, stringToMatch.length)
    matchSubstring(item, stringToMatch)
  }

  def parFun: Future[String] = {
    print(s"Mapping $iterations iterations")
    val strs: ParSeq[Future[String]] = for {
      i <- (1L to iterations).toList.par
    } yield {
      if (i % (iterations/100L)==0) print(".") // print one dot for each % complete
      checkIt
    }
    println("\nReducing in parallel...")
    strs.reduce((acc, item) => if (acc.value.mkString.length < item.value.mkString.length) item else acc)
  }
}
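
The solution above shares one Random among all Futures and compares results by reading Future.value, which is empty until a Future completes. As a hedged sketch of a different approach, and not the course's solution, the following runs a few batches, gives each Future its own seeded generator, and combines the results with Future.sequence. MonkeySketch, commonPrefix, batch and bestMatch are hypothetical names, and the shortened target string, batch sizes and timeout are arbitrary assumptions.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.Random

object MonkeySketch {
  val target = "I thought I saw a lolcat!"
  val alphabet: String = " !.,;'" + ('a' to 'z').mkString + ('A' to 'Z').mkString

  /** Longest common prefix of the two strings. */
  def commonPrefix(a: String, b: String): String =
    a.zip(b).takeWhile { case (x, y) => x == y }.map(_._1).mkString

  /** Runs one batch of attempts inside a single Future; yields the best prefix found. */
  def batch(attempts: Int, seed: Long): Future[String] = Future {
    val rnd = new Random(seed) // one generator per Future, so no shared mutable state
    (1 to attempts).foldLeft("") { (best, _) =>
      val guess = Seq.fill(target.length)(alphabet(rnd.nextInt(alphabet.length))).mkString
      val prefix = commonPrefix(guess, target)
      if (prefix.length > best.length) prefix else best
    }
  }

  /** Combines the batch results once all Futures complete; requires batches >= 1. */
  def bestMatch(batches: Int, attemptsPerBatch: Int): String = {
    val all = Future.sequence((1 to batches).map(i => batch(attemptsPerBatch, i.toLong)))
    Await.result(all.map(_.maxBy(_.length)), 30.seconds)
  }
}
```

Creating one Future per batch rather than one per attempt also avoids holding millions of Future instances in memory at once.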
