/* KMR core (key-value streams and map/shuffle/reduce kernels).
   NOTE(review): this chunk is an extraction-garbled copy -- the original
   file's line numbers are fused into the text and many interior lines
   are missing, so code-level edits are limited accordingly. */
23 #include "../config.h" 29 const int kmr_version = KMR_H;
/* Tunables: BLOCK_SIZE is the preset allocation unit of a KVS block
   (see preset_block_size below), MAP_PARK_SIZE the number of entries
   collected before kmr_map_parked fires, PUSHOFF_SIZE the push-off
   block size.  MIN/MAX are the usual double-evaluation macros -- do
   not pass expressions with side effects. */
31 #define MIN(a,b) (((a)<(b))?(a):(b)) 32 #define MAX(a,b) (((a)>(b))?(a):(b)) 38 #define BLOCK_SIZE (64 * 1024 * 1024) 43 #define MAP_PARK_SIZE (1024) 48 #define PUSHOFF_SIZE (64 * 1024) 55 kmr_assert_on_tail_marker(
KMR_KVS *kvs)
57 if (kvs != 0 && kvs->c.block_count == 1) {
59 size_t netsz = kvs->c.storage_netsize;
61 assert((((intptr_t)e) & 3) == 0
62 && e->klen == -1 && e->vlen == -1);
72 kmr_init_2(
int ignore)
75 assert(
sizeof(
long) ==
sizeof(
size_t)
76 &&
sizeof(
long) ==
sizeof(ssize_t)
77 &&
sizeof(
long) ==
sizeof(off_t)
78 &&
sizeof(
long) ==
sizeof(uint64_t)
79 &&
sizeof(
long) >=
sizeof(intptr_t)
80 &&
sizeof(
long) >=
sizeof(
void *));
81 assert(kmr_check_alignment(offsetof(
struct kmr_kvs_entry, c)));
82 assert(kmr_check_alignment(offsetof(
struct kmr_kvs_block, data)));
84 assert(
sizeof(
struct kmr_option) ==
sizeof(
long)
89 cc = MPI_Type_get_extent(MPI_LONG, &lb, &extent);
90 assert(cc == MPI_SUCCESS);
91 assert(lb == 0 && extent == 8);
96 int tid = omp_get_thread_num();
110 union {
struct kmr_option o;
unsigned long i;} opt0 = {.o = opt};
111 union {
struct kmr_file_option o;
unsigned long i;} fopt0 = {.o = fopt};
112 opt0.o.rank_zero = 0;
113 fopt0.o.shuffle_names = 0;
114 assert(kf == KMR_KV_POINTER_UNMANAGED
115 && opt.rank_zero && fopt.shuffle_names
116 && opt0.i == 0 && fopt0.i == 0);
148 const char *identifying_name)
152 KMR_DEBUGX(memset(mr, 0,
sizeof(
struct kmr_ctx)));
154 cc = MPI_Comm_size(comm, &mr->nprocs);
155 assert(cc == MPI_SUCCESS);
156 cc = MPI_Comm_rank(comm, &mr->rank);
157 assert(cc == MPI_SUCCESS);
158 cc = MPI_Comm_dup(comm, &mr->comm);
159 if (cc != MPI_SUCCESS) {
160 kmr_error_mpi(mr,
"MPI_Comm_dup", cc);
161 MPI_Abort(MPI_COMM_WORLD, 1);
165 int omp_thrd = omp_get_thread_limit();
169 assert(omp_thrd >= 1);
172 cc = MPI_Query_thread(&mpi_thrd);
173 assert(cc == MPI_SUCCESS);
174 assert(mpi_thrd == MPI_THREAD_SINGLE
175 || mpi_thrd == MPI_THREAD_FUNNELED
176 || mpi_thrd == MPI_THREAD_SERIALIZED
177 || mpi_thrd == MPI_THREAD_MULTIPLE);
178 if (mpi_thrd == MPI_THREAD_SINGLE
179 || mpi_thrd == MPI_THREAD_FUNNELED) {
182 char *s = ((mpi_thrd == MPI_THREAD_SINGLE)
183 ?
"MPI_THREAD_SINGLE" 184 :
"MPI_THREAD_FUNNELED");
185 snprintf(ee,
sizeof(ee),
"Thread support of MPI is low: %s", s);
186 kmr_warning(mr, 1, ee);
193 mr->ckpt_kvs_id_counter = 0;
196 mr->ckpt_selective = 0;
197 mr->ckpt_no_fsync = 0;
207 mr->mapper_park_size = MAP_PARK_SIZE;
208 mr->preset_block_size = BLOCK_SIZE;
209 mr->malloc_overhead = (int)
sizeof(
void *);
211 mr->atoa_threshold = 512;
213 mr->sort_trivial = 100000;
214 mr->sort_threshold = 100L;
215 mr->sort_sample_factor = 10000;
216 mr->sort_threads_depth = 5;
218 mr->file_io_block_size = (1024 * 1024);
220 mr->pushoff_block_size = PUSHOFF_SIZE;
221 mr->pushoff_poll_rate = 0;
223 #if defined(KMRLIBDIR) 224 mr->kmr_installation_path = KMRLIBDIR;
226 mr->kmr_installation_path = 0;
228 mr->spawn_watch_program = 0;
229 mr->spawn_watch_prefix = 0;
230 mr->spawn_watch_host_name = 0;
231 mr->spawn_max_processes = 0;
232 mr->spawn_watch_af = 4;
233 mr->spawn_watch_port_range[0] = 0;
234 mr->spawn_watch_port_range[1] = 0;
235 mr->spawn_gap_msec[0] = 1000;
236 mr->spawn_gap_msec[1] = 10000;
237 mr->spawn_watch_accept_onhold_msec = (60 * 1000);
242 mr->single_thread = 0;
243 mr->one_step_sort = 0;
245 mr->trace_sorting = 0;
246 mr->trace_file_io = 0;
247 mr->trace_map_ms = 0;
248 mr->trace_map_spawn = 0;
249 mr->trace_alltoall = 0;
253 mr->file_io_dummy_striping = 1;
254 mr->file_io_always_alltoallv = 0;
255 mr->spawn_sync_at_startup = 0;
256 mr->spawn_watch_all = 0;
257 mr->spawn_disconnect_early = 0;
258 mr->spawn_disconnect_but_free = 0;
259 mr->spawn_pass_intercomm_in_argument = 0;
260 mr->keep_fds_at_fork = 0;
262 mr->mpi_thread_support = (mpi_thrd == MPI_THREAD_SERIALIZED
263 || mpi_thrd == MPI_THREAD_MULTIPLE);
265 mr->stop_at_some_check_globally = 0;
266 mr->pushoff_hang_out = 0;
267 mr->pushoff_fast_notice = 0;
268 mr->pushoff_stat = 1;
269 memset(&mr->pushoff_statistics, 0,
sizeof(mr->pushoff_statistics));
271 mr->kmrviz_trace = 0;
/* Record the job-identifying name in the context.  strncpy alone does
   not guarantee NUL termination, but here the preceding assert bounds
   the length and the explicit store below always terminates the
   buffer, so the copy is safe. */
273 if (identifying_name != 0) {
274 size_t s = strlen(identifying_name);
275 assert(s < KMR_JOB_NAME_LEN);
276 strncpy(mr->identifying_name, identifying_name, KMR_JOB_NAME_LEN);
277 mr->identifying_name[KMR_JOB_NAME_LEN - 1] = 0;
/* (else-branch; the "} else {" line is elided in this extract): no
   name given -- leave the stored name empty. */
279 mr->identifying_name[0] = 0;
286 cc = MPI_Info_create(&mr->conf);
287 assert(cc == MPI_SUCCESS);
288 cc = kmr_load_preference(mr, mr->conf);
289 assert(cc == MPI_SUCCESS);
290 if (conf != MPI_INFO_NULL) {
292 assert(cc == MPI_SUCCESS);
295 kmr_check_options(mr, mr->conf);
307 kmr_create_context_world()
314 kmr_create_context_ff(
const int fcomm,
const int finfo,
315 const char *identifying_name)
317 MPI_Comm comm = MPI_Comm_f2c(fcomm);
318 MPI_Info info = MPI_Info_f2c(finfo);
329 if (mr->kvses.head != 0 || mr->kvses.tail != 0) {
330 kmr_warning(mr, 1,
"Some key-value streams remain unfreed");
331 for (
KMR_KVS *p = mr->kvses.head; p != 0; p = p->c.link.next) {
332 if (!KMR_KVS_MAGIC_OK(p->c.magic)) {
333 kmr_warning(mr, 1,
"- unfreed kvs in bad state");
334 }
else if (p->c.magic == KMR_KVS_ONCORE) {
335 if (p->c.info_line0.file != 0) {
337 snprintf(ee, 80,
"- kvs allocated at %s:%d: %s",
338 p->c.info_line0.file, p->c.info_line0.line,
339 p->c.info_line0.func);
340 kmr_warning(mr, 1, ee);
343 kmr_warning(mr, 1,
"- unfreed kvs in bad state");
354 if (mr->log_traces != 0) {
355 cc = fclose(mr->log_traces);
358 char *m = strerror(errno);
359 snprintf(ee,
sizeof(ee),
"Closing log file failed: %s", m);
360 kmr_warning(mr, 1, ee);
365 cc = MPI_Comm_free(&mr->comm);
366 assert(cc == MPI_SUCCESS);
367 if (mr->conf != MPI_INFO_NULL) {
368 cc = MPI_Info_free(&mr->conf);
369 assert(cc == MPI_SUCCESS);
372 if (mr->spawn_watch_program != 0) {
373 size_t s = (strlen(mr->spawn_watch_program) + 1);
374 kmr_free(mr->spawn_watch_program, s);
376 assert(mr->spawn_comms == 0);
381 kmr_free(mr,
sizeof(
struct kmr_ctx));
386 kmr_get_context_of_kvs(
KMR_KVS const *kvs)
398 KMR_KVS *prev = kvs->c.link.prev;
399 KMR_KVS *next = kvs->c.link.next;
401 prev->c.link.next = next;
403 assert(mr->kvses.head == kvs);
404 mr->kvses.head = next;
407 next->c.link.prev = prev;
409 assert(mr->kvses.tail == kvs);
410 mr->kvses.tail = prev;
423 KMR_DEBUGX(memset(kvs, 0,
sizeof(
KMR_KVS)));
424 kvs->c.magic = KMR_KVS_ONCORE;
429 mr->ckpt_kvs_id_counter++;
430 kvs->c.ckpt_kvs_id = mr->ckpt_kvs_id_counter;
431 kvs->c.ckpt_generated_op = 0;
432 kvs->c.ckpt_consumed_op = 0;
435 kvs->c.key_data = KMR_KV_BAD;
436 kvs->c.value_data = KMR_KV_BAD;
437 kvs->c.element_count = 0;
443 kvs->c.shuffled_in_pushoff = 0;
444 kvs->c._uniformly_sized_ = 0;
446 kvs->c.block_size = (mr->preset_block_size - mr->malloc_overhead);
447 kvs->c.element_size_limit = (kvs->c.block_size / 4);
448 kvs->c.storage_netsize = 0;
449 kvs->c.block_count = 0;
450 kvs->c.first_block = 0;
455 kvs->c.under_threaded_operation = 0;
456 kvs->c.current_block = 0;
457 kvs->c.adding_point = 0;
458 kvs->c.temporary_data = 0;
472 kvs->c.magic = KMR_KVS_ONCORE;
480 kvs->c.key_data = KMR_KV_BAD;
481 kvs->c.value_data = KMR_KV_BAD;
482 kvs->c.element_count = 0;
488 kvs->c.shuffled_in_pushoff = 0;
489 kvs->c._uniformly_sized_ = 0;
491 kvs->c.block_size = (mr->preset_block_size - mr->malloc_overhead);
492 kvs->c.element_size_limit = (kvs->c.block_size / 4);
493 kvs->c.storage_netsize = 0;
494 kvs->c.block_count = 0;
495 kvs->c.first_block = 0;
500 kvs->c.under_threaded_operation = 0;
501 kvs->c.current_block = 0;
502 kvs->c.adding_point = 0;
503 kvs->c.temporary_data = 0;
512 const char *file,
const int line,
const char *func)
515 kvs->c.key_data = kf;
516 kvs->c.value_data = vf;
517 kvs->c.info_line0.file = file;
518 kvs->c.info_line0.func = func;
519 kvs->c.info_line0.line = line;
536 assert(kvi != 0 && kvo != 0
537 && kvi->c.magic == KMR_KVS_ONCORE
538 && kvo->c.magic == KMR_KVS_ONCORE
539 && kvi->c.oncore && kvo->c.oncore);
540 assert(kvi->c.key_data == kvo->c.key_data
541 && kvi->c.value_data == kvo->c.value_data);
542 assert(kvi->c.stowed && !kvo->c.stowed);
548 kvi->c.first_block = 0;
557 kvo->c.stowed = kvi->c.stowed;
558 kvo->c.nogrow = kvi->c.nogrow;
559 kvo->c.sorted = kvi->c.sorted;
560 kvo->c.element_count = kvi->c.element_count;
561 kvo->c.storage_netsize = kvi->c.storage_netsize;
562 kvo->c.block_count = kvi->c.block_count;
563 kvo->c.first_block = kvi->c.first_block;
564 kvo->c.ms = kvi->c.ms;
572 kvi->c.first_block = 0;
575 assert(cc == MPI_SUCCESS);
584 kmr_free_kvs_oncore(
KMR_KVS *kvs)
589 kmr_free(b, b->size);
592 if (kvs->c.ms != 0) {
593 long cnt = kvs->c.element_count;
595 + (sizeof(char) * (size_t)cnt));
596 kmr_free(kvs->c.ms, sz);
598 if (kvs->c.temporary_data != 0) {
599 kmr_free(kvs->c.temporary_data, 0);
601 kvs->c.magic = KMR_KVS_BAD;
623 if (!KMR_KVS_MAGIC_OK(kvs->c.magic)) {
624 kmr_error(0,
"kmr_free_kvs: kvs already freed or corrupted");
629 if (kvs->c.magic == KMR_KVS_ONCORE) {
630 cc = kmr_free_kvs_oncore(kvs);
632 }
else if (kvs->c.magic == KMR_KVS_PUSHOFF) {
633 cc = kmr_free_kvs_pushoff(kvs, 1);
636 assert((kvs->c.magic == KMR_KVS_ONCORE)
637 || (kvs->c.magic == KMR_KVS_PUSHOFF));
654 kmr_allocate_block(
KMR_KVS *kvs,
size_t size)
657 assert(kvs->c.element_count == 0 && kvs->c.storage_netsize == 0
658 && kvs->c.block_count == 0 && kvs->c.first_block == 0
659 && kvs->c.current_block == 0 && kvs->c.adding_point == 0);
664 kvs->c.block_size = 0;
667 }
else if (size == 1) {
669 sz = kvs->c.block_size;
670 assert(kvs->c.nogrow == 0);
672 assert(kvs->c.first_block == 0 && kvs->c.current_block == 0
673 && kvs->c.block_count == 0 && kvs->c.adding_point == 0);
677 kvs->c.block_size = sz;
678 kvs->c.storage_netsize = netsz;
682 kmr_kvs_reset_block(kvs, b, sz, netsz);
683 kmr_kvs_insert_block(kvs, b);
692 kmr_kvs_adjust_adding_point(
KMR_KVS *kvs)
694 if (kvs->c.block_count == 0) {
695 assert(kvs->c.current_block == 0 && kvs->c.adding_point == 0);
697 assert(kvs->c.current_block != 0 && kvs->c.adding_point != 0);
699 assert(kmr_kvs_first_entry(kvs, b) == kvs->c.adding_point);
700 kvs->c.adding_point = kmr_kvs_last_entry_limit(kvs, b);
701 assert(kvs->c.adding_point == kmr_kvs_adding_point(b));
712 struct kmr_kv_box *xkv, _Bool reserve_space_only)
714 kmr_assert_kv_sizes(kvs, kv);
715 assert(!kvs->c.nogrow || kvs->c.storage_netsize != 0);
718 size_t sz = kmr_kvs_entry_size_of_box(kvs, kv);
719 if (sz > (kvs->c.element_size_limit)) {
721 snprintf(ee, 80,
"key-value too large (size=%zd)", sz);
724 if (kvs->c.first_block == 0) {
725 assert(kvs->c.element_count == 0);
726 cc = kmr_allocate_block(kvs, 1);
727 assert(cc == MPI_SUCCESS);
729 if (!kmr_kvs_entry_fits_in_block(kvs, kvs->c.current_block, sz)) {
730 assert(!kvs->c.nogrow);
731 kmr_kvs_mark_entry_tail(kvs->c.adding_point);
732 cc = kmr_allocate_block(kvs, 1);
733 assert(cc == MPI_SUCCESS);
737 if (!kvs->c.nogrow) {
738 kvs->c.storage_netsize += kmr_kvs_entry_netsize(e);
740 kvs->c.current_block->partial_element_count++;
741 kvs->c.current_block->fill_size += kmr_kvs_entry_size(kvs, e);
742 kvs->c.adding_point = kmr_kvs_next_entry(kvs, e);
743 kvs->c.element_count++;
753 kmr_assert_kvs_ok(0, kvs, 0, 1);
755 if (kvs->c.magic == KMR_KVS_ONCORE) {
758 cc = kmr_add_kv_nomutex(kvs, kv, 0, 0);
761 }
else if (kvs->c.magic == KMR_KVS_PUSHOFF) {
768 assert((kvs->c.magic == KMR_KVS_ONCORE)
769 || (kvs->c.magic == KMR_KVS_PUSHOFF));
782 switch (kvs->c.key_data) {
784 xassert(kvs->c.key_data != KMR_KV_BAD);
795 case KMR_KV_POINTER_OWNED:
796 case KMR_KV_POINTER_UNMANAGED:
806 switch (kvs->c.value_data) {
808 xassert(kvs->c.value_data != KMR_KV_BAD);
819 case KMR_KV_POINTER_OWNED:
820 case KMR_KV_POINTER_UNMANAGED:
829 struct kmr_kv_box kv = {.klen = klen, .vlen = vlen, .k = xk, .v = xv};
844 void **keyp,
void **valuep)
846 kmr_assert_kvs_ok(0, kvs, 0, 1);
847 assert(kvs->c.magic == KMR_KVS_ONCORE);
855 cc = kmr_add_kv_nomutex(kvs, kv, &xkv, 1);
858 *keyp = (
void *)xkv.k.p;
861 *valuep = (
void *)xkv.v.p;
869 int cc = kmr_add_kv_nomutex(kvs, kv, 0, 0);
883 kmr_assert_kvs_ok(0, kvs, 0, 1);
884 if (kvs->c.magic == KMR_KVS_ONCORE) {
886 kmr_error(kvs->c.mr,
"kmr_add_kv_done: may be called already");
888 if (kvs->c.element_count == 0) {
889 assert(kvs->c.current_block == 0 && kvs->c.adding_point == 0);
891 assert(kvs->c.current_block != 0 && kvs->c.adding_point != 0);
892 kmr_kvs_mark_entry_tail(kvs->c.adding_point);
895 kvs->c.current_block = 0;
896 kvs->c.adding_point = 0;
897 assert(kvs->c.block_count == 0 || kvs->c.first_block != 0);
898 }
else if (kvs->c.magic == KMR_KVS_PUSHOFF) {
901 assert((kvs->c.magic == KMR_KVS_ONCORE)
902 || (kvs->c.magic == KMR_KVS_PUSHOFF));
915 if (!((kvs->c.key_data == KMR_KV_OPAQUE
916 || kvs->c.key_data == KMR_KV_CSTRING)
917 && (kvs->c.value_data == KMR_KV_OPAQUE
918 || kvs->c.value_data == KMR_KV_CSTRING))) {
920 "key-value data-types need be opaque for strings");
922 size_t klen = (strlen(k) + 1);
923 size_t vlen = (strlen(v) + 1);
924 assert(klen <= INT_MAX && vlen <= INT_MAX);
948 kmr_collapse_as_opaque(
KMR_KVS *kvi,
KMR_KVS *kvo, _Bool inspectp)
950 assert(kvi != 0 && kvo != 0);
951 assert(kmr_fields_pointer_p(kvi) || kvi->c.block_count > 1);
953 cc = kmr_allocate_block(kvo, kvi->c.storage_netsize);
954 assert(cc == MPI_SUCCESS);
955 struct kmr_option collapse = {.collapse = 1, .inspect = inspectp};
957 assert(cc == MPI_SUCCESS);
972 kmr_error_at_site(0,
"Null input kvs", 0);
973 }
else if (!KMR_KVS_MAGIC_OK(kvs->c.magic)) {
974 kmr_error_at_site(0,
"Bad input kvs (freed or corrupted)", 0);
976 assert(kvs->c.magic == KMR_KVS_ONCORE);
977 kmr_check_fn_options(kvs->c.mr, kmr_noopt, opt, __func__);
979 if (kvs->c.ms != 0 || kvs->c.temporary_data != 0) {
980 kmr_warning(kvs->c.mr, 5,
981 "Some fields in KVS may be lost in saved image");
984 if (kmr_fields_pointer_p(kvs) || (kvs->c.block_count > 1)) {
985 enum kmr_kv_field keyf = kmr_unit_sized_or_opaque(kvs->c.key_data);
986 enum kmr_kv_field valf = kmr_unit_sized_or_opaque(kvs->c.value_data);
988 cc = kmr_collapse_as_opaque(kvs, kvs1, 0);
989 assert(cc == MPI_SUCCESS);
990 assert(!kmr_fields_pointer_p(kvs1) && kvs->c.block_count <= 1);
992 assert(cc == MPI_SUCCESS);
996 assert(!kmr_fields_pointer_p(kvs));
997 size_t netsz = kvs->c.storage_netsize;
999 size_t sz = (
sizeof(
KMR_KVS) + blocksz);
1000 unsigned char *b = malloc(sz);
1002 return MPI_ERR_BUFFER;
1006 memcpy(h, kvs,
sizeof(
KMR_KVS));
1007 h->c.magic = KMR_KVS_ONCORE_PACKED;
1011 h->c.block_count = 1;
1012 h->c.first_block = 0;
1013 h->c.current_block = 0;
1014 h->c.adding_point = 0;
1016 h->c.temporary_data = 0;
1017 if (kvs->c.block_count == 0) {
1019 }
else if (kvs->c.block_count == 1) {
1020 memcpy(s, kvs->c.first_block, blocksz);
1037 assert(kvo != 0 && kvo->c.magic == KMR_KVS_ONCORE);
1038 kmr_check_fn_options(kvo->c.mr, kmr_noopt, opt, __func__);
1040 unsigned char *b = data;
1042 unsigned char *s = (b +
sizeof(
KMR_KVS));
1043 if (h->c.magic != KMR_KVS_ONCORE_PACKED) {
1044 kmr_warning(kvo->c.mr, 1,
"Bad packed data, magic mismatch");
1045 return MPI_ERR_TYPE;
1047 size_t netsz = h->c.storage_netsize;
1049 cc = kmr_allocate_block(kvo, netsz);
1050 assert(cc == MPI_SUCCESS);
1052 memcpy(kvo->c.first_block, s, blocksz);
1054 kvo->c.key_data = h->c.key_data;
1055 kvo->c.value_data = h->c.value_data;
1056 assert(kvo->c.sorted == 0);
1057 kvo->c.element_count = h->c.element_count;
1058 kmr_kvs_adjust_adding_point(kvo);
/* Invokes the map-function on the parked (batched) entries EV[0..EVCNT-1],
   either serially or under an OpenMP parallel-for depending on
   mr->single_thread / opt.nothreading.  Several parameter and interior
   lines are elided in this extract.  t0/t1 bracket each call only when
   trace logging is enabled (log_traces != 0), to avoid MPI_Wtime cost
   otherwise. */
1070 kmr_map_parked(
struct kmr_kv_box *ev,
long evcnt,
long mapcount,
1071 _Bool k_reclaim, _Bool v_reclaim,
1076 KMR *mr = kvi->c.mr;
1077 long cnt = kvi->c.element_count;
1078 if (mr->single_thread || opt.nothreading) {
1079 for (
long i = 0; i < evcnt; i++) {
1080 double t0 = ((mr->log_traces == 0) ? 0.0 : MPI_Wtime());
1081 cc = (*m)(ev[i], kvi, kvo, arg, (mapcount + i));
1082 double t1 = ((mr->log_traces == 0) ? 0.0 : MPI_Wtime());
1083 if (cc != MPI_SUCCESS) {
1085 snprintf(ee,
sizeof(ee),
1086 "Map-fn returned with error cc=%d", cc);
1089 if (mr->log_traces != 0) {
/* NOTE(review): the map-fn above is called with index (mapcount + i),
   but the log call below passes (mapcount + 1) -- this looks like a
   1-for-i typo; confirm against the complete source before changing. */
1090 kmr_log_map(mr, kvi, &ev[i], (mapcount + 1), cnt,
1096 kvo->c.under_threaded_operation = 1;
1098 KMR_OMP_PARALLEL_FOR_
1099 for (
long i = 0; i < evcnt; i++) {
1100 double t0 = ((mr->log_traces == 0) ? 0.0 : MPI_Wtime());
1101 int ccx = (*m)(ev[i], kvi, kvo, arg, (mapcount + i));
1102 double t1 = ((mr->log_traces == 0) ? 0.0 : MPI_Wtime());
1103 if (ccx != MPI_SUCCESS) {
1105 snprintf(ee,
sizeof(ee),
1106 "Map-fn returned with error cc=%d", ccx);
/* NOTE(review): as in the serial branch, (mapcount + 1) here likely
   should be (mapcount + i), matching the map-fn invocation above --
   confirm with the full source. */
1109 if (mr->log_traces != 0) {
1110 kmr_log_map(mr, kvi, &ev[i], (mapcount + 1), cnt,
1115 kvo->c.under_threaded_operation = 0;
1118 for (
long i = 0; i < evcnt; i++) {
1120 kmr_free((
void *)ev[i].k.p, (
size_t)ev[i].klen);
1123 kmr_free((
void *)ev[i].v.p, (
size_t)ev[i].vlen);
1135 _Bool stop_when_some_added,
1139 kmr_assert_kvs_ok(kvi, kvo, 1, 0);
1140 assert(from >= 0 && stride > 0 && limit >= 0);
1141 assert(kvi->c.current_block == 0);
1142 limit = ((limit != 0) ? limit : LONG_MAX);
1143 KMR *mr = kvi->c.mr;
1147 if (kvo != 0 && !opt.keep_open) {
/* Optional per-step barrier before mapping.  Fixed: the assertion was
   "assert(MPI_SUCCESS)", which tests the constant MPI_SUCCESS itself
   (defined as 0 by common MPI implementations, i.e. an always-failing
   assert) and ignores the call's result.  Every other MPI call site in
   this file checks "cc == MPI_SUCCESS"; made this one consistent. */
1161 if (mr->step_sync) {
1162 cc = MPI_Barrier(mr->comm);
1163 assert(cc == MPI_SUCCESS);
1165 if (kvo != 0 && opt.collapse) {
1166 assert(!kmr_fields_pointer_p(kvo));
1168 _Bool k_reclaim = (!opt.inspect && (kmr_key_pointer_p(kvi)));
1169 _Bool v_reclaim = (!opt.inspect && (kmr_value_pointer_p(kvi)));
1170 long evsz = mr->mapper_park_size;
1175 long nextindex = from;
1177 kvi->c.current_block = kvi->c.first_block;
1178 while (index < kvi->c.element_count) {
1179 assert(kvi->c.current_block != 0);
1182 for (
int i = 0; i < b->partial_element_count; i++) {
1184 if (index == nextindex && index < limit) {
1186 nextindex = (index + stride);
1198 kmr_free((
void *)w->p, (
size_t)e->klen);
1203 kmr_free((
void *)w->p, (
size_t)e->vlen);
1207 if (evcnt >= evsz) {
1208 cc = kmr_map_parked(ev, evcnt, mapcount, k_reclaim, v_reclaim,
1209 kvi, kvo, m, arg, opt);
1210 assert(cc == MPI_SUCCESS);
1219 if (stop_when_some_added) {
1221 if (mr->stop_at_some_check_globally) {
1224 done = (kvo->c.element_count != 0);
1228 index = (kvi->c.element_count - 1);
1229 while (b->next != 0) {
1235 e = kmr_kvs_next(kvi, e, 1);
1238 kvi->c.current_block = b->next;
1240 assert(kvi->c.current_block == 0);
1242 cc = kmr_map_parked(ev, evcnt, mapcount, k_reclaim, v_reclaim,
1243 kvi, kvo, m, arg, opt);
1244 assert(cc == MPI_SUCCESS);
1253 if (kvo != 0 && !opt.keep_open) {
1265 kmr_free(ev, (
sizeof(
struct kmr_kv_box) * (
size_t)evsz));
1292 const char *file,
const int line,
const char *func)
1295 kmr_assert_kvs_ok(kvi, kvo, 1, 0);
1296 KMR *mr = kvi->c.mr;
1297 struct kmr_option kmr_supported = {.nothreading = 1, .inspect = 1,
1298 .keep_open = 1, .collapse = 1,
1300 kmr_check_fn_options(mr, kmr_supported, opt, __func__);
1303 if (mr->kmrviz_trace) {
1307 if (mr->atwork == 0) {
1316 kvi, kvo, arg, opt, m);
1318 assert(!opt.inspect && !opt.keep_open);
1322 if (mr->atwork == &info) {
1325 if (mr->kmrviz_trace) {
1342 KMR *mr = kvi->c.mr;
1343 int nprocs = mr->nprocs;
1345 if (mr->rank != 0) {
1346 cc = MPI_Recv(0, 0, MPI_INT, (mr->rank - 1),
1347 KMR_TAG_MAP_BY_RANK, mr->comm, MPI_STATUS_IGNORE);
1348 assert(cc == MPI_SUCCESS);
1350 cc =
kmr_map(kvi, kvo, arg, opt, m);
1351 assert(cc == MPI_SUCCESS);
1353 if (mr->rank != (nprocs - 1)) {
1355 cc = MPI_Send(0, 0, MPI_INT, (mr->rank + 1),
1356 KMR_TAG_MAP_BY_RANK, mr->comm);
1357 assert(cc == MPI_SUCCESS);
1371 kmr_assert_kvs_ok(kvi, 0, 1, 0);
1372 assert(kvi->c.current_block == 0);
1373 KMR *mr = kvi->c.mr;
1374 kvi->c.current_block = kvi->c.first_block;
1375 if (kvi->c.element_count == 1) {
1376 assert(kvi->c.current_block != 0);
1379 assert(b->partial_element_count == 1);
1382 kvi->c.current_block = 0;
1385 if (kvi->c.element_count == 0) {
1386 kmr_warning(mr, 1,
"kmr_take_one for no entries");
1389 kmr_warning(mr, 1,
"kmr_take_one for multiple entries");
1392 MPI_Abort(MPI_COMM_WORLD, 1);
1405 kmr_assert_kvs_ok(0, kvo, 0, 1);
1406 KMR *mr = kvo->c.mr;
1407 struct kmr_option kmr_supported = {.keep_open = 1, .take_ckpt = 1};
1408 kmr_check_fn_options(mr, kmr_supported, opt, __func__);
1409 int rank = mr->rank;
1413 if (mr->kmrviz_trace) {
1424 if (!rank_zero_only || rank == 0) {
1425 struct kmr_kv_box kv = {.klen = 0, .vlen = 0, .k.i = 0, .v.i = 0};
1426 cc = (*m)(kv, 0, kvo, arg, 0);
1427 if (cc != MPI_SUCCESS) {
1429 snprintf(ee,
sizeof(ee),
1430 "Map-fn returned with error cc=%d", cc);
1434 if (!opt.keep_open) {
1436 assert(cc == MPI_SUCCESS);
1443 if (mr->kmrviz_trace) {
1461 assert(cc == MPI_SUCCESS);
/* Hashes an opaque key of N bytes.  The multiply/rotate constants
   (0x87c37b91114253d5, 0x4cf5ad432745937f, 0xff51afd7ed558ccd,
   0xc4ceb9fe1a85ec53) match MurmurHash3's x64 mixing and finalization
   steps -- presumably this is a MurmurHash3-derived hash; the body is
   partially elided here.  NOTE(review): the (void *)p cast below reads
   the key 8 bytes at a time through an unsigned long pointer; this
   assumes suitable alignment of P -- confirm callers guarantee it. */
1471 static inline unsigned long 1472 kmr_hash_key_opaque(
const unsigned char *p,
int n)
1474 #define ROT(X,N) ((X) << (N) | (X) >> (64-(N))) 1475 #define KEY(V) (k = (V), k *= 0x87c37b91114253d5UL, \ 1476 k = ROT(k, 31), k *= 0x4cf5ad432745937fUL) 1477 #define MIX() (h ^= k, h = ROT(h, 31), h = h * 5 + 0xe6546b64) 1478 #define FIN() (h ^= (h >> 33), h *= 0xff51afd7ed558ccdUL, h ^= (h >> 33), \ 1479 h *= 0xc4ceb9fe1a85ec53UL, h ^= (h >> 33)) 1480 unsigned long h = 0x85ebca6bUL;
1482 const unsigned long *v = (
void *)p;
1484 int rn = (n - (8 * n8));
1485 const unsigned char *r = &p[(8 * n8)];
1486 for (
int i = 0; i < n8; i++) {
1490 union {
unsigned long i;
unsigned char c[8];} u = {.i = 0UL};
1491 for (
int i = 0; i < rn; i++) {
1508 static inline unsigned long 1509 kmr_hash_key_opaque(
const unsigned char *p,
int n)
1511 unsigned long hash = 0;
1512 for (i = 0 ; i < n ; i++) {
1523 static inline unsigned long 1524 kmr_hash_key_opaque(
const unsigned char *p,
int n)
1526 unsigned long hash = 0;
1530 for (i = 0 ; i < n ; i++) {
1537 for (i = 0 ; i < n ; i += k) {
1552 static inline signed long 1555 switch (kvs->c.key_data) {
1557 xassert(kvs->c.key_data != KMR_KV_BAD);
1560 case KMR_KV_CSTRING:
1561 return (
signed long)kmr_hash_key_opaque((
void *)kv.k.p, kv.klen);
1562 case KMR_KV_INTEGER:
1566 case KMR_KV_POINTER_OWNED:
1567 case KMR_KV_POINTER_UNMANAGED:
1568 xassert(kvs->c.key_data != KMR_KV_POINTER_OWNED
1569 && kvs->c.key_data != KMR_KV_POINTER_UNMANAGED);
1582 kmr_stable_key_opaque(
const struct kmr_kv_box kv)
1584 unsigned char *p = (
unsigned char *)kv.k.p;
1586 unsigned long hash = 0;
1587 for (
int i = 0; i < (int)
sizeof(
long); i++) {
1588 unsigned char v = ((i < n) ? p[i] : 0);
1589 hash = ((hash << 8) + v);
1591 return (
long)(hash >> 1);
1602 switch (kvs->c.key_data) {
1604 xassert(kvs->c.key_data != KMR_KV_BAD);
1607 case KMR_KV_CSTRING:
1608 return kmr_stable_key_opaque(kv);
1609 case KMR_KV_INTEGER:
1614 long v1 = ((v0 >= 0L) ? v0 : ((-v0) | (1L << 63)));
1618 case KMR_KV_POINTER_OWNED:
1619 case KMR_KV_POINTER_UNMANAGED:
1620 xassert(kvs->c.key_data != KMR_KV_POINTER_OWNED
1621 && kvs->c.key_data != KMR_KV_POINTER_UNMANAGED);
1635 unsigned int nprocs = (
unsigned int)kvs->c.mr->nprocs;
1636 unsigned long v = (
unsigned long)kmr_hash_key(kvs, kv);
1637 unsigned int h = (((v >> 32) ^ v) & ((1L << 32) - 1));
1638 return (
int)(h % nprocs);
1643 #define KMR_CMP3(X, Y) (((X) == (Y)) ? 0 : ((X) < (Y)) ? -1 : 1) 1650 kmr_compare_lexicographically(
const unsigned char *p,
const int plen,
1651 const unsigned char *q,
const int qlen)
1653 int s = MIN(plen, qlen);
1655 for (
int i = 0; i < s; i++) {
1657 return (p[i] - q[i]);
1661 int cc = memcmp(p, q, (
size_t)s);
1665 return (plen - qlen);
1672 kmr_compare_opaque(
const struct kmr_kv_box *p,
1675 return kmr_compare_lexicographically((
unsigned char *)p->k.p, p->klen,
1676 (
unsigned char *)q->k.p, q->klen);
1680 kmr_compare_integer(
const struct kmr_kv_box *p0,
1683 return KMR_CMP3(p0->k.i, p1->k.i);
1687 kmr_compare_float8(
const struct kmr_kv_box *p0,
1690 return KMR_CMP3(p0->k.d, p1->k.d);
1699 struct kmr_kv_box b0 = kmr_pick_kv2(p0->e, KMR_KV_OPAQUE, KMR_KV_OPAQUE);
1700 struct kmr_kv_box b1 = kmr_pick_kv2(p1->e, KMR_KV_OPAQUE, KMR_KV_OPAQUE);
1701 return kmr_compare_lexicographically((
unsigned char *)b0.k.p, b0.klen,
1702 (
unsigned char *)b1.k.p, b1.klen);
1713 struct kmr_kv_box b0 = kmr_pick_kv2(p0->e, KMR_KV_OPAQUE, KMR_KV_OPAQUE);
1714 struct kmr_kv_box b1 = kmr_pick_kv2(p1->e, KMR_KV_OPAQUE, KMR_KV_OPAQUE);
1717 return KMR_CMP3(v0, v1);
1728 struct kmr_kv_box b0 = kmr_pick_kv2(p0->e, KMR_KV_OPAQUE, KMR_KV_OPAQUE);
1729 struct kmr_kv_box b1 = kmr_pick_kv2(p1->e, KMR_KV_OPAQUE, KMR_KV_OPAQUE);
1732 return KMR_CMP3(v0, v1);
1738 kmr_choose_sorter(
const KMR_KVS *kvs)
1740 switch (kvs->c.key_data) {
1742 xassert(kvs->c.key_data != KMR_KV_BAD);
1744 case KMR_KV_INTEGER:
1745 return kmr_compare_integer;
1747 return kmr_compare_float8;
1749 case KMR_KV_CSTRING:
1750 case KMR_KV_POINTER_OWNED:
1751 case KMR_KV_POINTER_UNMANAGED:
1752 return kmr_compare_opaque;
1762 static kmr_record_sorter_t
1763 kmr_choose_record_sorter(
const KMR_KVS *kvs)
1765 switch (kvs->c.key_data) {
1767 xassert(kvs->c.key_data != KMR_KV_BAD);
1769 case KMR_KV_INTEGER:
1771 return kmr_compare_record_integer_;
1774 return kmr_compare_record_float8_;
1776 case KMR_KV_CSTRING:
1777 case KMR_KV_POINTER_OWNED:
1778 case KMR_KV_POINTER_UNMANAGED:
1779 return kmr_compare_record_opaque;
1792 kmr_copy_record_shuffle_fn(
const struct kmr_kv_box kv,
1803 kmr_copy_record_sorting_fn(
const struct kmr_kv_box kv,
1821 return KMR_CMP3(v0, v1);
1828 kmr_assert_kvs_ok(kvi, kvo, 1, 1);
1829 assert(kmr_shuffle_compatible_p(kvo, kvi));
1830 KMR *mr = kvi->c.mr;
1833 if (mr->kmrviz_trace) {
1846 _Bool twostep = !mr->one_step_sort;
1847 _Bool primekey = ((kvi->c.key_data == KMR_KV_INTEGER)
1848 || (kvi->c.key_data == KMR_KV_FLOAT8));
1849 double timestamp[5];
1851 long cnt = kvi->c.element_count;
1852 timestamp[0] = MPI_Wtime();
1858 .nothreading = opt.nothreading
1861 cc =
kmr_map(kvi, 0, ev, inspect, kmr_copy_record_shuffle_fn);
1862 assert(cc == MPI_SUCCESS);
1864 cc =
kmr_map(kvi, 0, ev, inspect, kmr_copy_record_sorting_fn);
1865 assert(cc == MPI_SUCCESS);
1869 assert(cc == MPI_SUCCESS);
1871 timestamp[1] = MPI_Wtime();
1872 if (shuffling || twostep || primekey) {
1873 if (mr->single_thread || opt.nothreading) {
1877 mr->sort_threads_depth);
1880 timestamp[2] = MPI_Wtime();
1881 if (!shuffling && !primekey) {
1883 long *runs =
kmr_malloc(
sizeof(
long) * (
size_t)cnt);
1893 cc = KMR_CMP3(ev[i - 1].v, ev[i].v);
1895 assert(nruns < cnt);
1899 assert(i == cnt && (cnt == 0 || runs[nruns - 1] == cnt));
1901 nruns = (cnt == 0 ? 0 : 1);
1904 kmr_record_sorter_t cmp1 = kmr_choose_record_sorter(kvi);
1905 if (mr->single_thread || opt.nothreading) {
1906 for (
long k = 0; k < nruns; k++) {
1907 long j = (k == 0 ? 0 : runs[k - 1]);
1911 qsort(&ev[j], (
size_t)(i - j),
1913 (kmr_qsorter_t)cmp1);
1917 KMR_OMP_PARALLEL_FOR_
1918 for (
long k = 0; k < nruns; k++) {
1919 long j = (k == 0 ? 0 : runs[k - 1]);
1923 qsort(&ev[j], (
size_t)(i - j),
1925 (kmr_qsorter_t)cmp1);
1929 kmr_free(runs, (
sizeof(
long) * (
size_t)cnt));
1931 timestamp[3] = MPI_Wtime();
1932 size_t sz = kvi->c.storage_netsize;
1933 cc = kmr_allocate_block(kvo, sz);
1934 assert(cc == MPI_SUCCESS);
1936 for (
long i = 0 ; i < cnt; i++) {
1938 kmr_add_kv_nomutex(kvo, kv, 0, 0);
1940 timestamp[4] = MPI_Wtime();
1941 assert(sz == 0 || kmr_kvs_entry_tail_p(kvo->c.adding_point));
1942 assert(sz == 0 || kvo->c.block_count == 1);
1944 kmr_assert_on_tail_marker(kvo);
1947 _Bool tracing = mr->trace_sorting;
1948 if (tracing && (5 <= mr->verbosity)) {
1949 fprintf(stderr, (
";;KMR [%05d] kmr_sort_locally" 1950 " time=(%f %f %f %f) (msec)\n"),
1952 ((timestamp[1] - timestamp[0]) * 1e3),
1953 ((timestamp[2] - timestamp[1]) * 1e3),
1954 ((timestamp[3] - timestamp[2]) * 1e3),
1955 ((timestamp[4] - timestamp[3]) * 1e3));
1968 if (mr->kmrviz_trace) {
1996 kmr_assert_kvs_ok(kvi, kvo, 1, 1);
1997 assert(kmr_shuffle_compatible_p(kvo, kvi));
1998 struct kmr_option kmr_supported = {.nothreading = 1, .inspect = 1,
2000 kmr_check_fn_options(kvi->c.mr, kmr_supported, opt, __func__);
2001 _Bool ranking = opt.key_as_rank;
2002 kmr_sort_locally_lo(kvi, kvo, shuffling, ranking, opt);
2013 kmr_count_entries(
KMR_KVS *kvs, _Bool bound_in_block)
2015 kvs->c.current_block = kvs->c.first_block;
2016 struct kmr_kvs_entry *e = kmr_kvs_first_entry(kvs, kvs->c.first_block);
2022 e = kmr_kvs_next(kvs, e, bound_in_block);
2038 kmr_assert_kvs_ok(kvi, kvo, 1, 1);
2039 assert(kmr_shuffle_compatible_p(kvo, kvi));
2040 KMR *mr = kvi->c.mr;
2041 struct kmr_option kmr_supported = {.inspect = 1, .key_as_rank = 1,
2043 kmr_check_fn_options(mr, kmr_supported, opt, __func__);
2044 _Bool ranking = opt.key_as_rank;
2047 if (mr->kmrviz_trace) {
2054 if (kvi->c.magic == KMR_KVS_PUSHOFF) {
2055 kmr_pushoff_make_stationary(kvi);
2057 if (kvi->c.shuffled_in_pushoff) {
2058 assert(!mr->ckpt_enable);
2076 enum kmr_kv_field keyf = kmr_unit_sized_or_opaque(kvi->c.key_data);
2077 enum kmr_kv_field valf = kmr_unit_sized_or_opaque(kvi->c.value_data);
2081 kmr_sort_locally_lo(kvi, kvs1, 1, ranking, n_opt);
2082 assert(kvs1->c.stowed);
2085 assert(!kmr_fields_pointer_p(kvs1));
2086 assert(kvs1->c.block_count <= 1);
2089 int nprocs = mr->nprocs;
2090 long cnt = kvs1->c.element_count;
2091 long *ssz =
kmr_malloc(
sizeof(
long) * (
size_t)nprocs);
2092 long *sdp =
kmr_malloc(
sizeof(
long) * (
size_t)nprocs);
2093 long *rsz =
kmr_malloc(
sizeof(
long) * (
size_t)nprocs);
2094 long *rdp =
kmr_malloc(
sizeof(
long) * (
size_t)nprocs);
2095 for (
int r = 0; r < nprocs; r++) {
2100 assert(kvs1->c.current_block == 0);
2101 kvs1->c.current_block = kvs1->c.first_block;
2102 struct kmr_kvs_entry *e = kmr_kvs_first_entry(kvs1, kvs1->c.first_block);
2103 for (
long i = 0; i < cnt; i++) {
2106 int r = (ranking ? (int)kv.k.i : kmr_pitch_rank(kv, kvs1));
2107 assert(0 <= r && r < nprocs);
2108 if (ranking && !(0 <= r && r < nprocs)) {
2109 kmr_error(mr,
"key entries are not ranks");
2112 kmr_error(mr,
"key-value entries are not sorted (internal error)");
2114 ssz[r] += (long)kmr_kvs_entry_netsize(e);
2116 e = kmr_kvs_next(kvs1, e, 0);
2120 assert(cc == MPI_SUCCESS);
2123 for (
int r = 0; r < nprocs; r++) {
2129 cc = kmr_allocate_block(kvo, (
size_t)recvsz);
2130 assert(cc == MPI_SUCCESS);
2136 assert(cc == MPI_SUCCESS);
2137 long ocnt = kmr_count_entries(kvo, 1);
2138 assert(kvo->c.sorted == 0);
2139 kvo->c.element_count = ocnt;
2141 assert(kvo->c.block_count == 1);
2142 rb->partial_element_count = ocnt;
2143 rb->fill_size = (size_t)recvsz;
2145 kmr_kvs_adjust_adding_point(kvo);
2157 assert(kvo->c.element_count == 0 || kvo->c.storage_netsize != 0);
2158 xassert(!kmr_fields_pointer_p(kvo));
2160 kmr_free(ssz, (
sizeof(
long) * (
size_t)nprocs));
2161 kmr_free(sdp, (
sizeof(
long) * (
size_t)nprocs));
2162 kmr_free(rsz, (
sizeof(
long) * (
size_t)nprocs));
2163 kmr_free(rdp, (
sizeof(
long) * (
size_t)nprocs));
2167 if (mr->kmrviz_trace) {
2184 kmr_assert_kvs_ok(kvi, kvo, 1, 1);
2185 KMR *mr = kvi->c.mr;
2186 struct kmr_option kmr_supported = {.inspect = 1, .rank_zero = 1,
2188 kmr_check_fn_options(mr, kmr_supported, opt, __func__);
2189 int nprocs = mr->nprocs;
2190 int rank = mr->rank;
2195 if (mr->kmrviz_trace) {
2200 if (kmr_fields_pointer_p(kvi) || kvi->c.block_count > 1) {
2206 cc = kmr_collapse_as_opaque(kvi, kvs1, 1);
2207 assert(cc == MPI_SUCCESS);
2212 kmr_assert_on_tail_marker(kvs1);
2213 assert(kvs1->c.block_count <= 1);
2220 assert(cc == MPI_SUCCESS);
2224 assert(cc == MPI_SUCCESS);
2230 long *rsz =
kmr_malloc(
sizeof(
long) * (
size_t)nprocs);
2231 long *rdp =
kmr_malloc(
sizeof(
long) * (
size_t)nprocs);
2233 long ssz = (long)kvs1->c.storage_netsize;
2235 assert(cc == MPI_SUCCESS);
2237 if (!opt.rank_zero || rank == 0) {
2238 for (
int r = 0; r < nprocs; r++) {
2243 if (!(kvo->c.key_data == kvs1->c.key_data
2244 && kvo->c.value_data == kvs1->c.value_data)) {
2245 kmr_error(mr,
"key-data or value-data types mismatch");
2247 cc = kmr_allocate_block(kvo, (
size_t)recvsz);
2248 assert(cc == MPI_SUCCESS);
2253 cc =
kmr_allgatherv(mr, opt.rank_zero, sbuf, ssz, rbuf, rsz, rdp);
2254 assert(cc == MPI_SUCCESS);
2255 assert(kvo->c.element_count == 0);
2256 long ocnt = kmr_count_entries(kvo, 1);
2257 kvo->c.element_count = ocnt;
2259 rb->partial_element_count = ocnt;
2260 rb->fill_size = (size_t)recvsz;
2262 kmr_kvs_adjust_adding_point(kvo);
2264 assert(cc == MPI_SUCCESS);
2265 kmr_assert_on_tail_marker(kvo);
2266 assert(kvo->c.element_count == 0 || kvo->c.storage_netsize != 0);
2274 assert(cc == MPI_SUCCESS);
2278 assert(cc == MPI_SUCCESS);
2281 kmr_free(rsz, (
sizeof(
long) * (
size_t)nprocs));
2282 kmr_free(rdp, (
sizeof(
long) * (
size_t)nprocs));
2287 if (mr->kmrviz_trace) {
2300 kmr_copy_record_fn(
const struct kmr_kv_box kv,
2316 xassert(kvi->c.current_block == 0);
2317 kmr_sorter_t cmp = kmr_choose_sorter(kvi);
2318 KMR *mr = kvi->c.mr;
2319 long cnt = kvi->c.element_count;
2323 kvi->c.current_block = kvi->c.first_block;
2324 struct kmr_kvs_entry *e = kmr_kvs_first_entry(kvi, kvi->c.first_block);
2336 while (index < start_from) {
2337 e = kmr_kvs_next(kvi, e, 1);
2347 assert(index < cnt);
2348 e = kmr_kvs_next(kvi, e, 1);
2358 cc = (*cmp)(&kv0, &kv1);
2364 assert(ej == 0 && e == 0);
2374 for (
long i = 0; i < n; i++) {
2377 e = kmr_kvs_next(kvi, e, 1);
2380 double t0 = ((mr->log_traces == 0) ? 0.0 : MPI_Wtime());
2381 cc = (*r)(ev, n, kvi, kvo, arg);
2382 double t1 = ((mr->log_traces == 0) ? 0.0 : MPI_Wtime());
2383 if (cc != MPI_SUCCESS) {
2385 snprintf(ee,
sizeof(ee),
2386 "Reduce-fn returned with error cc=%d", cc);
2389 if (mr->log_traces != 0) {
2390 kmr_log_reduce(mr, kvi, ev, n, r, (t1 - t0));
2399 assert(index == cnt);
2407 kmr_free(ev, (
sizeof(
struct kmr_kv_box) * (
size_t)evsz));
2413 kmr_reduce_threading(_Bool stop_when_some_added,
2428 .nothreading = opt.nothreading
2430 assert(kvi->c.current_block == 0);
2431 long cnt = kvi->c.element_count;
2435 cc =
kmr_map(kvi, 0, ev, inspect, kmr_copy_record_fn);
2436 assert(cc == MPI_SUCCESS);
2439 kmr_sorter_t cmp = kmr_choose_sorter(kvi);
2440 long *runs =
kmr_malloc(
sizeof(
long) * (
size_t)cnt);
2456 cc = (*cmp)(&ev[i - 1], &ev[i]);
2459 assert(nruns < cnt);
2463 assert(i == cnt && (cnt == 0 || runs[nruns - 1] == cnt));
2467 kvo->c.under_threaded_operation = 1;
2469 KMR *mr = kvi->c.mr;
2471 KMR_OMP_PARALLEL_FOR_
2472 for (
long k = 0; k < nruns; k++) {
2475 long j = (k == 0 ? 0 : runs[k - 1]);
2478 double t0 = ((mr->log_traces == 0) ? 0.0 : MPI_Wtime());
2479 int ccx = (*r)(&ev[j], (i - j), kvi, kvo, arg);
2480 double t1 = ((mr->log_traces == 0) ? 0.0 : MPI_Wtime());
2481 if (ccx != MPI_SUCCESS) {
2483 snprintf(ee,
sizeof(ee),
2484 "Reduce-fn returned with error cc=%d", ccx);
2487 if (mr->log_traces != 0) {
2488 kmr_log_reduce(mr, kvi, ev, (i - j), r, (t1 - t0));
2498 if (stop_when_some_added) {
2500 if (mr->stop_at_some_check_globally) {
2503 done = (kvo->c.element_count != 0);
2515 kvo->c.under_threaded_operation = 0;
2527 kmr_free(runs, (
sizeof(
long) * (
size_t)cnt));
2528 kmr_free(ev, (
sizeof(
struct kmr_kv_box) * (
size_t)cnt));
2552 const char *file,
const int line,
const char *func)
2554 kmr_assert_kvs_ok(kvi, kvo, 1, 0);
2555 KMR *mr = kvi->c.mr;
2556 struct kmr_option kmr_supported = {.nothreading = 1, .inspect = 1,
2558 kmr_check_fn_options(mr, kmr_supported, opt, __func__);
2559 struct kmr_option i_opt = kmr_copy_options_i_part(opt);
2560 struct kmr_option o_opt = kmr_copy_options_o_part(opt);
2563 if (mr->kmrviz_trace) {
2573 enum kmr_kv_field keyf = kmr_unit_sized_or_opaque(kvi->c.key_data);
2574 enum kmr_kv_field valf = kmr_unit_sized_or_opaque(kvi->c.value_data);
2579 kmr_sort_locally_lo(kvi, kvs1, 0, 0, i_opt);
2585 if (mr->atwork == 0) {
2592 if (mr->single_thread || opt.nothreading) {
2593 cc = kmr_reduce_nothreading(kvs1, kvo, arg, o_opt, r);
2595 cc = kmr_reduce_threading(stop_when_some_added,
2596 kvs1, kvo, arg, o_opt, r);
2598 if (mr->atwork == &info) {
2603 kmr_assert_on_tail_marker(kvs1);
2610 if (mr->kmrviz_trace) {
2628 kmr_assert_kvs_ok(kvi, kvo, 1, 0);
2629 KMR *mr = kvi->c.mr;
2630 assert(kvi->c.current_block == 0);
2631 struct kmr_option kmr_supported = {.inspect = 1, .take_ckpt = 1};
2632 kmr_check_fn_options(mr, kmr_supported, opt, __func__);
2647 long cnt = kvi->c.element_count;
2655 cc =
kmr_map(kvi, 0, ev, inspect, kmr_copy_record_fn);
2656 assert(cc == MPI_SUCCESS);
2658 cc = (*r)(&ev[0], cnt, kvi, kvo, arg);
2659 if (cc != MPI_SUCCESS) {
2661 snprintf(ee,
sizeof(ee),
2662 "Reduce-fn returned with error cc=%d", cc);
2675 kmr_free(ev, (
sizeof(
struct kmr_kv_box) * (
size_t)cnt));
2699 for (
int i = 0; i < nkvs; i++) {
2700 kmr_assert_i_kvs_ok(kvs[i], 1);
2702 kmr_assert_o_kvs_ok(kvo, 1);
2703 if (kvo->c.element_count > 0) {
2704 KMR *mr = kvo->c.mr;
2705 kmr_error(mr,
"kmr_concatenate_kvs: Output kvs has entries");
2707 kmr_check_fn_options(kvo->c.mr, kmr_noopt, opt, __func__);
2715 for (
int i = 0; i < nkvs; i++) {
2716 elements += kvs[i]->c.element_count;
2717 netsize += kvs[i]->c.storage_netsize;
2718 blocks += kvs[i]->c.block_count;
2722 kvs[i]->c.first_block = 0;
2724 assert(storage == 0);
2728 assert(blocks != 0 && p->next == 0);
2733 while (p->next != 0) {
2740 kvo->c.first_block = storage;
2741 kvo->c.element_count = elements;
2742 kvo->c.storage_netsize = netsize;
2743 kvo->c.block_count = blocks;
2747 kvo->c.current_block = 0;
2748 kvo->c.adding_point = 0;
2749 assert(kvo->c.block_count == 0 || kvo->c.first_block != 0);
2761 kmr_assert_kvs_ok(kvs, 0, 1, 0);
2762 assert(kvs->c.magic == KMR_KVS_ONCORE);
2764 long cnt = kvs->c.element_count;
2765 kvs->c.current_block = kvs->c.first_block;
2766 struct kmr_kvs_entry *e = kmr_kvs_first_entry(kvs, kvs->c.first_block);
2768 for (
long i = 0; i < cnt && e != 0; i++) {
2770 e = kmr_kvs_next(kvs, e, 0);
2772 kvs->c.current_block = 0;
2775 if (kvs->c.element_count == 0) {
2779 for (b = kvs->c.first_block; b->next != 0; b = b->next);
2780 kvs->c.current_block = b;
2783 e = kmr_kvs_first_entry(kvs, b);
2785 long cnt = b->partial_element_count;
2786 for (
long i = 0; i < cnt && e != 0; i++) {
2788 e = kmr_kvs_next(kvs, e, 1);
2790 kvs->c.current_block = 0;
2803 kmr_assert_kvs_ok(kvs, 0, 1, 0);
2804 assert(kvs->c.magic == KMR_KVS_ONCORE);
2805 long cnt = MIN(n, kvs->c.element_count);
2806 kvs->c.current_block = kvs->c.first_block;
2807 struct kmr_kvs_entry *e = kmr_kvs_first_entry(kvs, kvs->c.first_block);
2808 for (
long i = 0; i < cnt && e != 0; i++) {
2810 e = kmr_kvs_next(kvs, e, 0);
2812 kvs->c.current_block = 0;
2821 long n, _Bool shuffling, _Bool ranking)
2823 kmr_assert_kvs_ok(kvs, 0, 1, 0);
2824 assert(kvs->c.magic == KMR_KVS_ONCORE);
2825 long cnt = MIN(n, kvs->c.element_count);
2826 kvs->c.current_block = kvs->c.first_block;
2827 struct kmr_kvs_entry *e = kmr_kvs_first_entry(kvs, kvs->c.first_block);
2828 for (
long i = 0; i < cnt && e != 0; i++) {
2831 ev[i].v = (ranking ? kv.k.i : kmr_pitch_rank(kv, kvs));
2834 ev[i].v = kmr_stable_key(kv, kvs);
2837 e = kmr_kvs_next(kvs, e, 0);
2839 kvs->c.current_block = 0;
2851 kmr_error(mr,
"kmr_legal_minimum_field_size: Bad field");
2853 case KMR_KV_INTEGER:
2854 return sizeof(long);
2856 return sizeof(double);
2858 case KMR_KV_CSTRING:
2859 case KMR_KV_POINTER_OWNED:
2860 case KMR_KV_POINTER_UNMANAGED:
2863 kmr_error(mr,
"kmr_legal_minimum_field_size: Bad field");
2884 KMR *mr = kvo->c.mr;
2888 long cnt = kvi->c.element_count;
2892 assert(cc == MPI_SUCCESS);
2895 for (
long i = 0; i < cnt; i++) {
2898 assert(cc == MPI_SUCCESS);
2903 cc = (*r)(bx, 2, kvi, xs, 0);
2904 if (cc != MPI_SUCCESS) {
2906 snprintf(ee,
sizeof(ee),
2907 "Reduce-fn returned with error cc=%d", cc);
2911 assert(cc == MPI_SUCCESS);
2913 bx[0].klen = bx[1].klen;
2916 assert(cc == MPI_SUCCESS);
2921 assert(cc == MPI_SUCCESS);
2923 if (carryout != 0) {
2926 assert(cc == MPI_SUCCESS);
2928 assert(cc == MPI_SUCCESS);
2930 assert(cc == MPI_SUCCESS);
int kmr_free_kvs(KMR_KVS *kvs)
Releases a key-value stream (type KMR_KVS).
int kmr_ckpt_progress_init(KMR_KVS *, KMR_KVS *, struct kmr_option)
It initializes a progress of MapReduce checkpointing.
Key-Value Stream (abstract).
int kmr_add_kv1(KMR_KVS *kvs, void *k, int klen, void *v, int vlen)
Adds a key-value pair as given directly by a pointer.
int kmr_save_kvs(KMR_KVS *kvs, void **dataq, size_t *szq, struct kmr_option opt)
Packs locally the contents of a key-value stream to a byte array.
kmr_trace_entry_t * kmr_trace_add_entry(KMR *mr, kmr_trace_event_t ev, kmr_trace_entry_t *pre, KMR_KVS *kvi, KMR_KVS *kvo)
Add an entry to the trace.
int kmr_shuffle(KMR_KVS *kvi, KMR_KVS *kvo, struct kmr_option opt)
Shuffles key-value pairs to the appropriate destination ranks.
KMR * kmr_create_context(const MPI_Comm comm, const MPI_Info conf, const char *identifying_name)
Makes a new KMR context (a context has type KMR).
Utilities Private Part (do not include from applications).
int kmr_add_identity_fn(const struct kmr_kv_box kv, const KMR_KVS *kvi, KMR_KVS *kvo, void *arg, const long i)
Adds a given key-value pair unmodified.
Options to Mapping, Shuffling, and Reduction.
void kmr_ckpt_progress_fin(KMR *)
It finalizes the progress of MapReduce checkpointing.
static KMR_KVS * kmr_create_raw_kvs(KMR *mr, const KMR_KVS *_similar)
Makes a new key-value stream (type KMR_KVS).
void kmr_ckpt_restore_ckpt(KMR_KVS *)
It restores checkpoint data to kvs.
int kmr_map_skipping(long from, long stride, long limit, _Bool stop_when_some_added, KMR_KVS *kvi, KMR_KVS *kvo, void *arg, struct kmr_option opt, kmr_mapfn_t m)
Maps by skipping the number of entries.
int kmr_add_string(KMR_KVS *kvs, const char *k, const char *v)
Adds a key-value pair of strings.
static int kmr_icmp(const void *a0, const void *a1)
Compares the key field of keyed-records for qsort/bsearch.
#define kmr_malloc(Z)
Allocates memory, or aborts when failed.
#define kmr_create_kvs(MR, KF, VF)
Makes a new key-value stream (of type KMR_KVS) with the specified field datatypes.
int kmr_sort_locally(KMR_KVS *kvi, KMR_KVS *kvo, _Bool shuffling, struct kmr_option opt)
Reorders key-value pairs in a single rank.
int kmr_map9(_Bool stop_when_some_added, KMR_KVS *kvi, KMR_KVS *kvo, void *arg, struct kmr_option opt, kmr_mapfn_t m, const char *file, const int line, const char *func)
Maps simply.
int kmr_ckpt_enabled(KMR *)
Check if checkpoint/restart is enabled.
static const size_t kmr_kvs_entry_header
Size of an Entry Header.
Keyed-Record for Sorting.
int kmr_add_kv_space(KMR_KVS *kvs, const struct kmr_kv_box kv, void **keyp, void **valuep)
Adds a key-value pair, but only allocates a space for it and returns the pointers to the key and the value parts, so the caller can fill them in directly.
long kmr_ckpt_first_unprocessed_kv(KMR *)
It returns the index of the first unprocessed key-value in the input KVS.
int kmr_move_kvs(KMR_KVS *kvi, KMR_KVS *kvo, struct kmr_option opt)
Moves the contents of the input KVI to the output KVO.
int kmr_map_rank_by_rank(KMR_KVS *kvi, KMR_KVS *kvo, void *arg, struct kmr_option opt, kmr_mapfn_t m)
Maps sequentially with rank by rank for debugging.
struct kmr_kvs_entry * kmr_find_kvs_last_entry(KMR_KVS *kvs)
Finds the last entry of a key-value stream.
void kmr_ckpt_save_kvo_whole(KMR *, KMR_KVS *)
It saves all key-value pairs in the output KVS to a checkpoint data file.
int kmr_allgatherv(KMR *mr, _Bool rankzeroonly, void *sbuf, long scnt, void *rbuf, long *rcnts, long *rdsps)
All-gathers data, or gathers data when RANKZEROONLY.
void kmr_ckpt_save_kvo_block_add(KMR *, KMR_KVS *, long)
It adds a new block of key-value pairs of the output KVS to the checkpoint data file.
int kmr_map_on_rank_zero(KMR_KVS *kvo, void *arg, struct kmr_option opt, kmr_mapfn_t m)
Maps on rank0 only.
kmr_kv_field
Datatypes of Keys or Values.
void kmr_isort(void *a, size_t n, size_t es, int depth)
Sorts by comparator on long integers.
int kmr_take_one(KMR_KVS *kvi, struct kmr_kv_box *kv)
Extracts a single key-value pair locally in the key-value stream KVI.
int kmr_concatenate_kvs(KMR_KVS *kvs[], int nkvs, KMR_KVS *kvo, struct kmr_option opt)
Concatenates a number of KVSes to one.
void kmr_ckpt_create_context(KMR *)
Initialize checkpoint context.
#define kmr_map(KVI, KVO, ARG, OPT, M)
Maps simply.
Handy Copy of a Key-Value Field.
int kmr_reduce9(_Bool stop_when_some_added, KMR_KVS *kvi, KMR_KVS *kvo, void *arg, struct kmr_option opt, kmr_redfn_t r, const char *file, const int line, const char *func)
Reduces key-value pairs.
Options to Mapping by Spawns.
void kmr_ckpt_save_kvo_block_fin(KMR *, KMR_KVS *)
It finalizes saving blocks of key-value pairs of the output KVS to the checkpoint data file.
State during kmr_map_ms().
void kmr_ckpt_free_context(KMR *)
Free checkpoint context.
int kmr_add_kv_done(KMR_KVS *kvs)
Marks finished adding key-value pairs.
int kmr_ckpt_disable_ckpt(KMR *)
It temporally disables checkpoint/restart.
int kmr_add_kv_pushoff(KMR_KVS *kvs, const struct kmr_kv_box kv)
Adds a key-value pair.
void kmr_trace_initialize(KMR *mr)
Initialize a trace.
int kmr_assert_sorted(KMR_KVS *kvi, _Bool locally, _Bool shuffling, _Bool ranking)
Checks a key-value stream is sorted.
void kmr_trace_finalize(KMR *mr)
Finalize a trace.
void kmr_ckpt_save_kvo_block_init(KMR *, KMR_KVS *)
It initializes saving blocks of key-value pairs of the output KVS to a checkpoint data file.
int kmr_retrieve_keyed_records(KMR_KVS *kvs, struct kmr_keyed_record *ev, long n, _Bool shuffling, _Bool ranking)
Fills keyed records in an array for sorting.
#define xassert(X)
Asserts and aborts, but it cannot be disabled.
#define kmr_realloc(P, Z)
Allocates memory, or aborts when failed.
int kmr_scan_locally(KMR_KVS *kvi, KMR_KVS *carryin, KMR_KVS *kvo, KMR_KVS *carryout, kmr_redfn_t r)
Scans every key-value with a reduce-function locally (independently on each rank).
int kmr_retrieve_kvs_entries(KMR_KVS *kvs, struct kmr_kvs_entry **ev, long n)
Fills local key-value entries in an array for inspection.
int kmr_add_kv_done_pushoff(KMR_KVS *kvs)
Marks finished adding key-value pairs, called from kmr_add_kv_done().
Options to Mapping on Files.
int kmr_fin(void)
Clears the environment.
int kmr_replicate(KMR_KVS *kvi, KMR_KVS *kvo, struct kmr_option opt)
Replicates key-value pairs to be visible on all ranks, that is, it has the effect of broadcast or all-gather.
static struct kmr_kv_box kmr_pick_kv(struct kmr_kvs_entry *e, KMR_KVS *kvs)
Returns a handle to a key-value entry – a reverse of kmr_poke_kv().
int(* kmr_redfn_t)(const struct kmr_kv_box kv[], const long n, const KMR_KVS *kvi, KMR_KVS *kvo, void *arg)
Reduce-function Type.
int kmr_gather_sizes(KMR *mr, long siz, long *rbuf)
Calls all-gather for collecting one long-integer.
int kmr_map_once(KMR_KVS *kvo, void *arg, struct kmr_option opt, _Bool rank_zero_only, kmr_mapfn_t m)
Maps once.
int kmr_ckpt_enable_ckpt(KMR *, int)
It temporally enables checkpoint/restart which has been disabled by calling kmr_ckpt_disable_ckpt().
static void kmr_poke_kv(struct kmr_kvs_entry *e, const struct kmr_kv_box kv, struct kmr_kv_box *xkv, const KMR_KVS *kvs, _Bool reserve_space_only)
Stores a key-value pair at the entry E in the store – a reverse of kmr_pick_kv().
Information of Source Code Line.
int(* kmr_mapfn_t)(const struct kmr_kv_box kv, const KMR_KVS *kvi, KMR_KVS *kvo, void *arg, const long index)
Map-function Type.
void kmr_ckpt_lock_start(KMR *)
Define the start position of code region that is referred when restart.
int kmr_copy_mpi_info(MPI_Info src, MPI_Info dst)
Copies contents of MPI_Info.
KMR_KVS * kmr_create_kvs7(KMR *mr, enum kmr_kv_field kf, enum kmr_kv_field vf, struct kmr_option opt, const char *file, const int line, const char *func)
Makes a new key-value stream with the specified field data-types.
int kmr_alltoallv(KMR *mr, void *sbuf, long *scounts, long *sdsps, void *rbuf, long *rcounts, long *rdsps)
Does all-to-all-v, but it takes arguments of long-integers.
int kmr_restore_kvs(KMR_KVS *kvo, void *data, size_t sz_, struct kmr_option opt)
Unpacks locally the contents of a key-value stream from a byte array.
int kmr_add_kv(KMR_KVS *kvs, const struct kmr_kv_box kv)
Adds a key-value pair.
int kmr_free_context(KMR *mr)
Releases a context created with kmr_create_context().
int kmr_reduce_as_one(KMR_KVS *kvi, KMR_KVS *kvo, void *arg, struct kmr_option opt, kmr_redfn_t r)
Calls a reduce-function once as if all key-value pairs had the same key.
void kmr_ckpt_remove_ckpt(KMR_KVS *)
It removes checkpoint data file.
void kmr_ckpt_lock_finish(KMR *)
Define the end position of code region that is referred when restart.
int kmr_exchange_sizes(KMR *mr, long *sbuf, long *rbuf)
Calls all-to-all to exchange one long-integer.
int kmr_legal_minimum_field_size(KMR *mr, enum kmr_kv_field f)
Returns a minimum byte size of the field: 8 for INTEGER and FLOAT8, 0 for others.