KMR
kmrmapms.c
Go to the documentation of this file.
1 /* kmrmapms.c (2014-02-04) */
2 /* Copyright (C) 2012-2016 RIKEN AICS */
3 
4 /** \file kmrmapms.c Master-Slave Mapping on Key-Value Stream. */
5 
6 #include <mpi.h>
7 #include <stddef.h>
8 #include <unistd.h>
9 #include <limits.h>
10 #include <poll.h>
11 #include <netdb.h>
12 #include <errno.h>
13 #include <fcntl.h>
14 #include <sys/types.h>
15 #include <sys/wait.h>
16 #include <sys/stat.h>
17 #include <sys/param.h>
18 #include <arpa/inet.h>
19 #include <assert.h>
20 #ifdef _OPENMP
21 #include <omp.h>
22 #endif
23 #include "kmr.h"
24 #include "kmrimpl.h"
25 
26 #define MAX(a,b) (((a)>(b))?(a):(b))
27 #define MIN(a,b) (((a)<(b))?(a):(b))
28 
29 static const int kmr_kv_buffer_slack_size = 1024;
30 
31 /* State of each task of kmr_map_ms(). */
32 
enum {
    KMR_RPC_NONE,		/* task not yet handed out */
    KMR_RPC_GOON,		/* task sent to a slave; result pending */
    KMR_RPC_DONE		/* result received from a slave */
};
36 
37 static inline void
38 kmr_assert_peer_tag(int tag)
39 {
40  assert(KMR_TAG_PEER_0 <= tag && tag < KMR_TAG_PEER_END);
41 }
42 
43 /* Special values of task ID. Task IDs are non-negative. A task ID
44  is included in an RPC request, which is used both for returning a
45  result and for wanting a new task. KMR_RPC_ID_NONE marks no
46  results are returned in the first request from a slave thread.
47  KMR_RPC_ID_FIN marks a node has finished with all the slave
48  threads. */
49 
50 #define KMR_RPC_ID_NONE -1
51 #define KMR_RPC_ID_FIN -2
52 
53 /** Delivers key-value pairs as requested. It returns MPI_SUCCESS if
54  all done, or MPI_ERR_ROOT otherwise. It finishes the tasks when
55  all nodes have contacted and all slave threads are done.
56  Protocol: (1) Receive an RPC request (KMR_TAG_REQ). A request
57  consists of a triple of integers (peer-tag, task-ID, result-size)
58  ("int req[3]"). The task-ID encodes some special values. (2)
59  Receive a result if a slave has one. (3) Return a new task if
60  available. A reply consists of a tuple of integers (task-ID,
61  argument-size) ("int ack[2]"). (4) Or, return a "no-tasks"
62  indicator by ID=KMR_RPC_ID_NONE. (5) Count "done" messages by
63  ID=KMR_RPC_ID_FIN, which indicates the slave node has finished for
64  all slave threads. The task-ID in an RPC request is
65  KMR_RPC_ID_NONE for the first request (meaning that the request
66  has no result). Peer-tags are used in subsequent messages to
67  direct reply messages to a requesting thread. */
68 
69 static int
71  void *arg, struct kmr_option opt, kmr_mapfn_t m)
72 {
73  KMR *mr = kvi->c.mr;
74  if (kmr_fields_pointer_p(kvi)) {
75  kmr_error(mr, "kmr_map_ms: cannot handle pointer field types");
76  }
77  assert(kvo->c.key_data == kvi->c.key_data
78  && kvo->c.value_data == kvi->c.value_data);
79  MPI_Comm comm = mr->comm;
80  int nprocs = mr->nprocs;
81  long cnt = kvi->c.element_count;
82  assert(INT_MIN <= cnt && cnt <= INT_MAX);
83  struct kmr_map_ms_state *ms = kvi->c.ms;
84  char *msstates = &(ms->states[0]);
85  if (ms == 0) {
86  /* First time. */
87  size_t hdsz = offsetof(struct kmr_map_ms_state, states);
88  ms = kmr_malloc((hdsz + sizeof(char) * (size_t)cnt));
89  kvi->c.ms = ms;
90  ms->nodes = 0;
91  ms->kicks = 0;
92  ms->dones = 0;
93  msstates = &(ms->states[0]);
94  for (long i = 0; i < cnt; i++) {
95  msstates[i] = KMR_RPC_NONE;
96  }
97  }
98  if (ms->dones == cnt && ms->nodes == (nprocs - 1)) {
99  /* Finish the task. */
100  if (kvi->c.temporary_data != 0) {
101  kmr_free(kvi->c.temporary_data,
102  (sizeof(struct kmr_kvs_entry *) * (size_t)cnt));
103  kvi->c.temporary_data = 0;
104  }
105  return MPI_SUCCESS;
106  }
107  /* Make/remake array of key-value pointers. */
108  struct kmr_kvs_entry **ev = kvi->c.temporary_data;
109  if (ev == 0) {
110  ev = kmr_malloc(sizeof(struct kmr_kvs_entry *) * (size_t)cnt);
111  kvi->c.temporary_data = ev;
112  kvi->c.current_block = kvi->c.first_block;
113  struct kmr_kvs_entry *e = kmr_kvs_first_entry(kvi, kvi->c.first_block);
114  for (long i = 0; i < cnt; i++) {
115  assert(e != 0);
116  ev[i] = e;
117  e = kmr_kvs_next(kvi, e, 0);
118  }
119  }
120  /* Wait for one request and process it. */
121  assert(ms->dones <= ms->kicks);
122  MPI_Status st;
123  int cc;
124  int req[3];
125  cc = MPI_Recv(req, 3, MPI_INT, MPI_ANY_SOURCE, KMR_TAG_REQ, comm, &st);
126  assert(cc == MPI_SUCCESS);
127  int peer_tag = req[0];
128  int peer = st.MPI_SOURCE;
129  assert(peer != 0);
130  {
131  int id = req[1];
132  int sz = req[2];
133  if (id == KMR_RPC_ID_NONE) {
134  /* Got the first request from a peer, no task results. */
135  } else if (id == KMR_RPC_ID_FIN) {
136  /* Got the finishing request from a peer. */
137  ms->nodes++;
138  assert(ms->nodes <= (nprocs - 1));
139  } else {
140  /* Receive a task result. */
141  assert(id >= 0);
142  kmr_assert_peer_tag(peer_tag);
143  void *packed = kmr_malloc((size_t)sz);
144  cc = MPI_Recv(packed, sz, MPI_BYTE, peer, peer_tag, comm, &st);
145  assert(cc == MPI_SUCCESS);
146  KMR_KVS *kvx = kmr_create_kvs(mr, KMR_KV_BAD, KMR_KV_BAD);
147  cc = kmr_restore_kvs(kvx, packed, (size_t)sz, kmr_noopt);
148  assert(cc == MPI_SUCCESS);
149  struct kmr_option keepopen = {.keep_open = 1};
150  cc = kmr_map(kvx, kvo, 0, keepopen, kmr_add_identity_fn);
151  assert(cc == MPI_SUCCESS);
152  kmr_free(packed, (size_t)sz);
153  assert(msstates[id] == KMR_RPC_GOON);
154  msstates[id] = KMR_RPC_DONE;
155  ms->dones++;
156  }
157  }
158  if (ms->kicks < cnt) {
159  /* Send a new task (cnt is in integer range). */
160  int id;
161  for (id = 0; id < cnt; id++) {
162  if (msstates[id] == KMR_RPC_NONE) {
163  break;
164  }
165  }
166  assert(id != KMR_RPC_ID_NONE && id != cnt);
167  struct kmr_kvs_entry *e = ev[id];
168  int sz = (int)kmr_kvs_entry_netsize(e);
169  assert(sz > 0);
170  int ack[2] = {id, sz};
171  cc = MPI_Send(ack, 2, MPI_INT, peer, peer_tag, comm);
172  assert(cc == MPI_SUCCESS);
173  cc = MPI_Send(e, sz, MPI_BYTE, peer, peer_tag, comm);
174  assert(cc == MPI_SUCCESS);
175  assert(msstates[id] == KMR_RPC_NONE);
176  msstates[id] = KMR_RPC_GOON;
177  ms->kicks++;
178  } else {
179  /* Finish the slave. */
180  int ack[2] = {KMR_RPC_ID_NONE, 0};
181  cc = MPI_Send(ack, 2, MPI_INT, peer, peer_tag, comm);
182  assert(cc == MPI_SUCCESS);
183  }
184  /* Have more entries. */
185  return MPI_ERR_ROOT;
186 }
187 
188 /** Asks the master for a task, then calls a map-function. With
189  threading, each thread works independently asking the master for a
190  task. It simply protects MPI send/recv calls by OMP critical
191  sections, but their grain sizes are too large for uses of OMP
192  critical sections. */
193 
194 static int
196  void *arg, struct kmr_option opt, kmr_mapfn_t m)
197 {
198  assert(!kmr_fields_pointer_p(kvi)
199  && kvo->c.key_data == kvi->c.key_data
200  && kvo->c.value_data == kvi->c.value_data);
201  KMR * const mr = kvi->c.mr;
202  const MPI_Comm comm = mr->comm;
203  const int rank = mr->rank;
204  const enum kmr_kv_field keyf = kmr_unit_sized_or_opaque(kvo->c.key_data);
205  const enum kmr_kv_field valf = kmr_unit_sized_or_opaque(kvo->c.value_data);
206  assert(rank != 0);
207  assert(kvi->c.element_count == 0);
208 #ifdef _OPENMP
209  const _Bool threading = !(mr->single_thread || opt.nothreading);
210 #endif
211  KMR_OMP_PARALLEL_IF_(threading)
212  {
213  int cc;
214  int thr = KMR_OMP_GET_THREAD_NUM();
215  MPI_Status st;
216  struct kmr_kvs_entry *e = 0;
217  int maxsz = 0;
218  int peer_tag = KMR_TAG_PEER(thr);
219  kmr_assert_peer_tag(peer_tag);
220  {
221  /* Make the first request. */
222  int req[3] = {peer_tag, KMR_RPC_ID_NONE, 0};
223  KMR_OMP_CRITICAL_
224  cc = MPI_Send(req, 3, MPI_INT, 0, KMR_TAG_REQ, comm);
225  assert(cc == MPI_SUCCESS);
226  }
227  for (;;) {
228  int ack[2];
229  KMR_OMP_CRITICAL_
230  cc = MPI_Recv(ack, 2, MPI_INT, 0, peer_tag, comm, &st);
231  assert(cc == MPI_SUCCESS);
232  int id = ack[0];
233  int sz = ack[1];
234  if (id == KMR_RPC_ID_NONE) {
235  break;
236  }
237  assert(id >= 0 && sz > 0);
238  if (sz > maxsz) {
239  maxsz = (sz + kmr_kv_buffer_slack_size);
240  e = kmr_realloc(e, (size_t)maxsz);
241  assert(e != 0);
242  }
243  KMR_OMP_CRITICAL_
244  cc = MPI_Recv(e, sz, MPI_BYTE, 0, peer_tag, comm, &st);
245  assert(cc == MPI_SUCCESS);
246  /* Invoke mapper. */
247  KMR_KVS *kvx;
248  KMR_OMP_CRITICAL_
249  kvx = kmr_create_kvs(mr, keyf, valf);
250  struct kmr_kv_box kv = kmr_pick_kv(e, kvi);
251  cc = (*m)(kv, kvi, kvx, arg, id);
252  if (cc != MPI_SUCCESS) {
253  char ee[80];
254  snprintf(ee, sizeof(ee),
255  "Map-fn returned with error cc=%d", cc);
256  kmr_error(mr, ee);
257  }
258  kmr_add_kv_done(kvx);
259  void *packed = 0;
260  size_t packsz = 0;
261  cc = kmr_save_kvs(kvx, &packed, &packsz, kmr_noopt);
262  assert(cc == MPI_SUCCESS && packed != 0);
263  /* Send a task result. */
264  assert(packsz <= (size_t)INT_MAX);
265  sz = (int)packsz;
266  int req[3] = {peer_tag, id, sz};
267  KMR_OMP_CRITICAL_
268  cc = MPI_Send(req, 3, MPI_INT, 0, KMR_TAG_REQ, comm);
269  assert(cc == MPI_SUCCESS);
270  KMR_OMP_CRITICAL_
271  cc = MPI_Send(packed, sz, MPI_BYTE, 0, peer_tag, comm);
272  assert(cc == MPI_SUCCESS);
273  /* Cleanup. */
274  KMR_OMP_CRITICAL_
275  cc = kmr_free_kvs(kvx);
276  assert(cc == MPI_SUCCESS);
277  kmr_free(packed, packsz);
278  }
279  if (e != 0) {
280  kmr_free(e, (size_t)maxsz);
281  }
282  }
283  /* (Threads join). */
284  {
285  /* Make the finishing request. */
286  int cc;
287  int req[3] = {0, KMR_RPC_ID_FIN, 0};
288  cc = MPI_Send(req, 3, MPI_INT, 0, KMR_TAG_REQ, comm);
289  assert(cc == MPI_SUCCESS);
290  }
291  return MPI_SUCCESS;
292 }
293 
294 /** Maps in master-slave mode. The input key-value stream should be
295  empty except on rank0 where the master is running (the contents on
296  the slave ranks are ignored). It consumes the input key-value
297  stream. The master does delivery only. The master returns
298  frequently to give a chance to check-pointing, etc. The master
299  returns prematurely each time one pair is delivered, and those
300  returns are marked by MPI_ERR_ROOT indicating more tasks remain.
301  In contrast, slaves return only after all tasks done. The only
302  state that needs to be kept during kmr_map_ms() for check-pointing is in
303  the key-value streams KVI and KVO on the master. Note that this
304  totally diverges from bulk-synchronous execution. It does not
305  accept key-value field types KMR_KV_POINTER_OWNED or
306  KMR_KV_POINTER_UNMANAGED. Effective-options: NOTHREADING,
307  KEEP_OPEN. See struct kmr_option. */
308 
309 int
311  void *arg, struct kmr_option opt, kmr_mapfn_t m)
312 {
313  kmr_assert_kvs_ok(kvi, kvo, 1, 0);
314  KMR *mr = kvi->c.mr;
315  struct kmr_option kmr_supported = {.nothreading = 1, .keep_open = 1};
316  kmr_check_fn_options(mr, kmr_supported, opt, __func__);
317  int kcdc = kmr_ckpt_disable_ckpt(mr);
318  int rank = mr->rank;
319  long cnt = kvi->c.element_count;
320  assert(INT_MIN <= cnt && cnt <= INT_MAX);
321  int ccr;
322  int cc;
323  if (rank == 0) {
324  ccr = kmr_map_master(kvi, kvo, arg, opt, m);
325  if (ccr == MPI_SUCCESS) {
326  cc = kmr_add_kv_done(kvo);
327  assert(cc == MPI_SUCCESS);
328  cc = kmr_free_kvs(kvi);
329  assert(cc == MPI_SUCCESS);
330  }
331  } else {
332  ccr = kmr_map_slave(kvi, kvo, arg, opt, m);
333  cc = kmr_add_kv_done(kvo);
334  assert(cc == MPI_SUCCESS);
335  cc = kmr_free_kvs(kvi);
336  assert(cc == MPI_SUCCESS);
337  }
338  kmr_ckpt_enable_ckpt(mr, kcdc);
339  return ccr;
340 }
341 
342 /* ================================================================ */
343 
344 /* Mode of Spawning. KMR_SPAWN_INTERACT indicates spawned processes
345  interact with a map-function. KMR_SPAWN_SERIAL and
346  KMR_SPAWN_PARALLEL indicates spawned processes do not interact with
347  the parent. KMR_SPAWN_SERIAL is for sequential programs, for which
348  a watch-program "kmrwatch0" replies in place of spawned processes.
349  KMR_SPAWN_PARALLEL is for independent MPI programs, which do not
350  interact with the parent. Since independent MPI programs run
351  freely, it uses a socket connection by a watch-program "kmrwatch0"
352  to detect their ends. */
353 
enum kmr_spawn_mode {
    KMR_SPAWN_INTERACT,		/* spawned processes talk to a map-fn */
    KMR_SPAWN_SERIAL,		/* sequential; kmrwatch0 replies for them */
    KMR_SPAWN_PARALLEL		/* independent MPI; watched via a socket */
};
357 
358 /** State of each Spawning. The array of this structure is stored in
359  the kmr_spawning structure. RUNNING indicates the spawned
360  processes are running. N_PROCS is the number of processes to be
361  spawned (it equals to the COUNT below). INDEX is the number of
362  processes spawned so far, and COUNT is the number of processes of
363  the current spawn. The range INDEX by COUNT enumerates spawned
364  processes, and is used to point in the array of the MPI requests.
365  ARGC and ARGV are the argument list. ABUF (byte array of size
366  with ALEN), ARGV0 (pointer array of size with ARGC0) are buffers
367  for making command line arguments. ICOMM is the
368  inter-communicator. WATCH_PORT holds an IP port number for
369  watch-programs. */
370 
372  _Bool running;
373  int n_procs;
374  int index;
375  int count;
376  int argc;
377  char **argv;
378  int argc0;
379  char **argv0;
380  size_t alen;
381  char *abuf;
382  MPI_Comm icomm;
383  int watch_port;
384  double timestamp[6];
385 };
386 
387 /** State of Spawner. REPLIES hold receive requests for spawned
388  processes. WATCHES hold sockets used to detect the end of the
389  spawned processes. N_STARTEDS is process count of started, and
390  N_RUNNINGS is process count that have not finished. */
391 
struct kmr_spawning {
    char *fn;			/* caller name used in error messages */
    enum kmr_spawn_mode mode;
    int n_spawns;		/* number of key-value entries (spawns) */
    int n_spawners;		/* number of ranks which call spawn */
    int n_processes;		/* total processes to be spawned */
    int usize;			/* MPI universe size */
    int spawn_limit;		/* per-rank cap on spawnable processes */

