KMR
test3.c
1 /* test3.c (2014-02-04) */
2 
3 /* Check KMR file operations. */
4 
5 /* _GNU_SOURCE for getline(). */
6 
7 #define _GNU_SOURCE
8 
9 #include <mpi.h>
10 #include <stdio.h>
11 #include <unistd.h>
12 #include <limits.h>
13 #include "kmr.h"
14 #include "kmrimpl.h"
15 
16 const _Bool runall = 1;
17 
18 static int
19 FILVAL(int i)
20 {
21  static int filval[] = {907, 911, 919, 929, 937, 941, 947, 953,
22  967, 971, 977, 983, 991, 997};
23  int N = sizeof(filval) / sizeof(int);
24  return (filval[(i % N)] + (503 + 1) * (i / N));
25 }
26 
27 /* Reads unaligned integer (int) value as v[index]. */
28 
29 static int
30 INTAT(void *v, int index)
31 {
32  unsigned char *p = v;
33  union {int i; unsigned char c[sizeof(int)];} u;
34  for (int j = 0; j < (int)sizeof(int); j++) {
35  u.c[j] = p[((int)sizeof(int) * index) + j];
36  }
37  return u.i;
38 }
39 
40 /* Tests kmr_read_files_reassemble(). */
41 
42 static void
43 simple0(KMR *mr, size_t SEGSZ, int step)
44 {
45  int nprocs = mr->nprocs;
46  int rank = mr->rank;
47 
48  MPI_Barrier(MPI_COMM_WORLD);
49  usleep(50 * 1000);
50  if (rank == 0) {printf("CHECK READ BY REASSEMBLE...\n");}
51  fflush(0);
52  usleep(50 * 1000);
53  MPI_Barrier(MPI_COMM_WORLD);
54 
55  int cc;
56  char file[80];
57 
58  /* (Use different file names for each step, to avoid long NFS
59  directory attribute caching (30 seconds)). */
60 
61  snprintf(file, sizeof(file), "dat.%d.%d", step, rank);
62 
63  /* Make test-data files of SEGSZ on each rank. */
64 
65  {
66  char *b = malloc(SEGSZ);
67  if (b == 0) {
68  perror("malloc");
69  MPI_Abort(MPI_COMM_WORLD, 1);
70  }
71  int *bx = (int *)b;
72  int x = FILVAL(rank);
73  for (int i = 0; i < (int)(SEGSZ / sizeof(int)); i++) {
74  bx[i] = x;
75  x *= 13;
76  }
77  FILE *f = fopen(file, "w");
78  assert(f != 0);
79  size_t cx = fwrite(&b[0], SEGSZ, 1, f);
80  assert(cx == 1);
81  cc = fclose(f);
82  assert(cc == 0);
83  free(b);
84 
85  usleep(50 * 1000);
86  MPI_Barrier(MPI_COMM_WORLD);
87  }
88 
89  /* Read by all ranks. */
90 
91  if (runall) {
92  if (rank == 0) {printf("Read+take by all (nprocs=%d)\n", nprocs);}
93  fflush(0);
94  usleep(50 * 1000);
95 
96  int color = 0;
97  void *buffer;
98  off_t size;
99  buffer = 0;
100  size = -1;
101  cc = kmr_read_files_reassemble(mr, file, color, 0, -1,
102  &buffer, &size);
103  assert(cc == MPI_SUCCESS);
104  assert((size_t)size == (SEGSZ * (size_t)nprocs));
105  printf("[%05d] read size=%ld (segment-size=%ld); checking...\n",
106  rank, size, SEGSZ);
107  fflush(0);
108  char *cbuffer = buffer;
109  for (int r = 0; r < nprocs; r++) {
110  int *v = (int *)&cbuffer[SEGSZ * (size_t)r];
111  int x = FILVAL(r);
112  for (int i = 0; i < (int)(SEGSZ / sizeof(int)); i++) {
113  assert(INTAT(v, i) == x);
114  x *= 13;
115  }
116  }
117  free(buffer);
118 
119  usleep(50 * 1000);
120  MPI_Barrier(MPI_COMM_WORLD);
121  }
122 
123  /* Read by one-by-another ranks. */
124 
125  if (runall) {
126  if (rank == 0) {printf("Read+take by all with 2 colors"
127  " (color by even/odd ranks)\n");}
128  fflush(0);
129  usleep(50 * 1000);
130 
131  int color = (rank % 2);
132  int colorset = ((rank % 2) == 0
133  ? ((nprocs + 1)/ 2)
134  : (nprocs / 2));
135  void *buffer;
136  off_t size;
137  buffer = 0;
138  size = -1;
139  cc = kmr_read_files_reassemble(mr, file, color, 0, -1,
140  &buffer, &size);
141  assert(cc == MPI_SUCCESS);
142  assert((size_t)size == (SEGSZ * (size_t)colorset));
143  printf("[%05d] read size=%ld (color=%d/%d segment-size=%ld);"
144  " checking...\n",
145  rank, size, color, colorset, SEGSZ);
146  fflush(0);
147  char *cbuffer = buffer;
148  int index = 0;
149  for (int r = 0; r < nprocs; r++) {
150  if ((r % 2) == color) {
151  int *v = (int *)&cbuffer[SEGSZ * (size_t)index];
152  int x = FILVAL(r);
153  for (int i = 0; i < (int)(SEGSZ / sizeof(int)); i++) {
154  assert(INTAT(v, i) == x);
155  x *= 13;
156  }
157  index++;
158  }
159  }
160  free(buffer);
161 
162  usleep(50 * 1000);
163  MPI_Barrier(MPI_COMM_WORLD);
164  }
165 
166  /* Read by one-by-another ranks, limitting read size. */
167 
168  if (runall) {
169  if (rank == 0) {printf("Read+take by all 2 colors"
170  " with offset/size (64K/16K)\n");}
171  fflush(0);
172  usleep(50 * 1000);
173 
174  off_t off2 = (64 * 1024);
175  off_t sz2 = (16 * 1024);
176  assert((size_t)(off2 + sz2) < SEGSZ);
177  int color = (rank % 2);
178  int colorset = ((rank % 2) == 0
179  ? ((nprocs + 1)/ 2)
180  : (nprocs / 2));
181  void *buffer;
182  off_t size;
183  buffer = 0;
184  size = -1;
185  cc = kmr_read_files_reassemble(mr, file, color, off2, sz2,
186  &buffer, &size);
187  assert(cc == MPI_SUCCESS);
188  assert(size == (sz2 * colorset));
189  printf("[%05d] read size=%ld (color=%d/%d off=%ld sz=%ld);"
190  " checking...\n",
191  rank, size, color, colorset, off2, sz2);
192  fflush(0);
193  char *cbuffer = buffer;
194  int index = 0;
195  for (int r = 0; r < nprocs; r++) {
196  if ((r % 2) == color) {
197  int *v = (int *)&cbuffer[sz2 * index];
198  int x = FILVAL(r);
199  for (int i = 0; i < (int)((size_t)off2 / sizeof(int)); i++) {
200  x *= 13;
201  }
202  for (int i = 0; i < (int)((size_t)sz2 / sizeof(int)); i++) {
203  assert(INTAT(v, i) == x);
204  x *= 13;
205  }
206  index++;
207  }
208  }
209  free(buffer);
210 
211  usleep(50 * 1000);
212  MPI_Barrier(MPI_COMM_WORLD);
213  }
214 
215  /* Read by all ranks but rank=2. */
216 
217  if (runall) {
218  if (rank == 0) {printf("Read by all but rank=2, take by all\n");}
219  fflush(0);
220  usleep(50 * 1000);
221 
222  int SKIPRANK = 2;
223  int nreaders = ((SKIPRANK < nprocs) ? (nprocs - 1) : nprocs);
224  char *filename = ((SKIPRANK == rank) ? 0 : file);
225  int color = 0;
226  void *buffer;
227  off_t size;
228  buffer = 0;
229  size = -1;
230  cc = kmr_read_files_reassemble(mr, filename, color, 0, -1,
231  &buffer, &size);
232  assert(cc == MPI_SUCCESS);
233  assert(buffer != 0);
234  assert((size_t)size == (SEGSZ * (size_t)nreaders));
235  printf("[%05d] read size=%ld (segment-size=%ld); checking...\n",
236  rank, size, SEGSZ);
237  fflush(0);
238  char *cbuffer = buffer;
239  {
240  int index = 0;
241  for (int r = 0; r < nprocs; r++) {
242  if (r != SKIPRANK) {
243  int *v = (int *)&cbuffer[SEGSZ * (size_t)index];
244  int x = FILVAL(r);
245  for (int i = 0; i < (int)(SEGSZ / sizeof(int)); i++) {
246  assert(INTAT(v, i) == x);
247  x *= 13;
248  }
249  index++;
250  }
251  }
252  }
253  free(buffer);
254 
255  usleep(50 * 1000);
256  MPI_Barrier(MPI_COMM_WORLD);
257  }
258 
259  /* Read by all ranks but rank=2, and rank=1 ignore data. */
260 
261  if (runall) {
262  if (rank == 0) {
263  printf("Read by all but rank=2, take by all but rank=1\n");
264  }
265  fflush(0);
266  usleep(50 * 1000);
267 
268  int skiprank = 2;
269  int notakerank = 1;
270  int nreaders = ((skiprank < nprocs) ? (nprocs - 1) : nprocs);
271  char *filename = ((skiprank == rank) ? 0 : file);
272  int color = 0;
273  void *buffer;
274  off_t size;
275  buffer = 0;
276  size = -1;
277  void **buf = ((notakerank == rank) ? 0 : &buffer);
278  cc = kmr_read_files_reassemble(mr, filename, color, 0, -1,
279  buf, &size);
280  assert(cc == MPI_SUCCESS);
281  assert((notakerank == rank)
282  || (size_t)size == (SEGSZ * (size_t)nreaders));
283  printf("[%05d] read size=%ld (segment-size=%ld); checking...\n",
284  rank, size, SEGSZ);
285  fflush(0);
286  char *cbuffer = buffer;
287  if (notakerank != rank) {
288  int index = 0;
289  for (int r = 0; r < nprocs; r++) {
290  if (r != skiprank) {
291  int *v = (int *)&cbuffer[SEGSZ * (size_t)index];
292  int x = FILVAL(r);
293  for (int i = 0; i < (int)(SEGSZ / sizeof(int)); i++) {
294  assert(INTAT(v, i) == x);
295  x *= 13;
296  }
297  index++;
298  }
299  }
300  }
301  if (notakerank != rank) {
302  free(buffer);
303  }
304 
305  usleep(50 * 1000);
306  MPI_Barrier(MPI_COMM_WORLD);
307  }
308 
309  /* Clean up. */
310 
311  {
312  cc = unlink(file);
313  assert(cc == 0);
314  }
315 }
316 
317 /* Tests kmr_read_file_by_segments(). */
318 
319 static void
320 simple1(KMR *mr, size_t SEGSZ, size_t EXTRA, int step)
321 {
322  char file0[80];
323  char file1[80];
324 
325  int nprocs = mr->nprocs;
326  int rank = mr->rank;
327 
328  MPI_Barrier(MPI_COMM_WORLD);
329  usleep(50 * 1000);
330  if (rank == 0) {printf("CHECK READ BY SEGMENTS...\n");}
331  fflush(0);
332  usleep(50 * 1000);
333  MPI_Barrier(MPI_COMM_WORLD);
334 
335  int cc;
336 
337  /* (Use different file names for each step, to avoid long NFS
338  directory attribute caching (30 seconds)). */
339 
340  cc = snprintf(file0, sizeof(file0), "dat.%d.0", step);
341  assert((size_t)cc < sizeof(file0));
342  cc = snprintf(file1, sizeof(file1), "dat.%d.1", step);
343  assert((size_t)cc < sizeof(file1));
344  /*size_t SEGSZ = (1024 * 1024);*/
345  size_t totsz = ((5 * SEGSZ * (size_t)nprocs) / 2 + EXTRA);
346 
347  /* Make two files of ((5/2 * SEGSZ * nprocs) + EXTRA). */
348 
349  {
350  if (rank == 0) {
351  char *b = malloc(totsz);
352  if (b == 0) {
353  perror("malloc");
354  MPI_Abort(MPI_COMM_WORLD, 1);
355  }
356  int nn = (int)(totsz / sizeof(int));
357  int *bx = (int *)b;
358  {
359  int x = FILVAL(0);
360  for (int i = 0; i < nn; i++) {
361  bx[i] = x;
362  x *= 13;
363  }
364  FILE *f = fopen(file0, "w");
365  assert(f != 0);
366  size_t cx = fwrite(&b[0], totsz, 1, f);
367  assert(cx == 1);
368  cc = fclose(f);
369  assert(cc == 0);
370  }
371  {
372  int x = FILVAL(1);
373  for (int i = 0; i < nn; i++) {
374  bx[i] = x;
375  x *= 13;
376  }
377  FILE *f = fopen(file1, "w");
378  assert(f != 0);
379  size_t cx = fwrite(&b[0], totsz, 1, f);
380  assert(cx == 1);
381  cc = fclose(f);
382  assert(cc == 0);
383  }
384  free(b);
385  }
386  usleep(50 * 1000);
387  MPI_Barrier(MPI_COMM_WORLD);
388  }
389 
390  /* Read by all ranks. */
391 
392  if (runall) {
393  MPI_Barrier(MPI_COMM_WORLD);
394  usleep(50 * 1000);
395  if (rank == 0) {printf("Read+take by all (nprocs=%d)\n", nprocs);}
396  fflush(0);
397  usleep(50 * 1000);
398  MPI_Barrier(MPI_COMM_WORLD);
399 
400  int color = 0;
401  void *buffer;
402  off_t size;
403  buffer = 0;
404  size = -1;
405  cc = kmr_read_file_by_segments(mr, file0, color,
406  &buffer, &size);
407  assert(cc == MPI_SUCCESS);
408  printf("[%05d] read size=%ld (segsize=%ld);"
409  " checking...\n",
410  rank, size, SEGSZ);
411  fflush(0);
412 
413  assert((size_t)size == totsz);
414  int nn = (int)(totsz / sizeof(int));
415  int *bx = (int *)buffer;
416  int x = FILVAL(0);
417  for (int i = 0; i < nn; i++) {
418  assert(bx[i] == x);
419  x *= 13;
420  }
421  free(buffer);
422 
423  usleep(50 * 1000);
424  MPI_Barrier(MPI_COMM_WORLD);
425  }
426 
427  /* Read by one-by-another ranks. */
428 
429  if (runall) {
430  MPI_Barrier(MPI_COMM_WORLD);
431  usleep(50 * 1000);
432  if (rank == 0) {printf("Read+take by all for 2 files"
433  " (color by even/odd ranks)\n");}
434  fflush(0);
435  usleep(50 * 1000);
436  MPI_Barrier(MPI_COMM_WORLD);
437 
438  char *file = (((rank % 2) == 0) ? file0 : file1);
439  int color = ((rank % 2) == 0);
440  void *buffer;
441  off_t size;
442  buffer = 0;
443  size = -1;
444  cc = kmr_read_file_by_segments(mr, file, color,
445  &buffer, &size);
446  assert(cc == MPI_SUCCESS);
447  printf("[%05d] read size=%ld (segsize=%ld);"
448  " checking...\n",
449  rank, size, SEGSZ);
450  fflush(0);
451 
452  assert((size_t)size == totsz);
453  int nn = (int)(totsz / sizeof(int));
454  int *bx = (int *)buffer;
455  int x = (((rank % 2) == 0) ? FILVAL(0) : FILVAL(1));
456  for (int i = 0; i < nn; i++) {
457  assert(bx[i] == x);
458  x *= 13;
459  }
460  free(buffer);
461 
462  usleep(50 * 1000);
463  MPI_Barrier(MPI_COMM_WORLD);
464  }
465 
466  /* Read by all ranks but rank=2. */
467 
468  if (runall) {
469  MPI_Barrier(MPI_COMM_WORLD);
470  usleep(50 * 1000);
471  if (rank == 0) {printf("Read+take by all but rank=2\n");}
472  fflush(0);
473  usleep(50 * 1000);
474  MPI_Barrier(MPI_COMM_WORLD);
475 
476  int skiprank = 2;
477  char *file = ((skiprank == rank) ? 0 : file0);
478  int color = ((skiprank == rank) ? -1 : 0);
479  void *buffer;
480  off_t size;
481  buffer = 0;
482  size = -1;
483  void **buf = ((skiprank == rank) ? 0 : &buffer);
484  cc = kmr_read_file_by_segments(mr, file, color,
485  buf, &size);
486  assert(cc == MPI_SUCCESS);
487  printf("[%05d] read size=%ld (segsize=%ld);"
488  " checking...\n",
489  rank, size, SEGSZ);
490  fflush(0);
491 
492  if (skiprank != rank) {
493  assert((size_t)size == totsz);
494  int nn = (int)(totsz / sizeof(int));
495  int *bx = (int *)buffer;
496  int x = FILVAL(0);
497  for (int i = 0; i < nn; i++) {
498  assert(bx[i] == x);
499  x *= 13;
500  }
501  free(buffer);
502  }
503 
504  usleep(50 * 1000);
505  MPI_Barrier(MPI_COMM_WORLD);
506  }
507 
508  /* Read by rank=2, and all others take data. */
509 
510  if (runall) {
511  MPI_Barrier(MPI_COMM_WORLD);
512  usleep(50 * 1000);
513  if (rank == 0) {printf("Read by rank=2, take by all but rank=2\n");}
514  fflush(0);
515  usleep(50 * 1000);
516  MPI_Barrier(MPI_COMM_WORLD);
517 
518  int readrank = 2;
519  char *file = ((readrank == rank) ? file0 : 0);
520  int color = 0;
521  void *buffer;
522  off_t size;
523  buffer = 0;
524  size = -1;
525  void **buf = ((readrank == rank) ? 0 : &buffer);
526  cc = kmr_read_file_by_segments(mr, file, color,
527  buf, &size);
528  assert(cc == MPI_SUCCESS);
529  printf("[%05d] read size=%ld (segsize=%ld);"
530  " checking...\n",
531  rank, size, SEGSZ);
532  fflush(0);
533 
534  if (readrank != rank) {
535  assert((size_t)size == totsz);
536  int nn = (int)(totsz / sizeof(int));
537  int *bx = (int *)buffer;
538  int x = FILVAL(0);
539  for (int i = 0; i < nn; i++) {
540  assert(bx[i] == x);
541  x *= 13;
542  }
543  free(buffer);
544  }
545 
546  usleep(50 * 1000);
547  MPI_Barrier(MPI_COMM_WORLD);
548  }
549 
550  /* Clean up. */
551 
552  {
553  if (rank == 0) {
554  cc = unlink(file0);
555  assert(cc == 0);
556  cc = unlink(file1);
557  assert(cc == 0);
558  }
559  }
560 }
561 
562 static int
563 list_keys(const struct kmr_kv_box kv,
564  const KMR_KVS *kvi, KMR_KVS *kvo, void *p,
565  const long index)
566 {
567  char *buf = p;
568  if (*buf != 0) {
569  strcat(buf, " ");
570  }
571  strcat(buf, kv.k.p);
572  int cc = kmr_add_kv(kvo, kv);
573  assert(cc == MPI_SUCCESS);
574  return MPI_SUCCESS;
575 }
576 
577 static void
578 simple2(KMR *mr)
579 {
580  int nprocs = mr->nprocs;
581  int rank = mr->rank;
582 
583  MPI_Barrier(MPI_COMM_WORLD);
584  usleep(50 * 1000);
585  if (rank == 0) {printf("CHECK MAP FILE NAMES...\n");}
586  fflush(0);
587  usleep(50 * 1000);
588  MPI_Barrier(MPI_COMM_WORLD);
589 
590  int cc;
591 
592  char *datx = "dat.x";
593  char *names0[] = {"dat.0", "dat.1", "dat.2", "dat.3"};
594  int nnames0 = (sizeof(names0) / sizeof(char *));
595 
596  char *dir1 = "dir.0";
597  char *names1[] = {"dat.0.0", "dat.0.1", "dat.0.2", "dat.0.3"};
598  int nnames1 = (sizeof(names1) / sizeof(char *));
599 
600  char *dir2 = "dir.0/dir.0.0";
601  char *names2[] = {"dat.0.0.0", "dat.0.0.1", "dat.0.0.2", "dat.0.0.3"};
602  int nnames2 = (sizeof(names2) / sizeof(char *));
603 
604  if (rank == 0) {
605  printf("Making test files.\n");
606  fflush(0);
607 
608  char ss[(8 * 1024)];
609  for (int i = 0; i < nnames0; i++) {
610  snprintf(ss, sizeof(ss), "touch %s", names0[i]);
611  system(ss);
612  }
613 
614  snprintf(ss, sizeof(ss), "mkdir %s", dir1);
615  system(ss);
616 
617  for (int i = 0; i < nnames1; i++) {
618  snprintf(ss, sizeof(ss), "touch %s/%s", dir1, names1[i]);
619  system(ss);
620  }
621 
622  snprintf(ss, sizeof(ss), "mkdir %s", dir2);
623  system(ss);
624 
625  for (int i = 0; i < nnames2; i++) {
626  snprintf(ss, sizeof(ss), "touch %s/%s", dir2, names2[i]);
627  system(ss);
628  }
629 
630  snprintf(ss, sizeof(ss), "echo '' > %s", datx);
631  system(ss);
632  for (int i = 0; i < nnames0; i++) {
633  snprintf(ss, sizeof(ss), "echo %s >> %s", names0[i], datx);
634  system(ss);
635  }
636  snprintf(ss, sizeof(ss), "echo %s >> %s", dir1, datx);
637  system(ss);
638  }
639  usleep(50 * 1000);
640  MPI_Barrier(MPI_COMM_WORLD);
641 
642  struct kmr_option nothreading = {.nothreading = 1};
643  char buf[64 * 1024];
644 
645  {
646  MPI_Barrier(MPI_COMM_WORLD);
647  usleep(50 * 1000);
648  if (rank == 0) {printf("kmr_map_file_names (no-option).\n");}
649  fflush(0);
650  usleep(50 * 1000);
651  MPI_Barrier(MPI_COMM_WORLD);
652 
653  struct kmr_file_option opt0 = kmr_fnoopt;
654  KMR_KVS *kvs0 = kmr_create_kvs(mr, KMR_KV_OPAQUE, KMR_KV_INTEGER);
655  *buf = 0;
656  cc = kmr_map_file_names(mr, names0, nnames0, opt0,
657  kvs0, buf, nothreading, list_keys);
658  assert(cc == MPI_SUCCESS);
659 
660  long cnt0;
661  cc = kmr_get_element_count(kvs0, &cnt0);
662  assert(cc == MPI_SUCCESS);
663  assert(cnt0 == 4);
664  //printf("[%05d] [%s]\n", rank, buf);
665  //fflush(0);
666 
667  kmr_free_kvs(kvs0);
668  usleep(50 * 1000);
669  MPI_Barrier(MPI_COMM_WORLD);
670  }
671 
672  {
673  MPI_Barrier(MPI_COMM_WORLD);
674  usleep(50 * 1000);
675  if (rank == 0) {printf("kmr_map_file_names (each_rank).\n");}
676  fflush(0);
677  usleep(50 * 1000);
678  MPI_Barrier(MPI_COMM_WORLD);
679 
680  struct kmr_file_option opt1 = {.each_rank = 1};
681  KMR_KVS *kvs1 = kmr_create_kvs(mr, KMR_KV_OPAQUE, KMR_KV_INTEGER);
682  *buf = 0;
683  cc = kmr_map_file_names(mr, names0, nnames0, opt1,
684  kvs1, buf, nothreading, list_keys);
685  assert(cc == MPI_SUCCESS);
686 
687  long cnt1;
688  cc = kmr_get_element_count(kvs1, &cnt1);
689  assert(cc == MPI_SUCCESS);
690  assert(cnt1 == (4 * nprocs));
691  //printf("[%05d] [%s]\n", rank, buf);
692  //fflush(0);
693 
694  kmr_free_kvs(kvs1);
695  usleep(50 * 1000);
696  MPI_Barrier(MPI_COMM_WORLD);
697  }
698 
699  {
700  MPI_Barrier(MPI_COMM_WORLD);
701  usleep(50 * 1000);
702  if (rank == 0) {printf("kmr_map_file_names (list_file).\n");}
703  fflush(0);
704  usleep(50 * 1000);
705  MPI_Barrier(MPI_COMM_WORLD);
706 
707  struct kmr_file_option opt2 = {.list_file = 1};
708  KMR_KVS *kvs2 = kmr_create_kvs(mr, KMR_KV_OPAQUE, KMR_KV_INTEGER);
709  *buf = 0;
710  char *na[] = {datx};
711  cc = kmr_map_file_names(mr, na, 1, opt2,
712  kvs2, buf, nothreading, list_keys);
713  assert(cc == MPI_SUCCESS);
714 
715  long cnt2;
716  cc = kmr_get_element_count(kvs2, &cnt2);
717  assert(cc == MPI_SUCCESS);
718  assert(cnt2 == 12);
719  //printf("[%05d] [%s]\n", rank, buf);
720  //fflush(0);
721 
722  kmr_free_kvs(kvs2);
723  usleep(50 * 1000);
724  MPI_Barrier(MPI_COMM_WORLD);
725  }
726 
727  {
728  MPI_Barrier(MPI_COMM_WORLD);
729  usleep(50 * 1000);
730  if (rank == 0) {printf("kmr_map_file_names (subdirs).\n");}
731  fflush(0);
732  usleep(50 * 1000);
733  MPI_Barrier(MPI_COMM_WORLD);
734 
735  struct kmr_file_option opt3 = {.subdirectories = 1};
736  KMR_KVS *kvs3 = kmr_create_kvs(mr, KMR_KV_OPAQUE, KMR_KV_INTEGER);
737  *buf = 0;
738  char *na[] = {dir1};
739  cc = kmr_map_file_names(mr, na, 1, opt3,
740  kvs3, buf, nothreading, list_keys);
741  assert(cc == MPI_SUCCESS);
742 
743  long cnt3;
744  cc = kmr_get_element_count(kvs3, &cnt3);
745  assert(cc == MPI_SUCCESS);
746  assert(cnt3 == 8);
747  //printf("[%05d] [%s]\n", rank, buf);
748  //fflush(0);
749 
750  kmr_free_kvs(kvs3);
751  usleep(50 * 1000);
752  MPI_Barrier(MPI_COMM_WORLD);
753  }
754 
755  {
756  MPI_Barrier(MPI_COMM_WORLD);
757  usleep(50 * 1000);
758  if (rank == 0) {printf("kmr_map_file_names (enumerate files).\n");}
759  fflush(0);
760  usleep(50 * 1000);
761  MPI_Barrier(MPI_COMM_WORLD);
762 
763  struct kmr_file_option opt4 = {.list_file = 1};
764  KMR_KVS *kvs4 = kmr_create_kvs(mr, KMR_KV_OPAQUE, KMR_KV_INTEGER);
765  char *na[] = {datx};
766  cc = kmr_map_file_names(mr, na, 1, opt4,
767  kvs4, 0, nothreading, 0);
768  assert(cc == MPI_SUCCESS);
769 
770  long cnt4;
771  cc = kmr_get_element_count(kvs4, &cnt4);
772  assert(cc == MPI_SUCCESS);
773  assert(cnt4 == 12);
774 
775  kmr_free_kvs(kvs4);
776  usleep(50 * 1000);
777  MPI_Barrier(MPI_COMM_WORLD);
778  }
779 
780  {
781  MPI_Barrier(MPI_COMM_WORLD);
782  usleep(50 * 1000);
783  if (rank == 0) {printf("kmr_map_file_names (shuffle_names).\n");}
784  fflush(0);
785  usleep(50 * 1000);
786  MPI_Barrier(MPI_COMM_WORLD);
787 
788  struct kmr_file_option opt5 = {.list_file = 1, .shuffle_names = 1};
789  KMR_KVS *kvs5 = kmr_create_kvs(mr, KMR_KV_OPAQUE, KMR_KV_INTEGER);
790  *buf = 0;
791  char *na[] = {datx};
792  cc = kmr_map_file_names(mr, na, 1, opt5,
793  kvs5, buf, nothreading, list_keys);
794  assert(cc == MPI_SUCCESS);
795 
796  long cnt5;
797  cc = kmr_get_element_count(kvs5, &cnt5);
798  assert(cc == MPI_SUCCESS);
799  assert(cnt5 == 12);
800  printf("[%05d] [%s]\n", rank, buf);
801  fflush(0);
802 
803  kmr_free_kvs(kvs5);
804  usleep(50 * 1000);
805  MPI_Barrier(MPI_COMM_WORLD);
806  }
807 
808  /* Clean up. */
809 
810  {
811  if (rank == 0) {
812  for (int i = 0; i < nnames0; i++) {
813  cc = unlink(names0[i]);
814  assert(cc == 0);
815  }
816  char ss[(8 * 1024)];
817  snprintf(ss, sizeof(ss), "rm -rf %s", dir1);
818  system(ss);
819  cc = unlink(datx);
820  assert(cc == 0);
821  }
822  }
823 }
824 
825 static int
826 compareline(const struct kmr_kv_box kv[], const long n,
827  const KMR_KVS *kvi, KMR_KVS *kvo, void *arg)
828 {
829  assert(n == 2);
830  assert(kv[0].vlen == kv[1].vlen);
831  assert(strncmp(kv[0].v.p, kv[1].v.p, (size_t)kv[0].vlen) == 0);
832  return MPI_SUCCESS;
833 }
834 
835 static void
836 simple3(KMR *mr)
837 {
838  //int nprocs = mr->nprocs;
839  int rank = mr->rank;
840 
841  MPI_Barrier(MPI_COMM_WORLD);
842  usleep(50 * 1000);
843  if (rank == 0) {printf("CHECK MAP GETLINE...\n");}
844  fflush(0);
845 
846  int cc;
847 
848  /* kmr_map_getline(), once with threads, and once without. */
849 
850  for (int i = 0; i < 2; i++) {
851  struct kmr_option opt = {.nothreading = ((i == 0) ? 0 : 1)};
852 
853  MPI_Barrier(MPI_COMM_WORLD);
854  usleep(50 * 1000);
855  if (rank == 0) {
856  printf("kmr_map_getline (%s threads).\n",
857  (opt.nothreading ? "without" : "with"));
858  }
859  fflush(0);
860  usleep(50 * 1000);
861  MPI_Barrier(MPI_COMM_WORLD);
862 
863  FILE *f = fopen("../LICENSE", "r");
864  if (f == 0) {
865  perror("fopen(../LICENSE)");
866  MPI_Abort(MPI_COMM_WORLD, 1);
867  }
868 
869  KMR_KVS *kvs1 = kmr_create_kvs(mr, KMR_KV_INTEGER, KMR_KV_OPAQUE);
870  cc = kmr_map_getline(mr, f, 0, 1, kvs1, 0, opt, kmr_add_identity_fn);
871  assert(cc == MPI_SUCCESS);
872  assert(kvs1->c.element_count == 25);
873 
874  rewind(f);
875 
876  KMR_KVS *kvs0 = kmr_create_kvs(mr, KMR_KV_INTEGER, KMR_KV_OPAQUE);
877  char *line = 0;
878  size_t linesz = 0;
879  ssize_t rc = 0;
880  long lineno = 0;
881  for (;;) {
882  rc = getline(&line, &linesz, f);
883  if (rc == -1) {
884  break;
885  }
886  assert(rc <= INT_MAX);
887  struct kmr_kv_box kv = {
888  .klen = (int)sizeof(long),
889  .vlen = (int)rc,
890  .k.i = lineno,
891  .v.p = (char *)line
892  };
893  cc = kmr_add_kv(kvs0, kv);
894  assert(cc == MPI_SUCCESS);
895  lineno++;
896  }
897  if (ferror(f)) {
898  perror("getline()");
899  MPI_Abort(MPI_COMM_WORLD, 1);
900  }
901  free(line);
902  line = 0;
903  kmr_add_kv_done(kvs0);
904  assert(kvs0->c.element_count == 25);
905 
906  cc = fclose(f);
907  assert(cc == 0);
908  f = 0;
909 
910  KMR_KVS *vec[] = {kvs0, kvs1};
911  KMR_KVS *kvs2 = kmr_create_kvs(mr, KMR_KV_INTEGER, KMR_KV_OPAQUE);
912  kmr_concatenate_kvs(vec, 2, kvs2, kmr_noopt);
913 
914  cc = kmr_reduce(kvs2, 0, 0, kmr_noopt, compareline);
915  assert(cc == MPI_SUCCESS);
916 
917  usleep(50 * 1000);
918  MPI_Barrier(MPI_COMM_WORLD);
919  }
920 }
921 
922 int
923 main(int argc, char *argv[])
924 {
925  int cc;
926 
927  int nprocs, rank, thlv;
928  MPI_Init_thread(&argc, &argv, MPI_THREAD_SERIALIZED, &thlv);
929  /*MPI_Init(&argc, &argv);*/
930  MPI_Comm_size(MPI_COMM_WORLD, &nprocs);
931  MPI_Comm_rank(MPI_COMM_WORLD, &rank);
932  kmr_init();
933 
934  if (rank == 0) {
935  if (nprocs < 4) {
936  printf("Run this with nprocs>=4.\n");
937  MPI_Abort(MPI_COMM_WORLD, 1);
938  return 0;
939  }
940  }
941 
942  static char props[] = "trace_file_io=1\n";
943  if (1) {
944  if (rank == 0) {
945  system("rm -f option");
946  FILE *f = fopen("option", "w");
947  assert(f != 0);
948  size_t cx = fwrite(props, (sizeof(props) - 1), 1, f);
949  assert(cx == 1);
950  cc = fclose(f);
951  assert(cc == 0);
952  }
953  cc = setenv("KMROPTION", "option", 0);
954  assert(cc == 0);
955  }
956 
957  KMR *mr = kmr_create_context(MPI_COMM_WORLD, MPI_INFO_NULL, 0);
958  assert(mr != 0);
959  assert(mr->trace_file_io == 1);
960 
961  const size_t SEGSZ0 = (3 * (1024 * 1024) + 512 + 0);
962  const size_t SEGSZ1 = (3 * (1024 * 1024) + 512 + 1);
963  simple0(mr, SEGSZ0, 0);
964  simple0(mr, SEGSZ1, 1);
965  const size_t SEGSZ2 = (1024 * 1024);
966  const size_t EXTRA0 = (512 + 0);
967  const size_t EXTRA1 = (512 + 1);
968  simple1(mr, SEGSZ2, EXTRA0, 0);
969  simple1(mr, SEGSZ2, EXTRA1, 1);
970  simple2(mr);
971  simple3(mr);
972 
973  MPI_Barrier(MPI_COMM_WORLD);
974  if (rank == 0) {printf("OK\n");}
975  fflush(0);
976 
977  cc = kmr_free_context(mr);
978  assert(cc == MPI_SUCCESS);
979 
980  if (1) {
981  if (rank == 0) {
982  system("rm -f option");
983  }
984  }
985 
986  kmr_fin();
987  MPI_Finalize();
988  return 0;
989 }
Key-Value Stream (abstract).
Definition: kmr.h:587
Utilities Private Part (do not include from applications).
int kmr_map_file_names(KMR *mr, char **names, int n, struct kmr_file_option fopt, KMR_KVS *kvo, void *arg, struct kmr_option opt, kmr_mapfn_t m)
Maps on file names.
Definition: kmrfiles.c:1372
#define kmr_reduce(KVI, KVO, ARG, OPT, R)
Reduces key-value pairs.
Definition: kmr.h:88
Options to Mapping, Shuffling, and Reduction.
Definition: kmr.h:613
int kmr_add_kv(KMR_KVS *kvs, const struct kmr_kv_box kv)
Adds a key-value pair.
Definition: kmrbase.c:751
#define kmr_create_kvs(MR, KF, VF)
Makes a new key-value stream (of type KMR_KVS) with the specified field datatypes.
Definition: kmr.h:71
int kmr_add_kv_done(KMR_KVS *kvs)
Marks finished adding key-value pairs.
Definition: kmrbase.c:881
KMR Context.
Definition: kmr.h:222
int kmr_concatenate_kvs(KMR_KVS *kvs[], int nkvs, KMR_KVS *kvo, struct kmr_option opt)
Concatenates a number of KVSes to one.
Definition: kmrbase.c:2696
int kmr_free_kvs(KMR_KVS *kvs)
Releases a key-value stream (type KMR_KVS).
Definition: kmrbase.c:621
int kmr_map_getline(KMR *mr, FILE *f, long limit, _Bool largebuffering, KMR_KVS *kvo, void *arg, struct kmr_option opt, kmr_mapfn_t m)
Calls a map-function M for each line by getline() on an input F.
Definition: kmrfiles.c:1561
int kmr_read_file_by_segments(KMR *mr, char *file, int color, void **buffer, off_t *readsize)
Reads one file by segments and reassembles by all-gather.
Definition: kmrfiles.c:1021
Handy Copy of a Key-Value Field.
Definition: kmr.h:358
int kmr_fin(void)
Clears the environment.
Definition: kmrbase.c:124
int kmr_get_element_count(KMR_KVS *kvs, long *v)
Gets the total number of key-value pairs.
Definition: kmrmoreops.c:114
#define kmr_init()
Sets up the environment.
Definition: kmr.h:747
int kmr_read_files_reassemble(KMR *mr, char *file, int color, off_t offset, off_t bytes, void **buffer, off_t *readsize)
Reassembles files reading by ranks.
Definition: kmrfiles.c:653
int kmr_free_context(KMR *mr)
Releases a context created with kmr_create_context().
Definition: kmrbase.c:326
KMR Interface.
int kmr_add_identity_fn(const struct kmr_kv_box kv, const KMR_KVS *kvi, KMR_KVS *kvo, void *arg, const long i)
Adds a given key-value pair unmodified.
Definition: kmrbase.c:937
Options to Mapping on Files.
Definition: kmr.h:638
KMR * kmr_create_context(const MPI_Comm comm, const MPI_Info conf, const char *name)
Makes a new KMR context (a context has type KMR).
Definition: kmrbase.c:147