KMR
kmraltkvs.c
Go to the documentation of this file.
1 /* kmraltkvs.c (2014-02-04) */
2 /* Copyright (C) 2012-2016 RIKEN AICS */
3 
4 /** \file kmraltkvs.c Other Key-Value Stream Implementations. A
5  "push-off" key-value stream performs shuffling at key-value
6  addition. It aims at an overlap of communication and computation.
7  It includes RDMA-based event notification to tell readiness of MPI
8  messages. */
9 
10 #include <mpi.h>
11 #include <stdlib.h>
12 #include <string.h>
13 #include <unistd.h>
14 #include <limits.h>
15 #include <errno.h>
16 #include <assert.h>
17 #include <ctype.h>
18 
19 #include "../config.h"
20 #include "kmr.h"
21 #include "kmrimpl.h"
22 
23 #if (defined(__K) && defined(KMRFASTNOTICE))
24 #include <mpi-ext.h>
25 #endif
26 
/* Two-argument minimum and maximum.  Each argument may be evaluated
   twice, so do not pass expressions with side effects. */
27 #define MIN(a,b) (((a)<(b))?(a):(b))
28 #define MAX(a,b) (((a)>(b))?(a):(b))
29 #define NEVERHERE 0
30 
31 /* Statistics Timers and Counters. */
32 
/* Indexes into (mr->pushoff_statistics.times[]). */
33 #define STAT_TEST_TIME 0
34 #define STAT_WAIT_TIME 1
35 
/* Indexes into (mr->pushoff_statistics.counts[]). */
36 #define STAT_RECV_CALLS 0
37 #define STAT_SEND_CALLS 1
38 #define STAT_TEST_CALLS 2
39 #define STAT_WAIT_CALLS 3
40 #define STAT_TEST0_COUNT 4
41 #define STAT_TEST1_COUNT 5
42 #define STAT_WAIT_COUNT 6
43 #define STAT_SEND_PEND_COUNT 7
44 #define STAT_RDMA_POLLCQ_CALLS 8
45 #define STAT_TEST_CLOCKS 9
46 
/* Creation counter of push-off streams, and the table of push-off
   streams that may still have requests in flight.  A stream occupies
   the slot (seqno modulo table size); kmr_pushoff_poll_all() clears a
   slot when no requests remain. */
47 static int kmr_pushoff_seqno = 0;
48 static KMR_KVS *kmr_pushoff_movings[64];
49 
50 /* Areas and Values for Fast-Notice. See kmr_pushoff_notice(). */
51 
/* Byte size of the RDMA area and the memory-id used to register it. */
52 const size_t kmr_pushoff_area_size = (sizeof(int) * 1024);
53 const int kmr_pushoff_memid = 1;
54 
/* The RDMA area (element 0 is the notice flag, element 1 holds
   PUT_VALUE), the RDMA addresses of the areas indexed by rank, and the
   process count and rank recorded at initialization. */
