KMR
kmrrun.c
Go to the documentation of this file.
1 /* kmrrun.c (2014-05-29) */
2 /* Copyright (C) 2012-2016 RIKEN AICS */
3 
4 /** \file kmrrun.c
5  kmrrun is the command-line version of KMR and it runs a MapReduce
6  program whose mapper and reducers are user specified programs.
7  Both mapper and reducer can be a serial or an MPI program.
8 
9  When kmrrun is used to run a MapReduce program, user should specify
10  a simple program that generates key-value pairs from the output of mapper.
11  The key-value generator program can be specified by '-k' option and
12  can be implemented by reading outputs of mapper and then writing
13  key-value pairs to the standard output.
14  After shuffling the key-value pairs, key-value pairs are written to files
15  on each rank with 'key'-named text files whose line represents a key-value
16  separated by a space.
17  The file is passed to the reducer as the last parameter.
18 
19  kmrrun can run Map-only MapReduce where no reducer is run.
20  This is very useful if you want to run multiple tasks as a single job.
21 
22  Options
23  - \c -m mapper [Mandatory]\c
24  - Specify a mapper program (serial, OpenMP or MPI)
25  - \c -k key_value_generator [Optional] \c
26  - Specify a key-value pair generator program (serial)
27  - \c -r reducer [Optional]\c
28  - Specify a reducer program (serial, OpenMP or MPI)
29  - \c -n m_num[:r_num] [Optional] \c
30  - Specify number of MPI processes to run mapper and reducers.
31  When \c r_num \c is specified, each mapper runs with \c m_num \c
32  processes and each reducer runs with \c r_num \c processes.
33  When \c r_num \c is not specified each mapper and reducer runs
34  with \c m_num \c processes.
35  The default is 1, and in this case, the mapper and reducer are
36  assumed to be serial programs.
37  - \c --ckpt [Optional] \c
38  - Enable checkpoint/restart.
39 
40  Usage
41  \code
42  $ mpiexec -machinefile machines -n 4 \
43  ./kmrrun -n m_num[:r_num] -m mapper [-k kvgenerator] [-r reducer] [--ckpt]\
44  inputfile
45  \endcode
46 
47  Examples
48  \code
49  e.g.1) Run serial mapper and reducer
50  $ mpirun -np 2 ./kmrrun -m "./pi.mapper" -k "./pi.kvgen.sh" -r "./pi.reducer" ./work
51 
52  e.g.2) Run MPI mapper and MPI reducer with 2 MPI processes each.
53  $ mpirun -np 2 ./kmrrun -n 2 -m "./mpi_pi.mapper" -k "./mpi_pi.kvgen.sh" -r "./mpi_pi.reducer" ./work
54 
55  e.g.3) Run MPI mapper with 2 MPI processes and serial reducer
56  $ mpirun -np 2 ./kmrrun -n 2:1 -m "./mpi_pi.mapper" -k "./mpi_pi.kvgen.sh" -r "./pi.reducer" ./work
57 
58  e.g.4) Only run MPI mapper with 2 MPI processes
59  $ mpirun -np 2 ./kmrrun -n 2 -m "./mpi_pi.mapper" ./work
60  \endcode
61 */
62 
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <stdarg.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <sys/param.h>
#include <sys/wait.h>
#include <dirent.h>
#include <unistd.h>
#include <getopt.h>
#include <time.h>
#include <errno.h>
#include <mpi.h>
#include "kmr.h"
77 
78 /* Maximum number of arguments to mapper and reducer programs. */
79 #define ARGSIZ 8
80 
81 /* Buffer string size of arguments to mapper and reducer programs. */
82 #define ARGSTRLEN (8 * 1024)
83 
84 /* path name length */
85 #define PATHLEN 1024
86 
87 /* Default number of processes used in spawned program */
88 #define DEFAULT_PROCS 1
89 
90 /* Prefix of a directory where key-named files are generated */
91 #define TMPDIR_PREFIX "./KMRRUN_TMP"
92 
93 /* Maximum length of line that represents a key-value */
94 #define LINELEN 32767
95 
96 static void kmrrun_abort(int, const char *, ...);
97 static int add_command_kv(KMR_KVS *, int, char **, char *, int);
98 static int generate_mapcmd_kvs(const struct kmr_kv_box,
99  const KMR_KVS *, KMR_KVS *, void *, long);
100 static int run_kv_generator(const struct kmr_kv_box,
101  const KMR_KVS *, KMR_KVS *, void *, long);
102 static int write_kvs(const struct kmr_kv_box[], const long,
103  const KMR_KVS *, KMR_KVS *, void *);
104 static int generate_redcmd_kvs(const struct kmr_kv_box,
105  const KMR_KVS *, KMR_KVS *, void *, long);
106 static int delete_file(const struct kmr_kv_box, const KMR_KVS *, KMR_KVS *,
107  void *, long);
108 static void create_tmpdir(KMR *, char *, size_t);
109 static void delete_tmpdir(KMR *, char *);
110 static void parse_args(char *, char *[]);
111 
112 
/* Description of one command line.  A pointer to this structure is
   handed to the KMR map functions of this file through their opaque
   argument.  The member order matters: main() fills the structure
   with positional initializers. */
