KMR

Copyright (C) 2012-2016 RIKEN AICS

(KMR Version: v1.8.1(20160425))

KMR comes with ABSOLUTELY NO WARRANTY.

Overview

KMR (/ke-moo-r/) is a set of high-performance map-reduce operations for the MPI (Message Passing Interface) environment. Its targets include large-scale supercomputers with thousands of nodes, such as the K computer and Fujitsu FX10, but KMR works on ordinary clusters as well.

KMR is designed to work in memory, exploiting the large amount of memory available on supercomputers, whereas most other map-reduce implementations are designed around external disk-based operations. Data exchanges in KMR therefore occur as message passing instead of remote file operations; in this respect, KMR provides higher-level wrappers around the message-passing routines. Applications using KMR run bulk-synchronously and single-threaded, but the implementation inside mappers and reducers is multi-threaded for performance.

Target Applications

KMR's primary goal is to help computational scientists write programs for post-processing of simulation data. Post-processing typically involves large amounts of data and needs a non-trivial parallel-programming effort. Writing large-scale data processing directly in raw message passing is tedious; the map-reduce model, which has been widely adopted for data-intensive applications over the past decade, makes it much less so.

KMR makes communications implicit, which frees programmers from the details of MPI ranks, tags, message lengths, race conditions, etc. KMR also eases programming with platform specifics, which sometimes make simple tasks more complicated. For example, the recommended file-system access practice on K has affinity to the z-axis of the Tofu network, intended to lessen the disturbance to the communication of other users. Under this practice, a segment of a file is regarded as belonging to some virtual partition, and it is necessary to keep an association from a segment of a file to a specific rank/node. KMR includes routines for this association.

Information and Bug Reporting, etc.

See "http://mt.aics.riken.jp/kmr" for up-to-date information and issue tracking.

Installation

Run configure, make, and make install. "make install" copies commands/scripts, "kmr.h", "libkmr.a", and the document files in "html" to the destination (specified by --prefix). The destinations are "bin", "include", "lib", "etc", "man", and "doc/html" under the prefix directory. Only a few configure options affect KMR behavior (the first line shows the defaults):

./configure --prefix=/usr/local --disable-debug --enable-assert --enable-openmp --enable-fortran
    [--enable-debug/--disable-debug]
    [--enable-assert/--disable-assert]
    [--enable-openmp/--disable-openmp]
    [--enable-fortran/--disable-fortran]

Normally, the following suffices:

./configure --prefix=$HOME/lib/kmr
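
followed by the usual build and install steps:

make
make install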

See "config.make" for the configuring result.

KMR also works on ordinary cluster computers. MPI (Message Passing Interface) is needed to configure properly, so prepare the MPI library first. The configurer uses modified versions of "ax_mpi.m4" and "ax_openmp.m4" to find the compiler driver scripts for MPI and the compiler flags for OpenMP. They are modified so that they can find the compiler flags for the Fujitsu compilers on the K computer (which have unusual names).

Application Compilation

Applications can be compiled as usual. The compiler/linker options necessary (or recommended) for the Fujitsu compilers on the K computer are:

mpifccpx -g -Kfast -Kopenmp

Compiled applications can be run as usual, too, via "mpirun" or "mpiexec".

Setting the environment variable "XOS_MMM_L_ARENA_FREE=2" at run time may help improve performance on the K computer, because KMR frequently calls malloc/free. It prevents memory pages from being returned to the kernel pool at free (the effect has not been evaluated).
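
For example (how environment variables are propagated to the compute nodes depends on the job script and the MPI launcher):

export XOS_MMM_L_ARENA_FREE=2
mpiexec ./a.out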

Simplest Usage

The specific features of the KMR API are, first of all, that keys and values are typed: integer, floating-point, or byte data (opaque). Second, KMR has a separate call for shuffling, and shuffling must be called explicitly before reduction. Since a reducer by itself reduces data local to a rank/node, a local "combiner" can be implemented by running a reducer without shuffling. Third, the steps of mapping, shuffling, and reducing are separate calls with specific APIs and run step-by-step, bulk-synchronously. It is arguable that such synchronous behavior is not the best for performance, but it allows KMR to utilize the efficient collective communication of MPI, which becomes more important on very large-scale computers. Fourth, sorting after reduction is also optional, and KMR provides a separate call for sorting.
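
For example, the second point (a local "combiner") can be sketched as follows. This is only an illustration, not code taken from the examples: "kvs_in" is an assumed existing KVS of (word, count) pairs and "sum_counts" is a hypothetical reduce-function that adds the counts.

/* A sketch of a local "combiner": reducing before shuffling combines
   only the pairs local to each rank; the shuffle and the final
   reduction then operate on the smaller, pre-combined data. */
KMR_KVS *kvs_local = kmr_create_kvs(mr, KMR_KV_OPAQUE, KMR_KV_INTEGER);
kmr_reduce(kvs_in, kvs_local, 0, kmr_noopt, sum_counts);
KMR_KVS *kvs_shuffled = kmr_create_kvs(mr, KMR_KV_OPAQUE, KMR_KV_INTEGER);
kmr_shuffle(kvs_local, kvs_shuffled, kmr_noopt);
KMR_KVS *kvs_total = kmr_create_kvs(mr, KMR_KV_OPAQUE, KMR_KV_INTEGER);
kmr_reduce(kvs_shuffled, kvs_total, 0, kmr_noopt, sum_counts);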

The following is the simplest usage. It is "wordcount.c" in the example directory. See the section Example Code.

/* Word Count */

/* It ranks the words by their occurrence count in the "LICENSE" file.
   Copy the file in the current directory and run it. */

#include <mpi.h>
#include <stdio.h>
#include <stdlib.h>
#include <assert.h>
#include <unistd.h>
#include "kmr.h"

#define ISALPHA(X) (('a' <= X && X <= 'z') || ('A' <= X && X <= 'Z'))

static int
read_words_from_a_file(const struct kmr_kv_box kv0,
     const KMR_KVS *kvi, KMR_KVS *kvo, void *p, const long i)
{
    char b[25];
    assert(kvi == 0 && kv0.klen == 0 && kv0.vlen == 0 && kvo != 0);
    FILE *f = fopen("LICENSE", "r");
    if (f == 0) {
	perror("Cannot open a file \"LICENSE\"; fopen(LICENSE)");
	MPI_Abort(MPI_COMM_WORLD, 1);
	return 0;
    }
    int j = 0;
    for (;;) {
	assert((size_t)j <= (sizeof(b) - 1));
	int cc = getc(f);
	if ((cc == EOF || !ISALPHA(cc) || (j == (sizeof(b) - 1))) && j != 0) {
	    b[j] = 0;
	    struct kmr_kv_box kv = {
		.klen = (j + 1), .k.p = b,
		.vlen = sizeof(long), .v.i = 1};
	    kmr_add_kv(kvo, kv);
	    j = 0;
	}
	if (cc == EOF) {
	    break;
	}
	if (ISALPHA(cc)) {
	    b[j] = cc;
	    j++;
	}
    }
    fclose(f);
    return MPI_SUCCESS;
}

static int
print_top_five(const struct kmr_kv_box kv0,
      const KMR_KVS *kvi, KMR_KVS *kvo, void *p, const long i)
{
    int rank = kvi->c.mr->rank;
    if (rank == 0 && i < 5) {
	printf("#%s=%d\n", kv0.v.p, (int)(0 - kv0.k.i));
	fflush(0);
    }
    return MPI_SUCCESS;
}

static int
sum_counts_for_a_word(const struct kmr_kv_box kv[], const long n,
    const KMR_KVS *kvs, KMR_KVS *kvo, void *p)
{
    long c = 0;
    for (long i = 0; i < n; i++) {
	c -= kv[i].v.i;
    }
    struct kmr_kv_box nkv = {
	.klen = kv[0].klen,
	.k.p = kv[0].k.p,
	.vlen = sizeof(long),
	.v.i = c};
    kmr_add_kv(kvo, nkv);
    return MPI_SUCCESS;
}

int
main(int argc, char **argv)
{
    int nprocs, rank, thlv;
    MPI_Init_thread(&argc, &argv, MPI_THREAD_SERIALIZED, &thlv);
    MPI_Comm_size(MPI_COMM_WORLD, &nprocs);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    kmr_init();
    KMR *mr = kmr_create_context(MPI_COMM_WORLD, MPI_INFO_NULL, 0);

    MPI_Barrier(MPI_COMM_WORLD);
    if (rank == 0) {printf("Ranking words...\n");}

    /* Insert words with count 1 to a KVS: */
    KMR_KVS *kvs0 = kmr_create_kvs(mr, KMR_KV_OPAQUE, KMR_KV_INTEGER);
    kmr_map_once(kvs0, 0, kmr_noopt, 0, read_words_from_a_file);

    /* Gather the same words for summing their counts: */
    KMR_KVS *kvs1 = kmr_create_kvs(mr, KMR_KV_OPAQUE, KMR_KV_INTEGER);
    kmr_shuffle(kvs0, kvs1, kmr_noopt);

    /* Sum counts for words: */
    KMR_KVS *kvs2 = kmr_create_kvs(mr, KMR_KV_OPAQUE, KMR_KV_INTEGER);
    kmr_reduce(kvs1, kvs2, 0, kmr_noopt, sum_counts_for_a_word);

    /* Swap words and counts, and make counts as keys: */
    KMR_KVS *kvs3 = kmr_create_kvs(mr, KMR_KV_INTEGER, KMR_KV_OPAQUE);
    kmr_reverse(kvs2, kvs3, kmr_noopt);

    /* Rank by count: */
    KMR_KVS *kvs4 = kmr_create_kvs(mr, KMR_KV_INTEGER, KMR_KV_OPAQUE);
    kmr_sort(kvs3, kvs4, kmr_noopt);

    /* Print the first few entries: */
    kmr_map(kvs4, 0, 0, kmr_noopt, print_top_five);

    kmr_free_context(mr);
    kmr_fin();
    MPI_Finalize();
    return 0;
}

The following list shows frequently used functions with short descriptions.

MPI_Init_thread(&argc, &argv, MPI_THREAD_SERIALIZED, &thlv)
initializes MPI. MPI must be initialized with threads enabled, because KMR uses threads.
kmr_init()
initializes the KMR environment.
kmr_create_context(MPI_COMM_WORLD, MPI_INFO_NULL, 0)
initializes a KMR context with a given communicator. The context records common information needed by all key-values.
kmr_create_kvs(mr, KMR_KV_OPAQUE, KMR_KV_OPAQUE)
makes a new key-value stream used by mappers and reducers.
kmr_map(kvs0, kvs1, 0, kmr_noopt, mapfn)
maps data by calling "mapfn" on "kvs0" as an input and "kvs1" as an output.
int mapfn(const struct kmr_kv_box kv, const KMR_KVS *kvi, KMR_KVS *kvo, void *p, const long index)
is a map-function.
kmr_map_on_rank_zero(kvs0, 0, kmr_noopt, mapfn)
is a variant of mappers, which runs a map-function on rank zero only.
kmr_map_once(kvs0, 0, kmr_noopt, 0, mapfn)
is a variant of mappers, which runs a map-function once with a dummy input.
kmr_shuffle(kvs1, kvs2, kmr_noopt)
shuffles data from kvs1 to kvs2.
kmr_reduce(kvs2, kvs3, 0, kmr_noopt, reducefn)
reduces data by calling "reducefn" on "kvs2" as an input and "kvs3" as an output.
int reducefn(const struct kmr_kv_box kv[], const long n, const KMR_KVS *kvi, KMR_KVS *kvo, void *p)
is a reduce-function.
kmr_free_kvs(kvs3)
frees a key-value stream. Normally, key-value streams are consumed by mappers and reducers, and explicit calls to this function are not necessary.
kmr_sort(kvs3, kvs4, kmr_noopt)
sorts key-values.
kmr_reverse(kvs2, kvs3, kmr_noopt)
swaps the key part and the value part in each pair.
kmr_dump_kvs(kvs1, 0)
dumps key-value pairs in "kvs1".
kmr_free_context(mr)
releases the common context.
kmr_fin()
terminates the use of KMR.
MPI_Finalize()
is needed as usual.

See descriptions in kmr.h.

For the next step, browse documents starting with simple0 in test0.c.

Python API

KMR also provides a Python API as the 'kmr4py' module, which is installed along with the KMR C library. Python 2.6 or later is supported; Python 3 is currently not supported. A Python MPI implementation is required to use kmr4py. Any Python MPI implementation may work with kmr4py, but only mpi4py has been tested.

To run a program that uses kmr4py, the environment variables LD_LIBRARY_PATH and PYTHONPATH must be set so that the KMR shared library (libkmr.so) and the Python module (kmr4py.py) can be found by the Python interpreter. By default, these two files are located under the $KMRHOME/lib directory where KMR is installed.

export LD_LIBRARY_PATH=$KMRHOME/lib:$LD_LIBRARY_PATH
export PYTHONPATH=$KMRHOME/lib:$PYTHONPATH
mpiexec -n 4 python YOUR_PROGRAM.py

Some example applications are included in the KMR package under the 'ex/' directory.

Also see the Python API document.

Sorting Key-Values

KMR does not sort key-value pairs after reduction. There is a separate call for sorting, kmr_sort().

The sorter orders keys according to their content type, that is, signed integers, floating-point numbers, or byte arrays. Byte arrays are sorted in the lexicographical order on bytes. Floating-point numbers are sorted as integers after some conversion to handle negative values.
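
For example (a sketch only; "counts" is an assumed KVS holding (word, negated-count) pairs, as produced in the word-count example): swapping keys and values and then sorting orders the words by descending count.

KMR_KVS *bycount = kmr_create_kvs(mr, KMR_KV_INTEGER, KMR_KV_OPAQUE);
kmr_reverse(counts, bycount, kmr_noopt);
KMR_KVS *sorted = kmr_create_kvs(mr, KMR_KV_INTEGER, KMR_KV_OPAQUE);
kmr_sort(bycount, sorted, kmr_noopt);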

Rank-Aware Communication

It is normally not necessary to use ranks explicitly in the map-reduce model. However, rank information is sometimes unavoidable, especially in initializations and in operations related to file I/O. KMR supports communications with explicit rank information by regarding keys as ranks.

Specifying the option ".key_as_rank" to kmr_shuffle makes the key field work as a rank, which allows mimicking rank-based MPI communications.

/* Rank-based Communications */
KMR_KVS *kvs0 = kmr_create_kvs(mr, KMR_KV_INTEGER, KMR_KV_OPAQUE);
KMR_KVS *kvs1 = kmr_create_kvs(mr, KMR_KV_INTEGER, KMR_KV_OPAQUE);
struct kmr_option opt = kmr_noopt;
opt.key_as_rank = 1;
kmr_shuffle(kvs0, kvs1, opt);

See descriptions in kmr.h.

Another MPI communication pattern, "allgather", is performed with kmr_replicate. It is also often used to "bcast" data from rank zero to all ranks. Specifying the option ".rank_zero" to kmr_replicate makes the data "gather" to rank zero.

/* Allgather/Bcast */
KMR_KVS *kvs0 = kmr_create_kvs(mr, KMR_KV_OPAQUE, KMR_KV_OPAQUE);
KMR_KVS *kvs1 = kmr_create_kvs(mr, KMR_KV_OPAQUE, KMR_KV_OPAQUE);
kmr_replicate(kvs0, kvs1, kmr_noopt);

/* Gather */
KMR_KVS *kvs0 = kmr_create_kvs(mr, KMR_KV_INTEGER, KMR_KV_OPAQUE);
KMR_KVS *kvs1 = kmr_create_kvs(mr, KMR_KV_INTEGER, KMR_KV_OPAQUE);
struct kmr_option opt = kmr_noopt;
opt.rank_zero = 1;
kmr_replicate(kvs0, kvs1, opt);

See descriptions in kmr.h.

There are a few mapping calls for manipulating key-value pairs. When keys are used as ranks, it is sometimes desirable to save the key information in the value field. kmr_pairing() replaces each value field with the key-value pair, keeping the same key field. kmr_unpairing() recovers the key-value pairs, discarding the old key field. kmr_reverse() swaps the key and the value fields.
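
For example (a sketch only; "kvs_a" is an assumed existing KVS), pairing followed by unpairing recovers the original pairs:

/* kmr_pairing() stores each (key, value) pair in the value field;
   kmr_unpairing() restores the original pairs from the value field. */
KMR_KVS *kvs_p = kmr_create_kvs(mr, KMR_KV_OPAQUE, KMR_KV_OPAQUE);
kmr_pairing(kvs_a, kvs_p, kmr_noopt);
KMR_KVS *kvs_u = kmr_create_kvs(mr, KMR_KV_OPAQUE, KMR_KV_OPAQUE);
kmr_unpairing(kvs_p, kvs_u, kmr_noopt);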

Master-Slave Mapper

KMR supports a mapper with master-slave job scheduling. The special mapper kmr_map_ms distributes the key-value pairs on the master (rank 0) to the slaves (ranks other than rank 0), and collects the resulting key-value pairs on the master again. Key-value data is stored only on the master in this case.

KMR_KVS *tasks = kmr_create_kvs(mr, KMR_KV_INTEGER, KMR_KV_OPAQUE);
/* Put tasks as key-values here... */
KMR_KVS *results = kmr_create_kvs(mr, KMR_KV_INTEGER, KMR_KV_OPAQUE);
int cc;
do {
    struct kmr_option opt = kmr_noopt;
    opt.nothreading = 1;
    cc = kmr_map_ms(tasks, results, m, 0, opt);
} while (cc == MPI_ERR_ROOT);

See descriptions in kmr.h.

There is another master-slave scheduling mapper, which spawns and runs new MPI processes.

KMR_KVS *tasks = kmr_create_kvs(mr, KMR_KV_INTEGER, KMR_KV_OPAQUE);
/* Put tasks as key-values here... */
KMR_KVS *results = kmr_create_kvs(mr, KMR_KV_OPAQUE, KMR_KV_OPAQUE);
struct kmr_option opt = kmr_noopt;
kmr_map_via_spawn(tasks, results, 0, info, opt, 0);

See descriptions in kmr.h.

File Access Support

KMR is intended to ease programming with platform specifics. As described in the overview section, the recommended file-system access practice on K has affinity to the z-axis of the Tofu network. It is actually not purely for performance; it is demanded by the administration to lessen the disturbance to other users. Besides the affinity, the job-scheduler uses file-staging, which copies data files and program files to the storage of the working area, and file-staging is designed to respect the affinity to the z-axis. Users are often confronted with the problem of reading a large data file onto many ranks, because it simply becomes a bottleneck.

One way to read large data is for the user first to split the large data file into segments, stage them in, and then reassemble them into the original. Distributing the load of file access over multiple z-axes is easy with the file-staging description. KMR provides a routine to support reassembling the segmented files:

KMR *mr = ...;
int color = 0;
void *buffer;
off_t size;
kmr_read_files_reassemble(mr, "filepart", color, 0, -1, &buffer, &size);

See descriptions in kmr.h.

Another way to read large data is for the user to stage in the large data file as a single file as it is, and then read segments of that single file from multiple ranks. This makes a large file on the file system whose affinity spans multiple z-axes, so it is necessary to take the striping information of the Lustre file-system into account to keep file accesses confined to one z-axis. Each rank accesses only the parts of a stripe that are known to be on the same z-axis. KMR provides a routine to support segmented access to large files:

KMR *mr = ...;
int color = 0;
void *buffer;
off_t size;
kmr_read_file_by_segments(mr, "largefile", color, &buffer, &size);

See descriptions in kmr.h.

There are some utility mappers that ease enumerating files and directories. One such mapper maps on file names, which are either specified directly or exist in the specified directories.

KMR *mr = ...;
KMR_KVS *results = kmr_create_kvs(mr, KMR_KV_OPAQUE, KMR_KV_OPAQUE);
char *names[] = {"f0", "f1", "f2"};
int n = 3;
kmr_map_file_names(mr, names, n, kmr_fnoopt, results, 0, kmr_noopt, m);

See descriptions in kmr.h.

Checkpoint/Restart

KMR provides a simple checkpoint/restart mechanism to achieve highly reliable MapReduce execution. As checkpoint/restart is implemented in the KMR library, any program that uses KMR functions, as well as MapReduce programs invoked by kmrrun, can use the feature.

Using Checkpoint/Restart in kmrrun

To enable checkpoint/restart in kmrrun, users only have to pass the "--ckpt" option to the command. When the "--ckpt" option is given, the state of the MapReduce progress is saved as checkpoint files at every MapReduce operation, such as map, shuffle, and reduce. When the "--ckpt" option is given and previous checkpoint files exist, the MapReduce execution is restarted from the saved state.

Using Checkpoint/Restart in a KMR program

To enable checkpoint/restart in a user program that uses KMR, users need to set an environment variable named KMROPTION. KMROPTION points to a KMR resource file that defines the behavior of KMR. Specifying "ckpt_enable=1" in that file enables checkpoint/restart.

echo "ckpt_enable=1" > kmrrc
KMROPTION=kmrrc mpiexec -np 12 ./a.out

Checkpointing without fsync

If users can assume that the MTBF of the system is much larger than the expected program execution time, they can disable 'fsync' when writing checkpoint files by specifying the 'ckpt_no_fsync=1' option as follows. This certainly reduces reliability, but it also reduces I/O overhead and, as a result, improves performance.

echo "ckpt_enable=1" > kmrrc
echo "ckpt_no_fsync=1" >> kmrrc
KMROPTION=kmrrc mpiexec -np 12 ./a.out

It may be helpful when users run a job in a queue with a short maximum elapsed time, such as the 'micro' queue on the K computer. Though the job is killed by the job scheduler when the elapsed time is exceeded, checkpoint files are guaranteed to be saved, and users can restart the same program as a new job from the states saved in the checkpoint files.

Selective Checkpointing

From version 1.6, KMR experimentally supports a 'selective mode' for saving checkpoint files, to further reduce I/O overhead. When selective mode is enabled, KMR saves checkpoint files only for user-specified key-value streams.

To enable selective mode, specify the following options in the KMR resource file.

echo "ckpt_enable=1" > kmrrc
echo "ckpt_selective=1" >> kmrrc
KMROPTION=kmrrc mpiexec -np 12 ./a.out

Just enabling this option does not take checkpoints. To save a checkpoint file for an output key-value stream of a KMR function, users need to pass the 'take_ckpt' option to the function. In the following example, a checkpoint file of the key-value stream named 'kvs1' is saved.

struct kmr_option kmr_ckpt = { .take_ckpt = 1 };
kmr_map(kvs0, kvs1, 0, kmr_ckpt, mapfn);

This feature is currently highly experimental; use it with care.

Supported functions

As checkpoint/restart is supported only in the following KMR functions, programs that use other KMR functions, or that store data outside of the KMR context (key-value streams), cannot benefit from the feature.

Map functions
kmr_map, kmr_map_once, kmr_map_on_rank_zero, kmr_map_rank_by_rank, kmr_map_via_spawn, kmr_map_processes, kmr_map_parallel_processes, kmr_map_serial_processes, kmr_map_getline, kmr_map_getline_in_memory_, kmr_map_for_some
Reduce functions
kmr_reduce, kmr_reduce_as_one, kmr_reduce_for_some
Communication functions
kmr_shuffle, kmr_replicate
Other functions
kmr_move_kvs, kmr_reverse, kmr_pairing, kmr_unpairing, kmr_assign_file

We plan to support checkpoint/restart in other KMR functions in future releases.

Command Line Utility (KMRRUN)

KMR provides a simple command line tool, named kmrrun, that can run arbitrary programs as a mapper and a reducer. Using kmrrun, users can easily execute a MapReduce task, as well as embarrassingly parallel tasks. kmrrun requires Open MPI or Fujitsu MPI (for K and FX10); it does not work with MPICH2 or Intel MPI.

To run a MapReduce task, users have to prepare the following three programs.

Mapper
is a Mapper program. It should accept a file name as the last command line parameter. It can be a serial, OpenMP or MPI program.
Key-Value pair generator
is a program that generates key-value pairs by reading the output files of the Mapper. It should accept, as the last command line parameter, a file name which is the name of the input file to the Mapper. The generated key-value pairs should be written to the standard output, with the key and the value separated by a space. It should be a serial program.
Reducer
is a Reducer program. It should accept a file name as the last command line parameter. The name of the file is the key of a key-value pair, and its contents are text where each line is a key-value pair separated by a space. It can be a serial, OpenMP, or MPI program.

The command line looks like the following.

mpiexec -np 12 kmrrun -n 8 -m "mapper arg0 arg1" -k "kvgen.sh" -r "reducer arg0 arg1" file

For command line arguments, see the man-page of kmrrun.1. Also see descriptions of kmrrun.

If the file argument is a regular file, kmrrun processes it. If the argument is a directory, kmrrun enumerates the regular files under the directory and processes the contents of all of them. When a mapper or a reducer has arguments, they are passed as a single quoted string with the arguments separated by whitespace.

There are some differences between kmrshell and kmrrun; see the Shell Command Pipeline section below for kmrshell.

KMR provides a tool to use kmrrun with the job-scheduler of K: "kmrrungenscript". "kmrrungenscript" generates a job-script for the job-scheduler to use kmrrun. See the related man-pages.

There are a few examples (pi calculation) for kmrrun, implemented as a serial C program and as an MPI/C program.

Shell Command Pipeline (Streaming)

It is often convenient to run shell commands as mappers and reducers; this facility is called "streaming" in Hadoop. KMR includes a tool called "kmrshell", which invokes the processes of a mapper, a shuffler, and a reducer, and connects them via Unix pipes. After invoking the processes, "kmrshell" becomes a file-reader, which concatenates the contents of the input files (if a directory is specified as the input) and passes them to the mapper.

                     pipe        pipe               pipe
input ==> "kmrshell" ---> mapper ---> "kmrshuffler" ---> reducer ==> results
files                 (shell command)                (shell command)

In a shell command pipeline, a key-value datum is a line of the form "key value\n", where the fields are separated by a whitespace. A mapper should output lines of key-value pair strings, and a reducer should accept input lines of key-value pair strings. A reducer sees consecutive lines having the same key, which are to be reduced. The shuffler command is named "kmrshuffler" and is installed in the "lib" directory.
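
For example, a word-count mapper could emit lines such as:

the 1
quick 1
the 1

and, after "kmrshuffler", the reducer would receive the lines with the same key adjacent:

the 1
the 1
quick 1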

The command line looks like the following.

kmrshell -m "mapper arg0 arg1" -r "reducer arg0 arg1" file

For command line arguments, see the man-page of kmrshell.1. Also see descriptions of kmrshell.

If the file argument is a regular file, kmrshell processes it. If the argument is a directory, kmrshell enumerates the regular files under the directory and processes the contents of all of them. When a mapper or a reducer has arguments, they are passed as a single quoted string with the arguments separated by whitespace.

To further ease the use of the job-scheduler of K, KMR includes a few more tools: "kmrfsplit.1", "kmrgenscript.1", and "kmrwrapper.1". "kmrfsplit" splits a file into almost equally-sized parts by looking for a record separator. "kmrgenscript" generates a job-script for the job-scheduler to use a shell command pipeline. "kmrwrapper" combines the above two to generate a job-script that can handle large files. "kmrshuffler" is the shuffler program of key-values for "kmrshell". See their man-pages.

There are a few examples (word count) of a mapper and a reducer written in C and Python in the "shell" directory.

Auxiliary Functions

KMR provides some variants of mappers and several small functions for handling KVS.

kmr_map_on_rank_zero(kvo, arg, opt, fn)
is a variant of mappers, which runs a map-function on rank zero only.
kmr_map_once(kvo, arg, opt, rank_zero_only, fn)
is a variant of mappers, which runs a map-function once with a dummy input.
kmr_move_kvs(kvi, kvo, opt)
moves the contents of the input to the output.
kmr_concatenate_kvs(kvs[], nkvs, kvo, opt)
concatenates the contents of the given KVSes to the output.
kmr_distribute(kvi, kvo, cyclic, opt)
redistributes key-value pairs approximately evenly among ranks for leveling the load of map processing.
kmr_ranking(kvi, kvo, count, opt)
assigns a ranking (a sequential numbering) to key-value pairs. The key part is the ranking, and the value part is the old key-value pair, which can be unpaired by kmr_unpairing().
kmr_choose_first_part(kvi, kvo, n, opt)
chooses the first N entries using kmr_ranking() (see the sketch following this list).
kmr_reverse(kvi, kvo, opt)
swaps the key part and the value part in each pair.
kmr_pairing(kvs, kvo, opt)
makes a new pair with a key-value pairing in the value part.
kmr_unpairing(kvs, kvo, opt)
performs an inverse of kmr_pairing(), discarding the old key part.
kmr_match(kvi0, kvi1, kvo, opt)
produces product pairs from the two inputs. The key-value pairs are matched by the same key, and pairs are produced by taking the key from the first KVS and the value from the second KVS.
kmr_histogram_count_by_ranks(kvs, frq, var, rankzeroonly)
fills the array with the count of the elements of each rank.
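
For example (a sketch only; "sorted" is an assumed existing KVS), taking its first five entries:

KMR_KVS *top5 = kmr_create_kvs(mr, KMR_KV_INTEGER, KMR_KV_OPAQUE);
kmr_choose_first_part(sorted, top5, 5, kmr_noopt);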

Spawning Mappers

KMR provides mappers which start MPI processes via MPI_Comm_spawn() for map processing, because it is sometimes necessary to run application programs to process data. There are two map functions; see kmr_map_via_spawn() and kmr_map_parallel_processes().

DO NOT USE THEM; consider using the kmrrun command instead. The use of spawning mappers is fairly complicated, because they start separate MPI applications. It does not make sense to call map-functions as usual, because what is invoked is a process, not a function, and a key-value needs to be passed as an argument in the command list. Spawning mappers are intended to ease the use of MPI_Comm_spawn(), but it is still complicated.

They are used when you want to run MPI processes as mappers. However, this is normally difficult, because the job schedulers used in computer centers prohibit starting MPI processes from MPI processes. Sometimes the use of MPI_Comm_spawn() is the only way, and kmr_map_via_spawn() and kmr_map_parallel_processes() give an easier way to use MPI_Comm_spawn() from KMR applications.

The value part of each key-value is used as the command line of the spawned processes. The value part should be a list of null-terminated strings, which are used as the "argv" command strings. The key part is ignored. The number of processes to spawn is specified by a command line prefix of the form "maxprocs=n", or by the MPI_Info entry "maxprocs" when the number of processes is the same for all spawn invocations. MPI_Comm_spawn() is invoked for each key-value. The value part should look like:

maxprocs=4\0./a.out\0arg1\0arg2
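
The following sketch (an illustration only) builds such a value and adds it to a task KVS like the "tasks" KVS in the earlier snippet (integer keys, opaque values); the key is ignored by the spawning mappers.

/* The value is a list of null-terminated strings; sizeof(v) covers
   the embedded and trailing NULs, giving four "argv" strings. */
char v[] = "maxprocs=4\0./a.out\0arg1\0arg2";
struct kmr_kv_box kv = {
    .klen = sizeof(long), .k.i = 0,
    .vlen = sizeof(v), .v.p = v};
kmr_add_kv(tasks, kv);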

The difference between kmr_map_via_spawn() and kmr_map_parallel_processes() is how they wait. kmr_map_via_spawn() waits for the spawned processes to return a reply; it is used when the MPI application can be modified. The reply is sent by kmr_reply_to_spawner(); without this reply, the call does not finish. On the other hand, kmr_map_parallel_processes() waits for the exit of the spawned processes; it is used when the MPI application cannot be modified.

Both kmr_map_via_spawn() and kmr_map_parallel_processes() accept a map-function. It is called after the spawned processes are finished.

KMR provides the pair of functions kmr_receive_kvs_from_spawned_fn() and kmr_send_kvs_to_spawner() to gather KVSes generated by spawned processes. kmr_receive_kvs_from_spawned_fn() is a map-function and can be passed to kmr_map_via_spawn() as an argument. kmr_send_kvs_to_spawner() is a utility function that can be called in the spawned processes; it sends a KVS from the spawned processes to the spawner.

Example Code

There are a few examples in the "ex" directory in addition to the simplest tests in the "src" directory. Some are rewrites of test/example codes of "Phoenix-MapReduce" and "MapReduce-MPI".

wordcount.c:
is a word count.
test0.c:
is the simplest test code. It is not an example, but a test.
kmeans-kmr.c:
is a calculation of K-Means.
kmeans-mrmpi.cpp (for reference):
is a calculation of K-Means in MapReduce-MPI (not in KMR).
wordcountf.f90:
is a word count in Fortran (F2003).
testf.f90:
is the simplest test code in Fortran (F2003).
wordcountpy.py:
is a word count in Python.
kmeanspy.py:
is a calculation of K-Means in Python.
testcxx.cpp:
is a code in C++11 with a closure.
graysort.c:
is a GraySort. See "http://sortbenchmark.org" for TeraSort benchmarks.
tpch.c:
is TPC-H benchmarks. It runs SQL queries by map-reduce. See "http://www.tpc.org/tpch/" for TPC-H benchmarks.
mrmpi-wordfreq.c:
is a word-count. It is taken from MapReduce-MPI.
phoenix-kmeans.c:
is a calculation of K-Means. It works by replicating the problem points data and the intermediate k-means results. It is taken from Phoenix-MapReduce, but largely rewritten.
phoenix-matrix-multiply.c:
is a simple matrix-multiply. It works simply by distributing rows, and it is, of course, slow even sequentially. It is taken from Phoenix-MapReduce, but largely rewritten.

Appendix: KMR Basics

A key-value stream (KVS) in KMR is a vector of pairs of a key and a value, where keys and values are byte arrays with their lengths. A key-value stream is actually ordered, especially so after sorting, but normally the ordering is irrelevant.

Appendix: KMRViz Visualisation Tool

KMRViz is a visualisation tool for KMR. It graphically shows timestamps, the numbers of key-values in KVSes, etc., for the major KMR functions. KMRViz can be downloaded from the KMRViz GitHub page.

To use KMRViz, you need to run your KMR program with the tracing option ON to generate trace files for KMRViz. The name of the option is "kmrviz_trace"; by default, it is OFF. To enable the option without modifying the code, add a "kmrviz_trace=1" line to your KMROPTION resource file.

echo "kmrviz_trace=1" >> kmrrc
KMROPTION=kmrrc mpiexec -np 12 ./a.out
To enable by modifying your code, just adding "mr->kmrviz_trace = 1;" line to your main function is enough.
KMR *mr = kmr_create_context(MPI_COMM_WORLD, MPI_INFO_NULL, 0);
mr->kmrviz_trace = 1;

With this option ON, two types of traces, text and binary, will be generated for each process after running a KMR program. The binary files are used for viewing in KMRViz.

Appendix: Configure Changes

--- ax_mpi.m4.org	Tue Oct  2 15:42:40 2012
+++ ax_mpi.m4	Tue Oct  2 15:44:27 2012
@@ -72,7 +72,7 @@
 AC_LANG_CASE([C], [
 	AC_REQUIRE([AC_PROG_CC])
 	AC_ARG_VAR(MPICC,[MPI C compiler command])
-	AC_CHECK_PROGS(MPICC, mpicc hcc mpxlc_r mpxlc mpcc cmpicc, $CC)
+	AC_CHECK_PROGS(MPICC, mpifccpx mpicc hcc mpxlc_r mpxlc mpcc cmpicc, $CC)
 	ax_mpi_save_CC="$CC"
 	CC="$MPICC"
 	AC_SUBST(MPICC)
@@ -80,7 +80,7 @@
 [C++], [
 	AC_REQUIRE([AC_PROG_CXX])
 	AC_ARG_VAR(MPICXX,[MPI C++ compiler command])
-	AC_CHECK_PROGS(MPICXX, mpic++ mpicxx mpiCC hcp mpxlC_r mpxlC mpCC cmpic++, $CXX)
+	AC_CHECK_PROGS(MPICXX, mpiFCCpx mpic++ mpicxx mpiCC hcp mpxlC_r mpxlC mpCC cmpic++, $CXX)
 	ax_mpi_save_CXX="$CXX"
 	CXX="$MPICXX"
 	AC_SUBST(MPICXX)
@@ -88,7 +88,7 @@
 [Fortran 77], [
 	AC_REQUIRE([AC_PROG_F77])
 	AC_ARG_VAR(MPIF77,[MPI Fortran 77 compiler command])
-	AC_CHECK_PROGS(MPIF77, mpif77 hf77 mpxlf_r mpxlf mpf77 cmpifc, $F77)
+	AC_CHECK_PROGS(MPIF77, mpifrtpx mpif77 hf77 mpxlf_r mpxlf mpf77 cmpifc, $F77)
 	ax_mpi_save_F77="$F77"
 	F77="$MPIF77"
 	AC_SUBST(MPIF77)
@@ -96,7 +96,7 @@
 [Fortran], [
 	AC_REQUIRE([AC_PROG_FC])
 	AC_ARG_VAR(MPIFC,[MPI Fortran compiler command])
-	AC_CHECK_PROGS(MPIFC, mpif90 mpxlf95_r mpxlf90_r mpxlf95 mpxlf90 mpf90 cmpif90c, $FC)
+	AC_CHECK_PROGS(MPIFC, mpifrtpx mpif90 mpxlf95_r mpxlf90_r mpxlf95 mpxlf90 mpf90 cmpif90c, $FC)
 	ax_mpi_save_FC="$FC"
 	FC="$MPIFC"
 	AC_SUBST(MPIFC)
--- ax_openmp.m4.org	Tue Oct  2 15:55:46 2012
+++ ax_openmp.m4	Tue Oct  2 16:05:27 2012
@@ -73,8 +73,9 @@
 AC_CACHE_CHECK([for OpenMP flag of _AC_LANG compiler], ax_cv_[]_AC_LANG_ABBREV[]_openmp, [save[]_AC_LANG_PREFIX[]FLAGS=$[]_AC_LANG_PREFIX[]FLAGS
 ax_cv_[]_AC_LANG_ABBREV[]_openmp=unknown
 # Flags to try:  -fopenmp (gcc), -openmp (icc), -mp (SGI & PGI),
-#                -xopenmp (Sun), -omp (Tru64), -qsmp=omp (AIX), none
-ax_openmp_flags="-fopenmp -openmp -mp -xopenmp -omp -qsmp=omp none"
+#                -xopenmp (Sun), -omp (Tru64), -qsmp=omp (AIX),
+#                -Kopenmp (Fujitsu), none
+ax_openmp_flags="-Kopenmp -fopenmp -openmp -mp -xopenmp -omp -qsmp=omp none"
 if test "x$OPENMP_[]_AC_LANG_PREFIX[]FLAGS" != x; then
   ax_openmp_flags="$OPENMP_[]_AC_LANG_PREFIX[]FLAGS $ax_openmp_flags"
 fi