13 #include <sys/types.h> 24 _Bool skipmpiwork = 0;
32 printf(
"test4:empty-map-fn[%d]: called.\n", (
int)ii);
42 KMR *mr = kmr_get_context_of_kvs(kvi);
47 if (
sizeof(
int) !=
sizeof(
void *) &&
sizeof(ic) ==
sizeof(
void *)) {
48 printf(
"test4:empty-map-fn[%d]: sleeping 12 sec (icomm=%p)...\n",
51 printf(
"test4:empty-map-fn[%d]: sleeping 12 sec (icomm=%d)...\n",
63 KMR *mr = kmr_get_context_of_kvs(kvi);
68 if (
sizeof(
int) !=
sizeof(
void *) &&
sizeof(ic) ==
sizeof(
void *)) {
69 printf(
"test4:empty-map-fn[%d]: called (icomm=%p).\n",
72 printf(
"test4:empty-map-fn[%d]: called (icomm=%d).\n",
80 simple0(
int nprocs,
int rank)
83 mr->trace_map_spawn = 1;
84 mr->spawn_max_processes = 4;
86 MPI_Barrier(MPI_COMM_WORLD);
88 if (rank == 0) {printf(
"** CHECK kmr_map_via_spawn...\n");}
93 MPI_Barrier(MPI_COMM_WORLD);
95 if (rank == 0) {printf(
"* kmr_map_via_spawn" 96 " WITH RETURNING KVS...\n");}
97 if (rank == 0) {printf(
"Spawn 2-rank work 4 times" 98 " using %d dynamic processes.\n",
99 mr->spawn_max_processes);}
107 snprintf(kbuf,
sizeof(kbuf),
"key");
108 snprintf(vbuf,
sizeof(vbuf),
109 "maxprocs=2 ./a.out mpi returnkvs a0 a1 a2");
111 .klen = (int)(strlen(kbuf) + 1),
112 .vlen = (
int)(strlen(vbuf) + 1),
115 for (
int i = 0; i < 4; i++) {
122 .separator_space = 1};
127 assert(kvs01->c.element_count == 32);
129 assert(kvs01->c.element_count == 0);
138 simple1(
int nprocs,
int rank)
141 mr->trace_map_spawn = 1;
142 mr->spawn_max_processes = 4;
144 MPI_Barrier(MPI_COMM_WORLD);
146 if (rank == 0) {printf(
"** CHECK kmr_map_via_spawn...\n");}
151 MPI_Barrier(MPI_COMM_WORLD);
153 if (rank == 0) {printf(
"* kmr_map_via_spawn" 154 " WAITING IN MAP-FN...\n");}
155 if (rank == 0) {printf(
"Spawn 2-rank work 4 times" 156 " using %d dynamic processes.\n",
157 mr->spawn_max_processes);}
165 snprintf(kbuf,
sizeof(kbuf),
"key");
166 snprintf(vbuf,
sizeof(vbuf),
167 "maxprocs=2 ./a.out mpi noreply a0 a1 a2");
169 .klen = (int)(strlen(kbuf) + 1),
170 .vlen = (
int)(strlen(vbuf) + 1),
173 for (
int i = 0; i < 4; i++) {
181 MPI_INFO_NULL, opt, empty_map_fn_mpi_noreply);
186 MPI_Barrier(MPI_COMM_WORLD);
188 if (rank == 0) {printf(
"* kmr_map_via_spawn" 189 " WAITING WITH REPLY (EACH)...\n");}
190 if (rank == 0) {printf(
"Spawn 2-rank work 4 times" 191 " using %d dynamic processes.\n",
192 mr->spawn_max_processes);}
200 snprintf(kbuf,
sizeof(kbuf),
"key");
201 snprintf(vbuf,
sizeof(vbuf),
202 "maxprocs=2 ./a.out mpi eachreply a0 a1 a2");
204 .klen = (int)(strlen(kbuf) + 1),
205 .vlen = (
int)(strlen(vbuf) + 1),
208 for (
int i = 0; i < 4; i++) {
215 .separator_space = 1};
217 MPI_INFO_NULL, opt, empty_map_fn_mpi_with_reply);
222 MPI_Barrier(MPI_COMM_WORLD);
224 if (rank == 0) {printf(
"* kmr_map_via_spawn" 225 " WAITING WITH REPLY (ROOT)...\n");}
226 if (rank == 0) {printf(
"Spawn 2-rank work 4 times" 227 " using %d dynamic processes.\n",
228 mr->spawn_max_processes);}
236 snprintf(kbuf,
sizeof(kbuf),
"key");
237 snprintf(vbuf,
sizeof(vbuf),
238 "maxprocs=2 ./a.out mpi rootreply a0 a1 a2");
240 .klen = (int)(strlen(kbuf) + 1),
241 .vlen = (
int)(strlen(vbuf) + 1),
244 for (
int i = 0; i < 4; i++) {
251 .separator_space = 1};
253 MPI_INFO_NULL, opt, empty_map_fn_mpi_with_reply);
261 simple2(
int nprocs,
int rank)
264 mr->trace_map_spawn = 1;
265 mr->spawn_max_processes = 4;
267 MPI_Barrier(MPI_COMM_WORLD);
269 if (rank == 0) {printf(
"** CHECK kmr_map_processes...\n");}
274 MPI_Barrier(MPI_COMM_WORLD);
276 if (rank == 0) {printf(
"* kmr_map_processes(serial)...\n");}
277 if (rank == 0) {printf(
"Spawn 2 serial processes 4 times.\n");}
285 snprintf(kbuf,
sizeof(kbuf),
"key");
286 snprintf(vbuf,
sizeof(vbuf),
287 "maxprocs=2 ./a.out seq noreply a0 a1 a2");
289 .klen = (int)(strlen(kbuf) + 1),
290 .vlen = (
int)(strlen(vbuf) + 1),
293 for (
int i = 0; i < 8; i++) {
306 MPI_Barrier(MPI_COMM_WORLD);
308 if (rank == 0) {printf(
"* kmr_map_processes(mpi)...\n");}
309 if (rank == 0) {printf(
"Spawn 2-rank work 4 times" 310 " using %d dynamic processes.\n",
311 mr->spawn_max_processes);}
312 if (rank == 0) {printf(
"** ON SOME IMPLEMENTATIONS OF MPI," 313 " THIS TEST MAY BLOCK INDEFINITELY. **\n");}
314 if (rank == 0) {printf(
"** THEN, RUN THIS TEST WITH a.out 0" 315 " TO SKIP THIS PART. **\n");}
323 snprintf(kbuf,
sizeof(kbuf),
"key");
324 snprintf(vbuf,
sizeof(vbuf),
325 "maxprocs=2 ./a.out mpi noreply a0 a1 a2");
327 .klen = (int)(strlen(kbuf) + 1),
328 .vlen = (
int)(strlen(vbuf) + 1),
331 for (
int i = 0; i < 4; i++) {
347 spawned(
int argc,
char *argv[])
351 assert(strcmp(argv[1],
"seq") == 0 || strcmp(argv[1],
"mpi") == 0);
352 if (strcmp(argv[1],
"seq") == 0) {
353 printf(
"test4:spawned-process(serial): started (%s,%s,%s).\n",
354 argv[0], argv[1], argv[3]);
355 printf(
"test4:spawned-process(serial): sleeping 3 sec...\n");
358 printf(
"test4:spawned-process(serial): exits.\n");
360 }
else if (strcmp(argv[1],
"mpi") == 0) {
361 int nprocs, rank, lev;
362 MPI_Init_thread(&argc, &argv, MPI_THREAD_SERIALIZED, &lev);
363 MPI_Comm_size(MPI_COMM_WORLD, &nprocs);
364 MPI_Comm_rank(MPI_COMM_WORLD, &rank);
367 MPI_Comm_get_parent(&parent);
368 assert(parent != MPI_COMM_NULL);
371 MPI_Comm_remote_size(parent, &peer_nprocs);
372 assert(peer_nprocs == 1);
373 printf(
"test4:spawned-process(mpi;rank=%d/%d): started (%s,%s,%s).\n",
374 rank, nprocs, argv[0], argv[1], argv[2]);
375 printf(
"test4:spawned-process(mpi;rank=%d/%d): sleeping 3 sec...\n",
380 assert((strcmp(argv[2],
"noreply") == 0)
381 || (strcmp(argv[2],
"eachreply") == 0)
382 || (strcmp(argv[2],
"rootreply") == 0)
383 || (strcmp(argv[2],
"returnkvs") == 0));
384 if (strcmp(argv[2],
"noreply") == 0) {
386 printf(
"test4:spawned-process(mpi;rank=%d/%d):" 389 }
else if (strcmp(argv[2],
"eachreply") == 0) {
391 printf(
"test4:spawned-process(mpi;rank=%d/%d):" 392 " sending a reply.\n",
395 MPI_Send(0, 0, MPI_BYTE, peer,
396 KMR_TAG_SPAWN_REPLY, parent);
397 }
else if (strcmp(argv[2],
"rootreply") == 0) {
400 printf(
"test4:spawned-process(mpi;rank=%d/%d):" 401 " sending a root reply.\n",
404 MPI_Send(0, 0, MPI_BYTE, peer,
405 KMR_TAG_SPAWN_REPLY, parent);
407 }
else if (strcmp(argv[2],
"returnkvs") == 0) {
408 printf(
"test4:spawned-process(mpi;rank=%d/%d):" 411 KMR *mr = kmr_create_dummy_context();
414 for (
int i = 0; i < 4; i++) {
417 snprintf(k,
sizeof(k),
"k%d", i);
418 snprintf(v,
sizeof(v),
"v%d", i);
428 printf(
"test4:spawned-process(mpi;rank=%d/%d):" 429 " call MPI_Comm_free (could block)...\n",
432 MPI_Comm_free(&parent);
433 printf(
"test4:spawned-process(mpi;rank=%d/%d):" 434 " MPI_Comm_free done.\n",
438 printf(
"test4:spawned-process(mpi;rank=%d/%d): finalized.\n",
440 printf(
"test4:spawned-process(mpi;rank=%d/%d): exits.\n",
449 main(
int argc,
char *argv[])
451 if (argc == 1 || argc == 2) {
454 int nprocs, rank, lev;
455 MPI_Init_thread(&argc, &argv, MPI_THREAD_SERIALIZED, &lev);
456 MPI_Comm_size(MPI_COMM_WORLD, &nprocs);
457 MPI_Comm_rank(MPI_COMM_WORLD, &rank);
461 skipmpiwork = (argc == 2 && argv[1][0] ==
'0');
463 if (rank == 0) {printf(
"CHECK SPAWNING MAPPER\n");}
464 if (rank == 0) {printf(
"Running this test needs 4 or more" 465 " dynamic processes.\n");}
468 simple0(nprocs, rank);
469 simple1(nprocs, rank);
470 simple2(nprocs, rank);
472 MPI_Barrier(MPI_COMM_WORLD);
474 if (rank == 0) {printf(
"OK\n");}
Key-Value Stream (abstract).
Utilities Private Part (do not include from applications).
int kmr_add_kv(KMR_KVS *kvs, const struct kmr_kv_box kv)
Adds a key-value pair.
#define kmr_create_kvs(MR, KF, VF)
Makes a new key-value stream (of type KMR_KVS) with the specified field datatypes.
MPI_Comm * kmr_get_spawner_communicator(KMR *mr, long index)
Obtains (a reference to) a parent inter-communicator of a spawned process.
int kmr_send_kvs_to_spawner(KMR *mr, KMR_KVS *kvs)
Sends the KVS from a spawned process to the map-function of the spawner.
int kmr_add_kv_done(KMR_KVS *kvs)
Marks finished adding key-value pairs.
int kmr_reply_to_spawner(KMR *mr)
Sends a reply message from the spawned process, telling the spawner that it is ready to finish and may have some data to send.
int kmr_free_kvs(KMR_KVS *kvs)
Releases a key-value stream (type KMR_KVS).
Handy Copy of a Key-Value Field.
Options to Mapping by Spawns.
int kmr_fin(void)
Clears the environment.
#define kmr_init()
Sets up the environment.
int kmr_map_via_spawn(KMR_KVS *kvi, KMR_KVS *kvo, void *arg, MPI_Info info, struct kmr_spawn_option opt, kmr_mapfn_t mapfn)
Maps on processes started by MPI_Comm_spawn().
int kmr_free_context(KMR *mr)
Releases a context created with kmr_create_context().
int kmr_add_string(KMR_KVS *kvs, const char *k, const char *v)
Adds a key-value pair of strings.
int kmr_receive_kvs_from_spawned_fn(const struct kmr_kv_box kv, const KMR_KVS *kvi, KMR_KVS *kvo, void *arg, const long index)
Collects key-value pairs generated by spawned processes.
int kmr_map_processes(_Bool nonmpi, KMR_KVS *kvi, KMR_KVS *kvo, void *arg, MPI_Info info, struct kmr_spawn_option opt, kmr_mapfn_t mapfn)
Maps on processes started by MPI_Comm_spawn() to run independent processes.
KMR * kmr_create_context(const MPI_Comm comm, const MPI_Info conf, const char *name)
Makes a new KMR context (a context has type KMR).