Welcome
» NERWous C
» Examples
Current Literature
MapReduce is a programming model that uses parallel algorithms to process big data sets using a large number of distributed computer nodes. The data flows through a MapReduce framework in five processing steps:
NERWous C Sample
The example is taken from the MapReduce Wikipedia page. The goal is to count the appearance of every word in a set of documents.
In the NERWous C code below, mel variables are used to pass data between the steps. Parallelism occurs inside the steps, but from step to step, it is sequential programming.
Previous Next Top
Current Literature
MapReduce is a programming model that uses parallel algorithms to process big data sets using a large number of distributed computer nodes. The data flows through a MapReduce framework in five processing steps:
- Input step: the data is read into the framework as (key 1, value 1) input data pairs.
- Map step: the input data pairs are grouped into (key 2, value 2) domain data pairs.
- Partition step: the domain data pairs are associated with the processing computer node.
- Reduce step: the computer node processes all associated domain data pairs.
- Output step: the results of the reduce step are collected to produce the final outcome.
NERWous C Sample
The example is taken from the MapReduce Wikipedia page. The goal is to count the appearance of every word in a set of documents.
In the NERWous C code below, mel variables are used to pass data between the steps. Parallelism occurs inside the steps, but from step to step, it is sequential programming.
#include <stdlib.h>   /* free, qsort */
#include <string.h>   /* strcpy, strcmp */
#include "nerw.h"
/* A word is a fixed-size character buffer (up to 49 characters plus NUL). */
typedef char WORD[50];
/* Input data pair: one text line tagged with its source file name. */
typedef struct { char filename[64]; char line[256]; } INPUT_PAIR;
<mel>INPUT_PAIR InputPairs[];
/* Domain data pair: one word with its (not yet aggregated) count. */
typedef struct { WORD word; int count; } MAP_PAIR;
<mel>MAP_PAIR MapPairs[];
/* Partition data pair: a word and the cel node assigned to reduce it. */
typedef struct { WORD word; char celid; } PARTITION_PAIR;
<mel>PARTITION_PAIR PartitionPairs[];
/* Output data pair: a word with its final aggregated count. */
typedef struct { WORD word; int count; } OUTPUT_PAIR;
<mel>OUTPUT_PAIR OutputPairs[];
/* Run the MapReduce steps */
/* Run the MapReduce steps.
** The steps are invoked sequentially; parallelism occurs inside each step.
** argv[1] names the directory holding the text files to be processed.
*/
void main (int argc, char **argv) {
    /* Guard against a missing command-line argument before touching argv[1]. */
    if (argc < 2) {
        printf ("Usage: %s <directory>\n", argv[0]);
        return;
    }
    /* Read in the directory containing the files to be processed */
    char directory[MAX_PATH];
    snprintf (directory, MAX_PATH, "%s", argv[1]);
    /* Invoke the steps sequentially */
    input_step (directory);
    map_step ();
    partition_step ();
    reduce_step ();
    output_step ();
}
/* INPUT STEP:
** Read a directory full of text files and return each line as a record.
*/
void input_step (char *directory) {
struct dirent *file;
DIR *dir = opendir (directory);
/* Run the INPUT step.
** The while loop sequentially goes through the directory.
** It skips subfolders, and processes files only.
** Each file is processed separately in its own sub-task.
** The INPUT step waits for all the sub-tasks to end before returning.
*/
<collect> while ( (file = readdir(dir)) ) {
if (file->d_type == DT_DIR) continue;
<!> input_task (file->d_name);
}
} <? ENDED>;
}
/* Read one file and publish every line as an INPUT_PAIR on the [InputPairs] mel. */
void input_task (char *filename) {
    FILE *fp = fopen(filename, "r");
    if (fp == NULL) return;
    char *line = NULL;
    size_t len = 0;
    ssize_t read;
    /* Read a file and collect all the lines in a [INPUT_PAIR] data pairs.
    ** Run the collection in parallel just to showcase the use of parallelism.
    ** Sequential while loop may be faster in this case, due to the task setup cost.
    **
    ** Wait for all the collection tasks to finish with the collect-ENDED block.
    */
    <collect> while ((read = getline(&line, &len, fp)) != -1) {
        <!> {
            INPUT_PAIR inputpair;
            /* snprintf bounds the copies: filenames/lines longer than the
            ** fixed INPUT_PAIR buffers are truncated instead of overflowing. */
            snprintf (inputpair.filename, sizeof inputpair.filename, "%s", filename);
            snprintf (inputpair.line, sizeof inputpair.line, "%s", line);
            <insert item=inputpair> InputPairs;
        }
    } <? ENDED>;
    free (line);   /* getline allocated the buffer */
    fclose (fp);
}
/* MAP STEP:
** Break the line into separate words.
** Each broken word has its own MAP_PAIR with count of 1.
** Similar words are not aggregated.
*/
void map_step () {
    int numitems = InputPairs<count>;
    /* Wait for all map tasks to end before returning, so the subsequent
    ** PARTITION step never reads a half-filled [MapPairs] mel. This matches
    ** the collect-ENDED pattern used by every other step. */
    <collect> for (int i=0; i<numitems; ++i)
    <!> {
        map_task (<?readonly>InputPairs[i]);
    } <? ENDED>;
}
/* Break one input line into words; publish a MAP_PAIR with count 1 per word. */
void map_task (INPUT_PAIR inputpair) {
/* NOTE(review): assumes [split] returns an array of words — its contract is
** not visible here; confirm against nerw.h. In standard C, `sizeof` on a
** pointer would NOT yield the element count below — this relies on NERWous C
** dialect semantics for array-returning calls; verify. */
WORD *wordarr[] = split(inputpair.line, " ");
int numitems = sizeof(wordarr)/sizeof(WORD);
/* Emit one MAP_PAIR per word, in parallel; wait for all inserts to finish. */
<collect> for (int i=0; i<numitems; ++i)
<!> {
MAP_PAIR mappair;
strcpy (mappair.word, wordarr[i]);
mappair.count = 1;
<insert item=mappair> MapPairs;
} <? ENDED>;
}
/* PARTITION STEP:
** Allocate each word to a cel node for the Reduce step.
** The method chosen is to allocate based on the first character of the word.
** This method is not very efficient since some nodes will be loaded while others are not.
*/
/* One cel node per letter; a node's id is the uppercased first letter of the
** words it reduces (see partition_task, which sets celid = uppercase(word[0])). */
<cel> nodes[26] = {
{"A"}, {"B"}, {"C"}, {"D"}, {"E"}, {"F"}, {"G"}, {"H"},
{"I"}, {"J"}, {"K"}, {"L"}, {"M"}, {"N"}, {"O"}, {"P"},
{"Q"}, {"R"}, {"S"}, {"T"}, {"U"}, {"V"}, {"W"}, {"X"},
{"Y"}, {"Z"}};
void partition_step () {
MAP_PAIR mappair;
int numitems = MapPairs<count>;
<collect> for (int i=0; i<numitems; ++i)
<!> {
partition_task (<?readonly>MapPairs[i].word);
} <? ENDED>;
}
/* Build one PARTITION_PAIR: the word plus the cel node that will reduce it
** (chosen as the uppercased first letter of the word). */
void partition_task (WORD aword) {
    PARTITION_PAIR partipair;
    /* Arrays cannot be assigned in C; copy the word into the pair. */
    strcpy (partipair.word, aword);
    partipair.celid = uppercase(aword[0]);
    <insert item=partipair> PartitionPairs;
}
/* REDUCE STEP
** Count each word using the assigned cel.
*/
void reduce_step ()
PARTITION_PAIR partipair;
/* Run the [reduce_task] using the cel node identified in the Partition Pair.
** Wait for all tasks to finish their run before continuing.
*/
int numitems = PartitionPairs<count>;
<collect> for (int i=0; i<numitems; ++i) {
partipair = <?readonly> ParititionPairs[i];
<! at=partipair.celid> reduce_task (partipair.word);
} <? ENDED>;
/* Get exclusive access to the mel [OutputPairs], sort in place, and unlock it. */
<?> OutputPairs {
qsort(OutputPairs, OutputPairs<count>, sizeof(OUTPUT_PAIR), cmpfunc);
}
}
/* qsort comparator: order Output Pairs alphabetically by their word.
** qsort requires the (const void *, const void *) signature; comparing the
** whole word with strcmp (not just the first character) yields a total order. */
int cmpfunc (const void *a, const void *b) {
    return strcmp (((const OUTPUT_PAIR *)a)->word, ((const OUTPUT_PAIR *)b)->word);
}
/* Aggregate one word into [OutputPairs]: bump the count of an existing pair,
** or insert a new pair with count 1 when the word is seen for the first time. */
void reduce_task (WORD aword) {
OUTPUT_PAIR outputpair;
/* For a given word, see if an Output Pair already exists or not.
** If it is, increment the count. If it is not, create a new Output Pair.
** The [found] mel variable is set if an existing Output Pair is found.
** If [found] still remains 0 after checking existing Output Pairs, create a new one.
*/
<mel> int found = 0;
int numitems = OutputPairs<count>;
<collect> for (int i=0; i<numitems; ++i)
<!> {
/* Check out element i exclusively as [outputpair]; <checkout replace> below
** writes the incremented count back into the mel. */
<? as=outputpair> OutputPairs[i] {
if (!strcmp(outputpair.word, aword)) {
++outputpair.count;
/* NOTE(review): <?readonly> on a WRITE to [found] contradicts the comment —
** presumably this should be a plain exclusive <?> access; confirm dialect. */
<?readonly> found = 1; /* replace the value of found to 1 */
<checkout replace>;
}
}
} <? ENDED>;
/* NOTE(review): two concurrent reduce_tasks for the same word could both see
** found == 0 and insert duplicates; appears to rely on same-word tasks being
** serialized on one cel node — verify. */
if ( <?> found == 0 ) {
strcpy (outputpair.word, aword);
outputpair.count = 1;
<insert item=outputpair> OutputPairs;
}
}
/* OUTPUT STEP:
** Show all the word counts.
*/
void output_step ()
int numitems = OutputPairs<count>;
/* Use a sequential loop to guarantee that the output are printed in order */
for (int i=0; i<numitems; ++i) {
output_task (<?readonly>OutputPairs[i]);
}
}
/* Print one word and its count on its own line.
** The newline keeps successive pairs from running together on one line. */
void output_task (OUTPUT_PAIR outputpair) {
    printf ("%s = %d\n", outputpair.word, outputpair.count);
}
Previous Next Top
No comments:
Post a Comment