/* Push-off key-value streams: file-scope definitions.
   STAT_TEST_TIME/STAT_WAIT_TIME index mr->pushoff_statistics.times[];
   the remaining STAT_* constants index mr->pushoff_statistics.counts[]. */
19 #include "../config.h" 23 #if (defined(__K) && defined(KMRFASTNOTICE)) 27 #define MIN(a,b) (((a)<(b))?(a):(b)) 28 #define MAX(a,b) (((a)>(b))?(a):(b)) 33 #define STAT_TEST_TIME 0 34 #define STAT_WAIT_TIME 1 36 #define STAT_RECV_CALLS 0 37 #define STAT_SEND_CALLS 1 38 #define STAT_TEST_CALLS 2 39 #define STAT_WAIT_CALLS 3 40 #define STAT_TEST0_COUNT 4 41 #define STAT_TEST1_COUNT 5 42 #define STAT_WAIT_COUNT 6 43 #define STAT_SEND_PEND_COUNT 7 44 #define STAT_RDMA_POLLCQ_CALLS 8 45 #define STAT_TEST_CLOCKS 9 47 static int kmr_pushoff_seqno = 0;
/* Push-off streams currently registered for polling, indexed by their
   seqno (64 slots). */
48 static KMR_KVS *kmr_pushoff_movings[64];
/* Size of the RDMA notice area (1024 ints). Word 0 is polled as
   FAST_NOTICE; word 1 holds the PUT_VALUE marker. */
52 const size_t kmr_pushoff_area_size = (
sizeof(int) * 1024);
/* Memory id under which the notice area is registered for RDMA. */
53 const int kmr_pushoff_memid = 1;
/* Local notice area (volatile because it is polled while peers may
   change it with RDMA puts) and the per-rank RDMA addresses of every
   rank's notice area. */
55 static volatile int *kmr_pushoff_area = 0;
56 static uint64_t *kmr_pushoff_addrs = 0;
/* Communicator size and rank captured when fast notice is initialized. */
57 static int kmr_pushoff_nprocs;
58 static int kmr_pushoff_rank;
/* Forward declaration; pokes PEER through an RDMA put when fast notice
   is enabled. */
60 static void kmr_pushoff_notice(
KMR *mr,
int peer);
/* FAST_NOTICE is word 0 of the local RDMA notice area; PUT_VALUE packs
   the characters 'A','H','O','%' into a non-zero int marker.
   What follows is the tail of kmr_create_pushoff_kvs(), which builds a
   push-off KVS; FILE/LINE/FUNC are recorded in info_line0. */
65 #define FAST_NOTICE (kmr_pushoff_area[0]) 69 #define PUT_VALUE (((int)'A'<<24)|((int)'H'<<16)|((int)'O'<<8)|((int)'%')) 87 const char *file,
const int line,
const char *func)
/* Push-off streams are refused under checkpointing and for pointer
   fields (presumably because blocks are shipped to other ranks as raw
   bytes, where pointers would be meaningless). */