    int n_starteds;		/* process count of started */
    int n_runnings;		/* process count that have not finished */

    struct kmr_spawn_state *spawned; /* per-spawn states (N_SPAWNS long) */
    struct kmr_kv_box *ev;	/* key-value entries listing the spawns */
    MPI_Request *replies;	/* receive requests for spawned processes */
    int *watches;		/* sockets to detect ends of spawned processes */

    int watch_listener;		/* listening socket fd (-1 when not open) */
    char watch_host[MAXHOSTNAMELEN + 10]; /* address string for watch-programs */
};
412 
413 /* Sums integers among all ranks. It is used to check the number of
414  ranks which call spawn. */
415 
416 static int
417 kmr_sum_on_all_ranks(KMR *mr, int v, int *sum)
418 {
419  assert(sum != 0);
420  int cc;
421  KMR_KVS *kvs0 = kmr_create_kvs(mr, KMR_KV_INTEGER, KMR_KV_INTEGER);
422  struct kmr_kv_box nkv = {
423  .klen = (int)sizeof(long),
424  .vlen = (int)sizeof(long),
425  .k.i = 0,
426  .v.i = v
427  };
428  cc = kmr_add_kv(kvs0, nkv);
429  assert(cc == MPI_SUCCESS);
430  kmr_add_kv_done(kvs0);
431  KMR_KVS *kvs1 = kmr_create_kvs(mr, KMR_KV_INTEGER, KMR_KV_INTEGER);
432  cc = kmr_replicate(kvs0, kvs1, kmr_noopt);
433  assert(cc == MPI_SUCCESS);
434  long count;
435  cc = kmr_reduce_as_one(kvs1, 0, &count, kmr_noopt, kmr_isum_one_fn);
436  assert(cc == MPI_SUCCESS);
437  *sum = (int)count;
438  return MPI_SUCCESS;
439 }
440 
/* Formats ARGV as a comma-separated list into S (of size SZ) for
   display.  The output is silently truncated when it does not fit.
   It always returns zero. */

static int
kmr_make_pretty_argument_string(char *s, size_t sz, int argc, char **argv)
{
    int cc;
    size_t cnt = 0;
    for (int i = 0; (i < argc && argv[i] != 0); i++) {
	cc = snprintf(&s[cnt], (sz - cnt), (i == 0 ? "%s" : ",%s"), argv[i]);
	if (cc < 0) {
	    /* (Fix): snprintf may return a negative value on an
	       encoding error; adding it to CNT as size_t would wrap
	       and write far out of bounds on the next iteration. */
	    return 0;
	}
	cnt += (size_t)cc;
	if (cnt >= sz) {
	    /* Output truncated; stop. */
	    return 0;
	}
    }
    return 0;
}
455 
456 int
457 kmr_map_via_spawn_ff(KMR_KVS *kvi, KMR_KVS *kvo, void *arg,
458  int finfo, struct kmr_spawn_option opt,
459  kmr_mapfn_t mapfn)
460 {
461  MPI_Info info = MPI_Info_f2c(finfo);
462  int cc = kmr_map_via_spawn(kvi, kvo, arg, info, opt, mapfn);
463  return cc;
464 }
465 
466 static inline void
467 kmr_spawn_info_put(struct kmr_spawn_info *info,
468  struct kmr_spawn_state *s,
469  struct kmr_spawn_option opt, void *arg)
470 {
471  info->maparg = arg;
472  info->u.icomm = s->icomm;
473  info->icomm_ff = MPI_Comm_c2f(s->icomm);
474  info->reply_root = opt.reply_root;
475 }
476 
477 static inline void
478 kmr_spawn_info_get(struct kmr_spawn_info *info,
479  struct kmr_spawn_state *s)
480 {
481  if (info->u.icomm != s->icomm) {
482  s->icomm = info->u.icomm;
483  } else if (info->icomm_ff != MPI_Comm_c2f(s->icomm)) {
484  s->icomm = MPI_Comm_f2c(info->icomm_ff);
485  }
486 }
487 
488 /* Lists processes to spawn from the key-value entries. */
489 
static int
kmr_list_spawns(struct kmr_spawning *spw, KMR_KVS *kvi, MPI_Info info,
		struct kmr_spawn_option opt)
{
    assert(spw->n_spawns == (int)kvi->c.element_count);
    KMR * const mr = kvi->c.mr;
    _Bool tracing5 = (mr->trace_map_spawn && (5 <= mr->verbosity));
    int cc;

    /* Scan key-value pairs and put them in EV. */

    kvi->c.current_block = kvi->c.first_block;
    struct kmr_kvs_entry *e = kmr_kvs_first_entry(kvi, kvi->c.first_block);
    for (int w = 0; w < spw->n_spawns; w++) {
	spw->ev[w] = kmr_pick_kv(e, kvi);
	e = kmr_kvs_next(kvi, e, 0);
    }

    /* Share the universe evenly by all ranks which call spawn. */

    int nranks;
    cc = kmr_sum_on_all_ranks(mr, ((spw->n_spawns > 0) ? 1 : 0), &nranks);
    assert(cc == MPI_SUCCESS && nranks <= mr->nprocs);
    spw->n_spawners = nranks;
    int *usizep;
    int uflag;
    /* NOTE(review): MPI_Attr_get is deprecated; MPI_Comm_get_attr has
       the same semantics -- consider switching when the MPI baseline
       allows. */
    cc = MPI_Attr_get(MPI_COMM_WORLD, MPI_UNIVERSE_SIZE, &usizep, &uflag);
    if (cc != MPI_SUCCESS || uflag == 0) {
	char ee[80];
	snprintf(ee, sizeof(ee), "%s: MPI lacks universe size", spw->fn);
	kmr_error(mr, ee);
    }
    spw->usize = *usizep;
    if (spw->usize <= mr->nprocs) {
	char ee[80];
	snprintf(ee, sizeof(ee), "%s: no dynamic processes in universe",
		 spw->fn);
	kmr_error(mr, ee);
    }
    /* Divide the spare part of the universe among the spawning ranks,
       optionally capped by SPAWN_MAX_PROCESSES. */
    int m = spw->usize - mr->nprocs;
    if (spw->n_spawners != 0) {
	m /= spw->n_spawners;
    }
    spw->spawn_limit = ((mr->spawn_max_processes != 0)
			? MIN(mr->spawn_max_processes, m)
			: m);
    if (tracing5) {
	if (spw->n_spawns > 0) {
	    fprintf(stderr,
		    ";;KMR [%05d] %s: universe-size=%d spawn-limit=%d\n",
		    mr->rank, spw->fn, spw->usize, spw->spawn_limit);
	    fflush(0);
	}
    }

    /* Take MAXPROCS from info if defined. */

    int maxprocs = -1;
    {
	char *infoval = kmr_malloc((size_t)(MPI_MAX_INFO_VAL + 1));
	int iflag;
	if (info != MPI_INFO_NULL) {
	    cc = MPI_Info_get(info, "maxprocs", MPI_MAX_INFO_VAL,
			      infoval, &iflag);
	    assert(cc == MPI_SUCCESS);
	} else {
	    iflag = 0;
	}
	if (iflag != 0) {
	    int v;
	    cc = kmr_parse_int(infoval, &v);
	    if (cc == 0 || v < 0) {
		/* Diagnose but continue with MAXPROCS unset. */
		char ee[80];
		snprintf(ee, sizeof(ee), "%s: bad value in info maxprocs=%s",
			 spw->fn, infoval);
		kmr_error(mr, ee);
		maxprocs = -1;
	    } else {
		maxprocs = v;
	    }
	} else {
	    maxprocs = -1;
	}
	kmr_free(infoval, (size_t)(MPI_MAX_INFO_VAL + 1));
    }

    /* Make the arguments to spawn. */

