Welcome
»
NERWous C
»
Examples
- Current Literature
- NERWous C Sample
Current Literature
MapReduce is a programming model that uses parallel algorithms to process big data sets using a large number of distributed computer nodes. The data flows through a MapReduce framework through five processing steps:
- Input step: the data is read into the framework as (key 1, value 1) input data pairs.
- Map step: the input data pairs are grouped into (key 2, value 2) domain data pairs.
- Partition step: the domain data pairs are associated with the processing computer node.
- Reduce step: the computer node processes all associated domain data pairs.
- Output step: the results of the reduce step are collected to produce the final outcome.
Within each step, parallel processing occurs as much as possible. The steps themselves, although logically forming a processing sequence, can also be run in parallel, interleaving with one another as long as the same final outcome is produced.
NERWous C Sample
The example is taken from the
MapReduce Wikipedia page. The goal is to count the appearance of every word in a set of documents.
In the NERWous C code below,
mel variables are used to pass data between the steps. Parallelism occurs inside the steps, but from step to step, it is sequential programming.
#include "nerw.h"
/* A word is a fixed-size character buffer.
** (Fixed: the original `typedef char word[50] WORD;` is not valid typedef
** syntax; the array declarator belongs on the new type name.)
*/
typedef char WORD[50];
/* Input data pair: one line of one file */
typedef struct { char filename[64]; char line[256]; } INPUT_PAIR;
<mel>INPUT_PAIR InputPairs[];
/* Domain data: one word occurrence with a count of 1 */
typedef struct { WORD word; int count; } MAP_PAIR;
<mel>MAP_PAIR MapPairs[];
/* Partition data: a word and the cel node assigned to reduce it */
typedef struct { WORD word; char celid; } PARTITION_PAIR;
<mel>PARTITION_PAIR PartitionPairs[];
/* Output data pair: a word and its total count */
typedef struct { WORD word; int count; } OUTPUT_PAIR;
<mel>OUTPUT_PAIR OutputPairs[];
/* Run the MapReduce steps sequentially.
** argv[1] names the directory holding the files to be word-counted.
*/
void main (int argc, char **argv) {
    /* Guard: the original dereferenced argv[1] without checking argc. */
    if (argc < 2) {
        fprintf (stderr, "Usage: %s <directory>\n", argv[0]);
        return;
    }
    /* Read in the directory containing the files to be processed */
    char directory[MAX_PATH];
    snprintf (directory, MAX_PATH, "%s", argv[1]);
    /* Invoke the steps sequentially; each step waits for its own sub-tasks */
    input_step (directory);
    map_step ();
    partition_step ();
    reduce_step ();
    output_step ();
}
/* INPUT STEP:
** Read a directory full of text files and return each line as a record.
*/
void input_step (char *directory) {
struct dirent *file;
DIR *dir = opendir (directory);
/* Run the INPUT step.
** The while loop sequentially goes through the directory.
** It skips subfolders, and processes files only.
** Each file is processed separately in its own sub-task.
** The INPUT step waits for all the sub-tasks to end before returning.
*/
<collect> while ( (file = readdir(dir)) ) {
if (file->d_type == DT_DIR) continue;
<!> input_task (file->d_name);
}
} <? ENDED>;
}
/* Read one file and insert each of its lines into the [InputPairs] mel. */
void input_task (char *filename) {
    FILE *fp = fopen(filename, "r");
    if (fp == NULL) return;
    char *line = NULL;
    size_t len = 0;
    ssize_t read;
    /* Read a file and collect all the lines in a [INPUT_PAIR] data pairs.
    ** Run the collection in parallel just to showcase the use of parallelism.
    ** Sequential while loop may be faster in this case, due to the task setup cost.
    **
    ** Wait for all the collection tasks to finish with the collect-ENDED block.
    ** NOTE(review): [line] is reused by getline while sub-tasks copy it —
    ** presumably NERWous C snapshots the enclosing scope at task launch;
    ** confirm, otherwise a later getline can overwrite a line mid-copy.
    */
    <collect> while ((read = getline(&line, &len, fp)) != -1) {
        <!> {
            INPUT_PAIR inputpair;
            strcpy (inputpair.filename, filename);
            strcpy (inputpair.line, line);
            <insert item=inputpair> InputPairs;
        }
    } <? ENDED>;
    free (line);    /* getline allocates; original leaked the buffer */
    fclose (fp);    /* original leaked the FILE handle */
}
/* MAP STEP:
** Break the line into separate words.
** Each broken word has its own MAP_PAIR with count of 1.
** Similar words are not aggregated.
*/
void map_step () {
/* Snapshot of how many input pairs the INPUT step produced. */
int numitems = InputPairs<count>;
/* Spawn one parallel sub-task per input pair.
** NOTE(review): unlike the other steps, there is no <collect>...<? ENDED>
** wrapper here, so map_step returns without waiting for its sub-tasks —
** presumably intentional (the article allows steps to interleave); confirm.
*/
for (int i=0; i<numitems; ++i)
<!> {
map_task (<?readonly>InputPairs[i]);
}
}
/* Split one input line into words and insert each word into [MapPairs]
** with a count of 1.
*/
void map_task (INPUT_PAIR inputpair) {
/* NOTE(review): [split] is not shown in this sample — assumes it returns a
** space-delimited array of words; confirm its contract and ownership. */
WORD *wordarr[] = split(inputpair.line, " ");
/* NOTE(review): in standard C, sizeof on an initialized-from-call array
** would not yield the element count — verify how NERWous C defines this. */
int numitems = sizeof(wordarr)/sizeof(WORD);
/* Emit one MAP_PAIR per word, in parallel; wait for all inserts to finish. */
<collect> for (int i=0; i<numitems; ++i)
<!> {
MAP_PAIR mappair;
strcpy (mappair.word, wordarr[i]);
mappair.count = 1;
<insert item=mappair> MapPairs;
} <? ENDED>;
}
/* PARTITION STEP:
** Allocate each word to a cel node for the Reduce step.
** The method chosen is to allocate based on the first character of the word.
** This method is not very efficient since some nodes will be loaded while others are not.
*/
/* One cel node per uppercase letter; the node name ("A".."Z") is the celid
** stored in PARTITION_PAIR and targeted by `<! at=...>` in the Reduce step. */
<cel> nodes[26] = {
{"A"}, {"B"}, {"C"}, {"D"}, {"E"}, {"F"}, {"G"}, {"H"},
{"I"}, {"J"}, {"K"}, {"L"}, {"M"}, {"N"}, {"O"}, {"P"},
{"Q"}, {"R"}, {"S"}, {"T"}, {"U"}, {"V"}, {"W"}, {"X"},
{"Y"}, {"Z"}};
void partition_step () {
MAP_PAIR mappair;
int numitems = MapPairs<count>;
<collect> for (int i=0; i<numitems; ++i)
<!> {
partition_task (<?readonly>MapPairs[i].word);
} <? ENDED>;
}
/* Build one PARTITION_PAIR for [aword]: the cel node is chosen by the
** word's uppercased first letter.
*/
void partition_task (WORD aword) {
    PARTITION_PAIR partipair;
    /* Fixed: arrays are not assignable in C (`partipair.word = aword;`);
    ** copy the word as the rest of this sample does. */
    strcpy (partipair.word, aword);
    partipair.celid = uppercase(aword[0]);
    <insert item=partipair> PartitionPairs;
}
/* REDUCE STEP
** Count each word using the assigned cel.
*/
void reduce_step ()
PARTITION_PAIR partipair;
/* Run the [reduce_task] using the cel node identified in the Partition Pair.
** Wait for all tasks to finish their run before continuing.
*/
int numitems = PartitionPairs<count>;
<collect> for (int i=0; i<numitems; ++i) {
partipair = <?readonly> ParititionPairs[i];
<! at=partipair.celid> reduce_task (partipair.word);
} <? ENDED>;
/* Get exclusive access to the mel [OutputPairs], sort in place, and unlock it. */
<?> OutputPairs {
qsort(OutputPairs, OutputPairs<count>, sizeof(OUTPUT_PAIR), cmpfunc);
}
}
/* qsort comparator: order OUTPUT_PAIRs by the first character of the word.
** (Fixed: `a-<word[0]` was a typo for the `->` member operator.)
** NOTE(review): only the first letter is compared, so words sharing an
** initial keep an arbitrary relative order — confirm that is acceptable.
*/
int cmpfunc (OUTPUT_PAIR *a, OUTPUT_PAIR *b) {
    return a->word[0] - b->word[0];
}
/* Fold one word occurrence into [OutputPairs]: bump the count of an
** existing pair, or insert a new pair with count 1. */
void reduce_task (WORD aword) {
OUTPUT_PAIR outputpair;
/* For a given word, see if an Output Pair already exists or not.
** If it is, increment the count. If it is not, create a new Output Pair.
** The [found] mel variable is set if an existing Output Pair is found.
** If [found] still remains 0 after checking exiting Output Pairs, create a new one.
*/
<mel> int found = 0;
int numitems = OutputPairs<count>;
/* Check every existing Output Pair in parallel; each sub-task checks out
** one element exclusively via <? as=...>. */
<collect> for (int i=0; i<numitems; ++i)
<!> {
<? as=outputpair> OutputPairs[i] {
if (!strcmp(outputpair.word, aword)) {
++outputpair.count;
<?readonly> found = 1; /* replace the value of found to 1 */
<checkout replace>;
}
}
} <? ENDED>;
/* NOTE(review): two concurrent reduce_tasks for the same new word could
** both see found==0 and insert duplicates — presumably prevented by the
** per-celid task placement in reduce_step; confirm. */
if ( <?> found == 0 ) {
strcpy (outputpair.word, aword);
outputpair.count = 1;
<insert item=outputpair> OutputPairs;
}
}
/* OUTPUT STEP:
** Show all the word counts.
*/
void output_step ()
int numitems = OutputPairs<count>;
/* Use a sequential loop to guarantee that the output are printed in order */
for (int i=0; i<numitems; ++i) {
output_task (<?readonly>OutputPairs[i]);
}
}
/* Print one "word = count" result.
** (Fixed: the format string lacked a newline, so every result printed on
** one run-together line, defeating output_step's ordered printout.)
*/
void output_task (OUTPUT_PAIR outputpair) {
    printf ("%s = %d\n", outputpair.word, outputpair.count);
}
Previous
Next
Top