KMR
kmrshell_mpi.c
Go to the documentation of this file.
1 /* kmrshell_mpi.c (2013-12-22) */
2 /* Copyright (C) 2012-2016 RIKEN AICS */
3 
4 /** \file kmrshell_mpi.c
5  kmrshell_mpi is the command-line version of KMR; it runs a MapReduce
6  program whose mapper and reducer are user-specified MPI programs.
7 
8  When kmrshell_mpi is used to run a MapReduce program, user should specify
9  a simple program that generates key-value pairs from the output of mapper.
10  The key-value generator program can be specified by '-k' option and
11  can be implemented by reading outputs of mapper and then writing
12  key-value pairs to the standard output.
13  After shuffling the key-value pairs, key-value pairs are written to files
14  on each rank with 'key'-named text files whose line represents a key-value
15  separated by a space.
16  The file is passed to the reducer as the last parameter.
17 
18  kmrshell_mpi can run Map-only MapReduce where no reducer is run.
19  This is very useful if you want to run multiple tasks as a single job.
20 
21  Options
22  - \c -m mapper [Mandatory]\c
23  - Specify a mapper program (MPI)
24  - \c -k key_value_generator [Optional] \c
25  - Specify a key-value pair generator program (serial)
26  - \c -r reducer [Optional]\c
27  - Specify a reducer program (MPI)
28  - \c -n m_num[:r_num] [Optional] \c
29  - Specify number of MPI processes to run mapper and reducers.
30  When \c r_num \c is specified, each mapper runs with \c m_num \c
31  processes and each reducer runs with \c r_num \c processes.
32  When \c r_num \c is not specified each mapper and reducer runs
33  with \c m_num \c processes.
34  The default is 2.
35 
36  Usage
37  \code
38  $ mpiexec -machinefile machines -n 4 \
39  ./kmrshell_mpi -n m_num[:r_num] -m mapper [-k kvgenerator] [-r reducer] \
40  inputfile
41  \endcode
42 
43  Examples
44  \code
45  e.g.1) Run MPI mapper and MPI reducer with 2 MPI processes each.
46  $ mpiexec -np 2 ./kmrshell_mpi -n 2 -m "./mpi_pi.mapper" -k "./mpi_pi.kvgen.sh" -r "./mpi_pi.reducer" ./work
47 
48  e.g.2) Run MPI mapper with 2 MPI processes and MPI reducer with 4 processes.
49  $ mpiexec -np 2 ./kmrshell_mpi -n 2:4 -m "./mpi_pi.mapper" -k "./mpi_pi.kvgen.sh" -r "./mpi_pi.reducer" ./work
50 
51  e.g.3) Only run MPI mapper with 2 MPI processes
52  $ mpiexec -np 2 ./kmrshell_mpi -n 2 -m "./mpi_pi.mapper" ./work
53  \endcode
54 */
55 
#include <stdio.h>
#include <stdlib.h>
#include <stddef.h>
#include <string.h>
#include <stdarg.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <sys/param.h>
#include <sys/wait.h>
#include <dirent.h>
#include <unistd.h>
#include <mpi.h>
#include "kmr.h"
67 
/* Number of slots in an argument vector for the mapper, reducer and
   key-value generator programs. */
#define ARGSIZ 8

/* Size in bytes of the buffer that holds a program's argument string. */
#define ARGSTRLEN (8 * 1024)

/* Size in bytes allowed for a command name. */
#define COMMANDLEN 1024

/* Number of MPI processes used for a spawned program when the -n
   option is not given. */
#define DEFAULT_MPI 2

