KMR
test5.c
1 /* test5.c (2014-02-04) */
2 
3 /* Check spawning mappers. */
4 
5 /* Run it with four or more dynamic processes. */
6 
7 #include <mpi.h>
8 #include <stdio.h>
9 #include <stdlib.h>
10 #include <unistd.h>
11 #include <fcntl.h>
12 #include <limits.h>
13 #include <sys/types.h>
14 #include <sys/stat.h>
15 #include <sys/time.h>
16 #include <assert.h>
17 #ifdef _OPENMP
18 #include <omp.h>
19 #endif
20 
21 #include "kmr.h"
22 #include "kmrimpl.h"
23 
24 _Bool skipmpiwork = 0;
25 
26 static int
27 empty_map_fn_seq(const struct kmr_kv_box kv, const KMR_KVS *kvi,
28  KMR_KVS *kvo, void *p, long ii)
29 {
30  fflush(0);
31  usleep(50 * 1000);
32  printf("test4:empty-map-fn[%d]: called.\n", (int)ii);
33  fflush(0);
34  sleep(3);
35  return MPI_SUCCESS;
36 }
37 
38 static int
39 empty_map_fn_mpi_noreply(const struct kmr_kv_box kv, const KMR_KVS *kvi,
40  KMR_KVS *kvo, void *p, long ii)
41 {
42  KMR *mr = kmr_get_context_of_kvs(kvi);
43  MPI_Comm *pp = kmr_get_spawner_communicator(mr, ii);
44  MPI_Comm ic = *pp;
45  fflush(0);
46  usleep(50 * 1000);
47  if (sizeof(int) != sizeof(void *) && sizeof(ic) == sizeof(void *)) {
48  printf("test4:empty-map-fn[%d]: sleeping 12 sec (icomm=%p)...\n",
49  (int)ii, (void *)ic);
50  } else {
51  printf("test4:empty-map-fn[%d]: sleeping 12 sec (icomm=%d)...\n",
52  (int)ii, (int)ic);
53  }
54  fflush(0);
55  sleep(12);
56  return MPI_SUCCESS;
57 }
58 
59 static int
60 empty_map_fn_mpi_with_reply(const struct kmr_kv_box kv, const KMR_KVS *kvi,
61  KMR_KVS *kvo, void *p, long ii)
62 {
63  KMR *mr = kmr_get_context_of_kvs(kvi);
64  MPI_Comm *pp = kmr_get_spawner_communicator(mr, ii);
65  MPI_Comm ic = *pp;
66  fflush(0);
67  usleep(50 * 1000);
68  if (sizeof(int) != sizeof(void *) && sizeof(ic) == sizeof(void *)) {
69  printf("test4:empty-map-fn[%d]: called (icomm=%p).\n",
70  (int)ii, (void *)ic);
71  } else {
72  printf("test4:empty-map-fn[%d]: called (icomm=%d).\n",
73  (int)ii, (int)ic);
74  }
75  fflush(0);
76  return MPI_SUCCESS;
77 }
78 
79 static void
80 simple0(int nprocs, int rank)
81 {
82  KMR *mr = kmr_create_context(MPI_COMM_WORLD, MPI_INFO_NULL, 0);
83  mr->trace_map_spawn = 1;
84  mr->spawn_max_processes = 4;
85 
86  MPI_Barrier(MPI_COMM_WORLD);
87  usleep(50 * 1000);
88  if (rank == 0) {printf("** CHECK kmr_map_via_spawn...\n");}
89  fflush(0);
90  usleep(50 * 1000);
91 
92  if (1) {
93  MPI_Barrier(MPI_COMM_WORLD);
94  usleep(50 * 1000);
95  if (rank == 0) {printf("* kmr_map_via_spawn"
96  " WITH RETURNING KVS...\n");}
97  if (rank == 0) {printf("Spawn 2-rank work 4 times"
98  " using %d dynamic processes.\n",
99  mr->spawn_max_processes);}
100  fflush(0);
101  usleep(50 * 1000);
102 
103  KMR_KVS *kvs00 = kmr_create_kvs(mr, KMR_KV_OPAQUE, KMR_KV_OPAQUE);
104  if (mr->rank == 0) {
105  char kbuf[256];
106  char vbuf[256];
107  snprintf(kbuf, sizeof(kbuf), "key");
108  snprintf(vbuf, sizeof(vbuf),
109  "maxprocs=2 ./a.out mpi returnkvs a0 a1 a2");
110  struct kmr_kv_box nkv = {
111  .klen = (int)(strlen(kbuf) + 1),
112  .vlen = (int)(strlen(vbuf) + 1),
113  .k.p = kbuf,
114  .v.p = vbuf};
115  for (int i = 0; i < 4; i++) {
116  kmr_add_kv(kvs00, nkv);
117  }
118  }
119  kmr_add_kv_done(kvs00);
120  KMR_KVS *kvs01 = kmr_create_kvs(mr, KMR_KV_OPAQUE, KMR_KV_OPAQUE);
121  struct kmr_spawn_option opt = {.reply_each = 1,
122  .separator_space = 1};
123  kmr_map_via_spawn(kvs00, kvs01, 0,
124  MPI_INFO_NULL, opt, kmr_receive_kvs_from_spawned_fn);
125  /*kmr_dump_kvs(kvs01, 0);*/
126  if (mr->rank == 0) {
127  assert(kvs01->c.element_count == 32);
128  } else {
129  assert(kvs01->c.element_count == 0);
130  }
131  kmr_free_kvs(kvs01);
132  }
133 
134  kmr_free_context(mr);
135 }
136 
137 static void
138 simple1(int nprocs, int rank)
139 {
140  KMR *mr = kmr_create_context(MPI_COMM_WORLD, MPI_INFO_NULL, 0);
141  mr->trace_map_spawn = 1;
142  mr->spawn_max_processes = 4;
143 
144  MPI_Barrier(MPI_COMM_WORLD);
145  usleep(50 * 1000);
146  if (rank == 0) {printf("** CHECK kmr_map_via_spawn...\n");}
147  fflush(0);
148  usleep(50 * 1000);
149 
150  if (1) {
151  MPI_Barrier(MPI_COMM_WORLD);
152  usleep(50 * 1000);
153  if (rank == 0) {printf("* kmr_map_via_spawn"
154  " WAITING IN MAP-FN...\n");}
155  if (rank == 0) {printf("Spawn 2-rank work 4 times"
156  " using %d dynamic processes.\n",
157  mr->spawn_max_processes);}
158  fflush(0);
159  usleep(50 * 1000);
160 
161  KMR_KVS *kvs00 = kmr_create_kvs(mr, KMR_KV_OPAQUE, KMR_KV_OPAQUE);
162  if (mr->rank == 0) {
163  char kbuf[256];
164  char vbuf[256];
165  snprintf(kbuf, sizeof(kbuf), "key");
166  snprintf(vbuf, sizeof(vbuf),
167  "maxprocs=2 ./a.out mpi noreply a0 a1 a2");
168  struct kmr_kv_box nkv = {
169  .klen = (int)(strlen(kbuf) + 1),
170  .vlen = (int)(strlen(vbuf) + 1),
171  .k.p = kbuf,
172  .v.p = vbuf};
173  for (int i = 0; i < 4; i++) {
174  kmr_add_kv(kvs00, nkv);
175  }
176  }
177  kmr_add_kv_done(kvs00);
178  KMR_KVS *kvs01 = kmr_create_kvs(mr, KMR_KV_OPAQUE, KMR_KV_OPAQUE);
179  struct kmr_spawn_option opt = {.separator_space = 1};
180  kmr_map_via_spawn(kvs00, kvs01, 0,
181  MPI_INFO_NULL, opt, empty_map_fn_mpi_noreply);
182  kmr_free_kvs(kvs01);
183  }
184 
185  if (1) {
186  MPI_Barrier(MPI_COMM_WORLD);
187  usleep(50 * 1000);
188  if (rank == 0) {printf("* kmr_map_via_spawn"
189  " WAITING WITH REPLY (EACH)...\n");}
190  if (rank == 0) {printf("Spawn 2-rank work 4 times"
191  " using %d dynamic processes.\n",
192  mr->spawn_max_processes);}
193  fflush(0);
194  usleep(50 * 1000);
195 
196  KMR_KVS *kvs10 = kmr_create_kvs(mr, KMR_KV_OPAQUE, KMR_KV_OPAQUE);
197  if (mr->rank == 0) {
198  char kbuf[256];
199  char vbuf[256];
200  snprintf(kbuf, sizeof(kbuf), "key");
201  snprintf(vbuf, sizeof(vbuf),
202  "maxprocs=2 ./a.out mpi eachreply a0 a1 a2");
203  struct kmr_kv_box nkv = {
204  .klen = (int)(strlen(kbuf) + 1),
205  .vlen = (int)(strlen(vbuf) + 1),
206  .k.p = kbuf,
207  .v.p = vbuf};
208  for (int i = 0; i < 4; i++) {
209  kmr_add_kv(kvs10, nkv);
210  }
211  }
212  kmr_add_kv_done(kvs10);
213  KMR_KVS *kvs11 = kmr_create_kvs(mr, KMR_KV_OPAQUE, KMR_KV_OPAQUE);
214  struct kmr_spawn_option opt = {.reply_each = 1,
215  .separator_space = 1};
216  kmr_map_via_spawn(kvs10, kvs11, 0,
217  MPI_INFO_NULL, opt, empty_map_fn_mpi_with_reply);
218  kmr_free_kvs(kvs11);
219  }
220 
221  if (1) {
222  MPI_Barrier(MPI_COMM_WORLD);
223  usleep(50 * 1000);
224  if (rank == 0) {printf("* kmr_map_via_spawn"
225  " WAITING WITH REPLY (ROOT)...\n");}
226  if (rank == 0) {printf("Spawn 2-rank work 4 times"
227  " using %d dynamic processes.\n",
228  mr->spawn_max_processes);}
229  fflush(0);
230  usleep(50 * 1000);
231 
232  KMR_KVS *kvs20 = kmr_create_kvs(mr, KMR_KV_OPAQUE, KMR_KV_OPAQUE);
233  if (mr->rank == 0) {
234  char kbuf[256];
235  char vbuf[256];
236  snprintf(kbuf, sizeof(kbuf), "key");
237  snprintf(vbuf, sizeof(vbuf),
238  "maxprocs=2 ./a.out mpi rootreply a0 a1 a2");
239  struct kmr_kv_box nkv = {
240  .klen = (int)(strlen(kbuf) + 1),
241  .vlen = (int)(strlen(vbuf) + 1),
242  .k.p = kbuf,
243  .v.p = vbuf};
244  for (int i = 0; i < 4; i++) {
245  kmr_add_kv(kvs20, nkv);
246  }
247  }
248  kmr_add_kv_done(kvs20);
249  KMR_KVS *kvs21 = kmr_create_kvs(mr, KMR_KV_OPAQUE, KMR_KV_OPAQUE);
250  struct kmr_spawn_option opt = {.reply_root = 1,
251  .separator_space = 1};
252  kmr_map_via_spawn(kvs20, kvs21, 0,
253  MPI_INFO_NULL, opt, empty_map_fn_mpi_with_reply);
254  kmr_free_kvs(kvs21);
255  }
256 
257  kmr_free_context(mr);
258 }
259 
260 static void
261 simple2(int nprocs, int rank)
262 {
263  KMR *mr = kmr_create_context(MPI_COMM_WORLD, MPI_INFO_NULL, 0);
264  mr->trace_map_spawn = 1;
265  mr->spawn_max_processes = 4;
266 
267  MPI_Barrier(MPI_COMM_WORLD);
268  usleep(50 * 1000);
269  if (rank == 0) {printf("** CHECK kmr_map_processes...\n");}
270  fflush(0);
271  usleep(50 * 1000);
272 
273  if (1) {
274  MPI_Barrier(MPI_COMM_WORLD);
275  usleep(50 * 1000);
276  if (rank == 0) {printf("* kmr_map_processes(serial)...\n");}
277  if (rank == 0) {printf("Spawn 2 serial processes 4 times.\n");}
278  fflush(0);
279  usleep(50 * 1000);
280 
281  KMR_KVS *kvs10 = kmr_create_kvs(mr, KMR_KV_OPAQUE, KMR_KV_OPAQUE);
282  if (mr->rank == 0) {
283  char kbuf[256];
284  char vbuf[256];
285  snprintf(kbuf, sizeof(kbuf), "key");
286  snprintf(vbuf, sizeof(vbuf),
287  "maxprocs=2 ./a.out seq noreply a0 a1 a2");
288  struct kmr_kv_box nkv = {
289  .klen = (int)(strlen(kbuf) + 1),
290  .vlen = (int)(strlen(vbuf) + 1),
291  .k.p = kbuf,
292  .v.p = vbuf};
293  for (int i = 0; i < 8; i++) {
294  kmr_add_kv(kvs10, nkv);
295  }
296  }
297  kmr_add_kv_done(kvs10);
298  KMR_KVS *kvs11 = kmr_create_kvs(mr, KMR_KV_OPAQUE, KMR_KV_OPAQUE);
299  struct kmr_spawn_option opt = {.separator_space = 1};
300  kmr_map_processes(1, kvs10, kvs11, 0, MPI_INFO_NULL, opt,
301  empty_map_fn_seq);
302  kmr_free_kvs(kvs11);
303  }
304 
305  if (!skipmpiwork) {
306  MPI_Barrier(MPI_COMM_WORLD);
307  usleep(50 * 1000);
308  if (rank == 0) {printf("* kmr_map_processes(mpi)...\n");}
309  if (rank == 0) {printf("Spawn 2-rank work 4 times"
310  " using %d dynamic processes.\n",
311  mr->spawn_max_processes);}
312  if (rank == 0) {printf("** ON SOME IMPLEMENTATIONS OF MPI,"
313  " THIS TEST MAY BLOCK INDEFINITELY. **\n");}
314  if (rank == 0) {printf("** THEN, RUN THIS TEST WITH a.out 0"
315  " TO SKIP THIS PART. **\n");}
316  fflush(0);
317  usleep(50 * 1000);
318 
319  KMR_KVS *kvs00 = kmr_create_kvs(mr, KMR_KV_OPAQUE, KMR_KV_OPAQUE);
320  if (mr->rank == 0) {
321  char kbuf[256];
322  char vbuf[256];
323  snprintf(kbuf, sizeof(kbuf), "key");
324  snprintf(vbuf, sizeof(vbuf),
325  "maxprocs=2 ./a.out mpi noreply a0 a1 a2");
326  struct kmr_kv_box nkv = {
327  .klen = (int)(strlen(kbuf) + 1),
328  .vlen = (int)(strlen(vbuf) + 1),
329  .k.p = kbuf,
330  .v.p = vbuf};
331  for (int i = 0; i < 4; i++) {
332  kmr_add_kv(kvs00, nkv);
333  }
334  }
335  kmr_add_kv_done(kvs00);
336  KMR_KVS *kvs01 = kmr_create_kvs(mr, KMR_KV_OPAQUE, KMR_KV_OPAQUE);
337  struct kmr_spawn_option opt = {.separator_space = 1};
338  kmr_map_processes(0, kvs00, kvs01, 0, MPI_INFO_NULL, opt,
339  empty_map_fn_seq);
340  kmr_free_kvs(kvs01);
341  }
342 
343  kmr_free_context(mr);
344 }
345 
346 static int
347 spawned(int argc, char *argv[])
348 {
349  /* SPAWNED CHILD */
350 
351  assert(strcmp(argv[1], "seq") == 0 || strcmp(argv[1], "mpi") == 0);
352  if (strcmp(argv[1], "seq") == 0) {
353  printf("test4:spawned-process(serial): started (%s,%s,%s).\n",
354  argv[0], argv[1], argv[3]);
355  printf("test4:spawned-process(serial): sleeping 3 sec...\n");
356  fflush(0);
357  sleep(3);
358  printf("test4:spawned-process(serial): exits.\n");
359  fflush(0);
360  } else if (strcmp(argv[1], "mpi") == 0) {
361  int nprocs, rank, lev;
362  MPI_Init_thread(&argc, &argv, MPI_THREAD_SERIALIZED, &lev);
363  MPI_Comm_size(MPI_COMM_WORLD, &nprocs);
364  MPI_Comm_rank(MPI_COMM_WORLD, &rank);
365 
366  MPI_Comm parent;
367  MPI_Comm_get_parent(&parent);
368  assert(parent != MPI_COMM_NULL);
369 
370  int peer_nprocs;
371  MPI_Comm_remote_size(parent, &peer_nprocs);
372  assert(peer_nprocs == 1);
373  printf("test4:spawned-process(mpi;rank=%d/%d): started (%s,%s,%s).\n",
374  rank, nprocs, argv[0], argv[1], argv[2]);
375  printf("test4:spawned-process(mpi;rank=%d/%d): sleeping 3 sec...\n",
376  rank, nprocs);
377  fflush(0);
378  sleep(3);
379 
380  assert((strcmp(argv[2], "noreply") == 0)
381  || (strcmp(argv[2], "eachreply") == 0)
382  || (strcmp(argv[2], "rootreply") == 0)
383  || (strcmp(argv[2], "returnkvs") == 0));
384  if (strcmp(argv[2], "noreply") == 0) {
385  /* NO REPLY */
386  printf("test4:spawned-process(mpi;rank=%d/%d):"
387  " no reply.\n",
388  rank, nprocs);
389  } else if (strcmp(argv[2], "eachreply") == 0) {
390  /* EACH REPLY */
391  printf("test4:spawned-process(mpi;rank=%d/%d):"
392  " sending a reply.\n",
393  rank, nprocs);
394  int peer = 0;
395  MPI_Send(0, 0, MPI_BYTE, peer,
396  KMR_TAG_SPAWN_REPLY, parent);
397  } else if (strcmp(argv[2], "rootreply") == 0) {
398  /* ROOT REPLY */
399  if (rank == 0) {
400  printf("test4:spawned-process(mpi;rank=%d/%d):"
401  " sending a root reply.\n",
402  rank, nprocs);
403  int peer = 0;
404  MPI_Send(0, 0, MPI_BYTE, peer,
405  KMR_TAG_SPAWN_REPLY, parent);
406  }
407  } else if (strcmp(argv[2], "returnkvs") == 0) {
408  printf("test4:spawned-process(mpi;rank=%d/%d):"
409  " sending a kvs.\n",
410  rank, nprocs);
411  KMR *mr = kmr_create_dummy_context();
413  KMR_KVS *kvs00 = kmr_create_kvs(mr, KMR_KV_OPAQUE, KMR_KV_OPAQUE);
414  for (int i = 0; i < 4; i++) {
415  char k[40];
416  char v[40];
417  snprintf(k, sizeof(k), "k%d", i);
418  snprintf(v, sizeof(v), "v%d", i);
419  kmr_add_string(kvs00, k, v);
420  }
421  kmr_add_kv_done(kvs00);
422  kmr_send_kvs_to_spawner(mr, kvs00);
423  } else {
424  /* NO REPLY */
425  assert(0);
426  }
427 
428  printf("test4:spawned-process(mpi;rank=%d/%d):"
429  " call MPI_Comm_free (could block)...\n",
430  rank, nprocs);
431  fflush(0);
432  MPI_Comm_free(&parent);
433  printf("test4:spawned-process(mpi;rank=%d/%d):"
434  " MPI_Comm_free done.\n",
435  rank, nprocs);
436  fflush(0);
437  MPI_Finalize();
438  printf("test4:spawned-process(mpi;rank=%d/%d): finalized.\n",
439  rank, nprocs);
440  printf("test4:spawned-process(mpi;rank=%d/%d): exits.\n",
441  rank, nprocs);
442  fflush(0);
443  }
444 
445  return 0;
446 }
447 
448 int
449 main(int argc, char *argv[])
450 {
451  if (argc == 1 || argc == 2) {
452  /* SPAWNER */
453 
454  int nprocs, rank, lev;
455  MPI_Init_thread(&argc, &argv, MPI_THREAD_SERIALIZED, &lev);
456  MPI_Comm_size(MPI_COMM_WORLD, &nprocs);
457  MPI_Comm_rank(MPI_COMM_WORLD, &rank);
458 
459  kmr_init();
460 
461  skipmpiwork = (argc == 2 && argv[1][0] == '0');
462 
463  if (rank == 0) {printf("CHECK SPAWNING MAPPER\n");}
464  if (rank == 0) {printf("Running this test needs 4 or more"
465  " dynamic processes.\n");}
466  fflush(0);
467 
468  simple0(nprocs, rank);
469  simple1(nprocs, rank);
470  simple2(nprocs, rank);
471 
472  MPI_Barrier(MPI_COMM_WORLD);
473  usleep(50 * 1000);
474  if (rank == 0) {printf("OK\n");}
475  fflush(0);
476 
477  kmr_fin();
478 
479  MPI_Finalize();
480  } else {
481  spawned(argc, argv);
482  }
483  return 0;
484 }
Key-Value Stream (abstract).
Definition: kmr.h:587
Utilities Private Part (do not include from applications).
int kmr_add_kv(KMR_KVS *kvs, const struct kmr_kv_box kv)
Adds a key-value pair.
Definition: kmrbase.c:751
#define kmr_create_kvs(MR, KF, VF)
Makes a new key-value stream (of type KMR_KVS) with the specified field datatypes.
Definition: kmr.h:71
MPI_Comm * kmr_get_spawner_communicator(KMR *mr, long index)
Obtains (a reference to) a parent inter-communicator of a spawned process.
Definition: kmrmapms.c:1799
int kmr_send_kvs_to_spawner(KMR *mr, KMR_KVS *kvs)
Sends the KVS from a spawned process to the map-function of the spawner.
Definition: kmrmapms.c:2005
int kmr_add_kv_done(KMR_KVS *kvs)
Marks finished adding key-value pairs.
Definition: kmrbase.c:881
int kmr_reply_to_spawner(KMR *mr)
Sends a reply message in the spawned process, which tells it is ready to finish and may have some dat...
Definition: kmrmapms.c:1776
KMR Context.
Definition: kmr.h:222
int kmr_free_kvs(KMR_KVS *kvs)
Releases a key-value stream (type KMR_KVS).
Definition: kmrbase.c:621
Handy Copy of a Key-Value Field.
Definition: kmr.h:358
Options to Mapping by Spawns.
Definition: kmr.h:662
int kmr_fin(void)
Clears the environment.
Definition: kmrbase.c:124
#define kmr_init()
Sets up the environment.
Definition: kmr.h:747
int kmr_map_via_spawn(KMR_KVS *kvi, KMR_KVS *kvo, void *arg, MPI_Info info, struct kmr_spawn_option opt, kmr_mapfn_t mapfn)
Maps on processes started by MPI_Comm_spawn().
Definition: kmrmapms.c:1870
int kmr_free_context(KMR *mr)
Releases a context created with kmr_create_context().
Definition: kmrbase.c:326
KMR Interface.
int kmr_add_string(KMR_KVS *kvs, const char *k, const char *v)
Adds a key-value pair of strings.
Definition: kmrbase.c:913
int kmr_receive_kvs_from_spawned_fn(const struct kmr_kv_box kv, const KMR_KVS *kvi, KMR_KVS *kvo, void *arg, const long index)
Collects key-value pairs generated by spawned processes.
Definition: kmrmapms.c:2039
int kmr_map_processes(_Bool nonmpi, KMR_KVS *kvi, KMR_KVS *kvo, void *arg, MPI_Info info, struct kmr_spawn_option opt, kmr_mapfn_t mapfn)
Maps on processes started by MPI_Comm_spawn() to run independent processes.
Definition: kmrmapms.c:1965
KMR * kmr_create_context(const MPI_Comm comm, const MPI_Info conf, const char *name)
Makes a new KMR context (a context has type KMR).
Definition: kmrbase.c:147