KMR
kmrwatch0.c
Go to the documentation of this file.
1 /* kmrwatch0.c (2014-02-04) */
2 /* Copyright (C) 2012-2016 RIKEN AICS */
3 
4 /** \file kmrwatch0.c KMR Spawned Program Wrapper. It wraps a command
5  of a spawned process which runs independently and makes no
6  communication to the parent (the spawner), so that the parent is
7  notified about the end of the process. It has two modes
8  corresponding to kmr_map_processes() and
9  kmr_map_serial_processes(). (MODE 0): It fork-execs an
10  independent MPI program for kmr_map_processes(), and makes a
11  socket to the parent to notify the end of the process via socket
12  closure. It uses a socket because MPI communication to the parent
13  is unusable. USAGE: kmrwatch0 "mpi" ipaddr magic "--" command...
14  (MODE 1): It fork-execs a non-MPI program for
15  kmr_map_serial_processes(), by surrounding it with calls to
16  MPI_Init() and MPI_Finalize(), and sends a reply to the parent.
17  See manual pages for more information. USAGE: kmrwatch0 "seq" any
18  magic "--" command...\n To enable tracing, set 'T' as the last
19  character in the magic argument. */
20 
21 /* For "kmrwatch mpi" the following script suffices. NOTE: A file
22  descriptor number 20 is used in it because the use of 9 failed with
23  "MPICH2" (it only matters when the wrapper would use a fixed
24  number).
25  "#!/bin/bash"
26  "# KMRWATCH, notifies process end via socket closure."
27  "# Usage: kmrwatch.sh mpi hostport magic -- command..."
28  "if [ \"$#\" -lt 5 ]; then"
29  "\techo \"KMRWATCH.SH: bad number of arguments.\"; exit 1"
30  "fi"
31  "seqmpi=$1; hostport=$2; magic=$3; separator=$4"
32  "shift; shift; shift; shift"
33  "if [ \"$separator\" != \"--\" ]; then"
34  "\techo \"KMRWATCH.SH: bad argument separator.\"; exit 1"
35  "fi"
36  "exec 20<> \"/dev/tcp/$hostport\""
37  "echo -e \"$magic\" >&20"
38  "$*" */
39 
40 #include <mpi.h>
41 #include <stdio.h>
42 #include <stdlib.h>
43 #include <string.h>
44 #include <strings.h>
45 #include <sys/types.h>
46 #include <sys/wait.h>
47 #include <unistd.h>
48 #include <fcntl.h>
49 #include <netdb.h>
50 #include <sys/socket.h>
51 #include <netinet/in.h>
52 #include <arpa/inet.h>
53 #include <errno.h>
54 
55 #include "kmr.h"
56 
/* Overlay of the socket address variants, used to view the generic
   address returned by getaddrinfo() as its IPv4 or IPv6 form.  The
   sockaddr_storage member makes the union large enough to hold any
   of them. */
union SOCKADDR {
    struct sockaddr_storage ss; /* generic storage, sets the size */
    struct sockaddr sa;         /* family-independent view */
    struct sockaddr_in in4;     /* AF_INET view */
    struct sockaddr_in6 in6;    /* AF_INET6 view */
};
63 
/* Connects a TCP socket to the spawner.  HOSTPORT has the form
   "host/port" (the last slash separates the two parts).  Every
   AF_INET/AF_INET6 address returned by getaddrinfo() is tried in
   order until one connects.  When TRACING is set, progress and
   per-address failures are reported on stderr as they happen.
   Returns the connected file descriptor.  It never returns on
   failure: it prints a message to stderr and calls exit(255). */

