Pages

Monday, June 19, 2017

A MapReduce Example

Welcome » NERWous C » Examples
  1. Current Literature
  2. NERWous C Sample


Current Literature

MapReduce is a programming model that uses parallel algorithms to process big data sets on a large number of distributed computer nodes. The data flows through a MapReduce framework in five processing steps:
  • Input step: the data is read into the framework as (key 1, value 1) input data pairs.
  • Map step: the input data pairs are grouped into (key 2, value 2) domain data pairs.
  • Partition step: the domain data pairs are associated with the processing computer node.
  • Reduce step: the computer node processes all associated domain data pairs.
  • Output step: the results of the reduce step are collected to produce the final outcome.
Within each step, parallel processing occurs as much as possible. The steps themselves, although logically forming a processing sequence, can also be run in parallel, interleaving with one another as long as the same final outcome is produced.


NERWous C Sample

The example is taken from the MapReduce Wikipedia page. The goal is to count the appearance of every word in a set of documents.

In the NERWous C code below, mel variables are used to pass data between the steps. Parallelism occurs inside the steps, but from step to step, it is sequential programming.
#include "nerw.h"

/* A word is a fixed-size character buffer.
** (Fixed: original read "typedef char word[50] WORD;", which is not valid
** typedef syntax — the alias name belongs inside the declarator.) */
typedef char WORD[50];

/* Input data pair: one text line plus the name of the file it came from. */
typedef struct { char filename[64]; char line[256]; } INPUT_PAIR;
<mel>INPUT_PAIR InputPairs[];

/* Domain data pair: a single word occurrence with its count (always 1 at map time). */
typedef struct { WORD word; int count; } MAP_PAIR;
<mel>MAP_PAIR MapPairs[];

/* Partition data pair: a word and the id of the cel node assigned to reduce it. */
typedef struct { WORD word; char celid; } PARTITION_PAIR;
<mel>PARTITION_PAIR PartitionPairs[];

/* Output data pair: a word and its final aggregated count. */
typedef struct { WORD word; int count; } OUTPUT_PAIR;
<mel>OUTPUT_PAIR OutputPairs[];

/* Run the MapReduce steps */

/* Program entry point: take the directory to process from argv[1]
** and run the five MapReduce steps one after the other.
** (Fixed: 'void main' → standard 'int main'; added a guard for a
** missing command-line argument before dereferencing argv[1].) */
int main (int argc, char **argv) {

   if (argc < 2) {
      fprintf (stderr, "usage: %s <directory>\n", argv[0]);
      return 1;
   }

   /* Read in the directory containing the files to be processed */
   char directory[MAX_PATH];
   snprintf (directory, MAX_PATH, "%s", argv[1]);

   /* Invoke the steps sequentially */
   input_step (directory);
   map_step ();
   partition_step();
   reduce_step();
   output_step();
   return 0;
}

/* INPUT STEP:
** Read a directory full of text files and return each line as a record.
*/

void input_step (char *directory) {
   struct dirent *file;
   DIR *dir = opendir (directory);
    
   /* Run the INPUT step.
   ** The while loop sequentially goes through the directory.
   ** It skips subfolders, and processes files only.
   ** Each file is processed separately in its own sub-task.
   ** The INPUT step waits for all the sub-tasks to end before returning.
   */
   <collect> while ( (file = readdir(dir)) ) {
      if (file->d_type == DT_DIR) continue;
      <!> input_task (file->d_name);
      }      
   } <? ENDED>;
}

/* Read one file line by line and insert each line into the InputPairs mel
** as an INPUT_PAIR, tagged with the source filename. */
void input_task (char *filename) {
   FILE *fp = fopen(filename, "r");
   if (fp == NULL) return;

   char *line = NULL;
   size_t len = 0;
   ssize_t read;

   /* Read a file and collect all the lines as [INPUT_PAIR] data pairs.
   ** Run the collection in parallel just to showcase the use of parallelism.
   ** A sequential while loop may be faster in this case, due to the task setup cost.
   ** NOTE(review): getline reuses [line] on the next iteration while a spawned
   ** sub-task may still be reading it — confirm the framework snapshots captures.
   **
   ** Wait for all the collection tasks to finish with the collect-ENDED block.
   */
   <collect> while ((read = getline(&line, &len, fp)) != -1) {
      <!> {
         INPUT_PAIR inputpair;
         /* Bounded copies: the original strcpy calls could overflow the
         ** fixed-size INPUT_PAIR buffers on long names or lines. */
         snprintf (inputpair.filename, sizeof inputpair.filename, "%s", filename);
         snprintf (inputpair.line, sizeof inputpair.line, "%s", line);
         <insert item=inputpair> InputPairs;
      }
   } <? ENDED>;

   free (line);    /* buffer allocated by getline */
   fclose (fp);    /* the original leaked both the buffer and the stream */
}

/* MAP STEP:
** Break the line into separate words. 
** Each broken word has its own MAP_PAIR with count of 1.
** Similar words are not aggregated.
*/

/* MAP STEP driver:
** Spawn one parallel sub-task (<!>) per collected input pair; each sub-task
** gets a read-only snapshot of its InputPairs element.
** NOTE(review): unlike the other steps, there is no <collect>...<? ENDED>
** here, so map_step may return before its sub-tasks finish — confirm that
** this interleaving with the following steps is intended.
*/
void map_step () {
   int numitems = InputPairs<count>;
   for (int i=0; i<numitems; ++i)
   <!> {
      map_task (<?readonly>InputPairs[i]);
   }
}

