KMR
test1.c
1 /* test1.c (2014-02-04) */
2 
3 /* Check KMR basic operations. It includes similar tests as test0.c,
4  but with many key-value pairs. */
5 
6 /* Run it with "mpirun -np n a.out". */
7 
8 #include <mpi.h>
9 #include <stdio.h>
10 #include <stdlib.h>
11 #include <unistd.h>
12 #include <fcntl.h>
13 #include <limits.h>
14 #include <sys/types.h>
15 #include <sys/stat.h>
16 #include <sys/time.h>
17 #include <assert.h>
18 #ifdef _OPENMP
19 #include <omp.h>
20 #endif
21 
22 #include "kmr.h"
23 #include "kmrimpl.h"
24 
25 static double
26 wtime()
27 {
28  static struct timeval tv0 = {.tv_sec = 0};
29  struct timeval tv;
30  int cc;
31  cc = gettimeofday(&tv, 0);
32  assert(cc == 0);
33  if (tv0.tv_sec == 0) {
34  tv0 = tv;
35  assert(tv0.tv_sec != 0);
36  }
37  double dt = ((double)(tv.tv_sec - tv0.tv_sec)
38  + ((double)(tv.tv_usec - tv0.tv_usec) * 1e-6));
39  return dt;
40 }
41 
42 /* Puts many key-value pairs to output KVO. */
43 
44 static int
45 addsomekeys0(const struct kmr_kv_box kv0,
46  const KMR_KVS *kvs0, KMR_KVS *kvo, void *p, const long i_)
47 {
48  assert(kvs0 == 0 && kv0.klen == 0 && kv0.vlen == 0 && kvo != 0);
49  int N = *((int *)p);
50  char k[80];
51  char v[80];
52  int cc;
53  for (int i = 0; i < N; i++) {
54  snprintf(k, 80, "key%d", i);
55  snprintf(v, 80, "value%d", i);
56  struct kmr_kv_box kv = {
57  .klen = (int)(strlen(k) + 1), .vlen = (int)(strlen(v) + 1),
58  .k.p = k, .v.p = v};
59  cc = kmr_add_kv(kvo, kv);
60  assert(cc == MPI_SUCCESS);
61  }
62  return MPI_SUCCESS;
63 }
64 
65 static int
66 replacevalues0(const struct kmr_kv_box kv0,
67  const KMR_KVS *kvs0, KMR_KVS *kvo, void *p, const long i)
68 {
69  char buf[1024];
70  assert(kvs0 != 0 && kvo != 0);
71  int cc, x;
72  char gomi;
73  cc = sscanf((&((char *)kv0.k.p)[3]), "%d%c", &x, &gomi);
74  assert(cc == 1);
75  snprintf(buf, sizeof(buf), "newvalue%d", x);
76  int vlen = (int)(strlen(buf) + 1);
77  struct kmr_kv_box kv = {.klen = kv0.klen,
78  .vlen = vlen,
79  .k.p = kv0.k.p,
80  .v.p = buf};
81  cc = kmr_add_kv(kvo, kv);
82  assert(cc == MPI_SUCCESS);
83  return MPI_SUCCESS;
84 }
85 
86 /* Aggregates reduction pairs. It asserts key equality for
87  checking. */
88 
89 static int
90 aggregatevalues0(const struct kmr_kv_box kv[], const long n,
91  const KMR_KVS *kvs, KMR_KVS *kvo, void *p)
92 {
93  int nprocs, rank;
94  MPI_Comm_size(MPI_COMM_WORLD, &nprocs);
95  MPI_Comm_rank(MPI_COMM_WORLD, &rank);
96  assert(n == nprocs);
97  for (int i = 0; i < n; i++) {
98  if (i > 0) {
99  struct kmr_kv_box b0 = kv[0];
100  switch (kvs->c.key_data) {
101  case KMR_KV_BAD:
102  assert(kvs->c.key_data != KMR_KV_BAD);
103  break;
104  case KMR_KV_INTEGER:
105  assert(kv[i].klen == b0.klen
106  && kv[i].k.i == b0.k.i);
107  break;
108  case KMR_KV_FLOAT8:
109  assert(kv[i].klen == b0.klen
110  && kv[i].k.d == b0.k.d);
111  break;
112  case KMR_KV_OPAQUE:
113  case KMR_KV_CSTRING:
114  case KMR_KV_POINTER_OWNED:
115  case KMR_KV_POINTER_UNMANAGED:
116  assert(kv[i].klen == b0.klen
117  && memcmp(kv[i].k.p, b0.k.p, (size_t)b0.klen) == 0);
118  break;
119  default:
120  assert(0);
121  break;
122  }
123  }
124  }
125  char buf[48];
126  int cc;
127  snprintf(buf, 48, "%ld_*_%s", n, kv[0].v.p);
128  int len = (int)(strlen(buf) + 1);
129  struct kmr_kv_box akv = {.klen = kv[0].klen,
130  .vlen = len,
131  .k.p = kv[0].k.p,
132  .v.p = buf};
133  cc = kmr_add_kv(kvo, akv);
134  assert(cc == MPI_SUCCESS);
135  return MPI_SUCCESS;
136 }
137 
138 static int
139 checkkeyvalues0(const struct kmr_kv_box kv0,
140  const KMR_KVS *kvs0, KMR_KVS *kvo, void *p, const long i)
141 {
142  KMR *mr = kvs0->c.mr;
143  char buf[1024];
144  int cc;
145  int x;
146  char gomi;
147  cc = sscanf((&((char *)kv0.k.p)[3]), "%d%c", &x, &gomi);
148  assert(cc == 1);
149  snprintf(buf, sizeof(buf), "%d_*_newvalue%d", mr->nprocs, x);
150  assert(strlen(kv0.v.p) == strlen(buf));
151  size_t len = strlen(buf);
152  assert(strncmp((kv0.v.p), buf, len) == 0);
153  return MPI_SUCCESS;
154 }
155 
156 /* Tests simplest. It shrinks the block-size (preset_block_size), to
157  test growing buffers. */
158 
159 static void
160 simple0(int nprocs, int rank, _Bool pushoff)
161 {
162  MPI_Barrier(MPI_COMM_WORLD);
163  usleep(50 * 1000);
164  if (!pushoff) {
165  if (rank == 0) {printf("CHECK SIMPLE OPERATIONS...\n");}
166  } else {
167  if (rank == 0) {printf("CHECK PUSH-OFF KVS...\n");}
168  }
169  fflush(0);
170  usleep(50 * 1000);
171 
172  int cc;
173 
174  KMR *mr = kmr_create_context(MPI_COMM_WORLD, MPI_INFO_NULL, 0);
175  assert(mr != 0);
176  mr->pushoff_stat = 1;
177  mr->preset_block_size = 750;
178  //mr->pushoff_block_size = 1024;
179 
180  int N = 1000000;
181 
182  /* Put pairs. */
183 
184  MPI_Barrier(mr->comm);
185  usleep(50 * 1000);
186  if (rank == 0) {printf("ADD (%d elements)\n", N);}
187  fflush(0);
188  usleep(50 * 1000);
189 
190  KMR_KVS *kvs0 = kmr_create_kvs(mr, KMR_KV_OPAQUE, KMR_KV_OPAQUE);
191  cc = kmr_map_on_rank_zero(kvs0, &N, kmr_noopt, addsomekeys0);
192  assert(cc == MPI_SUCCESS);
193 
194  long cnt0;
195  cc = kmr_get_element_count(kvs0, &cnt0);
196  assert(cc == MPI_SUCCESS);
197  assert(cnt0 == N);
198 
199  /* Replicate pairs to all ranks. */
200 
201  MPI_Barrier(mr->comm);
202  usleep(50 * 1000);
203  if (rank == 0) {printf("REPLICATE\n");}
204  fflush(0);
205  usleep(50 * 1000);
206 
207  KMR_KVS *kvs1 = kmr_create_kvs(mr, kvs0->c.key_data, kvs0->c.value_data);
208  cc = kmr_replicate(kvs0, kvs1, kmr_noopt);
209  assert(cc == MPI_SUCCESS);
210 
211  long cnt1;
212  cc = kmr_get_element_count(kvs1, &cnt1);
213  assert(cc == MPI_SUCCESS);
214  assert(cnt1 == (N * nprocs));
215 
216  /* Pack and Unpack. */
217 
218  MPI_Barrier(mr->comm);
219  usleep(50 * 1000);
220  if (rank == 0) {printf("PACK+UNPACK\n");}
221  fflush(0);
222  usleep(50 * 1000);
223 
224  void *data = 0;
225  size_t sz = 0;
226  cc = kmr_save_kvs(kvs1, &data, &sz, kmr_noopt);
227  assert(cc == MPI_SUCCESS && data != 0 && sz != 0);
228  cc = kmr_free_kvs(kvs1);
229  assert(cc == MPI_SUCCESS);
230 
231  KMR_KVS *kvs1r = kmr_create_kvs(mr, KMR_KV_BAD, KMR_KV_BAD);
232  cc = kmr_restore_kvs(kvs1r, data, sz, kmr_noopt);
233  assert(cc == MPI_SUCCESS);
234  free(data);
235  assert((kvs1r->c.key_data == KMR_KV_OPAQUE
236  || kvs1r->c.key_data == KMR_KV_CSTRING)
237  && (kvs1r->c.value_data == KMR_KV_OPAQUE
238  || kvs1r->c.value_data == KMR_KV_CSTRING));
239 
240  /* Map pairs. */
241 
242  MPI_Barrier(mr->comm);
243  usleep(50 * 1000);
244  if (rank == 0) {printf("MAP\n");}
245  fflush(0);
246  usleep(50 * 1000);
247 
248  KMR_KVS *kvs2;
249  if (!pushoff) {
250  kvs2 = kmr_create_kvs(mr, KMR_KV_OPAQUE, KMR_KV_OPAQUE);
251  } else {
252  kvs2 = kmr_create_pushoff_kvs(mr, KMR_KV_OPAQUE, KMR_KV_OPAQUE,
253  kmr_noopt,
254  __FILE__, __LINE__, __func__);
255  }
256  cc = kmr_map(kvs1r, kvs2, 0, kmr_noopt, replacevalues0);
257  assert(cc == MPI_SUCCESS);
258 
259  long cnt2;
260  cc = kmr_get_element_count(kvs2, &cnt2);
261  assert(cc == MPI_SUCCESS);
262  assert(cnt2 == (N * nprocs));
263 
264  /* Collect pairs by theirs keys. */
265 
266  MPI_Barrier(mr->comm);
267  usleep(50 * 1000);
268  if (rank == 0) {printf("SHUFFLE\n");}
269  fflush(0);
270  usleep(50 * 1000);
271 
272  KMR_KVS *kvs3 = kmr_create_kvs(mr, kvs2->c.key_data, kvs2->c.value_data);
273  cc = kmr_shuffle(kvs2, kvs3, kmr_noopt);
274  assert(cc == MPI_SUCCESS);
275 
276 #if 0
277  if (!pushoff) {
278  kvs3 = kmr_create_kvs(mr, kvs2->c.key_data, kvs2->c.value_data);
279  cc = kmr_shuffle(kvs2, kvs3, kmr_noopt);
280  } else {
281  kvs3 = kvs2;
282  kvs2 = 0;
283  cc = MPI_SUCCESS;
284  }
285  assert(cc == MPI_SUCCESS);
286 #endif
287 
288  long cnt3;
289  cc = kmr_get_element_count(kvs3, &cnt3);
290  assert(cc == MPI_SUCCESS);
291  assert(cnt3 == (N * nprocs));
292 
293  /* Reduce collected pairs. */
294 
295  MPI_Barrier(mr->comm);
296  usleep(50 * 1000);
297  if (rank == 0) {printf("REDUCE\n");}
298  fflush(0);
299  usleep(50 * 1000);
300 
301  KMR_KVS *kvs4 = kmr_create_kvs(mr, KMR_KV_OPAQUE, KMR_KV_OPAQUE);
302  cc = kmr_reduce(kvs3, kvs4, 0, kmr_noopt, aggregatevalues0);
303  assert(cc == MPI_SUCCESS);
304 
305  //kmr_dump_kvs(kvs4, 0);
306 
307  long cnt4;
308  cc = kmr_get_element_count(kvs4, &cnt4);
309  assert(cc == MPI_SUCCESS);
310  assert(cnt4 == N);
311 
312  /* Gather pairs to rank0. */
313 
314  MPI_Barrier(mr->comm);
315  usleep(50 * 1000);
316  if (rank == 0) {printf("GATHER\n");}
317  fflush(0);
318  usleep(50 * 1000);
319 
320  KMR_KVS *kvs5 = kmr_create_kvs(mr, KMR_KV_OPAQUE, KMR_KV_OPAQUE);
321  struct kmr_option opt = {.rank_zero = 1};
322  cc = kmr_replicate(kvs4, kvs5, opt);
323  assert(cc == MPI_SUCCESS);
324 
325  long cnt5;
326  cc = kmr_get_element_count(kvs5, &cnt5);
327  assert(cc == MPI_SUCCESS);
328  assert(cnt5 == N);
329 
330  /* Check key-value pairs (pairs on rank0 only). */
331 
332  cc = kmr_map(kvs5, 0, 0, kmr_noopt, checkkeyvalues0);
333  assert(cc == MPI_SUCCESS);
334 
335  if (pushoff && mr->pushoff_stat) {
336  char *s = "STATISTICS on push-off kvs:\n";
337  kmr_print_statistics_on_pushoff(mr, s);
338  }
339 
340  cc = kmr_free_context(mr);
341  assert(cc == MPI_SUCCESS);
342 }
343 
344 static int
345 addkeys1(const struct kmr_kv_box kv0,
346  const KMR_KVS *kvs0, KMR_KVS *kvo, void *p, const long i_)
347 {
348  assert(kvs0 == 0);
349  KMR *mr = kvo->c.mr;
350  assert(mr->rank == 0);
351  long *arg = p;
352  int NN = (int)*arg;
353  for (int i = 0; i < NN; i++) {
354  struct kmr_kv_box kv = {
355  .klen = sizeof(long),
356  .vlen = sizeof(long),
357  .k.i = i,
358  .v.i = i
359  };
360  kmr_add_kv(kvo, kv);
361  }
362  return MPI_SUCCESS;
363 }
364 
365 static int
366 checksorted1(const struct kmr_kv_box kv0,
367  const KMR_KVS *kvs0, KMR_KVS *kvo, void *p, const long i)
368 {
369  assert(kv0.k.i == i && kv0.v.i == i);
370  return MPI_SUCCESS;
371 }
372 
373 /* Tests distribute(). */
374 
375 static void
376 simple1(int nprocs, int rank)
377 {
378  MPI_Barrier(MPI_COMM_WORLD);
379  if (rank == 0) {printf("DISTIBUTE\n");}
380  fflush(0);
381  usleep(50 * 1000);
382 
383  /* Check with 10 keys for each rank. */
384 
385  const long MM = 10;
386  const long NN = MM * nprocs;
387 
388  KMR *mr = kmr_create_context(MPI_COMM_WORLD, MPI_INFO_NULL, 0);
389 
390  KMR_KVS *kvs0 = kmr_create_kvs(mr, KMR_KV_INTEGER, KMR_KV_INTEGER);
391  kmr_map_on_rank_zero(kvs0, (void *)&NN, kmr_noopt, addkeys1);
392 
393  KMR_KVS *kvs1 = kmr_create_kvs(mr, KMR_KV_INTEGER, KMR_KV_INTEGER);
394  kmr_distribute(kvs0, kvs1, 1, kmr_noopt);
395  //kmr_dump_kvs(kvs1, 0);
396  assert(kvs1->c.element_count == MM);
397 
398  KMR_KVS *kvs2 = kmr_create_kvs(mr, KMR_KV_INTEGER, KMR_KV_INTEGER);
399  kmr_sort_small(kvs1, kvs2, kmr_noopt);
400  //kmr_dump_kvs(kvs2, 0);
401 
402  KMR_KVS *kvs3 = kmr_create_kvs(mr, KMR_KV_INTEGER, KMR_KV_INTEGER);
403  kmr_distribute(kvs2, kvs3, 0, kmr_noopt);
404  //kmr_dump_kvs(kvs3, 0);
405  assert(kvs3->c.element_count == MM);
406 
407  /* Check key-value pairs (after collecting to rank0). */
408 
409  KMR_KVS *kvs4 = kmr_create_kvs(mr, KMR_KV_INTEGER, KMR_KV_INTEGER);
410  struct kmr_option opt = {.rank_zero = 1};
411  kmr_replicate(kvs3, kvs4, opt);
412 
413  KMR_KVS *kvs5 = kmr_create_kvs(mr, KMR_KV_INTEGER, KMR_KV_INTEGER);
414  kmr_sort_locally(kvs4, kvs5, 0, kmr_noopt);
415 
416  kmr_map(kvs5, 0, 0, kmr_noopt, checksorted1);
417 
418  kmr_free_context(mr);
419 }
420 
421 /* Tests on empty KVS. */
422 
423 static void
424 simple2(int nprocs, int rank)
425 {
426  MPI_Barrier(MPI_COMM_WORLD);
427  usleep(50 * 1000);
428  if (rank == 0) {printf("CHECK OPERATIONS WITH EMPTY KVS...\n");}
429  fflush(0);
430  usleep(50 * 1000);
431 
432  int cc;
433 
434  KMR *mr = kmr_create_context(MPI_COMM_WORLD, MPI_INFO_NULL, 0);
435  assert(mr != 0);
436 
437  /* Make empty KVS. */
438 
439  KMR_KVS *kvs0 = kmr_create_kvs(mr, KMR_KV_OPAQUE, KMR_KV_OPAQUE);
440  cc = kmr_add_kv_done(kvs0);
441  assert(cc == MPI_SUCCESS);
442 
443  long cnt0;
444  cc = kmr_get_element_count(kvs0, &cnt0);
445  assert(cc == MPI_SUCCESS);
446  assert(cnt0 == 0);
447 
448  /* Replicate. */
449 
450  KMR_KVS *kvs1 = kmr_create_kvs(mr, kvs0->c.key_data, kvs0->c.value_data);
451  cc = kmr_replicate(kvs0, kvs1, kmr_noopt);
452  assert(cc == MPI_SUCCESS);
453 
454  /* Map. */
455 
456  KMR_KVS *kvs2 = kmr_create_kvs(mr, kvs1->c.key_data, kvs1->c.value_data);
457  cc = kmr_map(kvs1, kvs2, 0, kmr_noopt, 0);
458  assert(cc == MPI_SUCCESS);
459 
460  /* Shuffle. */
461 
462  KMR_KVS *kvs3 = kmr_create_kvs(mr, kvs2->c.key_data, kvs2->c.value_data);
463  cc = kmr_shuffle(kvs2, kvs3, kmr_noopt);
464  assert(cc == MPI_SUCCESS);
465 
466  /* Reduce. */
467 
468  KMR_KVS *kvs4 = kmr_create_kvs(mr, KMR_KV_OPAQUE, KMR_KV_OPAQUE);
469  cc = kmr_reduce(kvs3, kvs4, 0, kmr_noopt, 0);
470  assert(cc == MPI_SUCCESS);
471 
472  /* Sort. */
473 
474  KMR_KVS *kvs5 = kmr_create_kvs(mr, KMR_KV_OPAQUE, KMR_KV_OPAQUE);
475  cc = kmr_sort(kvs4, kvs5, kmr_noopt);
476  assert(cc == MPI_SUCCESS);
477  KMR_KVS *kvs6 = kmr_create_kvs(mr, KMR_KV_OPAQUE, KMR_KV_OPAQUE);
478  cc = kmr_sort_locally(kvs5, kvs6, 0, kmr_noopt);
479  assert(cc == MPI_SUCCESS);
480  cc = kmr_free_kvs(kvs6);
481  assert(cc == MPI_SUCCESS);
482 
483  /* Concat. */
484 
485  KMR_KVS *kvs70 = kmr_create_kvs(mr, KMR_KV_OPAQUE, KMR_KV_OPAQUE);
486  cc = kmr_add_kv_done(kvs70);
487  assert(cc == MPI_SUCCESS);
488  KMR_KVS *kvs71 = kmr_create_kvs(mr, KMR_KV_OPAQUE, KMR_KV_OPAQUE);
489  cc = kmr_add_kv_done(kvs71);
490  assert(cc == MPI_SUCCESS);
491  KMR_KVS *kvs72 = kmr_create_kvs(mr, KMR_KV_OPAQUE, KMR_KV_OPAQUE);
492  KMR_KVS *vec[] = {kvs70, kvs71};
493  kmr_concatenate_kvs(vec, 2, kvs72, kmr_noopt);
494  assert(cc == MPI_SUCCESS);
495  cc = kmr_free_kvs(kvs72);
496  assert(cc == MPI_SUCCESS);
497 
498  cc = kmr_free_context(mr);
499  assert(cc == MPI_SUCCESS);
500 }
501 
502 /* Tests on option (property list) loader. */
503 
504 static void
505 simple3(int nprocs, int rank)
506 {
507  MPI_Barrier(MPI_COMM_WORLD);
508  usleep(50 * 1000);
509  if (rank == 0) {printf("CHECK LOADING OPTIONS (PROPERTY LIST)...\n");}
510  fflush(0);
511  usleep(50 * 1000);
512 
513  static char props[] =
514  "# Test file for property loader.\n"
515  "key0=value0\n"
516  "key1=\n"
517  "=value2\n"
518  "key3 value3\n"
519  "key4\\\n" " " ":value4\n"
520  "ke\\\n" " " "y5=val\\\n" " " "ue5\n"
521  "ke\\\n" " " "\\\n" " " "y6\\\n"
522  "=val\\\n" " " "\\\n" " " "ue6\n"
523  "key\\u0037=\\u0076alue7\n"
524  "key#is!anything=value#is!much=more\n";
525 
526  static char *pairs[][2] =
527  {{"key0", "value0"},
528  {"key1", ""},
529  {"", "value2"},
530  {"key3", "value3"},
531  {"key4", "value4"},
532  {"key5", "value5"},
533  {"key6", "value6"},
534  {"key7", "value7"},
535  {"key#is!anything", "value#is!much=more"}};
536 
537  int npairs = (sizeof(pairs) / (sizeof(char *) * 2));
538 
539  int cc;
540 
541  if (rank == 0) {
542  if (1) {
543  system("rm -f properties");
544  FILE *f = fopen("properties", "w");
545  assert(f != 0);
546  size_t cx = fwrite(props, (sizeof(props) - 1), 1, f);
547  assert(cx == 1);
548  cc = fclose(f);
549  assert(cc == 0);
550  }
551 
552  MPI_Info info;
553  MPI_Info_create(&info);
554 
555  cc = kmr_load_properties(info, "properties");
556  assert(cc == MPI_SUCCESS);
557 
558  //kmr_dump_mpi_info("", info);
559 
560  /* Decrease count by one, because an empty key is ignore. */
561 
562  {
563  int nkeys;
564  cc = MPI_Info_get_nkeys(info, &nkeys);
565  assert(cc == MPI_SUCCESS);
566  /* (SKIP EMPTY KEY AND EMPTY VALUE IN TEST DATA). */
567  assert((npairs - 2) == nkeys);
568  for (int i = 0; i < npairs; i++) {
569  if (*pairs[i][0] == 0) {
570  continue;
571  }
572  if (*pairs[i][1] == 0) {
573  continue;
574  }
575  char *key = pairs[i][0];
576  char value[MPI_MAX_INFO_VAL + 1];
577  int vlen;
578  int flag;
579  cc = MPI_Info_get_valuelen(info, key, &vlen, &flag);
580  assert(cc == MPI_SUCCESS && flag != 0);
581  assert(vlen <= MPI_MAX_INFO_VAL);
582  cc = MPI_Info_get(info, key, MPI_MAX_INFO_VAL, value, &flag);
583  assert(cc == MPI_SUCCESS && flag != 0);
584  assert(strcmp(pairs[i][1], value) == 0);
585  }
586  }
587 
588  MPI_Info_free(&info);
589 
590  if (1) {
591  system("rm -f properties");
592  }
593  }
594 }
595 
596 /* Tests on sorting for small number of entries. */
597 
598 static void
599 simple4(int nprocs, int rank)
600 {
601 #define KLEN 10
602 #define VLEN 90
603 
604  MPI_Barrier(MPI_COMM_WORLD);
605  usleep(50 * 1000);
606  if (rank == 0) {printf("CHECK SORTER (SMALL DATA)...\n");}
607  fflush(0);
608  usleep(50 * 1000);
609 
610  KMR *mr = kmr_create_context(MPI_COMM_WORLD, MPI_INFO_NULL, 0);
611  assert(mr != 0);
612 
613  int cc;
614  char k[KLEN+1];
615  char v[VLEN+1];
616 
617  struct kmr_option inspect = kmr_noopt;
618  inspect.inspect = 1;
619 
620  /* Local sort on opaques. */
621 
622  {
623  int N = 1000 * 1000;
624 
625  if (rank == 0) {
626  printf("Checking local sort on opaques N=%d\n", N);
627  printf("Generating random strings...\n");
628  fflush(0);
629  }
630 
631  KMR_KVS *kvs0 = kmr_create_kvs(mr, KMR_KV_OPAQUE, KMR_KV_OPAQUE);
632  for (int i = 0; i < N; i++) {
633  const int len = 100;
634  char bb[128];
635  for (int j = 0; j < (len - 1); j++) {
636  long vv = lrand48();
637  assert(vv >= 0);
638  int c0 = (int)(vv % 26);
639  int c1 = ('A' + (c0 - 0));
640  bb[j] = (char)c1;
641  }
642  bb[(len - 1)] = 0;
643  struct kmr_kv_box kv = {
644  .klen = len,
645  .vlen = len,
646  .k.p = bb,
647  .v.p = bb
648  };
649  cc = kmr_add_kv(kvs0, kv);
650  assert(cc == MPI_SUCCESS);
651  }
652  cc = kmr_add_kv_done(kvs0);
653  assert(cc == MPI_SUCCESS);
654 
655  //kmr_dump_kvs(kvs0, 0); fflush(0);
656 
657  if (rank == 0) {printf("Sorting (normal)...\n"); fflush(0);}
658 
659  KMR_KVS *kvs1 = kmr_create_kvs(mr, KMR_KV_OPAQUE, KMR_KV_OPAQUE);
660  cc = kmr_sort_locally(kvs0, kvs1, 0, kmr_noopt);
661  assert(cc == MPI_SUCCESS);
662  if (rank == 0) {printf("Checking...\n"); fflush(0);}
663  kmr_assert_sorted(kvs1, 1, 0, 0);
664 
665  if (rank == 0) {printf("Sorting (shuffle)...\n"); fflush(0);}
666 
667  KMR_KVS *kvs2 = kmr_create_kvs(mr, KMR_KV_OPAQUE, KMR_KV_OPAQUE);
668  cc = kmr_sort_locally(kvs1, kvs2, 1, kmr_noopt);
669  assert(cc == MPI_SUCCESS);
670  if (rank == 0) {printf("Check...\n"); fflush(0);}
671  kmr_assert_sorted(kvs2, 1, 1, 0);
672 
673  cc = kmr_free_kvs(kvs2);
674  assert(cc == MPI_SUCCESS);
675  }
676 
677  /* Local sort on integers. */
678 
679  {
680  int N = 1000 * 1000;
681 
682  if (rank == 0) {
683  printf("Checking local sort on integers N=%d\n", N);
684  printf("Generating random numbers...\n");
685  fflush(0);
686  }
687 
688  KMR_KVS *kvs0 = kmr_create_kvs(mr, KMR_KV_INTEGER, KMR_KV_INTEGER);
689  for (int i = 0; i < N; i++) {
690  long vv = ((mrand48() << 32) ^ mrand48());
691  struct kmr_kv_box kv = {
692  .klen = sizeof(long),
693  .vlen = sizeof(long),
694  .k.i = vv,
695  .v.i = vv
696  };
697  cc = kmr_add_kv(kvs0, kv);
698  assert(cc == MPI_SUCCESS);
699  }
700  cc = kmr_add_kv_done(kvs0);
701  assert(cc == MPI_SUCCESS);
702 
703  if (rank == 0) {printf("Sorting (normal)...\n"); fflush(0);}
704 
705  KMR_KVS *kvs1 = kmr_create_kvs(mr, KMR_KV_INTEGER, KMR_KV_INTEGER);
706  cc = kmr_sort_locally(kvs0, kvs1, 0, kmr_noopt);
707  assert(cc == MPI_SUCCESS);
708  //kmr_dump_kvs(kvs1, 0); fflush(0);
709  if (rank == 0) {printf("Checking...\n"); fflush(0);}
710  kmr_assert_sorted(kvs1, 1, 0, 0);
711 
712  if (rank == 0) {printf("Sorting (shuffle)...\n"); fflush(0);}
713 
714  KMR_KVS *kvs2 = kmr_create_kvs(mr, KMR_KV_INTEGER, KMR_KV_INTEGER);
715  cc = kmr_sort_locally(kvs1, kvs2, 1, kmr_noopt);
716  assert(cc == MPI_SUCCESS);
717  if (rank == 0) {printf("Checking...\n"); fflush(0);}
718  kmr_assert_sorted(kvs2, 1, 1, 0);
719 
720  cc = kmr_free_kvs(kvs2);
721  assert(cc == MPI_SUCCESS);
722  }
723 
724  /* Local sort on doubles. */
725 
726  {
727  int N = 1000 * 1000;
728 
729  if (rank == 0) {
730  printf("Checking local sort on doubles N=%d\n", N);
731  printf("Generating random numbers...\n");
732  fflush(0);
733  }
734 
735  KMR_KVS *kvs0 = kmr_create_kvs(mr, KMR_KV_FLOAT8, KMR_KV_FLOAT8);
736  for (int i = 0; i < N; i++) {
737  double vv = (drand48() - 0.5) * 10000.0;
738  struct kmr_kv_box kv = {
739  .klen = sizeof(double),
740  .vlen = sizeof(double),
741  .k.d = vv,
742  .v.d = vv
743  };
744  cc = kmr_add_kv(kvs0, kv);
745  assert(cc == MPI_SUCCESS);
746  }
747  cc = kmr_add_kv_done(kvs0);
748  assert(cc == MPI_SUCCESS);
749 
750  if (rank == 0) {printf("Sorting (normal)...\n"); fflush(0);}
751 
752  KMR_KVS *kvs1 = kmr_create_kvs(mr, KMR_KV_FLOAT8, KMR_KV_FLOAT8);
753  cc = kmr_sort_locally(kvs0, kvs1, 0, kmr_noopt);
754  assert(cc == MPI_SUCCESS);
755  //kmr_dump_kvs(kvs1, 0); fflush(0);
756  if (rank == 0) {printf("Checking...\n"); fflush(0);}
757  kmr_assert_sorted(kvs1, 1, 0, 0);
758 
759  if (rank == 0) {printf("Sorting (shuffle)...\n"); fflush(0);}
760 
761  KMR_KVS *kvs2 = kmr_create_kvs(mr, KMR_KV_FLOAT8, KMR_KV_FLOAT8);
762  cc = kmr_sort_locally(kvs1, kvs2, 1, kmr_noopt);
763  assert(cc == MPI_SUCCESS);
764  if (rank == 0) {printf("Checking...\n"); fflush(0);}
765  kmr_assert_sorted(kvs2, 1, 1, 0);
766 
767  cc = kmr_free_kvs(kvs2);
768  assert(cc == MPI_SUCCESS);
769  }
770 
771  /* Sort short opaque key-values. */
772 
773  {
774  KMR_KVS *kvs0 = kmr_create_kvs(mr, KMR_KV_OPAQUE, KMR_KV_OPAQUE);
775  int N = 1000;
776  for (int i = 0; i < N; i++) {
777  memset(k, 0, sizeof(k));
778  memset(v, 0, sizeof(v));
779  snprintf(k, (KLEN+1), "%05d%05d", rank, (N - 1 - i));
780  snprintf(v, (VLEN+1), "value=%d/%d", rank, i);
781  struct kmr_kv_box kv = {
782  .klen = KLEN, .vlen = VLEN, .k.p = k, .v.p = v};
783  cc = kmr_add_kv(kvs0, kv);
784  assert(cc == MPI_SUCCESS);
785  }
786  cc = kmr_add_kv_done(kvs0);
787  assert(cc == MPI_SUCCESS);
788 
789  KMR_KVS *kvs1 = kmr_create_kvs(mr, KMR_KV_OPAQUE, KMR_KV_OPAQUE);
790  cc = kmr_sort_locally(kvs0, kvs1, 0, inspect);
791  assert(cc == MPI_SUCCESS);
792  //kmr_dump_kvs(kvs1, 0);
793  kmr_assert_sorted(kvs1, 1, 0, 0);
794 
795  cc = kmr_free_kvs(kvs0);
796  assert(cc == MPI_SUCCESS);
797  cc = kmr_free_kvs(kvs1);
798  assert(cc == MPI_SUCCESS);
799  }
800 
801  /* (nprocs x nprocs) entries. */
802 
803  {
804  MPI_Barrier(MPI_COMM_WORLD);
805  usleep(50 * 1000);
806  if (rank == 0) {printf("SORT (int) nprocs x nprocs data...\n");}
807  fflush(0);
808  usleep(50 * 1000);
809 
810  KMR_KVS *kvs0 = kmr_create_kvs(mr, KMR_KV_INTEGER, KMR_KV_OPAQUE);
811  int N = nprocs;
812  for (int i = 0; i < N; i++) {
813  snprintf(v, 80, "value=%d/%d", rank, i);
814  struct kmr_kv_box kv = {
815  .klen = (int)sizeof(long),
816  .vlen = (int)(strlen(v) + 1),
817  .k.i = (100 * i),
818  .v.p = v};
819  cc = kmr_add_kv(kvs0, kv);
820  assert(cc == MPI_SUCCESS);
821  }
822  cc = kmr_add_kv_done(kvs0);
823  assert(cc == MPI_SUCCESS);
824 
825  KMR_KVS *kvs1 = kmr_create_kvs(mr, KMR_KV_INTEGER, KMR_KV_OPAQUE);
826  cc = kmr_sort_small(kvs0, kvs1, inspect);
827  assert(cc == MPI_SUCCESS);
828  //kmr_dump_kvs(kvs1, 0);
829  kmr_assert_sorted(kvs1, 0, 0, 0);
830 
831  KMR_KVS *kvs2 = kmr_create_kvs(mr, KMR_KV_INTEGER, KMR_KV_OPAQUE);
832  cc = kmr_sort_large(kvs0, kvs2, inspect);
833  assert(cc == MPI_SUCCESS);
834  //kmr_dump_kvs(kvs2, 0);
835  kmr_assert_sorted(kvs2, 0, 0, 0);
836 
837  KMR_KVS *kvs3 = kmr_create_kvs(mr, KMR_KV_INTEGER, KMR_KV_OPAQUE);
838  cc = kmr_sort_by_one(kvs0, kvs3, inspect);
839  assert(cc == MPI_SUCCESS);
840  kmr_assert_sorted(kvs3, 0, 0, 0);
841 
842  cc = kmr_free_kvs(kvs0);
843  assert(cc == MPI_SUCCESS);
844  }
845 
846  /* nprocs entries (on a sole rank). */
847 
848  {
849  MPI_Barrier(MPI_COMM_WORLD);
850  usleep(50 * 1000);
851  if (rank == 0) {printf("SORT (int) nprocs data...\n");}
852  fflush(0);
853  usleep(50 * 1000);
854 
855  KMR_KVS *kvs0 = kmr_create_kvs(mr, KMR_KV_INTEGER, KMR_KV_OPAQUE);
856  int N = nprocs;
857  if (rank == (nprocs - 1)) {
858  for (int i = 0; i < N; i++) {
859  snprintf(v, 80, "value=%d/%d", rank, i);
860  struct kmr_kv_box kv = {
861  .klen = (int)sizeof(long),
862  .vlen = (int)(strlen(v) + 1),
863  .k.i = (100 * i),
864  .v.p = v};
865  cc = kmr_add_kv(kvs0, kv);
866  assert(cc == MPI_SUCCESS);
867  }
868  }
869  cc = kmr_add_kv_done(kvs0);
870  assert(cc == MPI_SUCCESS);
871 
872  KMR_KVS *kvs1 = kmr_create_kvs(mr, KMR_KV_INTEGER, KMR_KV_OPAQUE);
873  cc = kmr_sort_small(kvs0, kvs1, inspect);
874  assert(cc == MPI_SUCCESS);
875  //kmr_dump_kvs(kvs1, 0);
876  kmr_assert_sorted(kvs1, 0, 0, 0);
877 
878  KMR_KVS *kvs2 = kmr_create_kvs(mr, KMR_KV_INTEGER, KMR_KV_OPAQUE);
879  cc = kmr_sort_large(kvs0, kvs2, inspect);
880  assert(cc == MPI_SUCCESS);
881  //kmr_dump_kvs(kvs2, 0);
882  kmr_assert_sorted(kvs2, 0, 0, 0);
883 
884  KMR_KVS *kvs3 = kmr_create_kvs(mr, KMR_KV_INTEGER, KMR_KV_OPAQUE);
885  cc = kmr_sort_by_one(kvs0, kvs3, inspect);
886  assert(cc == MPI_SUCCESS);
887  kmr_assert_sorted(kvs3, 0, 0, 0);
888 
889  cc = kmr_free_kvs(kvs0);
890  assert(cc == MPI_SUCCESS);
891  }
892 
893  /* (1/2 x N x nprocs x nprocs) entries. */
894 
895  {
896  MPI_Barrier(MPI_COMM_WORLD);
897  usleep(50 * 1000);
898  if (rank == 0) {printf("SORT (int) N x nprocs x nprocs data...\n");}
899  fflush(0);
900  usleep(50 * 1000);
901 
902  KMR_KVS *kvs0 = kmr_create_kvs(mr, KMR_KV_INTEGER, KMR_KV_OPAQUE);
903  int N = (100 * nprocs);
904  for (int i = 0; i < N; i++) {
905  snprintf(v, 80, "value=%d/%d", rank, i);
906  struct kmr_kv_box kv = {
907  .klen = (int)sizeof(long),
908  .vlen = (int)(strlen(v) + 1),
909  .k.i = (100 * i),
910  .v.p = v};
911  cc = kmr_add_kv(kvs0, kv);
912  assert(cc == MPI_SUCCESS);
913  }
914  cc = kmr_add_kv_done(kvs0);
915  assert(cc == MPI_SUCCESS);
916 
917  KMR_KVS *kvs1 = kmr_create_kvs(mr, KMR_KV_INTEGER, KMR_KV_OPAQUE);
918  cc = kmr_sort_small(kvs0, kvs1, inspect);
919  assert(cc == MPI_SUCCESS);
920  //kmr_dump_kvs(kvs1, 0);
921  kmr_assert_sorted(kvs1, 0, 0, 0);
922 
923  KMR_KVS *kvs2 = kmr_create_kvs(mr, KMR_KV_INTEGER, KMR_KV_OPAQUE);
924  cc = kmr_sort_large(kvs0, kvs2, inspect);
925  assert(cc == MPI_SUCCESS);
926  //kmr_dump_kvs(kvs2, 0);
927  kmr_assert_sorted(kvs2, 0, 0, 0);
928 
929  KMR_KVS *kvs3 = kmr_create_kvs(mr, KMR_KV_INTEGER, KMR_KV_OPAQUE);
930  cc = kmr_sort_by_one(kvs0, kvs3, inspect);
931  assert(cc == MPI_SUCCESS);
932  kmr_assert_sorted(kvs3, 0, 0, 0);
933 
934  cc = kmr_free_kvs(kvs0);
935  assert(cc == MPI_SUCCESS);
936  }
937 
938  /* (nprocs x nprocs) entries. */
939 
940  {
941  MPI_Barrier(MPI_COMM_WORLD);
942  usleep(50 * 1000);
943  if (rank == 0) {printf("SORT (byte array) nprocs x nprocs data...\n");}
944  fflush(0);
945  usleep(50 * 1000);
946 
947  KMR_KVS *kvs0 = kmr_create_kvs(mr, KMR_KV_OPAQUE, KMR_KV_OPAQUE);
948  int N = nprocs;
949  for (int i = 0; i < N; i++) {
950  memset(k, 0, sizeof(k));
951  memset(v, 0, sizeof(v));
952  snprintf(k, (KLEN+1), "%05d%05d", rank, (N - 1 - i));
953  snprintf(v, (VLEN+1), "value=%d/%d", rank, i);
954  struct kmr_kv_box kv = {
955  .klen = KLEN, .vlen = VLEN, .k.p = k, .v.p = v};
956  cc = kmr_add_kv(kvs0, kv);
957  assert(cc == MPI_SUCCESS);
958  }
959  cc = kmr_add_kv_done(kvs0);
960  assert(cc == MPI_SUCCESS);
961 
962  KMR_KVS *kvs1 = kmr_create_kvs(mr, KMR_KV_OPAQUE, KMR_KV_OPAQUE);
963  cc = kmr_sort_small(kvs0, kvs1, inspect);
964  assert(cc == MPI_SUCCESS);
965  //kmr_dump_kvs(kvs1, 0);
966  kmr_assert_sorted(kvs1, 0, 0, 0);
967 
968  KMR_KVS *kvs2 = kmr_create_kvs(mr, KMR_KV_OPAQUE, KMR_KV_OPAQUE);
969  cc = kmr_sort_large(kvs0, kvs2, inspect);
970  assert(cc == MPI_SUCCESS);
971  //kmr_dump_kvs(kvs2, 0);
972  kmr_assert_sorted(kvs2, 0, 0, 0);
973 
974  KMR_KVS *kvs3 = kmr_create_kvs(mr, KMR_KV_OPAQUE, KMR_KV_OPAQUE);
975  cc = kmr_sort_by_one(kvs0, kvs3, inspect);
976  assert(cc == MPI_SUCCESS);
977  kmr_assert_sorted(kvs3, 0, 0, 0);
978 
979  cc = kmr_free_kvs(kvs0);
980  assert(cc == MPI_SUCCESS);
981  }
982 
983  cc = kmr_free_context(mr);
984  assert(cc == MPI_SUCCESS);
985 }
986 
987 static int
988 kmr_icmp(const void *a0, const void *a1)
989 {
990  const long *p0 = a0;
991  const long *p1 = a1;
992  long d = (*p0 - *p1);
993  return ((d == 0) ? 0 : ((d < 0) ? -1 : 1));
994 }
995 
996 /* Check bsearch utility. */
997 
998 static void
999 simple5(int nprocs, int rank)
1000 {
1001  MPI_Barrier(MPI_COMM_WORLD);
1002  usleep(50 * 1000);
1003  if (rank == 0) {printf("CHECK BSEARCH UTILITY...\n");}
1004  fflush(0);
1005  usleep(50 * 1000);
1006 
1007  /* Check with length 20-40. */
1008 
1009 #define NN 41
1010 #define A0(I) (10 + 10 * (I))
1011 
1012  long a0[NN];
1013  for (int i = 0; i < NN; i++) {
1014  a0[i] = A0(i);
1015  }
1016 
1017  for (int N = 20; N < NN; N++) {
1018  long *p0;
1019  long k;
1020 
1021  k = A0(-1);
1022  p0 = kmr_bsearch(&k, a0, (size_t)N, sizeof(long), kmr_icmp);
1023  assert(p0 == &a0[0]);
1024 
1025  for (int i = 0; i < N; i++) {
1026  k = A0(i);
1027  p0 = kmr_bsearch(&k, a0, (size_t)N, sizeof(long), kmr_icmp);
1028  assert(p0 == &a0[i]);
1029  }
1030 
1031  k = A0(N);
1032  p0 = kmr_bsearch(&k, a0, (size_t)N, sizeof(long), kmr_icmp);
1033  assert(p0 == &a0[N]);
1034  }
1035 
1036  MPI_Barrier(MPI_COMM_WORLD);
1037  usleep(50 * 1000);
1038 
1039 #undef NN
1040 #undef A0
1041 }
1042 
1043 /* Test kmr_map_for_some(). */
1044 
1045 static int
1046 addkeys6(const struct kmr_kv_box kv0,
1047  const KMR_KVS *kvs0, KMR_KVS *kvo, void *p, const long i_)
1048 {
1049  long *arg = p;
1050  int NN = (int)*arg;
1051  for (int i = 0; i < NN; i++) {
1052  struct kmr_kv_box kv = {
1053  .klen = sizeof(long),
1054  .vlen = sizeof(long),
1055  .k.i = i,
1056  .v.i = i
1057  };
1058  kmr_add_kv(kvo, kv);
1059  }
1060  return MPI_SUCCESS;
1061 }
1062 
1063 static int
1064 addeach6(const struct kmr_kv_box kv,
1065  const KMR_KVS *kvi, KMR_KVS *kvo, void *arg,
1066  const long index)
1067 {
1068  long *counter = arg;
1069  _Pragma("omp critical")
1070  {
1071  (*counter)++;
1072  }
1073  kmr_add_kv(kvo, kv);
1074  return MPI_SUCCESS;
1075 }
1076 
1077 static int
1078 addnone6(const struct kmr_kv_box kv,
1079  const KMR_KVS *kvi, KMR_KVS *kvo, void *arg,
1080  const long index)
1081 {
1082  long *counter = arg;
1083  _Pragma("omp critical")
1084  {
1085  (*counter)++;
1086  }
1087  return MPI_SUCCESS;
1088 }
1089 
1090 static void
1091 simple6(int nprocs, int rank)
1092 {
1093  MPI_Barrier(MPI_COMM_WORLD);
1094  usleep(50 * 1000);
1095  if (rank == 0) {printf("CHECK KMR_MAP_FOR_SOME()...\n");}
1096  fflush(0);
1097  usleep(50 * 1000);
1098 
1099  struct kmr_option inspect = {.inspect = 1};
1100 
1101  /* Check with 10000 keys. */
1102 
1103  const long NN = 10000;
1104 
1105  KMR *mr = kmr_create_context(MPI_COMM_WORLD, MPI_INFO_NULL, 0);
1106 
1107  KMR_KVS *kvs0 = kmr_create_kvs(mr, KMR_KV_INTEGER, KMR_KV_INTEGER);
1108  kmr_map_once(kvs0, (void *)&NN, kmr_noopt, 0, addkeys6);
1109 
1110  KMR_KVS *kvs1 = kmr_create_kvs(mr, KMR_KV_INTEGER, KMR_KV_INTEGER);
1111  kmr_distribute(kvs0, kvs1, 1, kmr_noopt);
1112  assert(kvs1->c.element_count == NN);
1113 
1114  long c1 = 0;
1115  kmr_get_element_count(kvs1, &c1);
1116 
1117  long calls2 = 0;
1118  KMR_KVS *kvs2 = kmr_create_kvs(mr, KMR_KV_INTEGER, KMR_KV_INTEGER);
1119  kmr_map_for_some(kvs1, kvs2, &calls2, inspect, addeach6);
1120  long c2 = 0;
1121  kmr_get_element_count(kvs2, &c2);
1122  kmr_free_kvs(kvs2);
1123 
1124  if (rank == 0) {
1125  printf("ADD ALL count(kvs2)=%ld for %ld (calls=%ld)\n",
1126  c2, c1, calls2);
1127  fflush(0);
1128  }
1129  assert(c2 > 0 && c2 <= c1);
1130 
1131  long calls3 = 0;
1132  long e1 = kvs1->c.element_count;
1133  KMR_KVS *kvs3 = kmr_create_kvs(mr, KMR_KV_INTEGER, KMR_KV_INTEGER);
1134  kmr_map_for_some(kvs1, kvs3, &calls3, inspect, addnone6);
1135  long c3 = 0;
1136  kmr_get_element_count(kvs3, &c3);
1137  kmr_free_kvs(kvs3);
1138 
1139  if (rank == 0) {
1140  printf("ADD NONE count(kvs3)=%ld for %ld (calls=%ld for %ld)\n",
1141  c3, c1, calls3, e1);
1142  fflush(0);
1143  }
1144  assert(c3 == 0 && e1 == calls3);
1145 
1146  kmr_free_kvs(kvs1);
1147 
1148  /* Check using KVS after inspecting by kmr_take_one(). */
1149 
1150  MPI_Barrier(MPI_COMM_WORLD);
1151  usleep(50 * 1000);
1152  if (rank == 0) {printf("CHECK KMR_TAKE_ONE()...\n");}
1153  fflush(0);
1154  usleep(50 * 1000);
1155 
1156  const long ONE = 1;
1157 
1158  KMR_KVS *kvs4 = kmr_create_kvs(mr, KMR_KV_INTEGER, KMR_KV_INTEGER);
1159  kmr_map_once(kvs4, (void *)&ONE, kmr_noopt, 0, addkeys6);
1160 
1161  struct kmr_kv_box kv;
1162  kmr_take_one(kvs4, &kv);
1163  assert(kv.k.i == 0 && kv.v.i == 0);
1164 
1165  long calls4 = 0;
1166  KMR_KVS *kvs5 = kmr_create_kvs(mr, KMR_KV_INTEGER, KMR_KV_INTEGER);
1167  kmr_map(kvs4, kvs5, &calls4, kmr_noopt, addeach6);
1168 
1169  kmr_free_kvs(kvs5);
1170 
1171  kmr_free_context(mr);
1172 }
1173 
1174 extern void kmr_isort(void *a, size_t n, size_t es, int depth);
1175 
1176 /* Check isort() utility. */
1177 
1178 static void
1179 simple7(int nprocs, int rank)
1180 {
1181  MPI_Barrier(MPI_COMM_WORLD);
1182  usleep(50 * 1000);
1183  if (rank == 0) {printf("CHECK ISORT UTILITY (local sort)...\n");}
1184  fflush(0);
1185  usleep(50 * 1000);
1186 
1187  int setntheads = 1;
1188 
1189 #ifdef __K
1190  char *fastomp = getenv("FLIB_FASTOMP");
1191  if (!(fastomp != 0 && strcmp(fastomp, "FALSE") == 0)) {
1192  if (rank == 0) {printf("Set environment variable FLIB_FASTOMP=FALSE"
1193  " and run again. Otherwise,"
1194  " omp_set_num_threads() is ignored.\n");}
1195  fflush(0);
1196  setntheads = 0;
1197  }
1198 #endif
1199 
1200  /* Check with length 20. */
1201 
1202  if (1) {
1203  long a0[20] = {19, 18, 17, 16, 15, 14, 13, 12, 11, 10,
1204  9, 8, 7, 6, 5, 4, 3, 2, 1, 0};
1205  size_t n0 = (sizeof(a0) / sizeof(long));
1206  for (int i = 0; i < (int)n0; i++) {
1207  a0[i] += 1000000000000;
1208  }
1209 
1210  if (rank == 0) {printf("Sorting data N=%zd\n", n0);}
1211  fflush(0);
1212 
1213  kmr_isort(a0, n0, sizeof(long), 0);
1214 
1215  for (int i = 0; i < (int)n0; i++) {
1216  assert(a0[i] == (i + 1000000000000));
1217  }
1218 
1219  //if (rank == 0) {printf("OK\n");}
1220  //fflush(0);
1221  }
1222 
1223  /* Check with length 3M. */
1224 
1225  if (1) {
1226  //long n1 = 100000000; /*100M*/
1227  long n1 = 3000000; /*3M*/
1228  long *a1 = malloc(sizeof(long) * (size_t)n1);
1229  if (a1 == 0) {
1230  perror("malloc");
1231  MPI_Abort(MPI_COMM_WORLD, 1);
1232  }
1233 
1234  if (rank == 0) {printf("Sorting data N=%zd\n", n1);}
1235  fflush(0);
1236 
1237  if (rank == 0) {printf("Generating random numbers...\n");}
1238  fflush(0);
1239 
1240  for (long i = 0; i < n1; i++) {
1241  a1[i] = ((((long)rand()) << 31) ^ ((long)rand()));
1242  }
1243 
1244  if (0) {
1245  printf("Problem...\n");
1246  for (long i = 0; i < 10; i++) {
1247  printf("%ld\n", a1[i]);
1248  }
1249  printf("\n");
1250  }
1251 
1252  if (rank == 0) {printf("Sorting (threads=1)...\n");}
1253  fflush(0);
1254 
1255  double t0 = wtime();
1256  kmr_isort(a1, (size_t)n1, sizeof(long), 0);
1257  double t1 = wtime();
1258  if (rank == 0) {printf("time=%f\n", (t1 - t0));}
1259  fflush(0);
1260 
1261  if (0) {
1262  printf("Result...\n");
1263  for (long i = 0; i < 10; i++) {
1264  printf("%ld\n", a1[i]);
1265  }
1266  printf("\n");
1267  }
1268 
1269  if (rank == 0) {printf("Checking...\n");}
1270  long lb = LONG_MIN;
1271  for (long i = 0; i < n1; i++) {
1272  assert(a1[i] >= lb);
1273  if (a1[i] > lb) {
1274  lb = a1[i];
1275  }
1276  }
1277 
1278  //if (rank == 0) {printf("OK\n");}
1279  //fflush(0);
1280  }
1281 
1282  if (1) {
1283 #ifdef _OPENMP
1284  //long n5 = 100000000; /*100M*/
1285  long n5 = 3000000; /*3M*/
1286  long *a5 = malloc(sizeof(long) * (size_t)n5);
1287  if (a5 == 0) {
1288  perror("malloc");
1289  MPI_Abort(MPI_COMM_WORLD, 1);
1290  }
1291 
1292  if (rank == 0) {printf("Sorting data N=%zd\n", n5);}
1293 
1294  for (int threads = 1; threads <= 8; threads *= 2) {
1295  if (setntheads) {
1296  omp_set_num_threads(threads);
1297  }
1298 
1299  int threadsused = 0;
1300 #pragma omp parallel
1301  {
1302  threadsused = omp_get_num_threads();
1303  }
1304 
1305  if (rank == 0) {printf("Generating random numbers...\n");}
1306  fflush(0);
1307 
1308  srand(20140204);
1309  for (long i = 0; i < n5; i++) {
1310  a5[i] = ((((long)rand()) << 31) ^ ((long)rand()));
1311  }
1312 
1313  if (rank == 0) {printf("Sorting (threads=%d)...\n", threadsused);}
1314  fflush(0);
1315 
1316  double t0 = wtime();
1317  kmr_isort(a5, (size_t)n5, sizeof(long), 5);
1318  double t1 = wtime();
1319  if (rank == 0) {printf("time=%f\n", (t1 - t0));}
1320  fflush(0);
1321 
1322  if (rank == 0) {printf("Checking...\n");}
1323  fflush(0);
1324 
1325  long lb5 = LONG_MIN;
1326  for (long i = 0; i < n5; i++) {
1327  assert(a5[i] >= lb5);
1328  if (a5[i] > lb5) {
1329  lb5 = a5[i];
1330  }
1331  }
1332 
1333  //if (rank == 0) {printf("OK\n");}
1334  //fflush(0);
1335  }
1336 #else
1337  printf("NOT OMP\n");
1338  fflush(0);
1339 #endif
1340  }
1341 }
1342 
1343 /* Tests kmr_shuffle_leveling_pair_count(). makemanyintegerkeys8()
1344  generate random pairs. copynegate8() makes a copy negating the
1345  value in the KVS for later checking. */
1346 
1347 static int
1348 makemanyintegerkeys8(const struct kmr_kv_box kv0,
1349  const KMR_KVS *kvs0, KMR_KVS *kvo, void *p, const long i_)
1350 {
1351  /* (N: Same keys are generated upto N times). */
1352  int N = 4;
1353  long MM = *(long *)p;
1354  KMR *mr = kvo->c.mr;
1355  int rank = mr->rank;
1356  int nprocs = mr->nprocs;
1357  long cnt = (MM * (nprocs - rank) / nprocs);
1358  for (long i = 0; i < cnt; i++) {
1359  long j = (i / N);
1360  struct kmr_kv_box kv = {
1361  .klen = sizeof(long),
1362  .vlen = sizeof(long),
1363  .k.i = (rank + (j * nprocs)),
1364  .v.i = (i + 1)
1365  };
1366  kmr_add_kv(kvo, kv);
1367  }
1368  return MPI_SUCCESS;
1369 }
1370 
1371 static int
1372 makemanystringkeys8(const struct kmr_kv_box kv0,
1373  const KMR_KVS *kvs0, KMR_KVS *kvo, void *p, const long i_)
1374 {
1375  /* (N: Same keys are generated upto N times). */
1376  int N = 4;
1377  long MM = *(long *)p;
1378  KMR *mr = kvo->c.mr;
1379  int rank = mr->rank;
1380  int nprocs = mr->nprocs;
1381  long cnt = (MM * (nprocs - rank) / nprocs);
1382  for (long i = 0; i < cnt; i++) {
1383  long j = (i / N);
1384  char k[80];
1385  snprintf(k, 80, "key%ld", (rank + (j * nprocs)));
1386  struct kmr_kv_box kv = {
1387  .klen = (int)(strlen(k) + 1),
1388  .vlen = sizeof(long),
1389  .k.p = k,
1390  .v.i = (i + 1)
1391  };
1392  kmr_add_kv(kvo, kv);
1393  }
1394  return MPI_SUCCESS;
1395 }
1396 
1397 static int
1398 copynegate8(const struct kmr_kv_box kv0,
1399  const KMR_KVS *kvs0, KMR_KVS *kvo, void *p, const long i_)
1400 {
1401  assert(kv0.v.i > 0);
1402  struct kmr_kv_box kv = {
1403  .klen = kv0.klen,
1404  .vlen = kv0.vlen,
1405  .k = kv0.k,
1406  .v.i = (- kv0.v.i),
1407  };
1408  kmr_add_kv(kvo, kv);
1409  return MPI_SUCCESS;
1410 }
1411 
1412 static int
1413 comparebycanceling8(const struct kmr_kv_box kv[], const long n,
1414  const KMR_KVS *kvs, KMR_KVS *kvo, void *p)
1415 {
1416  long values[n];
1417  for (long i = 0; i < n; i++) {
1418  assert(kv[i].v.i != 0);
1419  values[i] = kv[i].v.i;
1420  }
1421  for (long i = 0; i < n; i++) {
1422  if (values[i] == 0) {
1423  continue;
1424  } else {
1425  assert(i < (n - 1));
1426  for (long j = (i + 1); j < n; j++) {
1427  if (values[i] == (- values[j])) {
1428  values[i] = 0;
1429  values[j] = 0;
1430  break;
1431  }
1432  assert(j != (n - 1));
1433  }
1434  }
1435  }
1436  for (long i = 0; i < n; i++) {
1437  assert(values[i] == 0);
1438  }
1439  return MPI_SUCCESS;
1440 }
1441 
1442 static void
1443 simple8(int nprocs, int rank)
1444 {
1445  MPI_Barrier(MPI_COMM_WORLD);
1446  if (rank == 0) {printf("PSEUDO-SCAN\n");}
1447  fflush(0);
1448  usleep(50 * 1000);
1449 
1450  /* Check with (MM * (nprocs - rank) / nprocs) keys on ranks. */
1451 
1452  const long MM = 100;
1453 
1454  KMR *mr = kmr_create_context(MPI_COMM_WORLD, MPI_INFO_NULL, 0);
1455 
1456  /* DO ONCE ON INTEGER KEYS AND ONCE ON STRING KEYS. */
1457 
1458  for (int i = 0; i < 2; i++) {
1459  assert(i == 0 || i == 1);
1460  kmr_mapfn_t makedatafn = ((i == 0)
1461  ? makemanyintegerkeys8
1462  : makemanystringkeys8);
1463  enum kmr_kv_field keyf = ((i == 0)
1464  ? KMR_KV_INTEGER
1465  : KMR_KV_OPAQUE);
1466 
1467  KMR_KVS *kvs0 = kmr_create_kvs(mr, keyf, KMR_KV_INTEGER);
1468  kmr_map_once(kvs0, (void *)&MM, kmr_noopt, 0, makedatafn);
1469 
1470  KMR_KVS *kvs1 = kmr_create_kvs(mr, keyf, KMR_KV_INTEGER);
1471  struct kmr_option inspect = {.inspect = 1};
1472  kmr_map(kvs0, kvs1, 0, inspect, copynegate8);
1473 
1474  KMR_KVS *kvs2 = kmr_create_kvs(mr, keyf, KMR_KV_INTEGER);
1475  kmr_shuffle_leveling_pair_count(kvs0, kvs2);
1476 
1477  long counts[nprocs];
1478  double stat[4];
1479  kmr_histogram_count_by_ranks(kvs2, counts, stat, 1);
1480 
1481  if (rank == 0) {
1482  printf("number of pairs on ranks:\n");
1483  for (int r = 0; r < nprocs; r++) {
1484  printf("%ld", counts[r]);
1485  if (r == (nprocs - 1)) {
1486  printf("\n");
1487  } else if (r == 10) {
1488  printf("\n");
1489  } else {
1490  printf(",");
1491  }
1492  }
1493  fflush(0);
1494  }
1495  //kmr_dump_kvs(kvs2, 0);
1496 
1497  /* Check the shuffled KVS is a rearrangement of the original KVS. */
1498 
1499  KMR_KVS *kvs3 = kmr_create_kvs(mr, keyf, KMR_KV_INTEGER);
1500  KMR_KVS *kvsvec[2] = {kvs1, kvs2};
1501  kmr_concatenate_kvs(kvsvec, 2, kvs3, kmr_noopt);
1502 
1503  KMR_KVS *kvs4 = kmr_create_kvs(mr, keyf, KMR_KV_INTEGER);
1504  kmr_shuffle(kvs3, kvs4, kmr_noopt);
1505 
1506  kmr_reduce(kvs4, 0, 0, kmr_noopt, comparebycanceling8);
1507  }
1508 
1509  kmr_free_context(mr);
1510 }
1511 
1512 int
1513 main(int argc, char *argv[])
1514 {
1515  int nprocs, rank, thlv;
1516  /*MPI_Init(&argc, &argv);*/
1517  MPI_Init_thread(&argc, &argv, MPI_THREAD_SERIALIZED, &thlv);
1518  MPI_Comm_size(MPI_COMM_WORLD, &nprocs);
1519  MPI_Comm_rank(MPI_COMM_WORLD, &rank);
1520 
1521  kmr_init();
1522 
1523  if (rank == 0) {
1524 #ifdef NDEBUG
1525  _Bool assertion = 0;
1526 #else
1527  _Bool assertion = 1;
1528 #endif
1529  if (!assertion) {
1530  kmr_error(0, "Assertion disabled; recompile this test");
1531  return 1;
1532  }
1533  }
1534 
1535  simple0(nprocs, rank, 0);
1536  simple0(nprocs, rank, 1);
1537  simple1(nprocs, rank);
1538  simple2(nprocs, rank);
1539  simple3(nprocs, rank);
1540  simple4(nprocs, rank);
1541  simple5(nprocs, rank);
1542  simple6(nprocs, rank);
1543  simple7(nprocs, rank);
1544  simple8(nprocs, rank);
1545 
1546  MPI_Barrier(MPI_COMM_WORLD);
1547  usleep(50 * 1000);
1548  if (rank == 0) {printf("OK\n");}
1549  fflush(0);
1550 
1551  kmr_fin();
1552 
1553  MPI_Finalize();
1554 
1555  return 0;
1556 }
Key-Value Stream (abstract).
Definition: kmr.h:587
Utilities Private Part (do not include from applications).
int kmr_map_for_some(KMR_KVS *kvi, KMR_KVS *kvo, void *arg, struct kmr_option opt, kmr_mapfn_t m)
Maps until some key-value are added.
Definition: kmrmoreops.c:1170
#define kmr_reduce(KVI, KVO, ARG, OPT, R)
Reduces key-value pairs.
Definition: kmr.h:88
Options to Mapping, Shuffling, and Reduction.
Definition: kmr.h:613
int kmr_add_kv(KMR_KVS *kvs, const struct kmr_kv_box kv)
Adds a key-value pair.
Definition: kmrbase.c:751
KMR_KVS * kmr_create_pushoff_kvs(KMR *mr, enum kmr_kv_field kf, enum kmr_kv_field vf, struct kmr_option opt, const char *, const int, const char *)
Makes a new key-value stream with the specified field data-types.
Definition: kmraltkvs.c:85
static int kmr_icmp(const void *a0, const void *a1)
Compares the key field of keyed-records for qsort/bsearch.
Definition: kmrbase.c:1815
int kmr_map_once(KMR_KVS *kvo, void *arg, struct kmr_option opt, _Bool rank_zero_only, kmr_mapfn_t m)
Maps once.
Definition: kmrbase.c:1402
#define kmr_create_kvs(MR, KF, VF)
Makes a new key-value stream (of type KMR_KVS) with the specified field datatypes.
Definition: kmr.h:71
int kmr_save_kvs(KMR_KVS *kvi, void **dataq, size_t *szq, struct kmr_option opt)
Packs locally the contents of a key-value stream to a byte array.
Definition: kmrbase.c:968
int kmr_shuffle(KMR_KVS *kvi, KMR_KVS *kvo, struct kmr_option opt)
Shuffles key-value pairs to the appropriate destination ranks.
Definition: kmrbase.c:2036
int kmr_distribute(KMR_KVS *kvi, KMR_KVS *kvo, _Bool cyclic, struct kmr_option opt)
Distributes key-values so that each rank has approximately the same number of pairs.
Definition: kmrmoreops.c:835
int kmr_add_kv_done(KMR_KVS *kvs)
Marks finished adding key-value pairs.
Definition: kmrbase.c:881
int kmr_sort_by_one(KMR_KVS *kvi, KMR_KVS *kvo, struct kmr_option opt)
Sort by rank0, a degenerated case for small number of keys.
Definition: kmrmoreops.c:544
KMR Context.
Definition: kmr.h:222
int kmr_concatenate_kvs(KMR_KVS *kvs[], int nkvs, KMR_KVS *kvo, struct kmr_option opt)
Concatenates a number of KVSes to one.
Definition: kmrbase.c:2696
int kmr_free_kvs(KMR_KVS *kvs)
Releases a key-value stream (type KMR_KVS).
Definition: kmrbase.c:621
kmr_kv_field
Datatypes of Keys or Values.
Definition: kmr.h:325
void kmr_isort(void *a, size_t n, size_t es, int depth)
Sorts by comparator on long integers.
Definition: kmrisort.c:292
int kmr_take_one(KMR_KVS *kvi, struct kmr_kv_box *kv)
Extracts a single key-value pair locally in the key-value stream KVI.
Definition: kmrbase.c:1369
#define kmr_map(KVI, KVO, ARG, OPT, M)
Maps simply.
Definition: kmr.h:82
Handy Copy of a Key-Value Field.
Definition: kmr.h:358
int kmr_sort_large(KMR_KVS *kvi, KMR_KVS *kvo, struct kmr_option opt)
Sorts a key-value stream by the regular or the random sampling-sort.
Definition: kmrmoreops.c:469
int kmr_fin(void)
Clears the environment.
Definition: kmrbase.c:124
int kmr_get_element_count(KMR_KVS *kvs, long *v)
Gets the total number of key-value pairs.
Definition: kmrmoreops.c:114
#define kmr_init()
Sets up the environment.
Definition: kmr.h:747
void * kmr_bsearch(const void *key, const void *base, size_t nel, size_t size, int(*compar)(const void *, const void *))
Searches a key entry like bsearch(3C), but returns a next greater entry instead of null on no match...
Definition: kmrutil.c:565
int kmr_sort(KMR_KVS *kvi, KMR_KVS *kvo, struct kmr_option opt)
Sorts a key-value stream globally.
Definition: kmrmoreops.c:575
int kmr_assert_sorted(KMR_KVS *kvi, _Bool locally, _Bool shuffling, _Bool ranking)
Checks a key-value stream is sorted.
Definition: kmrutil.c:702
int kmr_shuffle_leveling_pair_count(KMR_KVS *kvi, KMR_KVS *kvo)
Shuffles key-values so that each rank has approximately the same number of pairs. ...
Definition: kmrmoreops.c:1074
int kmr_histogram_count_by_ranks(KMR_KVS *kvs, long *frq, double *var, _Bool rankzeroonly)
Fills an integer array FRQ[i] with the count of the elements of each rank.
Definition: kmrmoreops.c:1569
int kmr_restore_kvs(KMR_KVS *kvo, void *data, size_t sz, struct kmr_option opt)
Unpacks locally the contents of a key-value stream from a byte array.
Definition: kmrbase.c:1034
int kmr_free_context(KMR *mr)
Releases a context created with kmr_create_context().
Definition: kmrbase.c:326
KMR Interface.
int kmr_load_properties(MPI_Info conf, char *filename)
Loads properties into MPI_Info (in Latin1 characters).
Definition: kmrutil.c:1851
int kmr_replicate(KMR_KVS *kvi, KMR_KVS *kvo, struct kmr_option opt)
Replicates key-value pairs to be visible on all ranks, that is, it has the effect of bcast or all-gat...
Definition: kmrbase.c:2182
int kmr_sort_small(KMR_KVS *kvi, KMR_KVS *kvo, struct kmr_option opt)
Sorts a key-value stream, by partitioning to equal ranges.
Definition: kmrmoreops.c:388
int kmr_sort_locally(KMR_KVS *kvi, KMR_KVS *kvo, _Bool shuffling, struct kmr_option opt)
Reorders key-value pairs in a single rank.
Definition: kmrbase.c:1993
int(* kmr_mapfn_t)(const struct kmr_kv_box kv, const KMR_KVS *kvi, KMR_KVS *kvo, void *arg, const long index)
Map-function Type.
Definition: kmr.h:689
int kmr_map_on_rank_zero(KMR_KVS *kvo, void *arg, struct kmr_option opt, kmr_mapfn_t m)
Maps on rank0 only.
Definition: kmrbase.c:1456
KMR * kmr_create_context(const MPI_Comm comm, const MPI_Info conf, const char *name)
Makes a new KMR context (a context has type KMR).
Definition: kmrbase.c:147