Wednesday, December 6, 2017

Concurrent Programming In Scala

  1. Scala Language
  2. Actors Model
  3. Parallel Collections
  4. Futures and Promises


Scala Language

Publicly released in 2004, the Scala programming language was originally designed to be more concise than Java and to offer functional programming features that Java lacked at the time. Since then, Scala has expanded from running solely on the Java Virtual Machine to other platforms, such as JavaScript. Current information about the language can be found on its official web site, www.scala-lang.org.

Scala supports parallel and concurrent programming via the following features:
  1. Actors Model
  2. Parallel Collections
  3. Futures and Promises
For each feature, let's study an example written in Scala, and see how it can be rewritten similarly in NERWous C.


Actors Model

Scala Actors are concurrent processes that communicate by exchanging messages.

Scala Actors - Ping-Pong Example

This ping-pong example uses the deprecated Scala Actors library, which has since been superseded by Akka. It is used here because the Scala Actors article describes the example in more detail than the terse Akka documentation does.
case object Ping
case object Pong
case object Stop
import scala.actors.Actor
import scala.actors.Actor._
class Ping(count: Int, pong: Actor) extends Actor {
  def act() {
    var pingsLeft = count - 1
    pong ! Ping
    while (true) {
      receive {
        case Pong =>
          if (pingsLeft % 1000 == 0)
            Console.println("Ping: pong")
          if (pingsLeft > 0) {
            pong ! Ping
            pingsLeft -= 1
          } else {
            Console.println("Ping: stop")
            pong ! Stop
            exit()
          }
      }
    }
  }
}
class Pong extends Actor {
  def act() {
    var pongCount = 0
    while (true) {
      receive {
        case Ping =>
          if (pongCount % 1000 == 0)
            Console.println("Pong: ping "+pongCount)
          sender ! Pong
          pongCount = pongCount + 1
        case Stop =>
          Console.println("Pong: stop")
          exit()
      }
    }
  }
}
object pingpong extends App {
  val pong = new Pong
  val ping = new Ping(100000, pong)
  ping.start
  pong.start
}
In the ping-pong example, the Ping actor sends a "Ping" message to the Pong actor, which replies with a "Pong" message. Each actor prints a progress line every 1000 messages. After 100000 exchanges, the Ping actor is done playing and sends a "Stop" message. When Pong receives the "Stop", it stops playing too.
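Since the scala.actors library is no longer shipped with current Scala releases, the same exchange can be sketched with plain threads and blocking queues from the Java standard library. This is only an illustration of the message flow, not the Scala Actors or Akka API: the Msg type, the two mailbox queues and the run helper are names assumed for this sketch.

```scala
import java.util.concurrent.LinkedBlockingQueue

object PingPongSketch {
  // Hypothetical message type for this sketch (not part of scala.actors).
  sealed trait Msg
  case object Ping extends Msg
  case object Pong extends Msg
  case object Stop extends Msg

  // Plays `count` rounds and returns how many pings the pong side answered.
  def run(count: Int): Int = {
    val toPong = new LinkedBlockingQueue[Msg]() // Pong's mailbox
    val toPing = new LinkedBlockingQueue[Msg]() // Ping's mailbox
    var pongCount = 0                           // written only by the pong thread

    val pong = new Thread(() => {
      var running = true
      while (running) toPong.take() match {
        case Ping => pongCount += 1; toPing.put(Pong)
        case Stop => running = false
        case _    => // ignore anything unexpected
      }
    })

    val ping = new Thread(() => {
      var pingsLeft = count - 1
      toPong.put(Ping) // first serve
      var running = true
      while (running) toPing.take() match {
        case Pong if pingsLeft > 0 => toPong.put(Ping); pingsLeft -= 1
        case Pong                  => toPong.put(Stop); running = false
        case _                     => // ignore anything unexpected
      }
    })

    pong.start(); ping.start()
    ping.join(); pong.join() // joining makes pongCount visible to the caller
    pongCount
  }
}
```

Calling PingPongSketch.run(100000) plays the same number of rounds as the actor version. The blocking take() call plays the part of receive, and put() plays the part of the ! operator.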

NERWous C Version 1

The first NERWous C rewrite hews to the Scala example: a mel input argument named receive stands in for the message passing between Scala actors.
main () {
   <pel>pong = <! name="Pong">Pong (null);
   <pel>ping = <! name="Ping">Ping (100000, null);
}
void Ping (int count, <mel> string receive) {
   <pel>pong;
   <? pel name="Pong" started>pong;

   int pingsLeft = count - 1;
   <?>pong.receive = "Ping";
   while ( 1 ) {
      switch ( <?>receive ) {   /* wait to receive */
         case "Pong":
            if (pingsLeft % 1000 == 0)
               printf ("Ping: pong");
            if (pingsLeft > 0) {
               <?>pong.receive = "Ping";
               --pingsLeft;
            } else {
               printf ("Ping: stop");
               <?>pong.receive = "Stop";
               <return>;
            }
            break;
      }
   }
}
void Pong (<mel> string receive) {
   <pel>ping;
   <? pel name="Ping" started>ping;

   int pongCount = 0;
   while ( 1 ) {
      switch ( <?>receive ) {
         case "Ping":
            if (pongCount % 1000 == 0)
               printf ("Pong: ping %d", pongCount);
            <?>ping.receive = "Pong";
            ++pongCount;
            break;
         case "Stop":
            printf ("Pong: stop");
            <return>;
      }
   }
}
The tricky thing about the Ping-Pong example is how Pong knows about Ping since it is created before Ping ever exists. The Scala solution is to use the sender actor which represents the actor that sends the message. When Pong receives the "Ping" message, the sender is the de facto Ping actor. The NERWous C version uses the wait-for-pel statement, which allows the task Pong to wait for a task named "Ping" to have a certain state (here, we pick the started state), and initializes the local pel variable ping with information about the task named "Ping":
<? pel name="Ping" started>ping;
Scala actors send messages with the ! operator and take them in with the receive method. The NERWous C version above uses a mel input argument instead. It is named receive here, but any valid name would do. When Ping first runs, it sends a "Ping" message to Pong's receive mel input argument:
<?>pong.receive = "Ping";
In the meantime, the Pong task waits on its receive mel input argument to be valued with a message, either "Ping" or "Stop". With the former, it sends back a "Pong" message. With the latter, it just quits by running the <return> statement.
A computer linguist will notice that the C language, on which NERWous C is based, has no native string type. It is used here for code simplicity, since the focus is on the concurrency features and not on the base language.

NERWous C Version 2

Let's now rewrite Version 1 to use the NERWous C "streaming" feature. Instead of having Ping send a "Ping" message directly to Pong, we have Ping stream its "Ping" messages via the release operation to its mel output argument, and have Pong read Ping's output messages:
main () {
   <pel>pong = <! name="Pong">Pong ();
   <pel>ping = <! name="Ping">Ping (100000);
}
<mel> string Ping (int count) {
   <pel>pong;
   <? pel name="Pong" started>pong;

   int pingsLeft = count - 1;
   <release> "Ping";   /* stream first "Ping" */

   <?>pong;   /* wait for Pong to stream */
   if (pingsLeft % 1000 == 0)
      printf ("Ping: pong");
   if (pingsLeft > 0) {
      <release> "Ping";   /* stream "Ping" again */
      --pingsLeft;
      <resume>;    /* resume the wait for Pong to stream */
   } else {
      printf ("Ping: stop");
   }
}
<mel> string Pong () {
   <pel>ping;
   <? pel name="Ping" started>ping;

   int pongCount = 0;
   try {
      <?>ping;   /* wait for Ping to stream */
      if (pongCount % 1000 == 0)
         printf ("Pong: ping %d", pongCount);
      <release> "Pong";   /* stream "Pong" */
      ++pongCount;
      <resume>;    /* resume the wait for Ping to stream */
   }
   catch ( ping<ENDED> ) { }
}
Version 2 makes two changes. The first is that the while loop has been replaced by the resume operation, which repeats the mel wait for the streamed messages. The second is that the "Stop" message has been removed: the Pong task learns that Ping has ended via the ENDED exception.


Parallel Collections

The parallel collections feature in Scala is discussed in this article. The examples to illustrate this feature are:
  1. Map
  2. Fold
  3. Filter

Map

This example uses a parallel map to transform a collection of String to all-uppercase:
val lastNames = List("Smith","Jones","Frankenstein","Bach","Jackson","Rodin").par
lastNames.map(_.toUpperCase)
The result of the run is:
SMITH, JONES, FRANKENSTEIN, BACH, JACKSON, RODIN

The NERWous C version is more verbose since there is no built-in map function. Again, for simplicity, we will use the fictitious string type, which does not exist in the C language:
/* VERSION 1 */
#define NUM 6
string lastNames[NUM] = {"Smith","Jones","Frankenstein","Bach","Jackson","Rodin"};
for (int i=0; i<NUM; ++i) {
   lastNames[i] = <!> toUpperCase(lastNames[i]);
}
The NERWous C version has a serial loop that pels a new inline task in each iteration. The inline task takes in lastNames[i] as a local variable, and ends with its toUpperCase value. The returned value is then re-assigned to the local variable lastNames[i].

Because the result is assigned back to lastNames[i], which belongs to the main task context, each inline task has to finish before the next one can start. So although the inline tasks could run in parallel with one another, in practice they run serially, albeit possibly on different cel elements.
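The same pitfall exists with plain Scala futures, which makes for a useful comparison. The sketch below uses only the standard library rather than the parallel collections module, and the object and method names are assumptions made for illustration. The first method waits for each result before creating the next future, so the work is effectively serial. The second starts every future first and only then gathers the results.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object MapSketch {
  val lastNames = List("Smith", "Jones", "Frankenstein", "Bach", "Jackson", "Rodin")

  // Effectively serial: each future is awaited before the next one is created.
  def upperOneAtATime(names: List[String]): List[String] =
    names.map(n => Await.result(Future(n.toUpperCase), 10.seconds))

  // Parallel: all futures are started first, then their results are gathered in order.
  def upperAllAtOnce(names: List[String]): List[String] = {
    val started = names.map(n => Future(n.toUpperCase)) // List is strict, so all start here
    Await.result(Future.sequence(started), 10.seconds)
  }
}
```

Both methods return the names in their original order. Only the second gives the scheduler a chance to overlap the conversions.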

Let's now transform lastNames into a mel array so that it can be accessed in parallel by the toUpperCase inline tasks:
/* VERSION 2 */
#define NUM 6
<mel>string lastNames[NUM] = {"Smith","Jones","Frankenstein","Bach","Jackson","Rodin"};
for (int i=0; i<NUM; ++i) {
   <!> { <? replace>lastNames[i] = toUpperCase(?); }
}
Each inline task that is pelled on each iteration of the for loop now truly runs in parallel, each accessing its portion of the shared mel array lastNames. Each inline task uses the reader zone shortcut to replace in place the value of the mel element lastNames[i].

Fold

This example adds up the values from 1 to 10000.
val parArray = (1 to 10000).toArray.par
parArray.fold(0)(_ + _)
The result of 1 + 2 + 3 + ... + 10000 is 50005000. Again, the NERWous C version is much more verbose since there is no built-in fold capability:
#define NUMCOUNT 10000
int parArray[NUMCOUNT];
for (int i = 0; i<NUMCOUNT; ++i)
   parArray[i] = i+1;    /* initialize array */

<mel> int sum;
<collect> for (int i = 0; i<NUMCOUNT; i += 2 ) {
    <! import="parArray[i] ..<flex ubound=NUMCOUNT uvalue=0>.. parArray[i+1]"> {
        <?> sum += (parArray[i] + parArray[i+1]);
    }
} <? ENDED>;
printf ("Summation value %d", <?>sum);
The first for loop initializes the parArray array with the values 1, 2, and so on up to 10000. The second for loop takes the array elements two at a time and sums each pair in a separate task. Each task gets its parArray elements imported as local values, adds them, and adds the result to the shared sum mel variable.

The second for loop is run inside a collect-ENDED block to make sure that all tasks that it pels have ended before the program continues with the printf statement. This ensures that the mel variable sum contains all the summations.

Each iteration in the second for loop creates a separate inline task to do the summation. This task requires two local values passed via the import operation. The import statement could be written simply as:
<! import="parArray[i],parArray[i+1]">
However if the number of items in parArray is odd, the last iteration of the for loop will generate an out-of-bound exception on parArray[i+1]. The use of the flex consecontive construct addendum allows the run-time environment to assign the uvalue value (0) to any parArray item with an index equal to or greater than the ubound value (NUMCOUNT).
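For comparison, the pairwise approach can be sketched in Scala with standard-library futures and an atomic accumulator. This is an illustrative sketch rather than how the parallel collections implement fold, and the names FoldSketch and pairwiseSum are assumptions. The grouped(2) call yields a final group of one element when the length is odd, which avoids the out-of-bound access without needing a padding value.

```scala
import java.util.concurrent.atomic.AtomicLong
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object FoldSketch {
  // Sums `values` by handing each pair of elements to its own future.
  def pairwiseSum(values: Vector[Int]): Long = {
    val sum = new AtomicLong(0L) // plays the role of the shared sum variable
    val tasks = values.grouped(2).toVector.map { pair =>
      // A lone last element simply forms a group of one.
      Future { sum.addAndGet(pair.map(_.toLong).sum) }
    }
    // Wait for every task to end before reading the total.
    Await.ready(Future.sequence(tasks), 30.seconds)
    sum.get()
  }
}
```

Calling FoldSketch.pairwiseSum((1 to 10000).toVector) gives 50005000, the same total as the fold above.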

Filter

This example uses a parallel filter to select the last names that start with the letter "J" or with a letter that comes after it alphabetically.
val lastNames = List("Smith","Jones","Frankenstein","Bach","Jackson","Rodin").par
lastNames.filter(_.head >= 'J')
The result of the run is Smith, Jones, Jackson, Rodin. This is the NERWous C version:
#define NUM 6
string lastNames[NUM] = { "Smith","Jones","Frankenstein","Bach","Jackson","Rodin" };
for (int i=0; i<NUM; ++i) {
   <!> { if (head(lastNames[i]) >= 'J') printf("%s", lastNames[i]); }
}
Each iteration of the for loop pels a new task that runs independently from each other. Each task gets its local array element lastNames[i] imported for processing. Since each task runs independently and concurrently, the order of the names printed out is not deterministic.
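The ordering remark can be reproduced in Scala when each name is checked in its own future and matches are recorded as the futures complete. The following is a minimal sketch under that assumption, using only the standard library. FilterSketch and fromJOnward are hypothetical names, and the shared buffer stands in for the console output.

```scala
import scala.collection.mutable.ListBuffer
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object FilterSketch {
  // One future per name; matches are recorded in completion order.
  def fromJOnward(names: List[String]): List[String] = {
    val hits = ListBuffer.empty[String]
    val tasks = names.map { n =>
      Future {
        // The lock protects the shared buffer, not the order of arrival.
        if (n.head >= 'J') hits.synchronized { hits += n }
      }
    }
    Await.ready(Future.sequence(tasks), 10.seconds)
    hits.synchronized(hits.toList)
  }
}
```

The returned list always holds the same four names, but their order can differ from run to run, so a caller should compare it as a set or sort it first.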


Futures and Promises

Under Scala, a future is a read-only reference to a yet-to-be-completed value, while a promise is used to complete a future, either successfully with a value or unsuccessfully with an exception.

Futures - Coffee Making Example

The coffee making example is described in this article. The complete code is taken from GitHub:
import scala.concurrent._
import scala.concurrent.duration._
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.Random
import scala.util.Try

object kitchen extends App {

  /////////////////////////////
  // Some type aliases, just for getting more meaningful method signatures:
  type CoffeeBeans = String
  type GroundCoffee = String
  case class Water(temperature: Int)
  type Milk = String
  type FrothedMilk = String
  type Espresso = String
  type Cappuccino = String

  // some exceptions for things that might go wrong in the individual steps
  // (we'll need some of them later, use the others when experimenting
  // with the code):
  case class GrindingException(msg: String) extends Exception(msg)
  case class FrothingException(msg: String) extends Exception(msg)
  case class WaterBoilingException(msg: String) extends Exception(msg)
  case class BrewingException(msg: String) extends Exception(msg)
  /////////////////////////////
  def combine(espresso: Espresso, frothedMilk: FrothedMilk): Cappuccino = "cappuccino"

  def grind(beans: CoffeeBeans): Future[GroundCoffee] = Future {
    println("start grinding...")
    Thread.sleep(Random.nextInt(2000))
    if (beans == "baked beans") throw GrindingException("are you joking?")
    println("finished grinding...")
    s"ground coffee of $beans"
  }

  def heatWater(water: Water): Future[Water] = Future {
    println("heating the water now")
    Thread.sleep(Random.nextInt(2000))
    println("hot, it's hot!")
    water.copy(temperature = 85)
  }

  def frothMilk(milk: Milk): Future[FrothedMilk] = Future {
    println("milk frothing system engaged!")
    Thread.sleep(Random.nextInt(2000))
    println("shutting down milk frothing system")
    s"frothed $milk"
  }

  def brew(coffee: GroundCoffee, heatedWater: Water): Future[Espresso] = Future {
    println("happy brewing :)")
    Thread.sleep(Random.nextInt(2000))
    println("it's brewed!")
    "espresso"
  }

  ///////////// Business logic
  println("Kitchen starting")
  def prepareCappuccino(): Future[Cappuccino] = {
    val groundCoffee = grind("arabica beans")
    val heatedWater = heatWater(Water(20))
    val frothedMilk = frothMilk("milk")
    for {
      ground <- groundCoffee
      water <- heatedWater
      foam <- frothedMilk
      espresso <- brew(ground, water)
    } yield combine(espresso, foam)
  }
  val capo = prepareCappuccino()
  Await.ready(capo, 1.minute)
  println("Kitchen ending")
}
The NERWous C version has the main task pel the grinding (grind), water heating (heatWater), milk frothing (frothMilk) and espresso brewing (brew) tasks so that they run in parallel. The brew task waits for the grind task to deliver the ground coffee and for the heatWater task to bring the water to temperature. After pelling all four tasks, the main task waits for brew and frothMilk to finish before declaring "Kitchen ending". If grinding fails, brew returns an empty string and the main task prints "Kitchen ending without espresso". If the results are not ready within a minute, the TIMEOUT handler reports that instead.
main () {
   printf ("Kitchen starting");
   <pel> ground = <!> grind("arabica beans");
   <pel> water = <!> heatWater(20);
   <pel> foam = <!> frothMilk("milk");
   <pel> espresso = <!> brew(ground, water);

   try {
      string frothMilk_ret, brew_ret;
      (frothMilk_ret, brew_ret) = <? timeout=60000> (foam, espresso);
      if ( brew_ret == "" ) printf ("Kitchen ending without espresso");
      else printf ("Kitchen ending");
   }
   catch ((foam && espresso)<TIMEOUT>) {
      printf ("Espresso and milk not ready within 1 min");
   }
}

string grind( string beans ) {
    printf("start grinding...");
    sleep(rand()%2000);
    if ( beans == "baked beans") return "are you joking?";
    printf("finished grinding...");
    return("ground coffee of " + beans);
}

int heatWater(int temp) {
    printf("heating the water now");
    sleep(rand()%2000);
    printf("hot, it's hot!");
    return 85;
}

string frothMilk( string milk ) {
    printf("milk frothing system engaged!");
    sleep(rand()%2000);
    printf("shutting down milk frothing system");
    return "frothed " + milk;
}

string brew( <pel> ground, <pel> water ) {
    printf("happy brewing :)");
    sleep(rand()%2000);

    string ret = <?> ground;   /* wait for the grind task to end */
    if ( ret == "are you joking?" ) return "";

    <?> water;   /* wait for the heatWater task to end */
    printf("it's brewed!");
    return "espresso";
}
The tasks signal their completion through the return statement and its value. A task that depends on other tasks starts its own work first, then waits for their returned values before continuing with the rest.
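On the Scala side, the same dependency pattern can be shown in a much smaller form than the full kitchen program. The sketch below is a reduced illustration, not the original GitHub code: KitchenSketch, its espresso helper and the IllegalArgumentException are assumptions chosen to keep it short. Both input futures are started before the for-comprehension so they run concurrently, and a grinding failure or a timeout surfaces as a Failure.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.Try

object KitchenSketch {
  def grind(beans: String): Future[String] = Future {
    if (beans == "baked beans") throw new IllegalArgumentException("are you joking?")
    s"ground coffee of $beans"
  }

  def heatWater(temp: Int): Future[Int] = Future(85)

  // Both steps are started before the for-comprehension, so they run concurrently.
  // The yield runs only once both results are available.
  def espresso(beans: String): Try[String] = {
    val ground = grind(beans)
    val water  = heatWater(20)
    val brewed = for { g <- ground; w <- water } yield s"espresso from $g at $w degrees"
    Try(Await.result(brewed, 1.minute)) // Failure on a grinding error or a timeout
  }
}
```

Creating the futures inside the for-comprehension instead would chain them one after another, which is the serial behavior the original example is careful to avoid.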

Promises - Producer/Consumer Example

This producer/consumer example is taken from the Scala SIP-14 document.
import scala.concurrent.{ Future, Promise }
val p = Promise[T]()
val f = p.future

val producer = Future {
  val r = someComputation
  if (isInvalid(r))
    p failure (new IllegalStateException)
  else {
    val q = doSomeMoreComputation(r)
    p success q
  }
}

val consumer = Future {
  startDoingSomething()
  f onSuccess {
    case r => doSomethingWithResult(r)
  }
}
The NERWous C version has the main task pelling the producer and consumer tasks to run in parallel.
main () {
   <pel> prod = <!> producer();
   <!> consumer(prod);
}
int producer () {
   int r = someComputation();
   if (isInvalid(r) ) <end FAILED>;
   else {
      int q = doSomeMoreComputation(r);
      <return> q;
   }
}
void consumer (<pel>prod) {
   startDoingSomething();
   try {
      doSomethingWithResult (<?>prod);
   }
   catch (prod<FAILED>) {}
   catch (prod<...>) {}
}
The producer task either ends with a FAILED exception on an invalid someComputation result, or ends normally with a valid result to be used as the return value of the task.

The consumer task is given a representative of the producer task via the pel input argument prod. It runs startDoingSomething, then waits for the producer's return value via the mel wait statement <?>prod. If the wait is successful, it runs doSomethingWithResult. If the wait fails due to a FAILED or other exception, the consumer just ends without doing anything more.
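The SIP-14 listing above leaves T and its helper functions abstract, and the onSuccess callback it uses was deprecated in Scala 2.12 and removed in 2.13. A concrete variant that runs on current versions might look like the sketch below. The doubling computation, the negative-input check and the names PromiseSketch and run are stand-ins assumed for illustration.

```scala
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.Try

object PromiseSketch {
  // Hypothetical stand-ins for someComputation, isInvalid and doSomeMoreComputation.
  def run(input: Int): Try[String] = {
    val p = Promise[Int]()
    val f = p.future

    // Producer: completes the promise exactly once, with a value or a failure.
    Future {
      if (input < 0) p.failure(new IllegalStateException("invalid input"))
      else p.success(input * 2)
    }

    // Consumer: map runs only if the producer succeeded; a failure passes through.
    val consumed = f.map(r => s"consumed $r")
    Try(Await.result(consumed, 10.seconds))
  }
}
```

Here PromiseSketch.run(21) yields Success("consumed 42"), while a negative input yields a Failure wrapping the IllegalStateException, much as the NERWous C consumer ends quietly on a FAILED exception.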

