KMR
test0.c
1 /* test0.c (2014-02-04) */
2 
3 /* Check KMR basic operations. */
4 
5 /* Run it with "mpirun -np n a.out". Most of the checks are in the
6  form of assertion "assert(x)". Do not use this code as an example.
7  KMR makes rank-aware code unnecessary at all. Also, KMR makes MPI
8  calls unnecessary, too, except initialization and finalization.
9  But, this code includes many rank-aware code. */
10 
11 #include <mpi.h>
12 #include <stdio.h>
13 #include <stdlib.h>
14 #include <unistd.h>
15 #include "kmr.h"
16 
17 #define MAX(a,b) (((a)>(b))?(a):(b))
18 
19 /* Puts five key-value pairs to output KVO. It is a map-function. It
20  runs only on rank0. Inputs (KV0 and KVS0) are dummy. */
21 
22 static int
23 addfivekeysfn(const struct kmr_kv_box kv0,
24  const KMR_KVS *kvs0, KMR_KVS *kvo, void *p, const long i_)
25 {
26  assert(kvs0 == 0 && kv0.klen == 0 && kv0.vlen == 0 && kvo != 0);
27  struct kmr_kv_box kv[] = {
28  {.klen = 5, .vlen = 7, .k.p = "key0", .v.p = "value0"},
29  {.klen = 5, .vlen = 7, .k.p = "key1", .v.p = "value1"},
30  {.klen = 5, .vlen = 7, .k.p = "key2", .v.p = "value2"},
31  {.klen = 5, .vlen = 7, .k.p = "key3", .v.p = "value3"},
32  {.klen = 5, .vlen = 7, .k.p = "key4", .v.p = "value4"}
33  };
34  int cc;
35  for (int i = 0; i < 5; i++) {
36  cc = kmr_add_kv(kvo, kv[i]);
37  assert(cc == MPI_SUCCESS);
38  }
39  return MPI_SUCCESS;
40 }
41 
42 static int
43 replacevaluefn(const struct kmr_kv_box kv0,
44  const KMR_KVS *kvs0, KMR_KVS *kvo, void *p, const long i)
45 {
46  assert(kvs0 != 0 && kvo != 0);
47  int cc, x;
48  char gomi;
49  cc = sscanf((&((char *)kv0.k.p)[3]), "%d%c", &x, &gomi);
50  assert(cc == 1);
51  char buf[10];
52  snprintf(buf, 10, "newvalue%d", x);
53  struct kmr_kv_box kv = {.klen = kv0.klen,
54  .vlen = 10,
55  .k.p = kv0.k.p,
56  .v.p = buf};
57  cc = kmr_add_kv(kvo, kv);
58  assert(cc == MPI_SUCCESS);
59  return MPI_SUCCESS;
60 }
61 
62 /* Aggregates reduction pairs. It asserts key equality for
63  checking. */
64 
65 static int
66 aggregatevaluesfn(const struct kmr_kv_box kv[], const long n,
67  const KMR_KVS *kvs, KMR_KVS *kvo, void *p)
68 {
69  int rank;
70  MPI_Comm_rank(MPI_COMM_WORLD, &rank);
71  char kbuf[48];
72  char vbuf[48];
73  printf("[%03d] pairs n=%ld\n", rank, n);
74  for (long i = 0; i < n; i++) {
75  kmr_dump_opaque(kv[i].k.p, kv[i].klen, kbuf, sizeof(kbuf));
76  kmr_dump_opaque(kv[i].v.p, kv[i].vlen, vbuf, sizeof(vbuf));
77  printf("[%03d] k[%d]=%s;v[%d]=%s\n", rank,
78  kv[i].klen, kbuf, kv[i].vlen, vbuf);
79 
80  if (i > 0) {
81  struct kmr_kv_box b0 = kv[0];
82  switch (kvs->c.key_data) {
83  case KMR_KV_BAD:
84  assert(kvs->c.key_data != KMR_KV_BAD);
85  break;
86  case KMR_KV_INTEGER:
87  assert(kv[i].klen == b0.klen
88  && kv[i].k.i == b0.k.i);
89  break;
90  case KMR_KV_FLOAT8:
91  assert(kv[i].klen == b0.klen
92  && kv[i].k.d == b0.k.d);
93  break;
94  case KMR_KV_OPAQUE:
95  case KMR_KV_CSTRING:
96  case KMR_KV_POINTER_OWNED:
97  case KMR_KV_POINTER_UNMANAGED:
98  assert(kv[i].klen == b0.klen
99  && memcmp(kv[i].k.p, b0.k.p, (size_t)b0.klen) == 0);
100  break;
101  default:
102  assert(0);
103  break;
104  }
105  }
106  }
107  char buf[48];
108  int cc;
109  snprintf(buf, 48, "%ld_*_%s", n, kv[0].v.p);
110  int len = (int)(strlen(buf) + 1);
111  struct kmr_kv_box akv = {.klen = kv[0].klen,
112  .vlen = len,
113  .k.p = kv[0].k.p,
114  .v.p = buf};
115  cc = kmr_add_kv(kvo, akv);
116  assert(cc == MPI_SUCCESS);
117  return MPI_SUCCESS;
118 }
119 
120 static int
121 checkkeyvaluefn(const struct kmr_kv_box kv0,
122  const KMR_KVS *kvs0, KMR_KVS *kvo, void *p, const long i)
123 {
124  KMR *mr = kvs0->c.mr;
125  char buf[1024];
126  int cc;
127  int x;
128  char gomi;
129  cc = sscanf((&((char *)kv0.k.p)[3]), "%d%c", &x, &gomi);
130  assert(cc == 1);
131  snprintf(buf, sizeof(buf), "%d_*_newvalue%d", mr->nprocs, x);
132  assert(strlen(kv0.v.p) == strlen(buf));
133  size_t len = strlen(buf);
134  assert(strncmp((kv0.v.p), buf, len) == 0);
135  return MPI_SUCCESS;
136 }
137 
138 /* Tests simplest map/shuffle/reduce sequence. */
139 
140 static void
141 simple0(int nprocs, int rank)
142 {
143  MPI_Barrier(MPI_COMM_WORLD);
144  usleep(50 * 1000);
145  if (rank == 0) {printf("simple0...\n");}
146  fflush(0);
147 
148  int cc;
149 
150  KMR *mr = kmr_create_context(MPI_COMM_WORLD, MPI_INFO_NULL, 0);
151 
152  /* Put five pairs. */
153 
154  MPI_Barrier(mr->comm);
155  usleep(50 * 1000);
156  if (rank == 0) printf("ADD\n");
157  fflush(0);
158 
159  int N = 5;
160 
161  KMR_KVS *kvs0 = kmr_create_kvs(mr, KMR_KV_OPAQUE, KMR_KV_OPAQUE);
162  cc = kmr_map_on_rank_zero(kvs0, 0, kmr_noopt, addfivekeysfn);
163  assert(cc == MPI_SUCCESS);
164  kmr_dump_kvs(kvs0, 0);
165 
166  long cnt0;
167  kmr_get_element_count(kvs0, &cnt0);
168  printf("[%03d] cnt0 = %ld\n", rank, cnt0);
169  assert(cnt0 == N);
170 
171  {
172  long histo[MAX(nprocs, 5)];
173  double var[4];
174  for (int i = 0; i < MAX(nprocs, 5); i++) {histo[i] = 0;}
175  kmr_histogram_count_by_ranks(kvs0, histo, var, 0);
176  if (rank == 0) {
177  printf(("[%03d] histo: %ld %ld %ld %ld %ld...\n"
178  "[%03d] ave/var/min/max: %f %f %ld %ld\n"),
179  rank, histo[0], histo[1], histo[2], histo[3], histo[4],
180  rank, var[0], var[1], (long)var[2], (long)var[3]);
181  }
182  }
183 
184  /* Replicate pairs to all ranks. */
185 
186  MPI_Barrier(mr->comm);
187  usleep(50 * 1000);
188  if (rank == 0) printf("REPLICATE\n");
189  fflush(0);
190 
191  KMR_KVS *kvs1 = kmr_create_kvs(mr, KMR_KV_OPAQUE, KMR_KV_OPAQUE);
192  cc = kmr_replicate(kvs0, kvs1, kmr_noopt);
193  assert(cc == MPI_SUCCESS);
194  kmr_dump_kvs(kvs1, 0);
195 
196  long cnt1;
197  kmr_get_element_count(kvs1, &cnt1);
198  printf("[%03d] cnt0 = %ld\n", rank, cnt1);
199  assert(cnt1 == (N * nprocs));
200 
201  {
202  long histo[MAX(nprocs, 5)];
203  double var[4];
204  for (int i = 0; i < MAX(nprocs, 5); i++) {histo[i] = 0;}
205  kmr_histogram_count_by_ranks(kvs1, histo, var, 0);
206  if (rank == 0) {
207  printf(("[%03d] histo: %ld %ld %ld %ld %ld...\n"
208  "[%03d] ave/var/min/max/: %f %f %ld %ld\n"),
209  rank, histo[0], histo[1], histo[2], histo[3], histo[4],
210  rank, var[0], var[1], (long)var[2], (long)var[3]);
211  }
212  }
213 
214  /* Map pairs. */
215 
216  MPI_Barrier(mr->comm);
217  usleep(50 * 1000);
218  if (rank == 0) printf("MAP\n");
219  fflush(0);
220 
221  KMR_KVS *kvs2 = kmr_create_kvs(mr, KMR_KV_OPAQUE, KMR_KV_OPAQUE);
222  cc = kmr_map(kvs1, kvs2, 0, kmr_noopt, replacevaluefn);
223  assert(cc == MPI_SUCCESS);
224 
225  long cnt2;
226  kmr_get_element_count(kvs2, &cnt2);
227  assert(cnt2 == (N * nprocs));
228 
229  /* Collect pairs by theirs keys. */
230 
231  MPI_Barrier(mr->comm);
232  usleep(50 * 1000);
233  if (rank == 0) printf("SHUFFLE\n");
234  fflush(0);
235 
236  KMR_KVS *kvs3 = kmr_create_kvs(mr, KMR_KV_OPAQUE, KMR_KV_OPAQUE);
237  cc = kmr_shuffle(kvs2, kvs3, kmr_noopt);
238  assert(cc == MPI_SUCCESS);
239 
240  long cnt3;
241  kmr_get_element_count(kvs3, &cnt3);
242  assert(cnt3 == (N * nprocs));
243 
244  /* Reduce collected pairs. */
245 
246  MPI_Barrier(mr->comm);
247  usleep(50 * 1000);
248  if (rank == 0) printf("REDUCE\n");
249  fflush(0);
250 
251  KMR_KVS *kvs4 = kmr_create_kvs(mr, KMR_KV_OPAQUE, KMR_KV_OPAQUE);
252  cc = kmr_reduce(kvs3, kvs4, 0, kmr_noopt, aggregatevaluesfn);
253  assert(cc == MPI_SUCCESS);
254 
255  kmr_dump_kvs(kvs4, 0);
256 
257  long cnt4;
258  kmr_get_element_count(kvs4, &cnt4);
259  assert(cnt4 == N);
260 
261  /* Gather pairs to rank0. */
262 
263  MPI_Barrier(mr->comm);
264  usleep(50 * 1000);
265  if (rank == 0) printf("GATHER\n");
266  fflush(0);
267 
268  KMR_KVS *kvs5 = kmr_create_kvs(mr, KMR_KV_OPAQUE, KMR_KV_OPAQUE);
269  struct kmr_option opt = {.rank_zero = 1};
270  cc = kmr_replicate(kvs4, kvs5, opt);
271  assert(cc == MPI_SUCCESS);
272  kmr_dump_kvs(kvs5, 0);
273 
274  long cnt5;
275  kmr_get_element_count(kvs5, &cnt5);
276  assert(cnt5 == N);
277 
278  cc = kmr_map(kvs5, 0, 0, kmr_noopt, checkkeyvaluefn);
279  assert(cc == MPI_SUCCESS);
280 
281  kmr_free_context(mr);
282 }
283 
284 /* Tests on empty KVS. */
285 
286 static void
287 simple1(int nprocs, int rank)
288 {
289  MPI_Barrier(MPI_COMM_WORLD);
290  usleep(50 * 1000);
291  if (rank == 0) printf("simple1...\n");
292  fflush(0);
293 
294  int cc;
295 
296  KMR *mr = kmr_create_context(MPI_COMM_WORLD, MPI_INFO_NULL, 0);
297 
298  /* Make empty KVS. */
299 
300  KMR_KVS *kvs0 = kmr_create_kvs(mr, KMR_KV_OPAQUE, KMR_KV_OPAQUE);
301  kmr_add_kv_done(kvs0);
302 
303  long cnt0;
304  kmr_get_element_count(kvs0, &cnt0);
305  assert(cnt0 == 0);
306 
307  /* Replicate. */
308 
309  KMR_KVS *kvs1 = kmr_create_kvs(mr, KMR_KV_OPAQUE, KMR_KV_OPAQUE);
310  cc = kmr_replicate(kvs0, kvs1, kmr_noopt);
311  assert(cc == MPI_SUCCESS);
312 
313  /* Map. */
314 
315  KMR_KVS *kvs2 = kmr_create_kvs(mr, KMR_KV_OPAQUE, KMR_KV_OPAQUE);
316  cc = kmr_map(kvs1, kvs2, 0, kmr_noopt, 0);
317  assert(cc == MPI_SUCCESS);
318 
319  /* Shuffle. */
320 
321  KMR_KVS *kvs3 = kmr_create_kvs(mr, KMR_KV_OPAQUE, KMR_KV_OPAQUE);
322  cc = kmr_shuffle(kvs2, kvs3, kmr_noopt);
323  assert(cc == MPI_SUCCESS);
324 
325  /* Reduce. */
326 
327  KMR_KVS *kvs4 = kmr_create_kvs(mr, KMR_KV_OPAQUE, KMR_KV_OPAQUE);
328  cc = kmr_reduce(kvs3, kvs4, 0, kmr_noopt, 0);
329  assert(cc == MPI_SUCCESS);
330 
331  /* Save/Restore. */
332 
333  void *data = 0;
334  size_t sz = 0;
335  cc = kmr_save_kvs(kvs4, &data, &sz, kmr_noopt);
336  assert(cc == MPI_SUCCESS && data != 0 && sz != 0);
337  cc = kmr_free_kvs(kvs4);
338  assert(cc == MPI_SUCCESS);
339 
340  KMR_KVS *kvs5 = kmr_create_kvs(mr, KMR_KV_BAD, KMR_KV_BAD);
341  cc = kmr_restore_kvs(kvs5, data, sz, kmr_noopt);
342  assert(cc == MPI_SUCCESS);
343  free(data);
344  cc = kmr_free_kvs(kvs5);
345  assert(cc == MPI_SUCCESS);
346 
347  kmr_free_context(mr);
348 }
349 
350 int
351 main(int argc, char *argv[])
352 {
353  int nprocs, rank, thlv;
354  /*MPI_Init(&argc, &argv);*/
355  MPI_Init_thread(&argc, &argv, MPI_THREAD_SERIALIZED, &thlv);
356  MPI_Comm_size(MPI_COMM_WORLD, &nprocs);
357  MPI_Comm_rank(MPI_COMM_WORLD, &rank);
358 
359  kmr_init();
360 
361  simple0(nprocs, rank);
362  simple1(nprocs, rank);
363 
364  MPI_Barrier(MPI_COMM_WORLD);
365  usleep(50 * 1000);
366  if (rank == 0) printf("OK\n");
367  fflush(0);
368 
369  kmr_fin();
370 
371  MPI_Finalize();
372 
373  return 0;
374 }
Key-Value Stream (abstract).
Definition: kmr.h:587
#define kmr_reduce(KVI, KVO, ARG, OPT, R)
Reduces key-value pairs.
Definition: kmr.h:88
Options to Mapping, Shuffling, and Reduction.
Definition: kmr.h:613
int kmr_add_kv(KMR_KVS *kvs, const struct kmr_kv_box kv)
Adds a key-value pair.
Definition: kmrbase.c:751
#define kmr_create_kvs(MR, KF, VF)
Makes a new key-value stream (of type KMR_KVS) with the specified field datatypes.
Definition: kmr.h:71
int kmr_save_kvs(KMR_KVS *kvi, void **dataq, size_t *szq, struct kmr_option opt)
Packs locally the contents of a key-value stream to a byte array.
Definition: kmrbase.c:968
int kmr_shuffle(KMR_KVS *kvi, KMR_KVS *kvo, struct kmr_option opt)
Shuffles key-value pairs to the appropriate destination ranks.
Definition: kmrbase.c:2036
int kmr_add_kv_done(KMR_KVS *kvs)
Marks finished adding key-value pairs.
Definition: kmrbase.c:881
KMR Context.
Definition: kmr.h:222
int kmr_free_kvs(KMR_KVS *kvs)
Releases a key-value stream (type KMR_KVS).
Definition: kmrbase.c:621
#define kmr_map(KVI, KVO, ARG, OPT, M)
Maps simply.
Definition: kmr.h:82
Handy Copy of a Key-Value Field.
Definition: kmr.h:358
int kmr_dump_kvs(KMR_KVS *kvs, int flag)
Dumps contents of a key-value stream to stdout.
Definition: kmrutil.c:1609
int kmr_fin(void)
Clears the environment.
Definition: kmrbase.c:124
int kmr_get_element_count(KMR_KVS *kvs, long *v)
Gets the total number of key-value pairs.
Definition: kmrmoreops.c:114
#define kmr_init()
Sets up the environment.
Definition: kmr.h:747
void kmr_dump_opaque(const char *p, int siz, char *buf, int buflen)
Puts the string of the key or value field into a buffer BUF as printable string.
Definition: kmrutil.c:1511
int kmr_histogram_count_by_ranks(KMR_KVS *kvs, long *frq, double *var, _Bool rankzeroonly)
Fills an integer array FRQ[i] with the count of the elements of each rank.
Definition: kmrmoreops.c:1569
int kmr_restore_kvs(KMR_KVS *kvo, void *data, size_t sz, struct kmr_option opt)
Unpacks locally the contents of a key-value stream from a byte array.
Definition: kmrbase.c:1034
int kmr_free_context(KMR *mr)
Releases a context created with kmr_create_context().
Definition: kmrbase.c:326
KMR Interface.
int kmr_replicate(KMR_KVS *kvi, KMR_KVS *kvo, struct kmr_option opt)
Replicates key-value pairs to be visible on all ranks, that is, it has the effect of bcast or all-gat...
Definition: kmrbase.c:2182
int kmr_map_on_rank_zero(KMR_KVS *kvo, void *arg, struct kmr_option opt, kmr_mapfn_t m)
Maps on rank0 only.
Definition: kmrbase.c:1456
KMR * kmr_create_context(const MPI_Comm comm, const MPI_Info conf, const char *name)
Makes a new KMR context (a context has type KMR).
Definition: kmrbase.c:147