KMR
kmratoa.c
Go to the documentation of this file.
1 /* kmratoa.c (2014-02-04) */
2 /* Copyright (C) 2012-2016 RIKEN AICS */
3 
/** \file kmratoa.c Communication Routines. KMR makes almost all data
    exchanges through this. Some exceptions are "kmrmapms.c" and
    "kmrfiles.c". It provides operations that take data lengths as
    long integers rather than MPI's int counts. */
7 
/* Used MPI routines: Alltoall, Alltoallv, Allgather, Allgatherv,
   Allreduce, Gatherv, Exscan, Irecv, Isend, Irsend, Sendrecv,
   Waitall, plus Gather and Barrier for trace dumps. */
11 
12 #include <mpi.h>
13 #include <stdlib.h>
14 #include <limits.h>
15 #include <errno.h>
16 #include <assert.h>
17 #include "kmr.h"
18 #include "kmrimpl.h"
19 
/* Larger of A and B.  Each argument may be evaluated twice, so do not
   pass expressions with side effects. */
#define MAX(a,b) (((a)>(b))?(a):(b))

/* Forward declarations of the file-local helpers defined below. */
static int kmr_alltoallv_mpi(KMR *mr, void *sbuf, long *scnts, long *sdsps,
                             void *rbuf, long *rcnts, long *rdsps);
static int kmr_alltoallv_bruck(KMR *mr, void *sbuf, long *scnts, long *sdsps,
                               void *rbuf, long *rcnts, long *rdsps);
static int kmr_alltoall_bruck(KMR *mr, void *sbuf, void *rbuf, int cnt);
static void kmr_atoa_dump_(KMR *mr, void *sbuf, int sz, char *title, int step);
28 
29 /* Checks if X is power of two/four. */
30 
/* True when X is a positive power of two.  Such a value has exactly one
   bit set, so clearing its lowest set bit (x & (x - 1)) leaves zero.
   Non-positive values are rejected before x - 1 is formed. */
static inline _Bool
kmr_powerof2_p(int x)
{
    if (x <= 0) {
        return 0;
    }
    int lowest_cleared = (x & (x - 1));
    return (lowest_cleared == 0);
}
36 
/* True when X is a positive power of four.  A power of four is a power
   of two whose single set bit is at an even position (0, 2, ..., 30),
   i.e. it falls inside the even-bit mask 0x55555555 rather than the
   odd-bit mask 0x2aaaaaaa. */
