17 #define MAX(a,b) (((a)>(b))?(a):(b)) 26 assert(kvs0 == 0 && kv0.klen == 0 && kv0.vlen == 0 && kvo != 0);
28 {.klen = 5, .vlen = 7, .k.p =
"key0", .v.p =
"value0"},
29 {.klen = 5, .vlen = 7, .k.p =
"key1", .v.p =
"value1"},
30 {.klen = 5, .vlen = 7, .k.p =
"key2", .v.p =
"value2"},
31 {.klen = 5, .vlen = 7, .k.p =
"key3", .v.p =
"value3"},
32 {.klen = 5, .vlen = 7, .k.p =
"key4", .v.p =
"value4"}
35 for (
int i = 0; i < 5; i++) {
37 assert(cc == MPI_SUCCESS);
46 assert(kvs0 != 0 && kvo != 0);
49 cc = sscanf((&((
char *)kv0.k.p)[3]),
"%d%c", &x, &gomi);
52 snprintf(buf, 10,
"newvalue%d", x);
58 assert(cc == MPI_SUCCESS);
66 aggregatevaluesfn(
const struct kmr_kv_box kv[],
const long n,
70 MPI_Comm_rank(MPI_COMM_WORLD, &rank);
73 printf(
"[%03d] pairs n=%ld\n", rank, n);
74 for (
long i = 0; i < n; i++) {
77 printf(
"[%03d] k[%d]=%s;v[%d]=%s\n", rank,
78 kv[i].klen, kbuf, kv[i].vlen, vbuf);
82 switch (kvs->c.key_data) {
84 assert(kvs->c.key_data != KMR_KV_BAD);
87 assert(kv[i].klen == b0.klen
88 && kv[i].k.i == b0.k.i);
91 assert(kv[i].klen == b0.klen
92 && kv[i].k.d == b0.k.d);
96 case KMR_KV_POINTER_OWNED:
97 case KMR_KV_POINTER_UNMANAGED:
98 assert(kv[i].klen == b0.klen
99 && memcmp(kv[i].k.p, b0.k.p, (
size_t)b0.klen) == 0);
109 snprintf(buf, 48,
"%ld_*_%s", n, kv[0].v.p);
110 int len = (int)(strlen(buf) + 1);
116 assert(cc == MPI_SUCCESS);
124 KMR *mr = kvs0->c.mr;
129 cc = sscanf((&((
char *)kv0.k.p)[3]),
"%d%c", &x, &gomi);
131 snprintf(buf,
sizeof(buf),
"%d_*_newvalue%d", mr->nprocs, x);
132 assert(strlen(kv0.v.p) == strlen(buf));
133 size_t len = strlen(buf);
134 assert(strncmp((kv0.v.p), buf, len) == 0);
141 simple0(
int nprocs,
int rank)
143 MPI_Barrier(MPI_COMM_WORLD);
145 if (rank == 0) {printf(
"simple0...\n");}
154 MPI_Barrier(mr->comm);
156 if (rank == 0) printf(
"ADD\n");
163 assert(cc == MPI_SUCCESS);
168 printf(
"[%03d] cnt0 = %ld\n", rank, cnt0);
172 long histo[MAX(nprocs, 5)];
174 for (
int i = 0; i < MAX(nprocs, 5); i++) {histo[i] = 0;}
177 printf((
"[%03d] histo: %ld %ld %ld %ld %ld...\n" 178 "[%03d] ave/var/min/max: %f %f %ld %ld\n"),
179 rank, histo[0], histo[1], histo[2], histo[3], histo[4],
180 rank, var[0], var[1], (
long)var[2], (
long)var[3]);
186 MPI_Barrier(mr->comm);
188 if (rank == 0) printf(
"REPLICATE\n");
193 assert(cc == MPI_SUCCESS);
198 printf(
"[%03d] cnt0 = %ld\n", rank, cnt1);
199 assert(cnt1 == (N * nprocs));
202 long histo[MAX(nprocs, 5)];
204 for (
int i = 0; i < MAX(nprocs, 5); i++) {histo[i] = 0;}
207 printf((
"[%03d] histo: %ld %ld %ld %ld %ld...\n" 208 "[%03d] ave/var/min/max/: %f %f %ld %ld\n"),
209 rank, histo[0], histo[1], histo[2], histo[3], histo[4],
210 rank, var[0], var[1], (
long)var[2], (
long)var[3]);
216 MPI_Barrier(mr->comm);
218 if (rank == 0) printf(
"MAP\n");
222 cc =
kmr_map(kvs1, kvs2, 0, kmr_noopt, replacevaluefn);
223 assert(cc == MPI_SUCCESS);
227 assert(cnt2 == (N * nprocs));
231 MPI_Barrier(mr->comm);
233 if (rank == 0) printf(
"SHUFFLE\n");
238 assert(cc == MPI_SUCCESS);
242 assert(cnt3 == (N * nprocs));
246 MPI_Barrier(mr->comm);
248 if (rank == 0) printf(
"REDUCE\n");
252 cc =
kmr_reduce(kvs3, kvs4, 0, kmr_noopt, aggregatevaluesfn);
253 assert(cc == MPI_SUCCESS);
263 MPI_Barrier(mr->comm);
265 if (rank == 0) printf(
"GATHER\n");
271 assert(cc == MPI_SUCCESS);
278 cc =
kmr_map(kvs5, 0, 0, kmr_noopt, checkkeyvaluefn);
279 assert(cc == MPI_SUCCESS);
287 simple1(
int nprocs,
int rank)
289 MPI_Barrier(MPI_COMM_WORLD);
291 if (rank == 0) printf(
"simple1...\n");
311 assert(cc == MPI_SUCCESS);
316 cc =
kmr_map(kvs1, kvs2, 0, kmr_noopt, 0);
317 assert(cc == MPI_SUCCESS);
323 assert(cc == MPI_SUCCESS);
329 assert(cc == MPI_SUCCESS);
336 assert(cc == MPI_SUCCESS && data != 0 && sz != 0);
338 assert(cc == MPI_SUCCESS);
342 assert(cc == MPI_SUCCESS);
345 assert(cc == MPI_SUCCESS);
351 main(
int argc,
char *argv[])
353 int nprocs, rank, thlv;
355 MPI_Init_thread(&argc, &argv, MPI_THREAD_SERIALIZED, &thlv);
356 MPI_Comm_size(MPI_COMM_WORLD, &nprocs);
357 MPI_Comm_rank(MPI_COMM_WORLD, &rank);
361 simple0(nprocs, rank);
362 simple1(nprocs, rank);
364 MPI_Barrier(MPI_COMM_WORLD);
366 if (rank == 0) printf(
"OK\n");
Key-Value Stream (abstract).
#define kmr_reduce(KVI, KVO, ARG, OPT, R)
Reduces key-value pairs.
Options to Mapping, Shuffling, and Reduction.
int kmr_add_kv(KMR_KVS *kvs, const struct kmr_kv_box kv)
Adds a key-value pair.
#define kmr_create_kvs(MR, KF, VF)
Makes a new key-value stream (of type KMR_KVS) with the specified field datatypes.
int kmr_save_kvs(KMR_KVS *kvi, void **dataq, size_t *szq, struct kmr_option opt)
Packs locally the contents of a key-value stream to a byte array.
int kmr_shuffle(KMR_KVS *kvi, KMR_KVS *kvo, struct kmr_option opt)
Shuffles key-value pairs to the appropriate destination ranks.
int kmr_add_kv_done(KMR_KVS *kvs)
Marks finished adding key-value pairs.
int kmr_free_kvs(KMR_KVS *kvs)
Releases a key-value stream (type KMR_KVS).
#define kmr_map(KVI, KVO, ARG, OPT, M)
Maps simply.
Handy Copy of a Key-Value Field.
int kmr_dump_kvs(KMR_KVS *kvs, int flag)
Dumps contents of a key-value stream to stdout.
int kmr_fin(void)
Clears the environment.
int kmr_get_element_count(KMR_KVS *kvs, long *v)
Gets the total number of key-value pairs.
#define kmr_init()
Sets up the environment.
void kmr_dump_opaque(const char *p, int siz, char *buf, int buflen)
Puts the string of the key or value field into a buffer BUF as printable string.
int kmr_histogram_count_by_ranks(KMR_KVS *kvs, long *frq, double *var, _Bool rankzeroonly)
Fills an integer array FRQ[i] with the count of the elements of each rank.
int kmr_restore_kvs(KMR_KVS *kvo, void *data, size_t sz, struct kmr_option opt)
Unpacks locally the contents of a key-value stream from a byte array.
int kmr_free_context(KMR *mr)
Releases a context created with kmr_create_context().
int kmr_replicate(KMR_KVS *kvi, KMR_KVS *kvo, struct kmr_option opt)
Replicates key-value pairs to be visible on all ranks, that is, it has the effect of bcast or all-gat...
int kmr_map_on_rank_zero(KMR_KVS *kvo, void *arg, struct kmr_option opt, kmr_mapfn_t m)
Maps on rank0 only.
KMR * kmr_create_context(const MPI_Comm comm, const MPI_Info conf, const char *name)
Makes a new KMR context (a context has type KMR).