#define MIN(a,b) (((a)<(b))?(a):(b))
#define MAX(a,b) (((a)>(b))?(a):(b))
/* ... */
kmr_sorter_t cmp = kmr_choose_sorter(kvs);
if (cmp(&kv0, &kv) == 0) {
/* ... */
enum kmr_kv_field keyf = kmr_unit_sized_with_unmanaged(kvi->c.key_data);
enum kmr_kv_field valf = kmr_unit_sized_with_unmanaged(kvi->c.value_data);
/* ... */
int cc = kmr_map(kvi, kvs0, &ki, inspect, kmr_find_key_fn);
if (kvs0->c.element_count == 1) {
/* ... */
assert(cc == MPI_SUCCESS);
/* ... */
cc = ((kvs0->c.element_count == 0) ? MPI_ERR_ARG : MPI_ERR_COUNT);
/* ... */
snprintf(ee, 80, "kmr_find_key with not one key (keys=%ld)",
         kvs0->c.element_count);
/* ... */
assert(k != 0 && vq != 0);
assert(kvi->c.key_data == KMR_KV_OPAQUE
       || kvi->c.key_data == KMR_KV_CSTRING);
int klen = ((int)strlen(k) + 1);
struct kmr_kv_box ki = {.klen = klen, .k.p = k, .vlen = 0, .v.p = 0};
/* ... */
if (cc != MPI_SUCCESS) {
    if (cc == MPI_ERR_ARG) {
        kmr_warning(kvi->c.mr, 1, "kmr_find_key no key found");
/* ... */
        kmr_warning(kvi->c.mr, 1, "kmr_find_key multiple keys found");
kmr_get_element_count_fn(const struct kmr_kv_box kv[], const long n,
/* ... */
assert(kvo == 0 && kvs->c.mr->nprocs == n && *cntq == 0);
/* ... */
for (int i = 0; i < n; i++) {
/* ... */
kmr_assert_kvs_ok(kvs, 0, 1, 0);
/* ... */
struct kmr_kv_box kv = {.klen = (int)sizeof(long),
                        .vlen = (int)sizeof(long),
                        /* ... */
                        .v.i = kvs->c.element_count};
/* ... */
assert(cc == MPI_SUCCESS);
/* ... */
assert(cc == MPI_SUCCESS);
/* ... */
cc = kmr_reduce(kvs1, 0, &cnt, kmr_noopt, kmr_get_element_count_fn);
assert(cc == MPI_SUCCESS);
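A short usage sketch of kmr_get_element_count(), whose signature is listed below as int kmr_get_element_count(KMR_KVS *kvs, long *v). It is illustrative only and assumes an initialized context and a KVS already closed with kmr_add_kv_done().

#include <assert.h>
#include <mpi.h>
#include "kmr.h"

/* Illustrative only: total pair count summed over all ranks. */
static long
example_total_count(KMR_KVS *kvs)
{
    long total = 0;
    int cc = kmr_get_element_count(kvs, &total);
    assert(cc == MPI_SUCCESS);
    return total;
}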
return kmr_map(kvi, kvo, 0, opt, kmr_reverse_fn);
/* ... */
assert(kvo->c.value_data == KMR_KV_OPAQUE
       || kvo->c.value_data == KMR_KV_CSTRING);
size_t sz = kmr_kvs_entry_netsize_of_box(kv);
assert(kmr_check_alignment(sz));
/* ... */
if (sz <= sizeof(buf)) {
/* ... */
kmr_poke_kv2(e, kv, 0, keyf, valf, 0);
/* ... */
if (e != (void *)buf) {
/* ... */
cc = kmr_add_pairing_under_key(kvo, kv.klen, kv.k, kv,
                               kvi->c.key_data, kvi->c.value_data);
assert(cc == MPI_SUCCESS);
/* ... */
return kmr_map(kvi, kvo, 0, opt, kmr_pairing_fn);
/* ... */
return kmr_map(kvs, kvo, 0, opt, kmr_unpairing_fn);
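The three one-line bodies above belong to kmr_reverse(), kmr_pairing(), and kmr_unpairing(), each a thin wrapper around kmr_map() with a fixed map-function. A hedged sketch of kmr_reverse() follows; it is not from this file, and the field types of the input and output KVS are assumptions made for the example.

#include <assert.h>
#include <mpi.h>
#include "kmr.h"

/* Illustrative only: swap keys and values of KVI into a new KVS. */
static KMR_KVS *
example_swap_key_value(KMR *mr, KMR_KVS *kvi)
{
    /* Assumed: KVI is (KMR_KV_OPAQUE key, KMR_KV_INTEGER value),
       so the output carries the swapped field types. */
    KMR_KVS *kvo = kmr_create_kvs(mr, KMR_KV_INTEGER, KMR_KV_OPAQUE);
    int cc = kmr_reverse(kvi, kvo, kmr_noopt);
    assert(cc == MPI_SUCCESS);
    return kvo;
}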
assert(kvo == 0 && p != 0);
/* ... */
long *mm = (void *)kv.v.p;
long *ss = &samples2[2 * rank];
/* ... */
kmr_partition_by_ranking_fn(const struct kmr_kv_box kv,
/* ... */
int nprocs = mr->nprocs;
/* ... */
long minkey = range[0];
long maxkey = range[1];
long v = kmr_stable_key(kv, kvi);
assert(minkey <= v && v < (maxkey + 1));
long d = (((maxkey + 1) - minkey + nprocs - 1) / nprocs);
int rank = (int)((v - minkey) / d);
assert(0 <= rank && rank < nprocs);
/* ... */
cc = kmr_add_pairing_under_key(kvo, sizeof(long), k, kv,
                               kvi->c.key_data, kvi->c.value_data);
assert(cc == MPI_SUCCESS);
/* ... */
int nprocs = mr->nprocs;
kmr_sorter_t cmp = kmr_choose_sorter(kvi);
/* ... */
long rank = (q - bv);
assert(0 <= rank && rank < nprocs);
/* ... */
kmr_add_pairing_under_key(kvo, sizeof(long), k, kv,
                          kvi->c.key_data, kvi->c.value_data);
/* ... */
assert(nsamples >= 0);
/* ... */
long cnt = kvi->c.element_count;
long stride = ((cnt < nsamples) ? 1 : (cnt / nsamples));
long limit = (stride * nsamples);
/* ... */
assert(cc == MPI_SUCCESS);
assert(kvo->c.element_count == MIN(cnt, nsamples));
/* ... */
assert(cc == MPI_SUCCESS);
/* ... */
assert(nsamples >= 0);
/* ... */
long cnt = kvi->c.element_count;
assert(cnt >= nsamples);
long start, stride, limit;
if (cnt == nsamples) {
/* ... */
stride = (cnt / (nsamples + 1));
/* ... */
limit = (start + (stride * nsamples));
/* ... */
assert(cc == MPI_SUCCESS);
/* ... */
kmr_minmax2_fn(const struct kmr_kv_box kv[], const long n,
/* ... */
long range0 = LONG_MAX;
long range1 = LONG_MIN;
for (int i = 0; i < n; i++) {
    long *mm = (void *)kv[i].v.p;
kmr_assert_kvs_ok(kvi, kvo, 1, 1);
assert(kmr_shuffle_compatible_p(kvo, kvi));
/* ... */
struct kmr_option kmr_supported = {.nothreading = 1, .inspect = 1};
kmr_check_fn_options(mr, kmr_supported, opt, __func__);
/* ... */
struct kmr_option i_opt = kmr_copy_options_i_part(opt);
struct kmr_option o_opt = kmr_copy_options_o_part(opt);
struct kmr_option m_opt = kmr_copy_options_m_part(opt);
/* ... */
assert(cc == MPI_SUCCESS);
/* ... */
long cnt = kvs1->c.element_count;
/* ... */
} else if (cnt == 1) {
/* ... */
    e0 = kmr_kvs_first_entry(kvs1, kvs1->c.first_block);
/* ... */
    long v = kmr_stable_key(b0, kvs1);
/* ... */
    e0 = kmr_kvs_first_entry(kvs1, kvs1->c.first_block);
/* ... */
    mm[0] = kmr_stable_key(b0, kvs1);
    mm[1] = kmr_stable_key(b1, kvs1);
/* ... */
    .klen = (int)sizeof(long),
    .vlen = (int)sizeof(long[2]),
/* ... */
assert(cc == MPI_SUCCESS);
/* ... */
assert(cc == MPI_SUCCESS);
/* ... */
assert(cc == MPI_SUCCESS);
long range[2] = {LONG_MAX, LONG_MIN};
/* ... */
assert(cc == MPI_SUCCESS);
/* ... */
cc = kmr_map(kvs1, kvs4, range, m_opt, kmr_partition_by_ranking_fn);
assert(cc == MPI_SUCCESS);
/* ... */
struct kmr_option ranking = {.key_as_rank = 1};
/* ... */
assert(cc == MPI_SUCCESS);
/* ... */
assert(cc == MPI_SUCCESS);
/* ... */
assert(cc == MPI_SUCCESS);
kmr_assert_kvs_ok(kvi, kvo, 1, 1);
assert(kmr_shuffle_compatible_p(kvo, kvi));
/* ... */
struct kmr_option kmr_supported = {.nothreading = 1, .inspect = 1};
kmr_check_fn_options(mr, kmr_supported, opt, __func__);
int nprocs = mr->nprocs;
struct kmr_option i_opt = kmr_copy_options_i_part(opt);
struct kmr_option o_opt = kmr_copy_options_o_part(opt);
struct kmr_option m_opt = kmr_copy_options_m_part(opt);
/* ... */
long factor = mr->sort_sample_factor;
/* ... */
if (nprocs < factor) {
    nsamples = (nprocs - 1);
/* ... */
    nsamples = MAX(10, ((factor * factor) / nprocs));
/* ... */
if (mr->verbosity >= 7) {
/* ... */
    snprintf(ee, sizeof(ee), "sort-oversampling=%e",
             ((double)nsamples) / nprocs);
    kmr_warning(mr, 7, ee);
/* ... */
cc = kmr_sample_kv(nsamples, kvi, kvs1);
assert(kvs1->c.element_count == MIN(kvi->c.element_count, nsamples));
/* ... */
assert(cc == MPI_SUCCESS);
/* ... */
assert(cc == MPI_SUCCESS);
/* ... */
int nbreaks = (nprocs - 1);
assert(kvs3->c.element_count >= nbreaks);
/* ... */
cc = kmr_sample_to_array(nbreaks, kvs3, bv);
assert(cc == MPI_SUCCESS);
/* ... */
size_t prealloc = ((size_t)(16 * kvi->c.element_count) + kvi->c.storage_netsize);
cc = kmr_allocate_block(kvs4, prealloc);
assert(cc == MPI_SUCCESS);
cc = kmr_map(kvi, kvs4, bv, i_opt, kmr_rank_for_sort);
assert(cc == MPI_SUCCESS);
kmr_free(bv, (sizeof(struct kmr_kv_box) * (size_t)nbreaks));
/* ... */
assert(cc == MPI_SUCCESS);
/* ... */
struct kmr_option ranking = {.key_as_rank = 1};
/* ... */
assert(cc == MPI_SUCCESS);
/* ... */
assert(cc == MPI_SUCCESS);
/* ... */
assert(cc == MPI_SUCCESS);
kmr_assert_kvs_ok(kvi, kvo, 1, 1);
assert(kmr_shuffle_compatible_p(kvo, kvi));
/* ... */
struct kmr_option kmr_supported = {.inspect = 1};
kmr_check_fn_options(mr, kmr_supported, opt, __func__);
struct kmr_option i_opt = kmr_copy_options_i_part(opt);
/* ... */
struct kmr_option o_opt = kmr_copy_options_o_part(opt);
/* ... */
assert(cc == MPI_SUCCESS);
/* ... */
assert(cc == MPI_SUCCESS);
/* ... */
kmr_assert_kvs_ok(kvi, kvo, 1, 1);
struct kmr_option kmr_supported = {.inspect = 1};
kmr_check_fn_options(kvi->c.mr, kmr_supported, opt, __func__);
const long lb = kvi->c.mr->sort_trivial;
const long threshold = kvi->c.mr->sort_threshold;
/* ... */
assert(cc == MPI_SUCCESS);
/* ... */
} else if (cnt <= (threshold * kvi->c.mr->nprocs)) {
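The last fragment above is the dispatcher kmr_sort(): it compares the element count against mr->sort_trivial and mr->sort_threshold * nprocs and, judging by the functions listed below, chooses among kmr_sort_by_one(), kmr_sort_small(), and kmr_sort_large(), whose fragments precede it. Calling code only sees the one entry point; a hedged sketch follows, not from this file, with the field types of the streams assumed for the example.

#include <assert.h>
#include <mpi.h>
#include "kmr.h"

/* Illustrative only: globally sort KVI by key into a new KVS. */
static KMR_KVS *
example_global_sort(KMR *mr, KMR_KVS *kvi)
{
    /* Assumed: KVI is (KMR_KV_INTEGER key, KMR_KV_OPAQUE value);
       the output fields mirror the input. */
    KMR_KVS *kvo = kmr_create_kvs(mr, KMR_KV_INTEGER, KMR_KV_OPAQUE);
    int cc = kmr_sort(kvi, kvo, kmr_noopt);
    assert(cc == MPI_SUCCESS);
    return kvo;
}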
long *tagp = (long *)p;
/* ... */
struct kmr_kv_box v = {.klen = (int)sizeof(long), .vlen = kv.vlen,
                       .k.i = tag, .v = kv.v};
cc = kmr_add_pairing_under_key(kvo, kv.klen, kv.k, v,
                               KMR_KV_INTEGER, kvi->c.value_data);
assert(cc == MPI_SUCCESS);
/* ... */
kmr_make_product_fn(const struct kmr_kv_box kv[], const long n,
/* ... */
for (long i = 0; i < n; i++) {
/* ... */
    struct kmr_kv_box vkv0 = kmr_pick_kv2(e, keyf, dummyf);
/* ... */
    assert(tag == 0 || tag == 1);
/* ... */
if (kcnt == 0 || vcnt == 0) {
/* ... */
for (long i = 0; i < n; i++) {
/* ... */
    struct kmr_kv_box vkv0 = kmr_pick_kv2(e, keyf, dummyf);
/* ... */
    assert(tag == 0 || tag == 1);
/* ... */
    struct kmr_kv_box vkv = kmr_pick_kv2(e, keyf, valkf);
    keys[ki].klen = vkv.vlen;
/* ... */
    struct kmr_kv_box vkv = kmr_pick_kv2(e, keyf, valvf);
    vals[vi].vlen = vkv.vlen;
/* ... */
assert(ki == kcnt && vi == vcnt);
for (long i = 0; i < kcnt; i++) {
    for (long j = 0; j < vcnt; j++) {
        struct kmr_kv_box nkv = {.klen = keys[i].klen, .k = keys[i].k,
                                 .vlen = vals[j].vlen, .v = vals[j].v};
/* ... */
        assert(cc == MPI_SUCCESS);
/* ... */
kmr_free(keys, (sizeof(struct kmr_kv_box) * (size_t)kcnt));
kmr_free(vals, (sizeof(struct kmr_kv_box) * (size_t)vcnt));
kmr_assert_kvs_ok(kvi0, kvo, 1, 1);
kmr_assert_kvs_ok(kvi1, kvo, 1, 1);
struct kmr_option kmr_supported = {.nothreading = 1};
kmr_check_fn_options(kvi0->c.mr, kmr_supported, opt, __func__);
assert(kvi0->c.key_data == kvi1->c.key_data
       && kvo->c.key_data == kvi0->c.value_data
       && kvo->c.value_data == kvi1->c.value_data);
/* ... */
KMR *mr = kvi0->c.mr;
struct kmr_option i_opt = kmr_copy_options_i_part(opt);
struct kmr_option o_opt = kmr_copy_options_o_part(opt);
/* ... */
keepopen.keep_open = 1;
/* ... */
cc = kmr_map(kvi0, kvs2, &tag0, keepopen, kmr_tag_value_fn);
assert(cc == MPI_SUCCESS);
/* ... */
cc = kmr_map(kvi1, kvs2, &tag1, keepopen, kmr_tag_value_fn);
assert(cc == MPI_SUCCESS);
/* ... */
assert(cc == MPI_SUCCESS);
cc = kmr_reduce(kvs3, kvo, 0, o_opt, kmr_make_product_fn);
assert(cc == MPI_SUCCESS);
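This fragment is kmr_match(): it tags the values of the two inputs with kmr_tag_value_fn into one stream and reduces the grouped result with kmr_make_product_fn. The assertions above spell out the field constraint: both inputs share a key field, the output key field equals the first input's value field, and the output value field equals the second input's value field. A hedged sketch follows, under the assumption that both inputs carry opaque values.

#include <assert.h>
#include <mpi.h>
#include "kmr.h"

/* Illustrative only: pair up the values of KVI0 and KVI1 that share a key. */
static KMR_KVS *
example_match(KMR *mr, KMR_KVS *kvi0, KMR_KVS *kvi1)
{
    KMR_KVS *kvo = kmr_create_kvs(mr, KMR_KV_OPAQUE, KMR_KV_OPAQUE);
    int cc = kmr_match(kvi0, kvi1, kvo, kmr_noopt);
    assert(cc == MPI_SUCCESS);
    return kvo;
}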
kmr_get_integer_values_fn(const struct kmr_kv_box kv[], const long n,
/* ... */
assert(kvo == 0 && kvs->c.mr->nprocs == n);
/* ... */
for (int i = 0; i < n; i++) {
/* ... */
long rankingbase = *(long *)p;
/* ... */
int klen = sizeof(long);
/* ... */
cc = kmr_add_pairing_under_key(kvo, klen, k, kv,
                               kvi->c.key_data, kvi->c.value_data);
assert(cc == MPI_SUCCESS);
/* ... */
kmr_assert_kvs_ok(kvi, 0, 1, 0);
/* ... */
struct kmr_option kmr_supported = {.nothreading = 1, .inspect = 1,
/* ... */
kmr_check_fn_options(mr, kmr_supported, opt, __func__);
const int nprocs = mr->nprocs;
const int rank = mr->rank;
struct kmr_option m_opt = kmr_copy_options_m_part(opt);
/* ... */
struct kmr_kv_box kv = {.klen = (int)sizeof(long),
                        .vlen = (int)sizeof(long),
                        /* ... */
                        .v.i = kvi->c.element_count};
/* ... */
assert(cc == MPI_SUCCESS);
/* ... */
assert(cc == MPI_SUCCESS);
long *vec0 = kmr_malloc(sizeof(long) * (size_t)nprocs);
cc = kmr_reduce(kvs1, 0, vec0, m_opt, kmr_get_integer_values_fn);
assert(cc == MPI_SUCCESS);
long *vec1 = kmr_malloc(sizeof(long) * (size_t)nprocs);
/* ... */
for (int i = 0; i < nprocs; i++) {
/* ... */
cc = kmr_map(kvi, kvo, &vec1[rank], opt, kmr_ranking_fn);
assert(cc == MPI_SUCCESS);
/* ... */
kmr_free(vec1, (sizeof(long) * (size_t)nprocs));
kmr_free(vec0, (sizeof(long) * (size_t)nprocs));
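The fragment above is kmr_ranking(): each rank contributes its local element count, the counts are gathered and turned into per-rank offsets (vec0/vec1), and kmr_ranking_fn then pairs every element under its position in the global order. A hedged usage sketch follows; the output field types are an assumption made for illustration.

#include <assert.h>
#include <mpi.h>
#include "kmr.h"

/* Illustrative only: number the pairs across all ranks; COUNT returns
   the global total. */
static long
example_ranking(KMR *mr, KMR_KVS *kvi)
{
    /* Assumed output fields: the ranking as an integer key, the original
       pair packed opaquely in the value. */
    KMR_KVS *kvo = kmr_create_kvs(mr, KMR_KV_INTEGER, KMR_KV_OPAQUE);
    long count = 0;
    int cc = kmr_ranking(kvi, kvo, &count, kmr_noopt);
    assert(cc == MPI_SUCCESS);
    kmr_free_kvs(kvo);
    return count;
}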
kmr_ranking_to_rank_fn(const struct kmr_kv_box kv,
/* ... */
    ? (kv.k.i % u->factor)
    : (kv.k.i / u->factor)),
/* ... */
assert(cc == MPI_SUCCESS);
/* ... */
kmr_assert_kvs_ok(kvi, 0, 1, 0);
struct kmr_option kmr_supported = {.nothreading = 1, .inspect = 1,
/* ... */
kmr_check_fn_options(kvi->c.mr, kmr_supported, opt, __func__);
struct kmr_option i_opt = kmr_copy_options_i_part(opt);
struct kmr_option o_opt = kmr_copy_options_o_part(opt);
struct kmr_option m_opt = kmr_copy_options_m_part(opt);
/* ... */
const int nprocs = mr->nprocs;
/* ... */
assert(cc == MPI_SUCCESS);
/* ... */
.factor = (cyclic ? nprocs : ((count + nprocs - 1) / nprocs))
/* ... */
cc = kmr_map(kvs0, kvs1, &u, m_opt, kmr_ranking_to_rank_fn);
assert(cc == MPI_SUCCESS);
/* ... */
struct kmr_option ranking = {.key_as_rank = 1};
/* ... */
assert(cc == MPI_SUCCESS);
/* ... */
assert(cc == MPI_SUCCESS);
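The fragment above is kmr_distribute(): it ranks the pairs, maps each ranking to a destination with kmr_ranking_to_rank_fn, and shuffles with .key_as_rank. With CYCLIC set the factor is nprocs and destinations appear to be assigned round-robin (the modulo branch); otherwise the rankings are cut into consecutive blocks of ceil(count/nprocs), as the .factor expression shows. A hedged sketch, not from this file, with output field types assumed to match the input:

#include <assert.h>
#include <mpi.h>
#include "kmr.h"

/* Illustrative only: rebalance KVI so every rank holds about the same
   number of pairs, dealing them round-robin (cyclic = 1). */
static KMR_KVS *
example_level_load(KMR *mr, KMR_KVS *kvi)
{
    KMR_KVS *kvo = kmr_create_kvs(mr, KMR_KV_OPAQUE, KMR_KV_OPAQUE);
    int cc = kmr_distribute(kvi, kvo, 1, kmr_noopt);
    assert(cc == MPI_SUCCESS);
    return kvo;
}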
struct kmr_option rankzero = {.rank_zero = 1};
/* ... */
assert(cc == MPI_SUCCESS);
/* ... */
cc = (*r)(0, 0, kvs0, zero, 0);
assert(cc == MPI_SUCCESS);
/* ... */
assert(cc == MPI_SUCCESS);
/* ... */
assert(cc == MPI_SUCCESS);
/* ... */
assert(cc == MPI_SUCCESS);
/* ... */
assert(cc == MPI_SUCCESS);
/* ... */
long rankingbase = 0;
cc = kmr_map(kvs1, kvs2, &rankingbase, kmr_noopt, kmr_ranking_fn);
assert(cc == MPI_SUCCESS);
/* ... */
struct kmr_option keyasrank = {.key_as_rank = 1};
/* ... */
assert(cc == MPI_SUCCESS);
/* ... */
assert(cc == MPI_SUCCESS);
/* ... */
assert(cc == MPI_SUCCESS);
/* ... */
assert(cc == MPI_SUCCESS);
assert(kvs0->c.element_count == 1);
/* ... */
cc = kmr_scan_across_ranks_sequentially(kvs0, kvs1, total, r);
assert(cc == MPI_SUCCESS);
assert(kvs1->c.element_count == 1);
assert(total->c.element_count == 1);
/* ... */
assert(cc == MPI_SUCCESS);
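The fragments above implement the scan machinery, kmr_scan_across_ranks_sequentially() and kmr_scan_on_values(). Below is a hedged sketch of an exclusive prefix sum over integer values; sum_fn is a hypothetical reduce-function written only for this example, and it must also cope with being called with n == 0, as the (*r)(0, 0, kvs0, zero, 0) call above does.

#include <assert.h>
#include <mpi.h>
#include "kmr.h"

/* Hypothetical reduce-function: emit one pair holding the sum of the
   integer values it is given (zero when called with no pairs). */
static int
sum_fn(const struct kmr_kv_box kv[], const long n,
       const KMR_KVS *kvi, KMR_KVS *kvo, void *arg)
{
    (void)kvi; (void)arg;
    long s = 0;
    for (long i = 0; i < n; i++) {
        s += kv[i].v.i;
    }
    struct kmr_kv_box nkv = {.klen = (int)sizeof(long), .k.i = 0,
                             .vlen = (int)sizeof(long), .v.i = s};
    return kmr_add_kv(kvo, nkv);
}

/* Illustrative only: KVO receives the non-inclusive prefix values and
   TOTAL the final value, per the description of kmr_scan_on_values(). */
static void
example_prefix_sum(KMR *mr, KMR_KVS *kvi)
{
    KMR_KVS *kvo = kmr_create_kvs(mr, KMR_KV_INTEGER, KMR_KV_INTEGER);
    KMR_KVS *total = kmr_create_kvs(mr, KMR_KV_INTEGER, KMR_KV_INTEGER);
    int cc = kmr_scan_on_values(kvi, kvo, total, sum_fn);
    assert(cc == MPI_SUCCESS);
    kmr_free_kvs(kvo);
    kmr_free_kvs(total);
}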
kmr_count_key_fn(const struct kmr_kv_box kv[], const long n,
/* ... */
    .vlen = sizeof(long),
/* ... */
kmr_sum_key_counts_fn(const struct kmr_kv_box kv[], const long n,
/* ... */
for (long i = 0; i < n; i++) {
/* ... */
    .vlen = sizeof(long),
/* ... */
KMR *mr = kvi->c.mr;
/* ... */
cc = kmr_reduce(kvi, kvs0, 0, inspect, kmr_count_key_fn);
assert(cc == MPI_SUCCESS);
/* ... */
assert(cc == MPI_SUCCESS);
cc = kmr_reduce(kvs1, kvo, 0, kmr_noopt, kmr_sum_key_counts_fn);
assert(cc == MPI_SUCCESS);
/* ... */
kmr_scan_sum_fn(const struct kmr_kv_box kv[], const long n,
/* ... */
for (long i = 0; i < n; i++) {
/* ... */
KMR *mr = kvs->c.mr;
/* ... */
struct kmr_kv_box nkv = {.klen = ((n != 0) ? (kv[0].klen) : keylen),
                         .vlen = sizeof(long),
                         .k.i = ((n != 0) ? (kv[0].k.i) : 0),
/* ... */
kmr_partition_by_count_fn(const struct kmr_kv_box kv,
/* ... */
KMR *mr = kvs->c.mr;
const int nprocs = mr->nprocs;
long nentries = *(long *)p;
long partialsum = kv.v.i;
assert(partialsum < nentries);
long rank = (partialsum * nprocs) / nentries;
/* ... */
KMR *mr = kvi->c.mr;
/* ... */
cc = kmr_count_keys(kvi, kvs0);
assert(cc == MPI_SUCCESS);
/* ... */
assert(cc == MPI_SUCCESS);
/* ... */
long nentries = kv.v.i;
/* ... */
cc = kmr_map(kvs1, kvs2, &nentries, kmr_noopt, kmr_partition_by_count_fn);
assert(cc == MPI_SUCCESS);
/* ... */
assert(cc == MPI_SUCCESS);
/* ... */
cc = kmr_match(kvs2, kvs3, kvs4, kmr_noopt);
assert(cc == MPI_SUCCESS);
/* ... */
struct kmr_option ranking = {.key_as_rank = 1};
/* ... */
assert(cc == MPI_SUCCESS);
/* ... */
assert(cc == MPI_SUCCESS);
kmr_first_n_elements_fn(const struct kmr_kv_box kv,
/* ... */
long n = *(long *)p;
/* ... */
kmr_assert_kvs_ok(kvi, kvo, 1, 1);
KMR *mr = kvi->c.mr;
struct kmr_option kmr_supported = {.inspect = 1, .keep_open = 1};
kmr_check_fn_options(mr, kmr_supported, opt, __func__);
struct kmr_option i_opt = kmr_copy_options_i_part(opt);
/* ... */
assert(cc == MPI_SUCCESS);
struct kmr_option opt1 = kmr_copy_options_o_part(opt);
opt1.nothreading = 1;
cc = kmr_map(kvs0, kvo, &n, opt1, kmr_first_n_elements_fn);
assert(cc == MPI_SUCCESS);
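The fragment above is kmr_choose_first_part(), which maps the stream with kmr_first_n_elements_fn to keep the first N entries (per its description below). A hedged sketch, assuming the output field types match the input:

#include <assert.h>
#include <mpi.h>
#include "kmr.h"

/* Illustrative only: keep the first 100 entries of KVI. */
static KMR_KVS *
example_take_first_100(KMR *mr, KMR_KVS *kvi)
{
    KMR_KVS *kvo = kmr_create_kvs(mr, KMR_KV_OPAQUE, KMR_KV_OPAQUE);
    int cc = kmr_choose_first_part(kvi, kvo, 100, kmr_noopt);
    assert(cc == MPI_SUCCESS);
    return kvo;
}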
cc = kmr_map9(1, kvi, kvo, arg, opt, m, __FILE__, __LINE__, __func__);
/* ... */
cc = kmr_reduce9(1, kvi, kvo, arg, opt, r, __FILE__, __LINE__, __func__);
/* ... */
assert(nth < u->n && u->index == u->n);
char *p = (void *)u;
size_t off = kmr_ntuple_nth_offset(u, nth);
/* ... */
e.len = u->len[nth];
/* ... */
assert(u->index == u->n);
return (int)kmr_ntuple_nth_offset(u, u->n);
/* ... */
off = kmr_ntuple_data_offset(n);
for (int i = 0; i < n; i++) {
    off += (size_t)kmr_ntuple_entry_size(len[i]);
assert(n <= USHRT_MAX);
/* ... */
u->n = (unsigned short)n;
/* ... */
int w = (KMR_ALIGN((int)sizeof(u->len[0]) * n) / (int)sizeof(u->len[0]));
for (int i = 0; i < w; i++) {
/* ... */
              const void *v, const int len)
/* ... */
assert(u->index < u->n);
char *p = kmr_ntuple_insertion_point(u);
int sz = kmr_ntuple_entry_size(len);
char *end = ((char *)p + sz);
if (!(end <= ((char *)(void *)u + size))) {
    kmr_error(mr, "kmr_put_ntuple exceeds the buffer size");
/* ... */
u->len[u->index] = (unsigned short)len;
memcpy(p, v, (size_t)len);
memset((p + len), 0, (size_t)(end - (p + len)));
/* ... */
assert(u->index == u->n);
assert(kvo->c.value_data == KMR_KV_OPAQUE
       || kvo->c.value_data == KMR_KV_CSTRING);
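The fragments above belong to the n-tuple helpers kmr_reset_ntuple(), kmr_put_ntuple(), and kmr_add_ntuple(); the last assertion shows that the destination KVS must have an opaque or C-string value field. Below is a hedged sketch of building a two-entry tuple of longs; the sizes, marker, and key are arbitrary example values, and the key is passed as opaque bytes.

#include <assert.h>
#include <stdlib.h>
#include <mpi.h>
#include "kmr.h"

/* Illustrative only: build an n-tuple of two longs and add it under KEY. */
static void
example_add_ntuple(KMR *mr, KMR_KVS *kvo, long key)
{
    int lens[2] = {(int)sizeof(long), (int)sizeof(long)};
    int sz = kmr_size_ntuple_by_lengths(2, lens);
    struct kmr_ntuple *u = malloc((size_t)sz);
    assert(u != 0);
    kmr_reset_ntuple(u, 2, 10 /* arbitrary marker */);
    int cc = kmr_put_ntuple_long(mr, u, sz, 111);
    assert(cc == MPI_SUCCESS);
    cc = kmr_put_ntuple_long(mr, u, sz, 222);
    assert(cc == MPI_SUCCESS);
    cc = kmr_add_ntuple(kvo, &key, (int)sizeof(long), u);
    assert(cc == MPI_SUCCESS);
    free(u);
}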
              int markers[2], _Bool disallow_other_entries)
/* ... */
for (long i = 0; i < n; i++) {
/* ... */
    if (u->marker == markers[0]) {
/* ... */
    } else if (u->marker == markers[1]) {
/* ... */
    if (disallow_other_entries) {
        assert(u->marker == markers[0] || u->marker == markers[1]);
/* ... */
for (long i = 0; i < n; i++) {
/* ... */
    if (u->marker == markers[0]) {
/* ... */
    } else if (u->marker == markers[1]) {
/* ... */
assert(cnt0 == c0 && cnt1 == c1);
kmr_product_ntuples_by_space(KMR_KVS *kvo,
/* ... */
                             int slots[][2], int nslots,
                             int keys[][2], int nkeys,
/* ... */
KMR *mr = kvo->c.mr;
const int AUNIT = 1024;
/* ... */
for (long k0 = 0; k0 < cnt[0]; k0++) {
/* ... */
    for (long k1 = 0; k1 < cnt[1]; k1++) {
/* ... */
        int *choice = keys[0];
/* ... */
        sz = (int)kmr_ntuple_data_offset(nkeys);
        for (int i = 0; i < nkeys; i++) {
            int *choice = keys[i];
/* ... */
            sz += kmr_ntuple_entry_size(e.len);
/* ... */
        sz = (int)kmr_ntuple_data_offset(nslots);
        for (int i = 0; i < nslots; i++) {
            int *choice = slots[i];
/* ... */
            sz += kmr_ntuple_entry_size(e.len);
/* ... */
        if ((size_t)klen > keysz) {
            kmr_free(keyp, keysz);
            keysz = (size_t)((klen + AUNIT - 1) & ~(AUNIT - 1));
/* ... */
        if ((size_t)vlen > valuesz) {
            kmr_free(valuep, valuesz);
            valuesz = (size_t)((vlen + AUNIT - 1) & ~(AUNIT - 1));
/* ... */
        assert(keysz > (size_t)klen);
        assert(valuesz > (size_t)vlen);
/* ... */
        int *keychoice = keys[0];
/* ... */
        memcpy(keyp, e.p, (size_t)e.len);
        assert(klen == e.len);
/* ... */
        for (int i = 0; i < nkeys; i++) {
            int *choice = keys[i];
/* ... */
            assert(i == k->index);
/* ... */
        for (int i = 0; i < nslots; i++) {
            int *choice = slots[i];
/* ... */
            assert(i == v->index);
/* ... */
kmr_free(keyp, keysz);
/* ... */
kmr_free(valuep, valuesz);
/* ... */
                    int slots[][2], int nslots,
                    int keys[][2], int nkeys)
/* ... */
if (kvo->c.magic == KMR_KVS_ONCORE) {
    cc = kmr_product_ntuples_by_space(kvo, vv, cnt, marker,
                                      slots, nslots, keys, nkeys,
/* ... */
} else if (kvo->c.magic == KMR_KVS_PUSHOFF) {
    cc = kmr_product_ntuples_by_space(kvo, vv, cnt, marker,
                                      slots, nslots, keys, nkeys,
/* ... */
assert((kvo->c.magic == KMR_KVS_ONCORE)
       || (kvo->c.magic == KMR_KVS_PUSHOFF));
kmr_put_integer_to_array_fn(const struct kmr_kv_box kv,
/* ... */
                            void *p, const long i)
/* ... */
kmr_assert_kvs_ok(kvs, 0, 1, 0);
assert(kvs->c.magic == KMR_KVS_ONCORE);
KMR *mr = kvs->c.mr;
int nprocs = mr->nprocs;
/* ... */
vec = kmr_malloc(sizeof(long) * (size_t)nprocs);
/* ... */
    .klen = sizeof(long),
/* ... */
    .vlen = sizeof(long),
    .v.i = kvs->c.element_count
/* ... */
struct kmr_option opt = {.rank_zero = rankzeroonly};
/* ... */
assert(cc == MPI_SUCCESS);
/* ... */
cc = kmr_map(kvs1, 0, vec, kmr_noopt, kmr_put_integer_to_array_fn);
assert(cc == MPI_SUCCESS);
/* ... */
if (var != 0 && (!rankzeroonly || mr->rank == 0)) {
    double min = (double)LONG_MAX;
/* ... */
    for (int r = 0; r < nprocs; r++) {
        double x = (double)vec[r];
/* ... */
    double a1 = (s1 / (double)nprocs);
    double a2 = (s2 / (double)nprocs);
/* ... */
    var[1] = a2 - (a1 * a1);
/* ... */
kmr_free(vec, sizeof(long) * (size_t)nprocs);
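The final fragment is kmr_histogram_count_by_ranks(); as the guard if (var != 0 && ...) shows, passing a null VAR skips the statistics and only the per-rank counts are filled in. A hedged sketch follows, assuming mr->nprocs gives the communicator size as in the code above.

#include <assert.h>
#include <stdlib.h>
#include <mpi.h>
#include "kmr.h"

/* Illustrative only: fetch the per-rank pair counts of KVS. */
static void
example_histogram(KMR *mr, KMR_KVS *kvs)
{
    long *frq = malloc(sizeof(long) * (size_t)mr->nprocs);
    assert(frq != 0);
    int cc = kmr_histogram_count_by_ranks(kvs, frq, 0, 0);
    assert(cc == MPI_SUCCESS);
    /* frq[r] now holds the number of pairs held on rank r. */
    free(frq);
}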
int kmr_map_for_some(KMR_KVS *kvi, KMR_KVS *kvo, void *arg, struct kmr_option opt, kmr_mapfn_t m)
Maps until some key-value pairs are added.
int kmr_product_ntuples(KMR_KVS *kvo, struct kmr_ntuple **vv[2], long cnt[2], int marker, int slots[][2], int nslots, int keys[][2], int nkeys)
Makes a direct product of the two sets of n-tuples VV[0] and VV[1] with their counts in CNT[0] and CNT[1]...
Key-Value Stream (abstract).
Utilities Private Part (do not include from applications).
#define kmr_reduce(KVI, KVO, ARG, OPT, R)
Reduces key-value pairs.
Options to Mapping, Shuffling, and Reduction.
#define KMR_ALIGN(X)
Rounds up a given size to the alignment restriction (currently eight bytes).
int kmr_add_kv(KMR_KVS *kvs, const struct kmr_kv_box kv)
Adds a key-value pair.
#define kmr_malloc(Z)
Allocates memory, or aborts when failed.
int kmr_map9(_Bool stop_when_some_added, KMR_KVS *kvi, KMR_KVS *kvo, void *arg, struct kmr_option opt, kmr_mapfn_t m, const char *, const int, const char *)
Maps simply.
int kmr_put_ntuple_entry(KMR *mr, struct kmr_ntuple *u, const int sz, struct kmr_ntuple_entry e)
Adds an n-tuple entry E in an n-tuple U whose size is limited to SIZE.
int kmr_scan_locally(KMR_KVS *kvi, KMR_KVS *carryin, KMR_KVS *kvo, KMR_KVS *carryout, kmr_redfn_t r)
Scans every key-value with a reduce-function locally (independently on each rank).
#define kmr_create_kvs(MR, KF, VF)
Makes a new key-value stream (of type KMR_KVS) with the specified field datatypes.
int kmr_reverse(KMR_KVS *kvi, KMR_KVS *kvo, struct kmr_option opt)
Makes a new pair by swapping the key and the value in each pair.
int kmr_shuffle(KMR_KVS *kvi, KMR_KVS *kvo, struct kmr_option opt)
Shuffles key-value pairs to the appropriate destination ranks.
int kmr_copy_to_array_fn(const struct kmr_kv_box kv, const KMR_KVS *kvi, KMR_KVS *kvo, void *arg, const long i)
Copies the entry in the array.
int kmr_pairing(KMR_KVS *kvi, KMR_KVS *kvo, struct kmr_option opt)
Replaces a value part with a key-value pairing.
int kmr_reduce9(_Bool stop_when_some_added, KMR_KVS *kvi, KMR_KVS *kvo, void *arg, struct kmr_option opt, kmr_redfn_t r, const char *, const int, const char *)
Reduces key-value pairs.
int kmr_sort(KMR_KVS *kvi, KMR_KVS *kvo, struct kmr_option opt)
Sorts a key-value stream globally.
int kmr_ranking(KMR_KVS *kvi, KMR_KVS *kvo, long *count, struct kmr_option opt)
Assigns a ranking to key-value pairs, and returns the total number of elements in COUNT...
int kmr_add_kv_done(KMR_KVS *kvs)
Marks finished adding key-value pairs.
int kmr_histogram_count_by_ranks(KMR_KVS *kvs, long *frq, double *var, _Bool rankzeroonly)
Fills an integer array FRQ[i] with the count of the elements of each rank.
int kmr_add_kv_space(KMR_KVS *kvs, const struct kmr_kv_box kv, void **keyp, void **valuep)
Adds a key-value pair, but only allocates a space and returns the pointers to the key and the value parts...
int kmr_free_kvs(KMR_KVS *kvs)
Releases a key-value stream (type KMR_KVS).
int kmr_separate_ntuples(KMR *mr, const struct kmr_kv_box kv[], const long n, struct kmr_ntuple **vv[2], long cnt[2], int markers[2], _Bool disallow_other_entries)
Separates the n-tuples stored in the value part of KV into the two sets by their marker values...
int kmr_distribute(KMR_KVS *kvi, KMR_KVS *kvo, _Bool cyclic, struct kmr_option opt)
Distributes key-values so that each rank has approximately the same number of pairs.
int kmr_sort_by_one(KMR_KVS *kvi, KMR_KVS *kvo, struct kmr_option opt)
Sorts by rank0, a degenerate case for a small number of keys.
struct kmr_kvs_entry * kmr_find_kvs_last_entry(KMR_KVS *kvs)
Finds the last entry of a key-value stream.
kmr_kv_field
Datatypes of Keys or Values.
int kmr_take_one(KMR_KVS *kvi, struct kmr_kv_box *kv)
Extracts a single key-value pair locally in the key-value stream KVI.
#define kmr_map(KVI, KVO, ARG, OPT, M)
Maps simply.
Handy Copy of a Key-Value Field.
int kmr_find_key(KMR_KVS *kvi, struct kmr_kv_box ki, struct kmr_kv_box *ko)
Finds a key-value pair for a key.
int kmr_match(KMR_KVS *kvi0, KMR_KVS *kvi1, KMR_KVS *kvo, struct kmr_option opt)
Makes key-value pairs as products of the two values in two key-value streams.
struct kmr_ntuple_entry kmr_nth_ntuple(struct kmr_ntuple *u, int nth)
Returns an NTH entry of an n-tuple.
int kmr_sort_small(KMR_KVS *kvi, KMR_KVS *kvo, struct kmr_option opt)
Sorts a key-value stream by partitioning it into equal ranges.
int kmr_scan_on_values(KMR_KVS *kvi, KMR_KVS *kvo, KMR_KVS *total, kmr_redfn_t r)
Prefix-scans every key-value with a reduce-function (non-self-inclusively) and generates the final value...
void * kmr_bsearch(const void *key, const void *base, size_t nel, size_t size, int(*compar)(const void *, const void *))
Searches a key entry like bsearch(3C), but returns a next greater entry instead of null on no match...
int kmr_sort_large(KMR_KVS *kvi, KMR_KVS *kvo, struct kmr_option opt)
Sorts a key-value stream by the regular or the random sampling-sort.
int kmr_add_ntuple(KMR_KVS *kvo, void *k, int klen, struct kmr_ntuple *u)
Adds an n-tuple U with a given key K and KLEN in a key-value stream KVO.
int kmr_map_skipping(long from, long stride, long limit, _Bool stop_when_some_added, KMR_KVS *kvi, KMR_KVS *kvo, void *arg, struct kmr_option opt, kmr_mapfn_t m)
Maps by skipping the number of entries.
int kmr_unpairing(KMR_KVS *kvs, KMR_KVS *kvo, struct kmr_option opt)
Extracts a key-value pair from a pairing in the value part, discarding the original key...
int kmr_reduce_for_some(KMR_KVS *kvi, KMR_KVS *kvo, void *arg, struct kmr_option opt, kmr_redfn_t r)
Reduces until some key-value are added.
int kmr_choose_first_part(KMR_KVS *kvi, KMR_KVS *kvo, long n, struct kmr_option opt)
Chooses the first N entries from a key-value stream KVI.
int kmr_add_identity_fn(const struct kmr_kv_box kv, const KMR_KVS *kvi, KMR_KVS *kvo, void *arg, const long i)
Adds a given key-value pair unmodified.
void kmr_reset_ntuple(struct kmr_ntuple *u, int n, int marker)
Resets an n-tuple U with N entries and a MARKER.
int kmr_legal_minimum_field_size(KMR *mr, enum kmr_kv_field f)
Returns a minimum byte size of the field: 8 for INTEGER and FLOAT8, 0 for others. ...
int kmr_get_element_count(KMR_KVS *kvs, long *v)
Gets the total number of key-value pairs.
int kmr_replicate(KMR_KVS *kvi, KMR_KVS *kvo, struct kmr_option opt)
Replicates key-value pairs to be visible on all ranks, that is, it has the effect of bcast or all-gather...
int kmr_size_ntuple_by_lengths(int n, int len[])
Returns the storage size of an n-tuple for N entries with LEN[i] size each.
static struct kmr_kv_box kmr_pick_kv(struct kmr_kvs_entry *e, KMR_KVS *kvs)
Returns a handle to a key-value entry – a reverse of kmr_poke_kv().
int kmr_sort_locally(KMR_KVS *kvi, KMR_KVS *kvo, _Bool shuffling, struct kmr_option opt)
Reorders key-value pairs in a single rank.
int kmr_put_ntuple_long(KMR *mr, struct kmr_ntuple *u, const int sz, long v)
Adds an integer value in an n-tuple U whose size is limited to SIZE.
int kmr_reduce_as_one(KMR_KVS *kvi, KMR_KVS *kvo, void *arg, struct kmr_option opt, kmr_redfn_t r)
Calls a reduce-function once as if all key-value pairs had the same key.
int(* kmr_redfn_t)(const struct kmr_kv_box kv[], const long n, const KMR_KVS *kvi, KMR_KVS *kvo, void *arg)
Reduce-function Type.
int kmr_shuffle_leveling_pair_count(KMR_KVS *kvi, KMR_KVS *kvo)
Shuffles key-values so that each rank has approximately the same number of pairs. ...
int kmr_put_ntuple(KMR *mr, struct kmr_ntuple *u, const int size, const void *v, const int len)
Adds an entry V with LEN in an n-tuple U whose size is limited to SIZE.
int(* kmr_mapfn_t)(const struct kmr_kv_box kv, const KMR_KVS *kvi, KMR_KVS *kvo, void *arg, const long index)
Map-function Type.
int kmr_size_ntuple(struct kmr_ntuple *u)
Returns the storage size of an n-tuple.
int kmr_find_string(KMR_KVS *kvi, const char *k, const char **vq)
Finds the key K in the key-value stream KVS.