55 static volatile int *kmr_pushoff_area = 0;
56 static uint64_t *kmr_pushoff_addrs = 0;
57 static int kmr_pushoff_nprocs;
58 static int kmr_pushoff_rank;
59 
60 static void kmr_pushoff_notice(KMR *mr, int peer);
61 
62 /* Flag written by RDMA. It is polled to detect incoming messages.
63  A single location is written by many ranks. */
64 
65 #define FAST_NOTICE (kmr_pushoff_area[0])
66 
67 /* RDMA write value (any non-zero value). */
68 
69 #define PUT_VALUE (((int)'A'<<24)|((int)'H'<<16)|((int)'O'<<8)|((int)'%'))
70 
71 static inline int
72 kmr_send_size_of_block(struct kmr_kvs_block *b)
73 {
74  return (int)(offsetof(struct kmr_kvs_block, data)
75  + b->fill_size + kmr_kvs_entry_header);
76 }
77 
78 /** Makes a new key-value stream with the specified field data-types.
79  It cannot be used with checkpointing. It allocates by the size of
80  the union, which is larger than the necessary for replacement
81  later by an on-core KVS at kmr_add_kv_done(). See
82  kmr_add_kv_done_pushoff(). */
83 
84 KMR_KVS *
86  struct kmr_option opt,
87  const char *file, const int line, const char *func)
88 {
89  assert(mr != 0);
90  int cc;
91  if (mr->ckpt_enable) {
92  kmr_error(mr, ("kmr_create_pushoff_kvs:"
93  " Unable to use it under checkpointing"));
94  }
95  if ((kf == KMR_KV_POINTER_OWNED) || (kf == KMR_KV_POINTER_UNMANAGED)
96  || (vf == KMR_KV_POINTER_OWNED) || (vf == KMR_KV_POINTER_UNMANAGED)) {
97  kmr_error(mr, "kmr_create_pushoff_kvs: Pointers not allowed");
98  }
99 
100  KMR_KVS *kvs = kmr_malloc(sizeof(KMR_KVS));
101  KMR_DEBUGX(memset(kvs, 0, sizeof(KMR_KVS)));
102  kvs->o.magic = KMR_KVS_PUSHOFF;
103  kvs->o.mr = mr;
104  kmr_link_kvs(kvs);
105  kvs->o.key_data = kf;
106  kvs->o.value_data = vf;
107  kvs->o.element_count = 0;
108  kvs->o.info_line0.file = file;
109  kvs->o.info_line0.func = func;
110  kvs->o.info_line0.line = line;
111 
112  kvs->o.oncore = 0;
113  kvs->o.stowed = 0;
114  kvs->o.nogrow = 0;
115  kvs->o.sorted = 0;
116  kvs->o._uniformly_sized_ = 0;
117 
118  int nmovings = (int)(sizeof(kmr_pushoff_movings)
119  / sizeof(kmr_pushoff_movings[0]));
120  kvs->o.seqno = (kmr_pushoff_seqno % nmovings);
121  kmr_pushoff_seqno++;
122  assert(kmr_pushoff_movings[kvs->o.seqno] == 0);
123  kmr_pushoff_movings[kvs->o.seqno] = kvs;
124 
125  kvs->o.storage = kmr_create_kvs7(mr, kf, vf, opt, file, line, func);
126 
127  int nprocs = mr->nprocs;
128  size_t vsz = (sizeof(struct kmr_pushoff_buffers) * (size_t)nprocs);
129  kvs->o.peers = kmr_malloc(vsz);
130  kvs->o.reqs = kmr_malloc(sizeof(MPI_Request) * (size_t)(2 * nprocs));
131  kvs->o.indexes = kmr_malloc(sizeof(int) * (size_t)(2 * nprocs));
132  kvs->o.statuses = kmr_malloc(sizeof(MPI_Status) * (size_t)(2 * nprocs));
133  memset(kvs->o.indexes, 0, (sizeof(int) * (size_t)(2 * nprocs)));
134  memset(kvs->o.statuses, 0, (sizeof(MPI_Status) * (size_t)(2 * nprocs)));
135  for (int r = 0; r < nprocs; r++) {
136  struct kmr_pushoff_buffers *po = &(kvs->o.peers[r]);
137  kvs->o.reqs[r] = MPI_REQUEST_NULL;
138  kvs->o.reqs[r + nprocs] = MPI_REQUEST_NULL;
139  struct kmr_kvs_block *b0 = kmr_malloc(mr->pushoff_block_size);
140  kmr_kvs_reset_block(kvs, b0, mr->pushoff_block_size, 0);
141  po->adding_point = &(b0->data[0]);
142  po->fillbuf = b0;
143  po->sendbufs[0] = 0;
144  po->sendbufs[1] = 0;
145  po->sends = 0;
146  po->closed[0] = 0;
147  po->closed[1] = 0;
148  if (r == mr->rank) {
149  po->recvbuf = 0;
150  } else {
151  struct kmr_kvs_block *b1 = kmr_malloc(mr->pushoff_block_size);
152  kmr_kvs_reset_block(kvs, b1, mr->pushoff_block_size, 0);
153  po->recvbuf = b1;
154  int sz = (int)mr->pushoff_block_size;
155  cc = MPI_Irecv(po->recvbuf, sz, MPI_BYTE, r,
156  (KMR_TAG_PUSHOFF + kvs->o.seqno),
157  mr->comm, &kvs->o.reqs[r + nprocs]);
158  assert(cc == MPI_SUCCESS);
159  mr->pushoff_statistics.counts[STAT_RECV_CALLS]++;
160  }
161  }
162 
163  return kvs;
164 }
165 
166 int
167 kmr_free_kvs_pushoff(KMR_KVS *kvs, _Bool deallocate)
168 {
169  KMR *mr = kvs->o.mr;
170  int nprocs = mr->nprocs;
171  if (kvs->o.storage != 0) {
172  kmr_free_kvs(kvs->o.storage);
173  kvs->o.storage = 0;
174  }
175  assert(kvs->o.reqs != 0);
176  for (int r = 0; r < nprocs; r++) {
177  if (kvs->o.reqs[r] != MPI_REQUEST_NULL) {
178  kmr_error(mr, "kmr_free_kvs: Some send pending");
179  }
180  if (kvs->o.reqs[r + nprocs] != MPI_REQUEST_NULL) {
181  kmr_error(mr, "kmr_free_kvs: Some receive pending");
182  }
183  }
184  kmr_free(kvs->o.reqs, (sizeof(MPI_Request) * (size_t)(2 * nprocs)));
185  kvs->o.reqs = 0;
186  kmr_free(kvs->o.indexes, (sizeof(int) * (size_t)(2 * nprocs)));
187  kvs->o.indexes = 0;
188  kmr_free(kvs->o.statuses, (sizeof(MPI_Status) * (size_t)(2 * nprocs)));
189  kvs->o.statuses = 0;
190  assert(kvs->o.peers != 0);
191  for (int r = 0; r < nprocs; r++) {
192  struct kmr_pushoff_buffers *po = &kvs->o.peers[r];
193  assert(po->fillbuf == 0);
194  assert(po->recvbuf == 0);
195  assert(po->sendbufs[0] == 0 && po->sendbufs[1] == 0);
196  }
197  size_t vsz = (sizeof(struct kmr_pushoff_buffers) * (size_t)nprocs);
198  kmr_free(kvs->o.peers, vsz);
199  kvs->o.peers = 0;
200 
201  if (deallocate) {
202  kmr_free(kvs, sizeof(struct kmr_kvs_pushoff));
203  }
204  return MPI_SUCCESS;
205 }
206 
207 /** Links a block for sending. Calling kmr_pushoff_do_send() starts
208  sending. */
209 
210 static inline void
212  struct kmr_pushoff_buffers *po,
213  struct kmr_kvs_block *b)
214 {
215  KMR *mr = kvs->o.mr;
216  assert(b->next == 0);
217  if (peer == mr->rank) {
218  kmr_kvs_insert_block(kvs->o.storage, b);
219  assert(po->sendbufs[0] == 0 && po->sendbufs[1] == 0 && po->sends == 0);
220  } else {
221  if (po->sendbufs[0] != 0) {
222  assert(po->sendbufs[1] != 0);
223  po->sendbufs[1]->next = b;
224  po->sendbufs[1] = b;
225  } else {
226  assert(po->sendbufs[1] == 0);
227  po->sendbufs[0] = b;
228  po->sendbufs[1] = b;
229  }
230  po->sends++;
231  }
232 }
233 
234 /** Sends the first one in the list of buffered blocks, or it does
235  nothing when the pipe is full. It sends a closing message when
236  CLOSING is true and nothing remains to send. Note MPI operations
237  are called inside a mutex (OMP critical). */
238 
239 static int
240 kmr_pushoff_do_send(KMR_KVS *kvs, int peer, _Bool closing)
241 {
242  KMR *mr = kvs->o.mr;
243  int cc;
244  struct kmr_pushoff_buffers *po = &(kvs->o.peers[peer]);
245 
246  if (kvs->o.reqs[peer] != MPI_REQUEST_NULL) {
247  /* Do nothing, called later when the previous send finishes. */
248  } else if (po->sendbufs[0] != 0) {
249  assert(peer != mr->rank);
250  struct kmr_kvs_block *b = po->sendbufs[0];
251  struct kmr_kvs_entry *e = kmr_kvs_adding_point(b);
252  kmr_kvs_mark_entry_tail(e);
253  int sz = kmr_send_size_of_block(b);
254  cc = MPI_Isend(b, sz, MPI_BYTE, peer,
255  (KMR_TAG_PUSHOFF + kvs->o.seqno),
256  mr->comm, &kvs->o.reqs[peer]);
257  assert(cc == MPI_SUCCESS);
258  if (mr->pushoff_fast_notice) {
259  kmr_pushoff_notice(mr, peer);
260  }
261  mr->pushoff_statistics.counts[STAT_SEND_CALLS]++;
262  } else if (closing) {
263  assert(po->sendbufs[0] == 0);
264  assert(po->sends == 0);
265  if (peer != mr->rank) {
266  /* Send a closing message. */
267  cc = MPI_Isend(0, 0, MPI_BYTE, peer,
268  (KMR_TAG_PUSHOFF + kvs->o.seqno),
269  mr->comm, &kvs->o.reqs[peer]);
270  assert(cc == MPI_SUCCESS);
271  po->closed[0] = 1;
272  if (mr->pushoff_fast_notice) {
273  kmr_pushoff_notice(mr, peer);
274  }
275  mr->pushoff_statistics.counts[STAT_SEND_CALLS]++;
276  }
277  } else {
278  assert(!closing && po->sendbufs[0] == 0);
279  /* Do nothing, send of the closing message finishes. */
280  }
281  return MPI_SUCCESS;
282 }
283 
284 /* Puts incoming block to the strage. It is only called with positive
285  COUNT. Note MPI operations are called inside a mutex (OMP
286  critical). */
287 
288 static int
289 kmr_pushoff_do_recv(KMR_KVS *kvs, int peer)
290 {
291  int cc;
292  KMR *mr = kvs->o.mr;
293  int nprocs = mr->nprocs;
294  assert(peer != mr->rank);
295  assert(kvs->o.reqs[peer + nprocs] == MPI_REQUEST_NULL);
296  struct kmr_pushoff_buffers *po = &(kvs->o.peers[peer]);
297  struct kmr_kvs_block *b = kmr_malloc(mr->pushoff_block_size);
298  kmr_kvs_reset_block(kvs, b, mr->pushoff_block_size, 0);
299  assert(po->recvbuf == 0);
300  po->recvbuf = b;
301  int sz = (int)mr->pushoff_block_size;
302  cc = MPI_Irecv(b, sz, MPI_BYTE, peer,
303  (KMR_TAG_PUSHOFF + kvs->o.seqno),
304  mr->comm, &kvs->o.reqs[peer + nprocs]);
305  assert(cc == MPI_SUCCESS);
306  if (mr->pushoff_fast_notice) {
307  kmr_pushoff_notice(mr, peer);
308  }
309  mr->pushoff_statistics.counts[STAT_RECV_CALLS]++;
310  return MPI_SUCCESS;
311 }
312 
313 /* Checks requests finish. It returns the number of active requests
314  remaining. CLOSING is true when called from kmr_add_kv_done(). It
315  is not inside of a mutex when CLOSING and safe to wait.  BLOCK
 selects waiting by MPI_Waitsome() (when true) or testing by
 MPI_Testsome() (when false).  The requests are indexed by rank for
 sends, and by rank plus nprocs for receives.  A finished send
 releases the sent block and starts the next send; a finished receive
 moves the block to the storage and posts a new receive, or marks the
 peer closed when the message is empty.  Note the count of remaining
 requests is taken at entry and is decremented only for closing
 messages. */