static int
kmrwatch0_connect_to_spawner(const char *hostport, _Bool tracing)
{
    char host[NI_MAXHOST + 6];
    /* Compile-time knob: 4 or 6 restricts the address family. */
    const int preferip = 0;
    int cc;

    if (tracing) {
        fprintf(stderr, (";;KMR kmrwatch0:"
                         " connecting a socket to the spawner\n"));
        fflush(0);
    }

    size_t len = (strlen(hostport) + 1);
    if (len > sizeof(host)) {
        fprintf(stderr, ("KMR kmrwatch0 error:"
                         " bad host/port pair (too long): %s\n"),
                hostport);
        exit(255);
    }
    memcpy(host, hostport, len);

    /* Split the private copy at the last slash into host and port. */

    char *slash = strrchr(host, '/');
    if (slash == 0) {
        fprintf(stderr,
                ("KMR kmrwatch0 error:"
                 " bad host/port pair (no slash): %s\n"), hostport);
        exit(255);
    }
    *slash = 0;
    const char *port = (slash + 1);

    /* The trailing %c detects garbage after the number: exactly one
       conversion must succeed. */

    int portno = 0;
    char gomi = 0;
    cc = sscanf(port, "%d%c", &portno, &gomi);
    if (cc != 1 || portno < 0 || portno > 65535) {
        fprintf(stderr,
                ("KMR kmrwatch0 error:"
                 " bad host/port pair (bad port): %s\n"), hostport);
        exit(255);
    }

    struct addrinfo hints;
    memset(&hints, 0, sizeof(hints));
    hints.ai_flags = (AI_ADDRCONFIG);
    hints.ai_socktype = SOCK_STREAM;
    hints.ai_protocol = IPPROTO_TCP;
    if (preferip == 4) {
        hints.ai_family = AF_INET;
    } else if (preferip == 6) {
        hints.ai_family = AF_INET6;
    } else {
        hints.ai_family = AF_UNSPEC;
        /*AI_V4MAPPED*/
    }
    struct addrinfo *ai = 0;
    cc = getaddrinfo(host, port, &hints, &ai);
    if (cc != 0) {
        /* The address list is not valid on failure; do not use it. */
        char const *m = gai_strerror(cc);
        fprintf(stderr, ("KMR kmrwatch0 error:"
                         "getaddrinfo(%s,%s) failed: %s.\n"),
                host, port, m);
        exit(255);
    }

    /* Failed attempts are recorded here and reported only when no
       address could be connected. */

    struct {const char *s; struct addrinfo *a; int e;} errs[10];
    int maxtries = (int)(sizeof(errs) / sizeof(*errs));
    int addresstries = 0;
    int fd = -1;
    for (struct addrinfo *p = ai; p != 0; p = p->ai_next) {
        union SOCKADDR *sa = (void *)p->ai_addr;
        const char *fm = 0;
        if (p->ai_family == AF_INET) {
            assert(ntohs(sa->in4.sin_port) == portno);
            fm = "AF_INET";
        } else if (p->ai_family == AF_INET6) {
            assert(ntohs(sa->in6.sin6_port) == portno);
            fm = "AF_INET6";
        } else {
            continue;
        }

        /* Check the capacity before taking a slot, so that every
           counted attempt has an initialized record. */

        if (addresstries >= maxtries) {
            break;
        }
        int slot = addresstries;
        addresstries++;
        errs[slot].s = 0;
        errs[slot].a = p;
        errs[slot].e = 0;

        fd = socket(p->ai_family, p->ai_socktype, p->ai_protocol);
        if (fd < 0) {
            int e = errno;
            errs[slot].s = "socket";
            errs[slot].e = e;
            if (tracing) {
                char *m = strerror(e);
                fprintf(stderr, ("KMR kmrwatch0 error:"
                                 "socket(%s) failed: %s\n"),
                        fm, m);
                fflush(0);
            }
            fd = -1;
            continue;
        }
        int one = 1;
        cc = setsockopt(fd, SOL_SOCKET, SO_REUSEADDR,
                        &one, sizeof(one));
        if (cc != 0) {
            char *m = strerror(errno);
            fprintf(stderr, ("KMR kmrwatch0 error:"
                             "setsockopt(SO_REUSEADDR) failed"
                             " (ignored): %s\n"), m);
        }
        cc = connect(fd, p->ai_addr, p->ai_addrlen);
        if (cc < 0) {
            int e = errno;
            errs[slot].s = "connect";
            errs[slot].e = e;
            /*(errno == ETIMEDOUT || errno == ECONNREFUSED)*/
            if (tracing) {
                char *m = strerror(e);
                fprintf(stderr,
                        (";;KMR kmrwatch0:"
                         " connect(%s): %s\n"), hostport, m);
                fflush(0);
            }
            close(fd);
            fd = -1;
            continue;
        }
        break;
    }

    if (fd == -1) {
        if (addresstries == 0) {
            fprintf(stderr, ("KMR kmrwatch0 error:"
                             "No address found for %s %s\n"),
                    host, port);
        } else {
            for (int i = 0; i < addresstries; i++) {
                assert(errs[i].s != 0);
                struct addrinfo *p = errs[i].a;
                union SOCKADDR *sa = (void *)p->ai_addr;
                void *addr = 0;
                const char *fm = 0;
                if (p->ai_family == AF_INET) {
                    addr = &(sa->in4.sin_addr);
                    fm = "AF_INET";
                } else if (p->ai_family == AF_INET6) {
                    addr = &(sa->in6.sin6_addr);
                    fm = "AF_INET6";
                } else {
                    assert(0);
                }
                char peer[INET6_ADDRSTRLEN];
                if (addr == 0
                    || inet_ntop(p->ai_family, addr,
                                 peer, sizeof(peer)) == 0) {
                    /* Keep the message printable when the address
                       cannot be formatted. */
                    peer[0] = '?';
                    peer[1] = 0;
                }
                char *m = strerror(errs[i].e);
                if (strcmp(errs[i].s, "socket") == 0) {
                    fprintf(stderr, ("KMR kmrwatch0 error:"
                                     " socket(%s) failed: %s\n"),
                            fm, m);
                } else if (strcmp(errs[i].s, "connect") == 0) {
                    fprintf(stderr,
                            ("KMR kmrwatch0 error:"
                             " connect(%s/%s) failed: %s\n"),
                            peer, port, m);
                } else {
                    assert(0);
                }
            }
        }
        exit(255);
    }
    freeaddrinfo(ai);
    return fd;
}