static inline _Bool
kmr_powerof4_p(int x)
{
    _Bool single_bit = ((x > 0) && ((x & (x - 1)) == 0));
    if (!single_bit) {
        return 0;
    }
    return ((x & 0x55555555) != 0);
}
42 
43 /** Calls all-to-all to exchange one long-integer. */
44 
45 int
46 kmr_exchange_sizes(KMR *mr, long *sbuf, long *rbuf)
47 {
48  MPI_Comm comm = mr->comm;
49  int cc;
50  cc = MPI_Alltoall(sbuf, 1, MPI_LONG, rbuf, 1, MPI_LONG, comm);
51  assert(cc == MPI_SUCCESS);
52  return MPI_SUCCESS;
53 }
54 
55 /** Calls all-gather for collecting one long-integer. */
56 
57 int
58 kmr_gather_sizes(KMR *mr, long siz, long *rbuf)
59 {
60  MPI_Comm comm = mr->comm;
61  int cc;
62  cc = MPI_Allgather(&siz, 1, MPI_LONG, rbuf, 1, MPI_LONG, comm);
63  assert(cc == MPI_SUCCESS);
64  return MPI_SUCCESS;
65 }
66 
67 /** All-gathers data, or gathers data when RANKZEROONLY. */
68 
69 int
70 kmr_allgatherv(KMR *mr, _Bool rankzeroonly, void *sbuf, long scnt,
71  void *rbuf, long *rcnts, long *rdsps)
72 {
73  MPI_Comm comm = mr->comm;
74  int nprocs = mr->nprocs;
75  int self = mr->rank;
76  int *rsz;
77  int *rdp;
78  if (!rankzeroonly || self == 0) {
79  rsz = kmr_malloc(sizeof(int) * (size_t)nprocs);
80  rdp = kmr_malloc(sizeof(int) * (size_t)nprocs);
81  for (int r = 0; r < nprocs; r++) {
82  assert(INT_MIN <= rcnts[r] && rcnts[r] <= INT_MAX);
83  assert(INT_MIN <= rdsps[r] && rdsps[r] <= INT_MAX);
84  rsz[r] = (int)rcnts[r];
85  rdp[r] = (int)rdsps[r];
86  }
87  } else {
88  rsz = 0;
89  rdp = 0;
90  }
91  int cc;
92  if (rankzeroonly) {
93  cc = MPI_Gatherv(sbuf, (int)scnt, MPI_BYTE,
94  rbuf, rsz, rdp, MPI_BYTE, 0, comm);
95  assert(cc == MPI_SUCCESS);
96  } else {
97  cc = MPI_Allgatherv(sbuf, (int)scnt, MPI_BYTE,
98  rbuf, rsz, rdp, MPI_BYTE, comm);
99  assert(cc == MPI_SUCCESS);
100  }
101  if (rsz != 0) {
102  kmr_free(rsz, (sizeof(int) * (size_t)nprocs));
103  }
104  if (rdp != 0) {
105  kmr_free(rdp, (sizeof(int) * (size_t)nprocs));
106  }
107  return MPI_SUCCESS;
108 }
109 
110 /* ================================================================ */
111 
112 /** Does all-to-all-v, but it takes arguments of long-integers.
113  Setting ATOA_THRESHOLD=0 forces to use MPI all-to-all-v. */
114 
115 int
117  void *sbuf, long *scnts, long *sdsps,
118  void *rbuf, long *rcnts, long *rdsps)
119 {
120  int nprocs = mr->nprocs;
121  int cc;
122  if (!kmr_powerof4_p(nprocs) || nprocs == 1 || mr->atoa_threshold == 0) {
123  cc = kmr_alltoallv_mpi(mr, sbuf, scnts, sdsps,
124  rbuf, rcnts, rdsps);
125  assert(cc == MPI_SUCCESS);
126  } else {
127  cc = kmr_alltoallv_bruck(mr, sbuf, scnts, sdsps,
128  rbuf, rcnts, rdsps);
129  assert(cc == MPI_SUCCESS);
130  }
131  return MPI_SUCCESS;
132 }
133 
134 /* Does all-to-all-v using MPI_Alltoallv. It takes offsets upto 8 GB
135  (we understand it is not enough). It assumes data is 8-byte
136  aligned. */
137 
138 static int
139 kmr_alltoallv_mpi(KMR *mr,
140  void *sbuf, long *scnts, long *sdsps,
141  void *rbuf, long *rcnts, long *rdsps)
142 {
143  MPI_Comm comm = mr->comm;
144  int nprocs = mr->nprocs;
145  int *ssz = kmr_malloc(sizeof(int) * (size_t)nprocs);
146  int *sdp = kmr_malloc(sizeof(int) * (size_t)nprocs);
147  int *rsz = kmr_malloc(sizeof(int) * (size_t)nprocs);
148  int *rdp = kmr_malloc(sizeof(int) * (size_t)nprocs);
149 
150  for (int r = 0; r < nprocs; r++) {
151  assert(INT_MIN * 8L <= scnts[r] && scnts[r] <= INT_MAX * 8L);
152  assert(INT_MIN * 8L <= rcnts[r] && rcnts[r] <= INT_MAX * 8L);
153  assert(INT_MIN * 8L <= sdsps[r] && sdsps[r] <= INT_MAX * 8L);
154  assert(INT_MIN * 8L <= rdsps[r] && rdsps[r] <= INT_MAX * 8L);
155  assert(((scnts[r] & 7) == 0)
156  && ((rcnts[r] & 7) == 0)
157  && ((sdsps[r] & 7) == 0)
158  && ((rdsps[r] & 7) == 0));
159  ssz[r] = (int)(scnts[r] / 8L);
160  rsz[r] = (int)(rcnts[r] / 8L);
161  sdp[r] = (int)(sdsps[r] / 8L);
162  rdp[r] = (int)(rdsps[r] / 8L);
163  }
164  int cc;
165  cc = MPI_Alltoallv(sbuf, ssz, sdp, MPI_LONG,
166  rbuf, rsz, rdp, MPI_LONG, comm);
167  assert(cc == MPI_SUCCESS);
168 
169  kmr_free(ssz, (sizeof(int) * (size_t)nprocs));
170  kmr_free(rsz, (sizeof(int) * (size_t)nprocs));
171  kmr_free(sdp, (sizeof(int) * (size_t)nprocs));
172  kmr_free(rdp, (sizeof(int) * (size_t)nprocs));
173  return MPI_SUCCESS;
174 }
175 
176 /* Does all-to-all-v using Bruck's all-to-all when message size is
177  small (less than ATOA_THRESHOLD). */
178 
179 static int
180 kmr_alltoallv_bruck(KMR *mr,
181  void *sbuf, long *scnts, long *sdsps,
182  void *rbuf, long *rcnts, long *rdsps)
183 {
184  MPI_Comm comm = mr->comm;
185  int nprocs = mr->nprocs;
186  int cc;
187  char *sbuf0 = sbuf;
188  char *rbuf0 = rbuf;
189 
190  long maxcnt = 0;
191  for (int i = 0; i < nprocs; i++) {
192  maxcnt = MAX(maxcnt, scnts[i]);
193  }
194  cc = MPI_Allreduce(MPI_IN_PLACE, &maxcnt, 1, MPI_LONG, MPI_MAX, comm);
195  assert(cc == MPI_SUCCESS);
196 
197  if (maxcnt >= mr->atoa_threshold) {
198  cc = kmr_alltoallv_mpi(mr, sbuf, scnts, sdsps,
199  rbuf, rcnts, rdsps);
200  assert(cc == MPI_SUCCESS);
201  /*cc = MPI_Alltoall(sb, maxcnt, MPI_BYTE, rb, maxcnt,
202  MPI_BYTE, comm);*/
203  } else {
204  char *sb = kmr_malloc((size_t)(maxcnt * nprocs));
205  char *rb = kmr_malloc((size_t)(maxcnt * nprocs));
206  for (int i = 0; i < nprocs; i++) {
207  memcpy(&sb[maxcnt * i], &sbuf0[sdsps[i]], (size_t)scnts[i]);
208  }
209  cc = kmr_alltoall_bruck(mr, sb, rb, (int)maxcnt);
210  assert(cc == MPI_SUCCESS);
211  for (int i = 0; i < nprocs; i++) {
212  memcpy(&rbuf0[rdsps[i]], &rb[maxcnt * i], (size_t)rcnts[i]);
213  }
214  kmr_free(sb, (size_t)(maxcnt * nprocs));
215  kmr_free(rb, (size_t)(maxcnt * nprocs));
216  }
217  return MPI_SUCCESS;
218 }
219 
#if 0
/* Naive all-to-all (disabled).  Posts a receive of CNT bytes from every
   rank into RBUF (block I at offset I*CNT).  It then sends block PEER
   of SBUF to every PEER, starting from RANK+1 and wrapping around, and
   waits for all requests. */