316 
317 static int
318 kmr_pushoff_poll(KMR_KVS *kvs, _Bool closing, _Bool block)
319 {
320  KMR *mr = kvs->o.mr;
321  int nprocs = mr->nprocs;
322  int nprocs2 = (2 * nprocs);
323  int cc;
324 
/* Count the active requests (both sends and receives). */
325  int remains;
326  remains = 0;
327  for (int r = 0; r < nprocs2; r++) {
328  remains += ((kvs->o.reqs[r] != MPI_REQUEST_NULL) ? 1 : 0);
329  }
330  if (remains == 0) {
331  return remains;
332  }
333  int hits = 0;
334  do {
335  if (block) {
/* Wait for at least one completion.  Time is taken only when
   statistics are enabled. */
336  double t0 = ((!mr->pushoff_stat) ? 0.0 : MPI_Wtime());
337  cc = MPI_Waitsome(nprocs2, kvs->o.reqs, &hits,
338  kvs->o.indexes, kvs->o.statuses);
339  assert(cc == MPI_SUCCESS && hits > 0 && hits != MPI_UNDEFINED);
340  double t1 = ((!mr->pushoff_stat) ? 0.0 : MPI_Wtime());
341  mr->pushoff_statistics.counts[STAT_WAIT_CALLS]++;
342  mr->pushoff_statistics.counts[STAT_WAIT_COUNT] += hits;
343  mr->pushoff_statistics.times[STAT_WAIT_TIME] += (t1 - t0);
344  } else {
/* Test without waiting.  Both the wall time and the clock ticks of
   the call are recorded. */
345  double t0 = ((!mr->pushoff_stat) ? 0.0 : MPI_Wtime());
346  long c0 = kmr_tick();
347  cc = MPI_Testsome(nprocs2, kvs->o.reqs, &hits,
348  kvs->o.indexes, kvs->o.statuses);
349  /* (DO NOT SWAP TWO LINES BELOW (for icc 20140120)). */
350  assert(cc == MPI_SUCCESS);
351  long c1 = kmr_tick();
/* MPI_UNDEFINED (no active requests) is treated as no hits. */
352  if (hits == MPI_UNDEFINED) {
353  hits = 0;
354  }
355  double t1 = ((!mr->pushoff_stat) ? 0.0 : MPI_Wtime());
356  mr->pushoff_statistics.counts[STAT_TEST_CALLS]++;
/* Hits are counted separately for before and after closing. */
357  if (!closing) {
358  mr->pushoff_statistics.counts[STAT_TEST0_COUNT] += hits;
359  } else {
360  mr->pushoff_statistics.counts[STAT_TEST1_COUNT] += hits;
361  }
362  mr->pushoff_statistics.times[STAT_TEST_TIME] += (t1 - t0);
363  mr->pushoff_statistics.counts[STAT_TEST_CLOCKS] += (c1 - c0);
364  }
365  assert(hits <= remains);
/* Handle each finished request; (indexes[i]) is its position in the
   request array and (statuses[i]) is its status. */
366  for (int i = 0; i < hits; i++) {
367  int rank2 = kvs->o.indexes[i];
368  MPI_Status *st= &(kvs->o.statuses[i]);
369  assert(rank2 != MPI_UNDEFINED
370  && kvs->o.reqs[rank2] == MPI_REQUEST_NULL);
371 
372  if (rank2 < nprocs) {
373  /* SEND */
374  int peer = rank2;
375  struct kmr_pushoff_buffers *po = &(kvs->o.peers[peer]);
376  struct kmr_kvs_block *b = po->sendbufs[0];
377  if (b == 0) {
378  /* No body means a closing message. */
379  assert(closing && po->closed[0] == 1);
380  remains--;
381  } else {
/* Unlink the sent block from the head of the list (clearing the tail
   too when it was the only block), free it, and start the next send
   (or the closing message). */
382  struct kmr_kvs_block *bn = po->sendbufs[0]->next;
383  if (po->sendbufs[0] == po->sendbufs[1]) {
384  assert(bn == 0);
385  po->sendbufs[1] = bn;
386  }
387  po->sendbufs[0] = bn;
388  po->sends--;
389  kmr_free(b, mr->pushoff_block_size);
390  kmr_pushoff_do_send(kvs, peer, closing);
391  }
392  } else {
393  /* RECV */
394  int peer = (rank2 - nprocs);
395  int count;
396  cc = MPI_Get_count(st, MPI_BYTE, &count);
397  assert(cc == MPI_SUCCESS);
398  assert((size_t)count <= mr->pushoff_block_size);
399  struct kmr_pushoff_buffers *po = &(kvs->o.peers[peer]);
400  if (count == 0) {
401  /* A closing message. */
/* The receive buffer is not used; free it and post no new receive. */
402  remains--;
403  struct kmr_kvs_block *b = po->recvbuf;
404  kmr_free(b, mr->pushoff_block_size);
405  po->recvbuf = 0;
406  po->closed[1] = 1;
407  } else {
408  /* (b->next) has arbitrary value. */
/* The header of the received block is checked against the byte count
   of the message, then the block is moved to the storage and a new
   receive is posted. */
409  struct kmr_kvs_block *b = po->recvbuf;
410  assert(b->size == mr->pushoff_block_size
411  && b->partial_element_count != 0
412  && b->fill_size != 0
413  && count == kmr_send_size_of_block(b));
414  kmr_kvs_insert_block(kvs->o.storage, b);
415  po->recvbuf = 0;
416  kmr_pushoff_do_recv(kvs, peer);
417  }
418  }
419  }
/* Repeat while a non-closing poll keeps finding completions, or, when
   blocking, while some requests remain. */
420  } while ((!closing && hits > 0) || (block && remains > 0));
421  if (block) {
422  for (int r = 0; r < nprocs2; r++) {
423  assert(kvs->o.reqs[r] == MPI_REQUEST_NULL);
424  }
425  }
426  return remains;
427 }
428 
429 static int
430 kmr_pushoff_poll_all(void)
431 {
432  int nmovings = (int)(sizeof(kmr_pushoff_movings)
433  / sizeof(kmr_pushoff_movings[0]));
434  for (int i = 0; i < nmovings; i++) {
435  KMR_KVS *kvs = kmr_pushoff_movings[i];
436  if (kvs != 0) {
437  KMR *mr = kvs->o.mr;
438  int nprocs = mr->nprocs;
439  int remains = kmr_pushoff_poll(kvs, (kvs->o.stowed), 0);
440  if (remains == 0) {
441  int nprocs2 = (2 * nprocs);
442  for (int r = 0; r < nprocs2; r++) {
443  assert(kvs->o.reqs[r] == MPI_REQUEST_NULL);
444  }
445  kmr_pushoff_movings[i] = 0;
446  }
447  }
448  }
449 
450  return MPI_SUCCESS;
451 }
452 
453 /** Adds a key-value pair. It is called from inside a mutex (OMP
454  critical). It first stores a KV into a buffer, and then sends the
455  buffer when it gets full. It sends an empty message as an
456  end-of-stream marker. IT POLLS MESSAGES TOO OFTEN OR TOO
457  SELDOM. */
458 
459 int
460 kmr_add_kv_pushoff(KMR_KVS *kvs, const struct kmr_kv_box kv)
461 {
462  kmr_assert_kv_sizes(kvs, kv);
463  assert(!kvs->o.nogrow);
464  KMR *mr = kvs->o.mr;
465 
466  int r = kmr_pitch_rank(kv, kvs);
467  struct kmr_pushoff_buffers *po = &(kvs->o.peers[r]);
468 
469  size_t sz = kmr_kvs_entry_netsize_of_box(kv);
470  struct kmr_kvs_block *b0 = po->fillbuf;
471  assert(po->adding_point == kmr_kvs_adding_point(b0));
472  if (!kmr_kvs_entry_fits_in_block(kvs, b0, sz)) {
473  kmr_kvs_mark_entry_tail(po->adding_point);
474  kmr_pushoff_link_to_send(kvs, r, po, b0);
475  struct kmr_kvs_block *n = kmr_malloc(mr->pushoff_block_size);
476  kmr_kvs_reset_block(kvs, n, mr->pushoff_block_size, 0);
477  po->fillbuf = n;
478  po->adding_point = &(n->data[0]);
479 
480  long m0 = mr->pushoff_statistics.counts[STAT_SEND_PEND_COUNT];
481  long m1 = MAX(po->sends, m0);
482  mr->pushoff_statistics.counts[STAT_SEND_PEND_COUNT] = m1;
483 
484  /* TOO SELDOM. */
485 
486  if (mr->pushoff_poll_rate == 0) {
487  if (mr->pushoff_fast_notice) {
488  FAST_NOTICE = 0;
489  }
490  /*kmr_pushoff_poll(kvs, 0);*/
491  kmr_pushoff_poll_all();
492  }
493  }
494 
495  struct kmr_kvs_entry *e = po->adding_point;
496  kmr_poke_kv(e, kv, 0, kvs, 0);
497  po->adding_point = kmr_kvs_next_entry(kvs, e);
498 
499  struct kmr_kvs_block *b1 = po->fillbuf;
500  b1->partial_element_count++;
501  b1->fill_size += sz;
502  kvs->o.element_count++;
503 
504  if (mr->pushoff_fast_notice && FAST_NOTICE) {
505  FAST_NOTICE = 0;
506  /*kmr_pushoff_poll(kvs, 0);*/
507  kmr_pushoff_poll_all();
508  }
509 
510  if ((mr->pushoff_poll_rate == 1) && (po->sendbufs[0] != 0)) {
511  /* TOO OFTEN (BE AVOIDED). */
512  /*kmr_pushoff_poll(kvs, 0);*/
513  kmr_pushoff_poll_all();
514  }
515 
516  return MPI_SUCCESS;
517 }
518 
519 /** Replaces KVS0 with KVS1. That is, it moves the structure slots
520  from KVS1 to KVS0 and frees KVS1. (The first one be push-off, and
521  the second one be on-core). */
522 
523 static int
525 {
526  assert(kvs1->c.magic == KMR_KVS_ONCORE
527  && kvs1->c.ms == 0);
528  assert(kvs0->o.mr == kvs1->c.mr);
529 
530  kvs0->c.magic = KMR_KVS_ONCORE;
531  kvs0->c.key_data = kvs1->c.key_data;
532  kvs0->c.value_data = kvs1->c.value_data;
533  kvs0->c.element_count = kvs1->c.element_count;
534  kvs0->c.oncore = kvs1->c.oncore;
535 
536  kvs0->c.storage_netsize = kvs1->c.storage_netsize;
537  kvs0->c.block_count = kvs1->c.block_count;
538  kvs0->c.first_block = kvs1->c.first_block;
539 
540  kvs1->c.element_count = 0;
541  kvs1->c.storage_netsize = 0;
542  kvs1->c.block_count = 0;
543  kvs1->c.first_block = 0;
544 
545  kmr_free_kvs(kvs1);
546 
547  return MPI_SUCCESS;
548 }
549 
550 /** Marks finished adding key-value pairs, called from
551  kmr_add_kv_done(). It flushes pending buffers and it is a
552  collective operation in effect. */
553 
554 int
556 {
557  KMR *mr = kvs->o.mr;
558  int nprocs = mr->nprocs;
559 
560  if (kvs->o.stowed) {
561  kmr_error(mr, "kmr_add_kv_done: may be called already");
562  }
563 
564  for (int r = 0; r < nprocs; r++) {
565  struct kmr_pushoff_buffers *po = &(kvs->o.peers[r]);
566  struct kmr_kvs_block *b0 = po->fillbuf;
567  if (b0->partial_element_count > 0) {
568  kmr_kvs_mark_entry_tail(po->adding_point);
569  kmr_pushoff_link_to_send(kvs, r, po, b0);
570  po->fillbuf = 0;
571  } else {
572  kmr_free(po->fillbuf, mr->pushoff_block_size);
573  po->fillbuf = 0;
574  }
575  if (r != mr->rank) {
576  kmr_pushoff_do_send(kvs, r, 1);
577  }
578  }
579 
580  kvs->o.stowed = 1;
581 
582  if (!mr->pushoff_hang_out) {
583  kmr_pushoff_make_stationary(kvs);
584  }
585 
586  return MPI_SUCCESS;
587 }
588 
589 /* Destructively replaces this kvs by on-core one, after waiting for
590  all communication to finish. */
591 
592 int
593 kmr_pushoff_make_stationary(KMR_KVS *kvs)
594 {
595  KMR *mr = kvs->o.mr;
596  int nprocs = mr->nprocs;
597 
598  kmr_pushoff_poll(kvs, 1, 1);
599  kmr_pushoff_poll_all();
600 
601 #if 0
602  int remains;
603  do {
604  remains = kmr_pushoff_poll(kvs, 1, 0);
605  } while (remains > 0);
606  kmr_pushoff_poll_all();
607 #endif
608 
609  for (int r = 0; r < nprocs; r++) {
610  struct kmr_pushoff_buffers *po = &(kvs->o.peers[r]);
611  po->adding_point = 0;
612  assert(po->fillbuf == 0);
613  assert(po->sendbufs[0] == 0 && po->sendbufs[1] == 0);
614  }
615 
616  KMR_KVS *storage = kvs->o.storage;
617  kvs->o.storage = 0;
618  kmr_free_kvs_pushoff(kvs, 0);
619  kmr_init_kvs_oncore(kvs, mr);
620  kmr_replace_kvs_components(kvs, storage);
621 
622  long count;
623  size_t netsize;
624  struct kmr_kvs_block *lastblock;
625  count = 0;
626  netsize = 0;
627  lastblock = 0;
628  for (struct kmr_kvs_block *b = kvs->c.first_block; b != 0; b = b->next) {
629  count += b->partial_element_count;
630  netsize += b->fill_size;
631  lastblock = b;
632  }
633  kvs->c.element_count = count;
634  kvs->c.storage_netsize = netsize;
635  if (kvs->c.element_count == 0) {
636  kvs->c.current_block = 0;
637  kvs->c.adding_point = 0;
638  } else {
639  assert(lastblock != 0);
640  kvs->c.current_block = lastblock;
641  kvs->c.adding_point = (void *)((char *)&lastblock->data[0]
642  + lastblock->fill_size);
643  }
644 
645  kvs->c.shuffled_in_pushoff = 1;
646  kmr_add_kv_done(kvs);
647 
648  return MPI_SUCCESS;
649 }
650 
651 void
652 kmr_print_statistics_on_pushoff(KMR *mr, char *titlestring)
653 {
654  if (mr->pushoff_stat) {
655  long *counts = mr->pushoff_statistics.counts;
656  double *times = mr->pushoff_statistics.times;
657  fprintf(stderr,
658  ("%s"
659  "[%d] push-off sends=%ld\n"
660  "[%d] push-off recvs=%ld\n"
661  "[%d] push-off tests=%ld hits=%ld time=%f (clocks=%ld)\n"
662  "[%d] push-off waits=%ld hits=%ld time=%f (closing)\n"
663  "[%d] push-off tests hits=(%ld + %ld) (%.2f%% + %.2f%%)\n"
664  "[%d] push-off max-send-pendings=%ld\n"
665  "[%d] push-off cq-polls=%ld\n"),
666  titlestring,
667  mr->rank,
668  counts[STAT_SEND_CALLS],
669  mr->rank,
670  counts[STAT_RECV_CALLS],
671  mr->rank,
672  counts[STAT_TEST_CALLS],
673  (counts[STAT_TEST0_COUNT] + counts[STAT_TEST1_COUNT]),
674  times[STAT_TEST_TIME],
675  counts[STAT_TEST_CLOCKS],
676  mr->rank,
677  counts[STAT_WAIT_CALLS],
678  counts[STAT_WAIT_COUNT],
679  times[STAT_WAIT_TIME],
680  mr->rank,
681  counts[STAT_TEST0_COUNT],
682  counts[STAT_TEST1_COUNT],
683  (100.0 * (double)counts[STAT_TEST0_COUNT]
684  / (double)(counts[STAT_SEND_CALLS]
685  + counts[STAT_RECV_CALLS])),
686  (100.0 * (double)counts[STAT_TEST1_COUNT]
687  / (double)(counts[STAT_SEND_CALLS]
688  + counts[STAT_RECV_CALLS])),
689  mr->rank,
690  counts[STAT_SEND_PEND_COUNT],
691  mr->rank,
692  counts[STAT_RDMA_POLLCQ_CALLS]);
693  fflush(0);
694  }
695 }
696 
697 /* ================================================================ */
698 
699 /* FAKE FUNCTIONS FOR FUJITSU-MPI. */
700 
#if (!(defined(__K) && defined(KMRFASTNOTICE)))