/* Performs the handshake on the socket FD: it reads one int from the
   spawner and writes the same value back.  When the value is zero,
   the socket is closed immediately and -1 is returned; otherwise FD
   is returned still open.  On a read/write error or a short
   transfer, it prints a message, closes the socket, and calls
   exit(255). */

static int
kmrwatch0_handshake(int fd)
{
    int val = 0;
    ssize_t rsize = read(fd, &val, sizeof(val));
    if (rsize < 0) {
        char *m = strerror(errno);
        fprintf(stderr, ("KMR kmrwatch0 error:"
                         "read failed: %s\n"), m);
        close(fd);
        exit(255);
    }
    if (rsize != (ssize_t)sizeof(val)) {
        fprintf(stderr, ("KMR kmrwatch0 error:"
                         " short read in handshake: %ld bytes\n"),
                (long)rsize);
        close(fd);
        exit(255);
    }
    ssize_t wsize = write(fd, &val, sizeof(val));
    if (wsize < 0) {
        char *m = strerror(errno);
        fprintf(stderr, ("KMR kmrwatch0 error:"
                         "write failed: %s\n"), m);
        close(fd);
        exit(255);
    }
    if (wsize != (ssize_t)sizeof(val)) {
        fprintf(stderr, ("KMR kmrwatch0 error:"
                         " short write in handshake: %ld bytes\n"),
                (long)wsize);
        close(fd);
        exit(255);
    }
    if (val == 0) {
        close(fd);
        return -1;
    }
    return fd;
}

/* Entry point of the wrapper.  The arguments are:
   argv[1] the mode ("seq" or "mpi"), argv[2] the "host/port" of the
   spawner (only used in the "mpi" mode), argv[3] the magic string,
   argv[4] the separator "--", and argv[5]... the command to run.
   The magic string is scanned for "V<digit>" (the version, which
   must be 0), 'T' (enables tracing to stderr), and 'X' (quits with
   _exit() without calling MPI_Finalize() in the "seq" mode).
   It fork-execs the command and waits for it.  In the "seq" mode, it
   initializes MPI beforehand and, after the command has finished,
   sends an empty message with KMR_TAG_SPAWN_REPLY to rank 0 of the
   parent communicator.  In the "mpi" mode, it connects a socket to
   the spawner after the fork and closes it after the command has
   finished.  It exits with 255 on errors detected before waiting,
   and returns 0 otherwise. */

