KMR
kmrshell.c
Go to the documentation of this file.
1 /* kmrshell.c */
2 /* Copyright (C) 2012-2016 RIKEN AICS */
3 
4 /** \file kmrshell.c KMR-Shell for Streaming. It forks processes of a
5  mapper, a shuffler, and a reducer, then, it reads a number of
6  specified input files and passes their data to a mapper via a
7  pipe. A mapper, a shuffler, and a reducer are shell
8  executables. */
9 
10 #include <stdio.h>
11 #include <stdlib.h>
12 #include <stddef.h>
13 #include <string.h>
14 #include <unistd.h>
15 #include <fcntl.h>
16 #include <signal.h>
17 #include <sys/types.h>
18 #include <sys/stat.h>
19 #include <sys/wait.h>
20 #include <sys/param.h>
21 #include <dirent.h>
22 #include <assert.h>
23 
24 /** Maximum length of a line of data. */
25 #define LINELEN 32767
26 
27 /** Maximum number of arguments to mapper and reducer programs. */
28 #define ARGSIZ 8
29 
30 /** Buffer string size of arguments to mapper and reducer programs. */
31 #define ARGSTRLEN (8 * 1024)
32 
33 /** Parameter for pipeops(). */
34 enum {
35  PIPE_IN = 1, /**< Attach stdin to pipe. */
36  PIPE_OUT = 2, /**< Attach stdout to pipe. */
37 };
38 
39 static char *shuffler = "kmrshuffler";
40 
41 static int pipeops(int *, int);
42 static int execute(char *, char *const [], pid_t *);
43 static void reapchild(int);
44 static void putfilecontents(char *);
45 static int filereader(char *);
46 static void parse_args(char *, char *[]);
47 
48 /** Handles SIGCHLD signal. */
49 
50 static void
51 reapchild(int exitstat)
52 {
53  int status;
54  //fprintf(stderr, "reapchild\n");
55  waitpid(-1, &status, 0);
56 }
57 
58 /** Sets up pipe states. It attaches pipes to stdin or stdout. */
59 
60 static int
61 pipeops(int *pipefd, int direc)
62 {
63  int ret;
64  if (direc == PIPE_IN) {
65  if ((ret = close(pipefd[1])) < 0) {
66  perror("close pipe");
67  return ret;
68  }
69  if ((ret = dup2(pipefd[0], STDIN_FILENO)) < 0) {
70  perror("dup2 pipe");
71  return ret;
72  }
73  if ((ret = close(pipefd[0])) < 0) {
74  perror("close pipe");
75  return ret;
76  }
77  }
78 
79  if (direc == PIPE_OUT) {
80  if ((ret = close(pipefd[0])) < 0) {
81  perror("close pipe");
82  return ret;
83  }
84  if ((ret = dup2(pipefd[1], STDOUT_FILENO)) < 0) {
85  perror("dup2 pipe");
86  return ret;
87  }
88  if ((ret = close(pipefd[1])) < 0) {
89  perror("close pipe");
90  return ret;
91  }
92  }
93  return 0;
94 }
95 
96 /** Forks and execs a process.
97  \param prog program path.
98  \param args argments to program.
99  \param cldpid pid of child (out). */
100 
101 static int
102 execute(char *prog, char *const args[], pid_t *cldpid)
103 {
104  pid_t pid;
105  int ret, pipefd[2];
106  struct sigaction schld, spipe;
107 
108  memset(&schld, 0, sizeof(struct sigaction));
109  schld.sa_handler = reapchild;
110  schld.sa_flags |= SA_RESTART;
111  ret = sigaction(SIGCHLD, &schld, NULL);
112  assert(ret >= 0);
113 
114  memset(&spipe, 0, sizeof(struct sigaction));
115  spipe.sa_handler = SIG_IGN;
116  ret = sigaction(SIGPIPE, &spipe, NULL);
117  assert(ret >= 0);
118 
119  if ((ret = pipe(pipefd)) < 0) {
120  perror("pipe");
121  return ret;
122  }
123 
124  pid = fork();
125 
126  if (pid < 0) {
127  perror("fork");
128  return -1;
129  }
130 
131  if (pid > 0) {
132  /* parent */
133  if ((ret = pipeops(pipefd, PIPE_OUT)) < 0) {
134  return -1;
135  }
136  if (cldpid != NULL) {
137  *cldpid = pid;
138  }
139  }
140 
141  if (pid == 0) {
142  /* child */
143  if ((ret = pipeops(pipefd, PIPE_IN)) < 0) {
144  return ret;
145  }
146  if ((ret = execv(prog, args)) < 0) {
147  return ret;
148  }
149 
150  }
151  return 0;
152 }
153 
154 /* Puts contents of a file to stdout. It reads a line from a file and
155  put it.
156  \param path to file. */
157 
158 static void
159 putfilecontents(char *path)
160 {
161  FILE *fin;
162  char line[LINELEN];
163 
164  fin = fopen(path, "r");
165  if (fin == NULL) {
166  perror("open");
167  return;
168  }
169 
170  while ((fgets(line, sizeof(line), fin)) != NULL) {
171  if (fputs(line, stdout) == EOF) {
172  break;
173  }
174  }
175 
176  fclose(fin);
177  return;
178 }
179 
180 /** Reads files (file-reader). It reads possibly multiple files and
181  writes their contents to stdout. If "path" is a directory, it
182  enumerates the files under the directory and reads each file.
183  \param path path to file (or directory). */
184 
185 static int
186 filereader(char *path)
187 {
188  struct stat status;
189 
190  /* file check */
191  if (stat(path, &status) < 0) {
192  fprintf(stderr, "file %s error\n", path);
193  return -1;
194  }
195 
196  if (!S_ISDIR(status.st_mode) && !S_ISREG(status.st_mode)) {
197  fprintf(stderr, "file %s is not regular or directory\n", path);
198  return -1;
199  }
200 
201  if (S_ISDIR(status.st_mode)) {
202  /* directory */
203  /*char b[MAXGETDENTS_SIZE];*/
204  size_t direntsz;
205  long nmax = pathconf(path, _PC_NAME_MAX);
206  if (nmax == -1) {
207  direntsz = (64 * 1024);
208  } else {
209  direntsz = (offsetof(struct dirent, d_name) + (size_t)nmax + 1);
210  }
211  DIR *d;
212  struct dirent *dentp;
213  char b[direntsz];
214 
215  d = opendir(path);
216  if (d == NULL) {
217  perror("opendir");
218  return -1;
219  }
220  while (readdir_r(d, (void *)b, &dentp) >= 0) {
221  struct stat substat;
222  char fullpath[MAXPATHLEN];
223  int ret;
224  if (dentp == NULL) {
225  break;
226  }
227 
228  ret = snprintf(fullpath, sizeof(fullpath), "%s/%s", path, dentp->d_name);
229  if (ret <= 0) {
230  perror("snprintf");
231  continue;
232  }
233 
234  if (stat(fullpath, &substat) < 0) {
235  continue;
236  }
237 
238  if (S_ISREG(substat.st_mode)) {
239  putfilecontents(fullpath);
240  }
241  }
242  closedir(d);
243  } else {
244  putfilecontents(path);
245  }
246  return 0;
247 }
248 
249 /** Parses command parameters given for mapper and reducer arguments.
250  It scans an argument string like "mapper arg0 arg1" for the -m and
251  -r options, and generates an argv array {"mapper", "arg0", "arg1",
252  0}. The separator is a whitespace.
253  \param argstr string given for -m or -r options.
254  \param argary array to be filled by argument strings. */
255 
256 static void
257 parse_args(char *argstr, char *argary[])
258 {
259  char *cp, *np;
260  char **ap;
261 
262  ap = argary;
263  cp = argstr;
264  while (1) {
265  *ap = cp;
266  if ((np = strchr((const char*)cp, ' ')) != NULL) {
267  *np++ = '\0';
268  }
269  if (++ap >= &argary[ARGSIZ-1]) {
270  **ap = '\0';
271  break;
272  } else {
273  if (np == NULL) {
274  **ap = '\0';
275  break;
276  }
277  }
278  while (*np == ' ') {
279  np++;
280  }
281  cp = np;
282  }
283 }
284 
285 /** Starts map-reduce shell processes (for "streaming"). It
286  forks and execs a mapper, a shuffler, and a reducer, which are
287  connected via pipes.
288  \arg \c -m \c mapper program.
289  \arg \c -r \c reducer program.
290  \arg \c input file or directory. */
291 
292 int
293 main(int argc, char *argv[])
294 {
295  int ret, opt;
296  char *mapper = NULL, *reducer = NULL, *infile = NULL;
297  char *margv[ARGSIZ], *rargv[ARGSIZ], *sargv[2];
298  pid_t mapper_pid, shuffler_pid, reducer_pid;
299  char margbuf[ARGSTRLEN];
300  char rargbuf[ARGSTRLEN];
301 
302  while ((opt = getopt(argc, argv, "m:r:")) != -1) {
303  switch (opt) {
304  size_t asz;
305  case 'm':
306  asz = (strlen(optarg) + 1);
307  if (asz >= ARGSTRLEN) {
308  fprintf(stderr, "Argument too long for mapper (%s)\n",
309  optarg);
310  exit(-1);
311  }
312  memcpy(margbuf, optarg, asz);
313  parse_args(margbuf, &margv[0]);
314  mapper = margv[0];
315  break;
316  case 'r':
317  asz = (strlen(optarg) + 1);
318  if (asz >= ARGSTRLEN) {
319  fprintf(stderr, "Argument too long for reducer (%s)\n",
320  optarg);
321  exit(-1);
322  }
323  memcpy(rargbuf, optarg, asz);
324  parse_args(rargbuf, &rargv[0]);
325  reducer = rargv[0];
326  break;
327  default:
328  fprintf(stderr, "Usage %s -m mapper [-r reducer] inputfile\n", argv[0]);
329  exit(-1);
330  }
331  }
332 
333  if ((argc - optind) != 1) {
334  fprintf(stderr, "Usage %s -m mapper [-r reducer] inputfile\n", argv[0]);
335  exit(EXIT_FAILURE);
336  } else {
337  infile = argv[optind];
338  optind++;
339  }
340 
341  if (mapper == NULL) {
342  fprintf(stderr, "Usage %s -m mapper [-r reducer] inputfile\n", argv[0]);
343  exit(EXIT_FAILURE);
344  }
345 
346  if (reducer != NULL) {
347  /* Start reducer process. */
348  ret = execute(reducer, rargv, &reducer_pid);
349  if (ret < 0) {
350  fprintf(stderr, "execute %s failed\n", reducer);
351  exit(EXIT_FAILURE);
352  }
353 
354  /* Start shuffler process. */
355  sargv[0] = shuffler;
356  sargv[1] = NULL;
357  ret = execute(shuffler, sargv, &shuffler_pid);
358  if (ret < 0) {
359  fprintf(stderr, "execute %s failed\n", shuffler);
360  exit(EXIT_FAILURE);
361  }
362  }
363 
364  /* Start mapper process. */
365  ret = execute(mapper, margv, &mapper_pid);
366  if (ret < 0) {
367  fprintf(stderr, "execute %s failed\n", mapper);
368  exit(EXIT_FAILURE);
369  }
370 
371  /* Call filereader(). */
372  ret = filereader(infile);
373  if (ret < 0) {
374  fprintf(stderr, "filereader failed\n");
375  }
376  fclose(stdout);
377 
378  if (reducer != NULL) {
379  /* Wait for termination of shuffler. */
380  waitpid(reducer_pid, NULL, 0);
381  } else {
382  /* Wait for termination of mapper. */
383  waitpid(mapper_pid, NULL, 0);
384  }
385 
386  exit(EXIT_SUCCESS);
387 }
388 
389 /*
390 Copyright (C) 2012-2016 RIKEN AICS
391 This library is distributed WITHOUT ANY WARRANTY. This library can be
392 redistributed and/or modified under the terms of the BSD 2-Clause License.
393 */
static int execute(char *, char *const [], pid_t *)
Forks and execs a process.
Definition: kmrshell.c:102
#define ARGSTRLEN
Buffer string size of arguments to mapper and reducer programs.
Definition: kmrshell.c:31
static int pipeops(int *, int)
Sets up pipe states.
Definition: kmrshell.c:61
#define ARGSIZ
Maximum number of arguments to mapper and reducer programs.
Definition: kmrshell.c:28
static void reapchild(int)
Handles SIGCHLD signal.
Definition: kmrshell.c:51
static int filereader(char *)
Reads files (file-reader).
Definition: kmrshell.c:186
Attach stdin to pipe.
Definition: kmrshell.c:35
int main(int argc, char *argv[])
Starts map-reduce shell processes (for "streaming").
Definition: kmrshell.c:293
Attach stdout to pipe.
Definition: kmrshell.c:36
static void parse_args(char *, char *[])
Parses command parameters given for mapper and reducer arguments.
Definition: kmrshell.c:257
#define LINELEN
Maximum length of a line of data.
Definition: kmrshell.c:25