67 #include <sys/types.h> 69 #include <sys/param.h> 82 #define ARGSTRLEN (8 * 1024) 88 #define DEFAULT_PROCS 1 91 #define TMPDIR_PREFIX "./KMRRUN_TMP" 96 static void kmrrun_abort(
int,
const char *, ...);
97 static int add_command_kv(
KMR_KVS *,
int,
char **,
char *,
int);
98 static int generate_mapcmd_kvs(
const struct kmr_kv_box,
100 static int run_kv_generator(
const struct kmr_kv_box,
102 static int write_kvs(
const struct kmr_kv_box[],
const long,
104 static int generate_redcmd_kvs(
const struct kmr_kv_box,
108 static void create_tmpdir(
KMR *,
char *,
size_t);
109 static void delete_tmpdir(
KMR *,
char *);
124 kmrrun_abort(
int rank,
const char *format, ...)
128 va_start(arg, format);
129 vfprintf(stderr, format, arg);
132 MPI_Abort(MPI_COMM_WORLD, 1);
140 add_command_kv(
KMR_KVS *kvo,
int id,
char **cmd,
char *infile,
int n_procs)
143 char *cp, *np, *value;
147 snprintf(maxprocs, 31,
"maxprocs=%d", n_procs);
150 for (cmdlen = 0, i = 0; i <
ARGSIZ; i++) {
151 if (cmd[i] == NULL) {
154 cmdlen += (int)strlen(cmd[i]) + 1;
156 vlen = (int)strlen(maxprocs) + 1 + cmdlen + (int)strlen(infile) + 1;
157 value = (
char *)malloc((
size_t)vlen *
sizeof(char));
158 memcpy(value, maxprocs, strlen(maxprocs));
159 cp = value + strlen(maxprocs);
161 for (i = 0; i <
ARGSIZ; i++) {
162 if (cmd[i] == NULL) {
165 int len = (int)strlen(cmd[i]);
166 memcpy(cp, cmd[i], (
size_t)len);
172 memcpy(cp, infile, strlen(infile));
173 *(cp + strlen(infile)) =
'\0';
178 if ((np = strchr((
const char*)cp,
' ')) != NULL) {
186 struct kmr_kv_box nkv = { .klen =
sizeof(long),
187 .vlen = vlen * (
int)
sizeof(char),
189 .v.p = (
void *)value };
200 generate_mapcmd_kvs(
const struct kmr_kv_box kv,
205 char *path = info->infile;
208 if (stat(path, &status) < 0) {
209 fprintf(stderr,
"File[%s] error\n", path);
212 if (!S_ISDIR(status.st_mode) && !S_ISREG(status.st_mode)) {
213 fprintf(stderr,
"File[%s] is not regular file or directory\n", path);
217 if (S_ISDIR(status.st_mode)) {
219 long nmax = pathconf(path, _PC_NAME_MAX);
221 direntsz = (64 * 1024);
223 direntsz = (offsetof(
struct dirent, d_name) + (size_t)nmax + 1);
226 struct dirent *dentp;
235 while (readdir_r(d, (
void *)b, &dentp) >= 0) {
237 char fullpath[MAXPATHLEN];
242 ret = snprintf(fullpath,
sizeof(fullpath),
"%s/%s",
243 path, dentp->d_name);
249 if (stat(fullpath, &substat) < 0) {
252 if (S_ISREG(substat.st_mode)) {
253 ret = add_command_kv(kvo,
id, info->cmd_args, fullpath,
255 if (ret != MPI_SUCCESS) {
264 ret = add_command_kv(kvo, 0, info->cmd_args, path, info->num_procs);
280 if (info->cmd_args[0] != NULL) {
285 perror(
"pipe for kv generator");
291 perror(
"fork kv generator");
293 }
else if (pid == 0) {
295 ret = close(pipefd[0]);
297 perror(
"pipe close kv generator");
300 ret = dup2(pipefd[1], STDOUT_FILENO);
302 perror(
"dup2 pipe kv generator");
305 ret = close(pipefd[1]);
307 perror(
"pipe close kv generator");
312 char *cp, *infile = NULL;
313 for (cp = (
char *)kv.v.p; cp < kv.v.p + kv.vlen - 1; cp++) {
319 char *cmd_args[ARGSIZ+1] = { NULL };
321 for (i = 0; i <=
ARGSIZ; i++) {
322 if (info->cmd_args[i] != NULL) {
323 cmd_args[i] = info->cmd_args[i];
325 cmd_args[i] = infile;
330 ret = execv(cmd_args[0], cmd_args);
332 perror(
"execv kv generator");
337 ret = close(pipefd[1]);
339 perror(
"pipe close kv generator");
346 FILE* chld_out = fdopen(pipefd[0],
"r");
347 while (fgets(line,
sizeof(line), chld_out) != NULL) {
348 char *cp = strchr(line,
'\n');
355 cp = strchr(line,
' ');
363 char *value = (cp + 1);
365 nkv.klen = (int)strlen(key) + 1;
366 nkv.vlen = (int)strlen(value) + 1;
370 if (ret != MPI_SUCCESS) {
375 fprintf(stderr, (
"warning: Line too long or " 376 "missing last newline.\n"));
379 fprintf(stderr, (
"warning: Some lines have " 380 "no key-value pairs (ignored).\n"));
384 ret = close(pipefd[0]);
386 perror(
"pipe close kv generator");
399 write_kvs(
const struct kmr_kv_box kv[],
const long n,
404 char filename[PATHLEN];
406 snprintf(filename, PATHLEN,
"%s/%d/%s",
407 (
char *)p, kvs->c.mr->rank, kv[0].k.p);
408 if ((fp = fopen(filename,
"w")) == NULL) {
409 perror(
"open file with write mode");
412 for (
long i = 0; i < n; i++) {
413 fprintf(fp,
"%s %s\n", kv[i].k.p, kv[i].v.p);
419 nkv.klen = kv[0].klen;
421 nkv.vlen = (int)strlen(filename) + 1;
424 if (ret != MPI_SUCCESS) {
435 generate_redcmd_kvs(
const struct kmr_kv_box kv,
441 ret = add_command_kv(kvo, (
int)i_, info->cmd_args, (
char *)kv.v.p,
453 char *file_name = (
char*)kv.v.p;
454 int ret = access(file_name, F_OK);
471 if (!(argstr[0] ==
'.' || argstr[0] ==
'/')) {
473 int len = (int)strlen(argstr) + 1;
475 fprintf(stderr,
"command line is too long.\n");
476 MPI_Abort(MPI_COMM_WORLD, 1);
478 fprintf(stderr,
"The command is assumed to be located in " 479 "the current directory.\n");
480 for (
int i = len; i >= 0; i--) {
481 argstr[i+2] = argstr[i];
493 if ((np = strchr((
const char*)cp,
' ')) != NULL) {
496 if (++ap >= &argary[ARGSIZ-1]) {
514 create_tmpdir(
KMR *mr,
char *tmpdir,
size_t tmpdir_len)
518 int cur_time = (int)time(NULL);
519 snprintf(tmpdir, tmpdir_len, TMPDIR_PREFIX
"%d", cur_time);
520 int ret = mkdir(tmpdir, 0777);
522 char *errmsg = strerror(errno);
523 kmrrun_abort(0,
"Error on creating the temporal directory: %s\n",
527 int ret = MPI_Bcast(tmpdir, (
int)tmpdir_len, MPI_CHAR, 0, mr->comm);
528 if (ret != MPI_SUCCESS) {
529 kmrrun_abort(mr->rank,
"MPI_Bcast failed.\n");
532 char rankdir[PATHLEN];
533 snprintf(rankdir, PATHLEN,
"%s/%d", tmpdir, mr->rank);
534 ret = mkdir(rankdir, 0777);
536 char *errmsg = strerror(errno);
537 kmrrun_abort(mr->rank,
538 "Error on creating the rank local temporal directory: " 545 delete_tmpdir(
KMR *mr,
char *tmpdir)
548 char rankdir[PATHLEN];
549 snprintf(rankdir, PATHLEN,
"%s/%d", tmpdir, mr->rank);
550 int ret = rmdir(rankdir);
552 char *errmsg = strerror(errno);
554 "Rank[%05d] Failed to delete rank local temporal directory: " 555 "%s\n", mr->rank, errmsg);
557 MPI_Barrier(mr->comm);
562 char *errmsg = strerror(errno);
564 "Failed to delete the temporal directory: %s\n", errmsg);
571 main(
int argc,
char *argv[])
574 char *mapper = NULL, *reducer = NULL, *infile = NULL;
578 int map_procs = DEFAULT_PROCS, red_procs = DEFAULT_PROCS;
582 "Usage %s [-n m_num[:r_num]] -m mapper [-k kvgenerator]\n" 583 " [-r reducer] [--ckpt]\n" 586 MPI_Init(&argc, &argv);
587 MPI_Comm_rank(MPI_COMM_WORLD, &rank);
590 int option_index = 0;
592 static struct option long_options[] = {
593 {
"ckpt", no_argument, 0, 0},
597 opt = getopt_long(argc, argv,
"m:r:k:n:",
598 long_options, &option_index);
606 if (strcmp(
"ckpt", long_options[option_index].name) == 0) {
611 asz = (strlen(optarg) + 1);
613 kmrrun_abort(rank,
"Argument too long for mapper (%s)\n",
616 memcpy(margbuf, optarg, asz);
621 asz = (strlen(optarg) + 1);
623 kmrrun_abort(rank,
"Argument too long for reducer (%s)\n",
626 memcpy(rargbuf, optarg, asz);
631 asz = (strlen(optarg) + 1);
633 memcpy(para_arg, optarg, asz);
634 cp = strchr(para_arg,
':');
637 map_procs = (int)strtol(para_arg, NULL, 10);
638 red_procs = map_procs;
643 map_procs = (int)strtol(para_arg, NULL, 10);
644 red_procs = (int)strtol(np, NULL, 10);
648 asz = (strlen(optarg) + 1);
650 kmrrun_abort(rank,
"Argument too long for key-value " 651 "generator (%s)\n", optarg);
653 memcpy(kargbuf, optarg, asz);
657 kmrrun_abort(rank, usage_msg, argv[0]);
661 if ((argc - optind) != 1) {
662 kmrrun_abort(rank, usage_msg, argv[0]);
664 infile = argv[optind];
668 if (mapper == NULL) {
669 kmrrun_abort(rank, usage_msg, argv[0]);
674 MPI_Info_create(&info);
675 if (ckpt_enable == 1) {
676 ret = MPI_Info_set(info,
"ckpt_enable",
"1");
679 MPI_Info_free(&info);
680 mr->spawn_gap_msec[0] = 500;
681 mr->spawn_gap_msec[1] = 1000;
685 _Bool nonmpi = (map_procs == 1) ? 1 : 0;
686 struct cmdinfo mapinfo = { margv, infile, map_procs };
688 ret =
kmr_map_once(kvs_commands, &mapinfo, kmr_noopt, 1,
689 generate_mapcmd_kvs);
690 if (ret != MPI_SUCCESS) {
691 kmrrun_abort(rank,
"kmr_map_once failed.\n");
694 ret =
kmr_shuffle(kvs_commands, kvs_commands2, kmr_noopt);
695 if (ret != MPI_SUCCESS) {
696 kmrrun_abort(rank,
"kmr_shuffle failed.\n");
701 struct cmdinfo gkvinfo = { kargv, NULL, 1 };
703 MPI_INFO_NULL, kmr_snoopt, run_kv_generator);
704 if (ret != MPI_SUCCESS) {
705 kmrrun_abort(rank,
"executing mapper failed.\n");
708 if (reducer != NULL) {
712 if (ret != MPI_SUCCESS) {
713 kmrrun_abort(rank,
"shuffling failed.\n");
717 char tmpdir[PATHLEN];
718 create_tmpdir(mr, tmpdir, PATHLEN);
722 ret =
kmr_reduce(kvs_red, kvs_file, tmpdir, kmr_noopt, write_kvs);
723 if (ret != MPI_SUCCESS) {
724 kmrrun_abort(rank,
"writing key-values to files failed.\n");
728 nonmpi = (red_procs == 1) ? 1 : 0;
729 struct cmdinfo redinfo = { rargv, NULL, red_procs };
731 struct kmr_option kmr_inspect = { .inspect = 1 };
732 ret =
kmr_map(kvs_file, kvs_commands, &redinfo, kmr_inspect,
733 generate_redcmd_kvs);
734 if (ret != MPI_SUCCESS) {
735 kmrrun_abort(rank,
"kmr_map failed.\n");
741 if (ret != MPI_SUCCESS) {
742 kmrrun_abort(rank,
"executing reducer failed.\n");
746 ret =
kmr_map(kvs_file, NULL, NULL, kmr_noopt, delete_file);
747 if (ret != MPI_SUCCESS) {
748 kmrrun_abort(rank,
"deleting file failed.\n");
752 delete_tmpdir(mr, tmpdir);
Key-Value Stream (abstract).
#define kmr_reduce(KVI, KVO, ARG, OPT, R)
Reduces key-value pairs.
Options to Mapping, Shuffling, and Reduction.
#define ARGSTRLEN
Buffer string size of arguments to mapper and reducer programs.
int kmr_add_kv(KMR_KVS *kvs, const struct kmr_kv_box kv)
Adds a key-value pair.
#define ARGSIZ
Maximum number of arguments to mapper and reducer programs.
int kmr_map_once(KMR_KVS *kvo, void *arg, struct kmr_option opt, _Bool rank_zero_only, kmr_mapfn_t m)
Maps once.
#define kmr_create_kvs(MR, KF, VF)
Makes a new key-value stream (of type KMR_KVS) with the specified field datatypes.
int kmr_shuffle(KMR_KVS *kvi, KMR_KVS *kvo, struct kmr_option opt)
Shuffles key-value pairs to the appropriate destination ranks.
int kmr_free_kvs(KMR_KVS *kvs)
Releases a key-value stream (type KMR_KVS).
#define kmr_map(KVI, KVO, ARG, OPT, M)
Maps simply.
Handy Copy of a Key-Value Field.
int kmr_fin(void)
Clears the environment.
#define kmr_init()
Sets up the environment.
int kmr_free_context(KMR *mr)
Releases a context created with kmr_create_context().
static void parse_args(char *, char *[])
Parses command parameters given for mapper and reducer arguments.
#define LINELEN
Maximum length of a line of data.
int kmr_map_processes(_Bool nonmpi, KMR_KVS *kvi, KMR_KVS *kvo, void *arg, MPI_Info info, struct kmr_spawn_option opt, kmr_mapfn_t mapfn)
Maps on processes started by MPI_Comm_spawn() to run independent processes.
KMR * kmr_create_context(const MPI_Comm comm, const MPI_Info conf, const char *name)
Makes a new KMR context (a context has type KMR).