struct cmdinfo {
    /* Program name followed by its arguments (argv style). */
    char **cmd_args;
    /* Path of the input file or directory; NULL when not used. */
    char *infile;
    /* Number of processes, emitted as "maxprocs=N" by add_command_kv(). */
    int num_procs;
};
120 
121 
122 /* Abort function */
123 static void
124 kmrrun_abort(int rank, const char *format, ...)
125 {
126  va_list arg;
127  if (rank == 0) {
128  va_start(arg, format);
129  vfprintf(stderr, format, arg);
130  va_end(arg);
131  }
132  MPI_Abort(MPI_COMM_WORLD, 1);
133  exit(1);
134 }
135 
136 /* This function create a key-value whose key is the specified id and
137  value is command line, and then add it to the KVS.
138 */
139 static int
140 add_command_kv(KMR_KVS *kvo, int id, char **cmd, char *infile, int n_procs)
141 {
142  int i, cmdlen, vlen;
143  char *cp, *np, *value;
144  char maxprocs[32];
145 
146  /* set maxprocs parameter */
147  snprintf(maxprocs, 31, "maxprocs=%d", n_procs);
148 
149  /* construct command line */
150  for (cmdlen = 0, i = 0; i < ARGSIZ; i++) {
151  if (cmd[i] == NULL) {
152  break;
153  }
154  cmdlen += (int)strlen(cmd[i]) + 1;
155  }
156  vlen = (int)strlen(maxprocs) + 1 + cmdlen + (int)strlen(infile) + 1;
157  value = (char *)malloc((size_t)vlen * sizeof(char));
158  memcpy(value, maxprocs, strlen(maxprocs));
159  cp = value + strlen(maxprocs);
160  *cp++ = ' ';
161  for (i = 0; i < ARGSIZ; i++) {
162  if (cmd[i] == NULL) {
163  break;
164  }
165  int len = (int)strlen(cmd[i]);
166  memcpy(cp, cmd[i], (size_t)len);
167  cp += len;
168  *cp++ = ' ';
169  }
170 
171  /* set input file */
172  memcpy(cp, infile, strlen(infile));
173  *(cp + strlen(infile)) = '\0';
174 
175  /* replace all ' ' by '\0' */
176  cp = value;
177  while (1) {
178  if ((np = strchr((const char*)cp, ' ')) != NULL) {
179  *np++ = '\0';
180  cp = np;
181  } else {
182  break;
183  }
184  }
185 
186  struct kmr_kv_box nkv = { .klen = sizeof(long),
187  .vlen = vlen * (int)sizeof(char),
188  .k.i = id,
189  .v.p = (void *)value };
190  int ret = kmr_add_kv(kvo, nkv);
191  free(value);
192  return ret;
193 }
194 
195 /* KMR map function
196  It generates a KVS whose keys are numbers and values are command lines
197  for mapper.
198 */
199 static int
200 generate_mapcmd_kvs(const struct kmr_kv_box kv,
201  const KMR_KVS *kvi, KMR_KVS *kvo, void *p, long i_)
202 {
203  int ret;
204  struct cmdinfo *info = (struct cmdinfo *)p;
205  char *path = info->infile;
206  struct stat status;
207 
208  if (stat(path, &status) < 0) {
209  fprintf(stderr, "File[%s] error\n", path);
210  return -1;
211  }
212  if (!S_ISDIR(status.st_mode) && !S_ISREG(status.st_mode)) {
213  fprintf(stderr, "File[%s] is not regular file or directory\n", path);
214  return -1;
215  }
216 
217  if (S_ISDIR(status.st_mode)) { /* directory */
218  size_t direntsz;
219  long nmax = pathconf(path, _PC_NAME_MAX);
220  if (nmax == -1) {
221  direntsz = (64 * 1024);
222  } else {
223  direntsz = (offsetof(struct dirent, d_name) + (size_t)nmax + 1);
224  }
225  DIR *d;
226  struct dirent *dentp;
227  char b[direntsz];
228  int id = 0;
229 
230  d = opendir(path);
231  if (d == NULL) {
232  perror("opendir");
233  return -1;
234  }
235  while (readdir_r(d, (void *)b, &dentp) >= 0) {
236  struct stat substat;
237  char fullpath[MAXPATHLEN];
238  if (dentp == NULL) {
239  break;
240  }
241 
242  ret = snprintf(fullpath, sizeof(fullpath), "%s/%s",
243  path, dentp->d_name);
244  if (ret <= 0) {
245  perror("snprintf");
246  continue;
247  }
248 
249  if (stat(fullpath, &substat) < 0) {
250  continue;
251  }
252  if (S_ISREG(substat.st_mode)) {
253  ret = add_command_kv(kvo, id, info->cmd_args, fullpath,
254  info->num_procs);
255  if (ret != MPI_SUCCESS) {
256  return ret;
257  }
258  id += 1;
259  }
260  }
261  closedir(d);
262  ret = MPI_SUCCESS;
263  } else { /* file */
264  ret = add_command_kv(kvo, 0, info->cmd_args, path, info->num_procs);
265  }
266 
267  return ret;
268 }
269 
270 /* KMR map function
271  It generates key-values for shuffling after mapper programs has been
272  executed.
273 */
274 static int
275 run_kv_generator(const struct kmr_kv_box kv,
276  const KMR_KVS *kvi, KMR_KVS *kvo, void *p, long i_)
277 {
278  struct cmdinfo *info = (struct cmdinfo *)p;
279 
280  if (info->cmd_args[0] != NULL) {
281  int ret, pipefd[2];
282 
283  ret = pipe(pipefd);
284  if (ret < 0) {
285  perror("pipe for kv generator");
286  return ret;
287  }
288 
289  pid_t pid = fork();
290  if (pid < 0) {
291  perror("fork kv generator");
292  return -1;
293  } else if (pid == 0) {
294  // child process
295  ret = close(pipefd[0]);
296  if (ret < 0) {
297  perror("pipe close kv generator");
298  return ret;
299  }
300  ret = dup2(pipefd[1], STDOUT_FILENO);
301  if (ret < 0) {
302  perror("dup2 pipe kv generator");
303  return ret;
304  }
305  ret = close(pipefd[1]);
306  if (ret < 0) {
307  perror("pipe close kv generator");
308  return ret;
309  }
310 
311  // get the input filename from key-value
312  char *cp, *infile = NULL;
313  for (cp = (char *)kv.v.p; cp < kv.v.p + kv.vlen - 1; cp++) {
314  if (*cp == '\0') {
315  infile = cp + 1;
316  }
317  }
318 
319  char *cmd_args[ARGSIZ+1] = { NULL };
320  int i;
321  for (i = 0; i <= ARGSIZ; i++) {
322  if (info->cmd_args[i] != NULL) {
323  cmd_args[i] = info->cmd_args[i];
324  } else {
325  cmd_args[i] = infile;
326  break;
327  }
328  }
329 
330  ret = execv(cmd_args[0], cmd_args);
331  if (ret < 0) {
332  perror("execv kv generator");
333  return ret;
334  }
335  } else {
336  // parent process
337  ret = close(pipefd[1]);
338  if (ret < 0) {
339  perror("pipe close kv generator");
340  return ret;
341  }
342 
343  char line[LINELEN];
344  long missingnl = 0;
345  long badlines = 0;
346  FILE* chld_out = fdopen(pipefd[0], "r");
347  while (fgets(line, sizeof(line), chld_out) != NULL) {
348  char *cp = strchr(line, '\n');
349  if (cp != NULL) {
350  assert(*cp == '\n');
351  *cp = '\0';
352  } else {
353  missingnl++;
354  }
355  cp = strchr(line, ' ');
356  if (cp == NULL) {
357  /* No value field. */
358  badlines++;
359  continue;
360  }
361  *cp = '\0';
362  char *key = line;
363  char *value = (cp + 1);
364  struct kmr_kv_box nkv;
365  nkv.klen = (int)strlen(key) + 1;
366  nkv.vlen = (int)strlen(value) + 1;
367  nkv.k.p = key;
368  nkv.v.p = value;
369  ret = kmr_add_kv(kvo, nkv);
370  if (ret != MPI_SUCCESS) {
371  return ret;
372  }
373 
374  if (missingnl) {
375  fprintf(stderr, ("warning: Line too long or "
376  "missing last newline.\n"));
377  }
378  if (badlines) {
379  fprintf(stderr, ("warning: Some lines have "
380  "no key-value pairs (ignored).\n"));
381  }
382  }
383 
384  ret = close(pipefd[0]);
385  if (ret < 0) {
386  perror("pipe close kv generator");
387  return ret;
388  }
389  }
390  }
391 
392  return MPI_SUCCESS;
393 }
394 
395 /* KMR reduce function
396  Write key-value pairs in KVS to key-named files.
397 */
398 static int
399 write_kvs(const struct kmr_kv_box kv[], const long n,
400  const KMR_KVS *kvs, KMR_KVS *kvo, void *p)
401 {
402  FILE *fp;
403  int ret;
404  char filename[PATHLEN];
405 
406  snprintf(filename, PATHLEN, "%s/%d/%s",
407  (char *)p, kvs->c.mr->rank, kv[0].k.p);
408  if ((fp = fopen(filename, "w")) == NULL) {
409  perror("open file with write mode");
410  return -1;
411  }
412  for (long i = 0; i < n; i++) {
413  fprintf(fp, "%s %s\n", kv[i].k.p, kv[i].v.p);
414  }
415  fclose(fp);
416 
417  // key is key, value is file path
418  struct kmr_kv_box nkv;
419  nkv.klen = kv[0].klen;
420  nkv.k.p = kv[0].k.p;
421  nkv.vlen = (int)strlen(filename) + 1;
422  nkv.v.p = filename;
423  ret = kmr_add_kv(kvo, nkv);
424  if (ret != MPI_SUCCESS) {
425  return ret;
426  }
427  return MPI_SUCCESS;
428 }
429 
430 /* KMR map function
431  It generates a KVS whose keys are numbers and values are command lines
432  for reducer.
433 */
434 static int
435 generate_redcmd_kvs(const struct kmr_kv_box kv,
436  const KMR_KVS *kvi, KMR_KVS *kvo, void *p, long i_)
437 {
438  int ret;
439  struct cmdinfo *info = (struct cmdinfo *)p;
440 
441  ret = add_command_kv(kvo, (int)i_, info->cmd_args, (char *)kv.v.p,
442  info->num_procs);
443  return ret;
444 }
445 
446 /* KMR map function
447  It deletes a key-named file.
448 */
449 static int
450 delete_file(const struct kmr_kv_box kv,
451  const KMR_KVS *kvi, KMR_KVS *kvo, void *p, long i_)
452 {
453  char *file_name = (char*)kv.v.p;
454  int ret = access(file_name, F_OK);
455  if (ret == 0) {
456  unlink(file_name);
457  }
458  return MPI_SUCCESS;
459 }
460 
461 
462 /* Parses command parameters given for mapper and reducer arguments.
463  It scans an argument string like "mapper arg0 arg1" for the -m and
464  -r options, and generates an argv array {"mapper", "arg0", "arg1",
465  NULL}. The separator is a whitespace.
466  \param argstr string given for -m or -r options.
467  \param argary array to be filled by argument strings. */
468 static void
469 parse_args(char *argstr, char *argary[])
470 {
471  if (!(argstr[0] == '.' || argstr[0] == '/')) {
472  /* insert './' in front of command name */
473  int len = (int)strlen(argstr) + 1;
474  if (len + 2 > ARGSTRLEN) {
475  fprintf(stderr, "command line is too long.\n");
476  MPI_Abort(MPI_COMM_WORLD, 1);
477  }
478  fprintf(stderr, "The command is assumed to be located in "
479  "the current directory.\n");
480  for (int i = len; i >= 0; i--) {
481  argstr[i+2] = argstr[i];
482  }
483  argstr[0] = '.';
484  argstr[1] = '/';
485  }
486 
487  char *cp, *np;
488  char **ap;
489  ap = argary;
490  cp = argstr;
491  while (1) {
492  *ap = cp;
493  if ((np = strchr((const char*)cp, ' ')) != NULL) {
494  *np++ = '\0';
495  }
496  if (++ap >= &argary[ARGSIZ-1]) {
497  *ap = NULL;
498  break;
499  } else {
500  if (np == NULL) {
501  *ap = NULL;
502  break;
503  }
504  }
505  while (*np == ' ') {
506  np++;
507  }
508  cp = np;
509  }
510 }
511 
512 /* Create temporal directories for storing key files for reducers. */
513 static void
514 create_tmpdir(KMR *mr, char *tmpdir, size_t tmpdir_len)
515 {
516  /* create root directory on rank 0 */
517  if (mr->rank == 0) {
518  int cur_time = (int)time(NULL);
519  snprintf(tmpdir, tmpdir_len, TMPDIR_PREFIX"%d", cur_time);
520  int ret = mkdir(tmpdir, 0777);
521  if (ret != 0) {
522  char *errmsg = strerror(errno);
523  kmrrun_abort(0, "Error on creating the temporal directory: %s\n",
524  errmsg);
525  }
526  }
527  int ret = MPI_Bcast(tmpdir, (int)tmpdir_len, MPI_CHAR, 0, mr->comm);
528  if (ret != MPI_SUCCESS) {
529  kmrrun_abort(mr->rank, "MPI_Bcast failed.\n");
530  }
531  /* create rank local directory */
532  char rankdir[PATHLEN];
533  snprintf(rankdir, PATHLEN, "%s/%d", tmpdir, mr->rank);
534  ret = mkdir(rankdir, 0777);
535  if (ret != 0) {
536  char *errmsg = strerror(errno);
537  kmrrun_abort(mr->rank,
538  "Error on creating the rank local temporal directory: "
539  "%s\n", errmsg);
540  }
541 }
542 
543 /* Delete temporal directories for storing key files for reducers. */
544 static void
545 delete_tmpdir(KMR *mr, char *tmpdir)
546 {
547  /* delete rank local directory */
548  char rankdir[PATHLEN];
549  snprintf(rankdir, PATHLEN, "%s/%d", tmpdir, mr->rank);
550  int ret = rmdir(rankdir);
551  if (ret != 0) {
552  char *errmsg = strerror(errno);
553  fprintf(stderr,
554  "Rank[%05d] Failed to delete rank local temporal directory: "
555  "%s\n", mr->rank, errmsg);
556  }
557  MPI_Barrier(mr->comm);
558  /* delete root directory on rank 0 */
559  if (mr->rank == 0) {
560  ret = rmdir(tmpdir);
561  if (ret != 0) {
562  char *errmsg = strerror(errno);
563  fprintf(stderr,
564  "Failed to delete the temporal directory: %s\n", errmsg);
565  }
566  }
567 }
568 
569 
570 int
571 main(int argc, char *argv[])
572 {
573  int rank, ret, opt;
574  char *mapper = NULL, *reducer = NULL, *infile = NULL;
575  char *margv[ARGSIZ] = { NULL }, margbuf[ARGSTRLEN];
576  char *rargv[ARGSIZ] = { NULL }, rargbuf[ARGSTRLEN];
577  char *kargv[ARGSIZ] = { NULL }, kargbuf[ARGSTRLEN];
578  int map_procs = DEFAULT_PROCS, red_procs = DEFAULT_PROCS;
579  int ckpt_enable = 0;
580 
581  char *usage_msg =
582  "Usage %s [-n m_num[:r_num]] -m mapper [-k kvgenerator]\n"
583  " [-r reducer] [--ckpt]\n"
584  " inputfile\n";
585 
586  MPI_Init(&argc, &argv);
587  MPI_Comm_rank(MPI_COMM_WORLD, &rank);
588 
589  while (1) {
590  int option_index = 0;
591 
592  static struct option long_options[] = {
593  {"ckpt", no_argument, 0, 0},
594  {0, 0, 0, 0}
595  };
596 
597  opt = getopt_long(argc, argv, "m:r:k:n:",
598  long_options, &option_index);
599  if (opt == -1) {
600  break;
601  }
602 
603  switch (opt) {
604  size_t asz;
605  case 0:
606  if (strcmp("ckpt", long_options[option_index].name) == 0) {
607  ckpt_enable = 1;
608  }
609  break;
610  case 'm':
611  asz = (strlen(optarg) + 1);
612  if (asz >= ARGSTRLEN) {
613  kmrrun_abort(rank, "Argument too long for mapper (%s)\n",
614  optarg);
615  }
616  memcpy(margbuf, optarg, asz);
617  parse_args(margbuf, &margv[0]);
618  mapper = margv[0];
619  break;
620  case 'r':
621  asz = (strlen(optarg) + 1);
622  if (asz >= ARGSTRLEN) {
623  kmrrun_abort(rank, "Argument too long for reducer (%s)\n",
624  optarg);
625  }
626  memcpy(rargbuf, optarg, asz);
627  parse_args(rargbuf, &rargv[0]);
628  reducer = rargv[0];
629  break;
630  case 'n':
631  asz = (strlen(optarg) + 1);
632  char para_arg[ARGSTRLEN], *cp;
633  memcpy(para_arg, optarg, asz);
634  cp = strchr(para_arg, ':');
635  if (cp == NULL) {
636  /* use the same # of processes in map & reduce */
637  map_procs = (int)strtol(para_arg, NULL, 10);
638  red_procs = map_procs;
639  } else {
640  /* use the different # of processes */
641  *cp = '\0';
642  char *np = cp + 1;
643  map_procs = (int)strtol(para_arg, NULL, 10);
644  red_procs = (int)strtol(np, NULL, 10);
645  }
646  break;
647  case 'k':
648  asz = (strlen(optarg) + 1);
649  if (asz >= ARGSTRLEN) {
650  kmrrun_abort(rank, "Argument too long for key-value "
651  "generator (%s)\n", optarg);
652  }
653  memcpy(kargbuf, optarg, asz);
654  parse_args(kargbuf, &kargv[0]);
655  break;
656  default:
657  kmrrun_abort(rank, usage_msg, argv[0]);
658  }
659  }
660 
661  if ((argc - optind) != 1) {
662  kmrrun_abort(rank, usage_msg, argv[0]);
663  } else {
664  infile = argv[optind];
665  optind++;
666  }
667 
668  if (mapper == NULL) {
669  kmrrun_abort(rank, usage_msg, argv[0]);
670  }
671 
672  kmr_init();
673  MPI_Info info;
674  MPI_Info_create(&info);
675  if (ckpt_enable == 1) {
676  ret = MPI_Info_set(info, "ckpt_enable", "1");
677  }
678  KMR *mr = kmr_create_context(MPI_COMM_WORLD, info, 0);
679  MPI_Info_free(&info);
680  mr->spawn_gap_msec[0] = 500;
681  mr->spawn_gap_msec[1] = 1000;
682  //mr->trace_map_spawn = 1;
683 
684  /* Assign mapper command lines to static processes */
685  _Bool nonmpi = (map_procs == 1) ? 1 : 0;
686  struct cmdinfo mapinfo = { margv, infile, map_procs };
687  KMR_KVS *kvs_commands = kmr_create_kvs(mr, KMR_KV_INTEGER, KMR_KV_OPAQUE);
688  ret = kmr_map_once(kvs_commands, &mapinfo, kmr_noopt, 1,
689  generate_mapcmd_kvs);
690  if (ret != MPI_SUCCESS) {
691  kmrrun_abort(rank, "kmr_map_once failed.\n");
692  }
693  KMR_KVS *kvs_commands2 = kmr_create_kvs(mr, KMR_KV_INTEGER, KMR_KV_OPAQUE);
694  ret = kmr_shuffle(kvs_commands, kvs_commands2, kmr_noopt);
695  if (ret != MPI_SUCCESS) {
696  kmrrun_abort(rank, "kmr_shuffle failed.\n");
697  }
698 
699  /* Run mapper */
700  KMR_KVS *kvs_map = kmr_create_kvs(mr, KMR_KV_OPAQUE, KMR_KV_OPAQUE);
701  struct cmdinfo gkvinfo = { kargv, NULL, 1 };
702  ret = kmr_map_processes(nonmpi, kvs_commands2, kvs_map, &gkvinfo,
703  MPI_INFO_NULL, kmr_snoopt, run_kv_generator);
704  if (ret != MPI_SUCCESS) {
705  kmrrun_abort(rank, "executing mapper failed.\n");
706  }
707 
708  if (reducer != NULL) {
709  /* Shuffle key-value */
710  KMR_KVS *kvs_red = kmr_create_kvs(mr, KMR_KV_OPAQUE, KMR_KV_OPAQUE);
711  ret = kmr_shuffle(kvs_map, kvs_red, kmr_noopt);
712  if (ret != MPI_SUCCESS) {
713  kmrrun_abort(rank, "shuffling failed.\n");
714  }
715 
716  /* Create a temporal directory for storing key files */
717  char tmpdir[PATHLEN];
718  create_tmpdir(mr, tmpdir, PATHLEN);
719 
720  /* Write key-values to files whose name is key */
721  KMR_KVS *kvs_file = kmr_create_kvs(mr, KMR_KV_OPAQUE, KMR_KV_OPAQUE);
722  ret = kmr_reduce(kvs_red, kvs_file, tmpdir, kmr_noopt, write_kvs);
723  if (ret != MPI_SUCCESS) {
724  kmrrun_abort(rank, "writing key-values to files failed.\n");
725  }
726 
727  /* Generate commands for reducer */
728  nonmpi = (red_procs == 1) ? 1 : 0;
729  struct cmdinfo redinfo = { rargv, NULL, red_procs };
730  kvs_commands = kmr_create_kvs(mr, KMR_KV_INTEGER, KMR_KV_OPAQUE);
731  struct kmr_option kmr_inspect = { .inspect = 1 };
732  ret = kmr_map(kvs_file, kvs_commands, &redinfo, kmr_inspect,
733  generate_redcmd_kvs);
734  if (ret != MPI_SUCCESS) {
735  kmrrun_abort(rank, "kmr_map failed.\n");
736  }
737 
738  /* Run reducer */
739  ret = kmr_map_processes(nonmpi, kvs_commands, NULL, NULL, MPI_INFO_NULL,
740  kmr_snoopt, NULL);
741  if (ret != MPI_SUCCESS) {
742  kmrrun_abort(rank, "executing reducer failed.\n");
743  }
744 
745  /* Delete key files */
746  ret = kmr_map(kvs_file, NULL, NULL, kmr_noopt, delete_file);
747  if (ret != MPI_SUCCESS) {
748  kmrrun_abort(rank, "deleting file failed.\n");
749  }
750 
751  /* Delete the temporal directory */
752  delete_tmpdir(mr, tmpdir);
753  } else {
754  kmr_free_kvs(kvs_map);
755  }
756 
757  kmr_free_context(mr);
758  kmr_fin();
759  MPI_Finalize();
760  return 0;
761 }
762 
763 /*
764 Copyright (C) 2012-2016 RIKEN AICS
765 This library is distributed WITHOUT ANY WARRANTY. This library can be
766 redistributed and/or modified under the terms of the BSD 2-Clause License.
767 */
Key-Value Stream (abstract).
Definition: kmr.h:587
#define kmr_reduce(KVI, KVO, ARG, OPT, R)
Reduces key-value pairs.
Definition: kmr.h:88
Options to Mapping, Shuffling, and Reduction.
Definition: kmr.h:613
#define ARGSTRLEN
Buffer string size of arguments to mapper and reducer programs.
Definition: kmrshell.c:31
int kmr_add_kv(KMR_KVS *kvs, const struct kmr_kv_box kv)
Adds a key-value pair.
Definition: kmrbase.c:751
#define ARGSIZ
Maximum number of arguments to mapper and reducer programs.
Definition: kmrshell.c:28
int kmr_map_once(KMR_KVS *kvo, void *arg, struct kmr_option opt, _Bool rank_zero_only, kmr_mapfn_t m)
Maps once.
Definition: kmrbase.c:1402
#define kmr_create_kvs(MR, KF, VF)
Makes a new key-value stream (of type KMR_KVS) with the specified field datatypes.
Definition: kmr.h:71
int kmr_shuffle(KMR_KVS *kvi, KMR_KVS *kvo, struct kmr_option opt)
Shuffles key-value pairs to the appropriate destination ranks.
Definition: kmrbase.c:2036
KMR Context.
Definition: kmr.h:222
int kmr_free_kvs(KMR_KVS *kvs)
Releases a key-value stream (type KMR_KVS).
Definition: kmrbase.c:621
#define kmr_map(KVI, KVO, ARG, OPT, M)
Maps simply.
Definition: kmr.h:82
Handy Copy of a Key-Value Field.
Definition: kmr.h:358
int kmr_fin(void)
Clears the environment.
Definition: kmrbase.c:124
#define kmr_init()
Sets up the environment.
Definition: kmr.h:747
int kmr_free_context(KMR *mr)
Releases a context created with kmr_create_context().
Definition: kmrbase.c:326
KMR Interface.
static void parse_args(char *, char *[])
Parses command parameters given for mapper and reducer arguments.
Definition: kmrshell.c:257
#define LINELEN
Maximum length of a line of data.
Definition: kmrshell.c:25
int kmr_map_processes(_Bool nonmpi, KMR_KVS *kvi, KMR_KVS *kvo, void *arg, MPI_Info info, struct kmr_spawn_option opt, kmr_mapfn_t mapfn)
Maps on processes started by MPI_Comm_spawn() to run independent processes.
Definition: kmrmapms.c:1965
KMR * kmr_create_context(const MPI_Comm comm, const MPI_Info conf, const char *name)
Makes a new KMR context (a context has type KMR).
Definition: kmrbase.c:147