#define MAX(a,b) (((a)>(b))?(a):(b))

static int kmr_alltoallv_mpi(KMR *mr, void *sbuf, long *scnts, long *sdsps,
                             void *rbuf, long *rcnts, long *rdsps);
static int kmr_alltoallv_bruck(KMR *mr, void *sbuf, long *scnts, long *sdsps,
                               void *rbuf, long *rcnts, long *rdsps);
static int kmr_alltoall_bruck(KMR *mr, void *sbuf, void *rbuf, int cnt);
static void kmr_atoa_dump_(KMR *mr, void *sbuf, int sz, char *title, int step);
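/* Note on the two predicates below: a positive x is a power of two
   exactly when (x & (x - 1)) clears its only set bit to zero, and a
   power of two is also a power of four exactly when that single bit
   sits at an even position, which is what requiring
   (x & 0x2aaaaaaa) == 0 checks (the mask covers the odd bit positions
   1, 3, ..., 29). */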
/* Checks whether an integer is a power of two. */
static inline _Bool
kmr_powerof2_p(int x)
{
    return ((x > 0) && ((x & (x - 1)) == 0));
}

/* Checks whether an integer is a power of four. */
static inline _Bool
kmr_powerof4_p(int x)
{
    return (kmr_powerof2_p(x) && ((x & 0x2aaaaaaa) == 0));
}
/* Calls all-to-all to exchange one long-integer. */
int
kmr_exchange_sizes(KMR *mr, long *sbuf, long *rbuf)
{
    MPI_Comm comm = mr->comm;
    int cc;
    cc = MPI_Alltoall(sbuf, 1, MPI_LONG, rbuf, 1, MPI_LONG, comm);
    assert(cc == MPI_SUCCESS);
    return MPI_SUCCESS;
}
/* Calls all-gather for collecting one long-integer. */
int
kmr_gather_sizes(KMR *mr, long siz, long *rbuf)
{
    MPI_Comm comm = mr->comm;
    int cc;
    cc = MPI_Allgather(&siz, 1, MPI_LONG, rbuf, 1, MPI_LONG, comm);
    assert(cc == MPI_SUCCESS);
    return MPI_SUCCESS;
}
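/* kmr_allgatherv() below takes long-integer byte counts while MPI
   wants int counts, so the receive counts and displacements are
   range-checked and narrowed per rank.  When RANKZEROONLY is set, only
   rank zero builds the count arrays, because the non-root ranks of
   MPI_Gatherv ignore them. */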
/* All-gathers data, or gathers data only on rank zero when
   RANKZEROONLY. */
int
kmr_allgatherv(KMR *mr, _Bool rankzeroonly, void *sbuf, long scnt,
               void *rbuf, long *rcnts, long *rdsps)
{
    MPI_Comm comm = mr->comm;
    int nprocs = mr->nprocs;
    int self = mr->rank;
    int cc;
    int *rsz = 0;
    int *rdp = 0;
    if (!rankzeroonly || self == 0) {
        rsz = kmr_malloc(sizeof(int) * (size_t)nprocs);
        rdp = kmr_malloc(sizeof(int) * (size_t)nprocs);
        for (int r = 0; r < nprocs; r++) {
            assert(INT_MIN <= rcnts[r] && rcnts[r] <= INT_MAX);
            assert(INT_MIN <= rdsps[r] && rdsps[r] <= INT_MAX);
            rsz[r] = (int)rcnts[r];
            rdp[r] = (int)rdsps[r];
        }
    }
    assert(INT_MIN <= scnt && scnt <= INT_MAX);
    if (rankzeroonly) {
        cc = MPI_Gatherv(sbuf, (int)scnt, MPI_BYTE,
                         rbuf, rsz, rdp, MPI_BYTE, 0, comm);
        assert(cc == MPI_SUCCESS);
    } else {
        cc = MPI_Allgatherv(sbuf, (int)scnt, MPI_BYTE,
                            rbuf, rsz, rdp, MPI_BYTE, comm);
        assert(cc == MPI_SUCCESS);
    }
    if (rsz != 0) {
        kmr_free(rsz, (sizeof(int) * (size_t)nprocs));
    }
    if (rdp != 0) {
        kmr_free(rdp, (sizeof(int) * (size_t)nprocs));
    }
    return MPI_SUCCESS;
}
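/* kmr_alltoallv() below chooses between two implementations: the plain
   MPI_Alltoallv wrapper, and a radix-4 Bruck-like exchange that is
   attempted only when the number of processes is a power of four and
   the atoa_threshold option is enabled (nonzero). */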
/* Does all-to-all-v, but it takes arguments of long-integers. */
int
kmr_alltoallv(KMR *mr, void *sbuf, long *scnts, long *sdsps,
              void *rbuf, long *rcnts, long *rdsps)
{
    int nprocs = mr->nprocs;
    int cc;
    if (!kmr_powerof4_p(nprocs) || nprocs == 1 || mr->atoa_threshold == 0) {
        cc = kmr_alltoallv_mpi(mr, sbuf, scnts, sdsps,
                               rbuf, rcnts, rdsps);
        assert(cc == MPI_SUCCESS);
    } else {
        cc = kmr_alltoallv_bruck(mr, sbuf, scnts, sdsps,
                                 rbuf, rcnts, rdsps);
        assert(cc == MPI_SUCCESS);
    }
    return MPI_SUCCESS;
}
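/* kmr_alltoallv_mpi() converts the byte counts and displacements to
   counts of MPI_LONG (8-byte) units before calling MPI_Alltoallv,
   because the MPI arguments are plain ints; for example, a 40-byte
   block becomes a count of 5, and counting in 8-byte units raises the
   per-rank limit from about 2 GB to about 16 GB.  All counts and
   displacements must therefore be multiples of eight. */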
/* Does all-to-all-v using MPI_Alltoallv.  The byte counts and
   displacements must be multiples of eight; they are converted to
   counts of MPI_LONG. */
static int
kmr_alltoallv_mpi(KMR *mr, void *sbuf, long *scnts, long *sdsps,
                  void *rbuf, long *rcnts, long *rdsps)
{
    MPI_Comm comm = mr->comm;
    int nprocs = mr->nprocs;
    int *ssz = kmr_malloc(sizeof(int) * (size_t)nprocs);
    int *sdp = kmr_malloc(sizeof(int) * (size_t)nprocs);
    int *rsz = kmr_malloc(sizeof(int) * (size_t)nprocs);
    int *rdp = kmr_malloc(sizeof(int) * (size_t)nprocs);
    int cc;
    for (int r = 0; r < nprocs; r++) {
        assert(INT_MIN * 8L <= scnts[r] && scnts[r] <= INT_MAX * 8L);
        assert(INT_MIN * 8L <= rcnts[r] && rcnts[r] <= INT_MAX * 8L);
        assert(INT_MIN * 8L <= sdsps[r] && sdsps[r] <= INT_MAX * 8L);
        assert(INT_MIN * 8L <= rdsps[r] && rdsps[r] <= INT_MAX * 8L);
        assert(((scnts[r] & 7) == 0)
               && ((rcnts[r] & 7) == 0)
               && ((sdsps[r] & 7) == 0)
               && ((rdsps[r] & 7) == 0));
        ssz[r] = (int)(scnts[r] / 8L);
        rsz[r] = (int)(rcnts[r] / 8L);
        sdp[r] = (int)(sdsps[r] / 8L);
        rdp[r] = (int)(rdsps[r] / 8L);
    }
    cc = MPI_Alltoallv(sbuf, ssz, sdp, MPI_LONG,
                       rbuf, rsz, rdp, MPI_LONG, comm);
    assert(cc == MPI_SUCCESS);
    kmr_free(ssz, (sizeof(int) * (size_t)nprocs));
    kmr_free(rsz, (sizeof(int) * (size_t)nprocs));
    kmr_free(sdp, (sizeof(int) * (size_t)nprocs));
    kmr_free(rdp, (sizeof(int) * (size_t)nprocs));
    return MPI_SUCCESS;
}
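/* kmr_alltoallv_bruck() handles variable sizes by padding: it finds
   the global maximum per-rank send size, copies each send block into a
   fixed-size slot of that width, runs the fixed-size all-to-all, and
   unpacks the receive slots using the true counts.  This trades extra
   buffer space (nprocs * maxcnt bytes on each side) for the simpler
   fixed-size exchange, so it is used only while the maximum block stays
   below atoa_threshold; otherwise it falls back to MPI_Alltoallv. */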
/* Does all-to-all-v by a Bruck-like algorithm for small data, padding
   each send block to the global maximum size. */
static int
kmr_alltoallv_bruck(KMR *mr, void *sbuf, long *scnts, long *sdsps,
                    void *rbuf, long *rcnts, long *rdsps)
{
    MPI_Comm comm = mr->comm;
    int nprocs = mr->nprocs;
    char *sbuf0 = sbuf;
    char *rbuf0 = rbuf;
    int cc;
    long maxcnt = 0;
    for (int i = 0; i < nprocs; i++) {
        maxcnt = MAX(maxcnt, scnts[i]);
    }
    cc = MPI_Allreduce(MPI_IN_PLACE, &maxcnt, 1, MPI_LONG, MPI_MAX, comm);
    assert(cc == MPI_SUCCESS);
    if (maxcnt >= mr->atoa_threshold) {
        cc = kmr_alltoallv_mpi(mr, sbuf, scnts, sdsps,
                               rbuf, rcnts, rdsps);
        assert(cc == MPI_SUCCESS);
    } else {
        char *sb = kmr_malloc((size_t)(maxcnt * nprocs));
        char *rb = kmr_malloc((size_t)(maxcnt * nprocs));
        for (int i = 0; i < nprocs; i++) {
            memcpy(&sb[maxcnt * i], &sbuf0[sdsps[i]], (size_t)scnts[i]);
        }
        cc = kmr_alltoall_bruck(mr, sb, rb, (int)maxcnt);
        assert(cc == MPI_SUCCESS);
        for (int i = 0; i < nprocs; i++) {
            memcpy(&rbuf0[rdsps[i]], &rb[maxcnt * i], (size_t)rcnts[i]);
        }
        kmr_free(sb, (size_t)(maxcnt * nprocs));
        kmr_free(rb, (size_t)(maxcnt * nprocs));
    }
    return MPI_SUCCESS;
}
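/* kmr_alltoall_naive() posts all receives first and then issues
   ready-mode sends (MPI_Irsend), which are correct only if the matching
   receive is already posted at the destination.  The send order is
   staggered per rank so that all processes do not target the same peer
   at once. */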
/* Does fixed-size all-to-all naively with point-to-point messages. */
static int
kmr_alltoall_naive(KMR *mr, void *sbuf, void *rbuf, int cnt)
{
    MPI_Comm comm = mr->comm;
    int nprocs = mr->nprocs;
    int rank = mr->rank;
    int tag = KMR_TAG_ATOA;
    MPI_Request *rqs = kmr_malloc(sizeof(MPI_Request) * (size_t)(nprocs * 2));
    char *s = sbuf;
    char *r = rbuf;
    int cc;
    int peer;
    for (int i = 0; i < nprocs; i++) {
        cc = MPI_Irecv(&r[i * cnt], cnt, MPI_BYTE,
                       i, tag, comm, &rqs[i]);
        assert(cc == MPI_SUCCESS);
    }
    /* A barrier guarantees every rank has posted its receives before
       the ready-mode sends below start. */
    cc = MPI_Barrier(comm);
    assert(cc == MPI_SUCCESS);
    peer = (rank % nprocs);
    for (int i = 0; i < nprocs; i++) {
        peer++;
        if (peer >= nprocs) {
            peer = 0;
        }
        cc = MPI_Irsend(&s[peer * cnt], cnt, MPI_BYTE,
                        peer, tag, comm, &rqs[nprocs + peer]);
        assert(cc == MPI_SUCCESS);
    }
    cc = MPI_Waitall((2 * nprocs), rqs, MPI_STATUSES_IGNORE);
    assert(cc == MPI_SUCCESS);
    kmr_free(rqs, (sizeof(MPI_Request) * (size_t)(nprocs * 2)));
    return MPI_SUCCESS;
}
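/* kmr_alltoall_bruck() runs the fixed-size all-to-all in log4(nprocs)
   rounds (the stage loop advances two bits at a time).  Each round
   first packs the nprocs blocks in groups of four so that the blocks
   routed the same way in this round become contiguous, then keeps one
   quarter of the buffer locally and exchanges the other three quarters
   with the three peers whose rank differs only in the current 2-bit
   digit.  Each process thus sends O(log nprocs) large messages instead
   of nprocs small ones, at the cost of forwarding data through
   intermediate ranks. */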
/* Does fixed-size all-to-all by a radix-4 Bruck-like algorithm.  The
   process count must be a power of four. */
static int
kmr_alltoall_bruck(KMR *mr, void *sbuf, void *rbuf, int cnt)
{
#define DUMP_(X0,X1,X2,X3,X4) if (tracing) kmr_atoa_dump_(X0,X1,X2,X3,X4)
    MPI_Comm comm = mr->comm;
    int nprocs = mr->nprocs;
    int rank = mr->rank;
    int tag = KMR_TAG_ATOA;
    _Bool tracing = mr->trace_alltoall;
    int cc;
    assert((nprocs & 3) == 0);
    int nprocs4th = (nprocs / 4);
    int lognprocs = 0;
    while ((1 << lognprocs) < nprocs) {
        lognprocs++;
    }
    assert((1 << lognprocs) == nprocs);
    MPI_Request rqs[6];
    char *buf0 = kmr_malloc((size_t)(cnt * nprocs));
    char *buf1 = kmr_malloc((size_t)(cnt * nprocs));
    memcpy(buf0, sbuf, (size_t)(cnt * nprocs));
    for (int stage = 0; stage < lognprocs; stage += 2) {
        DUMP_(mr, buf0, cnt, "step", stage);
        /* Pack: transpose the blocks in groups of four. */
        for (int j = 0; j < nprocs4th; j++) {
            for (int i = 0; i < 4; i++) {
                void *s = &buf0[cnt * (i + (j * 4))];
                void *r = &buf1[cnt * (nprocs4th * i + j)];
                memcpy(r, s, (size_t)cnt);
            }
        }
        DUMP_(mr, buf1, cnt, "pack", stage);
        /* Exchange: keep one quarter locally, trade the other three
           quarters with the peers differing in the current digit. */
        for (int k = 0; k < 4; k++) {
            int flip = (k << stage);
            int peer = (rank ^ flip);
            int baserank = ((rank >> stage) & 3);
            int basepeer = ((peer >> stage) & 3);
            if (peer == rank) {
                void *s = &buf1[cnt * (baserank * nprocs4th)];
                void *r = &buf0[cnt * (baserank * nprocs4th)];
                memcpy(r, s, (size_t)(cnt * nprocs4th));
            } else {
                void *s = &buf1[cnt * (basepeer * nprocs4th)];
                void *r = &buf0[cnt * (basepeer * nprocs4th)];
#if 0
                /* Blocking variant of the pairwise exchange (the
                   selection between the two variants is not captured in
                   this listing; the non-blocking variant is kept active
                   here). */
                cc = MPI_Sendrecv(s, (cnt * nprocs4th), MPI_BYTE, peer, tag,
                                  r, (cnt * nprocs4th), MPI_BYTE, peer, tag,
                                  comm, MPI_STATUS_IGNORE);
                assert(cc == MPI_SUCCESS);
#else
                cc = MPI_Isend(s, (cnt * nprocs4th), MPI_BYTE, peer, tag,
                               comm, &rqs[(k - 1) * 2 + 1]);
                assert(cc == MPI_SUCCESS);
                cc = MPI_Irecv(r, (cnt * nprocs4th), MPI_BYTE, peer, tag,
                               comm, &rqs[(k - 1) * 2]);
                assert(cc == MPI_SUCCESS);
#endif
            }
        }
        cc = MPI_Waitall(6, rqs, MPI_STATUSES_IGNORE);
        assert(cc == MPI_SUCCESS);
        DUMP_(mr, buf0, cnt, "exchange", stage);
    }
    memcpy(rbuf, buf0, (size_t)(cnt * nprocs));
    kmr_free(buf0, (size_t)(cnt * nprocs));
    kmr_free(buf1, (size_t)(cnt * nprocs));
    return MPI_SUCCESS;
}
/* Dumps the all-to-all buffers for tracing: rank zero gathers the data
   and prints the first byte of each block. */
static void
kmr_atoa_dump_(KMR *mr, void *sbuf, int sz, char *title, int step)
{
    MPI_Comm comm = mr->comm;
    int nprocs = mr->nprocs;
    int rank = mr->rank;
    int cc;
    char *xbuf = 0;
    if (rank == 0) {
        xbuf = malloc((size_t)(sz * nprocs * nprocs));
        assert(xbuf != 0);
    }
    cc = MPI_Gather(sbuf, (sz * nprocs), MPI_BYTE,
                    xbuf, (sz * nprocs), MPI_BYTE,
                    0, comm);
    assert(cc == MPI_SUCCESS);
    if (rank == 0) {
        fprintf(stderr, ";;KMR %s (%d)\n", title, step);
        for (int j = 0; j < nprocs; j++) {
            fprintf(stderr, ";;KMR ");
            for (int i = 0; i < nprocs; i++) {
                fprintf(stderr, "%02x ",
                        (0xff & xbuf[(i * (sz * nprocs)) + (j * sz)]));
            }
            fprintf(stderr, "\n");
        }
        fprintf(stderr, ";;KMR\n");
        free(xbuf);
    }
}
/* An exclusive-scan-like exchange over processes.  Only a fragment of
   this routine appears in this listing; the elided parts (including the
   return type and the remaining parameters, among them the key-value
   sets and options referenced below) are marked with ellipses. */
kmr_exscan(void *sbuf, void *rbuf, int cnt, MPI_Datatype dt, MPI_Op op,
           ...)
{
    const int SCANTAG = 60;
    MPI_Comm comm = kvs->c.mr->comm;
    int nprocs = kvs->c.mr->nprocs;
    int self = kvs->c.mr->rank;
    /* ... */
    for (int stage = 1; stage < nprocs; stage <<= 1) {
        int peer = (self ^ stage);
        /* ... */
        cc = MPI_Sendrecv(&ssz, 1, MPI_LONG, peer, SCANTAG,
                          &rsz, 1, MPI_LONG, peer, SCANTAG,
                          comm, MPI_STATUS_IGNORE);
        assert(cc == MPI_SUCCESS);
        cc = MPI_Sendrecv(sbuf, ssz, MPI_BYTE, peer, SCANTAG,
                          rbuf, rsz, MPI_BYTE, peer, SCANTAG,
                          comm, MPI_STATUS_IGNORE);
        assert(cc == MPI_SUCCESS);
        /* ... */
        if ((self & (stage - 1)) != 0) {
            kmr_add_kv_vector(kvo, rbuf, rsz);
        }
        /* ... */
        if (commute || self > peer) {
            kmr_add_kv_vector(kvs, rbuf, rsz);
        } else {
            kmr_add_kv_vector(kvs, rbuf, rsz);
        }
        /* ... */
        if (kvs->element_count > threshold) {
            /* ... */
/* Interface notes (from the KMR reference documentation):
   int kmr_alltoallv(KMR *mr, void *sbuf, long *scnts, long *sdsps,
                     void *rbuf, long *rcnts, long *rdsps)
       Does all-to-all-v, but it takes arguments of long-integers.
   int kmr_allgatherv(KMR *mr, _Bool rankzeroonly, void *sbuf, long scnt,
                      void *rbuf, long *rcnts, long *rdsps)
       All-gathers data, or gathers data when RANKZEROONLY.
   int kmr_exchange_sizes(KMR *mr, long *sbuf, long *rbuf)
       Calls all-to-all to exchange one long-integer.
   int kmr_gather_sizes(KMR *mr, long siz, long *rbuf)
       Calls all-gather for collecting one long-integer.
   #define kmr_malloc(Z)
       Allocates memory, or aborts when failed (from the utilities
       private part; do not include from applications). */
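/* A usage sketch (hypothetical; not part of KMR): the function
   example_exchange below and its parameters mydata/mylen are made up
   for illustration.  It sends the same mylen-byte block to every rank,
   first exchanging sizes with kmr_exchange_sizes() to lay out the
   receive buffer, then calling kmr_alltoallv() with byte counts and
   displacements that are multiples of eight. */

static void
example_exchange(KMR *mr, void *mydata, long mylen)
{
    int nprocs = mr->nprocs;
    long *scnts = kmr_malloc(sizeof(long) * (size_t)nprocs);
    long *sdsps = kmr_malloc(sizeof(long) * (size_t)nprocs);
    long *rcnts = kmr_malloc(sizeof(long) * (size_t)nprocs);
    long *rdsps = kmr_malloc(sizeof(long) * (size_t)nprocs);
    int cc;
    assert((mylen & 7) == 0);
    /* Send the whole block to every rank (send displacement zero). */
    for (int r = 0; r < nprocs; r++) {
        scnts[r] = mylen;
        sdsps[r] = 0;
    }
    /* Learn how much each peer sends, then lay out the receives back
       to back; the displacements stay multiples of eight. */
    cc = kmr_exchange_sizes(mr, scnts, rcnts);
    assert(cc == MPI_SUCCESS);
    long total = 0;
    for (int r = 0; r < nprocs; r++) {
        rdsps[r] = total;
        total += rcnts[r];
    }
    char *rbuf = kmr_malloc((size_t)total);
    cc = kmr_alltoallv(mr, mydata, scnts, sdsps, rbuf, rcnts, rdsps);
    assert(cc == MPI_SUCCESS);
    /* ... use rbuf here ... */
    kmr_free(rbuf, (size_t)total);
    kmr_free(scnts, sizeof(long) * (size_t)nprocs);
    kmr_free(sdsps, sizeof(long) * (size_t)nprocs);
    kmr_free(rcnts, sizeof(long) * (size_t)nprocs);
    kmr_free(rdsps, sizeof(long) * (size_t)nprocs);
}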