17 #include <sys/types.h> 20 #include <sys/param.h> 31 #define ARGSTRLEN (8 * 1024) 39 static char *shuffler =
"kmrshuffler";
42 static int execute(
char *,
char *
const [], pid_t *);
44 static void putfilecontents(
char *);
55 waitpid(-1, &status, 0);
65 if ((ret = close(pipefd[1])) < 0) {
69 if ((ret = dup2(pipefd[0], STDIN_FILENO)) < 0) {
73 if ((ret = close(pipefd[0])) < 0) {
80 if ((ret = close(pipefd[0])) < 0) {
84 if ((ret = dup2(pipefd[1], STDOUT_FILENO)) < 0) {
88 if ((ret = close(pipefd[1])) < 0) {
102 execute(
char *prog,
char *
const args[], pid_t *cldpid)
106 struct sigaction schld, spipe;
108 memset(&schld, 0,
sizeof(
struct sigaction));
110 schld.sa_flags |= SA_RESTART;
111 ret = sigaction(SIGCHLD, &schld, NULL);
114 memset(&spipe, 0,
sizeof(
struct sigaction));
115 spipe.sa_handler = SIG_IGN;
116 ret = sigaction(SIGPIPE, &spipe, NULL);
119 if ((ret = pipe(pipefd)) < 0) {
136 if (cldpid != NULL) {
146 if ((ret = execv(prog, args)) < 0) {
159 putfilecontents(
char *path)
164 fin = fopen(path,
"r");
170 while ((fgets(line,
sizeof(line), fin)) != NULL) {
171 if (fputs(line, stdout) == EOF) {
191 if (stat(path, &status) < 0) {
192 fprintf(stderr,
"file %s error\n", path);
196 if (!S_ISDIR(status.st_mode) && !S_ISREG(status.st_mode)) {
197 fprintf(stderr,
"file %s is not regular or directory\n", path);
201 if (S_ISDIR(status.st_mode)) {
205 long nmax = pathconf(path, _PC_NAME_MAX);
207 direntsz = (64 * 1024);
209 direntsz = (offsetof(
struct dirent, d_name) + (size_t)nmax + 1);
212 struct dirent *dentp;
220 while (readdir_r(d, (
void *)b, &dentp) >= 0) {
222 char fullpath[MAXPATHLEN];
228 ret = snprintf(fullpath,
sizeof(fullpath),
"%s/%s", path, dentp->d_name);
234 if (stat(fullpath, &substat) < 0) {
238 if (S_ISREG(substat.st_mode)) {
239 putfilecontents(fullpath);
244 putfilecontents(path);
266 if ((np = strchr((
const char*)cp,
' ')) != NULL) {
269 if (++ap >= &argary[
ARGSIZ-1]) {
296 char *mapper = NULL, *reducer = NULL, *infile = NULL;
298 pid_t mapper_pid, shuffler_pid, reducer_pid;
302 while ((opt = getopt(argc, argv,
"m:r:")) != -1) {
306 asz = (strlen(optarg) + 1);
308 fprintf(stderr,
"Argument too long for mapper (%s)\n",
312 memcpy(margbuf, optarg, asz);
317 asz = (strlen(optarg) + 1);
319 fprintf(stderr,
"Argument too long for reducer (%s)\n",
323 memcpy(rargbuf, optarg, asz);
328 fprintf(stderr,
"Usage %s -m mapper [-r reducer] inputfile\n", argv[0]);
333 if ((argc - optind) != 1) {
334 fprintf(stderr,
"Usage %s -m mapper [-r reducer] inputfile\n", argv[0]);
337 infile = argv[optind];
341 if (mapper == NULL) {
342 fprintf(stderr,
"Usage %s -m mapper [-r reducer] inputfile\n", argv[0]);
346 if (reducer != NULL) {
348 ret =
execute(reducer, rargv, &reducer_pid);
350 fprintf(stderr,
"execute %s failed\n", reducer);
357 ret =
execute(shuffler, sargv, &shuffler_pid);
359 fprintf(stderr,
"execute %s failed\n", shuffler);
365 ret =
execute(mapper, margv, &mapper_pid);
367 fprintf(stderr,
"execute %s failed\n", mapper);
374 fprintf(stderr,
"filereader failed\n");
378 if (reducer != NULL) {
380 waitpid(reducer_pid, NULL, 0);
383 waitpid(mapper_pid, NULL, 0);
static int execute(char *, char *const [], pid_t *)
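Forks and executes a program with execv(), after installing a SIGCHLD handler, ignoring SIGPIPE, and creating a pipe to connect it; stores the child's PID in *cldpid when cldpid is non-NULL.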
#define ARGSTRLEN
Size of the buffer that holds the argument string for the mapper and reducer programs.
static int pipeops(int *, int)
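Redirects standard input or standard output to one end of a pipe and closes the pipe's file descriptors that are no longer needed.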
#define ARGSIZ
Maximum number of arguments passed to the mapper and reducer programs.
static void reapchild(int)
Handles the SIGCHLD signal.
static int filereader(char *)
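Reads the input (file-reader): writes the contents of a regular file, or of every regular file in a directory, to standard output.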
int main(int argc, char *argv[])
Starts map-reduce shell processes (for "streaming").
static void parse_args(char *, char *[])
Splits the argument string given for the mapper or reducer at spaces into an argument vector.
#define LINELEN
Maximum length of a line of data.