91 if (mr->ckpt_enable) {
92 kmr_error(mr, (
"kmr_create_pushoff_kvs:" 93 " Unable to use it under checkpointing"));
95 if ((kf == KMR_KV_POINTER_OWNED) || (kf == KMR_KV_POINTER_UNMANAGED)
96 || (vf == KMR_KV_POINTER_OWNED) || (vf == KMR_KV_POINTER_UNMANAGED)) {
97 kmr_error(mr,
"kmr_create_pushoff_kvs: Pointers not allowed");
101 KMR_DEBUGX(memset(kvs, 0,
sizeof(
KMR_KVS)));
102 kvs->o.magic = KMR_KVS_PUSHOFF;
105 kvs->o.key_data = kf;
106 kvs->o.value_data = vf;
107 kvs->o.element_count = 0;
108 kvs->o.info_line0.file = file;
109 kvs->o.info_line0.func = func;
110 kvs->o.info_line0.line = line;
116 kvs->o._uniformly_sized_ = 0;
/* Claim a slot in kmr_pushoff_movings. The slot index (seqno) is also
   added to KMR_TAG_PUSHOFF for this stream's messages. The assert
   requires the chosen slot to be free. */
118 int nmovings = (int)(
sizeof(kmr_pushoff_movings)
119 /
sizeof(kmr_pushoff_movings[0]));
120 kvs->o.seqno = (kmr_pushoff_seqno % nmovings);
122 assert(kmr_pushoff_movings[kvs->o.seqno] == 0);
123 kmr_pushoff_movings[kvs->o.seqno] = kvs;
/* MPI bookkeeping arrays hold 2*nprocs entries: [0,nprocs) are sends to
   rank r, [nprocs,2*nprocs) are receives from rank r. */
127 int nprocs = mr->nprocs;
130 kvs->o.reqs =
kmr_malloc(
sizeof(MPI_Request) * (
size_t)(2 * nprocs));
131 kvs->o.indexes =
kmr_malloc(
sizeof(
int) * (
size_t)(2 * nprocs));
132 kvs->o.statuses =
kmr_malloc(
sizeof(MPI_Status) * (
size_t)(2 * nprocs));
133 memset(kvs->o.indexes, 0, (
sizeof(
int) * (
size_t)(2 * nprocs)));
134 memset(kvs->o.statuses, 0, (
sizeof(MPI_Status) * (
size_t)(2 * nprocs)));
135 for (
int r = 0; r < nprocs; r++) {
137 kvs->o.reqs[r] = MPI_REQUEST_NULL;
138 kvs->o.reqs[r + nprocs] = MPI_REQUEST_NULL;
/* Set up a fill block (b0) for the peer; adding_point starts at its
   first byte. */
140 kmr_kvs_reset_block(kvs, b0, mr->pushoff_block_size, 0);
141 po->adding_point = &(b0->data[0]);
/* Set up a receive block (b1) and post MPI_Irecv of up to
   pushoff_block_size bytes from rank r into recvbuf; the request is
   kept at reqs[r + nprocs]. */
152 kmr_kvs_reset_block(kvs, b1, mr->pushoff_block_size, 0);
154 int sz = (int)mr->pushoff_block_size;
155 cc = MPI_Irecv(po->recvbuf, sz, MPI_BYTE, r,
156 (KMR_TAG_PUSHOFF + kvs->o.seqno),
157 mr->comm, &kvs->o.reqs[r + nprocs]);
158 assert(cc == MPI_SUCCESS);
159 mr->pushoff_statistics.counts[STAT_RECV_CALLS]++;
/* kmr_free_kvs_pushoff(): releases the MPI request/index/status arrays
   (2*nprocs entries each) and the per-peer records of a push-off KVS.
   Freeing while any send or receive request is still pending is
   reported through kmr_error(). */
167 kmr_free_kvs_pushoff(
KMR_KVS *kvs, _Bool deallocate)
170 int nprocs = mr->nprocs;
171 if (kvs->o.storage != 0) {
175 assert(kvs->o.reqs != 0);
176 for (
int r = 0; r < nprocs; r++) {
177 if (kvs->o.reqs[r] != MPI_REQUEST_NULL) {
178 kmr_error(mr,
"kmr_free_kvs: Some send pending");
180 if (kvs->o.reqs[r + nprocs] != MPI_REQUEST_NULL) {
181 kmr_error(mr,
"kmr_free_kvs: Some receive pending");
184 kmr_free(kvs->o.reqs, (
sizeof(MPI_Request) * (
size_t)(2 * nprocs)));
186 kmr_free(kvs->o.indexes, (
sizeof(
int) * (
size_t)(2 * nprocs)));
188 kmr_free(kvs->o.statuses, (
sizeof(MPI_Status) * (
size_t)(2 * nprocs)));
/* Every peer's fill, receive and send buffers must already be released
   before the peer records are freed. */
190 assert(kvs->o.peers != 0);
191 for (
int r = 0; r < nprocs; r++) {
193 assert(po->fillbuf == 0);
194 assert(po->recvbuf == 0);
195 assert(po->sendbufs[0] == 0 && po->sendbufs[1] == 0);
198 kmr_free(kvs->o.peers, vsz);
/* Fragment of kmr_pushoff_link_to_send(): a block addressed to this
   rank goes directly into local storage. Otherwise it is appended after
   sendbufs[1], which appears to be the tail of the peer's pending-send
   list (its head being sendbufs[0]). */
216 assert(b->next == 0);
217 if (peer == mr->rank) {
218 kmr_kvs_insert_block(kvs->o.storage, b);
219 assert(po->sendbufs[0] == 0 && po->sendbufs[1] == 0 && po->sends == 0);
221 if (po->sendbufs[0] != 0) {
222 assert(po->sendbufs[1] != 0);
223 po->sendbufs[1]->next = b;
226 assert(po->sendbufs[1] == 0);
/* Fragment of kmr_pushoff_do_send(): at most one send per peer is in
   flight, tracked in reqs[peer].
   - If that request is busy, nothing is sent.
   - Else, if blocks are queued, the entry tail is marked and block b
     (presumably the list head) is sent with MPI_Isend.
   - Else, when CLOSING, a zero-length message is sent, presumably as an
     end-of-stream marker for the peer.
   With fast notice on, the peer is also poked through RDMA. */
246 if (kvs->o.reqs[peer] != MPI_REQUEST_NULL) {
248 }
else if (po->sendbufs[0] != 0) {
249 assert(peer != mr->rank);
252 kmr_kvs_mark_entry_tail(e);
253 int sz = kmr_send_size_of_block(b);
254 cc = MPI_Isend(b, sz, MPI_BYTE, peer,
255 (KMR_TAG_PUSHOFF + kvs->o.seqno),
256 mr->comm, &kvs->o.reqs[peer]);
257 assert(cc == MPI_SUCCESS);
258 if (mr->pushoff_fast_notice) {
259 kmr_pushoff_notice(mr, peer);
261 mr->pushoff_statistics.counts[STAT_SEND_CALLS]++;
262 }
else if (closing) {
263 assert(po->sendbufs[0] == 0);
264 assert(po->sends == 0);
265 if (peer != mr->rank) {
267 cc = MPI_Isend(0, 0, MPI_BYTE, peer,
268 (KMR_TAG_PUSHOFF + kvs->o.seqno),
269 mr->comm, &kvs->o.reqs[peer]);
270 assert(cc == MPI_SUCCESS);
272 if (mr->pushoff_fast_notice) {
273 kmr_pushoff_notice(mr, peer);
275 mr->pushoff_statistics.counts[STAT_SEND_CALLS]++;
278 assert(!closing && po->sendbufs[0] == 0);
/* kmr_pushoff_do_recv(): posts a fresh receive from PEER into a newly
   reset block of pushoff_block_size bytes. It requires that PEER is not
   this rank and that no receive is pending for PEER
   (reqs[peer + nprocs]). With fast notice on, the peer is poked through
   RDMA. */
289 kmr_pushoff_do_recv(
KMR_KVS *kvs,
int peer)
293 int nprocs = mr->nprocs;
294 assert(peer != mr->rank);
295 assert(kvs->o.reqs[peer + nprocs] == MPI_REQUEST_NULL);
298 kmr_kvs_reset_block(kvs, b, mr->pushoff_block_size, 0);
299 assert(po->recvbuf == 0);
301 int sz = (int)mr->pushoff_block_size;
302 cc = MPI_Irecv(b, sz, MPI_BYTE, peer,
303 (KMR_TAG_PUSHOFF + kvs->o.seqno),
304 mr->comm, &kvs->o.reqs[peer + nprocs]);
305 assert(cc == MPI_SUCCESS);
306 if (mr->pushoff_fast_notice) {
307 kmr_pushoff_notice(mr, peer);
309 mr->pushoff_statistics.counts[STAT_RECV_CALLS]++;
/* kmr_pushoff_poll(): progresses the sends and receives of one push-off
   KVS.
   - With BLOCK set it waits with MPI_Waitsome and records wait
     statistics; otherwise it uses MPI_Testsome and records test
     statistics and clock ticks.
   - A completed index below nprocs is a send to that rank; an index of
     nprocs or more is a receive from rank (index - nprocs).
   - The loop repeats while (!CLOSING and something completed) or
     (BLOCK and requests remain).
   Callers treat the return value as the number of outstanding requests
   (the return statement is outside this excerpt). */
318 kmr_pushoff_poll(
KMR_KVS *kvs, _Bool closing, _Bool block)
321 int nprocs = mr->nprocs;
322 int nprocs2 = (2 * nprocs);
/* Count requests still outstanding (sends and receives). */
327 for (
int r = 0; r < nprocs2; r++) {
328 remains += ((kvs->o.reqs[r] != MPI_REQUEST_NULL) ? 1 : 0);
336 double t0 = ((!mr->pushoff_stat) ? 0.0 : MPI_Wtime());
337 cc = MPI_Waitsome(nprocs2, kvs->o.reqs, &hits,
338 kvs->o.indexes, kvs->o.statuses);
339 assert(cc == MPI_SUCCESS && hits > 0 && hits != MPI_UNDEFINED);
340 double t1 = ((!mr->pushoff_stat) ? 0.0 : MPI_Wtime());
341 mr->pushoff_statistics.counts[STAT_WAIT_CALLS]++;
342 mr->pushoff_statistics.counts[STAT_WAIT_COUNT] += hits;
343 mr->pushoff_statistics.times[STAT_WAIT_TIME] += (t1 - t0);
345 double t0 = ((!mr->pushoff_stat) ? 0.0 : MPI_Wtime());
347 cc = MPI_Testsome(nprocs2, kvs->o.reqs, &hits,
348 kvs->o.indexes, kvs->o.statuses);
350 assert(cc == MPI_SUCCESS);
/* MPI_Testsome yields MPI_UNDEFINED when no request is active. */
352 if (hits == MPI_UNDEFINED) {
355 double t1 = ((!mr->pushoff_stat) ? 0.0 : MPI_Wtime());
356 mr->pushoff_statistics.counts[STAT_TEST_CALLS]++;
358 mr->pushoff_statistics.counts[STAT_TEST0_COUNT] += hits;
360 mr->pushoff_statistics.counts[STAT_TEST1_COUNT] += hits;
362 mr->pushoff_statistics.times[STAT_TEST_TIME] += (t1 - t0);
363 mr->pushoff_statistics.counts[STAT_TEST_CLOCKS] += (c1 - c0);
365 assert(hits <= remains);
366 for (
int i = 0; i < hits; i++) {
367 int rank2 = kvs->o.indexes[i];
368 MPI_Status *st= &(kvs->o.statuses[i]);
369 assert(rank2 != MPI_UNDEFINED
370 && kvs->o.reqs[rank2] == MPI_REQUEST_NULL);
/* A send to rank rank2 completed: the sent block is unlinked from the
   send list and freed. */
372 if (rank2 < nprocs) {
379 assert(closing && po->closed[0] == 1);
383 if (po->sendbufs[0] == po->sendbufs[1]) {
385 po->sendbufs[1] = bn;
387 po->sendbufs[0] = bn;
389 kmr_free(b, mr->pushoff_block_size);
/* A receive from rank (rank2 - nprocs) completed. Data blocks go into
   local storage and a new receive is posted for that peer. */
394 int peer = (rank2 - nprocs);
396 cc = MPI_Get_count(st, MPI_BYTE, &count);
397 assert(cc == MPI_SUCCESS);
398 assert((
size_t)count <= mr->pushoff_block_size);
404 kmr_free(b, mr->pushoff_block_size);
410 assert(b->size == mr->pushoff_block_size
411 && b->partial_element_count != 0
413 && count == kmr_send_size_of_block(b));
414 kmr_kvs_insert_block(kvs->o.storage, b);
416 kmr_pushoff_do_recv(kvs, peer);
420 }
while ((!closing && hits > 0) || (block && remains > 0));
/* Sanity check that no request is left outstanding (the guarding
   condition is outside this excerpt). */
422 for (
int r = 0; r < nprocs2; r++) {
423 assert(kvs->o.reqs[r] == MPI_REQUEST_NULL);
/* kmr_pushoff_poll_all(): polls every registered push-off KVS without
   blocking, passing kvs->o.stowed as the closing flag. A stream is
   unregistered from kmr_pushoff_movings once all its requests are done.
   The exact condition for unregistering is outside this excerpt. */
430 kmr_pushoff_poll_all(
void)
432 int nmovings = (int)(
sizeof(kmr_pushoff_movings)
433 /
sizeof(kmr_pushoff_movings[0]));
434 for (
int i = 0; i < nmovings; i++) {
435 KMR_KVS *kvs = kmr_pushoff_movings[i];
438 int nprocs = mr->nprocs;
439 int remains = kmr_pushoff_poll(kvs, (kvs->o.stowed), 0);
441 int nprocs2 = (2 * nprocs);
442 for (
int r = 0; r < nprocs2; r++) {
443 assert(kvs->o.reqs[r] == MPI_REQUEST_NULL);
445 kmr_pushoff_movings[i] = 0;
/* Fragment of kmr_add_kv_pushoff(): the destination rank of a key-value
   comes from kmr_pitch_rank().
   - When the entry does not fit in the peer's fill block, that block is
     closed with a tail mark and a fresh block of pushoff_block_size is
     started (the full block presumably being queued for sending).
   - The largest observed po->sends is kept in STAT_SEND_PEND_COUNT.
   - Depending on pushoff_poll_rate and fast notice,
     kmr_pushoff_poll_all() is called to progress communication. */
462 kmr_assert_kv_sizes(kvs, kv);
463 assert(!kvs->o.nogrow);
466 int r = kmr_pitch_rank(kv, kvs);
469 size_t sz = kmr_kvs_entry_netsize_of_box(kv);
471 assert(po->adding_point == kmr_kvs_adding_point(b0));
472 if (!kmr_kvs_entry_fits_in_block(kvs, b0, sz)) {
473 kmr_kvs_mark_entry_tail(po->adding_point);
476 kmr_kvs_reset_block(kvs, n, mr->pushoff_block_size, 0);
478 po->adding_point = &(n->data[0]);
480 long m0 = mr->pushoff_statistics.counts[STAT_SEND_PEND_COUNT];
481 long m1 = MAX(po->sends, m0);
482 mr->pushoff_statistics.counts[STAT_SEND_PEND_COUNT] = m1;
486 if (mr->pushoff_poll_rate == 0) {
487 if (mr->pushoff_fast_notice) {
491 kmr_pushoff_poll_all();
497 po->adding_point = kmr_kvs_next_entry(kvs, e);
500 b1->partial_element_count++;
502 kvs->o.element_count++;
/* With fast notice, poll only once a peer has set this rank's
   FAST_NOTICE word. */
504 if (mr->pushoff_fast_notice && FAST_NOTICE) {
507 kmr_pushoff_poll_all();
510 if ((mr->pushoff_poll_rate == 1) && (po->sendbufs[0] != 0)) {
513 kmr_pushoff_poll_all();
/* Fragment of kmr_replace_kvs_components(): turns KVS0 into an on-core
   KVS that takes over KVS1's fields and block list. The block pointers
   are moved, not copied. KVS1's counters and block list are then
   cleared, so KVS1 no longer refers to the blocks. */
526 assert(kvs1->c.magic == KMR_KVS_ONCORE
528 assert(kvs0->o.mr == kvs1->c.mr);
530 kvs0->c.magic = KMR_KVS_ONCORE;
531 kvs0->c.key_data = kvs1->c.key_data;
532 kvs0->c.value_data = kvs1->c.value_data;
533 kvs0->c.element_count = kvs1->c.element_count;
534 kvs0->c.oncore = kvs1->c.oncore;
536 kvs0->c.storage_netsize = kvs1->c.storage_netsize;
537 kvs0->c.block_count = kvs1->c.block_count;
538 kvs0->c.first_block = kvs1->c.first_block;
540 kvs1->c.element_count = 0;
541 kvs1->c.storage_netsize = 0;
542 kvs1->c.block_count = 0;
543 kvs1->c.first_block = 0;
/* Fragment of kmr_add_kv_done_pushoff(). It:
   - reports an error when called again,
   - tail-marks every peer's partially filled block,
   - frees fill buffers,
   - unless pushoff_hang_out is set, converts the stream right away
     with kmr_pushoff_make_stationary(). */
558 int nprocs = mr->nprocs;
561 kmr_error(mr,
"kmr_add_kv_done: may be called already");
564 for (
int r = 0; r < nprocs; r++) {
567 if (b0->partial_element_count > 0) {
568 kmr_kvs_mark_entry_tail(po->adding_point);
572 kmr_free(po->fillbuf, mr->pushoff_block_size);
582 if (!mr->pushoff_hang_out) {
583 kmr_pushoff_make_stationary(kvs);
/* kmr_pushoff_make_stationary(): finishes the communication of a
   push-off KVS and turns it into an ordinary on-core KVS. */
593 kmr_pushoff_make_stationary(
KMR_KVS *kvs)
596 int nprocs = mr->nprocs;
/* One blocking closing poll, then non-blocking closing polls (also
   progressing other streams) until nothing remains outstanding. */
598 kmr_pushoff_poll(kvs, 1, 1);
599 kmr_pushoff_poll_all();
604 remains = kmr_pushoff_poll(kvs, 1, 0);
605 }
while (remains > 0);
606 kmr_pushoff_poll_all();
609 for (
int r = 0; r < nprocs; r++) {
611 po->adding_point = 0;
612 assert(po->fillbuf == 0);
613 assert(po->sendbufs[0] == 0 && po->sendbufs[1] == 0);
/* Save the received storage, release the push-off resources and
   reinitialize KVS as on-core. The saved storage is presumably moved
   back into KVS in lines outside this excerpt. */
616 KMR_KVS *storage = kvs->o.storage;
618 kmr_free_kvs_pushoff(kvs, 0);
619 kmr_init_kvs_oncore(kvs, mr);
/* Recompute the element count and net size from the blocks. The adding
   point is placed right after the last block's filled data. */
628 for (
struct kmr_kvs_block *b = kvs->c.first_block; b != 0; b = b->next) {
629 count += b->partial_element_count;
630 netsize += b->fill_size;
633 kvs->c.element_count = count;
634 kvs->c.storage_netsize = netsize;
635 if (kvs->c.element_count == 0) {
636 kvs->c.current_block = 0;
637 kvs->c.adding_point = 0;
639 assert(lastblock != 0);
640 kvs->c.current_block = lastblock;
641 kvs->c.adding_point = (
void *)((
char *)&lastblock->data[0]
642 + lastblock->fill_size);
645 kvs->c.shuffled_in_pushoff = 1;
/* kmr_print_statistics_on_pushoff(): prints the push-off counters and
   times collected while mr->pushoff_stat is set, one line per metric,
   each prefixed with the rank. */
652 kmr_print_statistics_on_pushoff(
KMR *mr,
char *titlestring)
654 if (mr->pushoff_stat) {
655 long *counts = mr->pushoff_statistics.counts;
656 double *times = mr->pushoff_statistics.times;
659 "[%d] push-off sends=%ld\n" 660 "[%d] push-off recvs=%ld\n" 661 "[%d] push-off tests=%ld hits=%ld time=%f (clocks=%ld)\n" 662 "[%d] push-off waits=%ld hits=%ld time=%f (closing)\n" 663 "[%d] push-off tests hits=(%ld + %ld) (%.2f%% + %.2f%%)\n" 664 "[%d] push-off max-send-pendings=%ld\n" 665 "[%d] push-off cq-polls=%ld\n"),
668 counts[STAT_SEND_CALLS],
670 counts[STAT_RECV_CALLS],
672 counts[STAT_TEST_CALLS],
673 (counts[STAT_TEST0_COUNT] + counts[STAT_TEST1_COUNT]),
674 times[STAT_TEST_TIME],
675 counts[STAT_TEST_CLOCKS],
677 counts[STAT_WAIT_CALLS],
678 counts[STAT_WAIT_COUNT],
679 times[STAT_WAIT_TIME],
681 counts[STAT_TEST0_COUNT],
682 counts[STAT_TEST1_COUNT],
/* NOTE(review): these percentages divide by (sends + recvs); when no
   push-off message was sent or received the result is NaN/inf. */
683 (100.0 * (
double)counts[STAT_TEST0_COUNT]
684 / (
double)(counts[STAT_SEND_CALLS]
685 + counts[STAT_RECV_CALLS])),
686 (100.0 * (
double)counts[STAT_TEST1_COUNT]
687 / (
double)(counts[STAT_SEND_CALLS]
688 + counts[STAT_RECV_CALLS])),
690 counts[STAT_SEND_PEND_COUNT],
692 counts[STAT_RDMA_POLLCQ_CALLS]);
/* Without the Fujitsu MPI extension (__K and KMRFASTNOTICE), dummy
   FJMPI_RDMA_* constants and FJMPI_Rdma_* functions are defined so the
   fast-notice code still compiles. The visible stubs perform no RDMA:
   status calls return 0, and address calls return 1, which differs from
   FJMPI_RDMA_ERROR (0). The body of FJMPI_Rdma_poll_cq is outside this
   excerpt. */
701 #if (!(defined(__K) && defined(KMRFASTNOTICE))) 702 #define FJMPI_RDMA_ERROR (0) 703 #define FJMPI_RDMA_NOTICE (1) 704 #define FJMPI_RDMA_NIC0 0 705 #define FJMPI_RDMA_LOCAL_NIC0 0 706 #define FJMPI_RDMA_REMOTE_NIC0 0 707 #define FJMPI_RDMA_PATH0 0 709 static int FJMPI_Rdma_init(
void) {
return 0;}
710 static int FJMPI_Rdma_finalize(
void) {
return 0;}
711 static uint64_t FJMPI_Rdma_reg_mem(
int m,
void *b,
size_t l) {
return 1;}
712 static int FJMPI_Rdma_dereg_mem(
int m) {
return 0;}
713 static uint64_t FJMPI_Rdma_get_remote_addr(
int r,
int m) {
return 1;}
714 static int FJMPI_Rdma_put(
int r,
int tag, uint64_t ra, uint64_t la,
715 size_t sz,
int f) {
return 0;}
716 static int FJMPI_Rdma_poll_cq(
int nic,
struct FJMPI_Rdma_cq *cq) {
/* Fragment of kmr_init_pushoff_fast_notice_(): sets up RDMA fast notice
   for COMM.
   - A repeated call must see the same communicator size and rank.
   - Built without the Fujitsu extension, it warns that fast notice
     needs it.
   - Otherwise it allocates a 2 MiB-aligned notice area, registers it
     under kmr_pushoff_memid, and stores the PUT_VALUE marker in word 1.
   - Finally it records every rank's remote address of the area,
     retrying while the lookup returns FJMPI_RDMA_ERROR. */
730 unsigned int verbosity = (verbose ? 5 : 9);
734 MPI_Comm_size(comm, &nprocs);
735 MPI_Comm_rank(comm, &rank);
737 if (kmr_pushoff_area != 0) {
738 assert(kmr_pushoff_nprocs == nprocs && kmr_pushoff_rank == rank);
743 kmr_warning(0, verbosity,
"Initialize pushoff_fast_notice");
746 #if (!(defined(__K) && defined(KMRFASTNOTICE))) 748 kmr_warning(0, verbosity, (
"Fast-notice needs Fujitsu MPI extension"));
752 cc = FJMPI_Rdma_init();
755 kmr_pushoff_nprocs = nprocs;
756 kmr_pushoff_rank = rank;
758 kmr_pushoff_addrs =
kmr_malloc(
sizeof(uint64_t) * (
size_t)nprocs);
/* NOTE(review): passing (void **)&kmr_pushoff_area stores a void*
   through an lvalue of type volatile int *. Allocating into a void*
   temporary and assigning it afterwards would avoid the strict-aliasing
   violation. */
760 size_t malign = (2 * 1024 * 1024);
761 cc = posix_memalign((
void **)&kmr_pushoff_area, malign,
762 kmr_pushoff_area_size);
/* posix_memalign() returns its error number and does not set errno, so
   the message is derived from CC rather than from errno. */
765 char *m = strerror(cc);
/* %zu is the printf conversion for a size_t argument. */
766 snprintf(ee,
sizeof(ee),
"posix_memalign(sz=%zu) failed: %s",
767 kmr_pushoff_area_size, m);
770 assert(kmr_pushoff_area != 0);
772 uint64_t a0 = FJMPI_Rdma_reg_mem(kmr_pushoff_memid,
773 (
void *)kmr_pushoff_area,
774 kmr_pushoff_area_size);
775 assert(a0 != FJMPI_RDMA_ERROR);
776 kmr_pushoff_addrs[rank] = a0;
/* Word 1 holds the non-zero marker that kmr_pushoff_notice() copies
   into a peer's word 0 (FAST_NOTICE). */
784 kmr_pushoff_area[1] = (int)PUT_VALUE;
785 assert((
int)PUT_VALUE != 0);
787 for (
int r = 0; r < nprocs; r++) {
793 a1 = FJMPI_Rdma_get_remote_addr(r, kmr_pushoff_memid);
794 }
while (a1 == FJMPI_RDMA_ERROR);
795 kmr_pushoff_addrs[r] = a1;
797 fprintf(stderr,
"[%d] rdma addr: b=%p, l=%p r=%p\n",
798 r, (
void *)kmr_pushoff_area, (
void *)a0, (
void *)a1);
/* Fragment of kmr_check_pushoff_fast_notice_(): checks that fast notice
   works.
   - Sends a notice to the next rank in a ring ((rank + 1) % nprocs).
   - Polls this rank's FAST_NOTICE word until it becomes non-zero,
     raising kmr_error() after 200 seconds.
   - Reports the elapsed time with kmr_warning(). */
810 int nprocs = mr->nprocs;
815 #if (!(defined(__K) && defined(KMRFASTNOTICE))) 821 assert(kmr_pushoff_area != 0);
822 assert(kmr_pushoff_nprocs == nprocs && kmr_pushoff_rank == rank);
825 kmr_warning(mr, 5,
"Checking fast notification works");
827 double t0 = MPI_Wtime();
829 int peer = ((rank + 1) % nprocs);
831 kmr_pushoff_notice(mr, peer);
834 for (
int j = 0; j < 1000; j++) {
835 if (FAST_NOTICE != 0) {
839 if (FAST_NOTICE != 0) {
842 double tm = MPI_Wtime();
843 if ((tm - t0) >= 200.0) {
847 if (FAST_NOTICE == 0) {
848 kmr_error(mr,
"FAST_NOTICE timeout (200 sec)");
852 kmr_pushoff_notice(mr, peer);
856 double t1 = MPI_Wtime();
/* Drain the RDMA completion queue before reporting. */
862 cc = FJMPI_Rdma_poll_cq(FJMPI_RDMA_NIC0, &cq);
863 assert(cc == 0 || cc == FJMPI_RDMA_NOTICE);
868 snprintf(ee,
sizeof(ee),
"Fast notification works (%f sec)",
870 kmr_warning(mr, 5, ee);
/* kmr_fin_pushoff_fast_notice_(): tears down fast notice. It does
   nothing if the notice area was never allocated. Otherwise it polls
   the RDMA completion queue, deregisters the area, finalizes FJMPI RDMA,
   and frees the area and the per-rank address table. */
876 kmr_fin_pushoff_fast_notice_(
void)
880 if (kmr_pushoff_area == 0) {
884 int nprocs = kmr_pushoff_nprocs;
885 int rank = kmr_pushoff_rank;
888 kmr_warning(0, 9,
"Finalize pushoff_fast_notice");
895 cc = FJMPI_Rdma_poll_cq(FJMPI_RDMA_NIC0, &cq);
896 assert(cc == 0 || cc == FJMPI_RDMA_NOTICE);
899 cc = FJMPI_Rdma_dereg_mem(kmr_pushoff_memid);
901 cc = FJMPI_Rdma_finalize();
/* NOTE(review): the area came from posix_memalign(), so kmr_free() must
   ultimately release it with free() -- confirm. */
904 kmr_free((
void *)kmr_pushoff_area, kmr_pushoff_area_size);
905 kmr_pushoff_area = 0;
906 kmr_free(kmr_pushoff_addrs, (
sizeof(uint64_t) * (
size_t)nprocs));
907 kmr_pushoff_addrs = 0;
/* kmr_pushoff_notice(): notifies PEER through a one-sided RDMA put.
   - The completion queue is polled first; each poll is counted in
     STAT_RDMA_POLLCQ_CALLS.
   - The put copies this rank's word 1 (PUT_VALUE, at la + sizeof(int))
     into word 0 of PEER's registered notice area, i.e. the peer's
     FAST_NOTICE. */
913 kmr_pushoff_notice(
KMR *mr,
int peer)
915 assert(mr->pushoff_fast_notice);
916 assert(kmr_pushoff_area[1] == PUT_VALUE);
917 assert(peer != mr->rank);
924 cc = FJMPI_Rdma_poll_cq(FJMPI_RDMA_NIC0, &cq);
925 assert(cc == 0 || cc == FJMPI_RDMA_NOTICE);
926 mr->pushoff_statistics.counts[STAT_RDMA_POLLCQ_CALLS]++;
930 uint64_t ra = kmr_pushoff_addrs[peer];
931 uint64_t la = kmr_pushoff_addrs[mr->rank];
932 int flag = (FJMPI_RDMA_LOCAL_NIC0|FJMPI_RDMA_REMOTE_NIC0|FJMPI_RDMA_PATH0);
/* Transfer exactly one int, matching the sizeof(int) source offset
   instead of a hard-coded 4. */
933 cc = FJMPI_Rdma_put(peer, tag0, ra, (la +
sizeof(
int)), sizeof(int), flag);
Key-Value Stream (abstract).
Utilities Private Part (do not include from applications).
Options to Mapping, Shuffling, and Reduction.
#define kmr_malloc(Z)
Allocates memory, or aborts on failure.
KMR_KVS * kmr_create_pushoff_kvs(KMR *mr, enum kmr_kv_field kf, enum kmr_kv_field vf, struct kmr_option opt, const char *file, const int line, const char *func)
Makes a new key-value stream with the specified field data-types.
void kmr_init_pushoff_fast_notice_(MPI_Comm comm, _Bool verbose)
Initializes RDMA for fast-notice.
static const size_t kmr_kvs_entry_header
Size of an Entry Header.
int kmr_add_kv_done(KMR_KVS *kvs)
Marks finished adding key-value pairs.
int kmr_free_kvs(KMR_KVS *kvs)
Releases a key-value stream (type KMR_KVS).
static void kmr_pushoff_link_to_send(KMR_KVS *kvs, int peer, struct kmr_pushoff_buffers *po, struct kmr_kvs_block *b)
Links a block for sending.
static int kmr_pushoff_do_send(KMR_KVS *kvs, int peer, _Bool closing)
Sends the first one in the list of buffered blocks, or it does nothing when the pipe is full...
static long kmr_tick()
Returns the clock counter value.
void kmr_check_pushoff_fast_notice_(KMR *mr)
Checks whether fast-notice works.
int kmr_add_kv_pushoff(KMR_KVS *kvs, const struct kmr_kv_box kv)
Adds a key-value pair.
kmr_kv_field
Datatypes of Keys or Values.
static int kmr_replace_kvs_components(KMR_KVS *kvs0, KMR_KVS *kvs1)
Replaces KVS0 with KVS1.
Handy Copy of a Key-Value Field.
Key-Value Stream with Shuffling at Addition of Key-Values.
int kmr_add_kv_done_pushoff(KMR_KVS *kvs)
Marks finished adding key-value pairs, called from kmr_add_kv_done().
KMR_KVS * kmr_create_kvs7(KMR *mr, enum kmr_kv_field k, enum kmr_kv_field v, struct kmr_option opt, const char *, const int, const char *)
Makes a new key-value stream with the specified field data-types.
static void kmr_poke_kv(struct kmr_kvs_entry *e, const struct kmr_kv_box kv, struct kmr_kv_box *xkv, const KMR_KVS *kvs, _Bool reserve_space_only)
Stores a key-value pair at the entry E in the store – a reverse of kmr_pick_kv().
Record of Push-Off Key-Value Stream for a Rank.