/* Stand-ins for the Fujitsu MPI RDMA extension, used when the real
   extension is not enabled.  The functions do nothing: the
   registration and the address lookup return 1 (a value distinct
   from FJMPI_RDMA_ERROR), and the others return 0. */

#define FJMPI_RDMA_ERROR (0)
#define FJMPI_RDMA_NOTICE (1)
#define FJMPI_RDMA_NIC0 0
#define FJMPI_RDMA_LOCAL_NIC0 0
#define FJMPI_RDMA_REMOTE_NIC0 0
#define FJMPI_RDMA_PATH0 0

struct FJMPI_Rdma_cq {int _;};

static int
FJMPI_Rdma_init(void)
{
    return 0;
}

static int
FJMPI_Rdma_finalize(void)
{
    return 0;
}

static uint64_t
FJMPI_Rdma_reg_mem(int m, void *b, size_t l)
{
    (void)m;
    (void)b;
    (void)l;
    return 1;
}

static int
FJMPI_Rdma_dereg_mem(int m)
{
    (void)m;
    return 0;
}

static uint64_t
FJMPI_Rdma_get_remote_addr(int r, int m)
{
    (void)r;
    (void)m;
    return 1;
}

static int
FJMPI_Rdma_put(int r, int tag, uint64_t ra, uint64_t la,
               size_t sz, int f)
{
    (void)r;
    (void)tag;
    (void)ra;
    (void)la;
    (void)sz;
    (void)f;
    return 0;
}

