14 #include <sys/types.h> 17 #include <sys/param.h> 18 #include <arpa/inet.h> 26 #define MAX(a,b) (((a)>(b))?(a):(b)) 27 #define MIN(a,b) (((a)<(b))?(a):(b)) 29 static const int kmr_kv_buffer_slack_size = 1024;
34 KMR_RPC_NONE, KMR_RPC_GOON, KMR_RPC_DONE
38 kmr_assert_peer_tag(
int tag)
40 assert(KMR_TAG_PEER_0 <= tag && tag < KMR_TAG_PEER_END);
50 #define KMR_RPC_ID_NONE -1 51 #define KMR_RPC_ID_FIN -2 74 if (kmr_fields_pointer_p(kvi)) {
75 kmr_error(mr,
"kmr_map_ms: cannot handle pointer field types");
77 assert(kvo->c.key_data == kvi->c.key_data
78 && kvo->c.value_data == kvi->c.value_data);
79 MPI_Comm comm = mr->comm;
80 int nprocs = mr->nprocs;
81 long cnt = kvi->c.element_count;
82 assert(INT_MIN <= cnt && cnt <= INT_MAX);
84 char *msstates = &(ms->states[0]);
88 ms =
kmr_malloc((hdsz +
sizeof(
char) * (
size_t)cnt));
93 msstates = &(ms->states[0]);
94 for (
long i = 0; i < cnt; i++) {
95 msstates[i] = KMR_RPC_NONE;
98 if (ms->dones == cnt && ms->nodes == (nprocs - 1)) {
100 if (kvi->c.temporary_data != 0) {
101 kmr_free(kvi->c.temporary_data,
103 kvi->c.temporary_data = 0;
111 kvi->c.temporary_data = ev;
112 kvi->c.current_block = kvi->c.first_block;
113 struct kmr_kvs_entry *e = kmr_kvs_first_entry(kvi, kvi->c.first_block);
114 for (
long i = 0; i < cnt; i++) {
117 e = kmr_kvs_next(kvi, e, 0);
121 assert(ms->dones <= ms->kicks);
125 cc = MPI_Recv(req, 3, MPI_INT, MPI_ANY_SOURCE, KMR_TAG_REQ, comm, &st);
126 assert(cc == MPI_SUCCESS);
127 int peer_tag = req[0];
128 int peer = st.MPI_SOURCE;
133 if (
id == KMR_RPC_ID_NONE) {
135 }
else if (
id == KMR_RPC_ID_FIN) {
138 assert(ms->nodes <= (nprocs - 1));
142 kmr_assert_peer_tag(peer_tag);
144 cc = MPI_Recv(packed, sz, MPI_BYTE, peer, peer_tag, comm, &st);
145 assert(cc == MPI_SUCCESS);
148 assert(cc == MPI_SUCCESS);
149 struct kmr_option keepopen = {.keep_open = 1};
151 assert(cc == MPI_SUCCESS);
152 kmr_free(packed, (
size_t)sz);
153 assert(msstates[
id] == KMR_RPC_GOON);
154 msstates[id] = KMR_RPC_DONE;
158 if (ms->kicks < cnt) {
161 for (
id = 0;
id < cnt;
id++) {
162 if (msstates[
id] == KMR_RPC_NONE) {
166 assert(
id != KMR_RPC_ID_NONE &&
id != cnt);
168 int sz = (int)kmr_kvs_entry_netsize(e);
170 int ack[2] = {id, sz};
171 cc = MPI_Send(ack, 2, MPI_INT, peer, peer_tag, comm);
172 assert(cc == MPI_SUCCESS);
173 cc = MPI_Send(e, sz, MPI_BYTE, peer, peer_tag, comm);
174 assert(cc == MPI_SUCCESS);
175 assert(msstates[
id] == KMR_RPC_NONE);
176 msstates[id] = KMR_RPC_GOON;
180 int ack[2] = {KMR_RPC_ID_NONE, 0};
181 cc = MPI_Send(ack, 2, MPI_INT, peer, peer_tag, comm);
182 assert(cc == MPI_SUCCESS);
198 assert(!kmr_fields_pointer_p(kvi)
199 && kvo->c.key_data == kvi->c.key_data
200 && kvo->c.value_data == kvi->c.value_data);
201 KMR *
const mr = kvi->c.mr;
202 const MPI_Comm comm = mr->comm;
203 const int rank = mr->rank;
204 const enum kmr_kv_field keyf = kmr_unit_sized_or_opaque(kvo->c.key_data);
205 const enum kmr_kv_field valf = kmr_unit_sized_or_opaque(kvo->c.value_data);
207 assert(kvi->c.element_count == 0);
209 const _Bool threading = !(mr->single_thread || opt.nothreading);
211 KMR_OMP_PARALLEL_IF_(threading)
214 int thr = KMR_OMP_GET_THREAD_NUM();
218 int peer_tag = KMR_TAG_PEER(thr);
219 kmr_assert_peer_tag(peer_tag);
222 int req[3] = {peer_tag, KMR_RPC_ID_NONE, 0};
224 cc = MPI_Send(req, 3, MPI_INT, 0, KMR_TAG_REQ, comm);
225 assert(cc == MPI_SUCCESS);
230 cc = MPI_Recv(ack, 2, MPI_INT, 0, peer_tag, comm, &st);
231 assert(cc == MPI_SUCCESS);
234 if (
id == KMR_RPC_ID_NONE) {
237 assert(
id >= 0 && sz > 0);
239 maxsz = (sz + kmr_kv_buffer_slack_size);
244 cc = MPI_Recv(e, sz, MPI_BYTE, 0, peer_tag, comm, &st);
245 assert(cc == MPI_SUCCESS);
251 cc = (*m)(kv, kvi, kvx, arg, id);
252 if (cc != MPI_SUCCESS) {
254 snprintf(ee,
sizeof(ee),
255 "Map-fn returned with error cc=%d", cc);
262 assert(cc == MPI_SUCCESS && packed != 0);
264 assert(packsz <= (
size_t)INT_MAX);
266 int req[3] = {peer_tag, id, sz};
268 cc = MPI_Send(req, 3, MPI_INT, 0, KMR_TAG_REQ, comm);
269 assert(cc == MPI_SUCCESS);
271 cc = MPI_Send(packed, sz, MPI_BYTE, 0, peer_tag, comm);
272 assert(cc == MPI_SUCCESS);
276 assert(cc == MPI_SUCCESS);
277 kmr_free(packed, packsz);
280 kmr_free(e, (
size_t)maxsz);
287 int req[3] = {0, KMR_RPC_ID_FIN, 0};
288 cc = MPI_Send(req, 3, MPI_INT, 0, KMR_TAG_REQ, comm);
289 assert(cc == MPI_SUCCESS);
313 kmr_assert_kvs_ok(kvi, kvo, 1, 0);
315 struct kmr_option kmr_supported = {.nothreading = 1, .keep_open = 1};
316 kmr_check_fn_options(mr, kmr_supported, opt, __func__);
319 long cnt = kvi->c.element_count;
320 assert(INT_MIN <= cnt && cnt <= INT_MAX);
325 if (ccr == MPI_SUCCESS) {
327 assert(cc == MPI_SUCCESS);
329 assert(cc == MPI_SUCCESS);
334 assert(cc == MPI_SUCCESS);
336 assert(cc == MPI_SUCCESS);
354 enum kmr_spawn_mode {
355 KMR_SPAWN_INTERACT, KMR_SPAWN_SERIAL, KMR_SPAWN_PARALLEL
394 enum kmr_spawn_mode mode;
406 MPI_Request *replies;
410 char watch_host[MAXHOSTNAMELEN + 10];
417 kmr_sum_on_all_ranks(
KMR *mr,
int v,
int *sum)
423 .klen = (int)
sizeof(
long),
424 .vlen = (int)
sizeof(
long),
429 assert(cc == MPI_SUCCESS);
433 assert(cc == MPI_SUCCESS);
436 assert(cc == MPI_SUCCESS);
442 kmr_make_pretty_argument_string(
char *s,
size_t sz,
int argc,
char **argv)
446 for (
int i = 0; (i < argc && argv[i] != 0); i++) {
447 cc = snprintf(&s[cnt], (sz - cnt), (i == 0 ?
"%s" :
",%s"), argv[i]);
461 MPI_Info info = MPI_Info_f2c(finfo);
472 info->u.icomm = s->icomm;
473 info->icomm_ff = MPI_Comm_c2f(s->icomm);
474 info->reply_root = opt.reply_root;
481 if (info->u.icomm != s->icomm) {
482 s->icomm = info->u.icomm;
483 }
else if (info->icomm_ff != MPI_Comm_c2f(s->icomm)) {
484 s->icomm = MPI_Comm_f2c(info->icomm_ff);
494 assert(spw->n_spawns == (
int)kvi->c.element_count);
495 KMR *
const mr = kvi->c.mr;
496 _Bool tracing5 = (mr->trace_map_spawn && (5 <= mr->verbosity));
501 kvi->c.current_block = kvi->c.first_block;
502 struct kmr_kvs_entry *e = kmr_kvs_first_entry(kvi, kvi->c.first_block);
503 for (
int w = 0; w < spw->n_spawns; w++) {
505 e = kmr_kvs_next(kvi, e, 0);
511 cc = kmr_sum_on_all_ranks(mr, ((spw->n_spawns > 0) ? 1 : 0), &nranks);
512 assert(cc == MPI_SUCCESS && nranks <= mr->nprocs);
513 spw->n_spawners = nranks;
516 cc = MPI_Attr_get(MPI_COMM_WORLD, MPI_UNIVERSE_SIZE, &usizep, &uflag);
517 if (cc != MPI_SUCCESS || uflag == 0) {
519 snprintf(ee,
sizeof(ee),
"%s: MPI lacks universe size", spw->fn);
522 spw->usize = *usizep;
523 if (spw->usize <= mr->nprocs) {
525 snprintf(ee,
sizeof(ee),
"%s: no dynamic processes in universe",
529 int m = spw->usize - mr->nprocs;
530 if (spw->n_spawners != 0) {
531 m /= spw->n_spawners;
533 spw->spawn_limit = ((mr->spawn_max_processes != 0)
534 ? MIN(mr->spawn_max_processes, m)
537 if (spw->n_spawns > 0) {
539 ";;KMR [%05d] %s: universe-size=%d spawn-limit=%d\n",
540 mr->rank, spw->fn, spw->usize, spw->spawn_limit);
549 char *infoval =
kmr_malloc((
size_t)(MPI_MAX_INFO_VAL + 1));
551 if (info != MPI_INFO_NULL) {
552 cc = MPI_Info_get(info,
"maxprocs", MPI_MAX_INFO_VAL,
554 assert(cc == MPI_SUCCESS);
560 cc = kmr_parse_int(infoval, &v);
561 if (cc == 0 || v < 0) {
563 snprintf(ee,
sizeof(ee),
"%s: bad value in info maxprocs=%s",
573 kmr_free(infoval, (
size_t)(MPI_MAX_INFO_VAL + 1));
578 spw->n_processes = 0;
579 for (
int w = 0; w < spw->n_spawns; w++) {
592 s->icomm = MPI_COMM_NULL;
595 s->alen = (size_t)kv.vlen;
597 memcpy(s->abuf, kv.v.p, (
size_t)kv.vlen);
599 cc = kmr_scan_argv_strings(mr, s->abuf, s->alen,
601 opt.separator_space, spw->fn);
602 assert(cc == MPI_SUCCESS);
603 s->argv0 =
kmr_malloc(
sizeof(
char *) * (
size_t)(maxargc + 1));
604 memset(s->argv0, 0, (
sizeof(
char *) * (
size_t)(maxargc + 1)));
605 cc = kmr_scan_argv_strings(mr, s->abuf, s->alen,
606 (maxargc + 1), &s->argc0, s->argv0,
607 opt.separator_space, spw->fn);
608 assert(cc == MPI_SUCCESS);
609 assert(maxargc == s->argc0);
613 if (s->argc0 > 0 && strncmp(
"maxprocs=", s->argv0[0], 9) == 0) {
615 cc = kmr_parse_int(&s->argv0[0][9], &v);
616 if (cc == 0 || v < 0) {
618 snprintf(ee,
sizeof(ee),
"%s: bad maxprocs=%s",
619 spw->fn, s->argv0[0]);
623 s->argc = (s->argc0 - 1);
624 s->argv = (s->argv0 + 1);
626 s->n_procs = maxprocs;
633 snprintf(ee,
sizeof(ee),
"%s: no arguments", spw->fn);
636 if (s->n_procs <= 0) {
638 snprintf(ee,
sizeof(ee),
"%s: maxprocs not specified",
642 if (s->n_procs > spw->spawn_limit) {
644 snprintf(ee,
sizeof(ee),
645 "%s: maxprocs too large, (maxprocs=%d limit=%d)",
646 spw->fn, s->n_procs, spw->spawn_limit);
650 spw->n_processes += s->n_procs;
661 _Bool tracing5 = (mr->trace_map_spawn && (5 <= mr->verbosity));
662 if (s->icomm != MPI_COMM_NULL) {
664 ptrdiff_t done = (s - spw->spawned);
665 fprintf(stderr, (
";;KMR [%05d] %s [%ld]:" 666 " MPI_Comm_free (could block)...\n"),
667 mr->rank, spw->fn, done);
671 if (!mr->spawn_disconnect_but_free) {
672 cc = MPI_Comm_free(&(s->icomm));
674 cc = MPI_Comm_disconnect(&(s->icomm));
676 assert(cc == MPI_SUCCESS);
679 ptrdiff_t done = (s - spw->spawned);
680 fprintf(stderr, (
";;KMR [%05d] %s [%ld]:" 681 " MPI_Comm_free done\n"),
682 mr->rank, spw->fn, done);
696 assert(
sizeof(spw->watch_host) >= 46);
700 struct sockaddr_in sa4;
701 struct sockaddr_in6 sa6;
702 struct sockaddr_storage ss;
704 char hostname[MAXHOSTNAMELEN];
705 char address[INET6_ADDRSTRLEN];
709 int af = mr->spawn_watch_af;
710 assert(af == 0 || af == 4 || af == 6);
711 char *family = (af == 4 ?
"AF_INET" :
"AF_INET6");
713 assert(spw->watch_listener == -1);
714 const int *ports = mr->spawn_watch_port_range;
715 assert(ports[0] != 0 || ports[1] == 0);
716 for (
int port = ports[0]; port <= ports[1]; port++) {
718 sa.sa.sa_family = AF_INET;
719 }
else if (af == 0 || af == 6) {
720 sa.sa.sa_family = AF_INET6;
724 int fd = socket(sa.sa.sa_family, SOCK_STREAM, 0);
727 char *m = strerror(errno);
728 snprintf(ee,
sizeof(ee),
"%s: socket(%s) failed: %s",
733 cc = setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one,
sizeof(one));
736 char *m = strerror(errno);
737 snprintf(ee,
sizeof(ee),
"%s: setsockopt(SO_REUSEADDR): %s",
739 kmr_warning(mr, 1, ee);
744 memset(&sa, 0,
sizeof(sa));
745 sa.sa4.sin_family = AF_INET;
746 sa.sa4.sin_addr.s_addr = htonl(INADDR_ANY);
747 sa.sa4.sin_port = htons((uint16_t)port);
748 salen =
sizeof(sa.sa4);
749 }
else if (af == 0 || af == 6) {
750 memset(&sa, 0,
sizeof(sa));
751 sa.sa6.sin6_family = AF_INET6;
752 sa.sa6.sin6_addr = in6addr_any;
753 sa.sa6.sin6_port = htons((uint16_t)port);
754 salen =
sizeof(sa.sa6);
761 cc = bind(fd, &sa.sa, salen);
763 if (errno == EADDRINUSE || errno == EINVAL) {
769 char *m = strerror(errno);
770 snprintf(ee,
sizeof(ee),
"%s: bind(%s, port=%d) failed: %s",
771 spw->fn, family, port, m);
778 int backlog = spw->spawn_limit;
779 cc = listen(fd, backlog);
781 if (errno == EADDRINUSE || errno == EINVAL) {
787 char *m = strerror(errno);
788 snprintf(ee,
sizeof(ee),
"%s: listen(%s, port=%d) failed: %s",
789 spw->fn, family, port, m);
794 spw->watch_listener = fd;
798 int fd = spw->watch_listener;
801 snprintf(ee,
sizeof(ee),
"%s: no ports to listen to watch-programs",
808 memset(&sa, 0,
sizeof(sa));
809 socklen_t salen =
sizeof(sa);
810 cc = getsockname(fd, &sa.sa, &salen);
813 char *m = strerror(errno);
814 snprintf(ee,
sizeof(ee),
"%s: getsockname() failed: %s",
820 if (sa.sa.sa_family == AF_INET) {
821 port = ntohs(sa.sa4.sin_port);
822 }
else if (sa.sa.sa_family == AF_INET6) {
823 port = ntohs(sa.sa6.sin6_port);
826 snprintf(ee,
sizeof(ee),
"%s: getsockname(): unknown ip family=%d",
827 spw->fn, sa.sa.sa_family);
832 if (mr->spawn_watch_host_name != 0) {
833 cc = snprintf(hostname,
sizeof(hostname),
834 "%s", mr->spawn_watch_host_name);
835 assert(cc < (
int)
sizeof(hostname));
837 cc = gethostname(hostname,
sizeof(hostname));
840 char *m = strerror(errno);
841 snprintf(ee,
sizeof(ee),
"%s: gethostname() failed: %s",
847 struct addrinfo hints;
848 memset(&hints, 0,
sizeof(hints));
849 hints.ai_flags = AI_ADDRCONFIG;
850 hints.ai_socktype = SOCK_STREAM;
851 hints.ai_protocol = IPPROTO_TCP;
853 hints.ai_family = AF_INET;
854 }
else if (af == 6) {
855 hints.ai_family = AF_INET6;
856 }
else if (af == 0) {
857 hints.ai_family = (AF_INET6 | AI_V4MAPPED);
861 struct addrinfo *addrs = 0;
862 cc = getaddrinfo(hostname, 0, &hints, &addrs);
865 const char *m = gai_strerror(cc);
866 snprintf(ee,
sizeof(ee),
"%s: getaddrinfo(%s) failed: %s",
867 spw->fn, hostname, m);
871 for (p = addrs; p != 0; p = p->ai_next) {
872 if (!(p->ai_family == AF_INET || p->ai_family == AF_INET6)) {
875 if (af == 4 && p->ai_family != AF_INET) {
878 if (af == 6 && p->ai_family != AF_INET6) {
885 snprintf(ee,
sizeof(ee),
"%s: getaddrinfo(%s): no address for host",
890 if (p->ai_family == AF_INET) {
891 void *inaddr = &(((
struct sockaddr_in *)p->ai_addr)->sin_addr);
892 inet_ntop(p->ai_family, inaddr, address,
sizeof(address));
893 }
else if (p->ai_family == AF_INET6) {
894 void *inaddr = &(((
struct sockaddr_in6 *)p->ai_addr)->sin6_addr);
895 inet_ntop(p->ai_family, inaddr, address,
sizeof(address));
898 snprintf(ee,
sizeof(ee),
"%s: getaddrinfo(%s): unknown ip family=%d",
899 spw->fn, hostname, p->ai_family);
904 assert(0 <= index && index < spw->n_spawns);
906 s->watch_port = port;
908 cc = snprintf(spw->watch_host,
sizeof(spw->watch_host),
910 assert(cc < (
int)
sizeof(spw->watch_host));
921 _Bool tracing5 = (mr->trace_map_spawn && (5 <= mr->verbosity));
922 assert(0 <= index && index < spw->n_spawns);
924 assert(s->n_procs > 0);
927 struct sockaddr_in sa4;
928 struct sockaddr_in6 sa6;
929 struct sockaddr_storage ss;
933 assert(spw->watch_listener != -1);
934 int fd0 = spw->watch_listener;
935 for (
int count = 0; count < s->n_procs; count++) {
938 struct pollfd fds0, *fds = &fds0;
939 memset(fds, 0, (
sizeof(
struct pollfd) * nfds));
941 fds[0].events = (POLLIN|POLLPRI);
944 assert(mr->spawn_watch_accept_onhold_msec >= (60 * 1000));
945 int msec = mr->spawn_watch_accept_onhold_msec;
946 int nn = poll(fds, nfds, msec);
949 snprintf(ee,
sizeof(ee),
950 "%s: accepting watch-programs timed out" 951 " (msec=%d)", spw->fn, msec);
953 }
else if (nn < 0 && (errno == EAGAIN || errno == EINTR)) {
955 char *m = strerror(errno);
956 snprintf(ee,
sizeof(ee),
957 "%s: poll (for watch-programs) returned: %s",
959 kmr_warning(mr, 1, ee);
963 char *m = strerror(errno);
964 snprintf(ee,
sizeof(ee),
965 "%s: poll (for watch-programs) failed: %s",
972 memset(&sa, 0,
sizeof(sa));
973 socklen_t salen =
sizeof(sa);
974 int fd = accept(fd0, &sa.sa, &salen);
977 char *m = strerror(errno);
978 snprintf(ee,
sizeof(ee),
979 "%s: accept (for watch-programs) failed: %s",
987 char address[INET6_ADDRSTRLEN];
989 if (sa.sa.sa_family == AF_INET) {
990 void *inaddr = &sa.sa4.sin_addr;
991 inet_ntop(sa.sa.sa_family, inaddr, address,
sizeof(address));
993 }
else if (sa.sa.sa_family == AF_INET6) {
994 void *inaddr = &sa.sa6.sin6_addr;
995 inet_ntop(sa.sa.sa_family, inaddr, address,
sizeof(address));
999 snprintf(ee,
sizeof(ee),
"%s: accept(): unknown ip family=%d",
1000 spw->fn, sa.sa.sa_family);
1004 fprintf(stderr, (
";;KMR [%05d] %s [%d]:" 1005 " accepting a connection of watch-programs" 1006 " on port=%d from %s (%d/%d)\n"),
1007 mr->rank, spw->fn, index,
1008 s->watch_port, address, (count + 1), s->n_procs);
1013 if (count == 0 || mr->spawn_watch_all) {
1014 assert((s->index + count) <= spw->n_processes);
1015 spw->watches[s->index + count] = fd;
1021 ssize_t wsize = write(fd, &val,
sizeof(
int));
1024 char *m = strerror(errno);
1025 snprintf(ee,
sizeof(ee),
1026 "%s: write (for watch-programs) failed: %s",
1030 assert(wsize ==
sizeof(
int));
1033 ssize_t rsize = read(fd, &rval,
sizeof(
int));
1036 char *m = strerror(errno);
1037 snprintf(ee,
sizeof(ee),
1038 "%s: read (for watch-programs) failed: %s",
1042 assert(rsize ==
sizeof(
int));
1043 assert(val == rval);
1045 if (!(count == 0 || mr->spawn_watch_all)) {
1051 cc = close(spw->watch_listener);
1053 spw->watch_listener = -1;
1060 int w, _Bool replyeach, _Bool replyroot)
1062 assert(0 <= w && w < spw->n_spawns);
1064 enum kmr_spawn_mode mode = spw->mode;
1066 MPI_Request *reqs = spw->replies;
1067 if (mode == KMR_SPAWN_INTERACT) {
1069 assert(s->index + s->count <= spw->n_processes);
1070 for (
int rank = 0; rank < s->count; rank++) {
1071 assert(reqs[s->index + rank] == MPI_REQUEST_NULL);
1072 cc = MPI_Irecv(0, 0, MPI_BYTE,
1073 rank, KMR_TAG_SPAWN_REPLY,
1074 s->icomm, &reqs[s->index + rank]);
1075 assert(cc == MPI_SUCCESS);
1076 assert(reqs[s->index + rank] != MPI_REQUEST_NULL);
1078 }
else if (replyroot) {
1079 assert(w <= spw->n_processes);
1081 assert(reqs[w] == MPI_REQUEST_NULL);
1082 cc = MPI_Irecv(0, 0, MPI_BYTE,
1083 rank, KMR_TAG_SPAWN_REPLY,
1084 s->icomm, &reqs[w]);
1085 assert(cc == MPI_SUCCESS);
1086 assert(reqs[w] != MPI_REQUEST_NULL);
1090 }
else if (mode == KMR_SPAWN_SERIAL) {
1093 assert(s->index + s->count <= spw->n_processes);
1094 for (
int rank = 0; rank < s->count; rank++) {
1095 assert(reqs[s->index + rank] == MPI_REQUEST_NULL);
1096 cc = MPI_Irecv(0, 0, MPI_BYTE,
1097 rank, KMR_TAG_SPAWN_REPLY,
1098 s->icomm, &reqs[s->index + rank]);
1099 assert(cc == MPI_SUCCESS);
1100 assert(reqs[s->index + rank] != MPI_REQUEST_NULL);
1104 assert(mode == KMR_SPAWN_INTERACT || mode == KMR_SPAWN_SERIAL);
1119 _Bool tracing5 = (mr->trace_map_spawn && (5 <= mr->verbosity));
1121 MPI_Request *reqs = spw->replies;
1122 if (opt.reply_each) {
1125 int nwait = spw->n_processes;
1126 cc = MPI_Waitany(nwait, reqs, &index, &st);
1127 assert(cc == MPI_SUCCESS && index != MPI_UNDEFINED);
1128 assert(index < spw->n_processes);
1129 assert(reqs[index] == MPI_REQUEST_NULL);
1132 for (
int w = 0; w < spw->n_spawns; w++) {
1134 if (index < (s->index + s->count)) {
1135 assert(s->index <= index);
1143 int count = (opt.reply_each ? s->count : 1);
1145 assert((s->index + count) <= spw->n_processes);
1146 for (
int j = 0; j < count; j++) {
1147 if (reqs[s->index + j] == MPI_REQUEST_NULL) {
1153 fprintf(stderr, (
";;KMR [%05d] %s [%d]: got a reply (%d/%d)\n"),
1154 mr->rank, spw->fn, done, nreplies, count);
1158 _Bool fin = (nreplies == count);
1159 return (fin ? done : -1);
1160 }
else if (opt.reply_root) {
1163 int nwait = spw->n_spawns;
1164 cc = MPI_Waitany(nwait, reqs, &index, &st);
1165 assert(cc == MPI_SUCCESS && index != MPI_UNDEFINED);
1166 assert(index <= spw->n_spawns);
1167 assert(reqs[index] == MPI_REQUEST_NULL);
1169 assert(0 <= done && done < spw->n_spawns);
1172 assert(reqs[done] == MPI_REQUEST_NULL);
1175 fprintf(stderr, (
";;KMR [%05d] %s [%d]: got a root reply\n"),
1176 mr->rank, spw->fn, done);
1183 for (
int w = 0; w < spw->n_spawns; w++) {
1192 fprintf(stderr, (
";;KMR [%05d] %s [%d]: (no checks of replies)\n"),
1193 mr->rank, spw->fn, done);
1213 _Bool tracing5 = (mr->trace_map_spawn && (5 <= mr->verbosity));
1218 for (
int w = 0; w < spw->n_spawns; w++) {
1224 assert(nruns != 0 && spw->n_runnings == nruns);
1227 for (
int i = 0; i < spw->n_processes; i++) {
1228 if (spw->watches[i] != -1) {
1234 struct pollfd *fds =
kmr_malloc(
sizeof(
struct pollfd) * (
size_t)nfds);
1238 memset(fds, 0, (
sizeof(
struct pollfd) * nfds));
1240 for (
int i = 0; i < spw->n_processes; i++) {
1241 if (spw->watches[i] != -1) {
1242 assert(fdix < nfds);
1243 fds[fdix].fd = spw->watches[i];
1244 fds[fdix].events = (POLLIN|POLLPRI);
1245 fds[fdix].revents = 0;
1249 assert(fdix == nfds);
1252 fprintf(stderr, (
";;KMR [%05d] %s:" 1253 " waiting for some watch-programs finish\n"),
1260 int nn = poll(fds, nfds, msec);
1265 MPI_Testany(0, 0, &index, &ok, &st);
1270 }
else if (nn < 0 && (errno == EAGAIN || errno == EINTR)) {
1272 char *m = strerror(errno);
1273 snprintf(ee,
sizeof(ee),
1274 (
"poll (for watch-programs) interrupted;" 1275 " continuing: %s"), m);
1276 kmr_warning(mr, 1, ee);
1280 char *m = strerror(errno);
1281 snprintf(ee,
sizeof(ee),
1282 "%s: poll (for watch-programs) failed: %s",
1291 for (nfds_t k = 0; k < nfds; k++) {
1292 if (fds[k].fd != -1 && fds[k].revents != 0) {
1299 snprintf(ee,
sizeof(ee),
"poll (for watch-programs) no FD found");
1300 kmr_warning(mr, 1, ee);
1305 for (
int w = 0; w < spw->n_spawns; w++) {
1307 assert((s->index + s->count) <= spw->n_processes);
1308 for (
int j = 0; j < s->count; j++) {
1309 if (fd == spw->watches[s->index + j]) {
1310 index = (s->index + j);
1316 assert(fd != -1 && index != -1 && done != -1);
1317 assert(0 <= index && index < spw->n_processes);
1318 assert(0 <= done && done < spw->n_spawns);
1321 ssize_t rr = read(fd, garbage,
sizeof(garbage));
1324 assert(fd == spw->watches[index]);
1327 spw->watches[index] = -1;
1329 }
else if (rr > 0) {
1332 }
else if (rr == -1 && (errno == EAGAIN || errno == EINTR)) {
1334 char *m = strerror(errno);
1335 snprintf(ee,
sizeof(ee),
1336 "read (for watch-programs) returned: %s", m);
1337 kmr_warning(mr, 1, ee);
1339 }
else if (rr == -1) {
1341 char *m = strerror(errno);
1342 snprintf(ee,
sizeof(ee),
1343 "%s: read (for watch-programs) failed: %s",
1350 assert(0 <= done && done < spw->n_spawns);
1352 int count = ((mr->spawn_watch_all) ? s->count : 1);
1354 assert((s->index + count) <= spw->n_processes);
1355 for (
int j = 0; j < count; j++) {
1356 if (spw->watches[s->index + j] == -1) {
1362 fprintf(stderr, (
";;KMR [%05d] %s [%d]:" 1363 " detected a watch done (%d/%d)\n"),
1364 mr->rank, spw->fn, done, nreplies, count);
1368 _Bool fin = (nreplies == count);
1370 if (s->icomm != MPI_COMM_NULL) {
1371 cc = kmr_free_comm_with_tracing(mr, spw, s);
1372 assert(cc == MPI_SUCCESS);
1376 kmr_free(fds, (
sizeof(
struct pollfd) * (
size_t)nfds));
1377 return (fin ? done : -1);
1386 enum kmr_spawn_mode mode = spw->mode;
1388 if (mode == KMR_SPAWN_INTERACT) {
1389 done = kmr_wait_for_reply(mr, spw, opt);
1390 }
else if (mode == KMR_SPAWN_SERIAL) {
1391 done = kmr_wait_for_reply(mr, spw, opt);
1392 }
else if (mode == KMR_SPAWN_PARALLEL) {
1393 done = kmr_wait_for_watch(mr, spw, opt);
1399 assert(0 <= done && done < spw->n_spawns);
1401 s->timestamp[3] = MPI_Wtime();
1403 if (mr->spawn_pass_intercomm_in_argument
1404 && mode == KMR_SPAWN_INTERACT) {
1405 assert(mr->spawn_comms != 0);
1406 assert(mr->spawn_comms[done] == &(s->icomm));
1408 kmr_spawn_info_put(&si, s, opt, arg);
1409 cc = (*m)(spw->ev[done], kvi, kvo, &si, done);
1410 if (cc != MPI_SUCCESS) {
1412 snprintf(ee,
sizeof(ee),
1413 "Map-fn returned with error cc=%d", cc);
1416 kmr_spawn_info_get(&si, s);
1418 cc = (*m)(spw->ev[done], kvi, kvo, arg, done);
1419 if (cc != MPI_SUCCESS) {
1421 snprintf(ee,
sizeof(ee),
1422 "Map-fn returned with error cc=%d", cc);
1427 s->timestamp[4] = MPI_Wtime();
1428 if (s->icomm != MPI_COMM_NULL) {
1429 cc = kmr_free_comm_with_tracing(mr, spw, s);
1430 assert(cc == MPI_SUCCESS);
1432 s->timestamp[5] = MPI_Wtime();
1435 spw->n_runnings -= s->count;
1444 kmr_map_spawned_processes(
enum kmr_spawn_mode mode,
char *name,
1449 kmr_assert_kvs_ok(kvi, kvo, 1, 0);
1450 assert(kvi->c.value_data == KMR_KV_OPAQUE
1451 || kvi->c.value_data == KMR_KV_CSTRING);
1452 assert(kvi->c.element_count <= INT_MAX);
1453 _Bool use_reply = (mode == KMR_SPAWN_INTERACT || mode == KMR_SPAWN_SERIAL);
1454 _Bool use_watch = (mode != KMR_SPAWN_INTERACT);
1455 KMR *
const mr = kvi->c.mr;
1456 _Bool tracing5 = (mr->trace_map_spawn && (5 <= mr->verbosity));
1460 char hostport[MAXHOSTNAMELEN + 10];
1466 cc = kmr_install_watch_program(mr, name);
1467 assert(cc == MPI_SUCCESS);
1471 int cnt = (int)kvi->c.element_count;
1475 spw->n_spawns = cnt;
1476 spw->n_starteds = 0;
1477 spw->n_runnings = 0;
1480 spw->watch_listener = -1;
1481 spw->watch_host[0] = 0;
1484 cc = kmr_list_spawns(spw, kvi, info, opt);
1485 assert(cc == MPI_SUCCESS);
1490 if (opt.take_ckpt) {
1509 assert(spw->replies == 0);
1510 spw->replies =
kmr_malloc(
sizeof(MPI_Request) * (
size_t)spw->n_processes);
1511 for (
int i = 0; i < spw->n_processes; i++) {
1512 spw->replies[i] = MPI_REQUEST_NULL;
1515 if (mode == KMR_SPAWN_PARALLEL) {
1516 assert(spw->watches == 0);
1517 spw->watches =
kmr_malloc(
sizeof(
int) * (
size_t)spw->n_processes);
1518 for (
int i = 0; i < spw->n_processes; i++) {
1519 spw->watches[i] = -1;
1522 assert(mr->spawn_comms == 0);
1523 mr->spawn_size = spw->n_spawns;
1524 mr->spawn_comms =
kmr_malloc(
sizeof(MPI_Comm *) * (
size_t)spw->n_spawns);
1525 for (
int w = 0; w < spw->n_spawns; w++) {
1526 mr->spawn_comms[w] = &(spw->spawned[w].icomm);
1530 if (mr->spawn_gap_msec[0] == 0) {
1534 unsigned int v = (
unsigned int)spw->usize;
1539 gap = (int)((((
long)mr->spawn_gap_msec[1] * usz) / 10)
1540 + mr->spawn_gap_msec[0]);
1545 for (
int w = from; w < spw->n_spawns; w++) {
1550 if ((spw->n_runnings + s->n_procs) > spw->spawn_limit) {
1551 while ((spw->n_runnings + s->n_procs) > spw->spawn_limit) {
1552 cc = kmr_wait_then_map(mr, spw,
1553 kvi, kvo, arg, opt, mapfn);
1554 assert(cc == MPI_SUCCESS);
1559 ";;KMR [%05d] %s: sleeping for spawn gap" 1561 mr->rank, spw->fn, gap);
1568 if (mode == KMR_SPAWN_PARALLEL) {
1569 cc = kmr_listen_to_watch(mr, spw, w);
1570 assert(cc == MPI_SUCCESS);
1572 cc = snprintf(spw->watch_host,
sizeof(spw->watch_host),
1574 assert(cc < (
int)
sizeof(spw->watch_host));
1581 argc = (s->argc + 5);
1582 argv =
kmr_malloc(
sizeof(
char *) * (
size_t)(argc + 1));
1584 cc = snprintf(hostport,
sizeof(hostport),
1585 "%s/%d", spw->watch_host, s->watch_port);
1586 assert(cc < (
int)
sizeof(hostport));
1588 unsigned int vv = (
unsigned int)random();
1589 cc = snprintf(magic,
sizeof(magic),
"%08xN%dV0%s",
1590 vv, w, ((mr->trace_map_spawn) ?
"T1" :
""));
1591 assert(cc < (
int)
sizeof(magic));
1593 assert(mr->spawn_watch_program != 0);
1594 argv[0] = mr->spawn_watch_program;
1595 argv[1] = ((mode == KMR_SPAWN_SERIAL) ?
"seq" :
"mpi");
1599 for (
int i = 0; i < s->argc; i++) {
1600 argv[5 + i] = s->argv[i];
1602 argv[(s->argc + 5)] = 0;
1607 assert(argv[argc] == 0);
1611 kmr_make_pretty_argument_string(ee,
sizeof(ee), argc, argv);
1612 fprintf(stderr, (
";;KMR [%05d] %s [%d]: MPI_Comm_spawn" 1613 " (maxprocs=%d;%s)\n"),
1614 mr->rank, spw->fn, w, s->n_procs, ee);
1618 s->timestamp[0] = MPI_Wtime();
1621 assert(s->icomm == MPI_COMM_NULL);
1622 int *ec =
kmr_malloc(
sizeof(
int) * (
size_t)s->n_procs);
1624 MPI_Comm spawncomm = MPI_COMM_SELF;
1625 cc = MPI_Comm_spawn(argv[0], &(argv[1]), s->n_procs, info,
1626 root, spawncomm, &s->icomm, ec);
1627 assert(cc == MPI_SUCCESS || cc == MPI_ERR_SPAWN);
1628 if (cc == MPI_ERR_SPAWN) {
1631 for (
int r = 0; r < s->n_procs; r++) {
1632 if (ec[r] == MPI_SUCCESS) {
1637 nspawns = s->n_procs;
1639 assert(nspawns > 0);
1641 s->timestamp[1] = MPI_Wtime();
1643 kmr_free(ec, (
sizeof(
int) * (
size_t)s->n_procs));
1644 if (argv != s->argv) {
1645 kmr_free(argv, (
sizeof(
char *) * (
size_t)(argc + 1)));
1650 s->index = spw->n_starteds;
1652 spw->n_starteds += nspawns;
1653 spw->n_runnings += nspawns;
1656 if (mode == KMR_SPAWN_PARALLEL) {
1657 cc = kmr_accept_on_watch(mr, spw, w);
1658 assert(cc == MPI_SUCCESS);
1661 if (mr->spawn_disconnect_early && mode == KMR_SPAWN_PARALLEL) {
1662 if (s->icomm != MPI_COMM_NULL) {
1663 cc = kmr_free_comm_with_tracing(mr, spw, s);
1664 assert(cc == MPI_SUCCESS);
1668 if (mr->spawn_sync_at_startup && s->icomm != MPI_COMM_NULL) {
1670 cc = MPI_Comm_test_inter(s->icomm, &flag);
1671 assert(cc == MPI_SUCCESS && flag != 0);
1673 cc = MPI_Comm_remote_size(s->icomm, &peernprocs);
1674 assert(cc == MPI_SUCCESS && peernprocs == s->count);
1677 s->timestamp[2] = MPI_Wtime();
1680 cc = kmr_receive_for_reply(mr, spw, w,
1681 opt.reply_each, opt.reply_root);
1682 assert(cc == MPI_SUCCESS);
1686 while (spw->n_runnings > 0) {
1687 cc = kmr_wait_then_map(mr, spw,
1688 kvi, kvo, arg, opt, mapfn);
1689 assert(cc == MPI_SUCCESS);
1693 for (
int w = 0; w < spw->n_spawns; w++) {
1695 fprintf(stderr, (
";;KMR [%05d] %s [%d/%d]" 1697 " spawn=%f setup=%f run=%f mapfn=%f clean=%f" 1699 mr->rank, spw->fn, w, spw->n_spawns,
1700 ((s->timestamp[1] - s->timestamp[0]) * 1e3),
1701 ((s->timestamp[2] - s->timestamp[1]) * 1e3),
1702 ((s->timestamp[3] - s->timestamp[2]) * 1e3),
1703 ((s->timestamp[4] - s->timestamp[3]) * 1e3),
1704 ((s->timestamp[5] - s->timestamp[4]) * 1e3));
1709 assert(mr->spawn_comms != 0);
1711 kmr_free(mr->spawn_comms, (
sizeof(MPI_Comm *) * (
size_t)spw->n_spawns));
1712 mr->spawn_comms = 0;
1714 for (
int w = 0; w < spw->n_spawns; w++) {
1716 assert(s->icomm == MPI_COMM_NULL);
1717 assert(s->abuf != 0);
1718 kmr_free(s->abuf, s->alen);
1720 assert(s->argv0 != 0);
1721 kmr_free(s->argv0, (
sizeof(
char *) * (
size_t)(s->argc0 + 1)));
1725 assert(spw->ev != 0);
1726 kmr_free(spw->ev, (
sizeof(
struct kmr_kv_box) * (
size_t)spw->n_spawns));
1728 assert(spw->spawned != 0);
1729 kmr_free(spw->spawned, (
sizeof(
struct kmr_spawn_state) * (
size_t)spw->n_spawns));
1733 assert(spw->replies != 0);
1734 for (
int i = 0; i < spw->n_processes; i++) {
1735 assert(spw->replies[i] == MPI_REQUEST_NULL);
1737 kmr_free(spw->replies, (
sizeof(MPI_Request) * (
size_t)spw->n_processes));
1740 if (mode == KMR_SPAWN_PARALLEL) {
1741 assert(spw->watches != 0);
1742 for (
int i = 0; i < spw->n_processes; i++) {
1743 assert(spw->watches[i] == -1);
1745 kmr_free(spw->watches, (
sizeof(
int) * (
size_t)spw->n_processes));
1749 assert(spw->watch_listener == -1);
1779 MPI_Comm ic = MPI_COMM_NULL;
1780 cc = MPI_Comm_get_parent(&ic);
1781 assert(cc == MPI_SUCCESS);
1782 if (ic == MPI_COMM_NULL) {
1783 kmr_error(mr, (
"kmr_reply_to_spawner:" 1784 " may be called in a not-spawned process"));
1787 cc = MPI_Send(0, 0, MPI_BYTE, peer, KMR_TAG_SPAWN_REPLY, ic);
1788 assert(cc == MPI_SUCCESS);
1801 if (mr->spawn_comms == 0) {
1802 kmr_error(mr, (
"kmr_get_spawner_communicator() be called" 1803 " outside of kmr_map_via_spawn()"));
1805 if (index >= mr->spawn_size) {
1806 kmr_error(mr, (
"kmr_get_spawner_communicator() be called" 1807 " with index out of range"));
1809 MPI_Comm *comm = mr->spawn_comms[index];
1814 kmr_get_spawner_communicator_ff(
KMR *mr,
long ii,
int *comm)
1817 *comm = MPI_Comm_c2f(*c);
1874 int cc = kmr_map_spawned_processes(KMR_SPAWN_INTERACT,
1875 "kmr_map_via_spawn",
1876 kvi, kvo, arg, info, opt, mapfn);
1920 ssopt.reply_root = 0;
1921 ssopt.reply_each = 1;
1922 int cc = kmr_map_spawned_processes(KMR_SPAWN_PARALLEL,
1923 "kmr_map_parallel_processes",
1924 kvi, kvo, arg, info, ssopt, mapfn);
1950 ssopt.reply_root = 0;
1951 ssopt.reply_each = 1;
1952 int cc = kmr_map_spawned_processes(KMR_SPAWN_SERIAL,
1953 "kmr_map_serial_processes",
1954 kvi, kvo, arg, info, ssopt, mapfn);
1969 KMR *mr = kvi->c.mr;
1970 if (opt.reply_root || opt.reply_each) {
1971 kmr_error(mr,
"kmr_map_processes:" 1972 " options REPLY_ROOT/REPLY_EACH not allowed");
1976 ssopt.reply_root = 0;
1977 ssopt.reply_each = 1;
1979 int cc = kmr_map_spawned_processes(KMR_SPAWN_SERIAL,
1980 "kmr_map_processes",
1981 kvi, kvo, arg, info, ssopt, mapfn);
1984 int cc = kmr_map_spawned_processes(KMR_SPAWN_PARALLEL,
1985 "kmr_map_processes",
1986 kvi, kvo, arg, info, ssopt, mapfn);
1995 kmr_create_dummy_context(
void)
2008 MPI_Comm ic = MPI_COMM_NULL;
2009 cc = MPI_Comm_get_parent(&ic);
2010 assert(cc == MPI_SUCCESS);
2011 if (ic == MPI_COMM_NULL) {
2012 kmr_error(mr, (
"kmr_send_kvs_to_spawner:" 2013 " may be called in a not-spawned process"));
2018 assert(cc == MPI_SUCCESS && data != 0 && sz != 0);
2021 cc = MPI_Send(&siz, 1, MPI_INT, peer, KMR_TAG_SPAWN_REPLY1, ic);
2022 assert(cc == MPI_SUCCESS);
2023 cc = MPI_Send(data, (
int)sz, MPI_BYTE, peer, KMR_TAG_SPAWN_REPLY1, ic);
2024 assert(cc == MPI_SUCCESS);
2043 _Bool replyeach = 1;
2045 KMR *
const mr = kvi->c.mr;
2047 assert(icommr != 0);
2048 MPI_Comm icomm = *icommr;
2051 cc = MPI_Comm_remote_size(icomm, &peernprocs);
2052 assert(cc == MPI_SUCCESS);
2053 int npeers = (replyeach ? peernprocs : 1);
2054 for (
int peerrank = 0; peerrank < npeers; peerrank++) {
2058 cc = MPI_Recv(&sz, 1, MPI_INT,
2059 peerrank, KMR_TAG_SPAWN_REPLY1,
2061 assert(cc == MPI_SUCCESS);
2066 cc = MPI_Recv(data, sz, MPI_BYTE,
2067 peerrank, KMR_TAG_SPAWN_REPLY1,
2069 assert(cc == MPI_SUCCESS);
2072 assert(cc == MPI_SUCCESS);
2073 struct kmr_option keepopen = {.keep_open = 1};
2075 assert(cc == MPI_SUCCESS);
2076 kmr_free(data, (
size_t)sz);
2094 char *name =
"kmr_map_ms_commands";
2095 KMR *mr = kvi->c.mr;
2096 _Bool tracing5 = (mr->trace_map_ms && (5 <= mr->verbosity));
2101 memcpy(abuf, kv.v.p, (
size_t)kv.vlen);
2104 const int maxargc = (
sizeof(argv) /
sizeof(*argv));
2105 cc = kmr_scan_argv_strings(mr, abuf, (
size_t)kv.vlen, maxargc,
2107 opt.separator_space, name);
2108 assert(cc == MPI_SUCCESS);
2113 kmr_make_pretty_argument_string(ss,
sizeof(ss), argc, argv);
2115 ";;KMR [%05d] %s: fork-exec: %s\n",
2116 mr->rank, name, ss);
2121 if (mr->keep_fds_at_fork) {
2130 char *m = strerror(errno);
2131 snprintf(ee,
sizeof(ee),
"%s: fork() failed: %s",
2136 for (
int fd = 3; fd < closefds; fd++) {
2139 cc = execvp(argv[0], argv);
2141 kmr_make_pretty_argument_string(ss,
sizeof(ss), argc, argv);
2144 char *m = strerror(errno);
2145 snprintf(ee,
sizeof(ee),
"%s: execvp(%s) failed: %s",
2150 snprintf(ee,
sizeof(ee),
"%s: execvp(%s) returned with cc=%d",
2156 cc = waitpid(pid, &st, 0);
2158 if (errno == EINTR) {
2160 snprintf(ee,
sizeof(ee),
"%s: waitpid() interrupted",
2162 kmr_warning(mr, 1, ee);
2165 char *m = strerror(errno);
2166 snprintf(ee,
sizeof(ee),
"%s: waitpid() failed: %s",
2168 kmr_warning(mr, 1, ee);
2176 kmr_make_pretty_argument_string(ss,
sizeof(ss), argc, argv);
2178 ";;KMR [%05d] %s: fork-exec done: %s\n",
2179 mr->rank, name, ss);
2183 cc = (*xarg->fn)(kv, kvi, kvo, xarg->arg, index);
2184 assert(cc == MPI_SUCCESS);
2186 kmr_free(abuf, (
size_t)kv.vlen);
int kmr_ckpt_progress_init(KMR_KVS *, KMR_KVS *, struct kmr_option)
It initializes the progress of MapReduce checkpointing.
static int kmr_map_ms_fork_exec_command(const struct kmr_kv_box kv, const KMR_KVS *kvi, KMR_KVS *kvo, void *arg, const long index)
Runs commands in kmr_map_ms_commands().
Key-Value Stream (abstract).
static int kmr_map_master(KMR_KVS *kvi, KMR_KVS *kvo, void *arg, struct kmr_option opt, kmr_mapfn_t m)
Delivers key-value pairs as requested.
Utilities Private Part (do not include from applications).
Options to Mapping, Shuffling, and Reduction.
void kmr_ckpt_progress_fin(KMR *)
It finalizes the progress of MapReduce checkpointing.
int kmr_add_kv(KMR_KVS *kvs, const struct kmr_kv_box kv)
Adds a key-value pair.
void kmr_ckpt_save_kvo_each_init(KMR *, KMR_KVS *)
It initializes saving indexed key-value pairs of the output KVS to a checkpoint data file...
void kmr_ckpt_save_kvo_each_fin(KMR *, KMR_KVS *)
It finalizes saving indexed key-value pairs of the output KVS to the checkpoint data file...
#define kmr_malloc(Z)
Allocates memory, or aborts when failed.
#define kmr_create_kvs(MR, KF, VF)
Makes a new key-value stream (of type KMR_KVS) with the specified field datatypes.
int kmr_save_kvs(KMR_KVS *kvi, void **dataq, size_t *szq, struct kmr_option opt)
Packs locally the contents of a key-value stream to a byte array.
int kmr_ckpt_enabled(KMR *)
Check if checkpoint/restart is enabled.
long kmr_ckpt_first_unprocessed_kv(KMR *)
It returns the index of the first unprocessed key-value in the input KVS.
int kmr_add_kv_done(KMR_KVS *kvs)
Marks finished adding key-value pairs.
void kmr_ckpt_save_kvo_each_add(KMR *, KMR_KVS *, long)
It adds new key-value pairs of the output KVS to the checkpoint data file.
int kmr_free_kvs(KMR_KVS *kvs)
Releases a key-value stream (type KMR_KVS).
int kmr_map_processes(_Bool nonmpi, KMR_KVS *kvi, KMR_KVS *kvo, void *arg, MPI_Info info, struct kmr_spawn_option opt, kmr_mapfn_t mapfn)
Maps on processes started by MPI_Comm_spawn() to run independent processes.
int kmr_map_via_spawn(KMR_KVS *kvi, KMR_KVS *kvo, void *arg, MPI_Info info, struct kmr_spawn_option opt, kmr_mapfn_t mapfn)
Maps on processes started by MPI_Comm_spawn().
kmr_kv_field
Datatypes of Keys or Values.
#define kmr_map(KVI, KVO, ARG, OPT, M)
Maps simply.
Handy Copy of a Key-Value Field.
Options to Mapping by Spawns.
State during kmr_map_ms().
int kmr_ckpt_disable_ckpt(KMR *)
It temporarily disables checkpoint/restart.
int kmr_map_ms(KMR_KVS *kvi, KMR_KVS *kvo, void *arg, struct kmr_option opt, kmr_mapfn_t m)
Maps in master-slave mode.
#define kmr_realloc(P, Z)
Allocates memory, or aborts when failed.
int kmr_send_kvs_to_spawner(KMR *mr, KMR_KVS *kvs)
Sends the KVS from a spawned process to the map-function of the spawner.
int kmr_restore_kvs(KMR_KVS *kvo, void *data, size_t sz, struct kmr_option opt)
Unpacks locally the contents of a key-value stream from a byte array.
MPI_Comm * kmr_get_spawner_communicator(KMR *mr, long index)
Obtains (a reference to) a parent inter-communicator of a spawned process.
int kmr_add_identity_fn(const struct kmr_kv_box kv, const KMR_KVS *kvi, KMR_KVS *kvo, void *arg, const long i)
Adds a given key-value pair unmodified.
int kmr_getdtablesize(KMR *mr)
Does getdtablesize(); it is defined here because getdtablesize() is not POSIX.
int kmr_reply_to_spawner(KMR *mr)
Sends a reply message in the spawned process, which tells it is ready to finish and may have some dat...
int kmr_replicate(KMR_KVS *kvi, KMR_KVS *kvo, struct kmr_option opt)
Replicates key-value pairs to be visible on all ranks, that is, it has the effect of bcast or all-gat...
static int kmr_map_slave(KMR_KVS *kvi, KMR_KVS *kvo, void *arg, struct kmr_option opt, kmr_mapfn_t m)
Asks the master for a task, then calls a map-function.
int kmr_map_parallel_processes(KMR_KVS *kvi, KMR_KVS *kvo, void *arg, MPI_Info info, struct kmr_spawn_option opt, kmr_mapfn_t mapfn)
Maps on processes started by MPI_Comm_spawn() to run independent MPI processes, which will not commun...
static struct kmr_kv_box kmr_pick_kv(struct kmr_kvs_entry *e, KMR_KVS *kvs)
Returns a handle to a key-value entry – a reverse of kmr_poke_kv().
int kmr_reduce_as_one(KMR_KVS *kvi, KMR_KVS *kvo, void *arg, struct kmr_option opt, kmr_redfn_t r)
Calls a reduce-function once as if all key-value pairs had the same key.
int kmr_receive_kvs_from_spawned_fn(const struct kmr_kv_box kv, const KMR_KVS *kvi, KMR_KVS *kvo, void *arg, const long index)
Collects key-value pairs generated by spawned processes.
int kmr_map_ms_commands(KMR_KVS *kvi, KMR_KVS *kvo, void *arg, struct kmr_option opt, struct kmr_spawn_option sopt, kmr_mapfn_t m)
Maps in master-slave mode, specialized to run serial commands.
int kmr_ckpt_enable_ckpt(KMR *, int)
It temporarily enables checkpoint/restart, which has been disabled by calling kmr_ckpt_disable_ckpt().
int kmr_map_serial_processes(KMR_KVS *kvi, KMR_KVS *kvo, void *arg, MPI_Info info, struct kmr_spawn_option opt, kmr_mapfn_t mapfn)
Maps on processes started by MPI_Comm_spawn() to run serial processes.
int(* kmr_mapfn_t)(const struct kmr_kv_box kv, const KMR_KVS *kvi, KMR_KVS *kvo, void *arg, const long index)
Map-function Type.
KMR * kmr_create_context(const MPI_Comm comm, const MPI_Info conf, const char *name)
Makes a new KMR context (a context has type KMR).