    spw->n_processes = 0;
    for (int w = 0; w < spw->n_spawns; w++) {
	struct kmr_kv_box kv = spw->ev[w];
	struct kmr_spawn_state *s = &(spw->spawned[w]);
	/* Reset the per-spawn state before filling it. */
	s->running = 0;
	s->n_procs = -1;
	s->index = 0;
	s->count = 0;
	s->argc = 0;
	s->argv = 0;
	s->argc0 = 0;
	s->argv0 = 0;
	s->alen = 0;
	s->abuf = 0;
	s->icomm = MPI_COMM_NULL;
	s->watch_port = -1;

	/* Copy the value part; it holds the argument strings.  The
	   first scan counts the arguments, the second fills ARGV0. */
	s->alen = (size_t)kv.vlen;
	s->abuf = kmr_malloc(s->alen);
	memcpy(s->abuf, kv.v.p, (size_t)kv.vlen);
	int maxargc;
	cc = kmr_scan_argv_strings(mr, s->abuf, s->alen,
				   0, &maxargc, 0,
				   opt.separator_space, spw->fn);
	assert(cc == MPI_SUCCESS);
	s->argv0 = kmr_malloc(sizeof(char *) * (size_t)(maxargc + 1));
	memset(s->argv0, 0, (sizeof(char *) * (size_t)(maxargc + 1)));
	cc = kmr_scan_argv_strings(mr, s->abuf, s->alen,
				   (maxargc + 1), &s->argc0, s->argv0,
				   opt.separator_space, spw->fn);
	assert(cc == MPI_SUCCESS);
	assert(maxargc == s->argc0);

	/* Check if the "MAXPROCS=" string in the arguments. */

	if (s->argc0 > 0 && strncmp("maxprocs=", s->argv0[0], 9) == 0) {
	    /* A leading "maxprocs=N" argument overrides the info value
	       and is stripped from the argument list. */
	    int v;
	    cc = kmr_parse_int(&s->argv0[0][9], &v);
	    if (cc == 0 || v < 0) {
		char ee[80];
		snprintf(ee, sizeof(ee), "%s: bad maxprocs=%s",
			 spw->fn, s->argv0[0]);
		kmr_error(mr, ee);
	    }
	    s->n_procs = v;
	    s->argc = (s->argc0 - 1);
	    s->argv = (s->argv0 + 1);
	} else {
	    s->n_procs = maxprocs;
	    s->argc = s->argc0;
	    s->argv = s->argv0;
	}

	if (s->argc <= 0) {
	    char ee[80];
	    snprintf(ee, sizeof(ee), "%s: no arguments", spw->fn);
	    kmr_error(mr, ee);
	}
	if (s->n_procs <= 0) {
	    char ee[80];
	    snprintf(ee, sizeof(ee), "%s: maxprocs not specified",
		     spw->fn);
	    kmr_error(mr, ee);
	}
	if (s->n_procs > spw->spawn_limit) {
	    char ee[80];
	    snprintf(ee, sizeof(ee),
		     "%s: maxprocs too large, (maxprocs=%d limit=%d)",
		     spw->fn, s->n_procs, spw->spawn_limit);
	    kmr_error(mr, ee);
	}

	spw->n_processes += s->n_procs;
    }