static int
FJMPI_Rdma_poll_cq(int nic, struct FJMPI_Rdma_cq *cq)
{
    (void)nic;
    (void)cq;
    return 0;
}

#endif /*__K*/
720 
721 /** Initializes RDMA for fast-notice. Fast-notice is RDMA-based event
722  notification to tell readiness of MPI messages. It is only usable
723  with communicators having the same processes. */
724 
725 void
726 kmr_init_pushoff_fast_notice_(MPI_Comm comm, _Bool verbose)
727 {
728  int cc;
729 
730  unsigned int verbosity = (verbose ? 5 : 9);
731 
732  int nprocs;
733  int rank;
734  MPI_Comm_size(comm, &nprocs);
735  MPI_Comm_rank(comm, &rank);
736 
737  if (kmr_pushoff_area != 0) {
738  assert(kmr_pushoff_nprocs == nprocs && kmr_pushoff_rank == rank);
739  return;
740  }
741 
742  if (rank == 0) {
743  kmr_warning(0, verbosity, "Initialize pushoff_fast_notice");
744  }
745 
746 #if (!(defined(__K) && defined(KMRFASTNOTICE)))
747  if (rank == 0) {
748  kmr_warning(0, verbosity, ("Fast-notice needs Fujitsu MPI extension"));
749  }
750 #endif
751 
752  cc = FJMPI_Rdma_init();
753  assert(cc == 0);
754 
755  kmr_pushoff_nprocs = nprocs;
756  kmr_pushoff_rank = rank;
757 
758  kmr_pushoff_addrs = kmr_malloc(sizeof(uint64_t) * (size_t)nprocs);
759  /*kmr_pushoff_area = kmr_malloc(kmr_pushoff_area_size);*/
760  size_t malign = (2 * 1024 * 1024);
761  cc = posix_memalign((void **)&kmr_pushoff_area, malign,
762  kmr_pushoff_area_size);
763  if (cc != 0) {
764  char ee[80];
765  char *m = strerror(errno);
766  snprintf(ee, sizeof(ee), "posix_memalign(sz=%zd) failed: %s",
767  kmr_pushoff_area_size, m);
768  kmr_error(0, ee);
769  }
770  assert(kmr_pushoff_area != 0);
771 
772  uint64_t a0 = FJMPI_Rdma_reg_mem(kmr_pushoff_memid,
773  (void *)kmr_pushoff_area,
774  kmr_pushoff_area_size);
775  assert(a0 != FJMPI_RDMA_ERROR);
776  kmr_pushoff_addrs[rank] = a0;
777 
778  /* Clear RMDA area. */
779 
780  FAST_NOTICE = 0;
781 
782  /* Set RMDA source value non-zero. */
783 
784  kmr_pushoff_area[1] = (int)PUT_VALUE;
785  assert((int)PUT_VALUE != 0);
786 
787  for (int r = 0; r < nprocs; r++) {
788  if (r == rank) {
789  continue;
790  }
791  uint64_t a1;
792  do {
793  a1 = FJMPI_Rdma_get_remote_addr(r, kmr_pushoff_memid);
794  } while (a1 == FJMPI_RDMA_ERROR);
795  kmr_pushoff_addrs[r] = a1;
796 #if 0
797  fprintf(stderr, "[%d] rdma addr: b=%p, l=%p r=%p\n",
798  r, (void *)kmr_pushoff_area, (void *)a0, (void *)a1);
799  fflush(0);
800 #endif
801  }
802 }
803 
804 /** Check if fast-notice works. Check be at immediately after
805  initialization. */
806 
807 void
809 {
810  int nprocs = mr->nprocs;
811  int rank = mr->rank;
812 
813  _Bool check = 1;
814 
815 #if (!(defined(__K) && defined(KMRFASTNOTICE)))
816  check = 0;
817 #endif
818 
819  if (check) {
820  int cc;
821  assert(kmr_pushoff_area != 0);
822  assert(kmr_pushoff_nprocs == nprocs && kmr_pushoff_rank == rank);
823 
824  if (rank == 0) {
825  kmr_warning(mr, 5, "Checking fast notification works");
826  }
827  double t0 = MPI_Wtime();
828 
829  int peer = ((rank + 1) % nprocs);
830  if (rank == 0) {
831  kmr_pushoff_notice(mr, peer);
832  }
833  for (;;) {
834  for (int j = 0; j < 1000; j++) {
835  if (FAST_NOTICE != 0) {
836  break;
837  }
838  }
839  if (FAST_NOTICE != 0) {
840  break;
841  }
842  double tm = MPI_Wtime();
843  if ((tm - t0) >= 200.0) {
844  break;
845  }
846  }
847  if (FAST_NOTICE == 0) {
848  kmr_error(mr, "FAST_NOTICE timeout (200 sec)");
849  return;
850  }
851  if (rank != 0) {
852  kmr_pushoff_notice(mr, peer);
853  }
854  FAST_NOTICE = 0;
855 
856  double t1 = MPI_Wtime();
857 
858  /* Try to reclaim CQ entries. */
859 
860  do {
861  struct FJMPI_Rdma_cq cq;
862  cc = FJMPI_Rdma_poll_cq(FJMPI_RDMA_NIC0, &cq);
863  assert(cc == 0 || cc == FJMPI_RDMA_NOTICE);
864  } while (cc != 0);
865 
866  if (rank == 0) {
867  char ee[80];
868  snprintf(ee, sizeof(ee), "Fast notification works (%f sec)",
869  (t1 - t0));
870  kmr_warning(mr, 5, ee);
871  }
872  }
873 }
874 
875 void
876 kmr_fin_pushoff_fast_notice_(void)
877 {
878  int cc;
879 
880  if (kmr_pushoff_area == 0) {
881  return;
882  }
883 
884  int nprocs = kmr_pushoff_nprocs;
885  int rank = kmr_pushoff_rank;
886 
887  if (rank == 0) {
888  kmr_warning(0, 9, "Finalize pushoff_fast_notice");
889  }
890 
891  /* Try to reclaim CQ entries. Not exhaustive. */
892 
893  do {
894  struct FJMPI_Rdma_cq cq;
895  cc = FJMPI_Rdma_poll_cq(FJMPI_RDMA_NIC0, &cq);
896  assert(cc == 0 || cc == FJMPI_RDMA_NOTICE);
897  } while (cc != 0);
898 
899  cc = FJMPI_Rdma_dereg_mem(kmr_pushoff_memid);
900  assert(cc == 0);
901  cc = FJMPI_Rdma_finalize();
902  assert(cc == 0);
903 
904  kmr_free((void *)kmr_pushoff_area, kmr_pushoff_area_size);
905  kmr_pushoff_area = 0;
906  kmr_free(kmr_pushoff_addrs, (sizeof(uint64_t) * (size_t)nprocs));
907  kmr_pushoff_addrs = 0;
908 }
909 
910 /* Notifies an event to RANK by writing 1 to kmr_pushoff_area[0]. */
911 
912 static void
913 kmr_pushoff_notice(KMR *mr, int peer)
914 {
915  assert(mr->pushoff_fast_notice);
916  assert(kmr_pushoff_area[1] == PUT_VALUE);
917  assert(peer != mr->rank);
918  int cc;
919 
920  /* Reclaim CQ entries. */
921 
922  do {
923  struct FJMPI_Rdma_cq cq;
924  cc = FJMPI_Rdma_poll_cq(FJMPI_RDMA_NIC0, &cq);
925  assert(cc == 0 || cc == FJMPI_RDMA_NOTICE);
926  mr->pushoff_statistics.counts[STAT_RDMA_POLLCQ_CALLS]++;
927  } while (cc != 0);
928 
929  int tag0 = 0x7;
930  uint64_t ra = kmr_pushoff_addrs[peer];
931  uint64_t la = kmr_pushoff_addrs[mr->rank];
932  int flag = (FJMPI_RDMA_LOCAL_NIC0|FJMPI_RDMA_REMOTE_NIC0|FJMPI_RDMA_PATH0);
933  cc = FJMPI_Rdma_put(peer, tag0, ra, (la + sizeof(int)), 4, flag);
934  assert(cc == 0);
935 }
936 
937 /*
938 Copyright (C) 2012-2016 RIKEN AICS
939 This library is distributed WITHOUT ANY WARRANTY. This library can be
940 redistributed and/or modified under the terms of the BSD 2-Clause License.
941 */
Key-Value Stream (abstract).
Definition: kmr.h:587
Utilities Private Part (do not include from applications).
Options to Mapping, Shuffling, and Reduction.
Definition: kmr.h:613
#define kmr_malloc(Z)
Allocates memory, or aborts when failed.
Definition: kmrimpl.h:177
KMR_KVS * kmr_create_pushoff_kvs(KMR *mr, enum kmr_kv_field kf, enum kmr_kv_field vf, struct kmr_option opt, const char *file, const int line, const char *func)
Makes a new key-value stream with the specified field data-types.
Definition: kmraltkvs.c:85
void kmr_init_pushoff_fast_notice_(MPI_Comm comm, _Bool verbose)
Initializes RDMA for fast-notice.
Definition: kmraltkvs.c:726
static const size_t kmr_kvs_entry_header
Size of an Entry Header.
Definition: kmr.h:382
int kmr_add_kv_done(KMR_KVS *kvs)
Marks finished adding key-value pairs.
Definition: kmrbase.c:881
Definition: kmr.h:348
KMR Context.
Definition: kmr.h:222
int kmr_free_kvs(KMR_KVS *kvs)
Releases a key-value stream (type KMR_KVS).
Definition: kmrbase.c:621
static void kmr_pushoff_link_to_send(KMR_KVS *kvs, int peer, struct kmr_pushoff_buffers *po, struct kmr_kvs_block *b)
Links a block for sending.
Definition: kmraltkvs.c:211
static int kmr_pushoff_do_send(KMR_KVS *kvs, int peer, _Bool closing)
Sends the first one in the list of buffered blocks, or it does nothing when the pipe is full...
Definition: kmraltkvs.c:240
static long kmr_tick()
Returns the clock counter value.
Definition: kmrimpl.h:135
void kmr_check_pushoff_fast_notice_(KMR *mr)
Check if fast-notice works.
Definition: kmraltkvs.c:808
int kmr_add_kv_pushoff(KMR_KVS *kvs, const struct kmr_kv_box kv)
Adds a key-value pair.
Definition: kmraltkvs.c:460
kmr_kv_field
Datatypes of Keys or Values.
Definition: kmr.h:325
static int kmr_replace_kvs_components(KMR_KVS *kvs0, KMR_KVS *kvs1)
Replaces KVS0 with KVS1.
Definition: kmraltkvs.c:524
Handy Copy of a Key-Value Field.
Definition: kmr.h:358
Key-Value Stream with Shuffling at Addition of Key-Values.
Definition: kmr.h:542
int kmr_add_kv_done_pushoff(KMR_KVS *kvs)
Marks finished adding key-value pairs, called from kmr_add_kv_done().
Definition: kmraltkvs.c:555
KMR Interface.
KMR_KVS * kmr_create_kvs7(KMR *mr, enum kmr_kv_field k, enum kmr_kv_field v, struct kmr_option opt, const char *, const int, const char *)
Makes a new key-value stream with the specified field data-types.
Definition: kmrbase.c:510
static void kmr_poke_kv(struct kmr_kvs_entry *e, const struct kmr_kv_box kv, struct kmr_kv_box *xkv, const KMR_KVS *kvs, _Bool reserve_space_only)
Stores a key-value pair at the entry E in the store – a reverse of kmr_pick_kv().
Definition: kmrimpl.h:599
Record of Push-Off Key-Value Stream for a Rank.
Definition: kmr.h:525