
Thursday, June 29, 2017

Pel Setup

Welcome » NERWous C » Pel
  1. Pel Attributes
  2. Import Attributes
  3. Timeout Attribute
  4. At Attribute
  5. Mode Attribute
  6. Name Attribute
  7. Priority Attribute
  8. Pel Variable


Pel Attributes

As introduced in the Pel Basic chapter, a pel creation statement accepts attributes:
  • at=cel: request that code run on the specified cel.
  • import=list-of-vars: transfer the values of the listed global variables to the created task.
  • import-file=list-of-files: transfer the values of the global variables in the listed files to the created task.
  • mode=setting: request that the task be created with this mode: suspend (create, then suspend execution), quickstart (create faster than normal), maintenance (create in maintenance mode for diagnostic messages), or running (create and wait for the task to run).
  • name=usn: assign a unique user-specified name so the task can be referred to without knowing the system-generated task ID.
  • priority=n: request that code run at the n-th priority level.
  • timeout=t: (synchronous mode only) request that the task be created within t msec.

In addition to the creation statement, these pel operations are available:
  • <start>p: start a suspended task.
  • <suspend>p: suspend a running task.
  • <update>p name=usn: update the local properties of task p; if the name attribute is specified, update p from the task named with the specified user-specified name.
  • <terminate>p: request that the task p be terminated.
  • <kill>p: force the task p to end immediately.
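As a sketch of how these attributes combine (the cel name worker, the task name adder, and the task body add_task are hypothetical, and the pel-variable assignment syntax is an assumption based on the Pel Variable section):

```
<cel> worker = {"W"};
<mel> int total = 0;

void main () {
   /* Create the task suspended on cel [worker], named [adder],
   ** importing the global [total], with a 500 msec creation timeout. */
   <pel> p = <! at=worker mode=suspend name=adder import=total timeout=500> add_task ();

   <start> p;        /* let the suspended task run */
   <terminate> p;    /* later, request that the task end */
}
```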


Import Attributes

The import and import-file attributes are discussed in earlier chapters.



Monday, June 19, 2017

A MapReduce Example

Welcome » NERWous C » Examples
  1. Current Literature
  2. NERWous C Sample


Current Literature

MapReduce is a programming model that uses parallel algorithms to process big data sets on a large number of distributed computer nodes. The data flows through a MapReduce framework in five processing steps:
  • Input step: the data is read into the framework as (key 1, value 1) input data pairs.
  • Map step: the input data pairs are grouped into (key 2, value 2) domain data pairs.
  • Partition step: the domain data pairs are associated with the processing computer node.
  • Reduce step: the computer node processes all associated domain data pairs.
  • Output step: the results of the reduce step are collected to produce the final outcome.
Within each step, parallel processing occurs as much as possible. The steps themselves, although logically forming a processing sequence, can also be run in parallel, interleaving with one another as long as the same final outcome is produced.
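The core of the model, the Map and Reduce steps, can be sketched in plain sequential C on an in-memory line of text. The pair type, buffer sizes, and helper names below are illustrative assumptions, not part of any MapReduce framework:

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

/* Domain data pair: (word, count). Sizes are illustrative assumptions. */
typedef struct { char word[50]; int count; } PAIR;

/* Map step: emit a (word, 1) pair for every space-separated word in the line.
** Returns the number of pairs written to [out]; the caller supplies enough room. */
static int map_line (const char *line, PAIR *out) {
   char buf[256];
   int n = 0;
   strncpy (buf, line, sizeof buf - 1);
   buf[sizeof buf - 1] = '\0';
   for (char *tok = strtok (buf, " "); tok != NULL; tok = strtok (NULL, " ")) {
      strcpy (out[n].word, tok);
      out[n].count = 1;
      ++n;
   }
   return n;
}

/* Reduce step: sum the counts of identical words into the [out] table.
** Returns the number of distinct words. */
static int reduce_pairs (const PAIR *in, int n, PAIR *out) {
   int m = 0;
   for (int i = 0; i < n; ++i) {
      int j;
      for (j = 0; j < m; ++j) {
         if (strcmp (out[j].word, in[i].word) == 0) { out[j].count += in[i].count; break; }
      }
      if (j == m) out[m++] = in[i];
   }
   return m;
}
```

In this toy version, the Input step is the caller reading lines, the Partition step is trivial (a single node), and the Output step is a print loop over the reduced table.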


NERWous C Sample

The example is taken from the MapReduce Wikipedia page. The goal is to count the appearance of every word in a set of documents.

In the NERWous C code below, mel variables are used to pass data between the steps. Parallelism occurs inside each step, but from step to step the program runs sequentially.
#include "nerw.h"

typedef char WORD[50];

/* Input data pair */
typedef struct { char filename[64]; char line[256]; } INPUT_PAIR;
<mel>INPUT_PAIR InputPairs[];

/* Domain data */
typedef struct { WORD word; int count; } MAP_PAIR;
<mel>MAP_PAIR MapPairs[];

/* Partition data */
typedef struct { WORD word; char celid; } PARTITION_PAIR;
<mel>PARTITION_PAIR PartitionPairs[];

/* Output data pair */
typedef struct { WORD word; int count; } OUTPUT_PAIR;
<mel>OUTPUT_PAIR OutputPairs[];

/* Run the MapReduce steps */

void main (int argc, char **argv) {

   /* Read in the directory containing the files to be processed */
   char directory[MAX_PATH];    
   snprintf (directory, MAX_PATH, "%s", argv[1]);

   /* Invoke the steps sequentially */
   input_step (directory);
   map_step ();
   partition_step();
   reduce_step();
   output_step();
}

/* INPUT STEP:
** Read a directory full of text files and return each line as a record.
*/

void input_step (char *directory) {
   struct dirent *file;
   DIR *dir = opendir (directory);
    
   /* Run the INPUT step.
   ** The while loop sequentially goes through the directory.
   ** It skips subfolders, and processes files only.
   ** Each file is processed separately in its own sub-task.
   ** The INPUT step waits for all the sub-tasks to end before returning.
   */
   <collect> while ( (file = readdir(dir)) ) {
      if (file->d_type == DT_DIR) continue;
      <!> input_task (file->d_name);
   } <? ENDED>;
   closedir (dir);
}

void input_task (char *filename) {
   FILE *fp = fopen(filename, "r");
   if (fp == NULL) return;

   char * line = NULL;
   size_t len = 0;
   ssize_t read;
    
   /* Read a file and collect all the lines as [INPUT_PAIR] data pairs.
   ** Run the collection in parallel just to showcase the use of parallelism.
   ** A sequential while loop may be faster in this case, due to the task setup cost.
   ** 
   ** Wait for all the collection tasks to finish with the collect-ENDED block.
   */
    <collect> while ((read = getline(&line, &len, fp)) != -1) {
      <!> {
         INPUT_PAIR inputpair;
         strcpy (inputpair.filename, filename);
         strcpy (inputpair.line, line);
         <insert item=inputpair> InputPairs;
      }
   } <? ENDED>;
   free (line);
   fclose (fp);
}

/* MAP STEP:
** Break the line into separate words. 
** Each broken word has its own MAP_PAIR with count of 1.
** Similar words are not aggregated.
*/

void map_step () {
   int numitems = InputPairs<count>;
   <collect> for (int i=0; i<numitems; ++i)
   <!> {
      map_task (<?readonly>InputPairs[i]);
   } <? ENDED>;
}

void map_task (INPUT_PAIR inputpair) {
   int numitems = 0;
   /* [split] is a helper (not shown) that breaks a line into words and returns the word count */
   WORD *wordarr = split (inputpair.line, " ", &numitems);
   <collect> for (int i=0; i<numitems; ++i)
   <!> {
       MAP_PAIR mappair;
       strcpy (mappair.word, wordarr[i]);
       mappair.count = 1;
       <insert item=mappair> MapPairs;
   } <? ENDED>;
}
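The split helper used by map_task is not defined in the sample. In plain C, the tokenization it implies could be sketched with the standard strtok (the function name and signature below are assumptions):

```c
#include <assert.h>
#include <string.h>

/* Break a writable line into at most [maxwords] space-separated words.
** Returns the word count; each words[i] points into the line buffer itself. */
static int split_words (char *line, char *words[], int maxwords) {
   int n = 0;
   for (char *tok = strtok (line, " "); tok != NULL && n < maxwords; tok = strtok (NULL, " "))
      words[n++] = tok;
   return n;
}
```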

/* PARTITION STEP:
** Allocate each word to a cel node for the Reduce step.
** The method chosen is to allocate based on the first character of the word.
** This method is not very efficient since some nodes will be loaded while others are not.
*/

<cel> nodes[26] = { 
{"A"}, {"B"}, {"C"}, {"D"}, {"E"}, {"F"}, {"G"}, {"H"},
{"I"}, {"J"}, {"K"}, {"L"}, {"M"}, {"N"}, {"O"}, {"P"},
{"Q"}, {"R"}, {"S"}, {"T"}, {"U"}, {"V"}, {"W"}, {"X"},
{"Y"}, {"Z"}};

void partition_step () {
   MAP_PAIR mappair;
   int numitems = MapPairs<count>;
   <collect> for (int i=0; i<numitems; ++i)
   <!> {
       partition_task (<?readonly>MapPairs[i].word);
   } <? ENDED>;
}

void partition_task (WORD aword) {
   PARTITION_PAIR partipair;
   strcpy (partipair.word, aword);
   partipair.celid = toupper (aword[0]);
   <insert item=partipair> PartitionPairs;
}
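The first-character allocation rule of partition_task can be sketched in plain C. How non-alphabetic leading characters are handled is not specified in the sample; the sketch below assumes they fall into bucket 0:

```c
#include <assert.h>
#include <ctype.h>

/* Map a word to one of 26 cel buckets by its first character.
** Non-alphabetic leading characters fall into bucket 0 (an assumption). */
static int partition_bucket (const char *word) {
   unsigned char c = (unsigned char) word[0];
   return isalpha (c) ? toupper (c) - 'A' : 0;
}
```

As the comment in the sample notes, this allocation is uneven: buckets for common first letters will carry far more words than rare ones.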

/* REDUCE STEP
** Count each word using the assigned cel.
*/

void reduce_step () {
   PARTITION_PAIR partipair;
    
   /* Run the [reduce_task] using the cel node identified in the Partition Pair.
   ** Wait for all tasks to finish their run before continuing.
   */
   int numitems = PartitionPairs<count>;
   <collect> for (int i=0; i<numitems; ++i) {
      partipair = <?readonly> PartitionPairs[i];
      <! at=partipair.celid> reduce_task (partipair.word);
   } <? ENDED>;
    
   /* Get exclusive access to the mel [OutputPairs], sort in place, and unlock it. */
   <?> OutputPairs {
      qsort(OutputPairs, OutputPairs<count>, sizeof(OUTPUT_PAIR), cmpfunc);
   }
}

int cmpfunc (const void *a, const void *b) {
   return ((const OUTPUT_PAIR *) a)->word[0] - ((const OUTPUT_PAIR *) b)->word[0];
}
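The sort in reduce_step uses the standard C qsort. As a self-contained plain C sketch of that sort (the pair type is restated locally and the values are illustrative):

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

/* Output data pair, restated locally so the sketch is self-contained */
typedef struct { char word[50]; int count; } OUTPUT_PAIR;

/* Compare two output pairs by the first character of their words,
** matching the const void * signature that qsort expects */
static int cmpfunc (const void *a, const void *b) {
   return ((const OUTPUT_PAIR *) a)->word[0] - ((const OUTPUT_PAIR *) b)->word[0];
}
```

Note that comparing only the first character leaves words sharing an initial letter in unspecified relative order; a full strcmp of the words would give a total alphabetical sort.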

void reduce_task (WORD aword) {
   OUTPUT_PAIR outputpair;
    
   /* For a given word, see if an Output Pair already exists or not.
   ** If it is, increment the count. If it is not, create a new Output Pair.
   ** The [found] mel variable is set if an existing Output Pair is found.
   ** If [found] still remains 0 after checking existing Output Pairs, create a new one.
   */
   <mel> int found = 0;
   int numitems = OutputPairs<count>;
   <collect> for (int i=0; i<numitems; ++i)
   <!> {
       <? as=outputpair> OutputPairs[i] {
           if (!strcmp(outputpair.word, aword)) {
               ++outputpair.count;
               <?readonly> found = 1;    /* replace the value of [found] with 1 */
               <checkout replace>;
           }
       }
   } <? ENDED>;
    
   if ( <?> found == 0 ) {
      strcpy (outputpair.word, aword);
      outputpair.count = 1;
      <insert item=outputpair> OutputPairs;
   } 
}

/* OUTPUT STEP:
** Show all the word counts.
*/

void output_step () {
   int numitems = OutputPairs<count>;

   /* Use a sequential loop to guarantee that the outputs are printed in order */
   for (int i=0; i<numitems; ++i) {
      output_task (<?readonly>OutputPairs[i]);
   }
}

void output_task (OUTPUT_PAIR outputpair) {
   printf ("%s = %d\n", outputpair.word, outputpair.count);
}