static int
kmr_alltoall_naive(KMR *mr, void *sbuf, void *rbuf, int cnt)
{
    MPI_Comm comm = mr->comm;
    int nprocs = mr->nprocs;
    int rank = mr->rank;
    int tag = KMR_TAG_ATOA;
    MPI_Request *rqs = kmr_malloc(sizeof(MPI_Request) * (size_t)(nprocs * 2));
    int cc;
    char *r = rbuf;
    for (int i = 0; i < nprocs; i++) {
        cc = MPI_Irecv(&r[i * cnt], cnt, MPI_BYTE,
                       i, tag, comm, &rqs[i]);
        assert(cc == MPI_SUCCESS);
    }
    char *s = sbuf;
    /* RANK is already in [0, NPROCS), so no modulo is needed here. */
    int peer = rank;
    for (int i = 0; i < nprocs; i++) {
        peer++;
        if (peer >= nprocs) {
            peer -= nprocs;
        }
        /* Standard-mode send.  A ready-mode send (MPI_Irsend) is only
           correct if the matching receive is already posted at PEER,
           and nothing here orders PEER's MPI_Irecv calls before this
           send. */
        cc = MPI_Isend(&s[peer * cnt], cnt, MPI_BYTE,
                       peer, tag, comm, &rqs[nprocs + peer]);
        assert(cc == MPI_SUCCESS);
    }
    cc = MPI_Waitall((2 * nprocs), rqs, MPI_STATUSES_IGNORE);
    assert(cc == MPI_SUCCESS);
    kmr_free(rqs, (sizeof(MPI_Request) * (size_t)(nprocs * 2)));
    return MPI_SUCCESS;
}
#endif
254 
255 /* Does all-to-all, using Bruck-like butter-fly pattern. */
256 
257 static int
258 kmr_alltoall_bruck(KMR *mr, void *sbuf, void *rbuf, int cnt)
259 {
260 #define DUMP_(X0,X1,X2,X3,X4) if (tracing) kmr_atoa_dump_(X0,X1,X2,X3,X4)
261  MPI_Comm comm = mr->comm;
262  int nprocs = mr->nprocs;
263  int rank = mr->rank;
264  int tag = KMR_TAG_ATOA;
265  _Bool tracing = mr->trace_alltoall;
266  assert((nprocs & 3) == 0);
267  int nprocs4th = (nprocs / 4);
268  int cc;
269 
270  int lognprocs = 0;
271  while ((1 << lognprocs) < nprocs) {
272  lognprocs++;
273  }
274  assert((1 << lognprocs) == nprocs);
275 
276  char *buf0 = kmr_malloc((size_t)(cnt * nprocs));
277  char *buf1 = kmr_malloc((size_t)(cnt * nprocs));
278  memcpy(buf0, sbuf, (size_t)(cnt * nprocs));
279 
280  MPI_Request rqs[6];
281  for (int stage = 0; stage < lognprocs; stage += 2) {
282  DUMP_(mr, buf0, cnt, "step", stage);
283  for (int j = 0; j < nprocs4th; j++) {
284  for (int i = 0; i < 4; i++) {
285  void *s = &buf0[cnt * (i + (j * 4))];
286  void *r = &buf1[cnt * (nprocs4th * i + j)];
287  memcpy(r, s, (size_t)cnt);
288  }
289  }
290  DUMP_(mr, buf1, cnt, "pack", stage);
291  for (int k = 0; k < 4; k++) {
292  int flip = (k << stage);
293  int peer = (rank ^ flip);
294  int baserank = ((rank >> stage) & 3);
295  int basepeer = ((peer >> stage) & 3);
296  if (k == 0) {
297  void *s = &buf1[cnt * (baserank * nprocs4th)];
298  void *r = &buf0[cnt * (baserank * nprocs4th)];
299  memcpy(r, s, (size_t)(cnt * nprocs4th));
300  } else {
301  void *s = &buf1[cnt * (basepeer * nprocs4th)];
302  void *r = &buf0[cnt * (basepeer * nprocs4th)];
303 #if 0
304  cc = MPI_Sendrecv(s, (cnt * nprocs4th), MPI_BYTE, peer, tag,
305  r, (cnt * nprocs4th), MPI_BYTE, peer, tag,
306  comm, MPI_STATUS_IGNORE);
307  assert(cc == MPI_SUCCESS);
308 #else
309  cc = MPI_Isend(s, (cnt * nprocs4th), MPI_BYTE, peer, tag,
310  comm, &rqs[(k - 1) * 2 + 1]);
311  assert(cc == MPI_SUCCESS);
312  cc = MPI_Irecv(r, (cnt * nprocs4th), MPI_BYTE, peer, tag,
313  comm, &rqs[(k - 1) * 2]);
314  assert(cc == MPI_SUCCESS);
315 #endif
316  }
317  }
318  cc = MPI_Waitall(6, rqs, MPI_STATUSES_IGNORE);
319  assert(cc == MPI_SUCCESS);
320  DUMP_(mr, buf0, cnt, "exchange", stage);
321  }
322  memcpy(rbuf, buf0, (size_t)(cnt * nprocs));
323  kmr_free(buf0, (size_t)(cnt * nprocs));
324  kmr_free(buf1, (size_t)(cnt * nprocs));
325  return MPI_SUCCESS;
326 }
327 
328 /* Displays buffer contents (first byte) in the middle of all-to-all.
329  It does nothing when the number of ranks is large. */
330 
331 static void
332 kmr_atoa_dump_(KMR *mr, void *sbuf, int sz, char *title, int step)
333 {
334  MPI_Comm comm = mr->comm;
335  int nprocs = mr->nprocs;
336  int rank = mr->rank;
337  int cc;
338  if (nprocs <= 64) {
339  char *xbuf;
340  if (rank == 0) {
341  xbuf = malloc((size_t)(sz * nprocs * nprocs));
342  assert(xbuf != 0);
343  } else {
344  xbuf = 0;
345  }
346  cc = MPI_Gather(sbuf, (sz * nprocs), MPI_BYTE,
347  xbuf, (sz * nprocs), MPI_BYTE,
348  0, comm);
349  assert(cc == MPI_SUCCESS);
350  if (rank == 0) {
351  fprintf(stderr, ";;KMR %s (%d)\n", title, step);
352  for (int j = 0; j < nprocs; j++) {
353  fprintf(stderr, ";;KMR ");
354  for (int i = 0; i < nprocs; i++) {
355  fprintf(stderr, "%02x ",
356  (0xff & xbuf[(i * (sz * nprocs)) + (j * sz)]));
357  }
358  fprintf(stderr, "\n");
359  }
360  fprintf(stderr, ";;KMR\n");
361  fflush(0);
362  }
363  if (xbuf != 0) {
364  free(xbuf);
365  }
366  MPI_Barrier(comm);
367  }
368 }
369 
370 /* ================================================================ */
371 
#if 0
/* NOTE(review): Disabled, unfinished sketch of an exclusive scan that
   would replace MPI_Exscan (see the commented-out call below).  It
   appears to use a recursive-doubling pattern (peer = self ^ stage,
   with stage doubling).  It would not compile if enabled:
   - it redeclares the parameter COMM;
   - it uses kvs, kvo, ssz, rsz, commute, threshold and reduce(), none
     of which are declared here;
   - CNT, DT and OP are unused.
   TODO confirm the intended design before enabling. */
