60 #include <sys/types.h> 62 #include <sys/param.h> 72 #define ARGSTRLEN (8 * 1024) 75 #define COMMANDLEN 1024 83 static void kmrsh_abort(
int,
const char *, ...);
84 static int add_command_kv(
KMR_KVS *,
int,
char **,
char *,
int);
85 static int generate_mapcmd_kvs(
const struct kmr_kv_box,
87 static int run_kv_generator(
const struct kmr_kv_box,
89 static int write_kvs(
const struct kmr_kv_box[],
const long,
91 static int generate_redcmd_kvs(
const struct kmr_kv_box,
111 kmrsh_abort(
int rank,
const char *format, ...)
115 va_start(arg, format);
116 vfprintf(stderr, format, arg);
119 MPI_Abort(MPI_COMM_WORLD, 1);
127 add_command_kv(
KMR_KVS *kvo,
int id,
char **cmd,
char *infile,
int n_procs)
130 char *cp, *np, *value;
134 snprintf(maxprocs, 31,
"maxprocs=%d", n_procs);
137 for (cmdlen = 0, i = 0; i <
ARGSIZ; i++) {
138 if (cmd[i] == NULL) {
141 cmdlen += (int)strlen(cmd[i]) + 1;
143 vlen = (int)strlen(maxprocs) + 1 + cmdlen + (int)strlen(infile) + 1;
144 value = (
char *)malloc((
size_t)vlen *
sizeof(char));
145 memcpy(value, maxprocs, strlen(maxprocs));
146 cp = value + strlen(maxprocs);
148 for (i = 0; i <
ARGSIZ; i++) {
149 if (cmd[i] == NULL) {
152 size_t len = strlen(cmd[i]);
153 memcpy(cp, cmd[i], len);
159 memcpy(cp, infile, strlen(infile));
160 *(cp + strlen(infile)) =
'\0';
165 if ((np = strchr((
const char*)cp,
' ')) != NULL) {
173 struct kmr_kv_box nkv = { .klen =
sizeof(long),
174 .vlen = (
size_t)vlen *
sizeof(char),
176 .v.p = (
void *)value };
187 generate_mapcmd_kvs(
const struct kmr_kv_box kv,
192 char *path = info->infile;
195 if (stat(path, &status) < 0) {
196 fprintf(stderr,
"File[%s] error\n", path);
199 if (!S_ISDIR(status.st_mode) && !S_ISREG(status.st_mode)) {
200 fprintf(stderr,
"File[%s] is not regular file or directory\n", path);
204 if (S_ISDIR(status.st_mode)) {
206 long nmax = pathconf(path, _PC_NAME_MAX);
208 direntsz = (64 * 1024);
210 direntsz = (offsetof(
struct dirent, d_name) + (size_t)nmax + 1);
213 struct dirent *dentp;
222 while (readdir_r(d, (
void *)b, &dentp) >= 0) {
224 char fullpath[MAXPATHLEN];
229 ret = snprintf(fullpath,
sizeof(fullpath),
"%s/%s",
230 path, dentp->d_name);
236 if (stat(fullpath, &substat) < 0) {
239 if (S_ISREG(substat.st_mode)) {
240 ret = add_command_kv(kvo,
id, info->cmd_args, fullpath,
242 if (ret != MPI_SUCCESS) {
251 ret = add_command_kv(kvo, 0, info->cmd_args, path, info->num_procs);
267 if (info->cmd_args[0] != NULL) {
272 perror(
"pipe for kv generator");
278 perror(
"fork kv generator");
280 }
else if (pid == 0) {
282 ret = close(pipefd[0]);
284 perror(
"pipe close kv generator");
287 ret = dup2(pipefd[1], STDOUT_FILENO);
289 perror(
"dup2 pipe kv generator");
292 ret = close(pipefd[1]);
294 perror(
"pipe close kv generator");
300 for (cp = (
char *)kv.v.p; cp < kv.v.p + kv.vlen - 1; cp++) {
306 char *cmd_args[
ARGSIZ+1] = { NULL };
308 for (i = 0; i <=
ARGSIZ; i++) {
309 if (info->cmd_args[i] != NULL) {
310 cmd_args[i] = info->cmd_args[i];
312 cmd_args[i] = infile;
317 ret = execv(cmd_args[0], cmd_args);
319 perror(
"execv kv generator");
324 ret = close(pipefd[1]);
326 perror(
"pipe close kv generator");
333 FILE* chld_out = fdopen(pipefd[0],
"r");
334 while (fgets(line,
sizeof(line), chld_out) != NULL) {
335 char *cp = strchr(line,
'\n');
342 cp = strchr(line,
' ');
350 char *value = (cp + 1);
352 nkv.klen = (int)strlen(key) + 1;
353 nkv.vlen = (int)strlen(value) + 1;
357 if (ret != MPI_SUCCESS) {
362 fprintf(stderr, (
"warning: Line too long or " 363 "missing last newline.\n"));
366 fprintf(stderr, (
"warning: Some lines have " 367 "no key-value pairs (ignored).\n"));
371 ret = close(pipefd[0]);
373 perror(
"pipe close kv generator");
386 write_kvs(
const struct kmr_kv_box kv[],
const long n,
392 if ((fp = fopen(kv[0].k.p,
"w")) == NULL) {
393 perror(
"open file with write mode");
396 for (
long i = 0; i < n; i++) {
397 fprintf(fp,
"%s %s\n", kv[i].k.p, kv[i].v.p);
403 nkv.klen = kv[0].klen;
405 nkv.vlen = kv[0].klen;
408 if (ret != MPI_SUCCESS) {
419 generate_redcmd_kvs(
const struct kmr_kv_box kv,
425 ret = add_command_kv(kvo, (
int)i_, info->cmd_args, (
char *)kv.k.p,
442 char *file_name = (
char*)kv.k.p;
443 int ret = access(file_name, F_OK);
467 if ((np = strchr((
const char*)cp,
' ')) != NULL) {
470 if (++ap >= &argary[
ARGSIZ-1]) {
485 if (argary[0][0] !=
'.' || argary[0][0] !=
'/') {
487 int len = (int)strlen(argary[0]) + 1;
488 if (len + 2 > COMMANDLEN) {
489 fprintf(stderr,
"command name is too long.\n");
490 MPI_Abort(MPI_COMM_WORLD, 1);
492 for (
int i = len + 1; i >= 0; i--) {
493 argary[0][i+2] = argary[0][i];
502 main(
int argc,
char *argv[])
505 char *mapper = NULL, *reducer = NULL, *infile = NULL;
509 int map_procs = DEFAULT_MPI, red_procs = DEFAULT_MPI;
512 "Usage %s -n m_num[:r_num] -m mapper [-k kvgenerator] [-r reducer] " 515 MPI_Init(&argc, &argv);
516 MPI_Comm_rank(MPI_COMM_WORLD, &rank);
518 while ((opt = getopt(argc, argv,
"m:r:n:k:")) != -1) {
522 asz = (strlen(optarg) + 1);
524 kmrsh_abort(rank,
"Argument too long for mapper (%s)\n",
527 memcpy(margbuf, optarg, asz);
532 asz = (strlen(optarg) + 1);
534 kmrsh_abort(rank,
"Argument too long for reducer (%s)\n",
537 memcpy(rargbuf, optarg, asz);
542 asz = (strlen(optarg) + 1);
544 memcpy(para_arg, optarg, asz);
545 cp = strchr(para_arg,
':');
548 map_procs = (int)strtol(para_arg, NULL, 10);
549 red_procs = map_procs;
554 map_procs = (int)strtol(para_arg, NULL, 10);
555 red_procs = (int)strtol(np, NULL, 10);
559 asz = (strlen(optarg) + 1);
561 kmrsh_abort(rank,
"Argument too long for key-value " 562 "generator (%s)\n", optarg);
564 memcpy(kargbuf, optarg, asz);
568 kmrsh_abort(rank, usage_msg, argv[0]);
572 if ((argc - optind) != 1) {
573 kmrsh_abort(rank, usage_msg, argv[0]);
575 infile = argv[optind];
579 if (mapper == NULL) {
580 kmrsh_abort(rank, usage_msg, argv[0]);
588 struct cmdinfo mapinfo = { margv, infile, map_procs };
590 ret =
kmr_map_once(kvs_commands, &mapinfo, kmr_noopt, 1,
591 generate_mapcmd_kvs);
592 if (ret != MPI_SUCCESS) {
593 kmrsh_abort(rank,
"kmr_map_once failed.\n");
598 struct cmdinfo gkvinfo = { kargv, NULL, 0 };
600 kmr_snoopt, run_kv_generator);
601 if (ret != MPI_SUCCESS) {
602 kmrsh_abort(rank,
"executing mapper failed.\n");
605 if (reducer != NULL) {
609 if (ret != MPI_SUCCESS) {
610 kmrsh_abort(rank,
"shuffling failed.\n");
615 ret =
kmr_reduce(kvs_red, kvs_file, NULL, kmr_noopt, write_kvs);
616 if (ret != MPI_SUCCESS) {
617 kmrsh_abort(rank,
"writing key-values to files failed.\n");
621 struct cmdinfo redinfo = { rargv, NULL, red_procs };
623 struct kmr_option kmr_inspect = { .inspect = 1 };
624 ret =
kmr_map(kvs_file, kvs_commands, &redinfo, kmr_inspect,
625 generate_redcmd_kvs);
626 if (ret != MPI_SUCCESS) {
627 kmrsh_abort(rank,
"kmr_map failed.\n");
632 kmr_snoopt, sleep_wait);
633 if (ret != MPI_SUCCESS) {
634 kmrsh_abort(rank,
"executing reducer failed.\n");
638 ret =
kmr_map(kvs_file, NULL, NULL, kmr_noopt, delete_file);
639 if (ret != MPI_SUCCESS) {
640 kmrsh_abort(rank,
"kmr_map failed.\n");
KMR_KVS
Key-Value Stream (abstract).
#define kmr_reduce(KVI, KVO, ARG, OPT, R)
Reduces key-value pairs.
struct kmr_option
Options to Mapping, Shuffling, and Reduction.
#define ARGSTRLEN
Buffer string size of arguments to mapper and reducer programs.
int kmr_add_kv(KMR_KVS *kvs, const struct kmr_kv_box kv)
Adds a key-value pair.
#define ARGSIZ
Maximum number of arguments to mapper and reducer programs.
int kmr_map_once(KMR_KVS *kvo, void *arg, struct kmr_option opt, _Bool rank_zero_only, kmr_mapfn_t m)
Maps once.
#define kmr_create_kvs(MR, KF, VF)
Makes a new key-value stream (of type KMR_KVS) with the specified field datatypes.
int kmr_shuffle(KMR_KVS *kvi, KMR_KVS *kvo, struct kmr_option opt)
Shuffles key-value pairs to the appropriate destination ranks.
int kmr_free_kvs(KMR_KVS *kvs)
Releases a key-value stream (type KMR_KVS).
#define kmr_map(KVI, KVO, ARG, OPT, M)
Maps simply.
struct kmr_kv_box
Handy Copy of a Key-Value Field.
int kmr_fin(void)
Clears the environment.
#define kmr_init()
Sets up the environment.
int kmr_free_context(KMR *mr)
Releases a context created with kmr_create_context().
static void parse_args(char *, char *[])
Parses command parameters given for mapper and reducer arguments.
#define LINELEN
Maximum length of a line of data.
int kmr_map_processes(_Bool nonmpi, KMR_KVS *kvi, KMR_KVS *kvo, void *arg, MPI_Info info, struct kmr_spawn_option opt, kmr_mapfn_t mapfn)
Maps on processes started by MPI_Comm_spawn() to run independent processes.
KMR * kmr_create_context(const MPI_Comm comm, const MPI_Info conf, const char *name)
Makes a new KMR context (a context has type KMR).