/* Split one input line into words and emit a MAP_PAIR (word, 1) for each,
** inserting them into the MapPairs mel in parallel.
** NOTE(review): [split] is not defined in this file — presumably it returns
** an array of word strings; confirm its contract.
** NOTE(review): sizeof(wordarr)/sizeof(WORD) is a compile-time quantity and
** does not reflect how many words split() produced at run time — verify.
*/
void map_task (INPUT_PAIR inputpair) {
   WORD *wordarr[] = split(inputpair.line, " ");
   int numitems = sizeof(wordarr)/sizeof(WORD);
   /* Emit the pairs in parallel; wait for all of them before returning. */
   <collect> for (int i=0; i<numitems; ++i)
   <!> {
       MAP_PAIR mappair;
       strcpy (mappair.word, wordarr[i]);
       mappair.count = 1;
       <insert item=mappair> MapPairs;
   } <? ENDED>;
}

/* PARTITION STEP:
** Allocate each word to a cel node for the Reduce step.
** The method chosen is to allocate based on the first character of the word.
** This method is not very efficient since some nodes will be loaded while others are not.
*/

/* The 26 cel (worker) nodes used by the Partition and Reduce steps,
** one per letter of the alphabet: a word is routed to the node whose id
** matches the uppercased first character of the word. */
<cel> nodes[26] = { 
{"A"}, {"B"}, {"C"}, {"D"}, {"E"}, {"F"}, {"G"}, {"H"},
{"I"}, {"J"}, {"K"}, {"L"}, {"M"}, {"N"}, {"O"}, {"P"},
{"Q"}, {"R"}, {"S"}, {"T"}, {"U"}, {"V"}, {"W"}, {"X"},
{"Y"}, {"Z"}};

void partition_step () {
   MAP_PAIR mappair;
   int numitems = MapPairs<count>;
   <collect> for (int i=0; i<numitems; ++i)
   <!> {
       partition_task (<?readonly>MapPairs[i].word);
   } <? ENDED>;
}

/* Build a PARTITION_PAIR assigning [aword] to the cel whose id is the
** uppercased first letter of the word, and insert it into the mel.
** (Fixed: WORD is an array type, so 'partipair.word = aword' is not a valid
** assignment — copy with strcpy as the Map step does.)
** NOTE(review): [uppercase] is not defined in this file — presumably
** equivalent to toupper(); confirm. */
void partition_task (WORD aword) {
   PARTITION_PAIR partipair;
   strcpy (partipair.word, aword);
   partipair.celid = uppercase(aword[0]);
   <insert item=partipair> PartitionPairs;
}

/* REDUCE STEP
** Count each word using the assigned cel.
*/

void reduce_step ()
   PARTITION_PAIR partipair;
    
   /* Run the [reduce_task] using the cel node identified in the Partition Pair.
   ** Wait for all tasks to finish their run before continuing.
   */
   int numitems = PartitionPairs<count>;
   <collect> for (int i=0; i<numitems; ++i) {
      partipair = <?readonly> ParititionPairs[i];
      <! at=partipair.celid> reduce_task (partipair.word);
   } <? ENDED>;
    
   /* Get exclusive access to the mel [OutputPairs], sort in place, and unlock it. */
   <?> OutputPairs {
      qsort(OutputPairs, OutputPairs<count>, sizeof(OUTPUT_PAIR), cmpfunc);
   }
}

int cmpfunc (OUTPUT_PAIR *a, OUTPUT_PAIR *b) {
   return a-<word[0] - b-<word[0];
}

/* Aggregate one word occurrence into the OutputPairs mel:
** increment the count of an existing OUTPUT_PAIR for [aword], or insert a
** new pair with count 1 if none exists. */
void reduce_task (WORD aword) {
   OUTPUT_PAIR outputpair;
    
   /* For a given word, see if an Output Pair already exists or not.
   ** If it does, increment the count. If not, create a new Output Pair.
   ** The [found] mel variable is set if an existing Output Pair is found.
   ** If [found] still remains 0 after checking existing Output Pairs, create a new one.
   ** NOTE(review): the scan below and the insert at the bottom are not one
   ** atomic step — two reduce_tasks racing on the same new word could both
   ** insert; confirm the framework serializes reduce_tasks per cel.
   */
   <mel> int found = 0;
   int numitems = OutputPairs<count>;
   <collect> for (int i=0; i<numitems; ++i)
   <!> {
       <? as=outputpair> OutputPairs[i] {
           if (!strcmp(outputpair.word, aword)) {
               ++outputpair.count;
               <?readonly> found = 1;    /* replace the value of found to 1 — NOTE(review): <?readonly> on a write looks inconsistent; confirm intended tag */
               <checkout replace>;       /* write the modified pair back into the mel */
           }
       }
   } <? ENDED>;
    
   if ( <?> found == 0 ) {
      strcpy (outputpair.word, aword);
      outputpair.count = 1;
      <insert item=outputpair> OutputPairs;
   } 
}

/* OUTPUT STEP:
** Show all the word counts.
*/

void output_step ()
   int numitems = OutputPairs<count>;

   /* Use a sequential loop to guarantee that the output are printed in order */
   for (int i=0; i<numitems; ++i) {
      output_task (<?readonly>OutputPairs[i]);
   }
}

/* Print one word/count result.
** (Fixed: added the missing newline so consecutive results do not run
** together on one line.) */
void output_task (OUTPUT_PAIR outputpair) {
   printf ("%s = %d\n", outputpair.word, outputpair.count);
}


Previous Next Top

No comments:

Post a Comment