int
main(int argc, char **argv)
{
    int cc;

    /* At least one command word is needed after the separator;
       otherwise argv[5] would be null at execvp(). */

    if (argc < 6) {
        fprintf(stderr, (";;KMR kmrwatch0 error:"
                         " too few arguments: argc=%d\n"), argc);
        exit(255);
    }

    char *mode = argv[1];
    char *hostport = argv[2];
    char *magic = argv[3];
    char *separator = argv[4];

    int version = -1;
    _Bool tracing = 0;
    _Bool quitwithoutfinalize = 0;
    int lm = (int)strlen(magic);
    for (int i = 0; i < lm; i++) {
        if (magic[i] == 'V' && i < (lm - 1)) {
            version = magic[i + 1] - '0';
        }
        if (magic[i] == 'T') {
            tracing = 1;
        }
        if (magic[i] == 'X') {
            quitwithoutfinalize = 1;
        }
    }

    if (version != 0) {
        fprintf(stderr, (";;KMR kmrwatch0 error:"
                         " version mismatch: %s\n"), magic);
        exit(255);
    }

    if (!(strcmp(mode, "seq") == 0 || (strcmp(mode, "mpi") == 0))) {
        fprintf(stderr, (";;KMR kmrwatch0 error:"
                         " bad mode (be seq or mpi): %s\n"), mode);
        exit(255);
    }
    if (strcmp(separator, "--") != 0) {
        fprintf(stderr, (";;KMR kmrwatch0 error:"
                         " bad separator, -- needed: %s\n"), separator);
        exit(255);
    }

    if (tracing) {
        fprintf(stderr, ";;KMR kmrwatch0: pid=%d\n", (int)getpid());
        fflush(0);
    }

    /* Non-MPI means the wrapper has to behave as an MPI process. */

    _Bool nonmpi = (mode[0] == 's');

    int nprocs = -1;
    int rank = -1;
    MPI_Comm parent = MPI_COMM_NULL;

    if (nonmpi) {
        if (tracing) {
            fprintf(stderr, (";;KMR kmrwatch0: initializing MPI\n"));
            fflush(0);
        }

        int lev;
        cc = MPI_Init_thread(&argc, &argv, MPI_THREAD_SERIALIZED, &lev);
        assert(cc == MPI_SUCCESS);
        cc = MPI_Comm_size(MPI_COMM_WORLD, &nprocs);
        assert(cc == MPI_SUCCESS);
        cc = MPI_Comm_rank(MPI_COMM_WORLD, &rank);
        assert(cc == MPI_SUCCESS);
        cc = MPI_Comm_get_parent(&parent);
        assert(cc == MPI_SUCCESS);
        if (parent == MPI_COMM_NULL) {
            fprintf(stderr, ("KMR kmrwatch0 error:"
                             " MPI_Comm_get_parent() failed\n"));
            exit(255);
        }
    }

    /* Start the command.  A fork failure is only reported here; the
       notification to the parent below is still performed. */

    pid_t pid = fork();
    if (pid == -1) {
        char *m = strerror(errno);
        fprintf(stderr, ("KMR kmrwatch0 error:"
                         " fork() failed: %s\n"), m);
    } else if (pid == 0) {
        /* Child: replace itself with the command. */
        argv[argc] = 0;
        cc = execvp(argv[5], &argv[5]);
        if (cc == -1) {
            char *m = strerror(errno);
            fprintf(stderr, ("KMR kmrwatch0 error:"
                             " execvp(%s) failed: %s\n"),
                    argv[5], m);
        } else {
            fprintf(stderr, ("KMR kmrwatch0 error:"
                             " execvp() returned: cc=%d\n"),
                    cc);
        }
        exit(255);
    }

    /* In the "mpi" mode, the socket is connected after the fork so
       that the child does not inherit it. */

    int fd = -1;
    if (pid != -1 && !nonmpi) {
        fd = kmrwatch0_connect_to_spawner(hostport, tracing);
        fd = kmrwatch0_handshake(fd);
    }

    /* Wait for the command only when a child actually exists;
       waitpid(-1) would mean waiting for any child. */

    if (pid != -1) {
        if (tracing) {
            fprintf(stderr, (";;KMR kmrwatch0:"
                             " waiting for a process"
                             " to finish (pid=%d)\n"),
                    (int)pid);
            fflush(0);
        }

        int st;
        cc = waitpid(pid, &st, 0);
        if (cc == -1) {
            if (errno == EINTR) {
                fprintf(stderr, ("KMR kmrwatch0 error:"
                                 " waitpid() interrupted\n"));
            } else {
                char *m = strerror(errno);
                fprintf(stderr, ("KMR kmrwatch0 error:"
                                 " waitpid() failed: %s\n"), m);
            }
        }
    }

    if (nonmpi) {
        if (tracing) {
            fprintf(stderr, (";;KMR kmrwatch0:"
                             " sending a reply for done-notification\n"));
            fflush(0);
        }

        int peer = 0;
        cc = MPI_Send(0, 0, MPI_BYTE, peer,
                      KMR_TAG_SPAWN_REPLY, parent);
        assert(cc == MPI_SUCCESS);

        if (quitwithoutfinalize) {
            if (tracing) {
                fprintf(stderr, (";;KMR kmrwatch0:"
                                 " force quit without finalizing MPI\n"));
                fflush(0);
            }
            _exit(0);
        }

        if (tracing) {
            fprintf(stderr, (";;KMR kmrwatch0: finalizing MPI\n"));
            fflush(0);
        }

        cc = MPI_Finalize();
        assert(cc == MPI_SUCCESS);
    } else {
        if (tracing) {
            fprintf(stderr, (";;KMR kmrwatch0:"
                             " closing a socket for done-notification\n"));
            fflush(0);
        }

        if (fd != -1) {
            close(fd);
            fd = -1;
        }
    }

    if (tracing) {
        fprintf(stderr, (";;KMR kmrwatch0: done\n"));
        fflush(0);
    }

    return 0;
}
474 
475 /*
476 Copyright (C) 2012-2016 RIKEN AICS
477 This library is distributed WITHOUT ANY WARRANTY. This library can be
478 redistributed and/or modified under the terms of the BSD 2-Clause License.
479 */
KMR Interface.