/* Size in bytes of the buffer that holds one "key value" text line. */
#define LINELEN 32767
82 
83 static void kmrsh_abort(int, const char *, ...);
84 static int add_command_kv(KMR_KVS *, int, char **, char *, int);
85 static int generate_mapcmd_kvs(const struct kmr_kv_box,
86  const KMR_KVS *, KMR_KVS *, void *, long);
87 static int run_kv_generator(const struct kmr_kv_box,
88  const KMR_KVS *, KMR_KVS *, void *, long);
89 static int write_kvs(const struct kmr_kv_box[], const long,
90  const KMR_KVS *, KMR_KVS *, void *);
91 static int generate_redcmd_kvs(const struct kmr_kv_box,
92  const KMR_KVS *, KMR_KVS *, void *, long);
93 static int sleep_wait(const struct kmr_kv_box, const KMR_KVS *, KMR_KVS *,
94  void *, long);
95 static int delete_file(const struct kmr_kv_box, const KMR_KVS *, KMR_KVS *,
96  void *, long);
97 static void parse_args(char *, char *[]);
98 
99 
/* Command line information that is handed to the KMR map functions
   through their opaque argument pointer.  The field order matters:
   instances are built with positional initializers. */
struct cmdinfo {
    /* Argument vector of the program; a NULL entry ends the list. */
    char **cmd_args;
    /* Input file or directory; NULL where no input path is needed. */
    char *infile;
    /* Number of MPI processes passed on as "maxprocs". */
    int num_procs;
};
107 
108 
109 /* Abort function */
110 static void
111 kmrsh_abort(int rank, const char *format, ...)
112 {
113  va_list arg;
114  if (rank == 0) {
115  va_start(arg, format);
116  vfprintf(stderr, format, arg);
117  va_end(arg);
118  }
119  MPI_Abort(MPI_COMM_WORLD, 1);
120  exit(1);
121 }
122 
123 /* This function create a key-value whose key is the specified id and
124  value is command line, and then add it to the KVS.
125 */
126 static int
127 add_command_kv(KMR_KVS *kvo, int id, char **cmd, char *infile, int n_procs)
128 {
129  int i, cmdlen, vlen;
130  char *cp, *np, *value;
131  char maxprocs[32];
132 
133  /* set maxprocs parameter */
134  snprintf(maxprocs, 31, "maxprocs=%d", n_procs);
135 
136  /* construct command line */
137  for (cmdlen = 0, i = 0; i < ARGSIZ; i++) {
138  if (cmd[i] == NULL) {
139  break;
140  }
141  cmdlen += (int)strlen(cmd[i]) + 1;
142  }
143  vlen = (int)strlen(maxprocs) + 1 + cmdlen + (int)strlen(infile) + 1;
144  value = (char *)malloc((size_t)vlen * sizeof(char));
145  memcpy(value, maxprocs, strlen(maxprocs));
146  cp = value + strlen(maxprocs);
147  *cp++ = ' ';
148  for (i = 0; i < ARGSIZ; i++) {
149  if (cmd[i] == NULL) {
150  break;
151  }
152  size_t len = strlen(cmd[i]);
153  memcpy(cp, cmd[i], len);
154  cp += len;
155  *cp++ = ' ';
156  }
157 
158  /* set input file */
159  memcpy(cp, infile, strlen(infile));
160  *(cp + strlen(infile)) = '\0';
161 
162  /* replace all ' ' by '\0' */
163  cp = value;
164  while (1) {
165  if ((np = strchr((const char*)cp, ' ')) != NULL) {
166  *np++ = '\0';
167  cp = np;
168  } else {
169  break;
170  }
171  }
172 
173  struct kmr_kv_box nkv = { .klen = sizeof(long),
174  .vlen = (size_t)vlen * sizeof(char),
175  .k.i = id,
176  .v.p = (void *)value };
177  int ret = kmr_add_kv(kvo, nkv);
178  free(value);
179  return ret;
180 }
181 
182 /* KMR map function
183  It generates a KVS whose keys are numbers and values are command lines
184  for mapper.
185 */
186 static int
187 generate_mapcmd_kvs(const struct kmr_kv_box kv,
188  const KMR_KVS *kvi, KMR_KVS *kvo, void *p, long i_)
189 {
190  int ret;
191  struct cmdinfo *info = (struct cmdinfo *)p;
192  char *path = info->infile;
193  struct stat status;
194 
195  if (stat(path, &status) < 0) {
196  fprintf(stderr, "File[%s] error\n", path);
197  return -1;
198  }
199  if (!S_ISDIR(status.st_mode) && !S_ISREG(status.st_mode)) {
200  fprintf(stderr, "File[%s] is not regular file or directory\n", path);
201  return -1;
202  }
203 
204  if (S_ISDIR(status.st_mode)) { /* directory */
205  size_t direntsz;
206  long nmax = pathconf(path, _PC_NAME_MAX);
207  if (nmax == -1) {
208  direntsz = (64 * 1024);
209  } else {
210  direntsz = (offsetof(struct dirent, d_name) + (size_t)nmax + 1);
211  }
212  DIR *d;
213  struct dirent *dentp;
214  char b[direntsz];
215  int id = 0;
216 
217  d = opendir(path);
218  if (d == NULL) {
219  perror("opendir");
220  return -1;
221  }
222  while (readdir_r(d, (void *)b, &dentp) >= 0) {
223  struct stat substat;
224  char fullpath[MAXPATHLEN];
225  if (dentp == NULL) {
226  break;
227  }
228 
229  ret = snprintf(fullpath, sizeof(fullpath), "%s/%s",
230  path, dentp->d_name);
231  if (ret <= 0) {
232  perror("snprintf");
233  continue;
234  }
235 
236  if (stat(fullpath, &substat) < 0) {
237  continue;
238  }
239  if (S_ISREG(substat.st_mode)) {
240  ret = add_command_kv(kvo, id, info->cmd_args, fullpath,
241  info->num_procs);
242  if (ret != MPI_SUCCESS) {
243  return ret;
244  }
245  id += 1;
246  }
247  }
248  closedir(d);
249  ret = MPI_SUCCESS;
250  } else { /* file */
251  ret = add_command_kv(kvo, 0, info->cmd_args, path, info->num_procs);
252  }
253 
254  return ret;
255 }
256 
257 /* KMR map function
258  It generates key-values for shuffling after mapper programs has been
259  executed.
260 */
261 static int
262 run_kv_generator(const struct kmr_kv_box kv,
263  const KMR_KVS *kvi, KMR_KVS *kvo, void *p, long i_)
264 {
265  struct cmdinfo *info = (struct cmdinfo *)p;
266 
267  if (info->cmd_args[0] != NULL) {
268  int ret, pipefd[2];
269 
270  ret = pipe(pipefd);
271  if (ret < 0) {
272  perror("pipe for kv generator");
273  return ret;
274  }
275 
276  pid_t pid = fork();
277  if (pid < 0) {
278  perror("fork kv generator");
279  return -1;
280  } else if (pid == 0) {
281  // child process
282  ret = close(pipefd[0]);
283  if (ret < 0) {
284  perror("pipe close kv generator");
285  return ret;
286  }
287  ret = dup2(pipefd[1], STDOUT_FILENO);
288  if (ret < 0) {
289  perror("dup2 pipe kv generator");
290  return ret;
291  }
292  ret = close(pipefd[1]);
293  if (ret < 0) {
294  perror("pipe close kv generator");
295  return ret;
296  }
297 
298  // get the input filename from key-value
299  char *cp, *infile;
300  for (cp = (char *)kv.v.p; cp < kv.v.p + kv.vlen - 1; cp++) {
301  if (*cp == '\0') {
302  infile = cp + 1;
303  }
304  }
305 
306  char *cmd_args[ARGSIZ+1] = { NULL };
307  int i;
308  for (i = 0; i <= ARGSIZ; i++) {
309  if (info->cmd_args[i] != NULL) {
310  cmd_args[i] = info->cmd_args[i];
311  } else {
312  cmd_args[i] = infile;
313  break;
314  }
315  }
316 
317  ret = execv(cmd_args[0], cmd_args);
318  if (ret < 0) {
319  perror("execv kv generator");
320  return ret;
321  }
322  } else {
323  // parent process
324  ret = close(pipefd[1]);
325  if (ret < 0) {
326  perror("pipe close kv generator");
327  return ret;
328  }
329 
330  char line[LINELEN];
331  long missingnl = 0;
332  long badlines = 0;
333  FILE* chld_out = fdopen(pipefd[0], "r");
334  while (fgets(line, sizeof(line), chld_out) != NULL) {
335  char *cp = strchr(line, '\n');
336  if (cp != NULL) {
337  assert(*cp == '\n');
338  *cp = '\0';
339  } else {
340  missingnl++;
341  }
342  cp = strchr(line, ' ');
343  if (cp == NULL) {
344  /* No value field. */
345  badlines++;
346  continue;
347  }
348  *cp = '\0';
349  char *key = line;
350  char *value = (cp + 1);
351  struct kmr_kv_box nkv;
352  nkv.klen = (int)strlen(key) + 1;
353  nkv.vlen = (int)strlen(value) + 1;
354  nkv.k.p = key;
355  nkv.v.p = value;
356  ret = kmr_add_kv(kvo, nkv);
357  if (ret != MPI_SUCCESS) {
358  return ret;
359  }
360 
361  if (missingnl) {
362  fprintf(stderr, ("warning: Line too long or "
363  "missing last newline.\n"));
364  }
365  if (badlines) {
366  fprintf(stderr, ("warning: Some lines have "
367  "no key-value pairs (ignored).\n"));
368  }
369  }
370 
371  ret = close(pipefd[0]);
372  if (ret < 0) {
373  perror("pipe close kv generator");
374  return ret;
375  }
376  }
377  }
378 
379  return MPI_SUCCESS;
380 }
381 
382 /* KMR reduce function
383  Write key-value pairs in KVS to key-named files.
384 */
385 static int
386 write_kvs(const struct kmr_kv_box kv[], const long n,
387  const KMR_KVS *kvs, KMR_KVS *kvo, void *p)
388 {
389  FILE *fp;
390  int ret;
391 
392  if ((fp = fopen(kv[0].k.p, "w")) == NULL) {
393  perror("open file with write mode");
394  return -1;
395  }
396  for (long i = 0; i < n; i++) {
397  fprintf(fp, "%s %s\n", kv[i].k.p, kv[i].v.p);
398  }
399  fclose(fp);
400 
401  // key is key, value is file path
402  struct kmr_kv_box nkv;
403  nkv.klen = kv[0].klen;
404  nkv.k.p = kv[0].k.p;
405  nkv.vlen = kv[0].klen;
406  nkv.v.p = kv[0].k.p;
407  ret = kmr_add_kv(kvo, nkv);
408  if (ret != MPI_SUCCESS) {
409  return ret;
410  }
411  return MPI_SUCCESS;
412 }
413 
414 /* KMR map function
415  It generates a KVS whose keys are numbers and values are command lines
416  for reducer.
417 */
418 static int
419 generate_redcmd_kvs(const struct kmr_kv_box kv,
420  const KMR_KVS *kvi, KMR_KVS *kvo, void *p, long i_)
421 {
422  int ret;
423  struct cmdinfo *info = (struct cmdinfo *)p;
424 
425  ret = add_command_kv(kvo, (int)i_, info->cmd_args, (char *)kv.k.p,
426  info->num_procs);
427  return ret;
428 }
429 
430 static int
431 sleep_wait(const struct kmr_kv_box kv,
432  const KMR_KVS *kvi, KMR_KVS *kvo, void *p, long i_)
433 {
434  sleep(1);
435  return MPI_SUCCESS;
436 }
437 
438 static int
439 delete_file(const struct kmr_kv_box kv,
440  const KMR_KVS *kvi, KMR_KVS *kvo, void *p, long i_)
441 {
442  char *file_name = (char*)kv.k.p;
443  int ret = access(file_name, F_OK);
444  if (ret == 0) {
445  unlink(file_name);
446  }
447  return MPI_SUCCESS;
448 }
449 
450 
451 /* Parses command parameters given for mapper and reducer arguments.
452  It scans an argument string like "mapper arg0 arg1" for the -m and
453  -r options, and generates an argv array {"mapper", "arg0", "arg1",
454  NULL}. The separator is a whitespace.
455  \param argstr string given for -m or -r options.
456  \param argary array to be filled by argument strings. */
457 static void
458 parse_args(char *argstr, char *argary[])
459 {
460  char *cp, *np;
461  char **ap;
462 
463  ap = argary;
464  cp = argstr;
465  while (1) {
466  *ap = cp;
467  if ((np = strchr((const char*)cp, ' ')) != NULL) {
468  *np++ = '\0';
469  }
470  if (++ap >= &argary[ARGSIZ-1]) {
471  *ap = NULL;
472  break;
473  } else {
474  if (np == NULL) {
475  *ap = NULL;
476  break;
477  }
478  }
479  while (*np == ' ') {
480  np++;
481  }
482  cp = np;
483  }
484 
485  if (argary[0][0] != '.' || argary[0][0] != '/') {
486  /* insert './' in front of command name */
487  int len = (int)strlen(argary[0]) + 1;
488  if (len + 2 > COMMANDLEN) {
489  fprintf(stderr, "command name is too long.\n");
490  MPI_Abort(MPI_COMM_WORLD, 1);
491  }
492  for (int i = len + 1; i >= 0; i--) {
493  argary[0][i+2] = argary[0][i];
494  }
495  argary[0][0] = '.';
496  argary[0][1] = '/';
497  }
498 }
499 
500 
501 int
502 main(int argc, char *argv[])
503 {
504  int rank, ret, opt;
505  char *mapper = NULL, *reducer = NULL, *infile = NULL;
506  char *margv[ARGSIZ] = { NULL }, margbuf[ARGSTRLEN];
507  char *rargv[ARGSIZ] = { NULL }, rargbuf[ARGSTRLEN];
508  char *kargv[ARGSIZ] = { NULL }, kargbuf[ARGSTRLEN];
509  int map_procs = DEFAULT_MPI, red_procs = DEFAULT_MPI;
510 
511  char *usage_msg =
512  "Usage %s -n m_num[:r_num] -m mapper [-k kvgenerator] [-r reducer] "
513  "inputfile\n";
514 
515  MPI_Init(&argc, &argv);
516  MPI_Comm_rank(MPI_COMM_WORLD, &rank);
517 
518  while ((opt = getopt(argc, argv, "m:r:n:k:")) != -1) {
519  switch (opt) {
520  size_t asz;
521  case 'm':
522  asz = (strlen(optarg) + 1);
523  if (asz >= ARGSTRLEN) {
524  kmrsh_abort(rank, "Argument too long for mapper (%s)\n",
525  optarg);
526  }
527  memcpy(margbuf, optarg, asz);
528  parse_args(margbuf, &margv[0]);
529  mapper = margv[0];
530  break;
531  case 'r':
532  asz = (strlen(optarg) + 1);
533  if (asz >= ARGSTRLEN) {
534  kmrsh_abort(rank, "Argument too long for reducer (%s)\n",
535  optarg);
536  }
537  memcpy(rargbuf, optarg, asz);
538  parse_args(rargbuf, &rargv[0]);
539  reducer = rargv[0];
540  break;
541  case 'n':
542  asz = (strlen(optarg) + 1);
543  char para_arg[ARGSTRLEN], *cp;
544  memcpy(para_arg, optarg, asz);
545  cp = strchr(para_arg, ':');
546  if (cp == NULL) {
547  /* use the same # of processes in map & reduce */
548  map_procs = (int)strtol(para_arg, NULL, 10);
549  red_procs = map_procs;
550  } else {
551  /* use the different # of processes */
552  *cp = '\0';
553  char *np = cp + 1;
554  map_procs = (int)strtol(para_arg, NULL, 10);
555  red_procs = (int)strtol(np, NULL, 10);
556  }
557  break;
558  case 'k':
559  asz = (strlen(optarg) + 1);
560  if (asz >= ARGSTRLEN) {
561  kmrsh_abort(rank, "Argument too long for key-value "
562  "generator (%s)\n", optarg);
563  }
564  memcpy(kargbuf, optarg, asz);
565  parse_args(kargbuf, &kargv[0]);
566  break;
567  default:
568  kmrsh_abort(rank, usage_msg, argv[0]);
569  }
570  }
571 
572  if ((argc - optind) != 1) {
573  kmrsh_abort(rank, usage_msg, argv[0]);
574  } else {
575  infile = argv[optind];
576  optind++;
577  }
578 
579  if (mapper == NULL) {
580  kmrsh_abort(rank, usage_msg, argv[0]);
581  }
582 
583  kmr_init();
584  KMR *mr = kmr_create_context(MPI_COMM_WORLD, MPI_INFO_NULL, 0);
585  //mr->trace_map_spawn = 1;
586 
587  /* Assign mapper command lines to static processes */
588  struct cmdinfo mapinfo = { margv, infile, map_procs };
589  KMR_KVS *kvs_commands = kmr_create_kvs(mr, KMR_KV_INTEGER, KMR_KV_OPAQUE);
590  ret = kmr_map_once(kvs_commands, &mapinfo, kmr_noopt, 1,
591  generate_mapcmd_kvs);
592  if (ret != MPI_SUCCESS) {
593  kmrsh_abort(rank, "kmr_map_once failed.\n");
594  }
595 
596  /* Run mapper */
597  KMR_KVS *kvs_map = kmr_create_kvs(mr, KMR_KV_OPAQUE, KMR_KV_OPAQUE);
598  struct cmdinfo gkvinfo = { kargv, NULL, 0 };
599  ret = kmr_map_processes(0, kvs_commands, kvs_map, &gkvinfo, MPI_INFO_NULL,
600  kmr_snoopt, run_kv_generator);
601  if (ret != MPI_SUCCESS) {
602  kmrsh_abort(rank, "executing mapper failed.\n");
603  }
604 
605  if (reducer != NULL) {
606  /* Shuffle key-value */
607  KMR_KVS *kvs_red = kmr_create_kvs(mr, KMR_KV_OPAQUE, KMR_KV_OPAQUE);
608  ret = kmr_shuffle(kvs_map, kvs_red, kmr_noopt);
609  if (ret != MPI_SUCCESS) {
610  kmrsh_abort(rank, "shuffling failed.\n");
611  }
612 
613  /* Write key-values to files whose name is key */
614  KMR_KVS *kvs_file = kmr_create_kvs(mr, KMR_KV_OPAQUE, KMR_KV_OPAQUE);
615  ret = kmr_reduce(kvs_red, kvs_file, NULL, kmr_noopt, write_kvs);
616  if (ret != MPI_SUCCESS) {
617  kmrsh_abort(rank, "writing key-values to files failed.\n");
618  }
619 
620  /* Generate commands for reducer */
621  struct cmdinfo redinfo = { rargv, NULL, red_procs };
622  kvs_commands = kmr_create_kvs(mr, KMR_KV_INTEGER, KMR_KV_OPAQUE);
623  struct kmr_option kmr_inspect = { .inspect = 1 };
624  ret = kmr_map(kvs_file, kvs_commands, &redinfo, kmr_inspect,
625  generate_redcmd_kvs);
626  if (ret != MPI_SUCCESS) {
627  kmrsh_abort(rank, "kmr_map failed.\n");
628  }
629 
630  /* Run reducer */
631  ret = kmr_map_processes(0, kvs_commands, NULL, NULL, MPI_INFO_NULL,
632  kmr_snoopt, sleep_wait);
633  if (ret != MPI_SUCCESS) {
634  kmrsh_abort(rank, "executing reducer failed.\n");
635  }
636 
637  /* Delete key files */
638  ret = kmr_map(kvs_file, NULL, NULL, kmr_noopt, delete_file);
639  if (ret != MPI_SUCCESS) {
640  kmrsh_abort(rank, "kmr_map failed.\n");
641  }
642  } else {
643  kmr_free_kvs(kvs_map);
644  }
645 
646  kmr_free_context(mr);
647  kmr_fin();
648  MPI_Finalize();
649  return 0;
650 }
651 
652 /*
653 Copyright (C) 2012-2016 RIKEN AICS
654 This library is distributed WITHOUT ANY WARRANTY. This library can be
655 redistributed and/or modified under the terms of the BSD 2-Clause License.
656 */
Key-Value Stream (abstract).
Definition: kmr.h:587
#define kmr_reduce(KVI, KVO, ARG, OPT, R)
Reduces key-value pairs.
Definition: kmr.h:88
Options to Mapping, Shuffling, and Reduction.
Definition: kmr.h:613
#define ARGSTRLEN
Buffer string size of arguments to mapper and reducer programs.
Definition: kmrshell.c:31
int kmr_add_kv(KMR_KVS *kvs, const struct kmr_kv_box kv)
Adds a key-value pair.
Definition: kmrbase.c:751
#define ARGSIZ
Maximum number of arguments to mapper and reducer programs.
Definition: kmrshell.c:28
int kmr_map_once(KMR_KVS *kvo, void *arg, struct kmr_option opt, _Bool rank_zero_only, kmr_mapfn_t m)
Maps once.
Definition: kmrbase.c:1402
#define kmr_create_kvs(MR, KF, VF)
Makes a new key-value stream (of type KMR_KVS) with the specified field datatypes.
Definition: kmr.h:71
int kmr_shuffle(KMR_KVS *kvi, KMR_KVS *kvo, struct kmr_option opt)
Shuffles key-value pairs to the appropriate destination ranks.
Definition: kmrbase.c:2036
KMR Context.
Definition: kmr.h:222
int kmr_free_kvs(KMR_KVS *kvs)
Releases a key-value stream (type KMR_KVS).
Definition: kmrbase.c:621
#define kmr_map(KVI, KVO, ARG, OPT, M)
Maps simply.
Definition: kmr.h:82
Handy Copy of a Key-Value Field.
Definition: kmr.h:358
int kmr_fin(void)
Clears the environment.
Definition: kmrbase.c:124
#define kmr_init()
Sets up the environment.
Definition: kmr.h:747
int kmr_free_context(KMR *mr)
Releases a context created with kmr_create_context().
Definition: kmrbase.c:326
KMR Interface.
static void parse_args(char *, char *[])
Parses command parameters given for mapper and reducer arguments.
Definition: kmrshell.c:257
#define LINELEN
Maximum length of a line of data.
Definition: kmrshell.c:25
int kmr_map_processes(_Bool nonmpi, KMR_KVS *kvi, KMR_KVS *kvo, void *arg, MPI_Info info, struct kmr_spawn_option opt, kmr_mapfn_t mapfn)
Maps on processes started by MPI_Comm_spawn() to run independent processes.
Definition: kmrmapms.c:1965
KMR * kmr_create_context(const MPI_Comm comm, const MPI_Info conf, const char *name)
Makes a new KMR context (a context has type KMR).
Definition: kmrbase.c:147