int
kmr_exscan(void *sbuf, void *rbuf, int cnt, MPI_Datatype dt, MPI_Op op,
           MPI_Comm comm)
{
    const int SCANTAG = 60;
    MPI_Comm comm = kvs->c.mr->comm;
    int nprocs = kvs->c.mr->nprocs;
    int self = kvs->c.mr->rank;
    int cc;
    /*cc = MPI_Exscan(sbuf, rbuf, cnt, dt, op, comm);*/
    for (int stage = 1; stage < nprocs; stage <<= 1) {
        int peer = (self ^ stage);
        if (peer < nprocs) {
            /* Exchange sizes first, then the payload, with the peer. */
            cc = MPI_Sendrecv(&ssz, 1, MPI_LONG, peer, SCANTAG,
                              &rsz, 1, MPI_LONG, peer, SCANTAG,
                              comm, MPI_STATUS_IGNORE);
            assert(cc == MPI_SUCCESS);
            cc = MPI_Sendrecv(sbuf, ssz, MPI_BYTE, peer, SCANTAG,
                              rbuf, rsz, MPI_BYTE, peer, SCANTAG,
                              comm, MPI_STATUS_IGNORE);
            assert(cc == MPI_SUCCESS);
            if (self > peer) {
                /* Do not include the first element of segment. */
                if ((self & (stage - 1)) != 0) {
                    kmr_add_kv_vector(kvo, rbuf, rsz);
                }
            }
            /* reducevalue*=xbuf */
            if (commute || self > peer) {
                kmr_add_kv_vector(kvs, rbuf, rsz);
            } else {
                /* PUT AT FRONT (TODO: the non-commutative case should
                   prepend; as written it appends like the branch above). */
                kmr_add_kv_vector(kvs, rbuf, rsz);
            }
        }
        if (kvs->element_count > threshold) {
            reduce();
        }
    }
    return MPI_SUCCESS;
}
#endif
415 
416 /*
417 Copyright (C) 2012-2016 RIKEN AICS
418 This library is distributed WITHOUT ANY WARRANTY. This library can be
419 redistributed and/or modified under the terms of the BSD 2-Clause License.
420 */
int kmr_allgatherv(KMR *mr, _Bool rankzeroonly, void *sbuf, long scnt, void *rbuf, long *rcnts, long *rdsps)
All-gathers data, or gathers data when RANKZEROONLY.
Definition: kmratoa.c:70
Utilities Private Part (do not include from applications).
#define kmr_malloc(Z)
Allocates memory, or aborts when failed.
Definition: kmrimpl.h:177
KMR Context.
Definition: kmr.h:222
int kmr_exchange_sizes(KMR *mr, long *sbuf, long *rbuf)
Calls all-to-all to exchange one long-integer.
Definition: kmratoa.c:46
KMR Interface.
int kmr_alltoallv(KMR *mr, void *sbuf, long *scnts, long *sdsps, void *rbuf, long *rcnts, long *rdsps)
Does all-to-all-v, but it takes arguments of long-integers.
Definition: kmratoa.c:116
int kmr_gather_sizes(KMR *mr, long siz, long *rbuf)
Calls all-gather for collecting one long-integer.
Definition: kmratoa.c:58