17 static struct timeval tv0 = {.tv_sec = 0};
20 cc = gettimeofday(&tv, 0);
22 if (tv0.tv_sec == 0) {
24 assert(tv0.tv_sec != 0);
26 double dt = ((double)(tv.tv_sec - tv0.tv_sec)
27 + ((double)(tv.tv_usec - tv0.tv_usec) * 1e-6));
38 assert(kvs0 == 0 && kv0.klen == 0 && kv0.vlen == 0 && kvo != 0);
42 for (
int i = 0; i < 200; i++) {
43 snprintf(k, 80,
"key%d", i);
44 snprintf(v, 80,
"value%d", i);
46 .klen = (int)(strlen(k) + 1),
47 .vlen = (
int)(strlen(v) + 1),
52 assert(cc == MPI_SUCCESS);
62 assert(kvs0 != 0 && kvo != 0);
65 cc = sscanf((&((
char *)kv0.k.p)[3]),
"%d%c", &x, &gomi);
68 snprintf(v, 10,
"newvalue%d", x);
70 .vlen = (int)(strlen(v) + 1),
75 assert(cc == MPI_SUCCESS);
80 emptyreducefn(
const struct kmr_kv_box kv[],
const long n,
89 simple0(
int nprocs,
int rank)
98 for (
int i = 0; i < 10000; i++) {
106 .klen = (int)
sizeof(
long),
107 .vlen = (int)
sizeof(
long),
109 .v.i = ((t1 - t0) > 20.0)
112 assert(cc == MPI_SUCCESS);
115 assert(cc == MPI_SUCCESS);
118 assert(cc == MPI_SUCCESS);
119 struct kmr_kv_box tok = {.klen = (int)
sizeof(
long), .k.p = 0,
120 .vlen = 0, .v.p = 0};
123 assert(cc == MPI_SUCCESS);
125 assert(cc == MPI_SUCCESS);
128 printf(
"loops %d\n", i);
137 assert(cc == MPI_SUCCESS);
143 assert(cc == MPI_SUCCESS);
148 cc =
kmr_map(kvs1, kvs2, 0, kmr_noopt, replacevaluefn);
149 assert(cc == MPI_SUCCESS);
155 assert(cc == MPI_SUCCESS);
160 cc =
kmr_reduce(kvs3, kvs4, 0, kmr_noopt, emptyreducefn);
161 assert(cc == MPI_SUCCESS);
164 assert(cc == MPI_SUCCESS);
168 assert(cc == MPI_SUCCESS);
172 main(
int argc,
char *argv[])
178 int nprocs, rank, thlv;
180 MPI_Init_thread(&argc, &argv, MPI_THREAD_SERIALIZED, &thlv);
181 MPI_Comm_size(MPI_COMM_WORLD, &nprocs);
182 MPI_Comm_rank(MPI_COMM_WORLD, &rank);
186 MPI_Barrier(MPI_COMM_WORLD);
187 if (rank == 0) {printf(
"Check leakage by observing heap size.\n");}
188 if (rank == 0) {printf(
"Watch VSZ changes (loops %d times)...\n", N);}
189 if (rank == 0) {printf(
"(Each loop will take approx. 20 sec).\n");}
192 MPI_Barrier(MPI_COMM_WORLD);
194 for (
int i = 0; i < N; i++) {
195 simple0(nprocs, rank);
197 MPI_Barrier(MPI_COMM_WORLD);
199 snprintf(cmd,
sizeof(cmd),
"ps l %d", pid);
205 MPI_Barrier(MPI_COMM_WORLD);
206 if (rank == 0) printf(
"OK\n");
Key-Value Stream (abstract).
#define kmr_reduce(KVI, KVO, ARG, OPT, R)
Reduces key-value pairs.
int kmr_add_kv(KMR_KVS *kvs, const struct kmr_kv_box kv)
Adds a key-value pair.
int kmr_find_key(KMR_KVS *kvi, struct kmr_kv_box ki, struct kmr_kv_box *vo)
Finds a key-value pair for a key.
#define kmr_create_kvs(MR, KF, VF)
Makes a new key-value stream (of type KMR_KVS) with the specified field datatypes.
int kmr_shuffle(KMR_KVS *kvi, KMR_KVS *kvo, struct kmr_option opt)
Shuffles key-value pairs to the appropriate destination ranks.
int kmr_add_kv_done(KMR_KVS *kvs)
Marks finished adding key-value pairs.
int kmr_free_kvs(KMR_KVS *kvs)
Releases a key-value stream (type KMR_KVS).
#define kmr_map(KVI, KVO, ARG, OPT, M)
Maps simply.
Handy Copy of a Key-Value Field.
int kmr_fin(void)
Clears the environment.
#define kmr_init()
Sets up the environment.
int kmr_free_context(KMR *mr)
Releases a context created with kmr_create_context().
int kmr_replicate(KMR_KVS *kvi, KMR_KVS *kvo, struct kmr_option opt)
Replicates key-value pairs to be visible on all ranks, that is, it has the effect of bcast or all-gat...
int kmr_map_on_rank_zero(KMR_KVS *kvo, void *arg, struct kmr_option opt, kmr_mapfn_t m)
Maps on rank0 only.
KMR * kmr_create_context(const MPI_Comm comm, const MPI_Info conf, const char *name)
Makes a new KMR context (a context has type KMR).