    return MPI_SUCCESS;
}
655 
656 static int
657 kmr_free_comm_with_tracing(KMR *mr, struct kmr_spawning *spw,
658  struct kmr_spawn_state *s)
659 {
660  int cc;
661  _Bool tracing5 = (mr->trace_map_spawn && (5 <= mr->verbosity));
662  if (s->icomm != MPI_COMM_NULL) {
663  if (tracing5) {
664  ptrdiff_t done = (s - spw->spawned);
665  fprintf(stderr, (";;KMR [%05d] %s [%ld]:"
666  " MPI_Comm_free (could block)...\n"),
667  mr->rank, spw->fn, done);
668  fflush(0);
669  }
670 
671  if (!mr->spawn_disconnect_but_free) {
672  cc = MPI_Comm_free(&(s->icomm));
673  } else {
674  cc = MPI_Comm_disconnect(&(s->icomm));
675  }
676  assert(cc == MPI_SUCCESS);
677 
678  if (tracing5) {
679  ptrdiff_t done = (s - spw->spawned);
680  fprintf(stderr, (";;KMR [%05d] %s [%ld]:"
681  " MPI_Comm_free done\n"),
682  mr->rank, spw->fn, done);
683  fflush(0);
684  }
685  }
686  return MPI_SUCCESS;
687 }
688 
689 /* Makes a listening socket for a watch-program. It fills
690  WATCH_LISTENER (fd) and WATCH_HOST fields in the array of SPAWNS,
691  if successful. */
692 
693 static int
694 kmr_listen_to_watch(KMR *mr, struct kmr_spawning *spw, int index)
695 {
696  assert(sizeof(spw->watch_host) >= 46);
697  int cc;
698  union {
699  struct sockaddr sa;
700  struct sockaddr_in sa4;
701  struct sockaddr_in6 sa6;
702  struct sockaddr_storage ss;
703  } sa;
704  char hostname[MAXHOSTNAMELEN];
705  char address[INET6_ADDRSTRLEN];
706 
707  /* Not use AF_UNSPEC. */
708 
709  int af = mr->spawn_watch_af;
710  assert(af == 0 || af == 4 || af == 6);
711  char *family = (af == 4 ? "AF_INET" : "AF_INET6");
712 
713  assert(spw->watch_listener == -1);
714  const int *ports = mr->spawn_watch_port_range;
715  assert(ports[0] != 0 || ports[1] == 0);
716  for (int port = ports[0]; port <= ports[1]; port++) {
717  if (af == 4) {
718  sa.sa.sa_family = AF_INET;
719  } else if (af == 0 || af == 6) {
720  sa.sa.sa_family = AF_INET6;
721  } else {
722  assert(0);
723  }
724  int fd = socket(sa.sa.sa_family, SOCK_STREAM, 0);
725  if (fd < 0) {
726  char ee[80];
727  char *m = strerror(errno);
728  snprintf(ee, sizeof(ee), "%s: socket(%s) failed: %s",
729  spw->fn, family, m);
730  kmr_error(mr, ee);
731  }
732  int one = 1;
733  cc = setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
734  if (cc != 0) {
735  char ee[80];
736  char *m = strerror(errno);
737  snprintf(ee, sizeof(ee), "%s: setsockopt(SO_REUSEADDR): %s",
738  spw->fn, m);
739  kmr_warning(mr, 1, ee);
740  }
741 
742  socklen_t salen;
743  if (af == 4) {
744  memset(&sa, 0, sizeof(sa));
745  sa.sa4.sin_family = AF_INET;
746  sa.sa4.sin_addr.s_addr = htonl(INADDR_ANY);
747  sa.sa4.sin_port = htons((uint16_t)port);
748  salen = sizeof(sa.sa4);
749  } else if (af == 0 || af == 6) {
750  memset(&sa, 0, sizeof(sa));
751  sa.sa6.sin6_family = AF_INET6;
752  sa.sa6.sin6_addr = in6addr_any;
753  sa.sa6.sin6_port = htons((uint16_t)port);
754  salen = sizeof(sa.sa6);
755  } else {
756  assert(0);
757  }
758 
759  /* NOTE: Linux returns EINVAL for EADDRINUSE in bind. */
760 
761  cc = bind(fd, &sa.sa, salen);
762  if (cc != 0) {
763  if (errno == EADDRINUSE || errno == EINVAL) {
764  cc = close(fd);
765  assert(cc == 0);
766  continue;
767  } else {
768  char ee[80];
769  char *m = strerror(errno);
770  snprintf(ee, sizeof(ee), "%s: bind(%s, port=%d) failed: %s",
771  spw->fn, family, port, m);
772  kmr_error(mr, ee);
773  }
774  }
775 
776  /* NOTE: Linux may return EADDRINUSE in listen, too. */
777 
778  int backlog = spw->spawn_limit;
779  cc = listen(fd, backlog);
780  if (cc != 0) {
781  if (errno == EADDRINUSE || errno == EINVAL) {
782  cc = close(fd);
783  assert(cc == 0);
784  continue;
785  } else {
786  char ee[80];
787  char *m = strerror(errno);
788  snprintf(ee, sizeof(ee), "%s: listen(%s, port=%d) failed: %s",
789  spw->fn, family, port, m);
790  kmr_error(mr, ee);
791  }
792  }
793  assert(fd != -1);
794  spw->watch_listener = fd;
795  break;
796  }
797 
798  int fd = spw->watch_listener;
799  if (fd == -1) {
800  char ee[80];
801  snprintf(ee, sizeof(ee), "%s: no ports to listen to watch-programs",
802  spw->fn);
803  kmr_error(mr, ee);
804  }
805 
806  /* Get address and port number from the socket. */
807 
808  memset(&sa, 0, sizeof(sa));
809  socklen_t salen = sizeof(sa);
810  cc = getsockname(fd, &sa.sa, &salen);
811  if (cc != 0) {
812  char ee[80];
813  char *m = strerror(errno);
814  snprintf(ee, sizeof(ee), "%s: getsockname() failed: %s",
815  spw->fn, m);
816  kmr_error(mr, ee);
817  }
818 
819  int port = 0;
820  if (sa.sa.sa_family == AF_INET) {
821  port = ntohs(sa.sa4.sin_port);
822  } else if (sa.sa.sa_family == AF_INET6) {
823  port = ntohs(sa.sa6.sin6_port);
824  } else {
825  char ee[80];
826  snprintf(ee, sizeof(ee), "%s: getsockname(): unknown ip family=%d",
827  spw->fn, sa.sa.sa_family);
828  kmr_error(mr, ee);
829  }
830  assert(port != 0);
831 
832  if (mr->spawn_watch_host_name != 0) {
833  cc = snprintf(hostname, sizeof(hostname),
834  "%s", mr->spawn_watch_host_name);
835  assert(cc < (int)sizeof(hostname));
836  } else {
837  cc = gethostname(hostname, sizeof(hostname));
838  if (cc != 0) {
839  char ee[80];
840  char *m = strerror(errno);
841  snprintf(ee, sizeof(ee), "%s: gethostname() failed: %s",
842  spw->fn, m);
843  kmr_error(mr, ee);
844  }
845  }
846 
847  struct addrinfo hints;
848  memset(&hints, 0, sizeof(hints));
849  hints.ai_flags = AI_ADDRCONFIG;
850  hints.ai_socktype = SOCK_STREAM;
851  hints.ai_protocol = IPPROTO_TCP;
852  if (af == 4) {
853  hints.ai_family = AF_INET;
854  } else if (af == 6) {
855  hints.ai_family = AF_INET6;
856  } else if (af == 0) {
857  hints.ai_family = (AF_INET6 | AI_V4MAPPED);
858  } else {
859  assert(0);
860  }
861  struct addrinfo *addrs = 0;
862  cc = getaddrinfo(hostname, 0, &hints, &addrs);
863  if (cc != 0) {
864  char ee[80];
865  const char *m = gai_strerror(cc);
866  snprintf(ee, sizeof(ee), "%s: getaddrinfo(%s) failed: %s",
867  spw->fn, hostname, m);
868  kmr_error(mr, ee);
869  }
870  struct addrinfo *p;
871  for (p = addrs; p != 0; p = p->ai_next) {
872  if (!(p->ai_family == AF_INET || p->ai_family == AF_INET6)) {
873  continue;
874  }
875  if (af == 4 && p->ai_family != AF_INET) {
876  continue;
877  }
878  if (af == 6 && p->ai_family != AF_INET6) {
879  continue;
880  }
881  break;
882  }
883  if (p == 0) {
884  char ee[80];
885  snprintf(ee, sizeof(ee), "%s: getaddrinfo(%s): no address for host",
886  spw->fn, hostname);
887  kmr_error(mr, ee);
888  }
889 
890  if (p->ai_family == AF_INET) {
891  void *inaddr = &(((struct sockaddr_in *)p->ai_addr)->sin_addr);
892  inet_ntop(p->ai_family, inaddr, address, sizeof(address));
893  } else if (p->ai_family == AF_INET6) {
894  void *inaddr = &(((struct sockaddr_in6 *)p->ai_addr)->sin6_addr);
895  inet_ntop(p->ai_family, inaddr, address, sizeof(address));
896  } else {
897  char ee[80];
898  snprintf(ee, sizeof(ee), "%s: getaddrinfo(%s): unknown ip family=%d",
899  spw->fn, hostname, p->ai_family);
900  kmr_error(mr, ee);
901  }
902  freeaddrinfo(addrs);
903 
904  assert(0 <= index && index < spw->n_spawns);
905  struct kmr_spawn_state *s = &(spw->spawned[index]);
906  s->watch_port = port;
907 
908  cc = snprintf(spw->watch_host, sizeof(spw->watch_host),
909  "%s", address);
910  assert(cc < (int)sizeof(spw->watch_host));
911 
912  return MPI_SUCCESS;
913 }
914 
915 /* Waits for connections from the watch-programs of all the spawned
916  processes. It works on spawning one by one. */
917 
static int
kmr_accept_on_watch(KMR *mr, struct kmr_spawning *spw, int index)
{
    _Bool tracing5 = (mr->trace_map_spawn && (5 <= mr->verbosity));
    assert(0 <= index && index < spw->n_spawns);
    struct kmr_spawn_state *s = &(spw->spawned[index]);
    assert(s->n_procs > 0);
    union {
	struct sockaddr sa;
	struct sockaddr_in sa4;
	struct sockaddr_in6 sa6;
	struct sockaddr_storage ss;
    } sa;
    int cc;

    assert(spw->watch_listener != -1);
    int fd0 = spw->watch_listener;
    /* Accept one connection per spawned process. */
    for (int count = 0; count < s->n_procs; count++) {
	/* Poll (with a timeout) until a watch-program connects;
	   EAGAIN/EINTR cause a retry, a timeout is fatal. */
	for (;;) {
	    nfds_t nfds = 1;
	    struct pollfd fds0, *fds = &fds0;
	    memset(fds, 0, (sizeof(struct pollfd) * nfds));
	    fds[0].fd = fd0;
	    fds[0].events = (POLLIN|POLLPRI);
	    fds[0].revents = 0;

	    assert(mr->spawn_watch_accept_onhold_msec >= (60 * 1000));
	    int msec = mr->spawn_watch_accept_onhold_msec;
	    int nn = poll(fds, nfds, msec);
	    if (nn == 0) {
		char ee[80];
		snprintf(ee, sizeof(ee),
			 "%s: accepting watch-programs timed out"
			 " (msec=%d)", spw->fn, msec);
		kmr_error(mr, ee);
	    } else if (nn < 0 && (errno == EAGAIN || errno == EINTR)) {
		char ee[80];
		char *m = strerror(errno);
		snprintf(ee, sizeof(ee),
			 "%s: poll (for watch-programs) returned: %s",
			 spw->fn, m);
		kmr_warning(mr, 1, ee);
		continue;
	    } else if (nn < 0){
		char ee[80];
		char *m = strerror(errno);
		snprintf(ee, sizeof(ee),
			 "%s: poll (for watch-programs) failed: %s",
			 spw->fn, m);
		kmr_error(mr, ee);
	    }
	    break;
	}

	memset(&sa, 0, sizeof(sa));
	socklen_t salen = sizeof(sa);
	int fd = accept(fd0, &sa.sa, &salen);
	if (fd == -1) {
	    char ee[80];
	    char *m = strerror(errno);
	    snprintf(ee, sizeof(ee),
		     "%s: accept (for watch-programs) failed: %s",
		     spw->fn, m);
	    kmr_error(mr, ee);
	}

	/*setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &one, sizeof(one));*/

	if (tracing5) {
	    /* Format the peer address only for the trace message. */
	    char address[INET6_ADDRSTRLEN];
	    //int port = 0;
	    if (sa.sa.sa_family == AF_INET) {
		void *inaddr = &sa.sa4.sin_addr;
		inet_ntop(sa.sa.sa_family, inaddr, address, sizeof(address));
		//port = ntohs(sa.sa4.sin_port);
	    } else if (sa.sa.sa_family == AF_INET6) {
		void *inaddr = &sa.sa6.sin6_addr;
		inet_ntop(sa.sa.sa_family, inaddr, address, sizeof(address));
		//port = ntohs(sa.sa6.sin6_port);
	    } else {
		char ee[80];
		snprintf(ee, sizeof(ee), "%s: accept(): unknown ip family=%d",
			 spw->fn, sa.sa.sa_family);
		kmr_error(mr, ee);
	    }

	    fprintf(stderr, (";;KMR [%05d] %s [%d]:"
			     " accepting a connection of watch-programs"
			     " on port=%d from %s (%d/%d)\n"),
		    mr->rank, spw->fn, index,
		    s->watch_port, address, (count + 1), s->n_procs);
	    fflush(0);
	}

	/* Keep the socket only for the first process, unless
	   SPAWN_WATCH_ALL; the kept-or-not decision (1 or 0) is sent
	   to the watch-program. */
	int val;
	if (count == 0 || mr->spawn_watch_all) {
	    assert((s->index + count) <= spw->n_processes);
	    spw->watches[s->index + count] = fd;
	    /* send 1 when connection is accepted */
	    val = 1;
	} else {
	    val = 0;
	}
	ssize_t wsize = write(fd, &val, sizeof(int));
	if (wsize < 0) {
	    char ee[80];
	    char *m = strerror(errno);
	    snprintf(ee, sizeof(ee),
		     "%s: write (for watch-programs) failed: %s",
		     spw->fn, m);
	    kmr_error(mr, ee);
	}
	assert(wsize == sizeof(int));

	/* Expect the same value echoed back as a handshake. */
	int rval;
	ssize_t rsize = read(fd, &rval, sizeof(int));
	if (rsize < 0) {
	    char ee[80];
	    char *m = strerror(errno);
	    snprintf(ee, sizeof(ee),
		     "%s: read (for watch-programs) failed: %s",
		     spw->fn, m);
	    kmr_error(mr, ee);
	}
	assert(rsize == sizeof(int));
	assert(val == rval);

	if (!(count == 0 || mr->spawn_watch_all)) {
	    cc = close(fd);
	    assert(cc == 0);
	}
    }

    /* All connected; the listener is no longer needed. */

    cc = close(spw->watch_listener);
    assert(cc == 0);
    spw->watch_listener = -1;

    return MPI_SUCCESS;
}
1057 
1058 static int
1059 kmr_receive_for_reply(KMR *mr, struct kmr_spawning *spw,
1060  int w, _Bool replyeach, _Bool replyroot)
1061 {
1062  assert(0 <= w && w < spw->n_spawns);
1063  int cc;
1064  enum kmr_spawn_mode mode = spw->mode;
1065  struct kmr_spawn_state *s = &(spw->spawned[w]);
1066  MPI_Request *reqs = spw->replies;
1067  if (mode == KMR_SPAWN_INTERACT) {
1068  if (replyeach) {
1069  assert(s->index + s->count <= spw->n_processes);
1070  for (int rank = 0; rank < s->count; rank++) {
1071  assert(reqs[s->index + rank] == MPI_REQUEST_NULL);
1072  cc = MPI_Irecv(0, 0, MPI_BYTE,
1073  rank, KMR_TAG_SPAWN_REPLY,
1074  s->icomm, &reqs[s->index + rank]);
1075  assert(cc == MPI_SUCCESS);
1076  assert(reqs[s->index + rank] != MPI_REQUEST_NULL);
1077  }
1078  } else if (replyroot) {
1079  assert(w <= spw->n_processes);
1080  int rank = 0;
1081  assert(reqs[w] == MPI_REQUEST_NULL);
1082  cc = MPI_Irecv(0, 0, MPI_BYTE,
1083  rank, KMR_TAG_SPAWN_REPLY,
1084  s->icomm, &reqs[w]);
1085  assert(cc == MPI_SUCCESS);
1086  assert(reqs[w] != MPI_REQUEST_NULL);
1087  } else {
1088  /*nothing*/
1089  }
1090  } else if (mode == KMR_SPAWN_SERIAL) {
1091  assert(replyeach);
1092  {
1093  assert(s->index + s->count <= spw->n_processes);
1094  for (int rank = 0; rank < s->count; rank++) {
1095  assert(reqs[s->index + rank] == MPI_REQUEST_NULL);
1096  cc = MPI_Irecv(0, 0, MPI_BYTE,
1097  rank, KMR_TAG_SPAWN_REPLY,
1098  s->icomm, &reqs[s->index + rank]);
1099  assert(cc == MPI_SUCCESS);
1100  assert(reqs[s->index + rank] != MPI_REQUEST_NULL);
1101  }
1102  }
1103  } else {
1104  assert(mode == KMR_SPAWN_INTERACT || mode == KMR_SPAWN_SERIAL);
1105  }
1106  return MPI_SUCCESS;
1107 }
1108 
1109 /* Waits for a single reply and checks a finished spawn from the
1110  request index. It returns an index of the n-th spawning or -1 if
1111  nothing has finished. Or, when no replies are expected, it
1112  immediately returns a minimum index of the spawns still running.
1113  The return value is an index of the array SPAWNED. */
1114 
1115 static int
1116 kmr_wait_for_reply(KMR *mr, struct kmr_spawning *spw,
1117  struct kmr_spawn_option opt)
1118 {
1119  _Bool tracing5 = (mr->trace_map_spawn && (5 <= mr->verbosity));
1120  int cc;
1121  MPI_Request *reqs = spw->replies;
1122  if (opt.reply_each) {
1123  MPI_Status st;
1124  int index;
1125  int nwait = spw->n_processes;
1126  cc = MPI_Waitany(nwait, reqs, &index, &st);
1127  assert(cc == MPI_SUCCESS && index != MPI_UNDEFINED);
1128  assert(index < spw->n_processes);
1129  assert(reqs[index] == MPI_REQUEST_NULL);
1130  /* Find spawned state from the request index. */
1131  int done = -1;
1132  for (int w = 0; w < spw->n_spawns; w++) {
1133  struct kmr_spawn_state *s = &spw->spawned[w];
1134  if (index < (s->index + s->count)) {
1135  assert(s->index <= index);
1136  done = w;
1137  break;
1138  }
1139  }
1140  assert(done != -1);
1141  struct kmr_spawn_state *s = &spw->spawned[done];
1142  assert(s->running);
1143  int count = (opt.reply_each ? s->count : 1);
1144  int nreplies = 0;
1145  assert((s->index + count) <= spw->n_processes);
1146  for (int j = 0; j < count; j++) {
1147  if (reqs[s->index + j] == MPI_REQUEST_NULL) {
1148  nreplies++;
1149  }
1150  }
1151 
1152  if (tracing5) {
1153  fprintf(stderr, (";;KMR [%05d] %s [%d]: got a reply (%d/%d)\n"),
1154  mr->rank, spw->fn, done, nreplies, count);
1155  fflush(0);
1156  }
1157 
1158  _Bool fin = (nreplies == count);
1159  return (fin ? done : -1);
1160  } else if (opt.reply_root) {
1161  MPI_Status st;
1162  int index;
1163  int nwait = spw->n_spawns;
1164  cc = MPI_Waitany(nwait, reqs, &index, &st);
1165  assert(cc == MPI_SUCCESS && index != MPI_UNDEFINED);
1166  assert(index <= spw->n_spawns);
1167  assert(reqs[index] == MPI_REQUEST_NULL);
1168  int done = index;
1169  assert(0 <= done && done < spw->n_spawns);
1170  struct kmr_spawn_state *s = &spw->spawned[done];
1171  assert(s->running);
1172  assert(reqs[done] == MPI_REQUEST_NULL);
1173 
1174  if (tracing5) {
1175  fprintf(stderr, (";;KMR [%05d] %s [%d]: got a root reply\n"),
1176  mr->rank, spw->fn, done);
1177  fflush(0);
1178  }
1179 
1180  return done;
1181  } else {
1182  int done = -1;
1183  for (int w = 0; w < spw->n_spawns; w++) {
1184  struct kmr_spawn_state *s = &spw->spawned[w];
1185  if (s->running) {
1186  done = w;
1187  break;
1188  }
1189  }
1190 
1191  if (tracing5) {
1192  fprintf(stderr, (";;KMR [%05d] %s [%d]: (no checks of replies)\n"),
1193  mr->rank, spw->fn, done);
1194  fflush(0);
1195  }
1196 
1197  assert(done != -1);
1198  return done;
1199  }
1200 }
1201 
1202 /* Waits for the end of some spawned process. It detects the end by
1203  closure of a socket of the watch-program. It returns an index of
1204  the n-th spawning, or -1 if nothing has finished. It waits for one
1205  to finish, but it may possibly return with nothing with -1. (It
1206  avoids using MPI_STATUS_IGNORE in MPI_Testany() for a bug in some
1207  versions of Open MPI (around 1.6.3)) */
1208 
1209 static int
1210 kmr_wait_for_watch(KMR *mr, struct kmr_spawning *spw,
1211  struct kmr_spawn_option _)
1212 {
1213  _Bool tracing5 = (mr->trace_map_spawn && (5 <= mr->verbosity));
1214  int cc;
1215  char garbage[4096];
1216 
1217  int nruns = 0;
1218  for (int w = 0; w < spw->n_spawns; w++) {
1219  struct kmr_spawn_state *s = &(spw->spawned[w]);
1220  if (s->running) {
1221  nruns += s->count;
1222  }
1223  }
1224  assert(nruns != 0 && spw->n_runnings == nruns);
1225 
1226  nfds_t nfds = 0;
1227  for (int i = 0; i < spw->n_processes; i++) {
1228  if (spw->watches[i] != -1) {
1229  nfds++;
1230  }
1231  }
1232  assert(nfds != 0);
1233 
1234  struct pollfd *fds = kmr_malloc(sizeof(struct pollfd) * (size_t)nfds);
1235 
1236  int done = -1;
1237  for (;;) {
1238  memset(fds, 0, (sizeof(struct pollfd) * nfds));
1239  nfds_t fdix = 0;
1240  for (int i = 0; i < spw->n_processes; i++) {
1241  if (spw->watches[i] != -1) {
1242  assert(fdix < nfds);
1243  fds[fdix].fd = spw->watches[i];
1244  fds[fdix].events = (POLLIN|POLLPRI);
1245  fds[fdix].revents = 0;
1246  fdix++;
1247  }
1248  }
1249  assert(fdix == nfds);
1250 
1251  if (tracing5) {
1252  fprintf(stderr, (";;KMR [%05d] %s:"
1253  " waiting for some watch-programs finish\n"),
1254  mr->rank, spw->fn);
1255  fflush(0);
1256  }
1257 
1258  for (;;) {
1259  int msec = 1;
1260  int nn = poll(fds, nfds, msec);
1261  if (nn == 0) {
1262  int index;
1263  int ok;
1264  MPI_Status st;
1265  MPI_Testany(0, 0, &index, &ok, &st);
1266  /*kmr_warning(mr, 1,
1267  "poll (for watch-programs)"
1268  " timed out badly; continuing");*/
1269  continue;
1270  } else if (nn < 0 && (errno == EAGAIN || errno == EINTR)) {
1271  char ee[80];
1272  char *m = strerror(errno);
1273  snprintf(ee, sizeof(ee),
1274  ("poll (for watch-programs) interrupted;"
1275  " continuing: %s"), m);
1276  kmr_warning(mr, 1, ee);
1277  continue;
1278  } else if (nn < 0){
1279  char ee[80];
1280  char *m = strerror(errno);
1281  snprintf(ee, sizeof(ee),
1282  "%s: poll (for watch-programs) failed: %s",
1283  spw->fn, m);
1284  kmr_error(mr, ee);
1285  } else {
1286  break;
1287  }
1288  }
1289 
1290  int fd = -1;
1291  for (nfds_t k = 0; k < nfds; k++) {
1292  if (fds[k].fd != -1 && fds[k].revents != 0) {
1293  fd = fds[k].fd;
1294  break;
1295  }
1296  }
1297  if (fd == -1) {
1298  char ee[80];
1299  snprintf(ee, sizeof(ee), "poll (for watch-programs) no FD found");
1300  kmr_warning(mr, 1, ee);
1301  continue;
1302  }
1303 
1304  int index = -1;
1305  for (int w = 0; w < spw->n_spawns; w++) {
1306  struct kmr_spawn_state *s = &(spw->spawned[w]);
1307  assert((s->index + s->count) <= spw->n_processes);
1308  for (int j = 0; j < s->count; j++) {
1309  if (fd == spw->watches[s->index + j]) {
1310  index = (s->index + j);
1311  done = w;
1312  break;
1313  }
1314  }
1315  }
1316  assert(fd != -1 && index != -1 && done != -1);
1317  assert(0 <= index && index < spw->n_processes);
1318  assert(0 <= done && done < spw->n_spawns);
1319  //struct kmr_spawn_state *s = &(spw->spawned[done]);
1320 
1321  ssize_t rr = read(fd, garbage, sizeof(garbage));
1322  if (rr == 0) {
1323  /* Got EOF. */
1324  assert(fd == spw->watches[index]);
1325  cc = close(fd);
1326  assert(cc == 0);
1327  spw->watches[index] = -1;
1328  break;
1329  } else if (rr > 0) {
1330  /* Read out garbage data. */
1331  continue;
1332  } else if (rr == -1 && (errno == EAGAIN || errno == EINTR)) {
1333  char ee[80];
1334  char *m = strerror(errno);
1335  snprintf(ee, sizeof(ee),
1336  "read (for watch-programs) returned: %s", m);
1337  kmr_warning(mr, 1, ee);
1338  continue;
1339  } else if (rr == -1) {
1340  char ee[80];
1341  char *m = strerror(errno);
1342  snprintf(ee, sizeof(ee),
1343  "%s: read (for watch-programs) failed: %s",
1344  spw->fn, m);
1345  kmr_error(mr, ee);
1346  }
1347  }
1348  assert(done != -1);
1349 
1350  assert(0 <= done && done < spw->n_spawns);
1351  struct kmr_spawn_state *s = &(spw->spawned[done]);
1352  int count = ((mr->spawn_watch_all) ? s->count : 1);
1353  int nreplies = 0;
1354  assert((s->index + count) <= spw->n_processes);
1355  for (int j = 0; j < count; j++) {
1356  if (spw->watches[s->index + j] == -1) {
1357  nreplies++;
1358  }
1359  }
1360 
1361  if (tracing5) {
1362  fprintf(stderr, (";;KMR [%05d] %s [%d]:"
1363  " detected a watch done (%d/%d)\n"),
1364  mr->rank, spw->fn, done, nreplies, count);
1365  fflush(0);
1366  }
1367 
1368  _Bool fin = (nreplies == count);
1369  if (fin) {
1370  if (s->icomm != MPI_COMM_NULL) {
1371  cc = kmr_free_comm_with_tracing(mr, spw, s);
1372  assert(cc == MPI_SUCCESS);
1373  }
1374  }
1375 
1376  kmr_free(fds, (sizeof(struct pollfd) * (size_t)nfds));
1377  return (fin ? done : -1);
1378 }
1379 
1380 static int
1381 kmr_wait_then_map(KMR *mr, struct kmr_spawning *spw,
1382  KMR_KVS *kvi, KMR_KVS *kvo, void *arg,
1383  struct kmr_spawn_option opt, kmr_mapfn_t m)
1384 {
1385  int cc;
1386  enum kmr_spawn_mode mode = spw->mode;
1387  int done;
1388  if (mode == KMR_SPAWN_INTERACT) {
1389  done = kmr_wait_for_reply(mr, spw, opt);
1390  } else if (mode == KMR_SPAWN_SERIAL) {
1391  done = kmr_wait_for_reply(mr, spw, opt);
1392  } else if (mode == KMR_SPAWN_PARALLEL) {
1393  done = kmr_wait_for_watch(mr, spw, opt);
1394  } else {
1395  assert(0);
1396  done = -1;
1397  }
1398  if (done != -1) {
1399  assert(0 <= done && done < spw->n_spawns);
1400  struct kmr_spawn_state *s = &(spw->spawned[done]);
1401  s->timestamp[3] = MPI_Wtime();
1402  if (m != 0) {
1403  if (mr->spawn_pass_intercomm_in_argument
1404  && mode == KMR_SPAWN_INTERACT) {
1405  assert(mr->spawn_comms != 0);
1406  assert(mr->spawn_comms[done] == &(s->icomm));
1407  struct kmr_spawn_info si;
1408  kmr_spawn_info_put(&si, s, opt, arg);
1409  cc = (*m)(spw->ev[done], kvi, kvo, &si, done);
1410  if (cc != MPI_SUCCESS) {
1411  char ee[80];
1412  snprintf(ee, sizeof(ee),
1413  "Map-fn returned with error cc=%d", cc);
1414  kmr_error(mr, ee);
1415  }
1416  kmr_spawn_info_get(&si, s);
1417  } else {
1418  cc = (*m)(spw->ev[done], kvi, kvo, arg, done);
1419  if (cc != MPI_SUCCESS) {
1420  char ee[80];
1421  snprintf(ee, sizeof(ee),
1422  "Map-fn returned with error cc=%d", cc);
1423  kmr_error(mr, ee);
1424  }
1425  }
1426  }
1427  s->timestamp[4] = MPI_Wtime();
1428  if (s->icomm != MPI_COMM_NULL) {
1429  cc = kmr_free_comm_with_tracing(mr, spw, s);
1430  assert(cc == MPI_SUCCESS);
1431  }
1432  s->timestamp[5] = MPI_Wtime();
1433  assert(s->running);
1434  s->running = 0;
1435  spw->n_runnings -= s->count;
1436  if (kmr_ckpt_enabled(mr)) {
1437  kmr_ckpt_save_kvo_each_add(mr, kvo, done);
1438  }
1439  }
1440  return MPI_SUCCESS;
1441 }
1442 
1443 static int
1444 kmr_map_spawned_processes(enum kmr_spawn_mode mode, char *name,
1445  KMR_KVS *kvi, KMR_KVS *kvo, void *arg,
1446  MPI_Info info,
1447  struct kmr_spawn_option opt, kmr_mapfn_t mapfn)
1448 {
1449  kmr_assert_kvs_ok(kvi, kvo, 1, 0);
1450  assert(kvi->c.value_data == KMR_KV_OPAQUE
1451  || kvi->c.value_data == KMR_KV_CSTRING);
1452  assert(kvi->c.element_count <= INT_MAX);
1453  _Bool use_reply = (mode == KMR_SPAWN_INTERACT || mode == KMR_SPAWN_SERIAL);
1454  _Bool use_watch = (mode != KMR_SPAWN_INTERACT);
1455  KMR * const mr = kvi->c.mr;
1456  _Bool tracing5 = (mr->trace_map_spawn && (5 <= mr->verbosity));
1457  struct kmr_spawning spawning0;
1458  struct kmr_spawning *spw = &spawning0;
1459  char magic[20];
1460  char hostport[MAXHOSTNAMELEN + 10];
1461  int from = 0;
1462  int cc;
1463 
1464  if (use_watch) {
1465  int kcdc = kmr_ckpt_disable_ckpt(mr);
1466  cc = kmr_install_watch_program(mr, name);
1467  assert(cc == MPI_SUCCESS);
1468  kmr_ckpt_enable_ckpt(mr, kcdc);
1469  }
1470 
1471  int cnt = (int)kvi->c.element_count;
1472  memset(spw, 0, sizeof(struct kmr_spawning));
1473  spw->fn = name;
1474  spw->mode = mode;
1475  spw->n_spawns = cnt;
1476  spw->n_starteds = 0;
1477  spw->n_runnings = 0;
1478  spw->spawned = kmr_malloc(sizeof(struct kmr_spawn_state) * (size_t)spw->n_spawns);
1479  spw->ev = kmr_malloc(sizeof(struct kmr_kv_box) * (size_t)spw->n_spawns);
1480  spw->watch_listener = -1;
1481  spw->watch_host[0] = 0;
1482 
1483  int kcdc = kmr_ckpt_disable_ckpt(mr);
1484  cc = kmr_list_spawns(spw, kvi, info, opt);
1485  assert(cc == MPI_SUCCESS);
1486  kmr_ckpt_enable_ckpt(mr, kcdc);
1487 
1488  if (kmr_ckpt_enabled(mr)) {
1489  struct kmr_option kopt = kmr_noopt;
1490  if (opt.take_ckpt) {
1491  kopt.take_ckpt = 1;
1492  }
1493  if (kmr_ckpt_progress_init(kvi, kvo, kopt)) {
1494  if (kvo != 0) {
1495  /* No "keep_open" option (!opt.keep_open). */
1496  kmr_add_kv_done(kvo);
1497  }
1498  if (1) {
1499  /* No "inspect" option (!opt.inspect). */
1500  kmr_free_kvs(kvi);
1501  }
1502  return MPI_SUCCESS;
1503  }
1504  from = (int)kmr_ckpt_first_unprocessed_kv(mr);
1505  kmr_ckpt_save_kvo_each_init(mr, kvo);
1506  }
1507 
1508  if (use_reply) {
1509  assert(spw->replies == 0);
1510  spw->replies = kmr_malloc(sizeof(MPI_Request) * (size_t)spw->n_processes);
1511  for (int i = 0; i < spw->n_processes; i++) {
1512  spw->replies[i] = MPI_REQUEST_NULL;
1513  }
1514  }
1515  if (mode == KMR_SPAWN_PARALLEL) {
1516  assert(spw->watches == 0);
1517  spw->watches = kmr_malloc(sizeof(int) * (size_t)spw->n_processes);
1518  for (int i = 0; i < spw->n_processes; i++) {
1519  spw->watches[i] = -1;
1520  }
1521  }
1522  assert(mr->spawn_comms == 0);
1523  mr->spawn_size = spw->n_spawns;
1524  mr->spawn_comms = kmr_malloc(sizeof(MPI_Comm *) * (size_t)spw->n_spawns);
1525  for (int w = 0; w < spw->n_spawns; w++) {
1526  mr->spawn_comms[w] = &(spw->spawned[w].icomm);
1527  }
1528 
1529  int gap;
1530  if (mr->spawn_gap_msec[0] == 0) {
1531  gap = 0;
1532  } else {
1533  int usz = 0;
1534  unsigned int v = (unsigned int)spw->usize;
1535  while (v > 0) {
1536  v = (v >> 1);
1537  usz++;
1538  }
1539  gap = (int)((((long)mr->spawn_gap_msec[1] * usz) / 10)
1540  + mr->spawn_gap_msec[0]);
1541  }
1542 
1543  /* Spawn by each entry. */
1544 
1545  for (int w = from; w < spw->n_spawns; w++) {
1546  struct kmr_spawn_state *s = &(spw->spawned[w]);
1547 
1548  /* Wait while no more processes are available. */
1549 
1550  if ((spw->n_runnings + s->n_procs) > spw->spawn_limit) {
1551  while ((spw->n_runnings + s->n_procs) > spw->spawn_limit) {
1552  cc = kmr_wait_then_map(mr, spw,
1553  kvi, kvo, arg, opt, mapfn);
1554  assert(cc == MPI_SUCCESS);
1555  }
1556  if (gap != 0) {
1557  if (tracing5) {
1558  fprintf(stderr,
1559  ";;KMR [%05d] %s: sleeping for spawn gap"
1560  " (%d msec)\n",
1561  mr->rank, spw->fn, gap);
1562  fflush(0);
1563  }
1564  kmr_msleep(gap, 1);
1565  }
1566  }
1567 
1568  if (mode == KMR_SPAWN_PARALLEL) {
1569  cc = kmr_listen_to_watch(mr, spw, w);
1570  assert(cc == MPI_SUCCESS);
1571  } else {
1572  cc = snprintf(spw->watch_host, sizeof(spw->watch_host),
1573  "0");
1574  assert(cc < (int)sizeof(spw->watch_host));
1575  }
1576 
1577  {
1578  char **argv;
1579  int argc;
1580  if (use_watch) {
1581  argc = (s->argc + 5);
1582  argv = kmr_malloc(sizeof(char *) * (size_t)(argc + 1));
1583 
1584  cc = snprintf(hostport, sizeof(hostport),
1585  "%s/%d", spw->watch_host, s->watch_port);
1586  assert(cc < (int)sizeof(hostport));
1587 
1588  unsigned int vv = (unsigned int)random();
1589  cc = snprintf(magic, sizeof(magic), "%08xN%dV0%s",
1590  vv, w, ((mr->trace_map_spawn) ? "T1" : ""));
1591  assert(cc < (int)sizeof(magic));
1592 
1593  assert(mr->spawn_watch_program != 0);
1594  argv[0] = mr->spawn_watch_program;
1595  argv[1] = ((mode == KMR_SPAWN_SERIAL) ? "seq" : "mpi");
1596  argv[2] = hostport;
1597  argv[3] = magic;
1598  argv[4] = "--";
1599  for (int i = 0; i < s->argc; i++) {
1600  argv[5 + i] = s->argv[i];
1601  }
1602  argv[(s->argc + 5)] = 0;
1603  } else {
1604  argc = s->argc;
1605  argv = s->argv;
1606  }
1607  assert(argv[argc] == 0);
1608 
1609  if (tracing5) {
1610  char ee[160];
1611  kmr_make_pretty_argument_string(ee, sizeof(ee), argc, argv);
1612  fprintf(stderr, (";;KMR [%05d] %s [%d]: MPI_Comm_spawn"
1613  " (maxprocs=%d;%s)\n"),
1614  mr->rank, spw->fn, w, s->n_procs, ee);
1615  fflush(0);
1616  }
1617 
1618  s->timestamp[0] = MPI_Wtime();
1619 
1620  int nspawns;
1621  assert(s->icomm == MPI_COMM_NULL);
1622  int *ec = kmr_malloc(sizeof(int) * (size_t)s->n_procs);
1623  const int root = 0;
1624  MPI_Comm spawncomm = MPI_COMM_SELF;
1625  cc = MPI_Comm_spawn(argv[0], &(argv[1]), s->n_procs, info,
1626  root, spawncomm, &s->icomm, ec);
1627  assert(cc == MPI_SUCCESS || cc == MPI_ERR_SPAWN);
1628  if (cc == MPI_ERR_SPAWN) {
1629  /* SOFT-case. */
1630  nspawns = 0;
1631  for (int r = 0; r < s->n_procs; r++) {
1632  if (ec[r] == MPI_SUCCESS) {
1633  nspawns++;
1634  }
1635  }
1636  } else {
1637  nspawns = s->n_procs;
1638  }
1639  assert(nspawns > 0);
1640 
1641  s->timestamp[1] = MPI_Wtime();
1642 
1643  kmr_free(ec, (sizeof(int) * (size_t)s->n_procs));
1644  if (argv != s->argv) {
1645  kmr_free(argv, (sizeof(char *) * (size_t)(argc + 1)));
1646  }
1647  argv = 0;
1648 
1649  s->running = 1;
1650  s->index = spw->n_starteds;
1651  s->count = nspawns;
1652  spw->n_starteds += nspawns;
1653  spw->n_runnings += nspawns;
1654  }
1655 
1656  if (mode == KMR_SPAWN_PARALLEL) {
1657  cc = kmr_accept_on_watch(mr, spw, w);
1658  assert(cc == MPI_SUCCESS);
1659  }
1660 
1661  if (mr->spawn_disconnect_early && mode == KMR_SPAWN_PARALLEL) {
1662  if (s->icomm != MPI_COMM_NULL) {
1663  cc = kmr_free_comm_with_tracing(mr, spw, s);
1664  assert(cc == MPI_SUCCESS);
1665  }
1666  }
1667 
1668  if (mr->spawn_sync_at_startup && s->icomm != MPI_COMM_NULL) {
1669  int flag;
1670  cc = MPI_Comm_test_inter(s->icomm, &flag);
1671  assert(cc == MPI_SUCCESS && flag != 0);
1672  int peernprocs;
1673  cc = MPI_Comm_remote_size(s->icomm, &peernprocs);
1674  assert(cc == MPI_SUCCESS && peernprocs == s->count);
1675  }
1676 
1677  s->timestamp[2] = MPI_Wtime();
1678 
1679  if (use_reply) {
1680  cc = kmr_receive_for_reply(mr, spw, w,
1681  opt.reply_each, opt.reply_root);
1682  assert(cc == MPI_SUCCESS);
1683  }
1684  }
1685 
1686  while (spw->n_runnings > 0) {
1687  cc = kmr_wait_then_map(mr, spw,
1688  kvi, kvo, arg, opt, mapfn);
1689  assert(cc == MPI_SUCCESS);
1690  }
1691 
1692  if (tracing5) {
1693  for (int w = 0; w < spw->n_spawns; w++) {
1694  struct kmr_spawn_state *s = &(spw->spawned[w]);
1695  fprintf(stderr, (";;KMR [%05d] %s [%d/%d]"
1696  " timing:"
1697  " spawn=%f setup=%f run=%f mapfn=%f clean=%f"
1698  " (msec)\n"),
1699  mr->rank, spw->fn, w, spw->n_spawns,
1700  ((s->timestamp[1] - s->timestamp[0]) * 1e3),
1701  ((s->timestamp[2] - s->timestamp[1]) * 1e3),
1702  ((s->timestamp[3] - s->timestamp[2]) * 1e3),
1703  ((s->timestamp[4] - s->timestamp[3]) * 1e3),
1704  ((s->timestamp[5] - s->timestamp[4]) * 1e3));
1705  fflush(0);
1706  }
1707  }
1708 
1709  assert(mr->spawn_comms != 0);
1710  mr->spawn_size = 0;
1711  kmr_free(mr->spawn_comms, (sizeof(MPI_Comm *) * (size_t)spw->n_spawns));
1712  mr->spawn_comms = 0;
1713 
1714  for (int w = 0; w < spw->n_spawns; w++) {
1715  struct kmr_spawn_state *s = &(spw->spawned[w]);
1716  assert(s->icomm == MPI_COMM_NULL);
1717  assert(s->abuf != 0);
1718  kmr_free(s->abuf, s->alen);
1719  s->abuf = 0;
1720  assert(s->argv0 != 0);
1721  kmr_free(s->argv0, (sizeof(char *) * (size_t)(s->argc0 + 1)));
1722  s->argv0 = 0;
1723  }
1724 
1725  assert(spw->ev != 0);
1726  kmr_free(spw->ev, (sizeof(struct kmr_kv_box) * (size_t)spw->n_spawns));
1727  spw->ev = 0;
1728  assert(spw->spawned != 0);
1729  kmr_free(spw->spawned, (sizeof(struct kmr_spawn_state) * (size_t)spw->n_spawns));
1730  spw->spawned = 0;
1731 
1732  if (use_reply) {
1733  assert(spw->replies != 0);
1734  for (int i = 0; i < spw->n_processes; i++) {
1735  assert(spw->replies[i] == MPI_REQUEST_NULL);
1736  }
1737  kmr_free(spw->replies, (sizeof(MPI_Request) * (size_t)spw->n_processes));
1738  spw->replies = 0;
1739  }
1740  if (mode == KMR_SPAWN_PARALLEL) {
1741  assert(spw->watches != 0);
1742  for (int i = 0; i < spw->n_processes; i++) {
1743  assert(spw->watches[i] == -1);
1744  }
1745  kmr_free(spw->watches, (sizeof(int) * (size_t)spw->n_processes));
1746  spw->watches = 0;
1747  }
1748 
1749  assert(spw->watch_listener == -1);
1750 
1751  if (kmr_ckpt_enabled(mr)) {
1752  kmr_ckpt_save_kvo_each_fin(mr, kvo);
1753  }
1754 
1755  if (kvo != 0) {
1756  /* No "keep_open" option (!opt.keep_open). */
1757  kmr_add_kv_done(kvo);
1758  }
1759  if (1) {
1760  /* No "inspect" option (!opt.inspect). */
1761  kmr_free_kvs(kvi);
1762  }
1763 
1764  if (kmr_ckpt_enabled(mr)) {
1766  }
1767 
1768  return MPI_SUCCESS;
1769 }
1770 
1771 /** Sends a reply message in the spawned process, which tells it is
1772  ready to finish and may have some data to send to the spawner in
1773  kmr_map_via_spawn(). */
1774 
1775 int
1777 {
1778  int cc;
1779  MPI_Comm ic = MPI_COMM_NULL;
1780  cc = MPI_Comm_get_parent(&ic);
1781  assert(cc == MPI_SUCCESS);
1782  if (ic == MPI_COMM_NULL) {
1783  kmr_error(mr, ("kmr_reply_to_spawner:"
1784  " may be called in a not-spawned process"));
1785  }
1786  int peer = 0;
1787  cc = MPI_Send(0, 0, MPI_BYTE, peer, KMR_TAG_SPAWN_REPLY, ic);
1788  assert(cc == MPI_SUCCESS);
1789  return MPI_SUCCESS;
1790 }
1791 
1792 /** Obtains (a reference to) a parent inter-communicator of a spawned
1793  process. It is used inside a map-function of kmr_map_via_spawn();
1794  Pass INDEX the same argument to a map-function. It returns a
1795  reference for the side-effect of freeing a communicator in a
1796  map-function. */
1797 
1798 MPI_Comm *
1800 {
1801  if (mr->spawn_comms == 0) {
1802  kmr_error(mr, ("kmr_get_spawner_communicator() be called"
1803  " outside of kmr_map_via_spawn()"));
1804  }
1805  if (index >= mr->spawn_size) {
1806  kmr_error(mr, ("kmr_get_spawner_communicator() be called"
1807  " with index out of range"));
1808  }
1809  MPI_Comm *comm = mr->spawn_comms[index];
1810  return comm;
1811 }
1812 
1813 int
1814 kmr_get_spawner_communicator_ff(KMR *mr, long ii, int *comm)
1815 {
1816  MPI_Comm *c = kmr_get_spawner_communicator(mr, ii);
1817  *comm = MPI_Comm_c2f(*c);
1818  return MPI_SUCCESS;
1819 }
1820 
1821 /** Maps on processes started by MPI_Comm_spawn(). It is intended to
1822  run custom MPI programs which will return a reply as MPI messages.
1823  Consider other variations to run independent processes, when the
1824  spawned processes will not interact with the parent:
1825  kmr_map_processes() or kmr_map_ms_commands().\n The spawner
1826  (parent) spawns processes specified by key-value pairs. The key
1827  part is ignored, and the value part is a list of null-separated
1828  strings which constitutes a command and arguments. The option
1829  SEPARATOR_SPACE changes the separator character to whitespaces.
1830  If the first string is "maxprocs=n", then the number of processes
1831  is taken from this string. Or, an MPI_Info entry "maxprocs" in
1832  INFO is used, and "maxprocs" is common to all spawns. It is an
 1833  error if neither is specified. The multiple spawners (more than
1834  one ranks can have entries to spawn) divide the universe of
1835  processes evenly among them, and tries to control the number of
1836  the simultaneously running processes in the range.\n The option
1837  REPLY_EACH or REPLY_ROOT lets the spawner wait for the reply
1838  messages from the spawned processes, and then the spawner calls
1839  the map-function. A reply message is of the tag
1840  KMR_TAG_SPAWN_REPLY=500 and length zero, and
1841  kmr_reply_to_spawner() can be used to send this reply. When none
1842  of REPLY_EACH or REPLY_ROOT are specified, the spawner immediately
1843  calls the map-function one-by-one in the FIFO order (before the
1844  spawned processes finish). In that case, no load-balance is
1845  taken. The map-function should wait for the spawned processes to
1846  finish, otherwise, the spawner starts next spawns continuously and
1847  runs out the processes, which causes the MPI runtime to signal an
1848  error.\n Communication between the spawned processes and the
1849  map-function of the spawner is through the inter-communicator.
1850  The parent inter-communicator of the spawned processes can be
1851  taken by MPI_Comm_get_parent() as usual. The inter-communicator
1852  at the spawner side can be obtained by calling
1853  kmr_get_spawner_communicator() inside a map-function.\n The INFO
1854  argument is passed to MPI_Comm_spawn() unchanged.\n NOTE: There is
1855  no way to check the availability of processes for spawning in the
1856  MPI specification and MPI implementations. And, the MPI runtime
1857  signals errors when it runs out the processes. Thus, it puts a
1858  sleep (1 sec) in between MPI_Comm_spawn() calls to allow clean-ups
1859  in the MPI runtime and to avoid timing issues.\n INTERFACE CHANGE:
 1860  Set mr->spawn_pass_intercomm_in_argument=1 to enable the old
1861  interface, where the map-function MAPFN is called with the
1862  kmr_spawn_state structure as the general argument. The argument
1863  ARG passed to the mapper is stored in the MAPARG slot in the
1864  kmr_spawn_state structure.
1865  When TAKE_CKPT option is specified, a checkpoint data file of the
1866  output key-value stream is saved if both CKPT_ENABLE and
1867  CKPT_SELECTIVE global options are set. */
1868 
1869 int
1870 kmr_map_via_spawn(KMR_KVS *kvi, KMR_KVS *kvo, void *arg,
1871  MPI_Info info, struct kmr_spawn_option opt,
1872  kmr_mapfn_t mapfn)
1873 {
1874  int cc = kmr_map_spawned_processes(KMR_SPAWN_INTERACT,
1875  "kmr_map_via_spawn",
1876  kvi, kvo, arg, info, opt, mapfn);
1877  return cc;
1878 }
1879 
1880 /** Maps on processes started by MPI_Comm_spawn() to run independent
1881  MPI processes, which will not communicate to the parent. The
1882  programs need to be MPI. It is a variation of
1883  kmr_map_via_spawn(), and refer to the comment on it for the basic
1884  usage. Since the spawned program does not know the parent, there
1885  is no way to communicate from the spawner. The map-function is
1886  called after the processes have exited, so that the map-function
1887  can check the result files created by the spawned processes.\n
1888  This function detects the end of spawned processes using a
1889  watch-program "kmrwatch0", by checking a closure of a socket to
1890  which "kmrwatch0" connected.\n NOTE THAT THIS OPERATION WILL BLOCK
1891  INDEFINITELY AND FAIL, DEPENDING ON THE BEHAVIOR OF AN MPI
1892  IMPLEMENTATION. It is checked to work with Open MPI (1.6) and
1893  MPICH2 (1.5), but not with Intel MPI (4.1) and YAMPI2 (GridMPI
1894  2.1). It depends on the behavior that MPI_Comm_free() on the
1895  parent and MPI_Finalize() on the child do not synchronize. The
1896  quote of the standard (MPI 2.x) says: "Though collective,
1897  MPI_Comm_free is anticipated that this operation will normally be
1898  implemented to be local, ..." The blocking situation can be
1899  checked by enabling tracing around calls to MPI_Comm_free() by
1900  (mr->trace_map_spawn=1).\n NOTE (on MPI spawn implementations):
1901  Open MPI (1.6) allows to spawn non-MPI processes by passing an
1902  special MPI_Info. MPICH2 (1.5) does not allow to spawn non-MPI
1903  processes, because MPI_Comm_spawn() of the parent and MPI_Init()
1904  of the child synchronize. In Intel MPI (4.1) and YAMPI2
1905  (GridMPI), the calls of MPI_Comm_free() on the parent and
1906  MPI_Finalize() or MPI_Comm_free() on the child synchronize, and
1907  thus, they require to call MPI_Comm_free() at an appropriate time
1908  on the parent.\n Options REPLY_ROOT and REPLY_EACH have no
1909  effect.
1910  When TAKE_CKPT option is specified, a checkpoint data file of the
1911  output key-value stream is saved if both CKPT_ENABLE and
1912  CKPT_SELECTIVE global options are set. */
1913 
1914 int
1916  MPI_Info info, struct kmr_spawn_option opt,
1917  kmr_mapfn_t mapfn)
1918 {
1919  struct kmr_spawn_option ssopt = opt;
1920  ssopt.reply_root = 0;
1921  ssopt.reply_each = 1;
1922  int cc = kmr_map_spawned_processes(KMR_SPAWN_PARALLEL,
1923  "kmr_map_parallel_processes",
1924  kvi, kvo, arg, info, ssopt, mapfn);
1925  return cc;
1926 }
1927 
1928 /** Maps on processes started by MPI_Comm_spawn() to run serial
1929  processes. This should NOT be used; Use kmr_map_ms_commands(),
1930  instead. Fork-execing in kmr_map_ms_commands() is simpler than
1931  spawning. See also the comment on kmr_map_via_spawn() and
1932  kmr_map_parallel_processes(). The map-function is called after
1933  the processes have exited, thus, there is no way to communicate
1934  from the map-function. Instead, the map-function can check the
1935  result files created by the spawned processes.\n This function
1936  detects the end of spawned processes using a watch-program
1937  "kmrwatch0" which sends a reply to the parent in place of the
1938  serial program. Options REPLY_ROOT and REPLY_EACH have no
1939  effect.
1940  When TAKE_CKPT option is specified, a checkpoint data file of the
1941  output key-value stream is saved if both CKPT_ENABLE and
1942  CKPT_SELECTIVE global options are set. */
1943 
1944 int
1946  MPI_Info info, struct kmr_spawn_option opt,
1947  kmr_mapfn_t mapfn)
1948 {
1949  struct kmr_spawn_option ssopt = opt;
1950  ssopt.reply_root = 0;
1951  ssopt.reply_each = 1;
1952  int cc = kmr_map_spawned_processes(KMR_SPAWN_SERIAL,
1953  "kmr_map_serial_processes",
1954  kvi, kvo, arg, info, ssopt, mapfn);
1955  return cc;
1956 }
1957 
1958 /** Maps on processes started by MPI_Comm_spawn() to run independent
1959  processes. It either calls kmr_map_parallel_processes() or
1960  kmr_map_serial_processes() with regard to the NONMPI argument.
1961  See the comments of kmr_map_parallel_processes() and
1962  kmr_map_serial_processes(). */
1963 
1964 int
1965 kmr_map_processes(_Bool nonmpi, KMR_KVS *kvi, KMR_KVS *kvo, void *arg,
1966  MPI_Info info, struct kmr_spawn_option opt,
1967  kmr_mapfn_t mapfn)
1968 {
1969  KMR *mr = kvi->c.mr;
1970  if (opt.reply_root || opt.reply_each) {
1971  kmr_error(mr, "kmr_map_processes:"
1972  " options REPLY_ROOT/REPLY_EACH not allowed");
1973  }
1974 
1975  struct kmr_spawn_option ssopt = opt;
1976  ssopt.reply_root = 0;
1977  ssopt.reply_each = 1;
1978  if (nonmpi) {
1979  int cc = kmr_map_spawned_processes(KMR_SPAWN_SERIAL,
1980  "kmr_map_processes",
1981  kvi, kvo, arg, info, ssopt, mapfn);
1982  return cc;
1983  } else {
1984  int cc = kmr_map_spawned_processes(KMR_SPAWN_PARALLEL,
1985  "kmr_map_processes",
1986  kvi, kvo, arg, info, ssopt, mapfn);
1987  return cc;
1988  }
1989 }
1990 
1991 /* Creates a dummy context in spawned processes. It only be used to
1992  make KVS for adding elements. */
1993 
KMR *
kmr_create_dummy_context(void)
{
    /* A context on MPI_COMM_SELF; suitable only for building KVS. */
    return kmr_create_context(MPI_COMM_SELF, MPI_INFO_NULL, 0);
}
2000 
2001 /** Sends the KVS from a spawned process to the map-function of the
2002  spawner. It is paired with kmr_receive_kvs_from_spawned_fn(). */
2003 
int
kmr_send_kvs_to_spawner(KMR *mr, KMR_KVS *kvs)
{
    int cc;
    MPI_Comm ic = MPI_COMM_NULL;
    cc = MPI_Comm_get_parent(&ic);
    assert(cc == MPI_SUCCESS);
    if (ic == MPI_COMM_NULL) {
	kmr_error(mr, ("kmr_send_kvs_to_spawner:"
		       " may be called in a not-spawned process"));
    }
    /* Marshal the whole KVS into one byte array. */
    void *data = 0;
    size_t sz = 0;
    cc = kmr_save_kvs(kvs, &data, &sz, kmr_noopt);
    assert(cc == MPI_SUCCESS && data != 0 && sz != 0);
    /* MPI message counts are int; guard against silently truncating
       a marshaled stream larger than INT_MAX bytes. */
    assert(sz <= (size_t)INT_MAX);
    int siz = (int)sz;
    int peer = 0;
    /* Send the size first, then the payload (tag KMR_TAG_SPAWN_REPLY1). */
    cc = MPI_Send(&siz, 1, MPI_INT, peer, KMR_TAG_SPAWN_REPLY1, ic);
    assert(cc == MPI_SUCCESS);
    cc = MPI_Send(data, siz, MPI_BYTE, peer, KMR_TAG_SPAWN_REPLY1, ic);
    assert(cc == MPI_SUCCESS);
    free(data);
    return MPI_SUCCESS;
}
2028 
2029 /** Collects key-value pairs generated by spawned processes. It is a
2030  map-function to be used with kmr_map_via_spawn() with the
2031  REPLY_EACH option. The spawned processes call
2032  kmr_send_kvs_to_spawner() to send generated key-value pairs, and
2033  this function receives and puts them into KVO. PROTOCOL: The
2034  reply consists of one or two messages with the tag
2035  KMR_TAG_SPAWN_REPLY1=501. One is the data size, which is followed
2036  by a marshaled key-value stream when the data size is non-zero. */
2037 
int
kmr_receive_kvs_from_spawned_fn(const struct kmr_kv_box kv,
				const KMR_KVS *kvi, KMR_KVS *kvo, void *arg,
				const long index)
{
    const _Bool replyeach = 1;
    assert(kvi != 0);
    KMR * const mr = kvi->c.mr;
    MPI_Comm *icommp = kmr_get_spawner_communicator(mr, index);
    assert(icommp != 0);
    MPI_Comm icomm = *icommp;
    int cc;
    int nranks;
    cc = MPI_Comm_remote_size(icomm, &nranks);
    assert(cc == MPI_SUCCESS);
    /* One reply per spawned rank (REPLY_EACH protocol). */
    int nreplies = (replyeach ? nranks : 1);
    for (int rank = 0; rank < nreplies; rank++) {
	assert(kvo != 0);
	MPI_Status st;
	int sz;
	cc = MPI_Recv(&sz, 1, MPI_INT, rank, KMR_TAG_SPAWN_REPLY1,
		      icomm, &st);
	assert(cc == MPI_SUCCESS);
	if (sz == 0) {
	    /* This peer returned no key-value data. */
	    continue;
	}
	void *data = kmr_malloc((size_t)sz);
	cc = MPI_Recv(data, sz, MPI_BYTE, rank, KMR_TAG_SPAWN_REPLY1,
		      icomm, &st);
	assert(cc == MPI_SUCCESS);
	/* Unmarshal the received stream and merge it into KVO. */
	KMR_KVS *kvx = kmr_create_kvs(mr, KMR_KV_BAD, KMR_KV_BAD);
	cc = kmr_restore_kvs(kvx, data, (size_t)sz, kmr_noopt);
	assert(cc == MPI_SUCCESS);
	struct kmr_option keepopen = {.keep_open = 1};
	/* NOTE(review): no explicit free of KVX — presumably kmr_map()
	   consumes its input KVS per KMR convention; confirm. */
	cc = kmr_map(kvx, kvo, 0, keepopen, kmr_add_identity_fn);
	assert(cc == MPI_SUCCESS);
	kmr_free(data, (size_t)sz);
    }
    return MPI_SUCCESS;
}
2080 
2082  struct kmr_spawn_option opt;
2083  void *arg;
2084  kmr_mapfn_t fn;
2085 };
2086 
2087 /** Runs commands in kmr_map_ms_commands(). */
2088 
static int
kmr_map_ms_fork_exec_command(const struct kmr_kv_box kv,
			     const KMR_KVS *kvi, KMR_KVS *kvo, void *arg,
			     const long index)
{
    char *name = "kmr_map_ms_commands";
    KMR *mr = kvi->c.mr;
    _Bool tracing5 = (mr->trace_map_ms && (5 <= mr->verbosity));
    struct kmr_map_ms_commands_argument *xarg = arg;
    struct kmr_spawn_option opt = xarg->opt;
    int cc;
    /* Copy the value field (a command line) into a scratch buffer,
       because argument scanning modifies the string in place. */
    char *abuf = kmr_malloc((size_t)kv.vlen);
    memcpy(abuf, kv.v.p, (size_t)kv.vlen);
    int argc;
    char *argv[256];
    const int maxargc = (sizeof(argv) / sizeof(*argv));
    /* Pass (maxargc - 1) to reserve a slot for the null terminator;
       passing maxargc could let argv[argc] write past the array. */
    cc = kmr_scan_argv_strings(mr, abuf, (size_t)kv.vlen, (maxargc - 1),
			       &argc, argv,
			       opt.separator_space, name);
    assert(cc == MPI_SUCCESS);
    assert(0 <= argc && argc < maxargc);
    argv[argc] = 0;

    if (tracing5) {
	char ss[160];
	kmr_make_pretty_argument_string(ss, sizeof(ss), argc, argv);
	fprintf(stderr,
		";;KMR [%05d] %s: fork-exec: %s\n",
		mr->rank, name, ss);
	fflush(0);
    }

    /* Decide how many descriptors to close in the child. */
    int closefds;
    if (mr->keep_fds_at_fork) {
	closefds = 0;
    } else {
	closefds = kmr_getdtablesize(mr);
    }

    pid_t pid = fork();
    if (pid == -1) {
	char ee[80];
	char *m = strerror(errno);
	snprintf(ee, sizeof(ee), "%s: fork() failed: %s",
		 name, m);
	kmr_error(mr, ee);
    } else {
	if (pid == 0) {
	    /* Child: close inherited descriptors (except stdio) and
	       replace the image with the command. */
	    for (int fd = 3; fd < closefds; fd++) {
		close(fd);
	    }
	    cc = execvp(argv[0], argv);
	    char ss[160];
	    kmr_make_pretty_argument_string(ss, sizeof(ss), argc, argv);
	    if (cc == -1) {
		char ee[80];
		char *m = strerror(errno);
		snprintf(ee, sizeof(ee), "%s: execvp(%s) failed: %s",
			 name, ss, m);
		kmr_error(mr, ee);
	    } else {
		char ee[80];
		snprintf(ee, sizeof(ee), "%s: execvp(%s) returned with cc=%d",
			 name, ss, cc);
		kmr_error(mr, ee);
	    }
	} else {
	    /* Parent: reap the child; retry waitpid() on EINTR so an
	       interrupted wait does not leave the child unreaped. */
	    int st;
	    do {
		cc = waitpid(pid, &st, 0);
	    } while (cc == -1 && errno == EINTR);
	    if (cc == -1) {
		char ee[80];
		char *m = strerror(errno);
		snprintf(ee, sizeof(ee), "%s: waitpid() failed: %s",
			 name, m);
		kmr_warning(mr, 1, ee);
	    }
	}
    }

    if (tracing5) {
	char ss[160];
	kmr_make_pretty_argument_string(ss, sizeof(ss), argc, argv);
	fprintf(stderr,
		";;KMR [%05d] %s: fork-exec done: %s\n",
		mr->rank, name, ss);
	fflush(0);
    }

    /* Invoke the user map-function after the command has finished. */
    cc = (*xarg->fn)(kv, kvi, kvo, xarg->arg, index);
    assert(cc == MPI_SUCCESS);

    kmr_free(abuf, (size_t)kv.vlen);
    return MPI_SUCCESS;
}
2189 
2190 /** Maps in master-slave mode, specialized to run serial commands. It
2191  fork-execs commands specified by key-values, then calls a
2192  map-function at finishes of the commands. It takes the commands
2193  in the same way as kmr_map_via_spawn(). The commands never be MPI
2194  programs. It is implemented with kmr_map_ms(); see the comments
2195  on kmr_map_ms(). */
2196 
int
kmr_map_ms_commands(KMR_KVS *kvi, KMR_KVS *kvo,
		    void *arg, struct kmr_option opt,
		    struct kmr_spawn_option sopt, kmr_mapfn_t m)
{
    /* Bundle the spawn option, the user argument, and the user
       map-function; the wrapper runs M after each command finishes. */
    struct kmr_map_ms_commands_argument xarg = {
	.opt = sopt,
	.arg = arg,
	.fn = m
    };
    return kmr_map_ms(kvi, kvo, &xarg, opt, kmr_map_ms_fork_exec_command);
}
2211 
2212 /*
2213 Copyright (C) 2012-2016 RIKEN AICS
2214 This library is distributed WITHOUT ANY WARRANTY. This library can be
2215 redistributed and/or modified under the terms of the BSD 2-Clause License.
2216 */
int kmr_ckpt_progress_init(KMR_KVS *, KMR_KVS *, struct kmr_option)
It initializes a progress of MapReduce checkpointing.
Definition: kmrckpt.c:2753
static int kmr_map_ms_fork_exec_command(const struct kmr_kv_box kv, const KMR_KVS *kvi, KMR_KVS *kvo, void *arg, const long index)
Runs commands in kmr_map_ms_commands().
Definition: kmrmapms.c:2090
Key-Value Stream (abstract).
Definition: kmr.h:587
static int kmr_map_master(KMR_KVS *kvi, KMR_KVS *kvo, void *arg, struct kmr_option opt, kmr_mapfn_t m)
Delivers key-value pairs as requested.
Definition: kmrmapms.c:70
Utilities Private Part (do not include from applications).
Options to Mapping, Shuffling, and Reduction.
Definition: kmr.h:613
void kmr_ckpt_progress_fin(KMR *)
It finalizes the progress of MapReduce checkpointing.
Definition: kmrckpt.c:2845
int kmr_add_kv(KMR_KVS *kvs, const struct kmr_kv_box kv)
Adds a key-value pair.
Definition: kmrbase.c:751
void kmr_ckpt_save_kvo_each_init(KMR *, KMR_KVS *)
It initializes saving indexed key-value pairs of the output KVS to a checkpoint data file...
Definition: kmrckpt.c:2703
Spawning Info.
Definition: kmr.h:712
void kmr_ckpt_save_kvo_each_fin(KMR *, KMR_KVS *)
It finalizes saving indexed key-value pairs of the output KVS to the checkpoint data file...
Definition: kmrckpt.c:2733
#define kmr_malloc(Z)
Allocates memory, or aborts when failed.
Definition: kmrimpl.h:177
State of Spawner.
Definition: kmrmapms.c:392
#define kmr_create_kvs(MR, KF, VF)
Makes a new key-value stream (of type KMR_KVS) with the specified field datatypes.
Definition: kmr.h:71
int kmr_save_kvs(KMR_KVS *kvi, void **dataq, size_t *szq, struct kmr_option opt)
Packs locally the contents of a key-value stream to a byte array.
Definition: kmrbase.c:968
int kmr_ckpt_enabled(KMR *)
Check if checkpoint/restart is enabled.
Definition: kmrckpt.c:2478
long kmr_ckpt_first_unprocessed_kv(KMR *)
It returns the index of the first unprocessed key-value in the input KVS.
Definition: kmrckpt.c:2535
int kmr_add_kv_done(KMR_KVS *kvs)
Marks finished adding key-value pairs.
Definition: kmrbase.c:881
Definition: kmr.h:348
KMR Context.
Definition: kmr.h:222
void kmr_ckpt_save_kvo_each_add(KMR *, KMR_KVS *, long)
It adds new key-value pairs of the output KVS to the checkpoint data file.
Definition: kmrckpt.c:2718
int kmr_free_kvs(KMR_KVS *kvs)
Releases a key-value stream (type KMR_KVS).
Definition: kmrbase.c:621
int kmr_map_processes(_Bool nonmpi, KMR_KVS *kvi, KMR_KVS *kvo, void *arg, MPI_Info info, struct kmr_spawn_option opt, kmr_mapfn_t mapfn)
Maps on processes started by MPI_Comm_spawn() to run independent processes.
Definition: kmrmapms.c:1965
int kmr_map_via_spawn(KMR_KVS *kvi, KMR_KVS *kvo, void *arg, MPI_Info info, struct kmr_spawn_option opt, kmr_mapfn_t mapfn)
Maps on processes started by MPI_Comm_spawn().
Definition: kmrmapms.c:1870
kmr_kv_field
Datatypes of Keys or Values.
Definition: kmr.h:325
#define kmr_map(KVI, KVO, ARG, OPT, M)
Maps simply.
Definition: kmr.h:82
Handy Copy of a Key-Value Field.
Definition: kmr.h:358
Options to Mapping by Spawns.
Definition: kmr.h:662
State during kmr_map_ms().
Definition: kmr.h:408
int kmr_ckpt_disable_ckpt(KMR *)
It temporarily disables checkpoint/restart.
Definition: kmrckpt.c:2494
State of each Spawning.
Definition: kmrmapms.c:371
int kmr_map_ms(KMR_KVS *kvi, KMR_KVS *kvo, void *arg, struct kmr_option opt, kmr_mapfn_t m)
Maps in master-slave mode.
Definition: kmrmapms.c:310
#define kmr_realloc(P, Z)
Allocates memory, or aborts when failed.
Definition: kmrimpl.h:181
int kmr_send_kvs_to_spawner(KMR *mr, KMR_KVS *kvs)
Sends the KVS from a spawned process to the map-function of the spawner.
Definition: kmrmapms.c:2005
int kmr_restore_kvs(KMR_KVS *kvo, void *data, size_t sz, struct kmr_option opt)
Unpacks locally the contents of a key-value stream from a byte array.
Definition: kmrbase.c:1034
MPI_Comm * kmr_get_spawner_communicator(KMR *mr, long index)
Obtains (a reference to) a parent inter-communicator of a spawned process.
Definition: kmrmapms.c:1799
KMR Interface.
int kmr_add_identity_fn(const struct kmr_kv_box kv, const KMR_KVS *kvi, KMR_KVS *kvo, void *arg, const long i)
Adds a given key-value pair unmodified.
Definition: kmrbase.c:937
int kmr_getdtablesize(KMR *mr)
Does getdtablesize(); it is defined, because it is not Posix.
Definition: kmrutil.c:635
int kmr_reply_to_spawner(KMR *mr)
Sends a reply message in the spawned process, which tells it is ready to finish and may have some dat...
Definition: kmrmapms.c:1776
int kmr_replicate(KMR_KVS *kvi, KMR_KVS *kvo, struct kmr_option opt)
Replicates key-value pairs to be visible on all ranks, that is, it has the effect of bcast or all-gat...
Definition: kmrbase.c:2182
static int kmr_map_slave(KMR_KVS *kvi, KMR_KVS *kvo, void *arg, struct kmr_option opt, kmr_mapfn_t m)
Asks the master for a task, then calls a map-function.
Definition: kmrmapms.c:195
int kmr_map_parallel_processes(KMR_KVS *kvi, KMR_KVS *kvo, void *arg, MPI_Info info, struct kmr_spawn_option opt, kmr_mapfn_t mapfn)
Maps on processes started by MPI_Comm_spawn() to run independent MPI processes, which will not commun...
Definition: kmrmapms.c:1915
static struct kmr_kv_box kmr_pick_kv(struct kmr_kvs_entry *e, KMR_KVS *kvs)
Returns a handle to a key-value entry – a reverse of kmr_poke_kv().
Definition: kmrimpl.h:551
int kmr_reduce_as_one(KMR_KVS *kvi, KMR_KVS *kvo, void *arg, struct kmr_option opt, kmr_redfn_t r)
Calls a reduce-function once as if all key-value pairs had the same key.
Definition: kmrbase.c:2625
int kmr_receive_kvs_from_spawned_fn(const struct kmr_kv_box kv, const KMR_KVS *kvi, KMR_KVS *kvo, void *arg, const long index)
Collects key-value pairs generated by spawned processes.
Definition: kmrmapms.c:2039
int kmr_map_ms_commands(KMR_KVS *kvi, KMR_KVS *kvo, void *arg, struct kmr_option opt, struct kmr_spawn_option sopt, kmr_mapfn_t m)
Maps in master-slave mode, specialized to run serial commands.
Definition: kmrmapms.c:2198
int kmr_ckpt_enable_ckpt(KMR *, int)
It temporarily enables checkpoint/restart which has been disabled by calling kmr_ckpt_disable_ckpt().
Definition: kmrckpt.c:2515
int kmr_map_serial_processes(KMR_KVS *kvi, KMR_KVS *kvo, void *arg, MPI_Info info, struct kmr_spawn_option opt, kmr_mapfn_t mapfn)
Maps on processes started by MPI_Comm_spawn() to run serial processes.
Definition: kmrmapms.c:1945
int(* kmr_mapfn_t)(const struct kmr_kv_box kv, const KMR_KVS *kvi, KMR_KVS *kvo, void *arg, const long index)
Map-function Type.
Definition: kmr.h:689
KMR * kmr_create_context(const MPI_Comm comm, const MPI_Info conf, const char *name)
Makes a new KMR context (a context has type KMR).
Definition: kmrbase.c:147