17 #if defined(__linux__) 32 #include <sys/types.h> 33 #include <sys/param.h> 39 #define MIN(a,b) (((a)<(b))?(a):(b)) 40 #define MAX(a,b) (((a)>(b))?(a):(b)) 41 #define ABS(X) ((X) < 0 ? -(X) : (X)) 42 #define CEILING(N,D) (((N)+(D)-1)/(D)) 48 #define CHUNK_LIMIT (16 * 1024 *1024) 51 kmr_bad_stripe = {.size=0, .count=0, .offset=0};
87 assert(p[0] < 0x7fff && p[1] < 0x7fff);
88 return (p[0] << 16 | p[1]);
/* Returns the distance between two I/O-groups A0 and A1.  Each
   I/O-group key holds two 16-bit coordinates, packed with the first
   one in the upper half-word and the second one in the lower
   half-word.  The distance is the Manhattan distance of the two
   coordinate pairs, (|x0 - x1| + |y0 - y1|), so that identical
   I/O-groups are at distance zero. */

static int
kmr_iogroup_distance(int a0, int a1)
{
    const int x0 = ((a0 >> 16) & 0xffff);
    const int y0 = (a0 & 0xffff);
    const int x1 = ((a1 >> 16) & 0xffff);
    const int y1 = (a1 & 0xffff);
    /* Compare like coordinates across the two keys (x with x, y with
       y); differencing the x and y of the same key would not depend
       on how far apart the two groups are. */
    const int dx = (x0 - x1);
    const int dy = (y0 - y1);
    return (((dx < 0) ? -dx : dx) + ((dy < 0) ? -dy : dy));
}
111 assert(cc == MPI_SUCCESS);
112 return kmr_iogroup(p);
122 int x = (obdidx / 2048);
123 int y = ((obdidx % 2048) / 64);
125 return kmr_iogroup(p);
129 kmr_assert_file_readers_are_sorted(
struct kmr_file_reader *sgv,
long n)
131 for (
long i = 1; i < n; i++) {
132 assert(sgv[i - 1].rank <= sgv[i].rank);
137 kmr_file_reader_compare(
const void *p0,
const void *p1)
141 return (q0->rank - q1->rank);
145 kmr_copyout_file_readers(
const struct kmr_kv_box kv[],
const long n,
151 long segment = kv[0].k.i;
152 for (
long i = 0; i < n; i++) {
157 kmr_file_reader_compare);
158 kmr_assert_file_readers_are_sorted(sgv, n);
160 assert(vlen <= INT_MAX);
162 .klen = (int)
sizeof(
long),
168 assert(cc == MPI_SUCCESS);
184 kmr_share_segment_information(
KMR *mr,
char *file,
int color,
185 _Bool ingesting, _Bool digesting,
186 off_t offset, off_t bytes,
188 int *colorsetsizeq,
int *colorindexq)
190 assert(sgvq != 0 && colorsetsizeq != 0);
191 assert(
sizeof(ino_t) <=
sizeof(
long));
193 int nprocs = mr->nprocs;
195 _Bool active = (color != -1);
196 assert(active == (ingesting || digesting));
207 }
while (cc == -1 && errno == EINTR);
210 if ((s.st_mode & S_IFMT) != S_IFREG) {
212 snprintf(ee,
sizeof(ee),
213 "File (%s) is not a regular file", file);
218 }
else if (errno == ENOENT) {
222 snprintf(ee,
sizeof(ee),
"File (%s) does not exist", file);
226 }
else if (errno == EACCES
229 || errno == ENOTDIR) {
233 char *m = strerror(errno);
234 snprintf(ee,
sizeof(ee),
"File (%s) inaccessible: %s", file, m);
242 char *m = strerror(errno);
243 snprintf(ee,
sizeof(ee),
"stat(%s): %s", file, m);
248 reads = ((bytes == -1) ? (fsz - offset) : bytes);
256 int colorsetsize = 0;
269 .ingesting = ingesting,
270 .digesting = digesting,
274 .klen = (int)
sizeof(
long),
280 assert(cc == MPI_SUCCESS);
285 assert(cc == MPI_SUCCESS);
287 cc =
kmr_reduce(kvs1, kvs2, 0, kmr_noopt, kmr_copyout_file_readers);
288 assert(cc == MPI_SUCCESS);
291 assert(cc == MPI_SUCCESS);
294 .klen = (int)
sizeof(
long),
301 assert(cc == MPI_SUCCESS);
305 * (size_t)colorsetsize);
307 memcpy(sgv, kv.v.p, sz);
313 assert(cc == MPI_SUCCESS);
314 kmr_assert_file_readers_are_sorted(sgv, colorsetsize);
321 for (
int k = 0; k < colorsetsize; k++) {
322 if (sgv[k].ingesting) {
323 r0 = MIN(r0, sgv[k].rank);
326 assert(colorsetsize == 0 || r0 < nprocs);
327 for (
int k = 0; k < colorsetsize; k++) {
328 if (sgv[k].rank == r0) {
337 for (
int k = 0; k < colorsetsize; k++) {
338 if (sgv[k].rank == rank) {
343 assert(active == (colorindex != -1));
346 *colorsetsizeq = colorsetsize;
347 *colorindexq = colorindex;
356 kmr_take_maximum_loop_count(
KMR *mr, off_t reads,
362 const off_t singlestripe = (stripe->s.size * stripe->s.count);
364 if (singlestripe != 0) {
365 long nloops = CEILING(reads, singlestripe);
367 .klen = (int)
sizeof(
long),
368 .vlen = (int)
sizeof(
long),
373 assert(cc == MPI_SUCCESS);
378 assert(cc == MPI_SUCCESS);
381 assert(cc == MPI_SUCCESS);
382 assert(maxloops > 0);
383 *maxloopsq = maxloops;
398 kmr_read_and_gather(
KMR *mr, _Bool reassembling,
399 char *file,
int fd, off_t baseoffset,
400 char *buffer, off_t totalsize,
char *tmpbuf,
406 int nprocs = mr->nprocs;
408 const off_t singlestripe = (stripe->s.size * stripe->s.count);
409 _Bool tracing5 = (mr->trace_file_io && (5 <= mr->verbosity));
410 _Bool tracing7 = (mr->trace_file_io && (7 <= mr->verbosity));
412 long *scnts =
kmr_malloc(
sizeof(
long) * (
size_t)nprocs);
413 long *sdsps =
kmr_malloc(
sizeof(
long) * (
size_t)nprocs);
414 long *rcnts =
kmr_malloc(
sizeof(
long) * (
size_t)nprocs);
415 long *rdsps =
kmr_malloc(
sizeof(
long) * (
size_t)nprocs);
417 int *scnts =
kmr_malloc(
sizeof(
int) * (
size_t)nprocs);
418 int *sdsps =
kmr_malloc(
sizeof(
int) * (
size_t)nprocs);
419 int *rcnts =
kmr_malloc(
sizeof(
int) * (
size_t)nprocs);
420 int *rdsps =
kmr_malloc(
sizeof(
int) * (
size_t)nprocs);
422 assert(reassembling || (baseoffset == 0));
423 _Bool head = ((colorindex != -1) ? sgv[colorindex].head : 0);
426 for (
int k = 0; k < colorsetsize; k++) {
427 if (sgv[k].rank == rank) {
432 _Bool ingesting = ((sgindex != -1) && sgv[sgindex].stripe != -1);
433 _Bool digesting = ((sgindex != -1) && sgv[sgindex].digesting);
435 if (tracing5 && rank == 0) {
436 fprintf(stderr,
";;KMR [%05d] file-read: maxloops=%zd\n",
440 if (tracing5 && !reassembling && head) {
442 for (
int k = 0; k < colorsetsize; k++) {
444 if (sgv[k].stripe != -1) {
448 for (
int k = 0; k < colorsetsize; k++) {
450 if (sgv[k].stripe != -1) {
452 ";;KMR [%05d] file-read:" 453 " stripe unit index=%d/%d (color=%d nprocs=%d)\n",
454 r, sgv[k].stripe, nreaders,
455 sgv[k].color, colorsetsize);
463 for (
int k = 0; k < colorsetsize; k++) {
464 if (sgv[k].rank == rank) {
467 if (sgv[k].digesting) {
471 assert(colorsetsize == 0 || segi >= 0);
475 for (
long i = 0; i < maxloops; i++) {
479 off_t start = (i * singlestripe);
483 assert(sgindex != -1);
484 pos0 = start + ((long)stripe->s.size * sgv[sgindex].stripe);
485 cnt0 = MAX(MIN((sgv[segi].reads - pos0), stripe->s.size), 0);
494 ";;KMR [%05d] file-read: offset=%zd size=%zd" 496 rank, (baseoffset + pos0), cnt0, file);
498 fprintf(stderr,
";;KMR [%05d] file-read: (noread)\n", rank);
504 for (off_t rx = 0; rx < cnt0;) {
508 cx1 = pread(fd, (tmpbuf + rx), (
size_t)chunk,
509 (baseoffset + pos0 + rx));
510 }
while (cx1 == -1 && errno == EINTR);
513 char *m = strerror(errno);
514 snprintf(ee,
sizeof(ee),
"read(%s): %s", file, m);
524 for (
int r = 0; r < nprocs; r++) {
531 for (
int k = 0; k < colorsetsize; k++) {
533 long sendsz = (sgv[k].digesting ? cnt0 : 0);
534 scnts[r] = (int)sendsz;
536 assert(sendsz <= stripe->s.size);
537 assert(sendsz <= INT_MAX);
539 for (
int k = 0; k < colorsetsize; k++) {
543 if (digesting && sgv[k].stripe != -1) {
544 off_t posx = start + ((int)stripe->s.size * sgv[k].stripe);
545 cntx = MAX(MIN((sgv[k].reads - posx), stripe->s.size), 0);
546 offx = ((cntx != 0) ? (start + sgv[k].offset) : 0);
552 rcnts[r] = (int)cntx;
553 rdsps[r] = (int)(offx - start);
554 assert((offx + cntx) <= totalsize);
555 assert(cntx <= INT_MAX && (offx - start) <= INT_MAX);
559 for (
int r = 0; r < nprocs; r++) {
560 if (scnts[r] != 0 || rcnts[r] != 0) {
562 (
";;KMR [%05d] file-read: data exchange:" 563 " *<->[%05d] send=%d (disp=%d);" 564 " recv=%d disp=%zd\n"),
565 rank, r, scnts[r], sdsps[r],
566 rcnts[r], (start + rdsps[r]));
572 char *bufptr = (buffer + start);
573 if (ndigestings == nprocs && !mr->file_io_always_alltoallv) {
576 buffer, rcnts, rdsps);
577 assert(cc == MPI_SUCCESS);
579 cc = MPI_Allgatherv(tmpbuf, (
int)cnt0, MPI_BYTE,
580 bufptr, rcnts, rdsps, MPI_BYTE,
582 assert(cc == MPI_SUCCESS);
586 buffer, rcnts, rdsps);
587 assert(cc == MPI_SUCCESS);
589 cc = MPI_Alltoallv(tmpbuf, scnts, sdsps, MPI_BYTE,
590 bufptr, rcnts, rdsps, MPI_BYTE,
592 assert(cc == MPI_SUCCESS);
597 kmr_free(scnts, (
sizeof(
long) * (
size_t)nprocs));
598 kmr_free(sdsps, (
sizeof(
long) * (
size_t)nprocs));
599 kmr_free(rcnts, (
sizeof(
long) * (
size_t)nprocs));
600 kmr_free(rdsps, (
sizeof(
long) * (
size_t)nprocs));
602 kmr_free(scnts, (
sizeof(
int) * (
size_t)nprocs));
603 kmr_free(sdsps, (
sizeof(
int) * (
size_t)nprocs));
604 kmr_free(rcnts, (
sizeof(
int) * (
size_t)nprocs));
605 kmr_free(rdsps, (
sizeof(
int) * (
size_t)nprocs));
615 kmr_assign_ranks_trivially(
KMR *mr,
char *file,
621 for (
int k = 0; k < colorsetsize; k++) {
622 if (sgv[k].ingesting) {
627 for (
int k = 0; k < colorsetsize; k++) {
628 if (sgv[k].ingesting) {
654 off_t offset, off_t bytes,
655 void **buffer, off_t *size)
657 assert((color != -1) == ((file != 0) || (buffer != 0)));
662 _Bool ingesting = (file != 0);
663 _Bool digesting = (buffer != 0);
664 _Bool active = (color != -1);
671 cc = kmr_share_segment_information(mr, file, color, ingesting, digesting,
673 &sgv, &colorsetsize, &colorindex);
674 off_t reads = ((colorindex != -1) ? sgv[colorindex].reads : 0);
677 assert(cc == MPI_SUCCESS);
678 assert(!active || (sgv != 0 && colorsetsize != 0));
679 assert((sgv == 0) == (colorsetsize == 0));
681 if (ingesting && (reads < bytes)) {
683 snprintf(ee,
sizeof(ee),
684 "File (%s) too small to read offset=%zd bytes=%zd",
685 file, offset, bytes);
689 for (
int k = 0; k < colorsetsize; k++) {
690 if (sgv[k].ingesting) {
691 totalsize += sgv[k].reads;
698 .size = (uint32_t)mr->file_io_block_size,
703 cc = kmr_assign_ranks_trivially(mr, file, sgv, colorsetsize, 0,
705 assert(cc == MPI_SUCCESS);
706 assert(ingesting == (sgv[colorindex].stripe != -1));
711 cc = kmr_take_maximum_loop_count(mr, reads, &stripe, &maxloops);
712 assert(cc == MPI_SUCCESS && maxloops > 0);
716 char *tmpbuf =
kmr_malloc((
size_t)stripe.s.size);
720 fd = open(file, O_RDONLY, 0);
721 }
while (fd == -1 && errno == EINTR);
724 char *m = strerror(errno);
725 snprintf(ee,
sizeof(ee),
"open(%s): %s", file, m);
730 cc = kmr_read_and_gather(mr, 1, file, fd, offset,
731 b, totalsize, tmpbuf, &stripe, maxloops,
732 sgv, colorsetsize, colorindex);
733 assert(cc == MPI_SUCCESS);
738 }
while (cc == -1 && errno == EINTR);
741 char *m = strerror(errno);
742 snprintf(ee,
sizeof(ee),
"close(%s): %s", file, m);
746 kmr_free(tmpbuf, (
size_t)stripe.s.size);
749 kmr_free(sgv, (
sizeof(
struct kmr_file_reader) * (
size_t)colorsetsize));
754 kmr_free(b, (
size_t)totalsize);
767 kmr_get_stripe(
KMR *mr,
char *file,
int colorsetsize, _Bool leader,
772 size_t len = strlen(file);
773 assert(len < PATH_MAX);
774 memcpy(path, file, (len + 1));
777 for (
char *p = &path[len - 1]; p >= path; p--) {
790 _Bool dumpi = (mr->trace_file_io && (7 <= mr->verbosity));
794 assert(0 <= cc && cc <=7);
801 stripe->s = kmr_bad_stripe;
805 stripe->s = kmr_bad_stripe;
808 char *m = strerror(errori);
809 snprintf(ee,
sizeof(ee),
"malloc(ostdata): %s", m);
815 stripe->s = kmr_bad_stripe;
818 char *m = strerror(errori);
819 snprintf(ee,
sizeof(ee),
"open(dir=%s): %s", d, m);
832 stripe->s = kmr_bad_stripe;
835 if (stripe->s.count == 0 && mr->file_io_dummy_striping) {
838 snprintf(ee,
sizeof(ee),
839 (
"FILE (%s) ASSIGNED WITH DUMMY STRIPE," 840 " missing striping information"), file);
841 kmr_warning(mr, 5, ee);
843 stripe->s.size = (uint32_t)mr->file_io_block_size;
844 stripe->s.count = (uint16_t)MIN(colorsetsize, 20000);
845 stripe->s.offset = 0;
846 assert((stripe->s.count * 64) < 0x10000);
847 for (
int i = 0; i < stripe->s.count; i++) {
848 stripe->obdidx[i] = (uint16_t)(i * 64);
857 kmr_share_striping_information(
KMR *mr,
char *file,
int color,
860 _Bool ingesting, _Bool digesting,
864 _Bool active = (colorsetsize != 0);
866 cc = kmr_get_stripe(mr, file, colorsetsize, leader, stripe);
867 assert(cc == MPI_SUCCESS);
870 if (ingesting && leader) {
872 + (
sizeof(uint16_t) * stripe->s.count));
874 .klen = (int)
sizeof(
long),
877 .v.p = (
void *)stripe
880 assert(cc == MPI_SUCCESS);
885 assert(cc == MPI_SUCCESS);
886 if (active && !ingesting) {
888 .klen = (int)
sizeof(
long),
895 assert(cc == MPI_SUCCESS);
896 memcpy(stripe, kv.v.p, (
size_t)kv.vlen);
899 assert(cc == MPI_SUCCESS);
908 kmr_assign_ranks_to_stripe(
KMR *mr,
char *file,
913 _Bool active = (colorsetsize != 0);
918 for (
int k = 0; k < colorsetsize; k++) {
919 if (sgv[k].ingesting) {
923 assert(nreaders > 0);
924 if ((stripe->s.count > nreaders) || (stripe->s.count == 0)) {
925 if ((stripe->s.count == 0) && leader) {
927 snprintf(ee,
sizeof(ee),
928 (
"FILE (%s) ASSIGNED TO ARBITRARY READERS," 929 " no striping information"), file);
930 kmr_warning(mr, 5, ee);
932 if ((stripe->s.count > nreaders) && leader) {
934 snprintf(ee,
sizeof(ee),
935 (
"FILE (%s) ASSIGNED TO ARBITRARY READERS," 936 " more stripes than ranks"), file);
937 kmr_warning(mr, 5, ee);
942 stripe->s.count = (uint16_t)nreaders;
944 for (
int k = 0; k < colorsetsize; k++) {
945 if (sgv[k].ingesting) {
947 sgv[k].stripe = sindex;
951 assert(sindex == stripe->s.count);
953 _Bool assigned[stripe->s.count];
955 for (
int i = 0; i < stripe->s.count; i++) {
958 for (
int i = 0; i < stripe->s.count; i++) {
960 for (
int k = 0; k < colorsetsize; k++) {
962 int g = sgv[k].iogroup;
963 if (sgv[k].ingesting && iog == g && sgv[k].stripe != -1) {
971 if ((nassigned < stripe->s.count) && leader) {
973 snprintf(ee,
sizeof(ee),
974 (
"FILE (%s) ASSIGNED TO ARBITRARY READERS," 975 " some stripes not covered by ranks"), file);
976 kmr_warning(mr, 5, ee);
981 if (nassigned < stripe->s.count) {
982 for (
int i = 0; i < stripe->s.count; i++) {
984 for (
int k = 0; k < colorsetsize; k++) {
986 if (sgv[k].ingesting && sgv[k].stripe == -1) {
996 assert(nassigned == stripe->s.count);
999 for (
int k = 0; k < colorsetsize; k++) {
1001 if (sgv[k].stripe != -1) {
1002 sgv[k].offset = ((int)stripe->s.size * sgv[k].stripe);
1022 void **buffer, off_t *size)
1024 assert((color != -1) == ((file != 0) || (buffer != 0)));
1025 assert(color >= -1);
1029 _Bool ingesting = (file != 0);
1030 _Bool digesting = (buffer != 0);
1031 _Bool active = (color != -1);
1038 const off_t offset = 0;
1039 const int bytes = -1;
1040 cc = kmr_share_segment_information(mr, file, color, ingesting, digesting,
1042 &sgv, &colorsetsize, &colorindex);
1043 assert(cc == MPI_SUCCESS);
1044 assert(active == (sgv != 0) && active == (colorsetsize != 0));
1045 _Bool head = ((colorindex != -1) ? sgv[colorindex].head : 0);
1049 off_t totalsize = 0;
1051 for (
int k = 0; k < colorsetsize; k++) {
1052 if (sgv[k].ingesting) {
1053 if (totalsize == 0) {
1055 totalsize = sgv[k].reads;
1060 for (
int k = 0; k < colorsetsize; k++) {
1061 if (sgv[k].ingesting) {
1062 if (ino != sgv[k].ino) {
1064 snprintf(ee,
sizeof(ee),
"File (%s) with different ino", file);
1067 if (totalsize != sgv[k].reads) {
1069 snprintf(ee,
sizeof(ee),
"File (%s) returns different sizes", file);
1078 cc = kmr_share_striping_information(mr, file, color,
1080 head, ingesting, digesting,
1082 assert(cc == MPI_SUCCESS);
1086 cc = kmr_assign_ranks_to_stripe(mr, file, sgv, colorsetsize, head,
1088 assert(cc == MPI_SUCCESS);
1093 cc = kmr_take_maximum_loop_count(mr, totalsize, &stripe, &maxloops);
1094 assert(cc == MPI_SUCCESS && maxloops > 0);
1098 char *tmpbuf =
kmr_malloc((
size_t)stripe.s.size);
1100 if (colorindex != -1 && sgv[colorindex].stripe != -1) {
1102 fd = open(file, O_RDONLY, 0);
1103 }
while (fd == -1 && errno == EINTR);
1106 char *m = strerror(errno);
1107 snprintf(ee,
sizeof(ee),
"open(%s): %s", file, m);
1114 cc = kmr_read_and_gather(mr, 0, file, fd, offset,
1115 b, totalsize, tmpbuf, &stripe, maxloops,
1116 sgv, colorsetsize, colorindex);
1117 assert(cc == MPI_SUCCESS);
1119 if (colorindex != -1 && sgv[colorindex].stripe != -1) {
1122 }
while (cc == -1 && errno == EINTR);
1125 char *m = strerror(errno);
1126 snprintf(ee,
sizeof(ee),
"close(%s): %s", file, m);
1130 kmr_free(tmpbuf, (
size_t)stripe.s.size);
1133 kmr_free(sgv, (
sizeof(
struct kmr_file_reader) * (
size_t)colorsetsize));
1138 kmr_free(b, (
size_t)totalsize);
1150 #define COMINGSOON 0 1160 assert(!fopt.list_file || fopt.subdirectories);
1161 for (
int i = 0; i < n; i++) {
1162 char *path = names[i];
1166 cc = stat(path, &s);
1167 }
while (cc == -1 && errno == EINTR);
1169 if (!(S_ISREG(s.st_mode) || S_ISDIR(s.st_mode))) {
1171 if (S_ISFIFO(s.st_mode)) {
1173 }
else if (S_ISCHR(s.st_mode)) {
1174 m =
"character special";
1175 }
else if (S_ISDIR(s.st_mode)) {
1177 }
else if (S_ISBLK(s.st_mode)) {
1178 m =
"block special";
1179 }
else if (S_ISREG(s.st_mode)) {
1185 snprintf(ee,
sizeof(ee),
"Path (%s): type is %s", path, m);
1190 }
else if (errno == ENOENT) {
1193 snprintf(ee,
sizeof(ee),
"Path (%s): nonexistent", path);
1195 assert(errno != ENOENT);
1196 }
else if (errno == EACCES
1199 || errno == ENOTDIR) {
1202 char *m = strerror(errno);
1203 snprintf(ee,
sizeof(ee),
"stat(%s): %s", path, m);
1209 char *m = strerror(errno);
1210 snprintf(ee,
sizeof(ee),
"stat(%s): %s", path, m);
1216 if (S_ISREG(s.st_mode)) {
1218 if (fopt.list_file) {
1221 f = fopen(path,
"r");
1222 }
while (f == 0 && errno == EINTR);
1225 char *e = strerror(errno);
1226 snprintf(ee,
sizeof(ee),
"fopen(%s): %s", path, e);
1229 char line[MAXPATHLEN];
1230 while ((fgets(line,
sizeof(line), f)) != 0) {
1231 char *e = strchr(line,
'\n');
1238 snprintf(ee,
sizeof(ee),
1239 (
"File (%s) misses newline" 1240 " at the end"), path);
1241 kmr_warning(mr, 5, ee);
1243 snprintf(ee,
sizeof(ee),
1244 "File (%s) misses newline", path);
1250 while (e != line && *e ==
' ') {
1255 assert(*e == 0 || *e ==
' ');
1260 while (*p != 0 && *p ==
' ') {
1263 if (*p == 0 || *p ==
'#') {
1268 assert(foptx.list_file);
1269 foptx.list_file = 0;
1272 assert(cc == MPI_SUCCESS);
1276 char *e = strerror(fileno(f));
1277 snprintf(ee,
sizeof(ee),
"fgets(%s): %s", path, e);
1281 size_t len = (strlen(path) + 1);
1283 .vlen = (
int)
sizeof(long),
1287 assert(cc == MPI_SUCCESS);
1289 }
else if (S_ISDIR(s.st_mode)) {
1291 if (fopt.subdirectories) {
1293 long nmax = pathconf(path, _PC_NAME_MAX);
1297 dsz = (offsetof(
struct dirent, d_name) + (size_t)(nmax + 1));
1305 char *m = strerror(errno);
1306 snprintf(ee,
sizeof(ee),
"opendir(%s): %s", path, m);
1310 while ((cc = readdir_r(d, (
void *)b, &dp)) == 0) {
1311 char subdir[MAXPATHLEN];
1315 char *m = strerror(errno);
1316 snprintf(ee,
sizeof(ee),
1317 "readdir_r(%s): %s", path, m);
1323 if (*(dp->d_name) ==
'.') {
1326 cc = snprintf(subdir,
sizeof(subdir),
1327 "%s/%s", path, dp->d_name);
1328 if (cc > ((
int)
sizeof(subdir) - 1)) {
1330 snprintf(ee,
sizeof(ee),
1331 "Path (%s): too long", subdir);
1334 char *na[] = {subdir};
1336 assert(cc == MPI_SUCCESS);
1340 }
while (cc == -1 && errno == EINTR);
1343 char *m = strerror(errno);
1344 snprintf(ee,
sizeof(ee),
"closedir(%s): %s", path, m);
1376 struct kmr_option kmr_supported = {.nothreading = 1, .keep_open = 1};
1377 kmr_check_fn_options(mr, kmr_supported, opt, __func__);
1379 const int rank = mr->rank;
1382 if (fopt.each_rank || rank == 0) {
1384 if (foptx.list_file) {
1385 foptx.subdirectories = 1;
1393 if (fopt.shuffle_names) {
1396 assert(cc == MPI_SUCCESS);
1400 cc =
kmr_map(kvs1, kvo, arg, opt, m);
1401 assert(cc == MPI_SUCCESS);
1422 kmr_map_file_contents(
KMR *mr,
int nchunks,
char **names,
int n,
1423 _Bool subdirs, _Bool listfile,
1424 char *sep,
long deltaub,
1439 kmr_map_getline_threading(
KMR *mr, FILE *f,
long limit, _Bool largebuffering,
1449 const _Bool threading = !(mr->single_thread || opt.nothreading);
1451 KMR_OMP_PARALLEL_IF_(threading)
1463 if (lineno < limit) {
1464 rc = getline(&line, &linesz, f);
1473 if (!(lineno < limit && rc != -1)) {
1477 assert(rc <= INT_MAX);
1479 .klen =
sizeof(long),
1484 cc = (*m)(kv, 0, kvo, arg, lineno);
1485 if (cc != MPI_SUCCESS) {
1487 snprintf(ee,
sizeof(ee),
1488 "Map-fn returned with error cc=%d", cc);
1492 if (rc == -1 && ferror(f)) {
1494 char *w = strerror(errno);
1495 snprintf(ee,
sizeof(ee),
1496 "kmr_map_getline: getline at line %ld failed: %s",
1509 kmr_map_getline_nothreading(
KMR *mr, FILE *f,
long limit, _Bool largebuffering,
1519 while ((lineno < limit) && (rc = getline(&line, &linesz, f)) != -1) {
1520 assert(rc <= INT_MAX);
1522 .klen =
sizeof(long),
1527 cc = (*m)(kv, 0, kvo, arg, lineno);
1528 if (cc != MPI_SUCCESS) {
1530 snprintf(ee,
sizeof(ee),
1531 "Map-fn returned with error cc=%d", cc);
1536 if (rc == -1 && ferror(f)) {
1538 char *w = strerror(errno);
1539 snprintf(ee,
sizeof(ee),
1540 "kmr_map_getline: getline at line %ld failed: %s",
1565 kmr_assert_kvs_ok(0, kvo, 0, 1);
1566 struct kmr_option kmr_supported = {.nothreading = 1, .keep_open = 1,
1568 kmr_check_fn_options(mr, kmr_supported, opt, __func__);
1572 if (kvo != 0 && !opt.keep_open) {
1579 size_t BFSZ = (8 * 1024 * 1024);
1588 if (largebuffering) {
1590 cc = setvbuf(f, b, _IOFBF, BFSZ);
1593 char *w = strerror(errno);
1594 snprintf(ee,
sizeof(ee),
"%s: setvbuf failed (ignored): %s",
1596 kmr_warning(mr, 5, ee);
1602 if (mr->single_thread || opt.nothreading) {
1603 cc = kmr_map_getline_nothreading(mr, f, limit, largebuffering,
1606 cc = kmr_map_getline_threading(mr, f, limit, largebuffering,
1611 assert(largebuffering);
1612 cc = setvbuf(f, 0, _IONBF, 0);
1615 char *w = strerror(errno);
1616 snprintf(ee,
sizeof(ee),
1617 "%s: setvbuf() at the end failed (ignored): %s",
1619 kmr_warning(mr, 5, ee);
1626 if (kvo != 0 && !opt.keep_open) {
1644 kmr_map_getline_in_memory_(
KMR *mr,
void *b,
size_t sz,
long limit,
1648 kmr_assert_kvs_ok(0, kvo, 0, 1);
1649 struct kmr_option kmr_supported = {.nothreading = 1, .keep_open = 1,
1651 kmr_check_fn_options(mr, kmr_supported, opt, __func__);
1656 if (kvo != 0 && !opt.keep_open) {
1669 char *
const end = (gline + sz);
1672 const _Bool threading = !(mr->single_thread || opt.nothreading);
1674 KMR_OMP_PARALLEL_IF_(threading)
1683 if (!((gline < end) && (glineno < limit))) {
1690 while (p < end && *p !=
'\n') {
1693 if (p < end && *p ==
'\n') {
1697 linesz = (size_t)(p - gline);
1701 rc = (ssize_t)linesz;
1709 assert(rc <= INT_MAX);
1711 .klen =
sizeof(long),
1716 cc = (*m)(kv, 0, kvo, arg, lineno);
1717 if (cc != MPI_SUCCESS) {
1719 snprintf(ee,
sizeof(ee),
1720 "Map-fn returned with error cc=%d", cc);
1726 if (kvo != 0 && !opt.keep_open) {
int kmr_ckpt_progress_init(KMR_KVS *, KMR_KVS *, struct kmr_option)
Initializes the progress of MapReduce checkpointing.
int kmr_iogroup_of_node(KMR *mr)
Returns an I/O-group (an integer key) of a compute node.
Key-Value Stream (abstract).
int kmr_read_file_by_segments(KMR *mr, char *file, int color, void **buffer, off_t *size)
Reads one file by segments and reassembles by all-gather.
Utilities Private Part (do not include from applications).
Lustre Striping Information with OBDIDX.
Lustre Striping Information.
#define kmr_reduce(KVI, KVO, ARG, OPT, R)
Reduces key-value pairs.
Options to Mapping, Shuffling, and Reduction.
void kmr_ckpt_progress_fin(KMR *)
Finalizes the progress of MapReduce checkpointing.
int kmr_file_enumerate(KMR *mr, char *names[], int n, KMR_KVS *kvo, struct kmr_file_option fopt)
Adds file names in a key-value stream KVO.
int kmr_add_kv(KMR_KVS *kvs, const struct kmr_kv_box kv)
Adds a key-value pair.
int kmr_find_key(KMR_KVS *kvi, struct kmr_kv_box ki, struct kmr_kv_box *vo)
Finds a key-value pair for a key.
#define kmr_malloc(Z)
Allocates memory, or aborts when failed.
Segment Reading Information of Each Rank.
#define kmr_create_kvs(MR, KF, VF)
Makes a new key-value stream (of type KMR_KVS) with the specified field datatypes.
int kmr_shuffle(KMR_KVS *kvi, KMR_KVS *kvo, struct kmr_option opt)
Shuffles key-value pairs to the appropriate destination ranks.
int kmr_ckpt_enabled(KMR *)
Checks whether checkpoint/restart is enabled.
int kmr_add_kv_done(KMR_KVS *kvs)
Marks finished adding key-value pairs.
int kmr_read_files_reassemble(KMR *mr, char *file, int color, off_t offset, off_t bytes, void **buffer, off_t *size)
Reassembles files reading by ranks.
int kmr_map_file_names(KMR *mr, char **names, int n, struct kmr_file_option fopt, KMR_KVS *kvo, void *arg, struct kmr_option opt, kmr_mapfn_t m)
Maps on file names.
int kmr_free_kvs(KMR_KVS *kvs)
Releases a key-value stream (type KMR_KVS).
unsigned short kmr_k_position_t[4]
Positions of node by (X,Y,Z,ABC), with ABC axes collapsed.
void kmr_ckpt_save_kvo_whole(KMR *, KMR_KVS *)
Saves all key-value pairs in the output KVS to a checkpoint data file.
int kmr_allgatherv(KMR *mr, _Bool rankzeroonly, void *sbuf, long scnt, void *rbuf, long *rcnts, long *rdsps)
All-gathers data, or gathers data when RANKZEROONLY.
int kmr_map_getline(KMR *mr, FILE *f, long limit, _Bool largebuffering, KMR_KVS *kvo, void *arg, struct kmr_option opt, kmr_mapfn_t m)
Calls a map-function M for each line by getline() on an input F.
#define kmr_map(KVI, KVO, ARG, OPT, M)
Maps simply.
Handy Copy of a Key-Value Field.
int kmr_fefs_get_stripe(const char *dir, const char *file, struct kmr_fefs_stripe *stripe, int *err, _Bool debug_and_dump)
Gets the OBDIDX information on the file or directory.
Lustre File System (or Fujitsu FEFS) Support.
int kmr_iogroup_of_obd(int obdidx)
Returns an I/O-group (an integer key) of a disk from an OBDIDX of Lustre file-system.
static const size_t kmr_fefs_stripe_info_size
Offset to Striping OBDIDX Information.
Options to Mapping on Files.
int kmr_replicate(KMR_KVS *kvi, KMR_KVS *kvo, struct kmr_option opt)
Replicates key-value pairs to be visible on all ranks, that is, it has the effect of bcast or all-gat...
int kmr_k_node(KMR *mr, kmr_k_position_t p)
Gets TOFU position (physical coordinates) of the node.
int kmr_reduce_as_one(KMR_KVS *kvi, KMR_KVS *kvo, void *arg, struct kmr_option opt, kmr_redfn_t r)
Calls a reduce-function once as if all key-value pairs had the same key.
#define CHUNK_LIMIT
Read size limit.
int(* kmr_mapfn_t)(const struct kmr_kv_box kv, const KMR_KVS *kvi, KMR_KVS *kvo, void *arg, const long index)
Map-function Type.
int kmr_alltoallv(KMR *mr, void *sbuf, long *scounts, long *sdsps, void *rbuf, long *rcounts, long *rdsps)
Does all-to-all-v, but it takes arguments of long-integers.