16 const _Bool runall = 1;
21 static int filval[] = {907, 911, 919, 929, 937, 941, 947, 953,
22 967, 971, 977, 983, 991, 997};
23 int N =
sizeof(filval) /
sizeof(
int);
24 return (filval[(i % N)] + (503 + 1) * (i / N));
30 INTAT(
void *v,
int index)
33 union {
int i;
unsigned char c[
sizeof(int)];} u;
34 for (
int j = 0; j < (int)
sizeof(
int); j++) {
35 u.c[j] = p[((int)
sizeof(
int) * index) + j];
43 simple0(
KMR *mr,
size_t SEGSZ,
int step)
45 int nprocs = mr->nprocs;
48 MPI_Barrier(MPI_COMM_WORLD);
50 if (rank == 0) {printf(
"CHECK READ BY REASSEMBLE...\n");}
53 MPI_Barrier(MPI_COMM_WORLD);
61 snprintf(file,
sizeof(file),
"dat.%d.%d", step, rank);
66 char *b = malloc(SEGSZ);
69 MPI_Abort(MPI_COMM_WORLD, 1);
73 for (
int i = 0; i < (int)(SEGSZ /
sizeof(
int)); i++) {
77 FILE *f = fopen(file,
"w");
79 size_t cx = fwrite(&b[0], SEGSZ, 1, f);
86 MPI_Barrier(MPI_COMM_WORLD);
92 if (rank == 0) {printf(
"Read+take by all (nprocs=%d)\n", nprocs);}
103 assert(cc == MPI_SUCCESS);
104 assert((
size_t)size == (SEGSZ * (
size_t)nprocs));
105 printf(
"[%05d] read size=%ld (segment-size=%ld); checking...\n",
108 char *cbuffer = buffer;
109 for (
int r = 0; r < nprocs; r++) {
110 int *v = (
int *)&cbuffer[SEGSZ * (
size_t)r];
112 for (
int i = 0; i < (int)(SEGSZ /
sizeof(
int)); i++) {
113 assert(INTAT(v, i) == x);
120 MPI_Barrier(MPI_COMM_WORLD);
126 if (rank == 0) {printf(
"Read+take by all with 2 colors" 127 " (color by even/odd ranks)\n");}
131 int color = (rank % 2);
132 int colorset = ((rank % 2) == 0
141 assert(cc == MPI_SUCCESS);
142 assert((
size_t)size == (SEGSZ * (
size_t)colorset));
143 printf(
"[%05d] read size=%ld (color=%d/%d segment-size=%ld);" 145 rank, size, color, colorset, SEGSZ);
147 char *cbuffer = buffer;
149 for (
int r = 0; r < nprocs; r++) {
150 if ((r % 2) == color) {
151 int *v = (
int *)&cbuffer[SEGSZ * (
size_t)index];
153 for (
int i = 0; i < (int)(SEGSZ /
sizeof(
int)); i++) {
154 assert(INTAT(v, i) == x);
163 MPI_Barrier(MPI_COMM_WORLD);
169 if (rank == 0) {printf(
"Read+take by all 2 colors" 170 " with offset/size (64K/16K)\n");}
174 off_t off2 = (64 * 1024);
175 off_t sz2 = (16 * 1024);
176 assert((
size_t)(off2 + sz2) < SEGSZ);
177 int color = (rank % 2);
178 int colorset = ((rank % 2) == 0
187 assert(cc == MPI_SUCCESS);
188 assert(size == (sz2 * colorset));
189 printf(
"[%05d] read size=%ld (color=%d/%d off=%ld sz=%ld);" 191 rank, size, color, colorset, off2, sz2);
193 char *cbuffer = buffer;
195 for (
int r = 0; r < nprocs; r++) {
196 if ((r % 2) == color) {
197 int *v = (
int *)&cbuffer[sz2 * index];
199 for (
int i = 0; i < (int)((
size_t)off2 /
sizeof(
int)); i++) {
202 for (
int i = 0; i < (int)((
size_t)sz2 / sizeof(
int)); i++) {
203 assert(INTAT(v, i) == x);
212 MPI_Barrier(MPI_COMM_WORLD);
218 if (rank == 0) {printf(
"Read by all but rank=2, take by all\n");}
223 int nreaders = ((SKIPRANK < nprocs) ? (nprocs - 1) : nprocs);
224 char *filename = ((SKIPRANK == rank) ? 0 : file);
232 assert(cc == MPI_SUCCESS);
234 assert((
size_t)size == (SEGSZ * (
size_t)nreaders));
235 printf(
"[%05d] read size=%ld (segment-size=%ld); checking...\n",
238 char *cbuffer = buffer;
241 for (
int r = 0; r < nprocs; r++) {
243 int *v = (
int *)&cbuffer[SEGSZ * (
size_t)index];
245 for (
int i = 0; i < (int)(SEGSZ /
sizeof(
int)); i++) {
246 assert(INTAT(v, i) == x);
256 MPI_Barrier(MPI_COMM_WORLD);
263 printf(
"Read by all but rank=2, take by all but rank=1\n");
270 int nreaders = ((skiprank < nprocs) ? (nprocs - 1) : nprocs);
271 char *filename = ((skiprank == rank) ? 0 : file);
277 void **buf = ((notakerank == rank) ? 0 : &buffer);
280 assert(cc == MPI_SUCCESS);
281 assert((notakerank == rank)
282 || (
size_t)size == (SEGSZ * (
size_t)nreaders));
283 printf(
"[%05d] read size=%ld (segment-size=%ld); checking...\n",
286 char *cbuffer = buffer;
287 if (notakerank != rank) {
289 for (
int r = 0; r < nprocs; r++) {
291 int *v = (
int *)&cbuffer[SEGSZ * (
size_t)index];
293 for (
int i = 0; i < (int)(SEGSZ /
sizeof(
int)); i++) {
294 assert(INTAT(v, i) == x);
301 if (notakerank != rank) {
306 MPI_Barrier(MPI_COMM_WORLD);
320 simple1(
KMR *mr,
size_t SEGSZ,
size_t EXTRA,
int step)
325 int nprocs = mr->nprocs;
328 MPI_Barrier(MPI_COMM_WORLD);
330 if (rank == 0) {printf(
"CHECK READ BY SEGMENTS...\n");}
333 MPI_Barrier(MPI_COMM_WORLD);
340 cc = snprintf(file0,
sizeof(file0),
"dat.%d.0", step);
341 assert((
size_t)cc <
sizeof(file0));
342 cc = snprintf(file1,
sizeof(file1),
"dat.%d.1", step);
343 assert((
size_t)cc <
sizeof(file1));
345 size_t totsz = ((5 * SEGSZ * (size_t)nprocs) / 2 + EXTRA);
351 char *b = malloc(totsz);
354 MPI_Abort(MPI_COMM_WORLD, 1);
356 int nn = (int)(totsz /
sizeof(
int));
360 for (
int i = 0; i < nn; i++) {
364 FILE *f = fopen(file0,
"w");
366 size_t cx = fwrite(&b[0], totsz, 1, f);
373 for (
int i = 0; i < nn; i++) {
377 FILE *f = fopen(file1,
"w");
379 size_t cx = fwrite(&b[0], totsz, 1, f);
387 MPI_Barrier(MPI_COMM_WORLD);
393 MPI_Barrier(MPI_COMM_WORLD);
395 if (rank == 0) {printf(
"Read+take by all (nprocs=%d)\n", nprocs);}
398 MPI_Barrier(MPI_COMM_WORLD);
407 assert(cc == MPI_SUCCESS);
408 printf(
"[%05d] read size=%ld (segsize=%ld);" 413 assert((
size_t)size == totsz);
414 int nn = (int)(totsz /
sizeof(
int));
415 int *bx = (
int *)buffer;
417 for (
int i = 0; i < nn; i++) {
424 MPI_Barrier(MPI_COMM_WORLD);
430 MPI_Barrier(MPI_COMM_WORLD);
432 if (rank == 0) {printf(
"Read+take by all for 2 files" 433 " (color by even/odd ranks)\n");}
436 MPI_Barrier(MPI_COMM_WORLD);
438 char *file = (((rank % 2) == 0) ? file0 : file1);
439 int color = ((rank % 2) == 0);
446 assert(cc == MPI_SUCCESS);
447 printf(
"[%05d] read size=%ld (segsize=%ld);" 452 assert((
size_t)size == totsz);
453 int nn = (int)(totsz /
sizeof(
int));
454 int *bx = (
int *)buffer;
455 int x = (((rank % 2) == 0) ? FILVAL(0) : FILVAL(1));
456 for (
int i = 0; i < nn; i++) {
463 MPI_Barrier(MPI_COMM_WORLD);
469 MPI_Barrier(MPI_COMM_WORLD);
471 if (rank == 0) {printf(
"Read+take by all but rank=2\n");}
474 MPI_Barrier(MPI_COMM_WORLD);
477 char *file = ((skiprank == rank) ? 0 : file0);
478 int color = ((skiprank == rank) ? -1 : 0);
483 void **buf = ((skiprank == rank) ? 0 : &buffer);
486 assert(cc == MPI_SUCCESS);
487 printf(
"[%05d] read size=%ld (segsize=%ld);" 492 if (skiprank != rank) {
493 assert((
size_t)size == totsz);
494 int nn = (int)(totsz /
sizeof(
int));
495 int *bx = (
int *)buffer;
497 for (
int i = 0; i < nn; i++) {
505 MPI_Barrier(MPI_COMM_WORLD);
511 MPI_Barrier(MPI_COMM_WORLD);
513 if (rank == 0) {printf(
"Read by rank=2, take by all but rank=2\n");}
516 MPI_Barrier(MPI_COMM_WORLD);
519 char *file = ((readrank == rank) ? file0 : 0);
525 void **buf = ((readrank == rank) ? 0 : &buffer);
528 assert(cc == MPI_SUCCESS);
529 printf(
"[%05d] read size=%ld (segsize=%ld);" 534 if (readrank != rank) {
535 assert((
size_t)size == totsz);
536 int nn = (int)(totsz /
sizeof(
int));
537 int *bx = (
int *)buffer;
539 for (
int i = 0; i < nn; i++) {
547 MPI_Barrier(MPI_COMM_WORLD);
573 assert(cc == MPI_SUCCESS);
580 int nprocs = mr->nprocs;
583 MPI_Barrier(MPI_COMM_WORLD);
585 if (rank == 0) {printf(
"CHECK MAP FILE NAMES...\n");}
588 MPI_Barrier(MPI_COMM_WORLD);
592 char *datx =
"dat.x";
593 char *names0[] = {
"dat.0",
"dat.1",
"dat.2",
"dat.3"};
594 int nnames0 = (
sizeof(names0) /
sizeof(
char *));
596 char *dir1 =
"dir.0";
597 char *names1[] = {
"dat.0.0",
"dat.0.1",
"dat.0.2",
"dat.0.3"};
598 int nnames1 = (
sizeof(names1) /
sizeof(
char *));
600 char *dir2 =
"dir.0/dir.0.0";
601 char *names2[] = {
"dat.0.0.0",
"dat.0.0.1",
"dat.0.0.2",
"dat.0.0.3"};
602 int nnames2 = (
sizeof(names2) /
sizeof(
char *));
605 printf(
"Making test files.\n");
609 for (
int i = 0; i < nnames0; i++) {
610 snprintf(ss,
sizeof(ss),
"touch %s", names0[i]);
614 snprintf(ss,
sizeof(ss),
"mkdir %s", dir1);
617 for (
int i = 0; i < nnames1; i++) {
618 snprintf(ss,
sizeof(ss),
"touch %s/%s", dir1, names1[i]);
622 snprintf(ss,
sizeof(ss),
"mkdir %s", dir2);
625 for (
int i = 0; i < nnames2; i++) {
626 snprintf(ss,
sizeof(ss),
"touch %s/%s", dir2, names2[i]);
630 snprintf(ss,
sizeof(ss),
"echo '' > %s", datx);
632 for (
int i = 0; i < nnames0; i++) {
633 snprintf(ss,
sizeof(ss),
"echo %s >> %s", names0[i], datx);
636 snprintf(ss,
sizeof(ss),
"echo %s >> %s", dir1, datx);
640 MPI_Barrier(MPI_COMM_WORLD);
642 struct kmr_option nothreading = {.nothreading = 1};
646 MPI_Barrier(MPI_COMM_WORLD);
648 if (rank == 0) {printf(
"kmr_map_file_names (no-option).\n");}
651 MPI_Barrier(MPI_COMM_WORLD);
657 kvs0, buf, nothreading, list_keys);
658 assert(cc == MPI_SUCCESS);
662 assert(cc == MPI_SUCCESS);
669 MPI_Barrier(MPI_COMM_WORLD);
673 MPI_Barrier(MPI_COMM_WORLD);
675 if (rank == 0) {printf(
"kmr_map_file_names (each_rank).\n");}
678 MPI_Barrier(MPI_COMM_WORLD);
684 kvs1, buf, nothreading, list_keys);
685 assert(cc == MPI_SUCCESS);
689 assert(cc == MPI_SUCCESS);
690 assert(cnt1 == (4 * nprocs));
696 MPI_Barrier(MPI_COMM_WORLD);
700 MPI_Barrier(MPI_COMM_WORLD);
702 if (rank == 0) {printf(
"kmr_map_file_names (list_file).\n");}
705 MPI_Barrier(MPI_COMM_WORLD);
712 kvs2, buf, nothreading, list_keys);
713 assert(cc == MPI_SUCCESS);
717 assert(cc == MPI_SUCCESS);
724 MPI_Barrier(MPI_COMM_WORLD);
728 MPI_Barrier(MPI_COMM_WORLD);
730 if (rank == 0) {printf(
"kmr_map_file_names (subdirs).\n");}
733 MPI_Barrier(MPI_COMM_WORLD);
740 kvs3, buf, nothreading, list_keys);
741 assert(cc == MPI_SUCCESS);
745 assert(cc == MPI_SUCCESS);
752 MPI_Barrier(MPI_COMM_WORLD);
756 MPI_Barrier(MPI_COMM_WORLD);
758 if (rank == 0) {printf(
"kmr_map_file_names (enumerate files).\n");}
761 MPI_Barrier(MPI_COMM_WORLD);
767 kvs4, 0, nothreading, 0);
768 assert(cc == MPI_SUCCESS);
772 assert(cc == MPI_SUCCESS);
777 MPI_Barrier(MPI_COMM_WORLD);
781 MPI_Barrier(MPI_COMM_WORLD);
783 if (rank == 0) {printf(
"kmr_map_file_names (shuffle_names).\n");}
786 MPI_Barrier(MPI_COMM_WORLD);
793 kvs5, buf, nothreading, list_keys);
794 assert(cc == MPI_SUCCESS);
798 assert(cc == MPI_SUCCESS);
800 printf(
"[%05d] [%s]\n", rank, buf);
805 MPI_Barrier(MPI_COMM_WORLD);
812 for (
int i = 0; i < nnames0; i++) {
813 cc = unlink(names0[i]);
817 snprintf(ss,
sizeof(ss),
"rm -rf %s", dir1);
826 compareline(
const struct kmr_kv_box kv[],
const long n,
830 assert(kv[0].vlen == kv[1].vlen);
831 assert(strncmp(kv[0].v.p, kv[1].v.p, (
size_t)kv[0].vlen) == 0);
841 MPI_Barrier(MPI_COMM_WORLD);
843 if (rank == 0) {printf(
"CHECK MAP GETLINE...\n");}
850 for (
int i = 0; i < 2; i++) {
851 struct kmr_option opt = {.nothreading = ((i == 0) ? 0 : 1)};
853 MPI_Barrier(MPI_COMM_WORLD);
856 printf(
"kmr_map_getline (%s threads).\n",
857 (opt.nothreading ?
"without" :
"with"));
861 MPI_Barrier(MPI_COMM_WORLD);
863 FILE *f = fopen(
"../LICENSE",
"r");
865 perror(
"fopen(../LICENSE)");
866 MPI_Abort(MPI_COMM_WORLD, 1);
871 assert(cc == MPI_SUCCESS);
872 assert(kvs1->c.element_count == 25);
882 rc = getline(&line, &linesz, f);
886 assert(rc <= INT_MAX);
888 .klen = (int)
sizeof(
long),
894 assert(cc == MPI_SUCCESS);
899 MPI_Abort(MPI_COMM_WORLD, 1);
904 assert(kvs0->c.element_count == 25);
914 cc =
kmr_reduce(kvs2, 0, 0, kmr_noopt, compareline);
915 assert(cc == MPI_SUCCESS);
918 MPI_Barrier(MPI_COMM_WORLD);
923 main(
int argc,
char *argv[])
927 int nprocs, rank, thlv;
928 MPI_Init_thread(&argc, &argv, MPI_THREAD_SERIALIZED, &thlv);
930 MPI_Comm_size(MPI_COMM_WORLD, &nprocs);
931 MPI_Comm_rank(MPI_COMM_WORLD, &rank);
936 printf(
"Run this with nprocs>=4.\n");
937 MPI_Abort(MPI_COMM_WORLD, 1);
942 static char props[] =
"trace_file_io=1\n";
945 system(
"rm -f option");
946 FILE *f = fopen(
"option",
"w");
948 size_t cx = fwrite(props, (
sizeof(props) - 1), 1, f);
953 cc = setenv(
"KMROPTION",
"option", 0);
959 assert(mr->trace_file_io == 1);
961 const size_t SEGSZ0 = (3 * (1024 * 1024) + 512 + 0);
962 const size_t SEGSZ1 = (3 * (1024 * 1024) + 512 + 1);
963 simple0(mr, SEGSZ0, 0);
964 simple0(mr, SEGSZ1, 1);
965 const size_t SEGSZ2 = (1024 * 1024);
966 const size_t EXTRA0 = (512 + 0);
967 const size_t EXTRA1 = (512 + 1);
968 simple1(mr, SEGSZ2, EXTRA0, 0);
969 simple1(mr, SEGSZ2, EXTRA1, 1);
973 MPI_Barrier(MPI_COMM_WORLD);
974 if (rank == 0) {printf(
"OK\n");}
978 assert(cc == MPI_SUCCESS);
982 system(
"rm -f option");
Key-Value Stream (abstract).
Utilities Private Part (do not include from applications).
int kmr_map_file_names(KMR *mr, char **names, int n, struct kmr_file_option fopt, KMR_KVS *kvo, void *arg, struct kmr_option opt, kmr_mapfn_t m)
Maps on file names.
#define kmr_reduce(KVI, KVO, ARG, OPT, R)
Reduces key-value pairs.
Options to Mapping, Shuffling, and Reduction.
int kmr_add_kv(KMR_KVS *kvs, const struct kmr_kv_box kv)
Adds a key-value pair.
#define kmr_create_kvs(MR, KF, VF)
Makes a new key-value stream (of type KMR_KVS) with the specified field datatypes.
int kmr_add_kv_done(KMR_KVS *kvs)
Marks finished adding key-value pairs.
int kmr_concatenate_kvs(KMR_KVS *kvs[], int nkvs, KMR_KVS *kvo, struct kmr_option opt)
Concatenates a number of KVSes to one.
int kmr_free_kvs(KMR_KVS *kvs)
Releases a key-value stream (type KMR_KVS).
int kmr_map_getline(KMR *mr, FILE *f, long limit, _Bool largebuffering, KMR_KVS *kvo, void *arg, struct kmr_option opt, kmr_mapfn_t m)
Calls a map-function M for each line by getline() on an input F.
int kmr_read_file_by_segments(KMR *mr, char *file, int color, void **buffer, off_t *readsize)
Reads one file by segments and reassembles by all-gather.
Handy Copy of a Key-Value Field.
int kmr_fin(void)
Clears the environment.
int kmr_get_element_count(KMR_KVS *kvs, long *v)
Gets the total number of key-value pairs.
#define kmr_init()
Sets up the environment.
int kmr_read_files_reassemble(KMR *mr, char *file, int color, off_t offset, off_t bytes, void **buffer, off_t *readsize)
Reassembles files reading by ranks.
int kmr_free_context(KMR *mr)
Releases a context created with kmr_create_context().
int kmr_add_identity_fn(const struct kmr_kv_box kv, const KMR_KVS *kvi, KMR_KVS *kvo, void *arg, const long i)
Adds a given key-value pair unmodified.
Options to Mapping on Files.
KMR * kmr_create_context(const MPI_Comm comm, const MPI_Info conf, const char *name)
Makes a new KMR context (a context has type KMR).